

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 支持的流处理连接器
<a name="jobs-spark-streaming-connectors"></a>

流处理连接器有助于从流处理源读取数据，也可以将数据写入流处理接收器。

以下是支持的流处理连接器：

**Amazon Kinesis Data Streams 连接器**

适用于 Apache Spark 的 [Amazon Kinesis Data Streams](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-structured-streaming-kinesis.html) 连接器支持构建流处理应用程序和管道，这些应用程序和管道使用来自 Amazon Kinesis Data Streams 的数据并向其写入数据。该连接器支持增强的扇出消耗，每个分片的专用读取吞吐率高达 2MB/秒。默认情况下，Amazon EMR Serverless 7.1.0 及更高版本包含该连接器，因此您无需构建或下载任何其他软件包。有关连接器的更多信息，请参阅[上的 spark-sql-kinesis-connector页面 GitHub](https://github.com/awslabs/spark-sql-kinesis-connector/)。

以下示例展示了如何使用 Kinesis Data Streams 连接器依赖项启动作业运行。

```
aws emr-serverless start-job-run \
--application-id <APPLICATION_ID> \
--execution-role-arn <JOB_EXECUTION_ROLE> \
--mode 'STREAMING' \
--job-driver '{
    "sparkSubmit": {
        "entryPoint": "s3://<Kinesis-streaming-script>",
        "entryPointArguments": ["s3://<DOC-EXAMPLE-BUCKET-OUTPUT>/output"],
        "sparkSubmitParameters": "--conf spark.executor.cores=4
                --conf spark.executor.memory=16g 
                --conf spark.driver.cores=4
                --conf spark.driver.memory=16g 
                --conf spark.executor.instances=3
                --jars /usr/share/aws/kinesis/spark-sql-kinesis/lib/spark-streaming-sql-kinesis-connector.jar"
    }
}'
```

要连接到 Kinesis Data Streams，请将 EMR Serverless 应用程序配置为 VPC 访问，并使用 VPC 端点允许私有访问，或使用 NAT 网关进行公有访问。有关更多信息，请参阅[配置 VPC 访问](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/vpc-access.html)。您还必须确保作业运行时角色拥有访问所需数据流的必要读写权限。要了解有关如何配置作业运行时角色的更多信息，请参阅 [Amazon EMR Serverless 的作业运行时角色](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/security-iam-runtime-role.html)。有关所有必需权限的完整列表，请参阅[上的spark-sql-kinesis-connector 页面 GitHub](https://github.com/awslabs/spark-sql-kinesis-connector/?tab=readme-ov-file#how-to-use-it)。

**Apache Kafka 连接器**

适用于 Spark 结构化流处理的 Apache Kafka 连接器是来自 Spark 社区的开源连接器，可在 Maven 存储库中使用。此连接器有助于 Spark 结构化流处理应用程序从自我管理的 Apache Kafka 和 Amazon Managed Streaming for Apache Kafka 读取数据并向其写入数据。有关连接器的更多信息，请参阅 Apache Spark 文档中的[结构化流处理 \$1 Kafka 集成指南](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html)。

以下示例演示了如何在作业运行请求中包含 Kafka 连接器。

```
aws emr-serverless start-job-run \
--application-id <APPLICATION_ID> \
--execution-role-arn <JOB_EXECUTION_ROLE> \
--mode 'STREAMING' \
--job-driver '{
    "sparkSubmit": {
        "entryPoint": "s3://<Kafka-streaming-script>",
        "entryPointArguments": ["s3://<DOC-EXAMPLE-BUCKET-OUTPUT>/output"],
        "sparkSubmitParameters": "--conf spark.executor.cores=4
                --conf spark.executor.memory=16g 
                --conf spark.driver.cores=4
                --conf spark.driver.memory=16g 
                --conf spark.executor.instances=3
                --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<KAFKA_CONNECTOR_VERSION>"
    }
}'
```

Apache Kafka 连接器版本取决于 EMR Serverless 发行版和相应的 Spark 版本。要查找正确的 Kafka 版本，请参阅[结构化流处理 \$1 Kafka 集成指南](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html)。

要将 Amazon Managed Streaming for Apache Kafka 与 IAM 身份验证结合使用，请包含另一个依赖项，以使 Kafka 连接器能够通过 IAM 连接到 Amazon MSK。有关更多信息，请参阅[上的aws-msk-iam-auth 存储库 GitHub](https://github.com/aws/aws-msk-iam-auth)。您还必须确保作业运行时角色拥有必要的 IAM 权限。以下示例演示了如何将连接器与 IAM 身份验证结合使用。

```
aws emr-serverless start-job-run \
--application-id <APPLICATION_ID> \
--execution-role-arn <JOB_EXECUTION_ROLE> \
--mode 'STREAMING' \
--job-driver '{
    "sparkSubmit": {
        "entryPoint": "s3://<Kafka-streaming-script>",
        "entryPointArguments": ["s3://<DOC-EXAMPLE-BUCKET-OUTPUT>/output"],
        "sparkSubmitParameters": "--conf spark.executor.cores=4
                --conf spark.executor.memory=16g 
                --conf spark.driver.cores=4
                --conf spark.driver.memory=16g 
                --conf spark.executor.instances=3
                --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<KAFKA_CONNECTOR_VERSION>,software.amazon.msk:aws-msk-iam-auth:<MSK_IAM_LIB_VERSION>"
    }
}'
```

要使用 Kafka 连接器和 Amazon MSK 中的 IAM 身份验证库，请为 EMR Serverless 应用程序配置 VPC 访问。子网必须能够访问互联网，并使用 NAT 网关来访问 Maven 依赖项。有关更多信息，请参阅[配置 VPC 访问](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/vpc-access.html)。子网必须连接网络才能访问 Kafka 集群。无论您的 Kafka 集群是自我管理，还是使用 Amazon Managed Streaming for Apache Kafka，都是如此。