

# Kinesis 连接
<a name="aws-glue-programming-etl-connect-kinesis-home"></a>

您可以使用 Kinesis 连接来对 Amazon Kinesis Data Streams 执行读取和写入操作，方法是通过存储在 Data Catalog 表中的信息进行读取和写入，或通过提供信息直接访问数据流。您可以从 Kinesis 读取信息到 Spark DataFrame 中，然后将其转换为 AWS Glue DynamicFrame。您可以用 JSON 格式将 DynamicFrames 写入 Kinesis。如果直接访问数据流，请使用这些选项提供有关如何访问数据流的信息。

如果您使用 `getCatalogSource` 或 `create_data_frame_from_catalog` 使用来自 Kinesis 流式处理源的记录，则任务具有数据目录数据库和表名称信息，将其用于获取一些基本参数，以便从 Kinesis 流式处理源读取数据。如果使用 `getSource`、`getSourceWithFormat`、`createDataFrameFromOptions` 或 `create_data_frame_from_options`，则必须使用此处描述的连接选项指定这些基本参数。

您可以使用 `GlueContext` 类中指定方法的以下参数为 Kinesis 指定连接选项。
+ Scala
  + `connectionOptions`：与 `getSource`、`createDataFrameFromOptions`、`getSink` 结合使用 
  + `additionalOptions`：与 `getCatalogSource`、`getCatalogSink` 结合使用
  + `options`：与 `getSourceWithFormat`、`getSinkWithFormat` 结合使用
+ Python
  + `connection_options`：与 `create_data_frame_from_options`、`write_dynamic_frame_from_options` 结合使用
  + `additional_options`：与 `create_data_frame_from_catalog`、`write_dynamic_frame_from_catalog` 结合使用
  + `options`：与 `getSource`、`getSink` 结合使用

有关流式处理 ETL 作业的注意事项和限制，请参阅 [串流 ETL 注释和限制](add-job-streaming.md#create-job-streaming-restrictions)。

## 配置 Kinesis
<a name="aws-glue-programming-etl-connect-kinesis-configure"></a>

要在 AWS Glue Spark 作业中连接到 Kinesis 数据流，需要具备一些先决条件：
+ 如果读取，AWS Glue 作业必须拥有对 Kinesis 数据流的读取访问权限级别 IAM 权限。
+ 如果写入，AWS Glue 作业必须拥有对 Kinesis 数据流的写入访问权限级别 IAM 权限。

在某些情况下，您需要配置其他先决条件：
+ 如果您的 AWS Glue 作业配置了**其他网络连接**（通常用于连接到其他数据集），并且其中一个连接提供 Amazon VPC **网络选项**，则这将引导您的作业通过 Amazon VPC 进行通信。在这种情况下，您还需要将 Kinesis 数据流配置为通过 Amazon VPC 进行通信。您可以通过在 Amazon VPC 和 Kinesis 数据流之间创建接口 VPC 端点实现此目的。有关更多信息，请参阅 [Using Kinesis Data Streams with Interface VPC Endpoints](https://docs.aws.amazon.com//streams/latest/dev/vpc.html)。
+ 在另一个账户中指定 Amazon Kinesis Data Streams 时，您必须设置角色和策略，从而允许跨账户访问。有关更多信息，请参阅[示例：从不同账户的 Kinesis 串流中读取](https://docs.aws.amazon.com/kinesisanalytics/latest/java/examples-cross.html)。

有关流式处理 ETL 作业先决条件的更多信息，请参阅 [在 AWS Glue 中流式处理 ETL 作业](add-job-streaming.md)。

## 示例：从 Kinesis 流读取
<a name="aws-glue-programming-etl-connect-kinesis-read"></a>

### 示例：从 Kinesis 流读取
<a name="section-etl-connect-kinesis-read"></a>

与 [forEachBatch](aws-glue-api-crawler-pyspark-extensions-glue-context.md#aws-glue-api-crawler-pyspark-extensions-glue-context-forEachBatch) 结合使用。

Amazon Kinesis 流式处理源示例：

```
kinesis_options =
   { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream",
     "startingPosition": "TRIM_HORIZON", 
     "inferSchema": "true", 
     "classification": "json" 
   }
data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)
```

## 示例：写入 Kinesis 流
<a name="aws-glue-programming-etl-connect-kinesis-write"></a>

### 示例：从 Kinesis 流读取
<a name="section-etl-connect-kinesis-read"></a>

与 [forEachBatch](aws-glue-api-crawler-pyspark-extensions-glue-context.md#aws-glue-api-crawler-pyspark-extensions-glue-context-forEachBatch) 结合使用。

Amazon Kinesis 流式处理源示例：

```
kinesis_options =
   { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream",
     "startingPosition": "TRIM_HORIZON", 
     "inferSchema": "true", 
     "classification": "json" 
   }
data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)
```

## Kinesis 连接选项参考
<a name="aws-glue-programming-etl-connect-kinesis"></a>

为 Amazon Kinesis Data Streams 指定连接选项。

为 Kinesis 流式处理数据源使用以下连接选项：
+ `"streamARN"`（必填）用于读/写。Kinesis 数据流的 ARN。
+ `"classification"`（读取所必填）用于读取。记录中数据使用的文件格式。除非 Data Catalog 提供，否则为必需。
+ `"streamName"` –（可选）用于读取。要从中读取的 Kinesis 数据流的名称。与 `endpointUrl` 一起使用。
+ `"endpointUrl"` –（可选）用于读取。默认：“https://kinesis.us-east-1.amazonaws.com”。Kinesis 流的 AWS 端点。除非要连接到特殊区域，否则您无需对此进行更改。
+ `"partitionKey"` –（可选）用于写入。生成记录时所使用的 Kinesis 分区键。
+ `"delimiter"`（可选）用于读取。当 `classification` 为 CSV 时使用的值分隔符。默认为“`,`”。
+ `"startingPosition"`：（可选）用于读取。要从中读取数据的 Kinesis 数据流中的起始位置。可能的值是 `"latest"`、`"trim_horizon"`、`"earliest"` 或以模式 `yyyy-mm-ddTHH:MM:SSZ` 采用 UTC 格式的时间戳字符串（其中 `Z` 表示带有 \$1/-的 UTC 时区偏移量。例如：“2023-04-04T08:00:00-04:00”）。默认值为 `"latest"`。注意：只有 AWS Glue 版本 4.0 或更高版本支持 `"startingPosition"` 的 UTC 格式中的时间戳字符串。
+ `"failOnDataLoss"`：（可选）如果有任何活动分片丢失或已过期，则作业失败。默认值为 `"false"`。
+ `"awsSTSRoleARN"`：（可选）用于读取/写入。要使用 AWS Security Token Service（AWS STS）代入的角色的 Amazon 资源名称（ARN）。此角色必须拥有针对 Kinesis 数据流执行描述或读取记录操作的权限。在访问其他账户中的数据流时，必须使用此参数。与 `"awsSTSSessionName"` 结合使用。
+ `"awsSTSSessionName"`：（可选）用于读取/写入。使用 AWS STS 代入角色的会话的标识符。在访问其他账户中的数据流时，必须使用此参数。与 `"awsSTSRoleARN"` 结合使用。
+ `"awsSTSEndpoint"`：（可选）使用代入角色连接到 Kinesis 时要使用的 AWS STS 端点。这允许在 VPC 中使用区域 AWS STS 端点，而使用默认的全局端点是不可能的。
+ `"maxFetchTimeInMs"`：（可选）用于读取。作业执行程序从 Kinesis 数据流中读取当前批处理记录所花费的最长时间，以毫秒为单位指定。在这段时间内可以进行多次 `GetRecords` API 调用。默认值为 `1000`。
+ `"maxFetchRecordsPerShard"`：（可选）用于读取。每个微批次将从 Kinesis 数据流中的每个分片获取的最大记录数。注意：如果流式传输作业已经从 Kinesis 读取了额外的记录（在同一个 get-records 调用中），则客户端可以超过此限制。如果 `maxFetchRecordsPerShard` 需要严格，则必须是 `maxRecordPerRead` 的整数倍。默认值为 `100000`。
+ `"maxRecordPerRead"`：（可选）用于读取。每项 `getRecords` 操作中要从 Kinesis 数据流获取的最大记录数。默认值为 `10000`。
+ `"addIdleTimeBetweenReads"`：（可选）用于读取。在两项连续 `getRecords` 操作之间添加时间延迟。默认值为 `"False"`。此选项仅适用于 Glue 版本 2.0 及更高版本。
+ `"idleTimeBetweenReadsInMs"`：（可选）用于读取。两项连续 `getRecords` 操作之间的最短时间延迟，以毫秒为单位。默认值为 `1000`。此选项仅适用于 Glue 版本 2.0 及更高版本。
+ `"describeShardInterval"`：（可选）用于读取。两个 `ListShards` API 调用之间的最短时间间隔，供您的脚本考虑重新分区。有关更多信息，请参阅《Amazon Kinesis Data Streams 开发人员指南》中的[重新分区策略](https://docs.aws.amazon.com//streams/latest/dev/kinesis-using-sdk-java-resharding-strategies.html)**。默认值为 `1s`。
+ `"numRetries"`：（可选）用于读取。Kinesis Data Streams API 请求的最大重试次数。默认值为 `3`。
+ `"retryIntervalMs"`：（可选）用于读取。重试 Kinesis Data Streams API 调用之前的冷却时间（以毫秒为单位指定）。默认值为 `1000`。
+ `"maxRetryIntervalMs"`：（可选）用于读取。Kinesis Data Streams API 调用的两次重试之间的最长冷却时间（以毫秒为单位指定）。默认值为 `10000`。
+ `"avoidEmptyBatches"`：（可选）用于读取。在批处理开始之前检查 Kinesis 数据流中是否有未读数据，避免创建空白微批处理任务。默认值为 `"False"`。
+ `"schema"`：（当 inferSchema 设为 false 时为必填）用于读取。用于处理有效负载的架构。如果分类为 `avro`，则提供的架构必须采用 Avro 架构格式。如果分类不是 `avro`，则提供的架构必须采用 DDL 架构格式。

  以下是一些架构示例。

------
#### [ Example in DDL schema format ]

  ```
  `column1` INT, `column2` STRING , `column3` FLOAT
  ```

------
#### [ Example in Avro schema format ]

  ```
  {
    "type":"array",
    "items":
    {
      "type":"record",
      "name":"test",
      "fields":
      [
        {
          "name":"_id",
          "type":"string"
        },
        {
          "name":"index",
          "type":
          [
            "int",
            "string",
            "float"
          ]
        }
      ]
    }
  }
  ```

------
+ `"inferSchema"`：（可选）用于读取。默认值为‘false’。如果设置为“true”，则会在运行时检测到 `foreachbatch` 内的有效工作负载中的架构。
+ `"avroSchema"`：（已弃用）用于读取。用于指定 Avro 数据架构（使用 Avro 格式时）的参数。此参数现已被弃用。使用 `schema` 参数。
+ `"addRecordTimestamp"`：（可选）用于读取。当选项设置为 'true' 时，数据输出将包含一个名为 "\$1\$1src\$1timestamp" 的附加列，表示数据流收到相应记录的时间。默认值为‘false’。4.0 或更高 AWS Glue 版本支持此选项。
+ `"emitConsumerLagMetrics"`：（可选）用于读取。当选项设置为“true”时，对于每个批次，它将向 CloudWatch 发布数据流接收到的最早记录与该记录到达 AWS Glue 之间的时长指标。指标名称为 "glue.driver.streaming.maxConsumerLagInMs"。默认值为‘false’。4.0 或更高 AWS Glue 版本支持此选项。
+ `"fanoutConsumerARN"`：（可选）用于读取。`streamARN` 中指定的流的 Kinesis 流用户的 ARN。用于为您的 Kinesis 连接启用增强型扇出功能模式。有关使用增强型扇出功能的 Kinesis 流的更多信息，请参阅 [在 Kinesis 流作业中使用增强型扇出功能](aws-glue-programming-etl-connect-kinesis-efo.md)。
+ `"recordMaxBufferedTime"` –（可选）用于写入。默认值：1000（ms）。记录在等待写入时缓冲的最长时间。
+ `"aggregationEnabled"` –（可选）用于写入。默认值：真。指定是否应在将记录发送到 Kinesis 之前对其进行汇总。
+ `"aggregationMaxSize"` –（可选）用于写入。默认值：51200（字节）。如果记录超过了此限制，则它将绕过聚合器。注意：Kinesis 将记录大小限制为 50KB。如果您将其设置为 50KB 以上，Kinesis 将拒绝过大的记录。
+ `"aggregationMaxCount"` –（可选）用于写入。默认值：4294967295。要打包至汇总记录的最大项目数量。
+ `"producerRateLimit"` –（可选）用于写入。默认值：150（%）。限制从单个生产者（例如您的作业）发送的每个分片的吞吐量，以占后端限制的百分比表示。
+ `"collectionMaxCount"` –（可选）用于写入。默认值：500。要装入 PutRecords 请求的最大项目数量。
+ `"collectionMaxSize"` –（可选）用于写入。默认值：5242880（字节）。通过 PutRecords 请求发送的最大数据量。

# 在 Kinesis 流作业中使用增强型扇出功能
<a name="aws-glue-programming-etl-connect-kinesis-efo"></a>

使用增强型扇出功能的用户能够接收来自 Kinesis 流的记录，其专用吞吐量可能高于普通用户。这是通过优化用于向 Kinesis 用户（例如您的作业）提供数据的传输协议来完成的。有关 Kinesis 增强型扇出功能的更多信息，请参阅 [Kinesis 文档](https://docs.aws.amazon.com//streams/latest/dev/enhanced-consumers.html)。

在增强型扇出功能模式下，`maxRecordPerRead` 和 `idleTimeBetweenReadsInMs` 连接选项不再适用，因为使用增强型扇出功能时这些参数不可配置。重试的配置选项按所述执行。

使用以下步骤为您的流作业启用和禁用增强型扇出功能。应为每项将消耗流中数据的作业注册一个流用户。

**要在作业中启用增强型扇出功能使用，请执行以下操作：**

1. 使用 Kinesis API 为作业注册流用户。按照说明，使用 [Kinesis 文档](https://docs.aws.amazon.com//streams/latest/dev/building-enhanced-consumers-api)中的*使用 Kinesis Data Streams API 为用户注册增强型扇出功能*。只需要按照第一步进行操作，即调用 [RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html)。您的请求应返回 ARN *consumerARN*。

1. 在连接方法参数中将连接选项 `fanoutConsumerARN` 设置为 *consumerARN*。

1. 重新启动作业。

**要在作业中禁用增强型扇出功能使用，请执行以下操作：**

1. 从方法调用中移除 `fanoutConsumerARN` 连接选项。

1. 重新启动作业。

1. 按照 [Kinesis 文档](https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-console.html)中的说明，*取消注册用户*。这些说明适用于控制台，但也可以通过 Kinesis API 来实现。有关通过 Kinesis API 取消注册流用户的更多信息，请查阅 Kinesis 文档中的 [DeregisterStreamConsumer](https://docs.aws.amazon.com//kinesis/latest/APIReference/API_DeregisterStreamConsumer.html)。