

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 处理连续流数据的流处理作业
<a name="jobs-streaming"></a>

EMR Serverless 中的流处理作业是一种作业模式，可让您近乎实时地分析和处理流数据。这些长时间运行的作业会轮询流数据，并在数据到达时持续处理结果。流处理作业最适合需要实时数据处理的任务，如近实时分析、欺诈检测和推荐引擎。EMR Serverless 流处理作业提供了优化功能，如内置作业弹性、实时监控、增强的日志管理以及与流连接器的集成。

以下是流处理作业的一些用例：
+ **近实时分析**：Amazon EMR Serverless 中的流处理作业让您可以近乎实时地处理流数据，以便对日志数据、传感器数据或点击流数据等连续数据流执行实时分析，从而根据最新信息获得见解并及时做出决策。
+ **欺诈检测**：在分析数据流并发现可疑模式或异常情况时，可使用流处理作业对金融交易、信用卡业务或在线活动进行近乎实时的欺诈检测。
+ **推荐引擎**：流处理作业可以处理用户活动数据并更新推荐模型。这样可根据行为和偏好进行个性化的实时推荐。
+ **社交媒体分析**：流处理作业可以处理推文、评论和帖子等社交媒体数据，让组织近乎实时地监控趋势、分析情感和管理品牌声誉。
+ **物联网 (IoT) 分析**：流处理作业可以处理和分析来自物联网设备、传感器和连接机器的高速数据流，以便运行异常检测、预测性维护和其他物联网分析用例。
+ **点击流分析**：流处理作业可以处理和分析来自网站或移动应用程序的点击流数据。使用此类数据的企业可以进行分析，以深入了解用户行为、提供个性化用户体验、优化营销活动。
+ **日志监控和分析**：流处理作业还可以处理来自服务器、应用程序和网络设备的日志数据。为您提供异常检测、故障排除、系统运行状况和性能。

**主要优势**

EMR Serverless 中的流处理作业会自动提供作业*弹性*，这是以下因素组合的结果：
+ **自动重试**：EMR Serverless 会自动重试任何失败的作业，无需手动输入。
+ **可用区（AZ）弹性**：如果原始可用区出现问题，EMR Serverless 会自动将流处理作业切换到运行状况良好的可用区。
+ **日志管理：**
  + **日志轮换**：为了更有效地管理磁盘存储，EMR Serverless 会定期轮换长时间流处理作业的日志。这样可防止可能占用磁盘空间的日志累积。
  + **日志压缩**：帮助您有效管理和优化托管持久性中的日志文件。在使用托管 Spark History Server 时，压缩还可以改善调试体验。

**支持的数据来源和数据接收器**

EMR Serverless 可与多个输入数据来源和输出数据接收器配合使用：
+ 支持的输入数据来源：Amazon Kinesis Data Streams、Amazon Managed Streaming for Apache Kafka 和自我管理的 Apache Kafka 集群。默认情况下，Amazon EMR 7.1.0 及更高版本包含 [Amazon Kinesis Data Streams 连接器](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-structured-streaming-kinesis.html)，因此您无需构建或下载任何其他软件包。
+ 支持的输出数据接收器 — AWS Glue 数据目录表、亚马逊 S3、亚马逊 Redshift、MySQL、PostgreSQL 甲骨文、甲骨文、微软 SQL、Apache Iceberg、Delta Lake 和 Apache Hudi。

## 注意事项和限制
<a name="jobs-spark-streaming-considerations"></a>

使用流处理作业时，请记住以下注意事项和限制。
+ [Amazon EMR 7.1.0 及更高版本](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-710-release.html)支持流处理作业。
+ EMR Serverless 期望流处理作业能长时间运行，因此无法设置执行超时来限制作业的运行时间。
+ 流处理作业仅与 Spark 引擎兼容，该引擎构建在[结构化流框架](https://spark.apache.org/streaming/)之上。
+ EMR Serverless 会无限期重试流处理作业，您无法自定义最大尝试次数。如果失败的尝试次数超过了每小时窗口内的阈值，则会自动包含防抖动功能。默认阈值为一小时内 5 次失败尝试。您可以将此阈值配置为 1 到 10 次尝试。有关更多信息，请参阅 [Job resiliency](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/SECTION-jobs-resiliency.xml.html)。
+ 流处理作业具有检查点来保存运行时状态和进度，因此 EMR Serverless 可以从最新的检查点恢复流处理作业。有关更多信息，请参阅 Apache Spark 文档中的 [Recovering from failures with Checkpointing](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing)。

# 开始使用流处理作业
<a name="jobs-spark-streaming-getting-started"></a>

请参阅以下说明，了解如何开始使用流处理作业。

1. 要创建应用程序，请参阅[开始使用 Amazon EMR Serverless](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/getting-started.html)。请注意，您的应用程序必须运行 [Amazon EMR 7.1.0 或更高版本](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-710-release.html)。

1. 应用程序准备就绪后，将`mode`参数设置为`STREAMING`以提交流媒体作业，类似于以下 AWS CLI 示例。

   ```
   aws emr-serverless start-job-run \
   --application-id <APPPLICATION_ID> \
   --execution-role-arn <JOB_EXECUTION_ROLE> \
   --mode 'STREAMING' \
   --job-driver '{
       "sparkSubmit": {
           "entryPoint": "s3://<streaming script>",
           "entryPointArguments": ["s3://<DOC-EXAMPLE-BUCKET-OUTPUT>/output"],
           "sparkSubmitParameters": "--conf spark.executor.cores=4
               --conf spark.executor.memory=16g 
               --conf spark.driver.cores=4
               --conf spark.driver.memory=16g 
               --conf spark.executor.instances=3"
       }
   }'
   ```

# 支持的流处理连接器
<a name="jobs-spark-streaming-connectors"></a>

流处理连接器有助于从流处理源读取数据，也可以将数据写入流处理接收器。

以下是支持的流处理连接器：

**Amazon Kinesis Data Streams 连接器**

适用于 Apache Spark 的 [Amazon Kinesis Data Streams](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-structured-streaming-kinesis.html) 连接器支持构建流处理应用程序和管道，这些应用程序和管道使用来自 Amazon Kinesis Data Streams 的数据并向其写入数据。该连接器支持增强的扇出消耗，每个分片的专用读取吞吐率高达 2MB/秒。默认情况下，Amazon EMR Serverless 7.1.0 及更高版本包含该连接器，因此您无需构建或下载任何其他软件包。有关连接器的更多信息，请参阅[上的 spark-sql-kinesis-connector页面 GitHub](https://github.com/awslabs/spark-sql-kinesis-connector/)。

以下示例展示了如何使用 Kinesis Data Streams 连接器依赖项启动作业运行。

```
aws emr-serverless start-job-run \
--application-id <APPLICATION_ID> \
--execution-role-arn <JOB_EXECUTION_ROLE> \
--mode 'STREAMING' \
--job-driver '{
    "sparkSubmit": {
        "entryPoint": "s3://<Kinesis-streaming-script>",
        "entryPointArguments": ["s3://<DOC-EXAMPLE-BUCKET-OUTPUT>/output"],
        "sparkSubmitParameters": "--conf spark.executor.cores=4
                --conf spark.executor.memory=16g 
                --conf spark.driver.cores=4
                --conf spark.driver.memory=16g 
                --conf spark.executor.instances=3
                --jars /usr/share/aws/kinesis/spark-sql-kinesis/lib/spark-streaming-sql-kinesis-connector.jar"
    }
}'
```

要连接到 Kinesis Data Streams，请将 EMR Serverless 应用程序配置为 VPC 访问，并使用 VPC 端点允许私有访问，或使用 NAT 网关进行公有访问。有关更多信息，请参阅[配置 VPC 访问](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/vpc-access.html)。您还必须确保作业运行时角色拥有访问所需数据流的必要读写权限。要了解有关如何配置作业运行时角色的更多信息，请参阅 [Amazon EMR Serverless 的作业运行时角色](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/security-iam-runtime-role.html)。有关所有必需权限的完整列表，请参阅[上的spark-sql-kinesis-connector 页面 GitHub](https://github.com/awslabs/spark-sql-kinesis-connector/?tab=readme-ov-file#how-to-use-it)。

**Apache Kafka 连接器**

适用于 Spark 结构化流处理的 Apache Kafka 连接器是来自 Spark 社区的开源连接器，可在 Maven 存储库中使用。此连接器有助于 Spark 结构化流处理应用程序从自我管理的 Apache Kafka 和 Amazon Managed Streaming for Apache Kafka 读取数据并向其写入数据。有关连接器的更多信息，请参阅 Apache Spark 文档中的[结构化流处理 \$1 Kafka 集成指南](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html)。

以下示例演示了如何在作业运行请求中包含 Kafka 连接器。

```
aws emr-serverless start-job-run \
--application-id <APPLICATION_ID> \
--execution-role-arn <JOB_EXECUTION_ROLE> \
--mode 'STREAMING' \
--job-driver '{
    "sparkSubmit": {
        "entryPoint": "s3://<Kafka-streaming-script>",
        "entryPointArguments": ["s3://<DOC-EXAMPLE-BUCKET-OUTPUT>/output"],
        "sparkSubmitParameters": "--conf spark.executor.cores=4
                --conf spark.executor.memory=16g 
                --conf spark.driver.cores=4
                --conf spark.driver.memory=16g 
                --conf spark.executor.instances=3
                --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<KAFKA_CONNECTOR_VERSION>"
    }
}'
```

Apache Kafka 连接器版本取决于 EMR Serverless 发行版和相应的 Spark 版本。要查找正确的 Kafka 版本，请参阅[结构化流处理 \$1 Kafka 集成指南](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html)。

要将 Amazon Managed Streaming for Apache Kafka 与 IAM 身份验证结合使用，请包含另一个依赖项，以使 Kafka 连接器能够通过 IAM 连接到 Amazon MSK。有关更多信息，请参阅[上的aws-msk-iam-auth 存储库 GitHub](https://github.com/aws/aws-msk-iam-auth)。您还必须确保作业运行时角色拥有必要的 IAM 权限。以下示例演示了如何将连接器与 IAM 身份验证结合使用。

```
aws emr-serverless start-job-run \
--application-id <APPLICATION_ID> \
--execution-role-arn <JOB_EXECUTION_ROLE> \
--mode 'STREAMING' \
--job-driver '{
    "sparkSubmit": {
        "entryPoint": "s3://<Kafka-streaming-script>",
        "entryPointArguments": ["s3://<DOC-EXAMPLE-BUCKET-OUTPUT>/output"],
        "sparkSubmitParameters": "--conf spark.executor.cores=4
                --conf spark.executor.memory=16g 
                --conf spark.driver.cores=4
                --conf spark.driver.memory=16g 
                --conf spark.executor.instances=3
                --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<KAFKA_CONNECTOR_VERSION>,software.amazon.msk:aws-msk-iam-auth:<MSK_IAM_LIB_VERSION>"
    }
}'
```

要使用 Kafka 连接器和 Amazon MSK 中的 IAM 身份验证库，请为 EMR Serverless 应用程序配置 VPC 访问。子网必须能够访问互联网，并使用 NAT 网关来访问 Maven 依赖项。有关更多信息，请参阅[配置 VPC 访问](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/vpc-access.html)。子网必须连接网络才能访问 Kafka 集群。无论您的 Kafka 集群是自我管理，还是使用 Amazon Managed Streaming for Apache Kafka，都是如此。

# 流处理作业日志管理
<a name="jobs-spark-streaming-log-management"></a>

流处理作业支持 Spark 应用程序日志和事件日志的日志轮换，以及 Spark 事件日志的日志压缩。这可以帮助您有效管理资源。

**日志轮换**

流处理作业支持 Spark 应用程序日志和事件日志的日志轮换。日志轮换可防止长时间流处理作业生成大型日志文件，占用可用磁盘空间。日志轮换可帮助您节省磁盘存储空间，并防止由于磁盘空间不足而导致作业失败。有关更多信息，请参阅[轮换日志](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/rotating-logs.html)。

**日志压缩**

当托管日志可用时，流处理作业还支持对 Spark 事件日志进行日志压缩。有关托管日志记录的更多详细信息，请参阅[使用托管存储进行日志记录](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/logging.html#jobs-log-storage-managed-storage)。流处理可以长时间运行，事件数据量会随着时间的推移而增加，并显著增加日志文件大小。Spark History Server 会读取这些事件，将其加载到 Spark 应用程序 UI 的内存中。此过程可能会产生高延迟和高成本，尤其是当 Amazon S3 中存储的事件日志非常大时。

日志压缩可减小事件日志的大小，因此 Spark History Server 在任何时候都不必加载超过 1GB 的事件日志。有关更多信息，请参阅 Apache Spark 文档中的[监控和仪表](https://spark.apache.org/docs/latest/monitoring.html)。