View a markdown version of this page

Kafka 接続 - AWS Glue

Kafka 接続

Kafka 接続を使用すると、Data Catalog テーブルに格納されている情報を使用するか、データストリームに直接アクセスするための情報を指定することにより、Kafka データストリームへ読み込むまたは書き込むことができます。接続は、Kafka クラスターまたは Amazon Managed Streaming for Apache Kafka クラスターをサポートします。Kafka から Spark DataFrame に情報を読み込み、AWS Glue DynamicFrame に変換できます。DynamicFrames を JSON 形式で Kafka に書き込むことができます。データストリームに直接アクセスする場合は、これらのオプションを使用して、データストリームへのアクセス方法に関する情報を提供します。

getCatalogSource または create_data_frame_from_catalog を使用して Kafka ストリーミングソースからレコードを消費するか、getCatalogSink または write_dynamic_frame_from_catalog を使用して Kafka にレコードを書き込み、ジョブに Data Catalog データベースおよびテーブル名の情報があり、それを使用して Kafka ストリーミングソースから読み込むためのいくつかの基本パラメータを取得できる場合。getSourcegetCatalogSinkgetSourceWithFormatgetSinkWithFormatcreateDataFrameFromOptionscreate_data_frame_from_optionswrite_dynamic_frame_from_catalog を使用する場合、ここで説明する接続オプションを使用し、これらの基本パラメータを指定する必要があります。

GlueContext クラスで指定されたメソッドの次の引数を使用し、Kafka の接続オプションを指定できます。

  • Scala

    • connectionOptions: getSourcecreateDataFrameFromOptionsgetSink で使用

    • additionalOptions: getCatalogSourcegetCatalogSink で使用

    • options: getSourceWithFormatgetSinkWithFormat で使用

  • Python

    • connection_options: create_data_frame_from_optionswrite_dynamic_frame_from_options で使用

    • additional_options: create_data_frame_from_catalogwrite_dynamic_frame_from_catalog で使用

    • options: getSourcegetSink で使用

ストリーミング ETL ジョブに関する注意事項と制限事項については、「ストリーミング ETL に関する注意と制限」を参照してください。

Kafka の設定

インターネット経由で利用可能な Kafka ストリームに接続するための AWS の前提条件はありません。

AWS Glue Kafka 接続を作成して、接続認証情報を管理できます。詳細については、「Apache Kafka データストリームの AWS Glue 接続の作成」を参照してください。AWS Glue ジョブ設定で、connectionName追加ネットワーク接続として指定し、メソッド呼び出しで connectionName パラメータに connectionName を指定します。

場合によっては、追加の前提条件を設定する必要があります。

  • IAM 認証で Amazon Managed Streaming for Apache Kafka を使用する場合は、適切な IAM 設定が必要になります。

  • Amazon VPC で Amazon Managed Streaming for Apache Kafka を使用する場合は、適切な Amazon VPC 設定が必要になります。Amazon VPC 接続情報を提供する AWS Glue 接続を作成する必要があります。AWS Glue 接続を追加ネットワーク接続として含めるには、ジョブ設定が必要です。

ストリーミング ETL ジョブの前提条件の詳細については、「AWS Glue でのストリーミング ETL ジョブ」を参照してください。

例: Kafka ストリームからの読み込み

forEachBatch と組み合わせて使用します。

Kafka ストリーミングソースの例:

kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "startingOffsets": "earliest", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kafka", connection_options=kafka_options)

例: Kafka ストリームへの書き込み

Kafka への書き込み例:

getSink メソッドを使った例:

data_frame_datasource0 = glueContext.getSink( connectionType="kafka", connectionOptions={ JsonOptions("""{ "connectionName": "ConfluentKafka", "classification": "json", "topic": "kafka-auth-topic", "typeOfData": "kafka"} """)}, transformationContext="dataframe_ApacheKafka_node1711729173428") .getDataFrame()

write_dynamic_frame.from_options メソッドを使った例:

kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "classification": "json" } data_frame_datasource0 = glueContext.write_dynamic_frame.from_options(connection_type="kafka", connection_options=kafka_options)

Kafka 接続オプションのリファレンス

読み込むとき、"connectionType": "kafka" に次の接続オプションを使用します。

  • "bootstrap.servers" (必須) ブートストラップサーバーの URL のリスト (例: b-1.vpc-test-2.o4q88o.c6.kafka.us-east-1.amazonaws.com:9094)。このオプションは API 呼び出しで指定するか、データカタログ内のテーブルメタデータで定義する必要があります。

  • "security.protocol" (必須) ブローカーと通信するために使用されるプロトコル。使用できる値は、"SSL" または "PLAINTEXT" です。

  • "topicName" (必須) サブスクライブするトピックのカンマ区切りリスト。"topicName""assign"、または "subscribePattern" の中から、いずれか 1 つのみを指定する必要があります。

  • "assign": (必須) 消費する特定の TopicPartitions を指定する JSON 文字列。"topicName""assign"、または "subscribePattern" の中から、いずれか 1 つのみを指定する必要があります。

    例: '{"topicA":[0,1],"topicB":[2,4]}'

  • "subscribePattern": (必須) サブスクライブする先のトピックリストを識別する Java の正規表現文字列。"topicName""assign"、または "subscribePattern" の中から、いずれか 1 つのみを指定する必要があります。

    例: 'topic.*'

  • "classification" (必須) レコード内のデータで使用されるファイル形式。データカタログを通じて提供されていない限り、必須です。

  • "delimiter" (オプション) classification が CSV の場合に使用される値の区切り文字。デフォルトは「,」です。

  • "startingOffsets": (オプション) Kafka トピック内で、データの読み取りを開始する位置 使用できる値は、"earliest" または "latest" です。デフォルト値は "latest" です。

  • "startingTimestamp": (オプション、AWS Glue バージョン 4.0 以降でのみサポート) Kafka トピック内で、データの読み込みを開始するレコードのタイムスタンプ。指定できる値は UTC 形式 (yyyy-mm-ddTHH:MM:SSZ のパターン) のタイムスタンプ文字列です (Z は UTC タイムゾーンのオフセットを +/- で表します。例: 「2023-04-04T08:00:00-04:00」)。

    注: AWS Glue ストリーミングスクリプトの接続オプションリストには、「startingOffsets」または「startingTimestamp」のいずれかしか表示できません。これらのプロパティの両方を含めると、ジョブが失敗します。

  • "endingOffsets": (オプション) バッチクエリの終了位置。設定が可能な値は、"latest" または、各 TopicPartition の終了オフセットを指定する JSON 文字列のいずれかです。

    JSON 文字列の場合、{"topicA":{"0":23,"1":-1},"topicB":{"0":-1}} の形式を使用します。オフセットとして値 -1 を設定する場合、"latest" の意味になります。

  • "pollTimeoutMs": (オプション) Spark ジョブエグゼキュータで、Kafka からデータをポーリングする際のタイムアウト値 (ミリ秒単位)。デフォルト値は 600000 です。

  • "numRetries": (オプション) Kafka オフセットのフェッチが失敗したと判断される前の再試行回数。デフォルト値は 3 です。

  • "retryIntervalMs": (オプション) Kafka オフセットのフェッチを開始するまでの待機時間 (ミリ秒)。デフォルト値は 10 です。

  • "maxOffsetsPerTrigger": (オプション) 処理されるオフセットの最大数を、トリガー間隔ごとのレート上限で指定する値。指定されたオフセットの合計数は、異なるボリュームの topicPartitions 間で均等に分割されます。デフォルト値は「null」です。この場合、コンシューマーは既知の最新のオフセットまで、すべてのオフセットを読み取ります。

  • "minPartitions": (オプション) Kafka から読み取ることを想定する、最小のパーティション数。デフォルト値は「null」です。これは、Spark パーティションの数が Kafka パーティションの数に等しいことを意味します。

  • "includeHeaders": (オプション) Kafka ヘッダーを含めるかどうかを決定します。このオプションが「true」に設定されている場合、データ出力には、「glue_streaming_kafka_headers」という名前で Array[Struct(key: String, value: String)] 型の列が追加されます。デフォルト値は『false』です。このオプションは、AWS Glue バージョン 3.0 以降でのみ使用可能です。

  • "schema": (inferSchema に false を設定した場合は必須) ペイロードの処理に使用するスキーマ。分類が avro である場合、提供されるスキーマには Avro スキーマ形式を使用する必要があります。分類が avro 以外の場合、提供されるスキーマは DDL スキーマ形式である必要があります。

    以下に、スキーマの例を示します。

    Example in DDL schema format
    'column1' INT, 'column2' STRING , 'column3' FLOAT
    Example in Avro schema format
    { "type":"array", "items": { "type":"record", "name":"test", "fields": [ { "name":"_id", "type":"string" }, { "name":"index", "type": [ "int", "string", "float" ] } ] } }
  • "inferSchema": (オプション) デフォルト値は「false」です。「true」に設定すると、実行時に、スキーマが foreachbatch 内のペイロードから検出されます。

  • "avroSchema": (非推奨) Avro 形式を使用する場合に、Avro データのスキーマを指定するために使用されるパラメータです。このパラメータは非推奨となりました。schema パラメータを使用します。

  • "addRecordTimestamp": (オプション) このオプションを「true」に設定すると、トピックが対応するレコードを受信した時刻を表示する「__src_timestamp」という列が、データ出力に追加で表示されます。デフォルト値は、「false」です。このオプションは AWS Glue のバージョン 4.0 以降でサポートされています。

  • "emitConsumerLagMetrics": (オプション) このオプションを「true」に設定すると、バッチごとに、トピックが受信した最も古いレコードと、それが AWS Glue で CloudWatch に到着した時間との、間隔のメトリクスが出力されます。このメトリクスの名前は「glue.driver.streaming.maxConsumerLagInMs」です。デフォルト値は、「false」です。このオプションは AWS Glue バージョン 4.0 以降でサポートされています。

書き込むとき、"connectionType": "kafka" に次の接続オプションを使用します。

  • "connectionName" (必須) Kafka クラスターへの接続に使用される AWS Glue 接続の名前 (Kafka ソースと同様)。

  • "topic" (必須) トピック列が存在する場合、トピック設定オプションが設定されていない限り、指定された行を Kafka に書き込む際にトピック列の値がトピックとして使用されます。つまり、topic 設定オプションはトピック列をオーバーライドします。

  • "partition" (オプション) 有効なパーティション番号が指定された場合、レコードの送信時にその partition が使用されます。

    パーティションが指定されずに key が存在する場合、キーのハッシュを使用してパーティションが選択されます。

    keypartition のどちらも存在しない場合、そのパーティションに少なくとも batch.size バイトが生成された際に変更内容のスティッキーパーティション分割に基づいてパーティションが選択されます。

  • "key" (オプション) partition が null の場合、パーティション分割に使用されます。

  • "classification" (オプション) レコードのデータに使用されるファイル形式。JSON、CSV、Avro のみをサポートしています。

    Avro 形式を使用すると、シリアル化するためにカスタムの AvroSchema を提供できますが、シリアル化解除する場合にもソースに提供する必要があることに注意してください。それ以外の場合、デフォルトで Apache AvroSchema を使用してシリアル化します。

さらに、Kafka プロデューサーの設定パラメーターを更新することにより、必要に応じて Kafka シンクを微調整できます。接続オプションには許可リストがないことに注意してください。すべてのキー値のペアはそのままシンクに保持されます。

ただし、有効にならないオプションの小さな拒否リストがあります。詳細については、「Apache 固有の設定」を参照してください。