

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# Kafka 連線
<a name="aws-glue-programming-etl-connect-kafka-home"></a>

您可以使用 Kafka 連線來讀取和寫入 Kafka 資料串流，方法是使用儲存在 Data Catalog 資料表中的資訊，或提供資訊以直接存取資料串流。連線支援 Kafka 叢集或 Amazon Managed Streaming for Apache Kafka 叢集。您可以從 Kafka 讀取資訊到 Spark DataFrame，然後將其轉換為 AWS Glue DynamicFrame。您可以使用 JSON 格式將 DynamicFrames 寫入 Kafka。如果您直接存取資料串流，則請使用這些選項來提供如何存取資料串流的相關資訊。

如果您使用 `getCatalogSource` 或 `create_data_frame_from_catalog` 來取用來自 Kinesis 串流來源的記錄，或者使用 `getCatalogSink` 或 `write_dynamic_frame_from_catalog` 將記錄寫入 Kafka，並且則該任務具有 Data Catalog 資料庫和資料表名稱資訊，並可以使用其來取得一些從 Kafka 串流來源讀取的基本參數。如果使用 `getSource`、`getCatalogSink`、`getSourceWithFormat`、`getSinkWithFormat`、`createDataFrameFromOptions`、`create_data_frame_from_options` 或 `write_dynamic_frame_from_catalog`，您必須使用此處描述的連線選項來指定這些基本參數。

您可以使用 `GlueContext` 類別中指定方法的下列引數來指定 Kafka 的連線選項。
+ Scala
  + `connectionOptions`：與 `getSource`、`createDataFrameFromOptions`、`getSink` 搭配使用 
  + `additionalOptions`：與 `getCatalogSource`、`getCatalogSink` 搭配使用。
  + `options`：與 `getSourceWithFormat`、`getSinkWithFormat` 搭配使用。
+ Python
  + `connection_options`：與 `create_data_frame_from_options`、`write_dynamic_frame_from_options` 搭配使用。
  + `additional_options`：與 `create_data_frame_from_catalog`、`write_dynamic_frame_from_catalog` 搭配使用。
  + `options`：與 `getSource`、`getSink` 搭配使用。

如需有關串流 ETL 任務的注意事項和限制，請參閱 [串流 ETL 注意事項和限制](add-job-streaming.md#create-job-streaming-restrictions)。

**Topics**
+ [設定 Kafka](#aws-glue-programming-etl-connect-kafka-configure)
+ [範例：從 Kafka 串流讀取](#aws-glue-programming-etl-connect-kafka-read)
+ [範例：寫入 Kafka 串流](#aws-glue-programming-etl-connect-kafka-write)
+ [Kafka 連線選項參考](#aws-glue-programming-etl-connect-kafka)

## 設定 Kafka
<a name="aws-glue-programming-etl-connect-kafka-configure"></a>

透過網際網路連線至 Kafka 串流沒有 AWS 先決條件。

您可以建立 AWS Glue Kafka 連線來管理您的連線憑證。如需詳細資訊，請參閱[為 Apache Kafka 資料串流建立 AWS Glue 連線](add-job-streaming.md#create-conn-streaming)。在您的 AWS Glue 任務組態中，提供 *connectionName* 作為**其他網路連線**，然後在方法呼叫中，提供 *connectionName* 給 `connectionName` 參數。

在某些情況下，您需要設定其他先決條件：
+ 如果搭配 IAM 身分驗證使用 Amazon Managed Streaming for Apache Kafka，您會需要適當的 IAM 組態。
+ 如果搭配 Amazon VPC 使用 Amazon Managed Streaming for Apache Kafka，您會需要適當的 Amazon VPC 組態。您需要建立提供 AWS Amazon VPC 連線資訊的 Glue 連線。您需要任務組態，才能將 AWS Glue 連線納入為**其他網路連線**。

如需有關串流 ETL 任務先決條件的詳細資訊，請參閱 [在 AWS Glue 中串流 ETL 任務](add-job-streaming.md)。

## 範例：從 Kafka 串流讀取
<a name="aws-glue-programming-etl-connect-kafka-read"></a>

搭配 [forEachBatch](aws-glue-api-crawler-pyspark-extensions-glue-context.md#aws-glue-api-crawler-pyspark-extensions-glue-context-forEachBatch) 使用。

Kafka 串流來源範例：

```
kafka_options =
    { "connectionName": "ConfluentKafka", 
      "topicName": "kafka-auth-topic", 
      "startingOffsets": "earliest", 
      "inferSchema": "true", 
      "classification": "json" 
    }
data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kafka", connection_options=kafka_options)
```

## 範例：寫入 Kafka 串流
<a name="aws-glue-programming-etl-connect-kafka-write"></a>

寫入 Kafka 的範例：

使用 `getSink` 方法的範例：

```
data_frame_datasource0 = 
glueContext.getSink(
	connectionType="kafka",
	connectionOptions={
		JsonOptions("""{
			"connectionName": "ConfluentKafka", 
			"classification": "json", 
			"topic": "kafka-auth-topic", 
			"typeOfData": "kafka"}
		""")}, 
	transformationContext="dataframe_ApacheKafka_node1711729173428")
	.getDataFrame()
```

使用 `write_dynamic_frame.from_options` 方法的範例：

```
kafka_options =
    { "connectionName": "ConfluentKafka", 
      "topicName": "kafka-auth-topic", 
      "classification": "json" 
    }
data_frame_datasource0 = glueContext.write_dynamic_frame.from_options(connection_type="kafka", connection_options=kafka_options)
```

## Kafka 連線選項參考
<a name="aws-glue-programming-etl-connect-kafka"></a>

在讀取時，將下列連線選項與 `"connectionType": "kafka"` 搭配使用：
+ `"bootstrap.servers"` (必要) 自舉伺服器 URL 的清單，例如 `b-1.vpc-test-2.o4q88o.c6.kafka.us-east-1.amazonaws.com:9094`。此選項必須在 API 呼叫中指定，或在 Data Catalog 的資料表中繼資料中定義。
+ `"security.protocol"`(必要) 用來與代理程式通訊的協定。可能的值為 `"SSL"` 或 `"PLAINTEXT"`。
+ `"topicName"` (必要) 要訂閱的主題清單 (以逗號分隔)。您必須指定 `"topicName"`、`"assign"` 或 `"subscribePattern"` 其中一個。
+ `"assign"`：(必要) JSON 字串，指定要消耗的特定 `TopicPartitions`。您必須指定 `"topicName"`、`"assign"` 或 `"subscribePattern"` 其中一個。

  範例：'\$1"topicA":[0,1],"topicB":[2,4]\$1'
+ `"subscribePattern"`：(必要) 識別要訂閱的主題清單的 Java regex 字串。您必須指定 `"topicName"`、`"assign"` 或 `"subscribePattern"` 其中一個。

  範例：'topic.\$1'
+ `"classification"` (必要) 記錄中資料使用的檔案格式。除非透過資料目錄提供，否則為必要。
+ `"delimiter"` (選用) 當 `classification` 為 CSV 時使用的值分隔符號。預設值為 "`,`"。
+ `"startingOffsets"`：(選用) 要從中讀取資料的 Kafka 主題的起始位置。可能的值為 `"earliest"` 或 `"latest"`。預設值為 `"latest"`。
+ `"startingTimestamp"`：（選用，僅支援 AWS Glue 4.0 版或更新版本） Kafka 主題中要讀取資料的 記錄時間戳記。可能的值是 `yyyy-mm-ddTHH:MM:SSZ` 模式中 UTC 格式的時間戳記字串 (其中 `Z` 代表以 \$1/- 表示的 UTC 時區偏移。例如："2023-04-04T08:00:00-04:00")。

  注意：Glue 串流指令碼的連線選項清單中只能有其中一個 'startingOffsets' AWS 或 'startingTimestamp'，包括這兩個屬性都會導致任務失敗。
+ `"endingOffsets"`：(選用) 批次查詢結束時的終點。可能值為 `"latest"` 或指定每個 `TopicPartition` 結束偏移的 JSON 字串。

  對於 JSON 字串，格式為 `{"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}`。值 `-1` 作為偏移代表 `"latest"`。
+ `"pollTimeoutMs"`：(選用) 在 Spark 任務執行器中從 Kafka 輪詢資料的逾時 (以毫秒為單位)。預設值為 `600000`。
+ `"numRetries"`：(選用) 擷取 Kafka 位移失敗之前，要重試的次數。預設值為 `3`。
+ `"retryIntervalMs"`：(選用) 重試擷取 Kafka 偏移量之前等待的時間 (毫秒)。預設值為 `10`。
+ `"maxOffsetsPerTrigger"`：(選用) 每個觸發間隔所處理之偏移數目上限的速率限制。指定的偏移總數會按比例跨 `topicPartitions` 或不同磁碟區而分割。預設值為 null，這表示消費者讀取所有偏移，直到已知的最新偏移。
+ `"minPartitions"`：(選用) 從 Kafka 讀取所需的分割區最小數量。預設值為 null，這表示 Spark 分割區的數量等於 Kafka 分割區的數量。
+  `"includeHeaders"`：(選用) 是否包含 Kafka 標頭。當選項設定為「true」時，資料輸出將包含一個名為「glue\$1streaming\$1kafka\$1headers」的額外欄，其類型為 `Array[Struct(key: String, value: String)]`。預設值為 "false"。此選項能在 AWS Glue 3.0 版或更新版中使用。
+ `"schema"`：(當 InferSchema 設定為 false 時為必要) 用於處理承載的架構。如果分類為 `avro`，提供的架構必須採用 Avro 架構格式。如果分類不是 `avro`，提供的架構必須採用 DDL 架構格式。

  以下是架構範例。

------
#### [ Example in DDL schema format ]

  ```
  'column1' INT, 'column2' STRING , 'column3' FLOAT
  ```

------
#### [ Example in Avro schema format ]

  ```
  {
  "type":"array",
  "items":
  {
  "type":"record",
  "name":"test",
  "fields":
  [
    {
      "name":"_id",
      "type":"string"
    },
    {
      "name":"index",
      "type":
      [
        "int",
        "string",
        "float"
      ]
    }
  ]
  }
  }
  ```

------
+ `"inferSchema"`：(選用) 預設值為 'false'。如果設為 'true'，將在執行時間時從 `foreachbatch` 承載偵測架構。
+ `"avroSchema"`：(已棄用) 使用 Avro 格式時，用於指定 Avro 資料架構的參數。此參數現已棄用。使用 `schema` 參數。
+ `"addRecordTimestamp"`︰(選用) 當此選項設定為 'true' 時，資料輸出將包含一個名為 "\$1\$1src\$1timestamp" 的額外資料欄，其指示主題收到相應記錄的時間。預設值為 'false'。在 AWS Glue 4.0 版或更新版中支援此選項。
+ `"emitConsumerLagMetrics"`: (選用) 當該選項設定為 'true' 時，在介於主題收到最舊記錄與其在 AWS Glue 中到達 CloudWatch 的時間之間的持續時間，將會針對每個批次發出指標。指標的名稱為 "glue.driver.streaming.maxConsumerLagInMs"。預設值為 'false'。在 AWS Glue 4.0 版或更新版中支援此選項。

在寫入時，將下列連線選項與 `"connectionType": "kafka"` 搭配使用：
+ `"connectionName"` （必要） AWS 用來連線至 Kafka 叢集的 Glue 連線名稱 （類似於 Kafka 來源）。
+ `"topic"` (必要) 如果主題資料欄存在，則在將指定的資料列寫入 Kafka 時會使用其值作為主題，除非已設定主題組態選項。也就是說，`topic` 組態選項會覆寫主題資料欄。
+ `"partition"` (選用) 如果指定有效的分區編號，則會在傳送記錄時使用該 `partition`。

  如果未指定分區，但 `key` 存在 ，則會使用金鑰的雜湊來選擇分區。

  如果既不存在 `key` 也不存在 `partition`，則當至少向分區產生 batch.size 個位元組時，將根據這些變更的黏性分區來選擇分區。
+ `"key"` (選用) 如果 `partition` 為 null，則用於分區。
+ `"classification"` (選用) 記錄中資料使用的檔案格式。我們僅支援 JSON、CSV 和 Avro。

  使用 Avro 格式，我們可以提供要序列化的自訂 avroSchema，但請注意，這也需要在來源上提供，以進行還原序列化。否則，預設會使用 Apache AvroSchema 進行序列化。

此外，您可以視需要更新 [Kafka 生產者組態參數](https://kafka.apache.org/documentation/#producerconfigs)來微調 Kafka 接收器。請注意，連線選項上沒有允許清單，所有金鑰值對都按原樣保留在接收器上。

但是，有一個不會生效的小型拒絕選項清單。如需詳細資訊，請參閱 [Kafka 特定的組態](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html)。