

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# コネクターおよびユーティリティ
<a name="emr-connectors"></a>

Amazon EMR には、データソースとして他の AWS サービスにアクセスするためのコネクタとユーティリティが多数用意されています。これらのサービスのデータには、通常プログラム内でアクセスします。たとえば、Hive クエリ、Pig スクリプト、または MapReduce アプリケーションの Kinesis ストリームを指定し、そのデータを操作できます。

**Topics**
+ [Amazon EMR による DynamoDB 内テーブルのエクスポート、インポート、クエリ、結合](EMRforDynamoDB.md)
+ [Kinesis](emr-kinesis.md)
+ [S3DistCp (s3-dist-cp)](UsingEMR_s3distcp.md)
+ [S3DistCp ジョブが失敗した後のクリーンアップ](#s3distcp-cleanup)

# Amazon EMR による DynamoDB 内テーブルのエクスポート、インポート、クエリ、結合
<a name="EMRforDynamoDB"></a>

**注記**  
Amazon EMR-DynamoDB コネクターは GitHub のオープンソースです。詳細については、[https://github.com/awslabs/emr-dynamodb-connector](https://github.com/awslabs/emr-dynamodb-connector) を参照してください。

DynamoDB は、高速で予測可能なパフォーマンスとシームレスなスケーラビリティを特長とするフルマネージド NoSQL データベースサービスです。開発者がデータベーステーブルを作成し、そのリクエストトラフィックやストレージを無制限に拡張できます。DynamoDB は、高速処理を維持しながら、テーブルのデータとトラフィックを十分な数のサーバーに自動的に分散し、顧客が指定したリクエスト容量と、格納されているデータ量を処理します。Amazon EMR および Hive を使用すると、DynamoDB に格納されているデータなど、大量のデータを迅速かつ効率的に処理できます。DynamoDB の詳細については、「[Amazon DynamoDB 開発者ガイド](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/)」を参照してください。

Apache Hive は、HiveQL と呼ばれる簡素化された SQL のようなクエリ言語を使用して、マップリデュースクラスターにクエリを実行する際に使用できるソフトウェア層で、Hadoop アーキテクチャーの上で実行されます。Hive および HiveQL の詳細については、「[HiveQL 言語マニュアル](https://cwiki.apache.org/confluence/display/Hive/LanguageManual)」を参照してください。Hive および Amazon EMR の詳細については、「[Apache Hive](emr-hive.md)」を参照してください。

DynamoDB への接続を含む、カスタマイズされたバージョンの Hive とともに Amazon EMR を使用して、DynamoDB に保存されたデータに対してオペレーションを実行できます。
+ DynamoDB データを Hadoop Distributed File System (HDFS) にロードし、そのデータを Amazon EMR クラスターへの入力として使用する。
+ SQL のようなステートメント（HiveQL）を使用してライブ DynamoDB データにクエリを実行する
+ DynamoDB に格納されたデータを結合し、そのデータをエクスポートするか、結合されたデータに対してクエリを実行する。
+ DynamoDB に格納されたデータを Amazon S3 にエクスポートする
+ Amazon S3 に格納されたデータを DynamoDB にインポートする

**注記**  
Amazon EMR-DynamoDB コネクターは、[Kerberos 認証](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-kerberos.html)を使用するように設定されたクラスターをサポートしていません。

次のタスクのいずれかを実行するには、Amazon EMR クラスターを起動し、DynamoDB のデータの場所を指定して、DynamoDB のデータを操作するための Hive コマンドを発行します。

Amazon EMR クラスターを起動する方法はいくつかあります。Amazon EMR コンソール、コマンドラインインターフェイス (CLI)、または AWS SDK または Amazon EMR API を使用してクラスターをプログラムできます。また、Hive クラスターをインタラクティブに実行するか、スクリプトから実行するかを選択することもできます。このセクションでは、インタラクティブな Hive クラスターを Amazon EMR コンソールと CLI から起動する方法について説明します。

Hive をインタラクティブに使用する方法は、クエリパフォーマンスをテストし、アプリケーションを調整するには最適です。定期的に実行する一連の Hive コマンドを確立したら、Amazon EMR が実行できる Hive スクリプトを作成することを検討します。

**警告**  
DynamoDB テーブルに対する Amazon EMR 読み取りまたは書き込みオペレーションは、確立されたプロビジョニング済みスループットに対して不利に働き、プロビジョニングされたスループットの例外が発生する頻度が高くなる可能性があります。リクエストが多い場合、Amazon EMR はエクスポネンシャルバックオフによってリトライを実装し、DynamoDB テーブルに対するリクエストロードを管理します。Amazon EMR ジョブを他のトラフィックと同時に実行すると、割り当てられたプロビジョニング済みスループットレベルを超えることがあります。これをモニタリングするには、Amazon CloudWatch で [**ThrottleRequests**] メトリクスを確認します。リクエストロードが高すぎる場合は、クラスターを再作成して、[読み込みパーセントの設定](EMR_Hive_Optimizing.md#ReadPercent) または [書き込みパーセントの設定](EMR_Hive_Optimizing.md#WritePercent) をより低い値に設定し、Amazon EMR オペレーションを調整できます。DynamoDB スループットの設定については、「[プロビジョニングされたスループット](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithDDTables.html#ProvisionedThroughput)」を参照してください。  
テーブルが[オンデマンドモード](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html#HowItWorks.OnDemand)に設定されている場合は、エクスポートまたはインポート操作を実行する前に、テーブルをプロビジョニングモードに戻す必要があります。DynamoDBTable から使用するリソースを計算するには、パイプラインにスループット比率が必要です。オンデマンドモードでは、プロビジョニングされたスループットが削除されます。スループット容量をプロビジョニングするには、Amazon CloudWatch Events メトリクスを使用して、テーブルが使用した集約スループットを評価できます。

**Topics**
+ [Hive コマンドを実行するように Hive テーブルをセットアップする](EMR_Interactive_Hive.md)
+ [DynamoDB 内データのエクスポート、インポート、クエリを行う Hive コマンドの使用例](EMR_Hive_Commands.md)
+ [DynamoDB での Amazon EMR オペレーションのパフォーマンスの最適化](EMR_Hive_Optimizing.md)

# Hive コマンドを実行するように Hive テーブルをセットアップする
<a name="EMR_Interactive_Hive"></a>

Apache Hive は、SQL のような言語を使用して、Amazon EMR クラスターに含まれるデータに対してクエリを実行する際に使用できるデータウェアハウスアプリケーションです。Hive の詳細については、[http://hive.apache.org/](http://hive.apache.org/) を参照してください。

次の手順は、既にクラスターを作成し、Amazon EC2 キーペアを指定したことを前提にしています。クラスターの作成を開始する方法については、「*Amazon EMR 管理ガイド*」の「[Amazon EMR の開始方法](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-gs)」を参照してください。

## MapReduce を使用するように Hive を設定する
<a name="hive-mapreduce"></a>

Amazon EMR で Hive を使用して DynamoDB テーブルに対してクエリを実行する場合、デフォルトの実行エンジンである Tez を Hive が使用していると、エラーが発生する可能性があります。このため、このセクションで説明しているように、DynamoDB と統合される Hive でクラスターを作成するときには、MapReduce を使用するように Hive を設定する設定分類を使用することをお勧めします。詳細については、「[アプリケーションの設定](emr-configure-apps.md)」を参照してください。

次のスニペットは、Hive の実行エンジンとして MapReduce を設定するために使用する分類設定とプロパティを示しています。

```
[
                {
                    "Classification": "hive-site",
                    "Properties": {
                        "hive.execution.engine": "mr"
                    }
                }
             ]
```<a name="EMR_Interactive_Hive_session"></a>

**Hive コマンドをインタラクティブに実行するには**

1. マスターノードに接続します。詳細については、「*Amazon EMR 管理ガイド*」の「[SSH を使用してマスターノードに接続する](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html)」を参照してください。

1. 現在のマスターノードの コマンドプロンプトで、`hive` と入力します。

   `hive>` という Hive プロンプトが表示されます。

1.  Hive アプリケーションのテーブルを DynamoDB のデータにマップする Hive コマンドを入力します。このテーブルは、Amazon DynamoDB に格納されているデータの参照として機能します。データは Hive のローカルに保存されておらず、このテーブルを使用するクエリは、DynamoDB 内のライブデータに対して実行されます。また、コマンドを実行するたびに、テーブルの読み取り容量と書き込み容量が消費されます。同じデータセットに対して複数の Hive コマンドを実行する予定がある場合は、まずエクスポートすることをお勧めします。

    次に、Hive テーブルを DynamoDB テーブルにマッピングする構文を示します。

   ```
   CREATE EXTERNAL TABLE hive_tablename (hive_column1_name column1_datatype, hive_column2_name column2_datatype...)
   STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   TBLPROPERTIES ("dynamodb.table.name" = "dynamodb_tablename", 
   "dynamodb.column.mapping" = "hive_column1_name:dynamodb_attribute1_name,hive_column2_name:dynamodb_attribute2_name...");
   ```

    DynamoDB から Hive にテーブルを作成する場合、`EXTERNAL` というキーワードを使用して外部テーブルとして作成する必要があります。外部テーブルと内部テーブルの違いは、内部テーブルの drop が実行されると内部テーブルのデータが削除される点です。Amazon DynamoDB に接続する場合、この動作は望ましくないため、外部テーブルのみがサポートされます。

    たとえば、次の Hive コマンドでは、*dynamodbtable1* という DynamoDB テーブルを参照する *hivetable1* というテーブルが Hive に作成されます。DynamoDB テーブル *dynamodbtable1* には、ハッシュおよび範囲プライマリキースキーマがあります。ハッシュキー要素は `name`（文字列型）、範囲キー要素は `year`（数値型）です。各項目には、`holidays`（文字列セット型）の属性値があります。

   ```
   CREATE EXTERNAL TABLE hivetable1 (col1 string, col2 bigint, col3 array<string>)
   STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
   "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");
   ```

    行 1 では、HiveQL `CREATE EXTERNAL TABLE` ステートメントを使用しています。*hivetable1* の場合、DynamoDB テーブルの属性名と値の各ペアについて、列を設定し、データ型を指定する必要があります。これらの値は大文字と小文字が区別されません。列には、（予約語を除き）任意の名前を付けることができます。

    行 2 では、`STORED BY` ステートメントを使用しています。`STORED BY` の値は、Hive と DynamoDB 間の接続を処理するクラス名です。`'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'` のように設定する必要があります。

    行 3 では、`TBLPROPERTIES` ステートメントを使用して、"hivetable1" を DynamoDB 内の正しいテーブルとスキーマに関連付けています。`TBLPROPERTIES` に、`dynamodb.table.name` パラメータと `dynamodb.column.mapping` パラメータの値を指定します。これらの値は、大文字と小文字が*区別されます*。
**注記**  
 テーブルのすべての DynamoDB 属性名には、Hive テーブルに対応する列が存在する必要があります。Amazon EMR のバージョンに応じて、1 対 1 のマッピングが存在しない場合は、次のシナリオが発生します。  
Amazon EMR のバージョン 5.27.0 以降では、コネクターは、DynamoDB の属性名と Hive テーブル内の列間で 1 対 1 のマッピングを確実にするための検証を行います。1 対 1 のマッピングが存在しない場合は、エラーが発生します。
Amazon EMR バージョン 5.26.0 以前では、Hive テーブルには DynamoDB からの名前と値のペアは含まれません。DynamoDB プライマリキー属性をマップしない場合、Hive からエラーが生成されます。プライマリキー以外の属性をマップしない場合、エラーは生成されませんが、Hive テーブルのデータは表示されません。データ型が一致しない場合、値は null です。

次に、*hivetable1* で Hive 操作の実行を開始できます。*hivetable1* に対して実行されるクエリは、DynamoDB アカウントの DynamoDB テーブル *dynamodbtable1* に対して内部的に実行され、実行するたびに読み取り単位または書き込み単位が消費されます。

DynamoDB テーブルに対して Hive クエリを実行する際には、十分な量の読み込み容量単位をプロビジョニングしておく必要があります。

たとえば、DynamoDB テーブルに対して 100 ユニットの読み込みキャパシティーをプロビジョニングしているとします。この場合、1 秒間に 100 の読み込み（409,600 バイト）を実行できます。そのテーブルに 20 GB（21,474,836,480 バイト）のデータが含まれており、Hive クエリがフルテーブルスキャンを実行する場合、クエリの実行にかかる時間は次のように見積もられます。

 * 21,474,836,480 / 409,600 = 52,429 秒 = 14.56 時間* 

必要な時間を短縮するには、ソース DynamoDB テーブルで読み込みキャパシティーユニットを調整する以外に方法はありません。Amazon EMR ノードを追加しても、役に立ちません。

Hive 出力では、1 つ以上のマッパープロセスが終了すると、完了のパーセンテージが更新されます。プロビジョニングされた読み込みキャパシティーが小さく設定された大きな DynamoDB テーブルでは、完了のパーセンテージ出力が長時間更新されない場合があります。上記のような場合、ジョブは数時間にわたって 0% 完了として表示されます。ジョブの進行状況の詳細なステータスについては、Amazon EMR コンソールに移動してください。ここで、個別のマッパータスクのステータスおよびデータ読み込みの統計を表示できます。また、マスターノードの Hadoop インターフェイスにログオンし、Hadoop 統計を表示することもできます。ここには、個別のマップタスクのステータスおよびいくつかのデータ読み込み統計が表示されます。詳細については、以下の各トピックを参照してください。
+ [マスターノード上にホストされるウェブインターフェイス](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-web-interfaces.html)
+ [Hadoop のウェブインターフェイスを表示する](https://docs.aws.amazon.com/emr/latest/ManagementGuide/UsingtheHadoopUserInterface.html)

DynamoDB からのデータのエクスポートやインポート、テーブルの結合などのタスクを実行するその他の HiveQL ステートメントの例については、「[DynamoDB 内データのエクスポート、インポート、クエリを行う Hive コマンドの使用例](EMR_Hive_Commands.md)」を参照してください。<a name="EMR_Hive_Cancel"></a>

**Hive リクエストをキャンセルするには**

Hive クエリを実行すると、サーバーから返される最初の応答には、リクエストをキャンセルするコマンドが含まれます。プロセスの任意の時点でリクエストをキャンセルするには、サーバーの応答に含まれる **Kill Command** を使用します。

1. `Ctrl+C` を入力して、コマンドラインクライアントを終了します。

1.  シェルプロンプトで、リクエストに対するサーバーからの最初の応答に含まれていた **Kill Command** を入力します。

    または、マスターノードのコマンドラインから次のコマンドを実行して、Hadoop ジョブをキルします。この *job-id* は、Hadoop ジョブの識別子であり、Hadoop ユーザーインターフェイスから取得できます。

   ```
   hadoop job -kill job-id
   ```

## Hive と DynamoDB のデータ型
<a name="EMR_Hive_Properties"></a>

次の表は、使用可能な Hive データ型、対応するデフォルトの DynamoDB 型、およびそれらもマッピングできる代替 DynamoDB 型を示しています。


| Hive の型 | デフォルトの DynamoDB 型 | 代替 DynamoDB 型 | 
| --- | --- | --- | 
| string | 文字列（S） |  | 
| bigint または double | 数値（N） |  | 
| バイナリ | バイナリ（B） |  | 
| boolean | boolean (BOOL) |  | 
| array | list (L) | 数値セット（NS）、文字列セット（SS）、またはバイナリセット（BS） | 
| map<string,string> | item | map (M) | 
| map<string,?> | map (M) |  | 
|  | null (NULL) |  | 

Hive データを対応する代替 DynamoDB 型として書き込む場合、または DynamoDB データに代替 DynamoDB 型の属性値が含まれる場合、`dynamodb.type.mapping` パラメータを使用して列と DynamoDB 型を指定できます。次の例は、代替型マッピングを指定する構文を示しています。

```
CREATE EXTERNAL TABLE hive_tablename (hive_column1_name column1_datatype, hive_column2_name column2_datatype...)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES ("dynamodb.table.name" = "dynamodb_tablename",
"dynamodb.column.mapping" = "hive_column1_name:dynamodb_attribute1_name,hive_column2_name:dynamodb_attribute2_name...",
"dynamodb.type.mapping" = "hive_column1_name:dynamodb_attribute1_datatype");
```

型マッピングパラメータはオプションで、代替型を使用する列に対してのみ指定する必要があります。

たとえば、次の Hive コマンドでは、DynamoDB テーブル `dynamodbtable2` を参照する `hivetable2` というテーブルが作成されます。これは、`col3` 列を文字列セット (SS) 型にマッピングする点を除いて、`hivetable1` に似ています。

```
CREATE EXTERNAL TABLE hivetable2 (col1 string, col2 bigint, col3 array<string>)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable2",
"dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays",
"dynamodb.type.mapping" = "col3:SS");
```

Hive では、`hivetable1` と `hivetable2` は同一です。ただし、これらのテーブルからのデータが対応する DynamoDB テーブルに書き込まれる際、`dynamodbtable2` には文字列セットが含まれますが `dynamodbtable1` にはリストが含まれます。

Hive `null` 値を DynamoDB `null` 型の属性として書き込む場合は、`dynamodb.null.serialization` パラメータを使用して実行できます。次の例は、`null` シリアル化を指定する構文を示しています。

```
CREATE EXTERNAL TABLE hive_tablename (hive_column1_name column1_datatype, hive_column2_name column2_datatype...)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES ("dynamodb.table.name" = "dynamodb_tablename",
"dynamodb.column.mapping" = "hive_column1_name:dynamodb_attribute1_name,hive_column2_name:dynamodb_attribute2_name...",
"dynamodb.null.serialization" = "true");
```

null シリアル化パラメータはオプションであり、指定しない場合は `false` に設定されます。パラメータ設定に関係なく、DynamoDB `null` 属性は Hive で `null` 値として読み取られることに注意してください。`null` 値を持つ Hive コレクションは、null シリアル化パラメータが `true` として指定されている場合にのみ DynamoDB に書き込むことができます。指定されない場合、Hive エラーが発生します。

精度の点で、Hive の bigint 型は Java の long 型と同じであり、Hive の double 型は Java の double 型と同じです。つまり、エクスポート、インポート、または参照する Hive を使用して、Hive のデータ型で使用できる精度よりも高い精度の数値データが DynamoDB に格納される場合、DynamoDB データの精度が低下したり、Hive クエリが失敗したりする可能性があります。

 DynamoDB から Amazon Simple Storage Service (Amazon S3) または HDFS にバイナリ型をエクスポートすると、Base64 のエンコード文字列として格納されます。Amazon S3 または HDFS のデータを DynamoDB のバイナリ型にインポートする場合、Base64 文字列としてエンコードされます。

## Hive のオプション
<a name="EMR_Hive_Options"></a>

 次の Hive オプションを設定して、Amazon DynamoDB のデータの転送を管理できます。これらのオプションは、現在の Hive セッションの間のみ、有効です。Hive コマンドプロンプトを終了した後で、クラスターでこのプロンプトを再び開いた場合、この設定はデフォルト値に戻ります。


| Hive のオプション | 説明 | 
| --- | --- | 
| dynamodb.throughput.read.percent |   DynamoDB のプロビジョニングされたスループットレートが、表の割り当てられた範囲内に収まるように、読み取りオペレーションのレートを設定します。値は `0.1`～`1.5` 以内にします。  0.5 の値がデフォルトの読み取りレートです。つまり、Hive は、表に記載されている読み取りのプロビジョニングされた全体のリソースの半分を消費しようと試みます。値を 0.5 から増やすと、読み取りリクエストレートも増えます。0.5 未満に減らすと、読み取りリクエストレートも減ります。この読み取りレートは概算です。実際の読み取りレートは、DynamoDB に統一ディストリビューションのキーがあるかどうかなどの要因によって変わります。  Hive 操作によって、プロビジョニングされたスループットを頻繁に超える場合、またはライブの読み取りトラフィックがスロットリングされる回数が多い場合、この値を `0.5` 未満に減らします。十分な容量があり、Hive 操作を高速にする場合は、この値を `0.5` より増やします。また、使用できる未使用の入力/出力操作があると考えられる場合、最大 1.5 に設定してオーバーサブスクライブを実行することもできます。  | 
| dynamodb.throughput.write.percent |   DynamoDB のプロビジョニングされたスループットレートが、表の割り当てられた範囲内に収まるように、書き込みオペレーションのレートを設定します。値は `0.1`～`1.5` 以内にします。  0.5 の値がデフォルトの書き込みレートです。つまり、Hive は、表に記載されている書き込みのプロビジョニングされた全体のリソースの半分を消費しようと試みます。値を 0.5 から増やすと、書き込みリクエストレートも増えます。0.5 未満に減らすと、書き込みリクエストレートも減ります。この書き込みレートは概算です。実際の書き込みレートは、DynamoDB に統一ディストリビューションのキーがあるかどうかなどの要因によって変わります。  Hive 操作によって、プロビジョニングされたスループットを頻繁に超える場合、またはライブの書き込みトラフィックがスロットリングされる回数が多い場合、この値を `0.5` 未満に減らします。十分な容量があり、Hive 操作を高速にする場合は、この値を `0.5` より増やします。また、使用できる未使用の入力/出力操作があると考えられる場合、または、テーブルに対する最初のデータアップロードであり、ライブトラフィックがまだない場合は、最大 1.5 に設定してオーバーサブスクライブを実行することもできます。  | 
| dynamodb.endpoint | DynamoDB サービスのエンドポイントを指定します。使用できる DynamoDB エンドポイントの詳細については、「[リージョンとエンドポイント](https://docs.aws.amazon.com/general/latest/gr/rande.html#ddb_region)」を参照してください。  | 
| dynamodb.max.map.tasks |   DynamoDB からデータを読み取るときは、マップタスクの最大数を指定します。この値は、1 以上にする必要があります。  | 
| dynamodb.retry.duration |   Hive コマンドの再試行のタイムアウト期間として使用する分数を指定します。この値は、0 以上の整数にする必要があります。デフォルトのタイムアウト期間は 2 分です。  | 

 これらのオプションは、次の例に示すように、`SET` コマンドを使用して設定されます。

```
SET dynamodb.throughput.read.percent=1.0; 

INSERT OVERWRITE TABLE s3_export SELECT * 
FROM hiveTableName;
```

# DynamoDB 内データのエクスポート、インポート、クエリを行う Hive コマンドの使用例
<a name="EMR_Hive_Commands"></a>

以下の例では、Hive コマンドを使用して、データを Amazon S3 や HDFS にエクスポートする、データを DynamoDB にインポートする、テーブルを結合する、テーブルにクエリを行うなどの操作を行います。

Hive テーブルに対する操作では、DynamoDB に格納されているデータを参照します。Hive コマンドは、DynamoDB テーブルに設定されたスループット設定の制約を受けます。また、取得されるデータには、Hive 操作リクエストが DynamoDB で処理された時点で DynamoDB テーブルに書き込まれているデータが含まれます。データ取得プロセスに時間がかかる場合、Hive コマンドによって返されたデータには、Hive コマンドの開始後に DynamoDB で更新されたものが含まれる可能性があります。

Hive コマンドの `DROP TABLE` と `CREATE TABLE` は、Hive のローカルテーブルにのみ作用し、DynamoDB のテーブルの作成または削除は行いません。Hive クエリが DynamoDB のテーブルを参照する場合、そのテーブルは、クエリを実行する前に存在している必要があります。DynamoDB でのテーブルの作成と削除の詳細については、「*Amazon DynamoDB デベロッパーガイド*」の「[DynamoDB のテーブルの操作](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithTables.html)」を参照してください。

**注記**  
 Hive テーブルを Amazon S3 内の場所にマッピングする場合、バケットのルートパス (s3://amzn-s3-demo-bucket) にはマッピングしないでください。これを行うと、Hive がデータを Amazon S3 に書き込むときにエラーが発生することがあります。代わりに、テーブルをバケットのサブパス（s3://amzn-s3-demo-bucket/mypath）にマッピングします。

## DynamoDB からデータをエクスポートする
<a name="EMR_Hive_Commands_exporting"></a>

 Hive を使用して DynamoDB のデータをエクスポートすることができます。

**DynamoDB テーブルを Amazon S3 バケットにエクスポートするには**
+  DynamoDB に格納されたデータを参照する Hive テーブルを作成します。次に、INSERT OVERWRITE コマンドを呼び出して、データを外部ディレクトリに書き込みます。次の例では、*s3://amzn-s3-demo-bucket/path/subpath/* は Amazon S3. 内の有効なパスです。CREATE コマンドで列とデータ型が DynamoDB 内の値と一致するように調整します。これを使用して DynamoDB データのアーカイブを Amazon S3 に作成できます。

  ```
  1. CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
  2. STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
  3. TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
  4. "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");                   
  5.                     
  6. INSERT OVERWRITE DIRECTORY 's3://amzn-s3-demo-bucket/path/subpath/' SELECT * 
  7. FROM hiveTableName;
  ```

**書式設定を使用して DynamoDB テーブルを Amazon S3 バケットにエクスポートするには**
+  Amazon S3 内の場所を参照する外部テーブルを作成します。これは次の例では s3\$1export です。CREATE の呼び出しで、テーブルの行の書式を指定します。次に、INSERT OVERWRITE を使用して DynamoDB のデータを s3\$1export にエクスポートすると、データは指定の書式で書き込まれます。次の例では、データはカンマ区切り値（CSV）形式で書き込まれます。

  ```
   1. CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
   2. STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   3. TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
   4. "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");                      
   5.                     
   6. CREATE EXTERNAL TABLE s3_export(a_col string, b_col bigint, c_col array<string>)
   7. ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
   8. LOCATION 's3://amzn-s3-demo-bucket/path/subpath/';
   9.                     
  10. INSERT OVERWRITE TABLE s3_export SELECT * 
  11. FROM hiveTableName;
  ```

**列のマッピングを指定せずに DynamoDB テーブルを Amazon S3 バケットにエクスポートするには**
+  DynamoDB に格納されたデータを参照する Hive テーブルを作成します。これは、カラムマッピングを指定しないこと以外、前の例とほぼ同じです。このテーブルには `map<string, string>` 型の列が 1 つだけ含まれている必要があります。次に、Amazon S3 に `EXTERNAL` テーブルを作成したら、`INSERT OVERWRITE` コマンドを呼び出して DynamoDB のデータを Amazon S3 に書き込みます。これを使用して DynamoDB データのアーカイブを Amazon S3 に作成できます。カラムマッピングがないので、この方法でエクスポートされたテーブルをクエリすることはできません。カラムマッピングを指定しないデータのエクスポートは、Amazon EMR AMI 2.2.*x* 以降でサポートされる Hive 0.8.1.5 以降で利用できます。

  ```
   1. CREATE EXTERNAL TABLE hiveTableName (item map<string,string>)
   2. STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   3. TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1");  
   4.     
   5. CREATE EXTERNAL TABLE s3TableName (item map<string, string>)
   6. ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
   7. LOCATION 's3://amzn-s3-demo-bucket/path/subpath/'; 
   8.                 
   9. INSERT OVERWRITE TABLE s3TableName SELECT * 
  10. FROM hiveTableName;
  ```

**データ圧縮を使用して DynamoDB テーブルを Amazon S3 バケットにエクスポートするには**
+  Hive では、Hive セッション中に設定できる圧縮コーデックが複数あります。これを行うことで、エクスポートデータは、指定した形式で圧縮されます。次の例では、LZO（Lempel-Ziv-Oberhumer）アルゴリズムを使用して、エクスポートされたファイルを圧縮します。

  ```
   1. SET hive.exec.compress.output=true;
   2. SET io.seqfile.compression.type=BLOCK;
   3. SET mapred.output.compression.codec = com.hadoop.compression.lzo.LzopCodec;                    
   4.                     
   5. CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
   6. STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   7. TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
   8. "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");                    
   9.                     
  10. CREATE EXTERNAL TABLE lzo_compression_table (line STRING)
  11. ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
  12. LOCATION 's3://amzn-s3-demo-bucket/path/subpath/';
  13.                     
  14. INSERT OVERWRITE TABLE lzo_compression_table SELECT * 
  15. FROM hiveTableName;
  ```

   以下の圧縮コーデックを利用できます。
  +  org.apache.hadoop.io.compress.GzipCodec 
  +  org.apache.hadoop.io.compress.DefaultCodec 
  +  com.hadoop.compression.lzo.LzoCodec 
  +  com.hadoop.compression.lzo.LzopCodec 
  +  org.apache.hadoop.io.compress.BZip2Codec 
  +  org.apache.hadoop.io.compress.SnappyCodec 

**DynamoDB テーブルを HDFS にエクスポートするには**
+  次の Hive コマンドを使用します。*hdfs:///directoryName* は有効な HDFS パスであり、*hiveTableName* は Hive のテーブルで、DynamoDB を参照しています。Hive 0.7.1.1 はデータを Amazon S3 にエクスポートする際に HDFS を中間ステップとして使用するので、このエクスポート操作は、DynamoDB テーブルを Amazon S3 にエクスポートするよりも高速です。また、次の例は、`dynamodb.throughput.read.percent` を 1.0 に設定して読み取りリクエストレートを上げる方法を示しています。

  ```
  1. CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
  2. STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
  3. TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
  4. "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays"); 
  5.                     
  6. SET dynamodb.throughput.read.percent=1.0;                    
  7.                     
  8. INSERT OVERWRITE DIRECTORY 'hdfs:///directoryName' SELECT * FROM hiveTableName;
  ```

   Amazon S3 へのエクスポートで説明したように、書式設定や圧縮を使用して HDFS にデータをエクスポートすることもできます。これを行うには、上記の例の Amazon S3 ディレクトリを HDFS ディレクトリに置き換えるだけです。<a name="EMR_Hive_non-printable-utf8"></a>

**印刷不可の UTF-8 文字データを Hive で読み取るには**
+ テーブル作成時に `STORED AS SEQUENCEFILE` 句を使用すると、印刷不可の UTF-8 文字データを Hive で読み取ることができます。SequenceFile は Hadoop バイナリファイル形式です。このファイルを読み取るには Hadoop を使用する必要があります。次の例に、DynamoDB から Amazon S3 にデータをエクスポートする方法を示します。この機能を使用して印刷不可の UTF-8 でエンコードされた文字を処理できます。

  ```
   1. CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
   2. STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   3. TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
   4. "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");                      
   5.                     
   6. CREATE EXTERNAL TABLE s3_export(a_col string, b_col bigint, c_col array<string>)
   7. STORED AS SEQUENCEFILE
   8. LOCATION 's3://amzn-s3-demo-bucket/path/subpath/';
   9.                     
  10. INSERT OVERWRITE TABLE s3_export SELECT * 
  11. FROM hiveTableName;
  ```

## DynamoDB へのデータのアップロード
<a name="EMR_Hive_Commands_importing"></a>

 Hive を使用してデータを DynamoDB に書き込む場合は、書き込み容量単位の数をクラスター内のマッパーの数より大きいことを確認する必要があります。たとえば、m1.xlarge EC2 インスタンスで実行されるクラスターは、インスタンスごとに 8 個のマッパーを作成します。10 個のインスタンスがあるクラスターの場合は、合計 80 個のマッパーがあることになります。書き込み容量単位がクラスター内のマッパーの数以下である場合、Hive の書き込み操作は書き込みスループットを消費しきるか、プロビジョニングされた以上のスループットを消費しようとします。各 EC2 インスタンスタイプによって生成されるマッパーの数の詳細については、[Hadoop の設定](emr-hadoop-config.md) を参照してください。

 Hadoop 内のマッパーの数は入力分割数によって決まります。分割数が少なすぎる場合、書き込みコマンドは書き込みスループットを消費しきれない可能性があります。

 同じキーを持つ項目がターゲット DynamoDB テーブルに存在する場合、項目は上書きされます。キーを持つ項目がターゲット DynamoDB テーブルに存在しない場合、その項目は挿入されます。

**Amazon S3 のテーブルを DynamoDB にインポートするには**
+  Amazon EMR (Amazon EMR) と Hive を使用して、Amazon S3 から DynamoDB にデータを書き込むことができます。

  ```
  CREATE EXTERNAL TABLE s3_import(a_col string, b_col bigint, c_col array<string>)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
  LOCATION 's3://amzn-s3-demo-bucket/path/subpath/';                    
                      
  CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
  TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
  "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");  
                      
  INSERT OVERWRITE TABLE hiveTableName SELECT * FROM s3_import;
  ```

**カラムマッピングを指定せずに Amazon S3 バケットのテーブルを DynamoDB にインポートするには**
+  以前 DynamoDB からエクスポートされ Amazon S3 に格納されたデータを参照する `EXTERNAL` テーブルを作成します。インポートする前に、テーブルが DynamoDB に存在することと、そのキースキーマが、以前エクスポートされた DynamoDB テーブルと同じであることを確認します。また、テーブルには `map<string, string>` 方の列が 1 つだけ含まれる必要があります。次に、DynamoDB にリンクされている Hive テーブルを作成したら、`INSERT OVERWRITE` コマンドを呼び出して Amazon S3 のデータを DynamoDB に書き込みます。カラムマッピングがないので、この方法でインポートされたテーブルにクエリを行うことはできません。カラムマッピングを指定しないデータのインポートは、Amazon EMR AMI 2.2.3 以降でサポートされる Hive 0.8.1.5 以降で利用できます。

  ```
  CREATE EXTERNAL TABLE s3TableName (item map<string, string>)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
  LOCATION 's3://amzn-s3-demo-bucket/path/subpath/'; 
                          
  CREATE EXTERNAL TABLE hiveTableName (item map<string,string>)
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
  TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1");  
                   
  INSERT OVERWRITE TABLE hiveTableName SELECT * 
  FROM s3TableName;
  ```

**HDFS のテーブルを DynamoDB にインポートするには**
+  Amazon EMR と Hive を使用して HDFS のデータを DynamoDB にインポートすることができます。

  ```
  CREATE EXTERNAL TABLE hdfs_import(a_col string, b_col bigint, c_col array<string>)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
  LOCATION 'hdfs:///directoryName';                    
                      
  CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
  TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
  "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");  
                      
  INSERT OVERWRITE TABLE hiveTableName SELECT * FROM hdfs_import;
  ```

## DynamoDB 内データのクエリ
<a name="EMR_Hive_Commands_querying"></a>

 次の例は、Amazon EMR を使用して DynamoDB のデータにクエリを行うさまざまな方法を示します。

**マッピングされたカラムで最大値を検索するには（`max`）**
+  次のような Hive コマンドを使用します。最初のコマンドの CREATE ステートメントで、DynamoDB に格納されたデータを参照する Hive テーブルを作成します。次に、SELECT ステートメントで、そのテーブルを使用して、DynamoDB に格納されたデータのクエリを行います。次の例では、指定された顧客による最大の注文を検索します。

  ```
  CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) 
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "Purchases",
  "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items");
  
  SELECT max(total_cost) from hive_purchases where customerId = 717;
  ```

**`GROUP BY` 句を使用してデータを集計するには**
+  `GROUP BY` 句を使用して、複数のレコードのデータを収集できます。多くの場合、これは sum、count、min、または max のような集計関数とともに使用されます。次の例は、4 件以上注文した顧客の注文のうち最大の注文のリストを返します。

  ```
  CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) 
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "Purchases",
  "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items");
  
  SELECT customerId, max(total_cost) from hive_purchases GROUP BY customerId HAVING count(*) > 3;
  ```

**2 つの DynamoDB テーブルを結合するには**
+  次の例では、2 つの Hive テーブルを DynamoDB に格納されているデータにマッピングします。その後、2 つのテーブルに対して join を呼び出します。結合はクラスターで計算され、以下を返します。結合は DynamoDB 内では発生しません。この例は、3 件以上注文した顧客について、顧客とその購入品のリストを返します。

  ```
  CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) 
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "Purchases",
  "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items");
  
  CREATE EXTERNAL TABLE hive_customers(customerId bigint, customerName string, customerAddress array<String>) 
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "Customers",
  "dynamodb.column.mapping" = "customerId:CustomerId,customerName:Name,customerAddress:Address");
  
  Select c.customerId, c.customerName, count(*) as count from hive_customers c 
  JOIN hive_purchases p ON c.customerId=p.customerId 
  GROUP BY c.customerId, c.customerName HAVING count > 2;
  ```

**異なるソースの 2 つのテーブルを結合するには**
+  次の例の Customer\$1S3 は、Amazon S3 に格納された CSV ファイルを読み込む Hive テーブルであり、hive\$1purchases は、DynamoDB のデータを参照するテーブルです。次の例では、Amazon S3 に CSV ファイルとして格納されている顧客データと、DynamoDB に格納されている注文データを結合し、名前に「Miller」が含まれる顧客による注文を表すデータのセットを返します。

  ```
  CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) 
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "Purchases",
  "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items");
  
  CREATE EXTERNAL TABLE Customer_S3(customerId bigint, customerName string, customerAddress array<String>)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
  LOCATION 's3://amzn-s3-demo-bucket/path/subpath/';
  
  Select c.customerId, c.customerName, c.customerAddress from 
  Customer_S3 c 
  JOIN hive_purchases p 
  ON c.customerid=p.customerid 
  where c.customerName like '%Miller%';
  ```

**注記**  
 前の各例には、明確かつ完全にするために CREATE TABLE 句が含まれていました。指定された Hive テーブルに対して複数のクエリやエクスポート操作を実行する場合は、Hive セッションの開始時にテーブルを一度作成するだけで済みます。

# DynamoDB での Amazon EMR オペレーションのパフォーマンスの最適化
<a name="EMR_Hive_Optimizing"></a>

 DynamoDB テーブルでの Amazon EMR の操作は読み込み操作としてカウントされ、テーブルのプロビジョニング済みスループットの設定が適用されます。Amazon EMR は独自のロジックを実装して、DynamoDB テーブルの負荷を分散させ、プロビジョニング済みスループットを超過する可能性を最小限に抑えようとします。Amazon EMR はそれぞれの Hive クエリの最後に、プロビジョニング済みスループットを超過した回数など、クエリの処理に使用されたクラスターに関する情報を返します。この情報とともに、DynamoDB スループットに関する CloudWatch メトリクスを使用すると、以降のリクエストで DynamoDB テーブルの負荷をより適切に管理することができます。

 DynamoDB テーブルを操作する際に Hive クエリのパフォーマンスに影響を与える要因を次に示します。

## プロビジョニングされた読み込み容量単位
<a name="ProvisionedReadCapacityUnits"></a>

 DynamoDB テーブルに対して Hive クエリを実行する際には、十分な量の読み込み容量単位をプロビジョニングしておく必要があります。

 たとえば、DynamoDB テーブルに対して 100 ユニットの読み込みキャパシティーをプロビジョニングしているとします。この場合、1 秒間に 100 の読み込み（409,600 バイト）を実行できます。そのテーブルに 20 GB（21,474,836,480 バイト）のデータが含まれており、Hive クエリがフルテーブルスキャンを実行する場合、クエリの実行にかかる時間は次のように見積もられます。

 * 21,474,836,480 / 409,600 = 52,429 秒 = 14.56 時間* 

 必要な時間を短縮するには、ソース DynamoDB テーブルで読み込みキャパシティーユニットを調整する以外に方法はありません。Amazon EMR クラスターにノードをさらに追加しても、時間は短縮されません。

 Hive 出力では、1 つ以上のマッパープロセスが終了すると、完了のパーセンテージが更新されます。プロビジョニングされた読み込みキャパシティーが小さく設定された大きな DynamoDB テーブルでは、完了のパーセンテージ出力が長時間更新されない場合があります。上記のような場合、ジョブは数時間にわたって 0% 完了として表示されます。ジョブの進行状況の詳細なステータスについては、Amazon EMR コンソールに移動してください。ここで、個別のマッパータスクのステータスおよびデータ読み込みの統計を表示できます。

 また、マスターノードの Hadoop インターフェイスにログオンし、Hadoop 統計を表示することもできます。ここには、個別のマップタスクのステータスおよびいくつかのデータ読み込み統計が表示されます。詳細については、「*Amazon EMR 管理ガイド*」の「[マスターノードでホストされているウェブインターフェイス](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-web-interfaces.html)」を参照してください。

## 読み込みパーセントの設定
<a name="ReadPercent"></a>

 デフォルトでは、Amazon EMR は現在のプロビジョニング済みスループットに基づいて、DynamoDB テーブルに対するリクエスト負荷を管理します。ただし、プロビジョニング済みスループットを超過した応答を多数含むジョブに関する情報を Amazon EMR が返す場合は、Hive テーブルを設定する際に、`dynamodb.throughput.read.percent` パラメータを使用してデフォルトの読み込みレートを調整することができます。読み込みパーセントパラメータの設定の詳細については、「[Hive のオプション](EMR_Interactive_Hive.md#EMR_Hive_Options)」を参照してください。

## 書き込みパーセントの設定
<a name="WritePercent"></a>

 デフォルトでは、Amazon EMR は現在のプロビジョニング済みスループットに基づいて、DynamoDB テーブルに対するリクエスト負荷を管理します。ただし、プロビジョニング済みスループットを超過した応答を多数含むジョブに関する情報を Amazon EMR が返す場合は、Hive テーブルを設定する際に、`dynamodb.throughput.write.percent` パラメータを使用してデフォルトの書き込みレートを調整することができます。書き込みパーセントパラメータの設定の詳細については、「[Hive のオプション](EMR_Interactive_Hive.md#EMR_Hive_Options)」を参照してください。

## 再試行間隔の設定
<a name="emr-ddb-retry-duration"></a>

 デフォルトでは、Hive クエリが 2 分以内 (デフォルトの再試行間隔) に結果を返さない場合、Amazon EMR はこのクエリを再実行します。この間隔は、Hive クエリの実行時に `dynamodb.retry.duration` パラメータを設定することによって調整できます。書き込みパーセントパラメータの設定の詳細については、「[Hive のオプション](EMR_Interactive_Hive.md#EMR_Hive_Options)」を参照してください。

## マップタスクの数
<a name="NumberMapTasks"></a>

 DynamoDB に格納されているデータのエクスポートおよびクエリ実行のリクエストを処理するために Hadoop が起動するマッパーデーモンは、最大読み込みレート (毎秒 1 MiB) を上限値として、使用される読み込みキャパシティーを制限します。DynamoDB で追加のプロビジョニング済みスループットを使用できる場合は、マッパーデーモンの数を増やせば、Hive エクスポートおよびクエリオペレーションのパフォーマンスを向上させることができます。そのためには、クラスター内の EC2 インスタンスの数を増やすか、*または*各 EC2 インスタンスで実行されているマッパーデーモンの数を増やします。

 クラスター内の EC2 インスタンスの数を増やすには、現在のクラスターを停止し、より多くの EC2 インスタンスとともに再起動します。Amazon EMRコンソールからクラスターを起動する場合は、**[Configure EC2 Instances]** (EC2 インスタンスの設定) ダイアログボックスで EC2 インスタンスの数を指定します。または、CLI からクラスターを起動する場合は `‑‑num-instances` オプションを使用します。

 インスタンスで実行されるマップタスクの数は、EC2 インスタンスタイプによって異なります。サポートされる EC2 インスタンスタイプ、および各タイプで提供されるマッパーの数の詳細については、[タスクの設定](emr-hadoop-task-config.md) を参照してください。そこには、サポートされる設定ごとに "タスクの設定" セクションがあります。

 マッパーデーモンの数を増やすもう 1 つの方法としては、Hadoop の `mapreduce.tasktracker.map.tasks.maximum` 設定パラメータをより大きい値に変更します。この場合は、EC2 インスタンスの数またはサイズを大きくせずにマッパーの数を増やすことができるので、コストを削減できるという利点があります。欠点としては、この値を大きく設定しすぎると、クラスター内の EC2 インスタンスがメモリ不足になる可能性があります。`mapreduce.tasktracker.map.tasks.maximum` を設定するには、クラスターを起動し、`mapreduce.tasktracker.map.tasks.maximum` の値を mapred-site 設定分類のプロパティとして指定します。以下の例ではこれを示しています。詳細については、「[アプリケーションの設定](emr-configure-apps.md)」を参照してください。

```
{
    "configurations": [
    {
        "classification": "mapred-site",
        "properties": {
            "mapred.tasktracker.map.tasks.maximum": "10"
        }
    }
    ]
}
```

## 並列データリクエスト
<a name="ParallelDataRequests"></a>

 複数のユーザーまたは複数のアプリケーションから単一のテーブルに複数のデータリクエストが行われると、プロビジョニング済み読み込みスループットが減少し、パフォーマンス速度が低下します。

## 処理間隔
<a name="ProcessDuration"></a>

 DynamoDB におけるデータの整合性は、各ノードで実行される読み込み/書き込みオペレーションの順序に影響を受けます。Hive クエリの進行中に、別のアプリケーションが DynamoDB テーブルに新しいデータをロードしたり、既存のデータの変更や削除を行ったりする場合があります。この場合、クエリの実行中にデータに対して行われた変更は Hive クエリの結果に反映されないことがあります。

## スループットの超過の回避
<a name="AvoidExceedingThroughput"></a>

 DynamoDB に対して Hive クエリを実行する際には、プロビジョニング済みスループットを超過しないように注意してください。超過すると、アプリケーションによる `DynamoDB::Get` の呼び出しに必要な容量が不足してしまいます。この状態が発生しないようにするには、Amazon CloudWatch でログの確認およびメトリクスのモニタリングを行うことで、アプリケーションによる `DynamoDB::Get` の呼び出し時に読み込み量とスロットリングを定期的にモニタリングする必要があります。

## [Request time] (リクエストタイム)
<a name="RequestTime"></a>

 DynamoDB テーブルの需要が低いタイミングでテーブルにアクセスするように、Hive クエリをスケジューリングすると、パフォーマンスを向上させられます。たとえば、アプリケーションのほとんどのユーザーがサンフランシスコに住んでいる場合、大部分のユーザーが睡眠中で DynamoDB データベースを更新していない毎朝 4 時 (PST) にデータをエクスポートするように選択することができます。

## 時間ベースのテーブル
<a name="TimeBasedTables"></a>

 データが一連の時間ベースの DynamoDB テーブル (たとえば、1 日あたり 1 つのテーブル) として構成されている場合は、テーブルが非アクティブになったときにデータをエクスポートできます。この手法を使用すると、データを Amazon S3 に継続的にバックアップできます。

## アーカイブされたデータ
<a name="ArchivedData"></a>

 DynamoDB に格納されているデータに対して多数の Hive クエリを実行する予定であり、アーカイブされたデータがアプリケーションで許容される場合は、データを HDFS または Amazon S3 にエクスポートし、DynamoDB の代わりにデータのコピーに対して Hive クエリを実行することができます。これにより、読み込みオペレーションおよびプロビジョニング済みスループットを過度に使用せずに済みます。

# Kinesis
<a name="emr-kinesis"></a>

Amazon EMR クラスターは、Hive、Pig、MapReduce、Hadoop Streaming API、Cascading などの Hadoop エコシステムで使い慣れたツールを使用して、Amazon Kinesis Streams を直接読み込み、処理することができます。また、実行しているクラスターで Amazon Kinesis のリアルタイムデータを Amazon S3、Amazon DynamoDB、および HDFS の既存データと結合できます。後処理のアクティビティとして、Amazon EMR から Amazon S3 または DynamoDB に直接データをロードできます。Amazon Kinesis の主なサービスと料金については、「[Amazon Kinesis](https://aws.amazon.com//kinesis)」ページを参照してください。

## Amazon EMR と Amazon Kinesis の統合で何ができますか?
<a name="kinesis-use-cases"></a>

 Amazon EMR と Amazon Kinesis の統合により、以下のようなシナリオへの対応が非常に容易になります。
+ **ストリーミングログ分析** – ストリーミングのウェブログを分析して、リージョン別、ブラウザ別、およびアクセスドメイン別に、数分ごとの上位 10 件のエラータイプのリストを生成できます。
+ **カスタマーエンゲージメント** – Amazon Kinesis のクリックストリームデータと DynamoDB テーブルに保存されている広告キャンペーン情報を結合するクエリを作成し、特定のウェブサイトに表示される最も効果的な広告カテゴリを特定できます。
+ **アドホックインタラクティブクエリ** – Amazon Kinesis Streams から HDFS に定期的にデータを読み込み、ローカルの Impala テーブルとして使用可能にすることで、高速かつインタラクティブな分析クエリを実行できます。

## Amazon Kinesis Streams のチェックポイントの分析
<a name="kinesis-checkpoint"></a>

ユーザーは、Amazon Kinesis Streams の定期的なバッチ分析をいわゆる*反復*で実行できます。Amazon Kinesis Streams のデータレコードはシーケンス番号を使用して取得されるため、反復の境界は Amazon EMR によって DynamoDB テーブルに格納される開始と終了のシーケンス番号で定義されます。たとえば、`iteration0` が終了すると、終了のシーケンス番号が DynamoDB に格納されるため、`iteration1` ジョブが開始されたとき、ストリームからそれに続くデータを取得できます。このストリームデータの反復のマッピングは*チェックポイント*と呼ばれます。詳細については、「[Kinesis コネクター](https://aws.amazon.com/elasticmapreduce/faqs/#kinesis-connector)」を参照してください。

反復にチェックポイントが設定された後に、ジョブの反復処理が失敗した場合、Amazon EMR では、その反復のレコード処理が再試行されます。

チェックポイントは、次のことが可能になる機能です。
+ 同じストリームと論理名で実行した前のクエリにより処理された連続番号の後でデータ処理を開始します
+ 前のクエリで処理された Kinesis のデータと同じバッチを再処理します

 チェックポイントを有効にするには、スクリプトで `kinesis.checkpoint.enabled` パラメータを `true` に設定します。また、以下のパラメータを設定します。


| 構成設定 | 説明 | 
| --- | --- | 
| kinesis.checkpoint.metastore.table.name | チェックポイント情報が保存される DynamoDB テーブル名 | 
| kinesis.checkpoint.metastore.hash.key.name | DynamoDB テーブルのハッシュキー名 | 
| kinesis.checkpoint.metastore.hash.range.name | DynamoDB テーブルの範囲キー名 | 
| kinesis.checkpoint.logical.name | 現在の処理の論理名 | 
| kinesis.checkpoint.iteration.no | 論理名に関連付けられている処理の反復番号 | 
| kinesis.rerun.iteration.without.wait | 失敗した反復がタイムアウトを待たずに再実行できるかどうかを示すブール値（デフォルトは false）。 | 

### Amazon DynamoDB テーブルのプロビジョニングされた IOPS に関する推奨事項
<a name="kinesis-checkpoint-DDB"></a>

Amazon Kinesis 用の Amazon EMR コネクターは、チェックポイントのメタデータの補助として DynamoDB データベースを使用します。Amazon Kinesis Streams のデータをチェックポイントの間隔で Amazon EMR クラスターで使用する前に、DynamoDB のテーブルを作成する必要があります。テーブルは Amazon EMR クラスターと同じリージョンに存在する必要があります。以下は、DynamoDB テーブルにプロビジョニングする必要がある IOPS の数に関する一般的な推奨です。`j` は同時に実行できる Hadoop ジョブ (異なる論理名 \$1 反復数の組み合わせ) の最大数で、`s` はジョブが処理するシャードの最大数です。

**読み込みキャパシティーユニット**: `j`\$1`s`/`5`

**書き込みキャパシティーユニット**: `j`\$1`s`

## パフォーマンスに関する考慮事項
<a name="performance"></a>

Amazon Kinesis シャードスループットは、Amazon EMR クラスターのノードのインスタンスのサイズ、およびストリームのレコードのサイズに正比例しています。マスターノードやコアノードで m5.xlarge かそれ以上のインスタンスを使用することをお勧めします。

## Amazon EMR で Amazon Kinesis 分析をスケジュールする
<a name="schedule"></a>

任意の繰り返しについてタイムアウトと最大期間で制限される、アクティブな Amazon Kinesis Streams でデータを分析するときは、ストリームから定期的に詳細を取得するために、分析を頻繁に実行することが重要です。定期的な間隔でこのようなスクリプトおよびクエリを実行する方法は複数ありますが、これらのような反復タスクには、 AWS Data Pipeline を使用することをお勧めします。詳細については、「*AWS Data Pipeline 開発者ガイド*」の「[AWS Data Pipeline PigActivity](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-pigactivity.html)」および「[AWS Data Pipeline HiveActivity](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-hiveactivity.html)」を参照してください。

# Amazon EMR 7.0 用スパークキネシスコネクターの SDK 2.x への移行
<a name="migrating-spark-kinesis"></a>

 AWS SDK は、認証情報の管理、S3 および Kinesis サービスへの接続など、クラウドコンピューティングサービスとやり取り AWS するための豊富な APIs とライブラリのセットを提供します。Spark Kinesis コネクターは Kinesis データストリームからデータを消費するために使用され、受信したデータは Spark の実行エンジンで変換および処理されます。現在、このコネクタは 1.x の AWS SDK と Kinesis-client-library (KCL) 上に構築されています。

 AWS SDK 2.x 移行の一環として、Spark Kinesis コネクタも SDK 2.x で実行されるように更新されます。Amazon EMR 7.0 リリースの Spark には、コミュニティバージョンの Apache Spark ではまだ利用できない SDK 2.x アップグレードが含まれています。7.0 より前のリリースの Spark Kinesis コネクタを使用する場合、Amazon EMR 7.0 に移行する前に、アプリケーションコードを SDK 2.x で実行するように移行する必要があります。

## 移行ガイド
<a name="migrating-spark-kinesis-migration-guides"></a>

このセクションでは、アップグレードされた Spark Kinesis コネクタにアプリケーションを移行する手順について説明します。これには、 AWS SDK 2.x の Kinesis Client Library (KCL) 2.x、 AWS 認証情報プロバイダー、および AWS サービスクライアントに移行するためのガイドが含まれています。参考までに、Kinesis コネクタを使用するサンプル [WordCount](https://github.com/apache/spark/blob/v3.5.0/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala) プログラムも含まれています。

**Topics**
+ [1.x から 2.x への移行](#migrating-spark-kinesis-KCL-from-1.x-to-2.x)
+ [AWS SDK 1.x から 2.x への AWS 認証情報プロバイダーの移行](#migrating-spark-kinesis-creds-from-1.x-to-2.x)
+ [AWS SDK 1.x から 2.x への AWS サービスクライアントの移行](#migrating-spark-kinesis-service-from-1.x-to-2.x)
+ [ストリーミングアプリケーションのコード例](#migrating-spark-kinesis-streaming-examples)
+ [アップグレードされた Spark Kinesis コネクターを使用する際の考慮事項](#migrating-spark-kinesis-considerations)

### 1.x から 2.x への移行
<a name="migrating-spark-kinesis-KCL-from-1.x-to-2.x"></a>
+ **`KinesisInputDStream` のメトリクスレベルとディメンション**

  `KinesisInputDStream` をインスタンス化すると、ストリームのメトリクスレベルとディメンションを制御できます。以下の例は、KCL 1.x でこれらのパラメータをカスタマイズする方法を示しています。

  ```
  import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
  import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
   
  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
    .build()
  ```

  KCL 2.x では、これらの構成設定のパッケージ名は異なります。2.x に移行するには:

  1. `com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration` および `com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel` のインポートステートメントをそれぞれ `software.amazon.kinesis.metrics.MetricsLevel` および `software.amazon.kinesis.metrics.MetricsUtil` に変更します。

     ```
     // import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
     import software.amazon.kinesis.metrics.MetricsLevel
      
     // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
     import software.amazon.kinesis.metrics.MetricsUtil
     ```

  1. 行 `metricsEnabledDimensionsKinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet` を `metricsEnabledDimensionsSet(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)` に置き換えます。

  以下は、カスタマイズされたメトリクスレベルとメトリクスディメンションを備えた `KinesisInputDStream` の更新バージョンです。

  ```
  import software.amazon.kinesis.metrics.MetricsLevel
  import software.amazon.kinesis.metrics.MetricsUtil
   
  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
    .build()
  ```
+ `KinesisInputDStream` のメッセージハンドラー関数

  `KinesisInputDStream` をインスタンス化するときに、パーティションキーなどの Record に含まれる他のデータを使用したい場合に備えて、Kinesis Record を取得して汎用オブジェクト T を返す「メッセージハンドラー関数」を提供することもできます。

  KCL 1.x では、メッセージハンドラー関数のシグネチャは `Record => T` であり、レコードは `com.amazonaws.services.kinesis.model.Record` です。KCL 2.x では、ハンドラーの署名は `KinesisClientRecord => T` に変更され、KinesisClientRecord は `software.amazon.kinesis.retrieval.KinesisClientRecord` です。

  KCL 1.x でメッセージハンドラーを提供する例を以下に示します。

  ```
  import com.amazonaws.services.kinesis.model.Record
   
   
  def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_ONLY)
    .buildWithMessageHandler(addFive)
  ```

  メッセージハンドラーを移行するには:

  1. `com.amazonaws.services.kinesis.model.Record` のインポートステートメントを `software.amazon.kinesis.retrieval.KinesisClientRecord` に変更します。

     ```
     // import com.amazonaws.services.kinesis.model.Record
     import software.amazon.kinesis.retrieval.KinesisClientRecord
     ```

  1. メッセージハンドラーのメソッドシグネチャを更新します。

     ```
     //def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
     def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
     ```

  KCL 2.x でメッセージハンドラーを提供する、更新済みの例を以下に示します。

  ```
  import software.amazon.kinesis.retrieval.KinesisClientRecord
   
   
  def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_ONLY)
    .buildWithMessageHandler(addFive)
  ```

  KCL 1.x から 2.x への移行について詳しくは、「[KCL 1.x から KCL 2.x へのコンシューマの移行](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html)」を参照してください。

### AWS SDK 1.x から 2.x への AWS 認証情報プロバイダーの移行
<a name="migrating-spark-kinesis-creds-from-1.x-to-2.x"></a>

認証情報プロバイダーは、 とのやり取りの AWS 認証情報を取得するために使用されます AWS。SDK 2.x の認証情報プロバイダーに関連するインターフェイスとクラスの変更がいくつかあります。詳しくは、[こちら](https://github.com/aws/aws-sdk-java-v2/blob/master/docs/LaunchChangelog.md#122-client-credentials)をご覧ください。Spark Kinesis コネクタは、1.x バージョンの AWS 認証情報プロバイダーを返すインターフェイス (`org.apache.spark.streaming.kinesis.SparkAWSCredentials`) と実装クラスを定義しています。これらの認証情報プロバイダーは、Kinesis クライアントを初期化する際に必要です。たとえば、`SparkAWSCredentials.provider`アプリケーションで メソッドを使用している場合は、2.x バージョンの AWS 認証情報プロバイダーを使用するようにコードを更新する必要があります。

 AWS SDK 1.x で認証情報プロバイダーを使用する例を次に示します。

```
import org.apache.spark.streaming.kinesis.SparkAWSCredentials
import com.amazonaws.auth.AWSCredentialsProvider
 
val basicSparkCredentials = SparkAWSCredentials.builder
    .basicCredentials("accessKey", "secretKey")
    .build()
                                     
val credentialProvider = basicSparkCredentials.provider
assert(credentialProvider.isInstanceOf[AWSCredentialsProvider], "Type should be AWSCredentialsProvider")
```

**SDK 2.x に移行するには:**

1. `com.amazonaws.auth.AWSCredentialsProvider` のインポートステートメントを `software.amazon.awssdk.auth.credentials.AwsCredentialsProvider` に変更します。

   ```
   //import com.amazonaws.auth.AWSCredentialsProvider
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
   ```

1. このクラスを使用する残りのコードを更新します。

   ```
   import org.apache.spark.streaming.kinesis.SparkAWSCredentials
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
    
   val basicSparkCredentials = SparkAWSCredentials.builder
       .basicCredentials("accessKey", "secretKey")
       .build()
                                             
   val credentialProvider = basicSparkCredentials.provider
   assert (credentialProvider.isInstanceOf[AwsCredentialsProvider], "Type should be AwsCredentialsProvider")
   ```

### AWS SDK 1.x から 2.x への AWS サービスクライアントの移行
<a name="migrating-spark-kinesis-service-from-1.x-to-2.x"></a>

AWS サービスクライアントは 2.x で異なるパッケージ名 ( など`software.amazon.awssdk`) を持ちますが、SDK 1.x は を使用します`com.amazonaws`。クライアント変更に関する詳細については、[こちら](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-whats-different.html)を参照してください。コードでこれらのサービスクライアントを使用している場合は、それに応じてクライアントを移行する必要があります。

SDK 1.x でクライアントを作成する例を以下に示します。

```
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.document.DynamoDB
 
AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
```

**2.x に移行するには:**

1. サービスクライアントのインポートステートメントを変更します。DynamoDB クライアントを例にとってみましょう。`com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient` または `com.amazonaws.services.dynamodbv2.document.DynamoDB` を `software.amazon.awssdk.services.dynamodb.DynamoDbClient` に変更する必要があります。

   ```
   // import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
   // import com.amazonaws.services.dynamodbv2.document.DynamoDB
   import software.amazon.awssdk.services.dynamodb.DynamoDbClient
   ```

1. クライアントを初期化するコードを更新してください

   ```
   // AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
   // AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
    
   DynamoDbClient ddbClient = DynamoDbClient.create();
   DynamoDbClient ddbClient = DynamoDbClient.builder().build();
   ```

    AWS SDK を 1.x から 2.x に移行する方法の詳細については、[AWS 「 SDK for Java 1.x と 2.x の違い」を参照してください。](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-whats-different.html)

### ストリーミングアプリケーションのコード例
<a name="migrating-spark-kinesis-streaming-examples"></a>

```
import java.net.URI
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.http.apache.ApacheHttpClient
import software.amazon.awssdk.services.kinesis.KinesisClient
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest
import software.amazon.awssdk.regions.Region
import software.amazon.kinesis.metrics.{MetricsLevel, MetricsUtil}
 
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
import org.apache.spark.streaming.kinesis.KinesisInputDStream
 
 
object KinesisWordCountASLSDKV2 {
 
  def main(args: Array[String]): Unit = {
    val appName = "demo-app"
    val streamName = "demo-kinesis-test"
    val endpointUrl = "https://kinesis.us-west-2.amazonaws.com"
    val regionName = "us-west-2"
 
    // Determine the number of shards from the stream using the low-level Kinesis Client
    // from the AWS Java SDK.
    val credentialsProvider = DefaultCredentialsProvider.create
    require(credentialsProvider.resolveCredentials() != null,
      "No AWS credentials found. Please specify credentials using one of the methods specified " +
        "in https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html")
    val kinesisClient = KinesisClient.builder()
      .credentialsProvider(credentialsProvider)
      .region(Region.US_WEST_2)
      .endpointOverride(URI.create(endpointUrl))
      .httpClientBuilder(ApacheHttpClient.builder())
      .build()
    val describeStreamRequest = DescribeStreamRequest.builder()
      .streamName(streamName)
      .build()
    val numShards = kinesisClient.describeStream(describeStreamRequest)
      .streamDescription
      .shards
      .size
 
 
    // In this example, we are going to create 1 Kinesis Receiver/input DStream for each shard.
    // This is not a necessity; if there are less receivers/DStreams than the number of shards,
    // then the shards will be automatically distributed among the receivers and each receiver
    // will receive data from multiple shards.
    val numStreams = numShards
 
    // Spark Streaming batch interval
    val batchInterval = Milliseconds(2000)
 
    // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information
    // on sequence number of records that have been received. Same as batchInterval for this
    // example.
    val kinesisCheckpointInterval = batchInterval
 
    // Setup the SparkConfig and StreamingContext
    val sparkConfig = new SparkConf().setAppName("KinesisWordCountASLSDKV2")
    val ssc = new StreamingContext(sparkConfig, batchInterval)
 
    // Create the Kinesis DStreams
    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisInputDStream.builder
        .streamingContext(ssc)
        .streamName(streamName)
        .endpointUrl(endpointUrl)
        .regionName(regionName)
        .initialPosition(new Latest())
        .checkpointAppName(appName)
        .checkpointInterval(kinesisCheckpointInterval)
        .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
        .metricsLevel(MetricsLevel.DETAILED)
        .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
        .build()
    }
 
    // Union all the streams
    val unionStreams = ssc.union(kinesisStreams)
 
    // Convert each line of Array[Byte] to String, and split into words
    val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))
 
    // Map each word to a (word, 1) tuple so we can reduce by key to count the words
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
 
    // Print the first 10 wordCounts
    wordCounts.print()
 
    // Start the streaming context and await termination
    ssc.start()
    ssc.awaitTermination()
  }
}
```

### アップグレードされた Spark Kinesis コネクターを使用する際の考慮事項
<a name="migrating-spark-kinesis-considerations"></a>
+ アプリケーションが 11 より前のバージョンの JDK の `Kinesis-producer-library` を使用している場合、`java.lang.NoClassDefFoundError: javax/xml/bind/DatatypeConverter` のような例外が発生する可能性があります。これは、EMR 7.0 にはデフォルトで JDK 17 が付属しており、J2EE モジュールは Java 11 以降で標準ライブラリから削除されているためです。これは pom ファイルに次の依存関係を追加することで修正できます。必要に応じてライブラリバージョンを適切なもので交換します。

  ```
  <dependency>
        <groupId>javax.xml.bind</groupId>
        <artifactId>jaxb-api</artifactId>
        <version>${jaxb-api.version}</version>
      </dependency>
  ```
+ EMR クラスターの作成後、Spark Kinesis コネクタ jar はパス `/usr/lib/spark/connector/lib/` にあります。

# S3DistCp (s3-dist-cp)
<a name="UsingEMR_s3distcp"></a>

Apache DistCp は、大量のデータをコピーするときに使用できるオープンソースツールです。*S3DistCp* は DistCp に似ていますが、特に Amazon S3 の AWSで動作するように最適化されています。Amazon EMR バージョン 4.0 以降での S3DistCp のコマンドは `s3-dist-cp` で、これをクラスターのステップまたはコマンドラインに追加します。S3DistCp を使用すると、Amazon S3 の大量のデータを HDFS に効率的にコピーできます。ここにコピーされたデータは、Amazon EMR クラスターの以降のステップで処理できます。また、S3DistCp を使用して、Amazon S3 バケット間で、または HDFS から Amazon S3 にデータをコピーすることもできます。S3DistCp は、バケット間および AWS アカウント間で多数のオブジェクトを並列コピーするために、よりスケーラブルで効率的です。

実際のシナリオでの S3DistCP の柔軟性を示す特定のコマンドについては、 AWS ビッグデータブログの[S3DistCp を使用するための 7 つのヒント](https://aws.amazon.com/blogs/big-data/seven-tips-for-using-s3distcp-on-amazon-emr-to-move-data-efficiently-between-hdfs-and-amazon-s3/)」を参照してください。

DistCp と同様に、S3DistCp は MapReduce を使用して、分散された方法でコピーを実行します。複数のサーバー全体でのコピー、エラー処理、復旧、および報告のタスクが共有されます。Apache DistCp オープンソースプロジェクトの詳細については、Apache Hadoop ドキュメントの「[DistCp ガイド](http://hadoop.apache.org/docs/stable/hadoop-distcp/DistCp.html)」を参照してください。

指定したファイルの一部またはすべてを S3DistCp がコピーできない場合、クラスターステップは失敗し、ゼロ以外のエラーコードを返します。この場合、S3DistCp では、コピーされたファイルの一部がクリーンアップされません。

**重要**  
S3DistCp では、アンダースコア文字を含む Amazon S3 バケット名はサポートされません。  
S3DistCp は、Parquet 形式ファイルの連結をサポートしていません。代わりに PySpark を使用します。詳細については、「[Amazon EMR での Parquet ファイルの連結](https://aws.amazon.com/premiumsupport/knowledge-center/emr-concatenate-parquet-files/)」を参照してください。  
S3DistCP を使用して (ディレクトリではなく) 1 つのファイルを S3 から HDFS にコピーするときにコピーエラーを回避するには、Amazon EMR バージョン 5.33.0 以降、または Amazon EMR 6.3.0 以降を使用してください。

## S3DistCp のオプション
<a name="UsingEMR_s3distcp.options"></a>

S3DistCp は DistCp と似ていますが、データのコピーと圧縮の方法を変更するためのさまざまな一連のオプションをサポートしています。

S3DistCp を呼び出す際に、次の表で示されているオプションを指定できます。オプションは、引数リストを使用してステップに追加されます。S3DistCp の引数の例を次の表に示します。


| オプション  | 説明  | 必須  | 
| --- | --- | --- | 
| ‑‑src=LOCATION  |  コピーするデータのロケーション。HDFS または Amazon S3 ロケーションのいずれかを指定できます。 例: `‑‑src=s3://amzn-s3-demo-bucket/logs/j-3GYXXXXXX9IOJ/node`   S3DistCp では、アンダースコア文字を含む Amazon S3 バケット名はサポートされません。   | はい  | 
| ‑‑dest=LOCATION  |  データのコピー先。HDFS または Amazon S3 ロケーションのいずれかを指定できます。 例: `‑‑dest=hdfs:///output`   S3DistCp では、アンダースコア文字を含む Amazon S3 バケット名はサポートされません。   | はい  | 
| ‑‑srcPattern=PATTERN  |  `‑‑src` でのデータのサブセットへのコピー操作をフィルタリングする[正規表現](http://en.wikipedia.org/wiki/Regular_expression)。`‑‑srcPattern` と `‑‑groupBy` がどちらも指定されていない場合は、`‑‑src` のすべてのデータが `‑‑dest` にコピーされます。 正規表現の引数にアスタリスク（\$1）などの特殊文字が含まれる場合は、正規表現または `‑‑args` 文字列全体のいずれかを引用符（'）で囲む必要があります。 例: `‑‑srcPattern=.*daemons.*-hadoop-.*`   | いいえ  | 
| ‑‑groupBy=PATTERN  |  複数のファイルを統合する[正規表現](http://en.wikipedia.org/wiki/Regular_expression)。この表現に一致するファイルが S3DistCp によって統合されます。たとえば、このオプションを使用すると、1 時間で書き込まれたすべてのログファイルを 1 つのファイルに統合できます。統合されたファイル名は、グループ化のためのこの正規表現と一致する値になります。 括弧は、ファイルをグループ化する方法を示しています。括弧で囲まれたステートメントと一致するすべての項目が 1 つの出力ファイルに統合されます。正規表現に括弧で囲まれたステートメントが含まれない場合、クラスターは S3DistCp ステップで失敗し、エラーを返します。 正規表現の引数にアスタリスク（\$1）などの特殊文字が含まれる場合は、正規表現または `‑‑args` 文字列全体のいずれかを引用符（'）で囲む必要があります。 `‑‑groupBy` が指定されている場合は、指定されたパターンと一致するファイルのみがコピーされます。`‑‑groupBy` と `‑‑srcPattern` は同時に指定する必要はありません。 例: `‑‑groupBy=.*subnetid.*([0-9]+-[0-9]+-[0-9]+-[0-9]+).*`  | いいえ  | 
| ‑‑targetSize=SIZE  |  `‑‑groupBy` オプションに基づいて作成するファイルのサイズ（メビバイト（Mib）単位）。整数の値である必要があります。`‑‑targetSize` が設定されている場合、S3DistCp はファイルのサイズをこの値に合わせようとしますが、コピーされたファイルの実際のサイズは、この値より大きかったり小さかったりする場合があります。ジョブはデータファイルのサイズに基づいて集計されるため、ターゲットファイルサイズがソースデータファイルのサイズに一致する場合があります。 `‑‑groupBy` によって統合されたファイルが `‑‑targetSize` の値よりも大きい場合、そのファイルはパートファイルに分割され、それぞれのファイル名の末尾に数値が順番に付けられます。たとえば、`myfile.gz` に統合されたファイルは、`myfile0.gz`、`myfile1.gz` などにパートに分割されます。 例: `‑‑targetSize=2`   | いいえ  | 
| ‑‑appendToLastFile |  Amazon S3 から HDFS へ既存するファイルをコピーする際の S3DistCp の動作を指定します。既存のファイルに新しいファイルのデータが追加されます。`‑‑appendToLastFile` を指定して `‑‑groupBy` を使用すると、新しいデータは同じグループのファイルに追加されます。このオプションは、`‑‑targetSize` を `‑‑groupBy.` を指定して使用した動作にも順守しています。  | いいえ  | 
| ‑‑outputCodec=CODEC  |  コピーされたファイルで使用する圧縮コーデックを指定します。値としては `gzip`、`gz`、`lzo`、`snappy`、または `none` をとることができます。このオプションを使用すると、たとえば、Gzip で圧縮された入力ファイルを LZO 圧縮の出力ファイルに変換したり、コピーオペレーションの一環としてファイルを解凍したりできます。出力コーデックを選択した場合、ファイル名には適切な拡張子（`gz` や `gzip` など。拡張子は `.gz`）が付加されます。`‑‑outputCodec` の値を指定しない場合、圧縮に変更が加えられずにファイルがコピーされます。 例: `‑‑outputCodec=lzo`   | いいえ  | 
| ‑‑s3ServerSideEncryption  |  ターゲットデータが SSL を使用して転送され、 AWS サービス側のキーを使用して Amazon S3 で自動的に暗号化されるようにします。S3DistCp を使用してデータを取得するとき、オブジェクトは自動的に暗号化解除されます。暗号化されていないオブジェクトを、暗号化が必要な Amazon S3 バケットにコピーしようとすると、操作は失敗します。詳細については、「[データ暗号化の使用](https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingEncryption.html)」を参照してください。 例: `‑‑s3ServerSideEncryption`   | いいえ  | 
| ‑‑deleteOnSuccess  |  コピーオペレーションが成功すると、このオプションにより、S3DistCp がコピーされたファイルをコピー元の場所を削除します。このオプションは、スケジュールされたタスクとして、ログファイルなどの出力ファイルを、あるロケーションから他のロケーションにコピーするときに役立ちます。これにより、同じファイルを 2 回コピーせずに済むようになります。 例: `‑‑deleteOnSuccess`   | いいえ  | 
| ‑‑disableMultipartUpload  |   マルチパートアップロードの使用を無効にします。 例: `‑‑disableMultipartUpload`   | いいえ  | 
| ‑‑multipartUploadChunkSize=SIZE  |  Amazon S3 マルチパートアップロードでの各パートサイズ (MiB 単位)。S3DistCp では、`multipartUploadChunkSize` より大きいデータをコピーする場合、マルチパートアップロードが使用されます。ジョブのパフォーマンスを向上させるには、各パートサイズを大きく設定します。デフォルトのサイズは 128 MiB です。 例: `‑‑multipartUploadChunkSize=1000`   | いいえ  | 
| ‑‑numberFiles  |  出力ファイル名の先頭に通し番号を付けます。`‑‑startingIndex` で別の値を指定しない限り、番号は 0 で開始されます。 例: `‑‑numberFiles`   | いいえ  | 
| ‑‑startingIndex=INDEX  |   `‑‑numberFiles` とともに使用され、通し番号の開始値を指定します。 例: `‑‑startingIndex=1`   | いいえ  | 
| ‑‑outputManifest=FILENAME  |  Gzip で圧縮されたテキストファイルを作成します。このテキストファイルには、S3DistCp によってコピーされたすべてのファイルのリストが含まれます。 例: `‑‑outputManifest=manifest-1.gz`   | いいえ  | 
| ‑‑previousManifest=PATH  |  `‑‑outputManifest` フラグを使用して S3DistCp への以前の呼び出し中に作成されたマニフェストファイルを読み込みます。この `‑‑previousManifest` フラグが設定されている場合、このマニフェストにリストされているファイルは S3DistCp のコピーオペレーションから除外されます。`‑‑outputManifest` と共に `‑‑previousManifest` が指定されている場合、新しいマニフェストファイルには、以前のマニフェストにリストされているファイルも表示されます。ただし、そのファイルはコピーされません。 例: `‑‑previousManifest=/usr/bin/manifest-1.gz`   | いいえ  | 
| ‑‑requirePreviousManifest |  以前の S3DistCp の呼び出し中に作成したマニフェストファイルが必要です。これが false に設定されている場合、以前のマニフェストが指定されていないときにエラーが生成されません。デフォルトは True です。  | いいえ  | 
| ‑‑copyFromManifest  |   `‑‑previousManifest` の逆を行い、S3DistCp において、指定したマニフェストファイルを、コピーから除外するファイルのリストではなく、コピーするファイルのリストとして使用します。 例: `‑‑copyFromManifest ‑‑previousManifest=/usr/bin/manifest-1.gz`   | いいえ  | 
| ‑‑s3Endpoint=ENDPOINT |  ファイルをアップロードするときに使用する Amazon S3 エンドポイントを指定します。このオプションは、アップロード元とアップロード先の両方のエンドポイントを設定します。設定しない場合は、デフォルトのエンドポイント `s3.amazonaws.com` が使用されます。Amazon S3 エンドポイントの一覧は、「[リージョンとエンドポイント](https://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region)」を参照してください。 例: `‑‑s3Endpoint=s3.eu-west-1.amazonaws.com`   | いいえ  | 
| ‑‑storageClass=CLASS |  送信先が Amazon S3 のときに使用するストレージクラス。有効な値は、STANDARD と REDUCED\$1REDUNDANCY です。このオプションを指定しない場合、S3DistCp はストレージクラスを維持しようとします。 例: `‑‑storageClass=STANDARD`  | いいえ  | 
| ‑‑srcPrefixesFile=PATH |  `src` プレフィックスのリスト (1 行ごとにプレフィックスが 1 つずつ) を含む Amazon S3 (s3://)、HDFS (hdfs:///)、またはローカルファイルシステム (file:/) 内のテキストファイル。 `srcPrefixesFile` が提供されている場合、S3DistCp は src パスをリストしません。代わりに、このファイルで指定されたすべてのプレフィックスをリストした結果を組み合わせたソースリストを生成します。これらのプレフィックスの代わりに、src パスに対する相対パスを使用して宛先パスが生成されます。`srcPattern` も指定された場合、入力をさらにフィルタリングするため、ソースプレフィックスを組み合わせたリスト結果に適用されます。`copyFromManifest` が使用された場合、マニフェストのオブジェクトがコピーされ、`srcPrefixesFile` が無視されます。 例: `‑‑srcPrefixesFile=PATH`  | いいえ  | 

上記のオプション以外にも、S3DistCp は[ツールインターフェイス](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/util/Tool.html)を実装しているので、一般的なオプションもサポートしています。

## クラスターに S3DistCp をステップとして追加する
<a name="UsingEMR_s3distcp.step"></a>

S3DistCp を呼び出すには、これをステップとしてクラスターに追加します。ステップは、起動時にクラスターに追加することも、コンソール、CLI、または API を使用して実行中のクラスターに追加することもできます。以下の例では、実行中のクラスターへの S3DistCp ステップの追加を示します。クラスターへのステップの追加の詳細については、「*Amazon EMR 管理ガイド*」の「[クラスターへの作業の送信](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-work-with-steps.html)」を参照してください。

**を使用して実行中のクラスターに S3DistCp ステップを追加するには AWS CLI**

での Amazon EMR コマンドの使用の詳細については AWS CLI、[AWS CLI 「 コマンドリファレンス](https://docs.aws.amazon.com/cli/latest/reference/emr)」を参照してください。
+ S3DistCp を呼び出すステップをクラスターに追加するには、S3DistCp によるコピーオペレーションの実行方法を指定するパラメータを引数として渡します。

  次の例では、デーモンログを Amazon S3 から `hdfs:///output` にコピーします。次のコマンドについて説明します。
  + `‑‑cluster-id` でクラスターを指定します。
  + `Jar` は、S3DistCp JAR ファイルのロケーションです。command-runner.jar を使用してクラスターでコマンドを実行する方法の例については、「[カスタム JAR ステップを送信してスクリプトまたはコマンドを実行する](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-commandrunner.html#emr-commandrunner-examples)」を参照してください。
  + `Args` は、S3DistCp に渡すオプション名/値ペアのカンマ区切りのリストです。使用可能なオプションの完全なリストについては、「[S3DistCp のオプション](#UsingEMR_s3distcp.options)」を参照してください。

  実行中のクラスターに S3DistCp コピーステップを追加するには、Amazon S3 に保存された JSON ファイルまたはローカルファイルシステムのファイル (この例では `myStep.json`) に次の内容を配置します。*j-3GYXXXXXX9IOK* をクラスター ID に置き換え、*amzn-s3-demo-bucket* を Amazon S3 バケット名に置き換えます。

  ```
  [
      {
          "Name":"S3DistCp step",
          "Args":["s3-dist-cp","‑‑s3Endpoint=s3.amazonaws.com","‑‑src=s3://amzn-s3-demo-bucket/logs/j-3GYXXXXXX9IOJ/node/","‑‑dest=hdfs:///output","‑‑srcPattern=.*[a-zA-Z,]+"],
          "ActionOnFailure":"CONTINUE",
          "Type":"CUSTOM_JAR",
          "Jar":"command-runner.jar"        
      }
  ]
  ```

  ```
  aws emr add-steps ‑‑cluster-id j-3GYXXXXXX9IOK ‑‑steps file://./myStep.json
  ```

**Example Amazon S3 からのログファイルを HDFS へコピーする**  
この例では、実行中のクラスターにステップを追加して、Amazon S3 バケットに格納されたログファイルを HDFS にコピーする方法も示します。この例では、`‑‑srcPattern` オプションを使用して、デーモンログにコピーされたデータを制限しています。  
`‑‑srcPattern` オプションを使用してログファイルを Amazon S3 から HDFS にコピーするには、Amazon S3 に保存された JSON ファイルまたはローカルファイルシステムのファイル (この例では `myStep.json`) に次の内容を配置します。*j-3GYXXXXXX9IOK* をクラスター ID に置き換え、*amzn-s3-demo-bucket* を Amazon S3 バケット名に置き換えます。  

```
[
    {
        "Name":"S3DistCp step",
        "Args":["s3-dist-cp","‑‑s3Endpoint=s3.amazonaws.com","‑‑src=s3://amzn-s3-demo-bucket/logs/j-3GYXXXXXX9IOJ/node/","‑‑dest=hdfs:///output","‑‑srcPattern=.*daemons.*-hadoop-.*"],
        "ActionOnFailure":"CONTINUE",
        "Type":"CUSTOM_JAR",
        "Jar":"command-runner.jar"        
    }
]
```

## S3DistCp ジョブが失敗した後のクリーンアップ
<a name="s3distcp-cleanup"></a>

指定したファイルの一部またはすべてを S3DistCp がコピーできない場合、コマンドまたはクラスターステップは失敗し、ゼロ以外のエラーコードを返します。この場合、S3DistCp では、コピーされたファイルの一部がクリーンアップされません。手動で削除する必要があります。

部分的にコピーされたファイルは、S3DistCp ジョブの一意の識別子が付けられて、サブディレクトリの HDFS `tmp` ディレクトリに保存されます。この ID はジョブの標準出力に含まれます。

たとえば、S3DistCp ジョブの ID が `4b1c37bb-91af-4391-aaf8-46a6067085a6` の場合、クラスターのマスターノードに接続し、以下のコマンドを実行して、ジョブに関連付けられた出力ファイルを表示できます。

```
hdfs dfs -ls /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output
```

このコマンドによって以下のようなファイルのリストが返されます。

```
Found 8 items
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:03 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/_SUCCESS
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:02 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00000
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:02 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00001
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:02 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00002
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:03 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00003
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:03 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00004
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:03 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00005
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:03 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00006
```

その後、以下のコマンドを実行して、ディレクトリとそのすべての内容を削除できます。

```
hdfs dfs rm -rf /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6
```