

# DynamoDB 接続　
<a name="aws-glue-programming-etl-connect-dynamodb-home"></a>

Spark 用の AWS Glue を使用して AWS Glue の DynamoDB 内のテーブルに対する読み込みと書き込みを行うことができます。AWS Glue ジョブにアタッチされている IAM 権限を使用して DynamoDB に接続します。AWS Glue は、別の AWS アカウントの DynamoDB テーブルに対するデータの書き込みをサポートしています。詳細については、「[DynamoDB テーブルへのクロスアカウントおよびクロスリージョンでのアクセス](aws-glue-programming-etl-dynamo-db-cross-account.md)」を参照してください。

元の DynamoDB コネクタは Glue DynamicFrame オブジェクトを使用して、DynamoDB から抽出されたデータを操作します。AWSGlue 5.0\$1 では、Spark DataFrame をネイティブにサポートする新しい [Spark DataFrame をサポートする DynamoDB コネクタ](aws-glue-programming-etl-connect-dynamodb-dataframe-support.md)が導入されています。

AWS Glue DynamoDB ETL コネクターに加えて、DynamoDB エクスポートコネクターを使用して DynamoDB から読み取ることができます。このコネクターは、DynamoDB `ExportTableToPointInTime` リクエストを呼び出し、これを指定した Amazon S3 の場所に [DynamoDB JSON](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DataExport.Output.html) 形式で保存します。次に、AWS Glue は、Amazon S3 のエクスポート場所からデータを読み取ることによって DynamicFrame オブジェクトを作成します。

DynamoDB のライターは、AWS Glue バージョン 1.0 以降で利用可能です。AWS Glue DynamoDB のエクスポートコネクターは、AWS Glue バージョン 2.0 以降で利用可能です。新しい DataFrame ベースの DynamoDB コネクタは、AWS Glue バージョン 5.0 以降で利用可能です。

DynamoDB の詳細については、[Amazon DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/) のドキュメントを参照してください。

**注記**  
DynamoDB ETL リーダーでは、フィルタまたはプッシュダウン述語は使用できません。

## DynamoDB 接続の設定
<a name="aws-glue-programming-etl-connect-dynamodb-configure"></a>

AWS Glue から DynamoDB に接続するには、AWS Glue ジョブに関連付けられた IAM ロールに DynamoDB と対話する権限を付与します。DynamoDB からの読み取りまたは書き込みに必要な権限の詳細については、「IAM ドキュメント」の [DynamoDB のアクション、リソース、および条件キー](https://docs.aws.amazon.com/service-authorization/latest/reference/list_amazondynamodb.html)を参照してください。

次の状況では、追加の設定が必要になる場合があります。　
+ DynamoDB エクスポートコネクターを使用するときは、ジョブが DynamoDB テーブルのエクスポートをリクエストできるように IAM を設定する必要があります。さらに、エクスポート用の Amazon S3 バケットを特定し、DynamoDB がそのバケットに書き込むための適切な権限と、AWS Glue ジョブがそこから読み取るための適切な権限を IAM に付与する必要があります。詳細については、「[DynamoDB でテーブルのエクスポートをリクエストする](https://docs.aws.amazon.com//amazondynamodb/latest/developerguide/S3DataExport_Requesting.html)」を参照してください。　　
+ AWS Glue ジョブに特定の Amazon VPC 接続要件がある場合は、`NETWORK` AWS Glue 接続タイプを使用してネットワークオプションを指定します。DynamoDB へのアクセスは IAM によって承認されるため、AWS Glue DynamoDB 接続タイプは必要ありません。　

## DynamoDB からの読み込みと書き込み
<a name="aws-glue-programming-etl-connect-dynamodb-read-write"></a>

次のコード例では、DynamoDB テーブルからの読み取り (ETL コネクタ経由) と、DynamoDB のテーブルへの書き込み方法を示します。ここでは、あるテーブルから読み取りを行い、別のテーブルに対して書き込みを行っています。

------
#### [ Python ]

```
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glue_context= GlueContext(SparkContext.getOrCreate())
job = Job(glue_context)
job.init(args["JOB_NAME"], args)

dyf = glue_context.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options={"dynamodb.input.tableName": test_source,
        "dynamodb.throughput.read.percent": "1.0",
        "dynamodb.splits": "100"
    }
)
print(dyf.getNumPartitions())

glue_context.write_dynamic_frame_from_options(
    frame=dyf,
    connection_type="dynamodb",
    connection_options={"dynamodb.output.tableName": test_sink,
        "dynamodb.throughput.write.percent": "1.0"
    }
)

job.commit()
```

------
#### [ Scala ]

```
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.DynamoDbDataSink
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._


object GlueApp {

  def main(sysArgs: Array[String]): Unit = {
    val glueContext = new GlueContext(SparkContext.getOrCreate())
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)
    
    val dynamicFrame = glueContext.getSourceWithFormat(
      connectionType = "dynamodb",
      options = JsonOptions(Map(
        "dynamodb.input.tableName" -> test_source,
        "dynamodb.throughput.read.percent" -> "1.0",
        "dynamodb.splits" -> "100"
      ))
    ).getDynamicFrame()
    
    print(dynamicFrame.getNumPartitions())

    val dynamoDbSink: DynamoDbDataSink =  glueContext.getSinkWithFormat(
      connectionType = "dynamodb",
      options = JsonOptions(Map(
        "dynamodb.output.tableName" -> test_sink,
        "dynamodb.throughput.write.percent" -> "1.0"
      ))
    ).asInstanceOf[DynamoDbDataSink]
    
    dynamoDbSink.writeDynamicFrame(dynamicFrame)

    Job.commit()
  }

}
```

------

## DynamoDB エクスポートコネクタを使用する
<a name="aws-glue-programming-etl-connect-dynamodb-export-connector"></a>

DynamoDB テーブルサイズが 80 GB を超える場合、エクスポートコネクタは ETL コネクタよりもパフォーマンスが向上します。さらに、エクスポートリクエストが AWS Glue ジョブで Spark プロセスの外部で実行される場合、[AWS Glue ジョブの自動スケーリング](https://docs.aws.amazon.com/glue/latest/dg/auto-scaling.html)を有効にして、エクスポートリクエスト中の DPU 使用量を節約できます。エクスポートコネクタでは、Spark エグゼキューターの並列処理のためのスプリット数や、DynamoDB スループットの読み取り率を設定する必要がありません。

**注記**  
DynamoDBには、`ExportTableToPointInTime` リクエストを呼び出すための特定の要件があります。詳細については、「[DynamoDB でテーブルのエクスポートをリクエストする](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DataExport.Requesting.html)」を参照してください。例えば、このコネクタを使用するには、テーブルでポイントインタイムリストア (PITR) を有効にする必要があります。DynamoDB コネクタは、Amazon S3 への DynamoDB エクスポートの AWS KMS 暗号化もサポートします。AWS Glue ジョブの設定でセキュリティ設定を指定することで、DynamoDB エクスポートの AWS KMS 暗号化が有効になります。KMS キーは、Amazon S3 バケットと同じリージョンにある必要があります。  
DynamoDB エクスポートと Amazon S3 ストレージのコストに追加料金が適用されることに注意してください。Amazon S3 でエクスポートされたデータは、ジョブ実行の終了後も保持されるため、DynamoDB を追加でエクスポートしなくても再利用できます。このコネクタを使用するための要件として、テーブルに対してポイントインタイムリカバリ (PITR) が有効になっていることが必要です。  
DynamoDB ETL コネクタまたはエクスポートコネクタは、DynamoDB ソースに適用されるフィルタまたはプッシュダウン述語をサポートしていません。

次のコード例は、（エクスポートコネクタを経由して）読み取り、パーティションの数を出力する方法を示しています。

------
#### [ Python ]

```
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glue_context= GlueContext(SparkContext.getOrCreate())
job = Job(glue_context)
job.init(args["JOB_NAME"], args)

dyf = glue_context.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.export": "ddb",
        "dynamodb.tableArn": test_source,
        "dynamodb.s3.bucket": bucket_name,
        "dynamodb.s3.prefix": bucket_prefix,
        "dynamodb.s3.bucketOwner": account_id_of_bucket,
    }
)
print(dyf.getNumPartitions())

job.commit()
```

------
#### [ Scala ]

```
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.DynamoDbDataSink
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._


object GlueApp {

  def main(sysArgs: Array[String]): Unit = {
    val glueContext = new GlueContext(SparkContext.getOrCreate())
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)
    
    val dynamicFrame = glueContext.getSourceWithFormat(
      connectionType = "dynamodb",
      options = JsonOptions(Map(
        "dynamodb.export" -> "ddb",
        "dynamodb.tableArn" -> test_source,
        "dynamodb.s3.bucket" -> bucket_name,
        "dynamodb.s3.prefix" -> bucket_prefix,
        "dynamodb.s3.bucketOwner" -> account_id_of_bucket,
      ))
    ).getDynamicFrame()
    
    print(dynamicFrame.getNumPartitions())

    Job.commit()
  }

}
```

------

これらの例は、（エクスポートコネクタを介して）読み取りを行い、`dynamodb` 分類を持つ AWS Glue データカタログテーブルからパーティションの数を出力する方法を示しています。　　　　　　

------
#### [ Python ]

```
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glue_context= GlueContext(SparkContext.getOrCreate())
job = Job(glue_context)
job.init(args["JOB_NAME"], args)

dynamicFrame = glue_context.create_dynamic_frame.from_catalog(
        database=catalog_database,
        table_name=catalog_table_name,
        additional_options={
            "dynamodb.export": "ddb", 
            "dynamodb.s3.bucket": s3_bucket,
            "dynamodb.s3.prefix": s3_bucket_prefix
        }
    )
print(dynamicFrame.getNumPartitions())

job.commit()
```

------
#### [ Scala ]

```
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.DynamoDbDataSink
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._


object GlueApp {

  def main(sysArgs: Array[String]): Unit = {
    val glueContext = new GlueContext(SparkContext.getOrCreate())
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)
    
    val dynamicFrame = glueContext.getCatalogSource(
        database = catalog_database,
        tableName = catalog_table_name,
        additionalOptions = JsonOptions(Map(
            "dynamodb.export" -> "ddb", 
            "dynamodb.s3.bucket" -> s3_bucket,
            "dynamodb.s3.prefix" -> s3_bucket_prefix
        ))
    ).getDynamicFrame()
    print(dynamicFrame.getNumPartitions())
)
```

------

## DynamoDB エクスポート JSON の使用を簡素化
<a name="etl-connect-dynamodb-traversing-structure"></a>

AWS Glue DynamoDB エクスポートコネクタを使用した DynamoDB エクスポートでは、特殊なネスト構造の JSON ファイルが生成されます。詳細については、[データオブジェクト](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/S3DataExport.Output.html)を参照してください。 AWS Glue は DynamicFrame 変換を提供します。これを使用して、このような構造をダウンストリームアプリケーションで使いやすい形式に変換できます。

変換は 2 つの方法のいずれかで起動できます。。DynamoDB から読み取るメソッドを呼び出すときに、値 `"true"` を使用して接続オプション `"dynamodb.simplifyDDBJson"` を設定できます。また、AWS Glue ライブラリで独立して利用可能なメソッドとして変換を呼び出すこともできます。

DynamoDB エクスポートによって生成された次のスキーマを考えてみましょう。

```
root
|-- Item: struct
|    |-- parentMap: struct
|    |    |-- M: struct
|    |    |    |-- childMap: struct
|    |    |    |    |-- M: struct
|    |    |    |    |    |-- appName: struct
|    |    |    |    |    |    |-- S: string
|    |    |    |    |    |-- packageName: struct
|    |    |    |    |    |    |-- S: string
|    |    |    |    |    |-- updatedAt: struct
|    |    |    |    |    |    |-- N: string
|    |-- strings: struct
|    |    |-- SS: array
|    |    |    |-- element: string
|    |-- numbers: struct
|    |    |-- NS: array
|    |    |    |-- element: string
|    |-- binaries: struct
|    |    |-- BS: array
|    |    |    |-- element: string
|    |-- isDDBJson: struct
|    |    |-- BOOL: boolean
|    |-- nullValue: struct
|    |    |-- NULL: boolean
```

この `simplifyDDBJson` 変換により、以下のように簡略化されます。

```
root
|-- parentMap: struct
|    |-- childMap: struct
|    |    |-- appName: string
|    |    |-- packageName: string
|    |    |-- updatedAt: string
|-- strings: array
|    |-- element: string
|-- numbers: array
|    |-- element: string
|-- binaries: array
|    |-- element: string
|-- isDDBJson: boolean
|-- nullValue: null
```

**注記**  
`simplifyDDBJson` は、AWS Glue バージョン 3.0 以降で有効です。`unnestDDBJson` トランスフォームを使用して DynamoDB のエクスポート JSON を簡略化することもできます。ユーザーには `unnestDDBJson` から `simplifyDDBJson` への移行をお勧めします。

## DynamoDB オペレーションにおける並列処理の設定
<a name="aws-glue-programming-etl-connect-dynamodb-parallelism"></a>

パフォーマンスを向上させるために、DynamoDB コネクタで使用できる特定のパラメーターを調整できます。並列処理パラメーターを調整する際の目標は、プロビジョニングされた AWS Glue ワーカーを最大限に活用することです。そして、さらにパフォーマンスを向上させる必要がある場合は、DPU 数を増加してジョブをスケールアウトすることをお勧めします。

 ETL コネクタを使用する場合、`dynamodb.splits` パラメータを使用して DynamoDB 読み取りオペレーションの並列処理を変更できます。　 エクスポートコネクタで読み込みをする場合は、Spark エグゼキュータの並列処理の分割数を構成する必要がありません。DynamoDB 書き込みオペレーションの並列処理は、`dynamodb.output.numParallelTasks` を使用して変更できます。

**DynamoDB ETL コネクターによる読み取り**

`dynamodb.splits`ジョブ構成に設定されている最大ワーカー数と以下 `numSlots` の計算に基づいて計算することをお勧めします。自動スケーリングの場合、実際に利用可能なワーカー数は、その上限に基づいて変更される可能性があります。最大ワーカー数の設定についての詳細は、「[AWS Glue で Spark ジョブに関するジョブプロパティの構成](add-job.md)」の「**[ワーカー数**] (`NumberOfWorkers`)」を参照してください。
+ `numExecutors = NumberOfWorkers - 1`

   1 つのエグゼキューターが Spark ドライバー専用になっている場合、他のエグゼキューターはデータの処理に使用されます。
+ `numSlotsPerExecutor =`

------
#### [ AWS Glue 3.0 and later versions ]
  + `WorkerType` が `G.1X` の場合は `4`
  + `WorkerType` が `G.2X` の場合は `8`
  + `WorkerType` が `G.4X` の場合は `16`
  + `WorkerType` が `G.8X` の場合は `32`

------
#### [ AWS Glue 2.0 and legacy versions ]
  + `WorkerType` が `G.1X` の場合は `8`
  + `WorkerType` が `G.2X` の場合は `16`

------
+ `numSlots = numSlotsPerExecutor * numExecutors`

`dynamodb.splits` を使用可能なスロット数、`numSlots` に設定することをおすすめします。

**DynamoDB への書き込み**　

この `dynamodb.output.numParallelTasks` パラメータは、以下の計算を使用して Spark タスクごとの WCU を決定するために使用されます。

`permittedWcuPerTask = ( TableWCU * dynamodb.throughput.write.percent ) / dynamodb.output.numParallelTasks`

DynamoDB ライターは、設定が DynamoDB に書き込まれる Spark タスクの数を正確に表している場合に最適に機能します。場合によっては、書き込みパフォーマンスを向上させるために、デフォルトの計算をオーバーライドする必要がある場合があります。このパラメータを指定しない場合、Spark タスクごとに許可される WCU は、次の式により自動的に計算されます。
+ 
  + `numPartitions = dynamicframe.getNumPartitions()`
  + `numSlots` (このセクションすでに定義したとおり)
  + `numParallelTasks = min(numPartitions, numSlots)`
+ 例 1。DPU=10、WorkerType=Standard。入力 DynamicFrame には、100 個の RDD パーティションがあります。
  + `numPartitions = 100`
  + `numExecutors = (10 - 1) * 2 - 1 = 17`
  + `numSlots = 4 * 17 = 68`
  + `numParallelTasks = min(100, 68) = 68`
+ 例 2。DPU=10、WorkerType=Standard。入力 DynamicFrame には、20 個の RDD パーティションがあります。
  + `numPartitions = 20`
  + `numExecutors = (10 - 1) * 2 - 1 = 17`
  + `numSlots = 4 * 17 = 68`
  + `numParallelTasks = min(20, 68) = 20`

**注記**  
従来の AWS Glue バージョンのジョブと Standard Worker を使用するジョブでは、スロット数を計算するために違った方法が必要となります。これらのジョブのパフォーマンスを調整する必要がある場合は、サポートされている AWS Glue バージョンに移行することをおすすめします。

## DynamoDB 接続オプションのリファレンス
<a name="aws-glue-programming-etl-connect-dynamodb"></a>

Amazon DynamoDB への接続を指定します。

接続オプションは、ソース接続とシンク接続とで異なります。

### "connectionType": ソースとして ETL コネクタを使用する "dynamodb"
<a name="etl-connect-dynamodb-as-source"></a>

AWS Glue DynamoDB ETL コネクタを使用している場合は、`"connectionType": "dynamodb"` をソースとして次の接続オプションを使用します。
+ `"dynamodb.input.tableName"`: (必須) 読み取り元の DynamoDB テーブル。
+ `"dynamodb.throughput.read.percent"`: (オプション) 使用する読み込みキャパシティーユニット (RCU) の割合。デフォルトでは、"0.5" に設定されます。許容値は "0.1" から "1.5" (これらの値を含む) です。
  + `0.5` ではデフォルトの読み込み速度を表し、AWS Glue はテーブルの読み込み容量の半分を消費しようとすることを意味します。上記の値を `0.5` より上に設定すると、AWS Glue は読み取りのリクエストレートを増加させ、`0.5` より低くした場合はそのリクエストレートを減少させます。(実際の読み込みレートは、DynamoDB テーブルに、統一ディストリビューションのキーがあるかどうかなどの要因によって変わります。)
  + DynamoDB テーブルがオンデマンドモードの場合、AWS Glue はテーブルの読み取り容量を 40000 として処理します。大きなテーブルをエクスポートする場合は、DynamoDB テーブルをオンデマンドモードに切り替えることをお勧めします。
+ `"dynamodb.splits"`: (オプション) 読み取り中にこの DynamoDB テーブルを分割するパーティションの数を定義します。デフォルトでは、"1" に設定されます。許容値は "1" から "1,000,000" (これらの値を含む) です。

  `1` は並列処理がないことを表します。より良いパフォーマンスを得るためには、より大きな値を (以下の式を使用して) 指定することを強くお勧めします。値を適切に設定する方法の詳細は、「[DynamoDB オペレーションにおける並列処理の設定](#aws-glue-programming-etl-connect-dynamodb-parallelism)」を参照してください。
+ `"dynamodb.sts.roleArn"`: (オプション) クロスアカウントアクセスのために引き受ける IAM ロール の ARN。このパラメータは、AWS Glue 1.0 以降で使用可能です。
+ `"dynamodb.sts.roleSessionName"`: (任意) STS セッション名。デフォルトでは、「glue-dynamodb-read-sts-session」に設定されています。このパラメータは、AWS Glue 1.0 以降で使用可能です。

### "connectionType"：AWS Glue DynamoDB エクスポートコネクタをソースとする "dynamodb"
<a name="etl-connect-dynamodb-as-source-export-connector"></a>

AWS Glue バージョン 2.0 以降でのみ使用可能な AWS Glue DynamoDB エクスポートコネクタを使用する場合は、ソースとして "connectionType": "dynamodb" で次の接続オプションを使用します。
+ `"dynamodb.export"`: (必須) 文字列値:
  + `ddb` に設定すると、AWS Glue DynamoDB エクスポートコネクタが有効になり、AWS Glue ジョブ中に新しい `ExportTableToPointInTimeRequest` が呼び出されます。`dynamodb.s3.bucket` と `dynamodb.s3.prefix` から渡された場所で新しいエクスポートが生成されます。
  + `s3` に設定すると、AWS Glue DynamoDB エクスポートコネクタが有効になりますが、新しい DynamoDB エクスポートの作成はスキップされ、代わりに `dynamodb.s3.bucket` と `dynamodb.s3.prefix` がそのテーブルの過去のエクスポートの Amazon S3 ロケーションとして使用されます。
+ `"dynamodb.tableArn"`: (必須) 読み取り元の DynamoDB テーブル。
+ `"dynamodb.unnestDDBJson"`: (オプション) デフォルト: false。　 有効な値: ブール値　 true に設定すると、エクスポートに存在する DynamoDB JSON 構造体のネスト解除の変換が実行されます。`"dynamodb.unnestDDBJson"` と `"dynamodb.simplifyDDBJson"` を同時に true に設定するとエラーになります。AWS Glue 3.0 以降のバージョンでは、DynamoDB マップタイプを簡略化するときの動作を改善するために `"dynamodb.simplifyDDBJson"` を使用することをおすすめします。詳細については、「[DynamoDB エクスポート JSON の使用を簡素化](#etl-connect-dynamodb-traversing-structure)」を参照してください。
+ `"dynamodb.simplifyDDBJson"`: (オプション) デフォルト: false。　 有効な値: ブール値　 true に設定すると、エクスポートに存在する DynamoDB JSON 構造のスキーマを簡素化するための変換を実行します。これは `"dynamodb.unnestDDBJson"` オプションと同じ目的ですが、DynamoDB テーブル内の DynamoDB マップタイプやネストされたマップタイプをより適切にサポートします。このオプションは、AWS Glue バージョン 3.0 以降でのみ有効です。`"dynamodb.unnestDDBJson"` と `"dynamodb.simplifyDDBJson"` を同時に true に設定するとエラーになります。詳細については、「[DynamoDB エクスポート JSON の使用を簡素化](#etl-connect-dynamodb-traversing-structure)」を参照してください。
+ `"dynamodb.s3.bucket"`: (オプション) DynamoDB `ExportTableToPointInTime` プロセスが実行される Simple Storage Service (Amazon S3) バケットの場所を示します。エクスポートファイル形式は DynamoDB JSON です。
  + `"dynamodb.s3.prefix"`: (オプション) DynamoDB `ExportTableToPointInTime` の読み込みが保存される Simple Storage Service (Amazon S3) バケット内の Amazon S3 プレフィックスの場所を示します。`dynamodb.s3.prefix` と `dynamodb.s3.bucket` のどちらも指定しなかった場合、これらの値は AWS Glue ジョブの設定で指定された一時ディレクトリの場所がデフォルトとなります。詳細については、「[AWS Glue で使用される特別なパラメータ](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html)」を参照してください。
  + `"dynamodb.s3.bucketOwner"`: クロスアカウント Simple Storage Service (Amazon S3) アクセスのために必要なバケット所有者を示します。
+ `"dynamodb.sts.roleArn"`: (オプション) DynamoDB テーブルのクロスアカウントアクセスおよび/またはクロスリージョンアクセスのために割り当てる IAM ロール の ARN。注: 同じ IAM ロール のARN を使用して、`ExportTableToPointInTime` リクエストに指定された Simple Storage Service (Amazon S3) の場所にアクセスします。
+ `"dynamodb.sts.roleSessionName"`: (任意) STS セッション名。デフォルトでは、「glue-dynamodb-read-sts-session」に設定されています。
+ `"dynamodb.exportTime"` (オプション) 有効な値: ISO-8601 インスタントを表す文字列。　 エクスポートが実行されるべき時点。
+ `"dynamodb.sts.region"`: (リージョンのエンドポイントを使用してリージョン間通話を行う場合は必須) 読み取りたい DynamoDB テーブルをホストするリージョン。

### "connectionType": ETLコネクタをシンクとして使用する "dynamodb"
<a name="etl-connect-dynamodb-as-sink"></a>

`"connectionType": "dynamodb"` をシンクとして、次の接続オプションを使用します。
+ `"dynamodb.output.tableName"`: (必須) 書き込み先の DynamoDB テーブル。
+ `"dynamodb.throughput.write.percent"`: (オプション) 使用する書き込みキャパシティーユニット (WCU) の割合。デフォルトでは、"0.5" に設定されます。許容値は "0.1" から "1.5" (これらの値を含む) です。
  + `0.5` ではデフォルトの読み込み速度を表し、AWS Glue はテーブルの書き込み容量の半分を消費しようとすることを意味します。上記の値を 0.5 より上に設定すると、AWS Glue は書き込みリクエストレートを増加させ、0.5 より低くした場合はそのリクエストレートを減少させます。(実際の書き込みレートは、統一されたキーのディストリビューションが、DynamoDB テーブルにあるかどうかなどの要因によって変わります)。
  + DynamoDB テーブルがオンデマンドモードの場合、AWS Glue はテーブルの書き込み容量を `40000` として処理します。大きなテーブルをインポートする場合は、DynamoDB テーブルをオンデマンドモードに切り替えることをお勧めします。
+ `"dynamodb.output.numParallelTasks"`: (オプション) 同時に DynamoDB に書き込める並列タスクの数を定義します。Spark タスクごとに許容される WCU を計算するために使用されます。ほとんどの場合、AWS Glue はこの値の妥当なデフォルト値を計算します。　 詳細については、「[DynamoDB オペレーションにおける並列処理の設定](#aws-glue-programming-etl-connect-dynamodb-parallelism)」を参照してください。
+ `"dynamodb.output.retry"`: (オプション) DynamoDB から `ProvisionedThroughputExceededException` が送られている場合の、再試行の実行回数を定義します。デフォルトでは、"10" に設定されています。
+ `"dynamodb.sts.roleArn"`: (オプション) クロスアカウントアクセスのために引き受ける IAM ロール の ARN。
+ `"dynamodb.sts.roleSessionName"`: (任意) STS セッション名。デフォルトでは、「glue-dynamodb-write-sts-session」に設定されています。

# DynamoDB テーブルへのクロスアカウントおよびクロスリージョンでのアクセス
<a name="aws-glue-programming-etl-dynamo-db-cross-account"></a>

AWS Glue ETL ジョブでは、DynamoDB テーブルに対する、クロスリージョンならびにクロスアカウントでのアクセスを供にサポートしています。AWS GlueETL ジョブは、別の AWS アカウントの DynamoDB テーブルからのデータ読み取り、および別の AWS アカウントの DynamoDB テーブルへのデータ書き込みをサポートしています。加えて AWS Glue では、別のリージョンの DynamoDB テーブルからの読み取りと、別のリージョンの DynamoDB テーブルへの書き込みもサポートしています。このセクションでは、このアクセスを設定する手順と、スクリプトの例を示します。

このセクションに示す手順では、IAM のチュートリアルに則り、IAM ロールを作成しそのロールにアクセス許可を付与しています。このチュートリアルでは、ロールの引き受けについても解説されていますが、今回はその代わりに、AWS Glue でのロールの引き受けにはジョブ・スクリプトを使用します。また、このチュートリアルでは、クロスアカウントに関連した一般的な作業についても説明しています。詳しくは、*IAM ユーザーガイド*の「[チュートリアル: IAMロールを使用した AWS アカウント間のアクセス権の委譲](https://docs.aws.amazon.com/IAM/latest/UserGuide/tutorial_cross-account-with-roles.html)」を参照してください。

## ロールを作成する
<a name="aws-glue-programming-etl-dynamo-db-create-role"></a>

[チュートリアルのステップ 1](https://docs.aws.amazon.com/IAM/latest/UserGuide/tutorial_cross-account-with-roles.html#tutorial_cross-account-with-roles-1) に従い、アカウント A に IAM ロールを作成します。ロールのアクセス許可を定義する際に、`AmazonDynamoDBReadOnlyAccess` または `AmazonDynamoDBFullAccess` など既存のポリシーをアタッチして、ロールに対し DynamoDB の読み取り/書き込みを許可します。次の例に、アクセス許可ポリシー `AmazonDynamoDBFullAccess` を使用する、`DynamoDBCrossAccessRole` という名前のロールを作成する方法を示します。

## ロールにアクセス許可を付与する
<a name="aws-glue-programming-etl-dynamo-db-grant-access"></a>

*IAM ユーザーガイド*にある[チュートリアルのステップ 2](https://docs.aws.amazon.com/IAM/latest/UserGuide/tutorial_cross-account-with-roles.html#tutorial_cross-account-with-roles-2) に従い、アカウント B が新しく作成されたロールに切り替えることを許可します。次の例では、ここで示されるステートメントを使用して新しいポリシーを作成しています。

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": {
    "Effect": "Allow",
    "Action": "sts:AssumeRole",
    "Resource": "arn:aws:iam::111122223333:role/DynamoDBCrossAccessRole"
  }
}
```

------

次に、DynamoDB へのアクセスに使用するグループ/ロール/ユーザーに、このポリシーをアタッチします。

## AWS Glue Job スクリプトの中でロールを引き受ける
<a name="aws-glue-programming-etl-dynamo-db-assume-role"></a>

この段階で、アカウント B にログインして、AWS Glue ジョブを作成できるようになっています。ジョブを作成するには、[AWS Glue で Spark ジョブに関するジョブプロパティの構成](add-job.md) にある手順を参照してください。

ジョブスクリプト内で `DynamoDBCrossAccessRole` ロールを引き受けるには、`dynamodb.sts.roleArn` パラメーターを使用する必要があります。このロールを引き受けることで、アカウント B の DynamoDB にアクセスするために必要な、一時的な認証情報を取得できます。このためのサンプルスクリプトを示します。

リージョン間でのクロスアカウント読み取りの場合 (ETL コネクタ):

```
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glue_context= GlueContext(SparkContext.getOrCreate())
job = Job(glue_context)
job.init(args["JOB_NAME"], args)

dyf = glue_context.create_dynamic_frame_from_options(
    connection_type="dynamodb",
    connection_options={
    "dynamodb.region": "us-east-1",
    "dynamodb.input.tableName": "test_source",
    "dynamodb.sts.roleArn": "<DynamoDBCrossAccessRole's ARN>"
    }
)
dyf.show()
job.commit()
```

リージョン間でのクロスアカウント読み取りの場合 (ETL コネクタ):

```
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glue_context= GlueContext(SparkContext.getOrCreate())
job = Job(glue_context)
job.init(args["JOB_NAME"], args)

dyf = glue_context.create_dynamic_frame_from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.export": "ddb",
        "dynamodb.tableArn": "<test_source ARN>",
        "dynamodb.sts.roleArn": "<DynamoDBCrossAccessRole's ARN>"
    }
)
dyf.show()
job.commit()
```

リージョンにまたがるクロスアカウント読み取りおよび書き込みの場合:

```
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
 
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glue_context= GlueContext(SparkContext.getOrCreate())
job = Job(glue_context)
job.init(args["JOB_NAME"], args)
 
dyf = glue_context.create_dynamic_frame_from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.region": "us-east-1",
        "dynamodb.input.tableName": "test_source"
    }
)
dyf.show()
 
glue_context.write_dynamic_frame_from_options(
    frame=dyf,
    connection_type="dynamodb",
    connection_options={
        "dynamodb.region": "us-west-2",
        "dynamodb.output.tableName": "test_sink",
        "dynamodb.sts.roleArn": "<DynamoDBCrossAccessRole's ARN>"
    }
)
 
job.commit()
```

# Spark DataFrame をサポートする DynamoDB コネクタ
<a name="aws-glue-programming-etl-connect-dynamodb-dataframe-support"></a>

Spark DataFrame をサポートする DynamoDB コネクタを使用すると、Spark DataFrame API を使用して DynamoDB 内のテーブルとの間で読み書きを行うことができます。コネクタのセットアップ手順は DynamicFrame ベースのコネクタの場合と同じで、[こちら](aws-glue-programming-etl-connect-dynamodb-home.md#aws-glue-programming-etl-connect-dynamodb-configure)から確認できます。

DataFrame ベースのコネクタライブラリにロードするには、必ず DynamoDB 接続を Glue ジョブにアタッチしてください。

**注記**  
Glue コンソール UI は現在、DynamoDB 接続の作成をサポートしていません。Glue CLI ([CreateConnection](https://docs.aws.amazon.com/cli/latest/reference/glue/create-connection.html)) を使用して DynamoDB 接続を作成できます。  

```
        aws glue create-connection \
            --connection-input '{ \
                "Name": "my-dynamodb-connection", \
                "ConnectionType": "DYNAMODB", \
                "ConnectionProperties": {}, \
                "ValidateCredentials": false, \
                "ValidateForComputeEnvironments": ["SPARK"] \
            }'
```

DynamoDB 接続を作成すると、Glue ジョブに CLI ([CreateJob](https://docs.aws.amazon.com/cli/latest/reference/glue/create-job.html)、[UpdateJob](https://docs.aws.amazon.com/cli/latest/reference/glue/update-job.html)) を介してアタッチするか、「ジョブの詳細」ページで直接アタッチすることができます。

![\[alt text not found\]](http://docs.aws.amazon.com/ja_jp/glue/latest/dg/images/dynamodb-dataframe-connector.png)


DYNAMODB タイプの接続が Glue ジョブにアタッチされたら、DataFrame ベースのコネクタから次の読み取り、書き込み、エクスポートオペレーションを使用できます。

## DataFrame ベースのコネクタを使用した DynamoDB からの読み取りと書き込み
<a name="aws-glue-programming-etl-connect-dynamodb-dataframe-read-write"></a>

次のコード例は、DynamoDB ベースのコネクタを介した DynamoDB に対する読み込みと書き込みを行う方法を方法を示します。ここでは、あるテーブルから読み取りを行い、別のテーブルに対して書き込みを行っています。

------
#### [ Python ]

```
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glue_context= GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
job = Job(glue_context)
job.init(args["JOB_NAME"], args)

# Read from DynamoDB
df = spark.read.format("dynamodb") \
    .option("dynamodb.input.tableName", "test-source") \
    .option("dynamodb.throughput.read.ratio", "0.5") \
    .option("dynamodb.consistentRead", "false") \
    .load()

print(df.rdd.getNumPartitions())

# Write to DynamoDB
df.write \
  .format("dynamodb") \
  .option("dynamodb.output.tableName", "test-sink") \
  .option("dynamodb.throughput.write.ratio", "0.5") \
  .option("dynamodb.item.size.check.enabled", "true") \
  .save()

job.commit()
```

------
#### [ Scala ]

```
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._

object GlueApp {
  def main(sysArgs: Array[String]): Unit = {
    
    val glueContext = new GlueContext(SparkContext.getOrCreate())
    val spark = glueContext.getSparkSession
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    val df = spark.read
      .format("dynamodb")
      .option("dynamodb.input.tableName", "test-source")
      .option("dynamodb.throughput.read.ratio", "0.5")
      .option("dynamodb.consistentRead", "false")
      .load()

    print(df.rdd.getNumPartitions)

    df.write
      .format("dynamodb")
      .option("dynamodb.output.tableName", "test-sink")
      .option("dynamodb.throughput.write.ratio", "0.5")
      .option("dynamodb.item.size.check.enabled", "true")
      .save()

    job.commit()
  }
}
```

------

## DataFrame ベースのコネクタを介して DynamoDB エクスポートを使用する
<a name="aws-glue-programming-etl-connect-dynamodb-dataframe-export"></a>

80 GB を超える DynamoDB テーブルサイズの読み取りオペレーションでは、エクスポートオペレーションが向上します。次のコード例は、DataFrame ベースのコネクタを介してテーブルからの読み取り、S3 へのエクスポート、パーティションの数の出力を行う例を示します。

**注記**  
DynamoDB エクスポート機能は Scala `DynamoDBExport` オブジェクトから使用できます。Python ユーザーは、Spark の JVM インターロープを介してアクセスするか、DynamoDB `ExportTableToPointInTime` API で AWS SDK for Python (boto3) を使用することができます。

------
#### [ Scala ]

```
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.{GlueArgParser, Job}
import org.apache.spark.SparkContext
import glue.spark.dynamodb.DynamoDBExport
import scala.collection.JavaConverters._

object GlueApp {
  def main(sysArgs: Array[String]): Unit = {
    val glueContext = new GlueContext(SparkContext.getOrCreate())
    val spark = glueContext.getSparkSession
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)
    
    val options = Map(
      "dynamodb.export" -> "ddb",
      "dynamodb.tableArn" -> "arn:aws:dynamodb:us-east-1:123456789012:table/my-table",
      "dynamodb.s3.bucket" -> "my-s3-bucket",
      "dynamodb.s3.prefix" -> "my-s3-prefix",
      "dynamodb.simplifyDDBJson" -> "true"
    )
    val df = DynamoDBExport.fullExport(spark, options)
    
    print(df.rdd.getNumPartitions)
    df.count()
    
    Job.commit()
  }
}
```

------

## 設定オプション
<a name="aws-glue-programming-etl-connect-dynamodb-dataframe-options"></a>

### 読み取りオプション
<a name="aws-glue-programming-etl-connect-dynamodb-dataframe-read-options"></a>


| オプション | 説明 | デフォルト | 
| --- | --- | --- | 
| dynamodb.input.tableName | DynamoDB テーブル名 (必須) | - | 
| dynamodb.throughput.read | 使用する読み込みキャパシティーユニット (RCU)。指定しない場合、dynamodb.throughput.read.ratio が計算に使用されます。 | - | 
| dynamodb.throughput.read.ratio | 使用する読み込みキャパシティーユニット (RCU) の比率。 | 0.5 | 
| dynamodb.table.read.capacity | スループットの計算に使用されるオンデマンドテーブルの読み込みキャパシティーユニット。このパラメータは、オンデマンドキャパシティーテーブルでのみ有効です。デフォルトはウォームスループット読み込みユニットです。 | - | 
| dynamodb.splits | 並列スキャンオペレーションで使用されるセグメントの数を定義します。指定しない場合、コネクタは妥当なデフォルト値を計算します。 | - | 
| dynamodb.consistentRead | 強力な整合性のある読み込みを使用するかどうか。 | FALSE | 
| dynamodb.input.retry | 再試行可能な例外がある場合に実行する再試行回数を定義します。 | 10 | 

### 書き込みオプション
<a name="aws-glue-programming-etl-connect-dynamodb-dataframe-write-options"></a>


| オプション | 説明 | デフォルト | 
| --- | --- | --- | 
| dynamodb.output.tableName | DynamoDB テーブル名 (必須) | - | 
| dynamodb.throughput.write | 使用する書き込みキャパシティーユニット (WCU)。指定しない場合、dynamodb.throughput.write.ratio が計算に使用されます。 | - | 
| dynamodb.throughput.write.ratio | 使用する書き込みキャパシティーユニット (WCU) の比率。 | 0.5 | 
| dynamodb.table.write.capacity | スループットの計算に使用されるオンデマンドテーブルの書き込みキャパシティーユニット。このパラメータは、オンデマンドキャパシティーテーブルでのみ有効です。デフォルトはウォームスループット書き込みユニットです。 | - | 
| dynamodb.item.size.check.enabled | true の場合、コネクタは項目サイズを計算して、サイズが最大サイズを超えた場合はDynamoDB テーブルに書き込む前に中止します。 | TRUE | 
| dynamodb.output.retry | 再試行可能な例外がある場合に実行する再試行回数を定義します。 | 10 | 

### エクスポートオプション
<a name="aws-glue-programming-etl-connect-dynamodb-dataframe-export-options"></a>


| オプション | 説明 | デフォルト | 
| --- | --- | --- | 
| dynamodb.export | ddb に設定すると、AWS Glue DynamoDB エクスポートコネクタが有効になり、AWS Glue ジョブ中に新しい ExportTableToPointInTimeRequet が呼び出されます。dynamodb.s3.bucket と dynamodb.s3.prefix から渡された場所で新しいエクスポートが生成されます。s3 に設定すると、AWS DynamoDB エクスポートコネクタが有効になりますが、新しい DynamoDB エクスポートの作成はスキップされ、代わりに dynamodb.s3.bucket と dynamodb.s3.prefix がそのテーブルの過去のエクスポートの Amazon S3 ロケーションとして使用されます。 | ddb | 
| dynamodb.tableArn | 読み取り元の DynamoDB テーブル。dynamodb.export が ddb に設定されている場合は必須です。 |  | 
| dynamodb.simplifyDDBJson | true に設定すると、エクスポートに存在する DynamoDB JSON 構造のスキーマを簡素化するための変換を実行します。 | FALSE | 
| dynamodb.s3.bucket | DynamoDB エクスポート中に一時データを保存する S3 バケット (必須)。 |  | 
| dynamodb.s3.prefix | DynamoDB エクスポート中に一時データを保存する S3 プレフィックス。 |  | 
| dynamodb.s3.bucketOwner | クロスアカウント Simple Storage Service (Amazon S3) アクセスのために必要なバケット所有者を示します。 |  | 
| dynamodb.s3.sse.algorithm | 一時データが保存されるバケットで使用される暗号化のタイプ。有効な値は、AES256 および KMS です。 |  | 
| dynamodb.s3.sse.kmsKeyId | 一時データが保存される S3 バケットの暗号化に使用される AWS KMS マネージドキーの ID (該当する場合)。 |  | 
| dynamodb.exportTime | エクスポートが実行されるべき時点。有効な値: ISO-8601 インスタントを表す文字列。　 |  | 

### 汎用オプション
<a name="aws-glue-programming-etl-connect-dynamodb-dataframe-general-options"></a>


| オプション | 説明 | デフォルト | 
| --- | --- | --- | 
| dynamodb.sts.roleArn | クロスアカウントアクセスのために引き受ける IAM ロール の ARN。 | - | 
| dynamodb.sts.roleSessionName | STS セッション名。 | glue-dynamodb-sts-session | 
| dynamodb.sts.region | STS クライアントのリージョン (クロスリージョンロールの引き受け用)。 | region オプションと同じ。 | 