

# Amazon DocumentDB の接続　
<a name="aws-glue-programming-etl-connect-documentdb-home"></a>

Spark 用の AWS Glue を使用して Amazon DocumentDB 内のテーブルに対する読み込みと書き込みを行うことができます。AWS Glue 接続を介して AWS Secrets Manager で保存されている認証情報を使用して Amazon DocumentDB に接続できます。

Amazon DocumentDB の詳細については、「[Amazon DocumentDB のドキュメント](https://docs.aws.amazon.com/documentdb/latest/developerguide/what-is.html)」を参照してください。

**注記**  
Amazon DocumentDB エラスティッククラスターは、現在 AWS Glue コネクタを使用する場合にはサポートされていません。エラスティッククラスターの詳細については、「[Amazon DocumentDB エラスティッククラスターの使用](https://docs.aws.amazon.com/documentdb/latest/developerguide/docdb-using-elastic-clusters.html)」を参照してください。

## Amazon DocumentDB コレクションへの読み取りと書き込み
<a name="aws-glue-programming-etl-connect-documentdb-read-write"></a>

**注記**  
Amazon DocumentDB に接続する ETL ジョブを作成する際、`Connections` ジョブのプロパティにおいて、Amazon DocumentDB が実行されている Virtual Private Cloud (VPC) を特定するための、接続オブジェクトを指定する必要があります。接続オブジェクトの場合、接続タイプは `JDBC` で、`JDBC URL` は `mongo://<DocumentDB_host>:27017` である必要があります。

**注記**  
これらのコードサンプルは AWS Glue 3.0 用に開発されました。AWS Glue 4.0 への移行については、[MongoDB](migrating-version-40.md#migrating-version-40-connector-driver-migration-mongodb) を参照してください。`uri` パラメータが変更されました。　

**注記**  
Amazon DocumentDB を使用する場合、書かれた文書に `_id` が指定されている場合など、特定の状況では、`retryWrites` は false に設定する必要があります。詳細については、Amazon DocumentDB ドキュメントの「[MongoDB との機能の違い](https://docs.aws.amazon.com/documentdb/latest/developerguide/functional-differences.html#functional-differences.retryable-writes)」を参照してください。　

次の Python スクリプトでは、Amazon DocumentDB に対する読み取りと書き込みのための、接続タイプと接続オプションの使用方法を示しています。

```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext, SparkConf
from awsglue.context import GlueContext
from awsglue.job import Job
import time

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

job = Job(glueContext)
job.init(args['JOB_NAME'], args)

output_path = "s3://some_bucket/output/" + str(time.time()) + "/"
documentdb_uri = "mongodb://<mongo-instanced-ip-address>:27017"
documentdb_write_uri = "mongodb://<mongo-instanced-ip-address>:27017"

read_docdb_options = {
    "uri": documentdb_uri,
    "database": "test",
    "collection": "coll",
    "username": "username",
    "password": "1234567890",
    "ssl": "true",
    "ssl.domain_match": "false",
    "partitioner": "MongoSamplePartitioner",
    "partitionerOptions.partitionSizeMB": "10",
    "partitionerOptions.partitionKey": "_id"
}

write_documentdb_options = {
    "retryWrites": "false",
    "uri": documentdb_write_uri,
    "database": "test",
    "collection": "coll",
    "username": "username",
    "password": "pwd"
}

# Get DynamicFrame from  DocumentDB
dynamic_frame2 = glueContext.create_dynamic_frame.from_options(connection_type="documentdb",
                                                               connection_options=read_docdb_options)

# Write DynamicFrame to MongoDB and DocumentDB
glueContext.write_dynamic_frame.from_options(dynamic_frame2, connection_type="documentdb",
                                             connection_options=write_documentdb_options)

job.commit()
```

次の Scala スクリプトでは、Amazon DocumentDB に対する読み取りと書き込みのための、接続タイプと接続オプションの使用方法を示しています。

```
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.DynamicFrame
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._

object GlueApp {
  val DOC_URI: String = "mongodb://<mongo-instanced-ip-address>:27017"
  val DOC_WRITE_URI: String = "mongodb://<mongo-instanced-ip-address>:27017"
  lazy val documentDBJsonOption = jsonOptions(DOC_URI)
  lazy val writeDocumentDBJsonOption = jsonOptions(DOC_WRITE_URI)
  def main(sysArgs: Array[String]): Unit = {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    // Get DynamicFrame from DocumentDB
    val resultFrame2: DynamicFrame = glueContext.getSource("documentdb", documentDBJsonOption).getDynamicFrame()

    // Write DynamicFrame to DocumentDB
    glueContext.getSink("documentdb", writeJsonOption).writeDynamicFrame(resultFrame2)

    Job.commit()
  }

  private def jsonOptions(uri: String): JsonOptions = {
    new JsonOptions(
      s"""{"uri": "${uri}",
         |"database":"test",
         |"collection":"coll",
         |"username": "username",
         |"password": "pwd",
         |"ssl":"true",
         |"ssl.domain_match":"false",
         |"partitioner": "MongoSamplePartitioner",
         |"partitionerOptions.partitionSizeMB": "10",
         |"partitionerOptions.partitionKey": "_id"}""".stripMargin)
  }
}
```

## Amazon DocumentDB 接続のオプションのリファレンス
<a name="aws-glue-programming-etl-connect-documentdb"></a>

Amazon DocumentDB (MongoDB 互換) への接続を指定します。

接続オプションは、ソース接続とシンク接続とで異なります。

### "connectionType": "Documentdb" ソースとする
<a name="etl-connect-documentdb-as-source"></a>

`"connectionType": "documentdb"` をソースとして、次の接続オプションを使用します。
+ `"uri"`: (必須) 読み取り元の Amazon DocumentDB ホスト (`mongodb://<host>:<port>` 形式)。
+ `"database"`: (必須) 読み取り元の Amazon DocumentDB データベース。
+ `"collection"`: (必須) 読み取り元の Amazon DocumentDB コレクション。
+ `"username"`: (必須) Amazon DocumentDB のユーザー名。
+ `"password"`: (必須) Amazon DocumentDB のパスワード。
+ `"ssl"`: (SSL を使用する場合は必須) SSL を使用して接続する場合は、このオプションの値を `"true"` に設定する必要があります。
+ `"ssl.domain_match"`: (SSL を使用する場合は必須) SSL を使用して接続する場合は、このオプションの値を `"false"` に設定する必要があります。
+ `"batchSize"`: (オプション): 内部バッチのカーソル内で使用される、バッチごとに返されるドキュメントの数。
+ `"partitioner"`: (オプション) Amazon DocumentDB から入力データを読み取るためのパーティショナーのクラス名。コネクタには、次のパーティショナーがあります。
  + `MongoDefaultPartitioner` (デフォルト) (AWS Glue 4.0 ではサポートされていません)
  + `MongoSamplePartitioner` (AWS Glue 4.0 ではサポートされていません)
  + `MongoShardedPartitioner`
  + `MongoSplitVectorPartitioner`
  + `MongoPaginateByCountPartitioner`
  + `MongoPaginateBySizePartitioner` (AWS Glue 4.0 ではサポートされていません)
+ `"partitionerOptions"` (オプション): 指定されたパーティショナーのオプション。各パーティショナーでは、次のオプションがサポートされています。
  + `MongoSamplePartitioner`: `partitionKey`, `partitionSizeMB`, `samplesPerPartition`
  + `MongoShardedPartitioner`: `shardkey`
  + `MongoSplitVectorPartitioner`: `partitionKey`、partitionSizeMB
  + `MongoPaginateByCountPartitioner`: `partitionKey`, `numberOfPartitions`
  + `MongoPaginateBySizePartitioner`: `partitionKey`、partitionSizeMB

  これらのオプションの詳細については、MongoDB のドキュメントの「[Partitioner Configuration](https://docs.mongodb.com/spark-connector/master/configuration/#partitioner-conf)」を参照してください。

### "connectionType": "documentdb" as Sink
<a name="etl-connect-documentdb-as-sink"></a>

`"connectionType": "documentdb"` をシンクとして、次の接続オプションを使用します。
+ `"uri"`: (必須) 書き込み先の Amazon DocumentDB ホスト (`mongodb://<host>:<port>` 形式)。
+ `"database"`: (必須) 書き込み先の Amazon DocumentDB データベース。
+ `"collection"`: (必須) 書き込み先の Amazon DocumentDB コレクション。
+ `"username"`: (必須) Amazon DocumentDB のユーザー名。
+ `"password"`: (必須) Amazon DocumentDB のパスワード。
+ `"extendedBsonTypes"`: (オプション) `true` が指定されている場合、Amazon DocumentDB へのデータ書き込み時に拡張 BSON 型を使用できます。デフォルトは `true` です。
+ `"replaceDocument"`: (オプション) `true` の場合、`_id` フィールドを含むデータセットを保存するときに、ドキュメント全体を置き換えます。`false` の場合、データセットのフィールドと一致するドキュメントのフィールドのみが更新されます。デフォルトは `true` です。
+ `"maxBatchSize"`: (オプション): データを保存するときの一括オペレーションの最大バッチサイズ。デフォルトは 512 です。
+ `"retryWrites"`: (オプション): AWS Glue でネットワークエラーが発生した場合、特定の書き込みオペレーションを 1 回自動的に再試行します。