

# Kinesis 接続
<a name="aws-glue-programming-etl-connect-kinesis-home"></a>

Kinesis 接続を使用すると、データカタログ テーブルに保存されている情報を使用して、またはデータ ストリームに直接アクセスするための情報を提供することで、Amazon Kinesis データ ストリームの読み取りと書き込みを行うことができます。Kinesis から Spark DataFrame に情報を読み取り、それを AWS Glue DynamicFrame に変換できます。DynamicFrames は JSON 形式で Kinesis に書き込むことができます。データストリームに直接アクセスする場合は、これらのオプションを使用して、データストリームへのアクセス方法に関する情報を提供します。

`getCatalogSource` または `create_data_frame_from_catalog` を使用して Kinesis ストリーミングソースからレコードを消費する場合、ジョブはデータカタログデータベースとテーブル名の情報を持っており、それを使用して Kinesis ストリーミングソースから読み込むためのいくつかの基本パラメータを取得することができます。`getSource`、`getSourceWithFormat`、`createDataFrameFromOptions`、または `create_data_frame_from_options` を使用している場合、ここで説明する接続オプションを使用して、これらの基本パラメータを指定する必要があります。

Kinesis の接続オプションは、`GlueContext` クラス内の指定されたメソッドに対して以下の引数で指定することができます。
+ Scala
  + `connectionOptions`: `getSource`、`createDataFrameFromOptions`、`getSink` で使用 
  + `additionalOptions`: `getCatalogSource`、`getCatalogSink` で使用
  + `options`: `getSourceWithFormat`、`getSinkWithFormat` で使用
+ Python
  + `connection_options`: `create_data_frame_from_options`、`write_dynamic_frame_from_options` で使用
  + `additional_options`: `create_data_frame_from_catalog`、`write_dynamic_frame_from_catalog` で使用
  + `options`: `getSource`、`getSink` で使用

ストリーミング ETL ジョブに関する注意事項と制限事項については、「[ストリーミング ETL に関する注意と制限](add-job-streaming.md#create-job-streaming-restrictions)」を参照してください。

## Kinesis の設定
<a name="aws-glue-programming-etl-connect-kinesis-configure"></a>

AWS Glue Spark ジョブで Kinesis データストリームに接続するには、いくつかの前提条件が必要です。
+ 読み取る場合、AWS Glue ジョブには、Kinesis データストリームへの読み取りアクセスレベルの IAM 権限が必要です。
+ 書き込みの場合、AWS Glue ジョブには、Kinesis データストリームへの書き込みアクセスレベルの IAM 権限が必要です。

場合によっては、追加の前提条件を設定する必要があります。
+ AWS Glue ジョブが (通常は他のデータセットに接続するための) **追加のネットワーク接続**で設定されていて、その接続のいずれかが Amazon VPC **ネットワークオプション**を提供している場合、ジョブは Amazon VPC 経由で通信するように指示されます。この場合、Amazon VPC を介して通信するように Kinesis データストリームを設定する必要もあります。そのためには、Amazon VPC と Kinesis データストリームの間にインターフェイス VPC エンドポイントを作成します。詳細については、「[インターフェイス VPC エンドポイントと Amazon Kinesis Data Streams の使用](https://docs.aws.amazon.com//streams/latest/dev/vpc.html)」を参照してください。
+ 別のアカウントで Amazon Kinesis Data Streams を指定する場合は、クロスアカウントアクセスを許可するようにロールとポリシーを設定する必要があります。詳細については、「[例: 別のアカウントで Kinesis Stream から読み込む](https://docs.aws.amazon.com/kinesisanalytics/latest/java/examples-cross.html)」を参照してください。

ストリーミング ETL ジョブの前提条件の詳細については、「[AWS Glue でのストリーミング ETL ジョブ](add-job-streaming.md)」を参照してください。

## 例: Kinesis ストリームからの読み込み
<a name="aws-glue-programming-etl-connect-kinesis-read"></a>

### 例: Kinesis ストリームからの読み込み
<a name="section-etl-connect-kinesis-read"></a>

[forEachBatch](aws-glue-api-crawler-pyspark-extensions-glue-context.md#aws-glue-api-crawler-pyspark-extensions-glue-context-forEachBatch) と組み合わせて使用します。

Amazon Kinesis ストリーミングソースの例:

```
kinesis_options =
   { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream",
     "startingPosition": "TRIM_HORIZON", 
     "inferSchema": "true", 
     "classification": "json" 
   }
data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)
```

## 例: Kinesis ストリームへの書き込み
<a name="aws-glue-programming-etl-connect-kinesis-write"></a>

### 例: Kinesis ストリームからの読み込み
<a name="section-etl-connect-kinesis-read"></a>

[forEachBatch](aws-glue-api-crawler-pyspark-extensions-glue-context.md#aws-glue-api-crawler-pyspark-extensions-glue-context-forEachBatch) と組み合わせて使用します。

Amazon Kinesis ストリーミングソースの例:

```
kinesis_options =
   { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream",
     "startingPosition": "TRIM_HORIZON", 
     "inferSchema": "true", 
     "classification": "json" 
   }
data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)
```

## Kinesis 接続オプションのリファレンス
<a name="aws-glue-programming-etl-connect-kinesis"></a>

Amazon Kinesis Data Streams への接続を指定します。

Kinesis ストリーミングデータソースには、次の接続オプションを使用します。
+ `"streamARN"` (必須) 読み取り/書き込みに使用されます。Kinesis データストリームの ARN。
+ `"classification"` (読み取りに必須) 読み取りで使用。レコード内のデータで使用されるファイル形式。データカタログを通じて提供されていない限り、必須です。
+ `"streamName"` - (オプション) 読み取りで使用。読み取り対象/読み取り元の Kinesis データストリームの名前。`endpointUrl` で使用。
+ `"endpointUrl"` - (オプション) 読み取りで使用。デフォルト: "https://kinesis.us-east-1.amazonaws.com"。Kinesis エンドポイントの AWS エンドポイント。特別なリージョンに接続する場合を除き、これを変更する必要はありません。
+ `"partitionKey"` - (オプション) 書き込みに使用。レコードを作成する際に使用される Kinesis パーティションキー。
+ `"delimiter"` (オプション) 読み取りに使用。`classification` が CSV の場合に使用される値の区切り文字。デフォルトは「`,`」です。
+ `"startingPosition"`: (オプション) 読み込みに使用。Kinesis データストリーム内の、データの読み取り開始位置。指定できる値は `"latest"`、`"trim_horizon"`、`"earliest"`、または UTC 形式のタイムスタンプ文字列であり、この文字列のパターンは `yyyy-mm-ddTHH:MM:SSZ` です (`Z` は UTC タイムゾーンのオフセットを \$1/- で表します。例:「2023-04-04T08:00:00-04:00」)。デフォルト値は `"latest"` です。注意: `"startingPosition"` の UTC 形式のタイムスタンプ文字列は AWS Glue バージョン 4.0 以降のみでサポートされます。
+ `"failOnDataLoss"`: (オプション) アクティブなシャードがないか、有効期限が切れている場合、ジョブは失敗します。デフォルト値は `"false"` です。
+ `"awsSTSRoleARN"`: (オプション) 読み取り/書き込みに使用。AWS Security Token Service (AWS STS) を使用して担うロールの Amazon リソースネーム (ARN)。このロールには、Kinesis データストリームのレコードの説明操作または読み取り操作の権限が必要です。このパラメーターは、別のアカウントのデータストリームにアクセスするときに使用する必要があります。`"awsSTSSessionName"` と組み合わせて使用します。
+ `"awsSTSSessionName"`: (オプション) 読み取り/書き込みに使用。AWS STS を使って、ロールを担うセッションの識別子。このパラメータは、別のアカウントのデータストリームにアクセスするときに使用する必要があります。`"awsSTSRoleARN"` と組み合わせて使用します。
+ `"awsSTSEndpoint"`: (オプション) 引き受けたロールで Kinesis に接続するときに使用する AWS STS エンドポイント。これにより、デフォルトのグローバルエンドポイントでは不可能な VPC 内のリージョナル AWS STS エンドポイントを使用できます。
+ `"maxFetchTimeInMs"`: (オプション) 読み込みに使用。ジョブエグゼキューターが Kinesis データストリームから現在のバッチのレコードを読み取るために費やした最大時間は、ミリ秒 (ms) 単位で指定されます。この時間内に複数の `GetRecords` API コールを行うことができます。デフォルト値は `1000` です。
+ `"maxFetchRecordsPerShard"`: (オプション) 読み込みに使用。1 マイクロバッチ当たりに Kinesis データストリームでシャードごとにフェッチするレコードの最大数。メモ: ストリーミングジョブが既に Kinesis (同じ get-records 呼び出しで) から余分なレコードを読み取っている場合、クライアントはこの制限を超えることができます。`maxFetchRecordsPerShard` が厳密である必要がある場合、`maxRecordPerRead` の倍数にする必要があります。デフォルト値は `100000` です。
+ `"maxRecordPerRead"`: (オプション) 読み込みに使用。`getRecords` オペレーションごとに、Kinesis データストリームからフェッチするレコードの最大数。デフォルト値は `10000` です。
+ `"addIdleTimeBetweenReads"`: (オプション) 読み込みに使用。2 つの連続する `getRecords` オペレーション間の遅延時間を追加します。デフォルト値は `"False"` です。このオプションは、Glue バージョン 2.0 以降でのみ設定可能です。
+ `"idleTimeBetweenReadsInMs"`: (オプション) 読み込みに使用。2 つの連続する `getRecords` オペレーション間での、最短の遅延時間 (ミリ秒単位で指定)。デフォルト値は `1000` です。このオプションは、Glue バージョン 2.0 以降でのみ設定可能です。
+ `"describeShardInterval"`: (オプション) 読み込みに使用。スクリプトが呼び出す 2 つの `ListShards` API 間での、再シャーディングを考慮すべき最小時間。詳細については、「*Amazon Kinesis Data Streams デベロッパーガイド*」の「[リシャーディングのための戦略](https://docs.aws.amazon.com//streams/latest/dev/kinesis-using-sdk-java-resharding-strategies.html)」を参照してください。デフォルト値は `1s` です。
+ `"numRetries"`: (オプション) 読み込みに使用。Kinesis Data Streams API リクエストを再試行する最大の回数。デフォルト値は `3` です。
+ `"retryIntervalMs"`: (オプション) 読み込みに使用。Kinesis Data Streams API 呼び出しを再試行するまでのクールオフ期間 (ミリ秒単位で指定)。デフォルト値は `1000` です。
+ `"maxRetryIntervalMs"`: (オプション) 読み込みに使用。再試行で 2 つの Kinesis Data Streams API を呼び出す間の最大クールオフ期間 (ミリ秒単位で指定)。デフォルト値は `10000` です。
+ `"avoidEmptyBatches"`: (オプション) 読み込みに使用。バッチ処理を開始する前に、Kinesis データストリームで未読のデータをチェックすることで、空のマイクロバッチジョブを作成しないようにします。デフォルト値は `"False"` です。
+ `"schema"`: (inferSchema に false を設定した場合は必須) 読み取りに使用。ペイロードの処理に使用するスキーマ。分類が `avro` である場合、提供されるスキーマには Avro スキーマ形式を使用する必要があります。分類が `avro` 以外の場合、提供されるスキーマは DDL スキーマ形式である必要があります。

  以下に、スキーマの例を示します。

------
#### [ Example in DDL schema format ]

  ```
  `column1` INT, `column2` STRING , `column3` FLOAT
  ```

------
#### [ Example in Avro schema format ]

  ```
  {
    "type":"array",
    "items":
    {
      "type":"record",
      "name":"test",
      "fields":
      [
        {
          "name":"_id",
          "type":"string"
        },
        {
          "name":"index",
          "type":
          [
            "int",
            "string",
            "float"
          ]
        }
      ]
    }
  }
  ```

------
+ `"inferSchema"`: (オプション) 読み込みに使用。デフォルト値は、「false」です。「true」に設定すると、実行時に、スキーマが `foreachbatch` 内のペイロードから検出されます。
+ `"avroSchema"`: (非推奨) 読み取りに使用。Avro 形式を使用する場合に、Avro データのスキーマを指定するために使用されるパラメータです。このパラメータは非推奨となりました。`schema` パラメータを使用します。
+ `"addRecordTimestamp"`: (オプション) 読み込みに使用。このオプションが「true」に設定されている場合、データ出力には、対応するレコードがストリームによって受信された時刻を表示する「\$1\$1src\$1timestamp」という名前が付けられた追加の列が含まれます。デフォルト値は、「false」です。このオプションは AWS Glue バージョン 4.0 以降でサポートされています。
+ `"emitConsumerLagMetrics"`: (オプション) 読み込みに使用。このオプションを「true」に設定すると、バッチごとに、ストリームが受信した最も古いレコードと、それが AWS Glue で CloudWatch に到着した時間との間隔のメトリクスが出力されます。メトリクスの名前は「glue.driver.streaming.maxConsumerLagInMs」です。デフォルト値は、「false」です。このオプションは AWS Glue バージョン 4.0 以降でサポートされています。
+ `"fanoutConsumerARN"`: (オプション) 読み込みに使用。`streamARN` で指定されたストリームの Kinesis ストリームコンシューマの ARN。Kinesis 接続の拡張ファンアウトモードを有効にするために使用されます。拡張ファンアウトが使用された Kinesis ストリームの使用に関する詳細については、「[Kinesis ストリーミングジョブでの拡張ファンアウトの使用](aws-glue-programming-etl-connect-kinesis-efo.md)」を参照してください。
+ `"recordMaxBufferedTime"` - (オプション) 書き込みに使用。デフォルト: 1000 (ミリ秒)。レコードが書き込まれるのを待っている間にバッファリングされる最大時間。
+ `"aggregationEnabled"` - (オプション) 書き込みに使用。デフォルト: true。Kinesis に送信する前にレコードを集約するかどうかを指定します。
+ `"aggregationMaxSize"` - (オプション) 書き込みに使用。デフォルト: 51200 (バイト) レコードがこの制限よりも大きい場合、そのレコードはアグリゲータをバイパスします。注: Kinesis では、レコードサイズに 50 KB の制限が適用されます。これを 50 KB を超えて設定すると、サイズ超過のレコードは Kinesis によって拒否されます。
+ `"aggregationMaxCount"` - (オプション) 書き込みに使用。デフォルト: 4294967295。集計されたレコードにパックされる項目の最大数。
+ `"producerRateLimit"` - (オプション) 書き込みに使用。デフォルト: 150 (%)。1 つのプロデューサー (ジョブなど) からの送信されるシャード単位のスループットをバックエンド制限のパーセンテージとして制限できます。
+ `"collectionMaxCount"` - (オプション) 書き込みに使用。デフォルト: 500。PutRecords レコードにパックされる項目の最大数。
+ `"collectionMaxSize"` - (オプション) 書き込みに使用。デフォルト: 5242880 (バイト)。PutRecords リクエストで送信するデータの最大量。

# Kinesis ストリーミングジョブでの拡張ファンアウトの使用
<a name="aws-glue-programming-etl-connect-kinesis-efo"></a>

拡張ファンアウトコンシューマは、通常のコンシューマよりも高い専用スループットで Kinesis ストリームからレコードを受信できます。これは、ジョブなどの Kinesis コンシューマにデータを提供するために使用される転送プロトコルを最適化することによって行われます。Kinesis 拡張ファンアウトの詳細については、[Kinesis のドキュメント](https://docs.aws.amazon.com//streams/latest/dev/enhanced-consumers.html)を参照してください。

拡張ファンアウトモードでは、`maxRecordPerRead` および `idleTimeBetweenReadsInMs` 接続オプションは適用されなくなりました。拡張ファンアウトを使用する場合、これらのパラメータは設定できないためです。リトライの設定オプションは説明どおりに機能します。

以下の手順に従って、ストリーミングジョブの拡張ファンアウトを有効または無効にします。ストリームのデータを消費するジョブごとに、ストリームコンシューマを登録する必要があります。

**ジョブの拡張ファンアウト消費を有効にするには:**

1. Kinesis API を使用してジョブのストリームコンシューマを登録します。手順に従って、[Kinesis ドキュメント](https://docs.aws.amazon.com//streams/latest/dev/building-enhanced-consumers-api)の *Kinesis Data Streams API を使用して、拡張ファンアウトでコンシューマを登録*します。必要なのは、最初のステップである [RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html) を呼び出すことだけです。リクエストは ARN、*consumerARN* を返す必要があります。

1. 接続メソッドの引数で接続オプション `fanoutConsumerARN` を *consumerARN* に設定します。

1. ジョブを再開してください。

**ジョブの拡張ファンアウト消費を無効にするには:**

1. メソッド呼び出しから接続オプション `fanoutConsumerARN` を削除します。

1. ジョブを再開してください。

1. [Kinesis ドキュメント](https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-console.html)の指示に従ってコンシューマを登録解除します。これらの手順はコンソールに適用されますが、Kinesis API を使用して実行することもできます。Kinesis API によるストリームコンシューマの登録解除の詳細については、Kinesis ドキュメントの「[DeregisterStreamConsumer](https://docs.aws.amazon.com//kinesis/latest/APIReference/API_DeregisterStreamConsumer.html)」を参照してください。