

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# AWS Glue 串流連線
<a name="glue-streaming-connections"></a>

 下列各節提供如何在 AWS Glue 串流中使用連線的資訊。

**Topics**
+ [使用 Kafka 連線](#glue-streaming-kafka-connections)
+ [使用 Kinesis 連線](#glue-streaming-kinesis-connections)

## 使用 Kafka 連線
<a name="glue-streaming-kafka-connections"></a>

您可以使用 Kafka 連線來讀取和寫入 Kafka 資料串流，方法是使用儲存在 Data Catalog 資料表中的資訊，或提供資訊以直接存取資料串流。連線支援 Kafka 叢集或 Amazon Managed Streaming for Apache Kafka 叢集。您可以從 Kafka 讀取資訊到 Spark DataFrame，然後將其轉換為 AWS Glue DynamicFrame。您可以使用 JSON 格式將 DynamicFrames 寫入 Kafka。如果您直接存取資料串流，則請使用這些選項來提供如何存取資料串流的相關資訊。

如果您使用 `getCatalogSource` 或 `create_data_frame_from_catalog` 來取用來自 Kinesis 串流來源的記錄，或者使用 `getCatalogSink` 或 `write_dynamic_frame_from_catalog` 將記錄寫入 Kafka，並且則該任務具有 Data Catalog 資料庫和資料表名稱資訊，並可以使用其來取得一些從 Kafka 串流來源讀取的基本參數。如果使用 `getSource`、`getCatalogSink`、`getSourceWithFormat`、`getSinkWithFormat`、`createDataFrameFromOptions`、`create_data_frame_from_options` 或 `write_dynamic_frame_from_catalog`，您必須使用此處描述的連線選項來指定這些基本參數。

您可以使用 `GlueContext` 類別中指定方法的下列引數來指定 Kafka 的連線選項。
+ Scala
  + `connectionOptions`：與 `getSource`、`createDataFrameFromOptions`、`getSink` 搭配使用 
  + `additionalOptions`：與 `getCatalogSource`、`getCatalogSink` 搭配使用。
  + `options`：與 `getSourceWithFormat`、`getSinkWithFormat` 搭配使用。
+ Python
  + `connection_options`：與 `create_data_frame_from_options`、`write_dynamic_frame_from_options` 搭配使用。
  + `additional_options`：與 `create_data_frame_from_catalog`、`write_dynamic_frame_from_catalog` 搭配使用。
  + `options`：與 `getSource`、`getSink` 搭配使用。

如需有關串流 ETL 任務的注意事項和限制，請參閱 [串流 ETL 注意事項和限制](add-job-streaming.md#create-job-streaming-restrictions)。

**Topics**
+ [設定 Kafka](#glue-streaming-etl-connect-kafka-configure)
+ [範例：從 Kafka 串流讀取](#glue-streaming-etl-connect-kafka-read)
+ [範例：寫入 Kafka 串流](#glue-streaming-etl-connect-kafka-write)
+ [Kafka 連線選項參考](#glue-streaming-etl-connect-kafka)

### 設定 Kafka
<a name="glue-streaming-etl-connect-kafka-configure"></a>

透過網際網路連線至 Kafka 串流沒有 AWS 先決條件。

您可以建立 AWS Glue Kafka 連線來管理您的連線憑證。如需詳細資訊，請參閱[為 Apache Kafka 資料串流建立 AWS Glue 連線](add-job-streaming.md#create-conn-streaming)。在您的 AWS Glue 任務組態中，提供 *connectionName* 作為**其他網路連線**，然後在方法呼叫中，提供 *connectionName* 給 `connectionName` 參數。

在某些情況下，您需要設定其他先決條件：
+ 如果搭配 IAM 身分驗證使用 Amazon Managed Streaming for Apache Kafka，您會需要適當的 IAM 組態。
+ 如果搭配 Amazon VPC 使用 Amazon Managed Streaming for Apache Kafka，您會需要適當的 Amazon VPC 組態。您需要建立提供 AWS Amazon VPC 連線資訊的 Glue 連線。您需要任務組態，才能將 AWS Glue 連線納入為**其他網路連線**。

如需有關串流 ETL 任務先決條件的詳細資訊，請參閱 [在 AWS Glue 中串流 ETL 任務](add-job-streaming.md)。

### 範例：從 Kafka 串流讀取
<a name="glue-streaming-etl-connect-kafka-read"></a>

搭配 [forEachBatch](aws-glue-api-crawler-pyspark-extensions-glue-context.md#aws-glue-api-crawler-pyspark-extensions-glue-context-forEachBatch) 使用。

Kafka 串流來源範例：

```
kafka_options =
    { "connectionName": "ConfluentKafka", 
      "topicName": "kafka-auth-topic", 
      "startingOffsets": "earliest", 
      "inferSchema": "true", 
      "classification": "json" 
    }
data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kafka", connection_options=kafka_options)
```

### 範例：寫入 Kafka 串流
<a name="glue-streaming-etl-connect-kafka-write"></a>

寫入 Kafka 的範例：

使用 `getSink` 方法的範例：

```
data_frame_datasource0 = 
glueContext.getSink(
	connectionType="kafka",
	connectionOptions={
		JsonOptions("""{
			"connectionName": "ConfluentKafka", 
			"classification": "json", 
			"topic": "kafka-auth-topic", 
			"typeOfData": "kafka"}
		""")}, 
	transformationContext="dataframe_ApacheKafka_node1711729173428")
	.getDataFrame()
```

使用 `write_dynamic_frame.from_options` 方法的範例：

```
kafka_options =
    { "connectionName": "ConfluentKafka", 
      "topicName": "kafka-auth-topic", 
      "classification": "json" 
    }
data_frame_datasource0 = glueContext.write_dynamic_frame.from_options(connection_type="kafka", connection_options=kafka_options)
```

### Kafka 連線選項參考
<a name="glue-streaming-etl-connect-kafka"></a>

在讀取時，將下列連線選項與 `"connectionType": "kafka"` 搭配使用：
+ `"bootstrap.servers"` (必要) 自舉伺服器 URL 的清單，例如 `b-1.vpc-test-2.o4q88o.c6.kafka.us-east-1.amazonaws.com:9094`。此選項必須在 API 呼叫中指定，或在 Data Catalog 的資料表中繼資料中定義。
+ `"security.protocol"`(必要) 用來與代理程式通訊的協定。可能的值為 `"SSL"` 或 `"PLAINTEXT"`。
+ `"topicName"` (必要) 要訂閱的主題清單 (以逗號分隔)。您必須指定 `"topicName"`、`"assign"` 或 `"subscribePattern"` 其中一個。
+ `"assign"`：(必要) JSON 字串，指定要消耗的特定 `TopicPartitions`。您必須指定 `"topicName"`、`"assign"` 或 `"subscribePattern"` 其中一個。

  範例：'\$1"topicA":[0,1],"topicB":[2,4]\$1'
+ `"subscribePattern"`：(必要) 識別要訂閱的主題清單的 Java regex 字串。您必須指定 `"topicName"`、`"assign"` 或 `"subscribePattern"` 其中一個。

  範例：'topic.\$1'
+ `"classification"` (必要) 記錄中資料使用的檔案格式。除非透過資料目錄提供，否則為必要。
+ `"delimiter"` (選用) 當 `classification` 為 CSV 時使用的值分隔符號。預設值為 "`,`"。
+ `"startingOffsets"`：(選用) 要從中讀取資料的 Kafka 主題的起始位置。可能的值為 `"earliest"` 或 `"latest"`。預設值為 `"latest"`。
+ `"startingTimestamp"`：（選用，僅支援 AWS Glue 4.0 版或更新版本） Kafka 主題中要讀取資料的 記錄時間戳記。可能的值是 `yyyy-mm-ddTHH:MM:SSZ` 模式中 UTC 格式的時間戳記字串 (其中 `Z` 代表以 \$1/- 表示的 UTC 時區偏移。例如："2023-04-04T08:00:00-04:00")。

  注意：Glue 串流指令碼的連線選項清單中只能有其中一個 'startingOffsets' AWS 或 'startingTimestamp'，包括這兩個屬性都會導致任務失敗。
+ `"endingOffsets"`：(選用) 批次查詢結束時的終點。可能值為 `"latest"` 或指定每個 `TopicPartition` 結束偏移的 JSON 字串。

  對於 JSON 字串，格式為 `{"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}`。值 `-1` 作為偏移代表 `"latest"`。
+ `"pollTimeoutMs"`：(選用) 在 Spark 任務執行器中從 Kafka 輪詢資料的逾時 (以毫秒為單位)。預設值為 `600000`。
+ `"numRetries"`：(選用) 擷取 Kafka 位移失敗之前，要重試的次數。預設值為 `3`。
+ `"retryIntervalMs"`：(選用) 重試擷取 Kafka 偏移量之前等待的時間 (毫秒)。預設值為 `10`。
+ `"maxOffsetsPerTrigger"`：(選用) 每個觸發間隔所處理之偏移數目上限的速率限制。指定的偏移總數會按比例跨 `topicPartitions` 或不同磁碟區而分割。預設值為 null，這表示消費者讀取所有偏移，直到已知的最新偏移。
+ `"minPartitions"`：(選用) 從 Kafka 讀取所需的分割區最小數量。預設值為 null，這表示 Spark 分割區的數量等於 Kafka 分割區的數量。
+  `"includeHeaders"`：(選用) 是否包含 Kafka 標頭。當選項設定為「true」時，資料輸出將包含一個名為「glue\$1streaming\$1kafka\$1headers」的額外欄，其類型為 `Array[Struct(key: String, value: String)]`。預設值為 "false"。此選項能在 AWS Glue 3.0 版或更新版中使用。
+ `"schema"`：(當 InferSchema 設定為 false 時為必要) 用於處理承載的架構。如果分類為 `avro`，提供的架構必須採用 Avro 架構格式。如果分類不是 `avro`，提供的架構必須採用 DDL 架構格式。

  以下是架構範例。

------
#### [ Example in DDL schema format ]

  ```
  'column1' INT, 'column2' STRING , 'column3' FLOAT
  ```

------
#### [ Example in Avro schema format ]

  ```
  {
  "type":"array",
  "items":
  {
  "type":"record",
  "name":"test",
  "fields":
  [
    {
      "name":"_id",
      "type":"string"
    },
    {
      "name":"index",
      "type":
      [
        "int",
        "string",
        "float"
      ]
    }
  ]
  }
  }
  ```

------
+ `"inferSchema"`：(選用) 預設值為 'false'。如果設為 'true'，將在執行時間時從 `foreachbatch` 承載偵測架構。
+ `"avroSchema"`：(已棄用) 使用 Avro 格式時，用於指定 Avro 資料架構的參數。此參數現已棄用。使用 `schema` 參數。
+ `"addRecordTimestamp"`︰(選用) 當此選項設定為 'true' 時，資料輸出將包含一個名為 "\$1\$1src\$1timestamp" 的額外資料欄，其指示主題收到相應記錄的時間。預設值為 'false'。在 AWS Glue 4.0 版或更新版中支援此選項。
+ `"emitConsumerLagMetrics"`: (選用) 當該選項設定為 'true' 時，在介於主題收到最舊記錄與其在 AWS Glue 中到達 CloudWatch 的時間之間的持續時間，將會針對每個批次發出指標。指標的名稱為 "glue.driver.streaming.maxConsumerLagInMs"。預設值為 'false'。在 AWS Glue 4.0 版或更新版中支援此選項。

在寫入時，將下列連線選項與 `"connectionType": "kafka"` 搭配使用：
+ `"connectionName"` （必要） AWS 用來連線至 Kafka 叢集的 Glue 連線名稱 （類似於 Kafka 來源）。
+ `"topic"` (必要) 如果主題資料欄存在，則在將指定的資料列寫入 Kafka 時會使用其值作為主題，除非已設定主題組態選項。也就是說，`topic` 組態選項會覆寫主題資料欄。
+ `"partition"` (選用) 如果指定有效的分區編號，則會在傳送記錄時使用該 `partition`。

  如果未指定分區，但 `key` 存在 ，則會使用金鑰的雜湊來選擇分區。

  如果既不存在 `key` 也不存在 `partition`，則當至少向分區產生 batch.size 個位元組時，將根據這些變更的黏性分區來選擇分區。
+ `"key"` (選用) 如果 `partition` 為 null，則用於分區。
+ `"classification"` (選用) 記錄中資料使用的檔案格式。我們僅支援 JSON、CSV 和 Avro。

  使用 Avro 格式，我們可以提供要序列化的自訂 avroSchema，但請注意，這也需要在來源上提供，以進行還原序列化。否則，預設會使用 Apache AvroSchema 進行序列化。

此外，您可以視需要更新 [Kafka 生產者組態參數](https://kafka.apache.org/documentation/#producerconfigs)來微調 Kafka 接收器。請注意，連線選項上沒有允許清單，所有金鑰值對都按原樣保留在接收器上。

但是，有一個不會生效的小型拒絕選項清單。如需詳細資訊，請參閱 [Kafka 特定的組態](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html)。

## 使用 Kinesis 連線
<a name="glue-streaming-kinesis-connections"></a>

您可以使用 Kinesis 連線，透過儲存在 Data Catalog 資料表中的資訊讀取和寫入 Amazon Kinesis 資料串流，或透過提供資訊來直接存取資料串流。您可以從 Kinesis 讀取資訊到 Spark DataFrame，然後將其轉換為 AWS Glue DynamicFrame。您可以使用 JSON 格式將 DynamicFrames 寫入 Kinesis。如果您直接存取資料串流，則請使用這些選項來提供如何存取資料串流的相關資訊。

如果您使用 `getCatalogSource` 或 `create_data_frame_from_catalog` 來取用來自 Kinesis 串流來源的記錄，則該任務具有 Data Catalog 資料庫和資料表名稱資訊，並可以使用它來獲取一些從 Kinesis 串流來源讀取的基本參數。如果使用 `getSource`、`getSourceWithFormat`、`createDataFrameFromOptions` 或 `create_data_frame_from_options`，則您必須使用此處描述的連線選項來指定這些基本參數。

您可以使用 `GlueContext` 類別中指定方法的下列引數來指定 Kinesis 的連線選項。
+ Scala
  + `connectionOptions`：與 `getSource`、`createDataFrameFromOptions`、`getSink` 搭配使用 
  + `additionalOptions`：與 `getCatalogSource`、`getCatalogSink` 搭配使用。
  + `options`：與 `getSourceWithFormat`、`getSinkWithFormat` 搭配使用。
+ Python
  + `connection_options`：與 `create_data_frame_from_options`、`write_dynamic_frame_from_options` 搭配使用。
  + `additional_options`：與 `create_data_frame_from_catalog`、`write_dynamic_frame_from_catalog` 搭配使用。
  + `options`：與 `getSource`、`getSink` 搭配使用。

如需有關串流 ETL 任務的注意事項和限制，請參閱 [串流 ETL 注意事項和限制](add-job-streaming.md#create-job-streaming-restrictions)。

### 設定 Kinesis
<a name="glue-streaming-etl-connect-kinesis-configure"></a>

若要連線到 Glue Spark 任務中的 Kinesis AWS 資料串流，您需要一些先決條件：
+ 如果讀取，Glue AWS 任務必須具有 Kinesis 資料串流的讀取存取層級 IAM 許可。
+ 如果寫入，Glue AWS 任務必須具有 Kinesis 資料串流的寫入存取層級 IAM 許可。

在某些情況下，您需要設定其他先決條件：
+ 如果您的 AWS Glue 任務設定了**其他網路連線** （通常連接到其他資料集），且其中一個連線提供 Amazon VPC **網路選項**，這將引導您的任務透過 Amazon VPC 進行通訊。在這種情況下，您還需要將 Kinesis 資料串流設定為透過 Amazon VPC 進行通訊。為此，您可以建立 Amazon VPC 與 Kinesis 資料串流之間的介面 VPC 端點。如需詳細資訊，請參閱 [Using Kinesis Data Streams with Interface VPC Endpoints](https://docs.aws.amazon.com//streams/latest/dev/vpc.html)。
+ 在另一個帳戶中指定 Amazon Kinesis Data Streams 時，您必須設定角色和政策以允許跨帳戶存取。如需詳細資訊，請參閱[範例：從不同帳戶中的 Kinesis 串流讀取](https://docs.aws.amazon.com/kinesisanalytics/latest/java/examples-cross.html)。

如需有關串流 ETL 任務先決條件的詳細資訊，請參閱 [在 AWS Glue 中串流 ETL 任務](add-job-streaming.md)。

### 從 Kinesis 讀取
<a name="glue-streaming-etl-connect-kinesis-read"></a>

#### 範例：從 Kinesis 串流讀取
<a name="section-etl-connect-kinesis-read"></a>

搭配 [forEachBatch](aws-glue-api-crawler-pyspark-extensions-glue-context.md#aws-glue-api-crawler-pyspark-extensions-glue-context-forEachBatch) 使用。

Amazon Kinesis 串流來源範例：

```
kinesis_options =
   { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream",
     "startingPosition": "TRIM_HORIZON", 
     "inferSchema": "true", 
     "classification": "json" 
   }
data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)
```

### 寫入 Kinesis
<a name="glue-streaming-etl-connect-kinesis-write"></a>

#### 範例：寫入 Kinesis 串流
<a name="section-etl-connect-kinesis-write"></a>

搭配 [forEachBatch](aws-glue-api-crawler-pyspark-extensions-glue-context.md#aws-glue-api-crawler-pyspark-extensions-glue-context-forEachBatch) 使用。您的 DynamicFrame 將以 JSON 格式寫入資料流。如果任務在數次重試後仍無法寫入，便會失敗。依預設，每個 DynamicFrame 記錄都會單獨傳送至 Kinesis 串流。您可以使用 `aggregationEnabled` 和關聯的參數來設定此行為。

從串流任務寫入 Amazon Kinesis 的範例：

------
#### [ Python ]

```
glueContext.write_dynamic_frame.from_options(
    frame=frameToWrite
    connection_type="kinesis",
    connection_options={
        "partitionKey": "part1",
        "streamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/streamName",
    }
)
```

------
#### [ Scala ]

```
glueContext.getSinkWithFormat(
                connectionType="kinesis",
                options=JsonOptions("""{
                    "streamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/streamName",
                    "partitionKey": "part1"
                }"""), 
           )
           .writeDynamicFrame(frameToWrite)
```

------

### Kinesis 連線參數
<a name="glue-streaming-etl-connect-kinesis-connect"></a>

指定 Amazon Kinesis Data Streams 的連線選項。

針對 Kinesis 串流資料來源使用下列的連線選項：
+ `"streamARN"` (必要) 用於讀取/寫入。Kinesis 資料串流的 ARN。
+ `"classification"` (讀取時為必要) 用於讀取。記錄中資料使用的檔案格式。除非透過資料目錄提供，否則為必要。
+ `"streamName"` – (選用) 用於讀取。要從中讀取的 Kinesis 資料串流名稱。與 `endpointUrl` 搭配使用。
+ `"endpointUrl"` – (選用) 用於讀取。預設："https://kinesis.us-east-1.amazonaws.com"。Kinesis 串流的 AWS 端點。除非您要連線到特殊區域，否則無需變更此設定。
+ `"partitionKey"` – (選用) 用於寫入。在產生記錄時使用的 Kinesis 分割區索引鍵。
+ `"delimiter"` (選用) 用於讀取。當 `classification` 為 CSV 時使用的值分隔符號。預設值為 "`,`"。
+ `"startingPosition"`：(選用) 用於讀取。Kinesis 資料串流中要從中讀取資料的起始位置。可能的值包括 `"latest"`、`"trim_horizon"`、`"earliest"` 或 `yyyy-mm-ddTHH:MM:SSZ` 模式中 UTC 格式的時間戳記字串 (其中 `Z` 代表以 \$1/- 表示的 UTC 時區偏移。例如："2023-04-04T08:00:00-04:00")。預設值為 `"latest"`。注意：僅 AWS Glue 4.0 版或更新版本`"startingPosition"`支援 UTC 格式的時間戳記字串。
+ `"failOnDataLoss"`：(選用) 如果有任何作用中的碎片遺失或過期，則任務失敗。預設值為 `"false"`。
+ `"awsSTSRoleARN"`：(選用) 用於讀取/寫入。要使用 () 擔任的角色的 Amazon Resource Name AWS Security Token Service (ARN AWS STS)。此角色必須具有描述或讀取 Kinesis 資料串流記錄操作的許可。存取不同帳戶中的資料串流時，您必須使用此參數。搭配 `"awsSTSSessionName"` 使用。
+ `"awsSTSSessionName"`：(選用) 用於讀取/寫入。使用 AWS STS擔任角色之工作階段的識別符。存取不同帳戶中的資料串流時，您必須使用此參數。搭配 `"awsSTSRoleARN"` 使用。
+ `"awsSTSEndpoint"`：（選用） 使用 擔任的角色連線至 Kinesis 時要使用的 AWS STS 端點。這允許在 VPC 中使用區域 AWS STS 端點，這是預設全域端點無法做到的。
+ `"maxFetchTimeInMs"`：(選用) 用於讀取。任務執行器從 Kinesis 資料串流讀取目前批次記錄所花費的最長時間，以毫秒 (ms) 為單位指定。在此期間可以進行多次 `GetRecords` API 呼叫。預設值為 `1000`。
+ `"maxFetchRecordsPerShard"`：(選用) 用於讀取。每個微批次的 Kinesis 資料串流中每個碎片要擷取的記錄數目上限。注意：如果串流任務已從 Kinesis 讀取額外的記錄 (在相同的 get-records 呼叫中)，用戶端可以超過此限制。如果 `maxFetchRecordsPerShard` 需要嚴格控制，則其必須是 `maxRecordPerRead` 的倍數。預設值為 `100000`。
+ `"maxRecordPerRead"`：(選用) 用於讀取。要從每個 `getRecords` 操作的 Kinesis 資料串流中擷取的記錄數量上限。預設值為 `10000`。
+ `"addIdleTimeBetweenReads"`：(選用) 用於讀取。增加兩個連續 `getRecords` 操作之間的時間延遲。預設值為 `"False"`。此選項僅在 Glue 2.0 及更高版本上才可設定。
+ `"idleTimeBetweenReadsInMs"`：(選用) 用於讀取。兩個連續 `getRecords` 操作的最小延遲時間，以毫秒為單位指定。預設值為 `1000`。此選項僅在 Glue 2.0 及更高版本上才可設定。
+ `"describeShardInterval"`：(選用) 用於讀取。指令碼考慮重新分片的兩個 `ListShards` API 呼叫之間的最小時間間隔。如需詳細資訊，請參閱 *Amazon Kinesis Data Streams 開發人員指南*中的[重新分片的策略](https://docs.aws.amazon.com//streams/latest/dev/kinesis-using-sdk-java-resharding-strategies.html)。預設值為 `1s`。
+ `"numRetries"`：(選用) 用於讀取。Kinesis Data Streams API 請求的重試數上限。預設值為 `3`。
+ `"retryIntervalMs"`：(選用) 用於讀取。重試 Kinesis Data Streams API 呼叫之前的冷卻時間期間 (以毫秒為單位)。預設值為 `1000`。
+ `"maxRetryIntervalMs"`：(選用) 用於讀取。Kinesis Data Streams API 呼叫之兩次重試之間的最大冷卻時間期間 (以毫秒為單位)。預設值為 `10000`。
+ `"avoidEmptyBatches"`：(選用) 用於讀取。避免建立空白微批次任務，方法是在批次開始之前檢查 Kinesis 資料串流中是否有未讀取的資料。預設值為 `"False"`。
+ `"schema"`：(在 inferSchema 設定為 false 時為必要) 用於讀取。用於處理承載的結構描述。如果分類為 `avro`，提供的架構必須採用 Avro 架構格式。如果分類不是 `avro`，提供的架構必須採用 DDL 架構格式。

  以下是架構範例。

------
#### [ Example in DDL schema format ]

  ```
  `column1` INT, `column2` STRING , `column3` FLOAT
  ```

------
#### [ Example in Avro schema format ]

  ```
  {
    "type":"array",
    "items":
    {
      "type":"record",
      "name":"test",
      "fields":
      [
        {
          "name":"_id",
          "type":"string"
        },
        {
          "name":"index",
          "type":
          [
            "int",
            "string",
            "float"
          ]
        }
      ]
    }
  }
  ```

------
+ `"inferSchema"`：(選用) 用於讀取。預設值為 'false'。如果設為 'true'，將在執行時間時從 `foreachbatch` 承載偵測架構。
+ `"avroSchema"`：(已棄用) 用於讀取。使用 Avro 格式時，用於指定 Avro 資料架構的參數。此參數現已棄用。使用 `schema` 參數。
+ `"addRecordTimestamp"`：(選用) 用於讀取。當此選項設定為 'true' 時，資料輸出將包含一個名為 "\$1\$1src\$1timestamp" 的額外資料欄，其指示串流收到相應記錄的時間。預設值為 'false'。在 AWS Glue 4.0 版或更新版中支援此選項。
+ `"emitConsumerLagMetrics"`：(選用) 用於讀取。當選項設定為 "true" 時，在介於串流收到最舊記錄與其在 AWS Glue 中到達 CloudWatch 的時間之間的持續時間內，將會針對每個批次發出指標。指標的名稱為 "glue.driver.streaming.maxConsumerLagInMs"。預設值為 'false'。在 AWS Glue 4.0 版或更新版中支援此選項。
+ `"fanoutConsumerARN"`：(選用) 用於讀取。Kinesis 串流取用者的 ARN，適用於 `streamARN` 中指定的串流。用於啟用 Kinesis 連線的強化廣發功能模式。如需有關使用強化廣發功能使用 Kinesis 串流的詳細資訊，請參閱 [在 Kinesis 串流任務中使用強化廣發功能](aws-glue-programming-etl-connect-kinesis-efo.md)。
+ `"recordMaxBufferedTime"` – (選用) 用於寫入。預設：1000 (毫秒)。在等待寫入時，記錄受到緩衝的最長時間。
+ `"aggregationEnabled"` – (選用) 用於寫入。預設：true。指定是否應在將記錄傳送至 Kinesis 前先彙整記錄。
+ `"aggregationMaxSize"` – (選用) 用於寫入。預設：51200 (位元組)。若記錄大於此限制，則其會略過彙整工具。請注意，Kinesis 會強制執行 50 KB 的記錄大小限制。若您將此值設定為超過 50 KB，Kinesis 將會拒絕過大的記錄。
+ `"aggregationMaxCount"` – (選用) 用於寫入。預設：4294967295。要匯入彙整記錄的項目數量上限。
+ `"producerRateLimit"` – (選用) 用於寫入。預設：150 (%)。作為後端限制的百分比來限制從單一生產者 (例如您的任務) 傳送的每個碎片輸送量。
+ `"collectionMaxCount"` – (選用) 用於寫入。預設：500。要匯入 PutRecords 請求的項目數量上限。
+ `"collectionMaxSize"` – (選用) 用於寫入。預設：5242880 (位元組)。使用 PutRecords 請求傳送的最大資料量。