

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# Kinesis 連線
<a name="aws-glue-programming-etl-connect-kinesis-home"></a>

您可以使用 Kinesis 連線，透過儲存在 Data Catalog 資料表中的資訊讀取和寫入 Amazon Kinesis 資料串流，或透過提供資訊來直接存取資料串流。您可以從 Kinesis 讀取資訊到 Spark DataFrame，然後將其轉換為 AWS Glue DynamicFrame。您可以使用 JSON 格式將 DynamicFrames 寫入 Kinesis。如果您直接存取資料串流，則請使用這些選項來提供如何存取資料串流的相關資訊。

如果您使用 `getCatalogSource` 或 `create_data_frame_from_catalog` 來取用來自 Kinesis 串流來源的記錄，則該任務具有 Data Catalog 資料庫和資料表名稱資訊，並可以使用它來獲取一些從 Kinesis 串流來源讀取的基本參數。如果使用 `getSource`、`getSourceWithFormat`、`createDataFrameFromOptions` 或 `create_data_frame_from_options`，則您必須使用此處描述的連線選項來指定這些基本參數。

您可以使用 `GlueContext` 類別中指定方法的下列引數來指定 Kinesis 的連線選項。
+ Scala
  + `connectionOptions`：與 `getSource`、`createDataFrameFromOptions`、`getSink` 搭配使用 
  + `additionalOptions`：與 `getCatalogSource`、`getCatalogSink` 搭配使用。
  + `options`：與 `getSourceWithFormat`、`getSinkWithFormat` 搭配使用。
+ Python
  + `connection_options`：與 `create_data_frame_from_options`、`write_dynamic_frame_from_options` 搭配使用。
  + `additional_options`：與 `create_data_frame_from_catalog`、`write_dynamic_frame_from_catalog` 搭配使用。
  + `options`：與 `getSource`、`getSink` 搭配使用。

如需有關串流 ETL 任務的注意事項和限制，請參閱 [串流 ETL 注意事項和限制](add-job-streaming.md#create-job-streaming-restrictions)。

## 設定 Kinesis
<a name="aws-glue-programming-etl-connect-kinesis-configure"></a>

若要連線到 Glue Spark 任務中的 Kinesis AWS 資料串流，您需要一些先決條件：
+ 如果讀取，Glue AWS 任務必須具有 Kinesis 資料串流的讀取存取層級 IAM 許可。
+ 如果寫入，Glue AWS 任務必須具有 Kinesis 資料串流的寫入存取層級 IAM 許可。

在某些情況下，您需要設定其他先決條件：
+ 如果您的 AWS Glue 任務設定了**其他網路連線** （通常連接到其他資料集），且其中一個連線提供 Amazon VPC **網路選項**，這將引導您的任務透過 Amazon VPC 進行通訊。在這種情況下，您還需要將 Kinesis 資料串流設定為透過 Amazon VPC 進行通訊。為此，您可以建立 Amazon VPC 與 Kinesis 資料串流之間的介面 VPC 端點。如需詳細資訊，請參閱 [Using Kinesis Data Streams with Interface VPC Endpoints](https://docs.aws.amazon.com//streams/latest/dev/vpc.html)。
+ 在另一個帳戶中指定 Amazon Kinesis Data Streams 時，您必須設定角色和政策以允許跨帳戶存取。如需詳細資訊，請參閱[範例：從不同帳戶中的 Kinesis 串流讀取](https://docs.aws.amazon.com/kinesisanalytics/latest/java/examples-cross.html)。

如需有關串流 ETL 任務先決條件的詳細資訊，請參閱 [在 AWS Glue 中串流 ETL 任務](add-job-streaming.md)。

## 範例：從 Kinesis 串流讀取
<a name="aws-glue-programming-etl-connect-kinesis-read"></a>

### 範例：從 Kinesis 串流讀取
<a name="section-etl-connect-kinesis-read"></a>

搭配 [forEachBatch](aws-glue-api-crawler-pyspark-extensions-glue-context.md#aws-glue-api-crawler-pyspark-extensions-glue-context-forEachBatch) 使用。

Amazon Kinesis 串流來源範例：

```
kinesis_options =
   { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream",
     "startingPosition": "TRIM_HORIZON", 
     "inferSchema": "true", 
     "classification": "json" 
   }
data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)
```

## 範例：寫入 Kinesis 串流
<a name="aws-glue-programming-etl-connect-kinesis-write"></a>

### 範例：從 Kinesis 串流讀取
<a name="section-etl-connect-kinesis-read"></a>

搭配 [forEachBatch](aws-glue-api-crawler-pyspark-extensions-glue-context.md#aws-glue-api-crawler-pyspark-extensions-glue-context-forEachBatch) 使用。

Amazon Kinesis 串流來源範例：

```
kinesis_options =
   { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream",
     "startingPosition": "TRIM_HORIZON", 
     "inferSchema": "true", 
     "classification": "json" 
   }
data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)
```

## Kinesis 連線選項參考
<a name="aws-glue-programming-etl-connect-kinesis"></a>

指定 Amazon Kinesis Data Streams 的連線選項。

針對 Kinesis 串流資料來源使用下列的連線選項：
+ `"streamARN"` (必要) 用於讀取/寫入。Kinesis 資料串流的 ARN。
+ `"classification"` (讀取時為必要) 用於讀取。記錄中資料使用的檔案格式。除非透過資料目錄提供，否則為必要。
+ `"streamName"` – (選用) 用於讀取。要從中讀取的 Kinesis 資料串流名稱。與 `endpointUrl` 搭配使用。
+ `"endpointUrl"` – (選用) 用於讀取。預設："https://kinesis.us-east-1.amazonaws.com"。Kinesis 串流的 AWS 端點。除非您要連線到特殊區域，否則無需變更此設定。
+ `"partitionKey"` – (選用) 用於寫入。在產生記錄時使用的 Kinesis 分割區索引鍵。
+ `"delimiter"` (選用) 用於讀取。當 `classification` 為 CSV 時使用的值分隔符號。預設值為 "`,`"。
+ `"startingPosition"`：(選用) 用於讀取。Kinesis 資料串流中要從中讀取資料的起始位置。可能的值包括 `"latest"`、`"trim_horizon"`、`"earliest"` 或 `yyyy-mm-ddTHH:MM:SSZ` 模式中 UTC 格式的時間戳記字串 (其中 `Z` 代表以 \$1/- 表示的 UTC 時區偏移。例如："2023-04-04T08:00:00-04:00")。預設值為 `"latest"`。注意：僅 AWS Glue 4.0 版或更新版本`"startingPosition"`支援 UTC 格式的時間戳記字串。
+ `"failOnDataLoss"`：(選用) 如果有任何作用中的碎片遺失或過期，則任務失敗。預設值為 `"false"`。
+ `"awsSTSRoleARN"`：(選用) 用於讀取/寫入。要使用 () 擔任之角色的 Amazon Resource Name AWS Security Token Service (ARN AWS STS)。此角色必須具有描述或讀取 Kinesis 資料串流記錄操作的許可。存取不同帳戶中的資料串流時，您必須使用此參數。搭配 `"awsSTSSessionName"` 使用。
+ `"awsSTSSessionName"`：(選用) 用於讀取/寫入。使用 AWS STS擔任角色之工作階段的識別符。存取不同帳戶中的資料串流時，您必須使用此參數。搭配 `"awsSTSRoleARN"` 使用。
+ `"awsSTSEndpoint"`：（選用） 使用 擔任的角色連線至 Kinesis 時要使用的 AWS STS 端點。這允許在 VPC 中使用區域 AWS STS 端點，這是預設全域端點無法做到的。
+ `"maxFetchTimeInMs"`：(選用) 用於讀取。任務執行器從 Kinesis 資料串流讀取目前批次記錄所花費的最長時間，以毫秒 (ms) 為單位指定。在此期間可以進行多次 `GetRecords` API 呼叫。預設值為 `1000`。
+ `"maxFetchRecordsPerShard"`：(選用) 用於讀取。每個微批次的 Kinesis 資料串流中每個碎片要擷取的記錄數目上限。注意：如果串流任務已從 Kinesis 讀取額外的記錄 (在相同的 get-records 呼叫中)，用戶端可以超過此限制。如果 `maxFetchRecordsPerShard` 需要嚴格控制，則其必須是 `maxRecordPerRead` 的倍數。預設值為 `100000`。
+ `"maxRecordPerRead"`：(選用) 用於讀取。要從每個 `getRecords` 操作的 Kinesis 資料串流中擷取的記錄數量上限。預設值為 `10000`。
+ `"addIdleTimeBetweenReads"`：(選用) 用於讀取。增加兩個連續 `getRecords` 操作之間的時間延遲。預設值為 `"False"`。此選項僅在 Glue 2.0 及更高版本上才可設定。
+ `"idleTimeBetweenReadsInMs"`：(選用) 用於讀取。兩個連續 `getRecords` 操作的最小延遲時間，以毫秒為單位指定。預設值為 `1000`。此選項僅在 Glue 2.0 及更高版本上才可設定。
+ `"describeShardInterval"`：(選用) 用於讀取。指令碼考慮重新分片的兩個 `ListShards` API 呼叫之間的最小時間間隔。如需詳細資訊，請參閱 *Amazon Kinesis Data Streams 開發人員指南*中的[重新分片的策略](https://docs.aws.amazon.com//streams/latest/dev/kinesis-using-sdk-java-resharding-strategies.html)。預設值為 `1s`。
+ `"numRetries"`：(選用) 用於讀取。Kinesis Data Streams API 請求的重試數上限。預設值為 `3`。
+ `"retryIntervalMs"`：(選用) 用於讀取。重試 Kinesis Data Streams API 呼叫之前的冷卻時間期間 (以毫秒為單位)。預設值為 `1000`。
+ `"maxRetryIntervalMs"`：(選用) 用於讀取。Kinesis Data Streams API 呼叫之兩次重試之間的最大冷卻時間期間 (以毫秒為單位)。預設值為 `10000`。
+ `"avoidEmptyBatches"`：(選用) 用於讀取。避免建立空白微批次任務，方法是在批次開始之前檢查 Kinesis 資料串流中是否有未讀取的資料。預設值為 `"False"`。
+ `"schema"`：(在 inferSchema 設定為 false 時為必要) 用於讀取。用於處理承載的結構描述。如果分類為 `avro`，提供的架構必須採用 Avro 架構格式。如果分類不是 `avro`，提供的架構必須採用 DDL 架構格式。

  以下是架構範例。

------
#### [ Example in DDL schema format ]

  ```
  `column1` INT, `column2` STRING , `column3` FLOAT
  ```

------
#### [ Example in Avro schema format ]

  ```
  {
    "type":"array",
    "items":
    {
      "type":"record",
      "name":"test",
      "fields":
      [
        {
          "name":"_id",
          "type":"string"
        },
        {
          "name":"index",
          "type":
          [
            "int",
            "string",
            "float"
          ]
        }
      ]
    }
  }
  ```

------
+ `"inferSchema"`：(選用) 用於讀取。預設值為 'false'。如果設為 'true'，將在執行時間時從 `foreachbatch` 承載偵測架構。
+ `"avroSchema"`：(已棄用) 用於讀取。使用 Avro 格式時，用於指定 Avro 資料架構的參數。此參數現已棄用。使用 `schema` 參數。
+ `"addRecordTimestamp"`：(選用) 用於讀取。當此選項設定為 'true' 時，資料輸出將包含一個名為 "\$1\$1src\$1timestamp" 的額外資料欄，其指示串流收到相應記錄的時間。預設值為 'false'。在 AWS Glue 4.0 版或更新版中支援此選項。
+ `"emitConsumerLagMetrics"`：(選用) 用於讀取。當選項設定為 "true" 時，在介於串流收到最舊記錄與其在 AWS Glue 中到達 CloudWatch 的時間之間的持續時間內，將會針對每個批次發出指標。指標的名稱為 "glue.driver.streaming.maxConsumerLagInMs"。預設值為 'false'。在 AWS Glue 4.0 版或更新版中支援此選項。
+ `"fanoutConsumerARN"`：(選用) 用於讀取。Kinesis 串流取用者的 ARN，適用於 `streamARN` 中指定的串流。用於啟用 Kinesis 連線的強化廣發功能模式。如需有關使用強化廣發功能使用 Kinesis 串流的詳細資訊，請參閱 [在 Kinesis 串流任務中使用強化廣發功能](aws-glue-programming-etl-connect-kinesis-efo.md)。
+ `"recordMaxBufferedTime"` – (選用) 用於寫入。預設：1000 (毫秒)。在等待寫入時，記錄受到緩衝的最長時間。
+ `"aggregationEnabled"` – (選用) 用於寫入。預設：true。指定是否應在將記錄傳送至 Kinesis 前先彙整記錄。
+ `"aggregationMaxSize"` – (選用) 用於寫入。預設：51200 (位元組)。若記錄大於此限制，則其會略過彙整工具。請注意，Kinesis 會強制執行 50 KB 的記錄大小限制。若您將此值設定為超過 50 KB，Kinesis 將會拒絕過大的記錄。
+ `"aggregationMaxCount"` – (選用) 用於寫入。預設：4294967295。要匯入彙整記錄的項目數量上限。
+ `"producerRateLimit"` – (選用) 用於寫入。預設：150 (%)。作為後端限制的百分比來限制從單一生產者 (例如您的任務) 傳送的每個碎片輸送量。
+ `"collectionMaxCount"` – (選用) 用於寫入。預設：500。要匯入 PutRecords 請求的項目數量上限。
+ `"collectionMaxSize"` – (選用) 用於寫入。預設：5242880 (位元組)。使用 PutRecords 請求傳送的最大資料量。

# 在 Kinesis 串流任務中使用強化廣發功能
<a name="aws-glue-programming-etl-connect-kinesis-efo"></a>

強化廣發功能取用者能夠從 Kinesis 串流接收記錄，其專用輸送量可能高於一般取用者。這是透過最佳化用來提供資料給 Kinesis 取用者 (例如您的任務) 的傳輸通訊協定來完成。如需有關 Kinesis 強化廣發功能的詳細資訊，請參閱 [Kinesis 文件](https://docs.aws.amazon.com//streams/latest/dev/enhanced-consumers.html)。

在強化廣發功能模式下，`maxRecordPerRead` 和 `idleTimeBetweenReadsInMs` 連線選項不再適用，因為使用強化廣發功能時無法設定這些參數。重試的組態選項會如上所述執行。

使用下列程序來啟用和停用串流任務的強化廣發功能。您應該為每個會使用串流資料的任務註冊串流取用者。

**若要在任務上啟用強化廣發功能使用服務：**

1. 使用 Kinesis API 為您的任務註冊串流取用者。按照 [Kinesis 文件](https://docs.aws.amazon.com//streams/latest/dev/building-enhanced-consumers-api)中的說明，*使用 Kinesis Data Streams API 向強化廣發功能註冊取用者*。您只需要按照第一步：呼叫 [RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html) 進行操作。您的請求應傳回一個 ARN，即 *consumerARN*。

1. 在連線方法引數中將連線選項 `fanoutConsumerARN` 設定為 *consumerARN*。

1. 重新啟動您的任務。

**若要停用任務上的強化廣發功能取用服務：**

1. 從您的方法呼叫中移除 `fanoutConsumerARN` 連線選項。

1. 重新啟動您的任務。

1. 按照 [Kinesis 文件](https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-console.html)中的說明*取消註冊取用者*。這些說明適用於主控台，但也可以透過 Kinesis API 來實現。如需有關透過 Kinesis API 進行串流取用者取消註冊的詳細資訊，請參閱 Kinesis 文件中的 [DeregisterStreamConsumer](https://docs.aws.amazon.com//kinesis/latest/APIReference/API_DeregisterStreamConsumer.html)。