

# Kafka 接続
<a name="aws-glue-programming-etl-connect-kafka-home"></a>

Kafka 接続を使用すると、Data Catalog テーブルに格納されている情報を使用するか、データストリームに直接アクセスするための情報を指定することにより、Kafka データストリームへ読み込むまたは書き込むことができます。接続は、Kafka クラスターまたは Amazon Managed Streaming for Apache Kafka クラスターをサポートします。Kafka から Spark DataFrame に情報を読み込み、AWS Glue DynamicFrame に変換できます。DynamicFrames を JSON 形式で Kafka に書き込むことができます。データストリームに直接アクセスする場合は、これらのオプションを使用して、データストリームへのアクセス方法に関する情報を提供します。

`getCatalogSource` または `create_data_frame_from_catalog` を使用して Kafka ストリーミングソースからレコードを消費するか、`getCatalogSink` または `write_dynamic_frame_from_catalog` を使用して Kafka にレコードを書き込み、ジョブに Data Catalog データベースおよびテーブル名の情報があり、それを使用して Kafka ストリーミングソースから読み込むためのいくつかの基本パラメータを取得できる場合。`getSource`、`getCatalogSink`、`getSourceWithFormat`、`getSinkWithFormat`、`createDataFrameFromOptions`、`create_data_frame_from_options`、`write_dynamic_frame_from_catalog` を使用する場合、ここで説明する接続オプションを使用し、これらの基本パラメータを指定する必要があります。

`GlueContext` クラスで指定されたメソッドの次の引数を使用し、Kafka の接続オプションを指定できます。
+ Scala
  + `connectionOptions`: `getSource`、`createDataFrameFromOptions`、`getSink` で使用 
  + `additionalOptions`: `getCatalogSource`、`getCatalogSink` で使用
  + `options`: `getSourceWithFormat`、`getSinkWithFormat` で使用
+ Python
  + `connection_options`: `create_data_frame_from_options`、`write_dynamic_frame_from_options` で使用
  + `additional_options`: `create_data_frame_from_catalog`、`write_dynamic_frame_from_catalog` で使用
  + `options`: `getSource`、`getSink` で使用

ストリーミング ETL ジョブに関する注意事項と制限事項については、「[ストリーミング ETL に関する注意と制限](add-job-streaming.md#create-job-streaming-restrictions)」を参照してください。

**Topics**
+ [Kafka の設定](#aws-glue-programming-etl-connect-kafka-configure)
+ [例: Kafka ストリームからの読み込み](#aws-glue-programming-etl-connect-kafka-read)
+ [例: Kafka ストリームへの書き込み](#aws-glue-programming-etl-connect-kafka-write)
+ [Kafka 接続オプションのリファレンス](#aws-glue-programming-etl-connect-kafka)

## Kafka の設定
<a name="aws-glue-programming-etl-connect-kafka-configure"></a>

インターネット経由で利用可能な Kafka ストリームに接続するための AWS の前提条件はありません。

AWS Glue Kafka 接続を作成して、接続認証情報を管理できます。詳細については、「[Apache Kafka データストリームの AWS Glue 接続の作成](add-job-streaming.md#create-conn-streaming)」を参照してください。AWS Glue ジョブ設定で、*connectionName* を**追加ネットワーク接続**として指定し、メソッド呼び出しで `connectionName` パラメータに *connectionName* を指定します。

場合によっては、追加の前提条件を設定する必要があります。
+ IAM 認証で Amazon Managed Streaming for Apache Kafka を使用する場合は、適切な IAM 設定が必要になります。
+ Amazon VPC で Amazon Managed Streaming for Apache Kafka を使用する場合は、適切な Amazon VPC 設定が必要になります。Amazon VPC 接続情報を提供する AWS Glue 接続を作成する必要があります。AWS Glue 接続を**追加ネットワーク接続**として含めるには、ジョブ設定が必要です。

ストリーミング ETL ジョブの前提条件の詳細については、「[AWS Glue でのストリーミング ETL ジョブ](add-job-streaming.md)」を参照してください。

## 例: Kafka ストリームからの読み込み
<a name="aws-glue-programming-etl-connect-kafka-read"></a>

[forEachBatch](aws-glue-api-crawler-pyspark-extensions-glue-context.md#aws-glue-api-crawler-pyspark-extensions-glue-context-forEachBatch) と組み合わせて使用します。

Kafka ストリーミングソースの例：

```
kafka_options =
    { "connectionName": "ConfluentKafka", 
      "topicName": "kafka-auth-topic", 
      "startingOffsets": "earliest", 
      "inferSchema": "true", 
      "classification": "json" 
    }
data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kafka", connection_options=kafka_options)
```

## 例: Kafka ストリームへの書き込み
<a name="aws-glue-programming-etl-connect-kafka-write"></a>

Kafka への書き込み例:

`getSink` メソッドを使った例:

```
data_frame_datasource0 = 
glueContext.getSink(
	connectionType="kafka",
	connectionOptions={
		JsonOptions("""{
			"connectionName": "ConfluentKafka", 
			"classification": "json", 
			"topic": "kafka-auth-topic", 
			"typeOfData": "kafka"}
		""")}, 
	transformationContext="dataframe_ApacheKafka_node1711729173428")
	.getDataFrame()
```

`write_dynamic_frame.from_options` メソッドを使った例:

```
kafka_options =
    { "connectionName": "ConfluentKafka", 
      "topicName": "kafka-auth-topic", 
      "classification": "json" 
    }
data_frame_datasource0 = glueContext.write_dynamic_frame.from_options(connection_type="kafka", connection_options=kafka_options)
```

## Kafka 接続オプションのリファレンス
<a name="aws-glue-programming-etl-connect-kafka"></a>

読み込むとき、`"connectionType": "kafka"` に次の接続オプションを使用します。
+ `"bootstrap.servers"` (必須) ブートストラップサーバーの URL のリスト (例: `b-1.vpc-test-2.o4q88o.c6.kafka.us-east-1.amazonaws.com:9094`)。このオプションは API 呼び出しで指定するか、データカタログ内のテーブルメタデータで定義する必要があります。
+ `"security.protocol"` (必須) ブローカーと通信するために使用されるプロトコル。使用できる値は、`"SSL"` または `"PLAINTEXT"` です。
+ `"topicName"` (必須) サブスクライブするトピックのカンマ区切りリスト。`"topicName"`、`"assign"`、または `"subscribePattern"` の中から、いずれか 1 つのみを指定する必要があります。
+ `"assign"`: (必須) 消費する特定の `TopicPartitions` を指定する JSON 文字列。`"topicName"`、`"assign"`、または `"subscribePattern"` の中から、いずれか 1 つのみを指定する必要があります。

  例: '\$1"topicA":[0,1],"topicB":[2,4]\$1'
+ `"subscribePattern"`: (必須) サブスクライブする先のトピックリストを識別する Java の正規表現文字列。`"topicName"`、`"assign"`、または `"subscribePattern"` の中から、いずれか 1 つのみを指定する必要があります。

  例: 'topic.\$1'
+ `"classification"` (必須) レコード内のデータで使用されるファイル形式。データカタログを通じて提供されていない限り、必須です。
+ `"delimiter"` (オプション) `classification` が CSV の場合に使用される値の区切り文字。デフォルトは「`,`」です。
+ `"startingOffsets"`: (オプション) Kafka トピック内で、データの読み取りを開始する位置 使用できる値は、`"earliest"` または `"latest"` です。デフォルト値は `"latest"` です。
+ `"startingTimestamp"`: (オプション、AWS Glue バージョン 4.0 以降でのみサポート) Kafka トピック内で、データの読み込みを開始するレコードのタイムスタンプ。指定できる値は UTC 形式 (`yyyy-mm-ddTHH:MM:SSZ` のパターン) のタイムスタンプ文字列です (`Z` は UTC タイムゾーンのオフセットを \$1/- で表します。例: 「2023-04-04T08:00:00-04:00」)。

  注: AWS Glue ストリーミングスクリプトの接続オプションリストには、「startingOffsets」または「startingTimestamp」のいずれかしか表示できません。これらのプロパティの両方を含めると、ジョブが失敗します。
+ `"endingOffsets"`: (オプション) バッチクエリの終了位置。設定が可能な値は、`"latest"` または、各 `TopicPartition` の終了オフセットを指定する JSON 文字列のいずれかです。

  JSON 文字列の場合、`{"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}` の形式を使用します。オフセットとして値 `-1` を設定する場合、`"latest"` の意味になります。
+ `"pollTimeoutMs"`: (オプション) Spark ジョブエグゼキュータで、Kafka からデータをポーリングする際のタイムアウト値 (ミリ秒単位)。デフォルト値は `600000` です。
+ `"numRetries"`: (オプション) Kafka オフセットのフェッチが失敗したと判断される前の再試行回数。デフォルト値は `3` です。
+ `"retryIntervalMs"`: (オプション) Kafka オフセットのフェッチを開始するまでの待機時間 (ミリ秒)。デフォルト値は `10` です。
+ `"maxOffsetsPerTrigger"`: (オプション) 処理されるオフセットの最大数を、トリガー間隔ごとのレート上限で指定する値。指定されたオフセットの合計数は、異なるボリュームの `topicPartitions` 間で均等に分割されます。デフォルト値は「null」です。この場合、コンシューマーは既知の最新のオフセットまで、すべてのオフセットを読み取ります。
+ `"minPartitions"`: (オプション) Kafka から読み取ることを想定する、最小のパーティション数。デフォルト値は「null」です。これは、Spark パーティションの数が Kafka パーティションの数に等しいことを意味します。
+  `"includeHeaders"`: (オプション) Kafka ヘッダーを含めるかどうかを決定します。このオプションが「true」に設定されている場合、データ出力には、「glue\$1streaming\$1kafka\$1headers」という名前で `Array[Struct(key: String, value: String)]` 型の列が追加されます。デフォルト値は『false』です。このオプションは、AWS Glue バージョン 3.0 以降でのみ使用可能です。
+ `"schema"`: (inferSchema に false を設定した場合は必須) ペイロードの処理に使用するスキーマ。分類が `avro` である場合、提供されるスキーマには Avro スキーマ形式を使用する必要があります。分類が `avro` 以外の場合、提供されるスキーマは DDL スキーマ形式である必要があります。

  以下に、スキーマの例を示します。

------
#### [ Example in DDL schema format ]

  ```
  'column1' INT, 'column2' STRING , 'column3' FLOAT
  ```

------
#### [ Example in Avro schema format ]

  ```
  {
  "type":"array",
  "items":
  {
  "type":"record",
  "name":"test",
  "fields":
  [
    {
      "name":"_id",
      "type":"string"
    },
    {
      "name":"index",
      "type":
      [
        "int",
        "string",
        "float"
      ]
    }
  ]
  }
  }
  ```

------
+ `"inferSchema"`: (オプション) デフォルト値は「false」です。「true」に設定すると、実行時に、スキーマが `foreachbatch` 内のペイロードから検出されます。
+ `"avroSchema"`: (非推奨) Avro 形式を使用する場合に、Avro データのスキーマを指定するために使用されるパラメータです。このパラメータは非推奨となりました。`schema` パラメータを使用します。
+ `"addRecordTimestamp"`: (オプション) このオプションを「true」に設定すると、トピックが対応するレコードを受信した時刻を表示する「\$1\$1src\$1timestamp」という列が、データ出力に追加で表示されます。デフォルト値は、「false」です。このオプションは AWS Glue のバージョン 4.0 以降でサポートされています。
+ `"emitConsumerLagMetrics"`: (オプション) このオプションを「true」に設定すると、バッチごとに、トピックが受信した最も古いレコードと、それが AWS Glue で CloudWatch に到着した時間との、間隔のメトリクスが出力されます。このメトリクスの名前は「glue.driver.streaming.maxConsumerLagInMs」です。デフォルト値は、「false」です。このオプションは AWS Glue バージョン 4.0 以降でサポートされています。

書き込むとき、`"connectionType": "kafka"` に次の接続オプションを使用します。
+ `"connectionName"` (必須) Kafka クラスターへの接続に使用される AWS Glue 接続の名前 (Kafka ソースと同様)。
+ `"topic"` (必須) トピック列が存在する場合、トピック設定オプションが設定されていない限り、指定された行を Kafka に書き込む際にトピック列の値がトピックとして使用されます。つまり、`topic` 設定オプションはトピック列をオーバーライドします。
+ `"partition"` (オプション) 有効なパーティション番号が指定された場合、レコードの送信時にその `partition` が使用されます。

  パーティションが指定されずに `key` が存在する場合、キーのハッシュを使用してパーティションが選択されます。

  `key` と `partition` のどちらも存在しない場合、そのパーティションに少なくとも batch.size バイトが生成された際に変更内容のスティッキーパーティション分割に基づいてパーティションが選択されます。
+ `"key"` (オプション) `partition` が null の場合、パーティション分割に使用されます。
+ `"classification"` (オプション) レコードのデータに使用されるファイル形式。JSON、CSV、Avro のみをサポートしています。

  Avro 形式を使用すると、シリアル化するためにカスタムの AvroSchema を提供できますが、シリアル化解除する場合にもソースに提供する必要があることに注意してください。それ以外の場合、デフォルトで Apache AvroSchema を使用してシリアル化します。

さらに、[Kafka プロデューサーの設定パラメーター](https://kafka.apache.org/documentation/#producerconfigs)を更新することにより、必要に応じて Kafka シンクを微調整できます。接続オプションには許可リストがないことに注意してください。すべてのキー値のペアはそのままシンクに保持されます。

ただし、有効にならないオプションの小さな拒否リストがあります。詳細については、「[Apache 固有の設定](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html)」を参照してください。