

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Kinesis
<a name="emr-kinesis"></a>

Amazon EMR クラスターは、Hive、Pig、MapReduce、Hadoop Streaming API、Cascading などの Hadoop エコシステムで使い慣れたツールを使用して、Amazon Kinesis Streams を直接読み込み、処理することができます。また、実行しているクラスターで Amazon Kinesis のリアルタイムデータを Amazon S3、Amazon DynamoDB、および HDFS の既存データと結合できます。後処理のアクティビティとして、Amazon EMR から Amazon S3 または DynamoDB に直接データをロードできます。Amazon Kinesis の主なサービスと料金については、「[Amazon Kinesis](https://aws.amazon.com//kinesis)」ページを参照してください。

## Amazon EMR と Amazon Kinesis の統合で何ができますか?
<a name="kinesis-use-cases"></a>

 Amazon EMR と Amazon Kinesis の統合により、以下のようなシナリオへの対応が非常に容易になります。
+ **ストリーミングログ分析** – ストリーミングのウェブログを分析して、リージョン別、ブラウザ別、およびアクセスドメイン別に、数分ごとの上位 10 件のエラータイプのリストを生成できます。
+ **カスタマーエンゲージメント** – Amazon Kinesis のクリックストリームデータと DynamoDB テーブルに保存されている広告キャンペーン情報を結合するクエリを作成し、特定のウェブサイトに表示される最も効果的な広告カテゴリを特定できます。
+ **アドホックインタラクティブクエリ** – Amazon Kinesis Streams から HDFS に定期的にデータを読み込み、ローカルの Impala テーブルとして使用可能にすることで、高速かつインタラクティブな分析クエリを実行できます。

## Amazon Kinesis Streams のチェックポイントの分析
<a name="kinesis-checkpoint"></a>

ユーザーは、Amazon Kinesis Streams の定期的なバッチ分析をいわゆる*反復*で実行できます。Amazon Kinesis Streams のデータレコードはシーケンス番号を使用して取得されるため、反復の境界は Amazon EMR によって DynamoDB テーブルに格納される開始と終了のシーケンス番号で定義されます。たとえば、`iteration0` が終了すると、終了のシーケンス番号が DynamoDB に格納されるため、`iteration1` ジョブが開始されたとき、ストリームからそれに続くデータを取得できます。このストリームデータの反復のマッピングは*チェックポイント*と呼ばれます。詳細については、「[Kinesis コネクター](https://aws.amazon.com/elasticmapreduce/faqs/#kinesis-connector)」を参照してください。

反復にチェックポイントが設定された後に、ジョブの反復処理が失敗した場合、Amazon EMR では、その反復のレコード処理が再試行されます。

チェックポイントは、次のことが可能になる機能です。
+ 同じストリームと論理名で実行した前のクエリにより処理された連続番号の後でデータ処理を開始します
+ 前のクエリで処理された Kinesis のデータと同じバッチを再処理します

 チェックポイントを有効にするには、スクリプトで `kinesis.checkpoint.enabled` パラメータを `true` に設定します。また、以下のパラメータを設定します。


| 構成設定 | 説明 | 
| --- | --- | 
| kinesis.checkpoint.metastore.table.name | チェックポイント情報が保存される DynamoDB テーブル名 | 
| kinesis.checkpoint.metastore.hash.key.name | DynamoDB テーブルのハッシュキー名 | 
| kinesis.checkpoint.metastore.hash.range.name | DynamoDB テーブルの範囲キー名 | 
| kinesis.checkpoint.logical.name | 現在の処理の論理名 | 
| kinesis.checkpoint.iteration.no | 論理名に関連付けられている処理の反復番号 | 
| kinesis.rerun.iteration.without.wait | 失敗した反復がタイムアウトを待たずに再実行できるかどうかを示すブール値（デフォルトは false）。 | 

### Amazon DynamoDB テーブルのプロビジョニングされた IOPS に関する推奨事項
<a name="kinesis-checkpoint-DDB"></a>

Amazon Kinesis 用の Amazon EMR コネクターは、チェックポイントのメタデータの補助として DynamoDB データベースを使用します。Amazon Kinesis Streams のデータをチェックポイントの間隔で Amazon EMR クラスターで使用する前に、DynamoDB のテーブルを作成する必要があります。テーブルは Amazon EMR クラスターと同じリージョンに存在する必要があります。以下は、DynamoDB テーブルにプロビジョニングする必要がある IOPS の数に関する一般的な推奨です。`j` は同時に実行できる Hadoop ジョブ (異なる論理名 \$1 反復数の組み合わせ) の最大数で、`s` はジョブが処理するシャードの最大数です。

**読み込みキャパシティーユニット**: `j`\$1`s`/`5`

**書き込みキャパシティーユニット**: `j`\$1`s`

## パフォーマンスに関する考慮事項
<a name="performance"></a>

Amazon Kinesis シャードスループットは、Amazon EMR クラスターのノードのインスタンスのサイズ、およびストリームのレコードのサイズに正比例しています。マスターノードやコアノードで m5.xlarge かそれ以上のインスタンスを使用することをお勧めします。

## Amazon EMR で Amazon Kinesis 分析をスケジュールする
<a name="schedule"></a>

任意の繰り返しについてタイムアウトと最大期間で制限される、アクティブな Amazon Kinesis Streams でデータを分析するときは、ストリームから定期的に詳細を取得するために、分析を頻繁に実行することが重要です。定期的な間隔でこのようなスクリプトおよびクエリを実行する方法は複数ありますが、これらのような反復タスクには、 AWS Data Pipeline を使用することをお勧めします。詳細については、「*AWS Data Pipeline 開発者ガイド*」の「[AWS Data Pipeline PigActivity](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-pigactivity.html)」および「[AWS Data Pipeline HiveActivity](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-hiveactivity.html)」を参照してください。

# Amazon EMR 7.0 用スパークキネシスコネクターの SDK 2.x への移行
<a name="migrating-spark-kinesis"></a>

 AWS SDK は、認証情報の管理、S3 および Kinesis サービスへの接続など、クラウドコンピューティングサービスとやり取り AWS するための豊富な APIs とライブラリのセットを提供します。Spark Kinesis コネクターは Kinesis データストリームからデータを消費するために使用され、受信したデータは Spark の実行エンジンで変換および処理されます。現在、このコネクタは 1.x の AWS SDK と Kinesis-client-library (KCL) 上に構築されています。

 AWS SDK 2.x 移行の一環として、Spark Kinesis コネクタも SDK 2.x で実行されるように更新されます。Amazon EMR 7.0 リリースの Spark には、コミュニティバージョンの Apache Spark ではまだ利用できない SDK 2.x アップグレードが含まれています。7.0 より前のリリースの Spark Kinesis コネクタを使用する場合、Amazon EMR 7.0 に移行する前に、アプリケーションコードを SDK 2.x で実行するように移行する必要があります。

## 移行ガイド
<a name="migrating-spark-kinesis-migration-guides"></a>

このセクションでは、アップグレードされた Spark Kinesis コネクタにアプリケーションを移行する手順について説明します。これには、 AWS SDK 2.x の Kinesis Client Library (KCL) 2.x、 AWS 認証情報プロバイダー、および AWS サービスクライアントに移行するためのガイドが含まれています。参考までに、Kinesis コネクタを使用するサンプル [WordCount](https://github.com/apache/spark/blob/v3.5.0/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala) プログラムも含まれています。

**Topics**
+ [1.x から 2.x への移行](#migrating-spark-kinesis-KCL-from-1.x-to-2.x)
+ [AWS SDK 1.x から 2.x への AWS 認証情報プロバイダーの移行](#migrating-spark-kinesis-creds-from-1.x-to-2.x)
+ [AWS SDK 1.x から 2.x への AWS サービスクライアントの移行](#migrating-spark-kinesis-service-from-1.x-to-2.x)
+ [ストリーミングアプリケーションのコード例](#migrating-spark-kinesis-streaming-examples)
+ [アップグレードされた Spark Kinesis コネクターを使用する際の考慮事項](#migrating-spark-kinesis-considerations)

### 1.x から 2.x への移行
<a name="migrating-spark-kinesis-KCL-from-1.x-to-2.x"></a>
+ **`KinesisInputDStream` のメトリクスレベルとディメンション**

  `KinesisInputDStream` をインスタンス化すると、ストリームのメトリクスレベルとディメンションを制御できます。以下の例は、KCL 1.x でこれらのパラメータをカスタマイズする方法を示しています。

  ```
  import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
  import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
   
  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
    .build()
  ```

  KCL 2.x では、これらの構成設定のパッケージ名は異なります。2.x に移行するには:

  1. `com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration` および `com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel` のインポートステートメントをそれぞれ `software.amazon.kinesis.metrics.MetricsLevel` および `software.amazon.kinesis.metrics.MetricsUtil` に変更します。

     ```
     // import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
     import software.amazon.kinesis.metrics.MetricsLevel
      
     // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
     import software.amazon.kinesis.metrics.MetricsUtil
     ```

  1. 行 `metricsEnabledDimensionsKinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet` を `metricsEnabledDimensionsSet(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)` に置き換えます。

  以下は、カスタマイズされたメトリクスレベルとメトリクスディメンションを備えた `KinesisInputDStream` の更新バージョンです。

  ```
  import software.amazon.kinesis.metrics.MetricsLevel
  import software.amazon.kinesis.metrics.MetricsUtil
   
  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
    .build()
  ```
+ `KinesisInputDStream` のメッセージハンドラー関数

  `KinesisInputDStream` をインスタンス化するときに、パーティションキーなどの Record に含まれる他のデータを使用したい場合に備えて、Kinesis Record を取得して汎用オブジェクト T を返す「メッセージハンドラー関数」を提供することもできます。

  KCL 1.x では、メッセージハンドラー関数のシグネチャは `Record => T` であり、レコードは `com.amazonaws.services.kinesis.model.Record` です。KCL 2.x では、ハンドラーの署名は `KinesisClientRecord => T` に変更され、KinesisClientRecord は `software.amazon.kinesis.retrieval.KinesisClientRecord` です。

  KCL 1.x でメッセージハンドラーを提供する例を以下に示します。

  ```
  import com.amazonaws.services.kinesis.model.Record
   
   
  def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_ONLY)
    .buildWithMessageHandler(addFive)
  ```

  メッセージハンドラーを移行するには:

  1. `com.amazonaws.services.kinesis.model.Record` のインポートステートメントを `software.amazon.kinesis.retrieval.KinesisClientRecord` に変更します。

     ```
     // import com.amazonaws.services.kinesis.model.Record
     import software.amazon.kinesis.retrieval.KinesisClientRecord
     ```

  1. メッセージハンドラーのメソッドシグネチャを更新します。

     ```
     //def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
     def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
     ```

  KCL 2.x でメッセージハンドラーを提供する、更新済みの例を以下に示します。

  ```
  import software.amazon.kinesis.retrieval.KinesisClientRecord
   
   
  def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_ONLY)
    .buildWithMessageHandler(addFive)
  ```

  KCL 1.x から 2.x への移行について詳しくは、「[KCL 1.x から KCL 2.x へのコンシューマの移行](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html)」を参照してください。

### AWS SDK 1.x から 2.x への AWS 認証情報プロバイダーの移行
<a name="migrating-spark-kinesis-creds-from-1.x-to-2.x"></a>

認証情報プロバイダーは、 とのやり取りの AWS 認証情報を取得するために使用されます AWS。SDK 2.x の認証情報プロバイダーに関連するインターフェイスとクラスの変更がいくつかあります。詳しくは、[こちら](https://github.com/aws/aws-sdk-java-v2/blob/master/docs/LaunchChangelog.md#122-client-credentials)をご覧ください。Spark Kinesis コネクタは、1.x バージョンの AWS 認証情報プロバイダーを返すインターフェイス (`org.apache.spark.streaming.kinesis.SparkAWSCredentials`) と実装クラスを定義しています。これらの認証情報プロバイダーは、Kinesis クライアントを初期化する際に必要です。たとえば、`SparkAWSCredentials.provider`アプリケーションで メソッドを使用している場合は、2.x バージョンの AWS 認証情報プロバイダーを使用するようにコードを更新する必要があります。

 AWS SDK 1.x で認証情報プロバイダーを使用する例を次に示します。

```
import org.apache.spark.streaming.kinesis.SparkAWSCredentials
import com.amazonaws.auth.AWSCredentialsProvider
 
val basicSparkCredentials = SparkAWSCredentials.builder
    .basicCredentials("accessKey", "secretKey")
    .build()
                                     
val credentialProvider = basicSparkCredentials.provider
assert(credentialProvider.isInstanceOf[AWSCredentialsProvider], "Type should be AWSCredentialsProvider")
```

**SDK 2.x に移行するには:**

1. `com.amazonaws.auth.AWSCredentialsProvider` のインポートステートメントを `software.amazon.awssdk.auth.credentials.AwsCredentialsProvider` に変更します。

   ```
   //import com.amazonaws.auth.AWSCredentialsProvider
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
   ```

1. このクラスを使用する残りのコードを更新します。

   ```
   import org.apache.spark.streaming.kinesis.SparkAWSCredentials
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
    
   val basicSparkCredentials = SparkAWSCredentials.builder
       .basicCredentials("accessKey", "secretKey")
       .build()
                                             
   val credentialProvider = basicSparkCredentials.provider
   assert (credentialProvider.isInstanceOf[AwsCredentialsProvider], "Type should be AwsCredentialsProvider")
   ```

### AWS SDK 1.x から 2.x への AWS サービスクライアントの移行
<a name="migrating-spark-kinesis-service-from-1.x-to-2.x"></a>

AWS サービスクライアントは 2.x で異なるパッケージ名 ( など`software.amazon.awssdk`) を持ちますが、SDK 1.x は を使用します`com.amazonaws`。クライアント変更に関する詳細については、[こちら](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-whats-different.html)を参照してください。コードでこれらのサービスクライアントを使用している場合は、それに応じてクライアントを移行する必要があります。

SDK 1.x でクライアントを作成する例を以下に示します。

```
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.document.DynamoDB
 
AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
```

**2.x に移行するには:**

1. サービスクライアントのインポートステートメントを変更します。DynamoDB クライアントを例にとってみましょう。`com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient` または `com.amazonaws.services.dynamodbv2.document.DynamoDB` を `software.amazon.awssdk.services.dynamodb.DynamoDbClient` に変更する必要があります。

   ```
   // import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
   // import com.amazonaws.services.dynamodbv2.document.DynamoDB
   import software.amazon.awssdk.services.dynamodb.DynamoDbClient
   ```

1. クライアントを初期化するコードを更新してください

   ```
   // AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
   // AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
    
   DynamoDbClient ddbClient = DynamoDbClient.create();
   DynamoDbClient ddbClient = DynamoDbClient.builder().build();
   ```

    AWS SDK を 1.x から 2.x に移行する方法の詳細については、[AWS 「 SDK for Java 1.x と 2.x の違い」を参照してください。](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-whats-different.html)

### ストリーミングアプリケーションのコード例
<a name="migrating-spark-kinesis-streaming-examples"></a>

```
import java.net.URI
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.http.apache.ApacheHttpClient
import software.amazon.awssdk.services.kinesis.KinesisClient
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest
import software.amazon.awssdk.regions.Region
import software.amazon.kinesis.metrics.{MetricsLevel, MetricsUtil}
 
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
import org.apache.spark.streaming.kinesis.KinesisInputDStream
 
 
object KinesisWordCountASLSDKV2 {
 
  def main(args: Array[String]): Unit = {
    val appName = "demo-app"
    val streamName = "demo-kinesis-test"
    val endpointUrl = "https://kinesis.us-west-2.amazonaws.com"
    val regionName = "us-west-2"
 
    // Determine the number of shards from the stream using the low-level Kinesis Client
    // from the AWS Java SDK.
    val credentialsProvider = DefaultCredentialsProvider.create
    require(credentialsProvider.resolveCredentials() != null,
      "No AWS credentials found. Please specify credentials using one of the methods specified " +
        "in https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html")
    val kinesisClient = KinesisClient.builder()
      .credentialsProvider(credentialsProvider)
      .region(Region.US_WEST_2)
      .endpointOverride(URI.create(endpointUrl))
      .httpClientBuilder(ApacheHttpClient.builder())
      .build()
    val describeStreamRequest = DescribeStreamRequest.builder()
      .streamName(streamName)
      .build()
    val numShards = kinesisClient.describeStream(describeStreamRequest)
      .streamDescription
      .shards
      .size
 
 
    // In this example, we are going to create 1 Kinesis Receiver/input DStream for each shard.
    // This is not a necessity; if there are less receivers/DStreams than the number of shards,
    // then the shards will be automatically distributed among the receivers and each receiver
    // will receive data from multiple shards.
    val numStreams = numShards
 
    // Spark Streaming batch interval
    val batchInterval = Milliseconds(2000)
 
    // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information
    // on sequence number of records that have been received. Same as batchInterval for this
    // example.
    val kinesisCheckpointInterval = batchInterval
 
    // Setup the SparkConfig and StreamingContext
    val sparkConfig = new SparkConf().setAppName("KinesisWordCountASLSDKV2")
    val ssc = new StreamingContext(sparkConfig, batchInterval)
 
    // Create the Kinesis DStreams
    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisInputDStream.builder
        .streamingContext(ssc)
        .streamName(streamName)
        .endpointUrl(endpointUrl)
        .regionName(regionName)
        .initialPosition(new Latest())
        .checkpointAppName(appName)
        .checkpointInterval(kinesisCheckpointInterval)
        .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
        .metricsLevel(MetricsLevel.DETAILED)
        .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
        .build()
    }
 
    // Union all the streams
    val unionStreams = ssc.union(kinesisStreams)
 
    // Convert each line of Array[Byte] to String, and split into words
    val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))
 
    // Map each word to a (word, 1) tuple so we can reduce by key to count the words
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
 
    // Print the first 10 wordCounts
    wordCounts.print()
 
    // Start the streaming context and await termination
    ssc.start()
    ssc.awaitTermination()
  }
}
```

### アップグレードされた Spark Kinesis コネクターを使用する際の考慮事項
<a name="migrating-spark-kinesis-considerations"></a>
+ アプリケーションが 11 より前のバージョンの JDK の `Kinesis-producer-library` を使用している場合、`java.lang.NoClassDefFoundError: javax/xml/bind/DatatypeConverter` のような例外が発生する可能性があります。これは、EMR 7.0 にはデフォルトで JDK 17 が付属しており、J2EE モジュールは Java 11 以降で標準ライブラリから削除されているためです。これは pom ファイルに次の依存関係を追加することで修正できます。必要に応じてライブラリバージョンを適切なもので交換します。

  ```
  <dependency>
        <groupId>javax.xml.bind</groupId>
        <artifactId>jaxb-api</artifactId>
        <version>${jaxb-api.version}</version>
      </dependency>
  ```
+ EMR クラスターの作成後、Spark Kinesis コネクタ jar はパス `/usr/lib/spark/connector/lib/` にあります。