

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# Amazon DocumentDB 連線
<a name="aws-glue-programming-etl-connect-documentdb-home"></a>

您可以使用 AWS Glue for Spark 來讀取和寫入 Amazon DocumentDB 中的資料表。您可以使用 AWS Secrets Manager 透過 Glue 連線儲存在 中的登入資料來連線至 AWS Amazon DocumentDB。

如需 Amazon DocumentDB 的詳細資訊，請參閱 [Amazon DocumentDB 文件](https://docs.aws.amazon.com/documentdb/latest/developerguide/what-is.html)。

**注意**  
使用 Glue 連接器時，目前不支援 Amazon DocumentDB AWS 彈性叢集。如需有關彈性叢集的詳細資訊，請參閱 [Using Amazon DocumentDB elastic clusters](https://docs.aws.amazon.com/documentdb/latest/developerguide/docdb-using-elastic-clusters.html)。

## 讀取和寫入 Amazon DocumentDB 集合
<a name="aws-glue-programming-etl-connect-documentdb-read-write"></a>

**注意**  
建立連接至 Amazon DocumentDB 的 ETL 任務時，如為 `Connections` 任務屬性，您必須指定連線物件，以指定執行 Amazon DocumentDB 的 Virtual Private Cloud (VPC)。如為連線物件，連線類型必須為 `JDBC`，而 `JDBC URL` 必須為 `mongo://<DocumentDB_host>:27017`。

**注意**  
這些程式碼範例是針對 AWS Glue 3.0 所開發。若要遷移至 AWS Glue 4.0，請參閱 [MongoDB](migrating-version-40.md#migrating-version-40-connector-driver-migration-mongodb)。`uri` 參數已變更。

**注意**  
使用 Amazon DocumentDB 時，在某些情況下 `retryWrites` 必須設定為 false，例如當編寫的文件指定 `_id` 時。如需詳細資訊，請參閱 Amazon DocumentDB 文件中的 [MongoDB 的功能差異](https://docs.aws.amazon.com/documentdb/latest/developerguide/functional-differences.html#functional-differences.retryable-writes)。

下列 Python 指令碼示範使用連線類型和連線選項，以讀取和寫入至 Amazon DocumentDB。

```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext, SparkConf
from awsglue.context import GlueContext
from awsglue.job import Job
import time

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

job = Job(glueContext)
job.init(args['JOB_NAME'], args)

output_path = "s3://some_bucket/output/" + str(time.time()) + "/"
documentdb_uri = "mongodb://<mongo-instanced-ip-address>:27017"
documentdb_write_uri = "mongodb://<mongo-instanced-ip-address>:27017"

read_docdb_options = {
    "uri": documentdb_uri,
    "database": "test",
    "collection": "coll",
    "username": "username",
    "password": "1234567890",
    "ssl": "true",
    "ssl.domain_match": "false",
    "partitioner": "MongoSamplePartitioner",
    "partitionerOptions.partitionSizeMB": "10",
    "partitionerOptions.partitionKey": "_id"
}

write_documentdb_options = {
    "retryWrites": "false",
    "uri": documentdb_write_uri,
    "database": "test",
    "collection": "coll",
    "username": "username",
    "password": "pwd"
}

# Get DynamicFrame from  DocumentDB
dynamic_frame2 = glueContext.create_dynamic_frame.from_options(connection_type="documentdb",
                                                               connection_options=read_docdb_options)

# Write DynamicFrame to MongoDB and DocumentDB
glueContext.write_dynamic_frame.from_options(dynamic_frame2, connection_type="documentdb",
                                             connection_options=write_documentdb_options)

job.commit()
```

下列 Scala 指令碼示範使用連線類型和連線選項，以讀取和寫入至 Amazon DocumentDB。

```
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.DynamicFrame
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._

object GlueApp {
  val DOC_URI: String = "mongodb://<mongo-instanced-ip-address>:27017"
  val DOC_WRITE_URI: String = "mongodb://<mongo-instanced-ip-address>:27017"
  lazy val documentDBJsonOption = jsonOptions(DOC_URI)
  lazy val writeDocumentDBJsonOption = jsonOptions(DOC_WRITE_URI)
  def main(sysArgs: Array[String]): Unit = {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    // Get DynamicFrame from DocumentDB
    val resultFrame2: DynamicFrame = glueContext.getSource("documentdb", documentDBJsonOption).getDynamicFrame()

    // Write DynamicFrame to DocumentDB
    glueContext.getSink("documentdb", writeJsonOption).writeDynamicFrame(resultFrame2)

    Job.commit()
  }

  private def jsonOptions(uri: String): JsonOptions = {
    new JsonOptions(
      s"""{"uri": "${uri}",
         |"database":"test",
         |"collection":"coll",
         |"username": "username",
         |"password": "pwd",
         |"ssl":"true",
         |"ssl.domain_match":"false",
         |"partitioner": "MongoSamplePartitioner",
         |"partitionerOptions.partitionSizeMB": "10",
         |"partitionerOptions.partitionKey": "_id"}""".stripMargin)
  }
}
```

## Amazon DocumentDB 連線選項參考
<a name="aws-glue-programming-etl-connect-documentdb"></a>

指定 Amazon DocumentDB (with MongoDB compatibility) 的連線。

來源連線和接收器連線的連線選項不同。

### "connectionType": "Documentdb" as source
<a name="etl-connect-documentdb-as-source"></a>

使用下列有 `"connectionType": "documentdb"` 的連線選項作為來源：
+ `"uri"`：(必要) 讀取的 Amazon DocumentDB 主機，格式為 `mongodb://<host>:<port>`。
+ `"database"`：(必要) 要從中讀取的 Amazon DocumentDB 資料庫。
+ `"collection"`：(必要) 要從中讀取的 Amazon DocumentDB 集合。
+ `"username"`：(必要) Amazon DocumentDB 使用者名稱。
+ `"password"`：(必要) Amazon DocumentDB 密碼。
+ `"ssl"`：(如果使用 SSL，則為必要) 如果您的連線使用 SSL，則必須加入此選項並具備值 `"true"`。
+ `"ssl.domain_match"`：(如果使用 SSL，則為必要) 如果您的連線使用 SSL，則必須加入此選項並具備值 `"false"`。
+ `"batchSize"`：(選用)：每個批次傳回的文件數目，於內部批次的游標內使用。
+ `"partitioner"`：(選用)：從 Amazon DocumentDB 讀取輸入資料的分割區類別名稱。連接器提供下列分割區：
  + `MongoDefaultPartitioner` （預設） (Glue AWS 4.0 不支援）
  + `MongoSamplePartitioner` (Glue 4.0 AWS 不支援）
  + `MongoShardedPartitioner`
  + `MongoSplitVectorPartitioner`
  + `MongoPaginateByCountPartitioner`
  + `MongoPaginateBySizePartitioner` (Glue AWS 4.0 不支援）
+ `"partitionerOptions"` (選用)：指定分割區的選項。每個分割區都支援下列選項：
  + `MongoSamplePartitioner`: `partitionKey`, `partitionSizeMB`, `samplesPerPartition`
  + `MongoShardedPartitioner`: `shardkey`
  + `MongoSplitVectorPartitioner`：`partitionKey`，partitionSizeMB
  + `MongoPaginateByCountPartitioner`: `partitionKey`, `numberOfPartitions`
  + `MongoPaginateBySizePartitioner`：`partitionKey`，partitionSizeMB

  如需有關這些選項的詳細資訊，請參閱 MongoDB 文件中的[分割區組態](https://docs.mongodb.com/spark-connector/master/configuration/#partitioner-conf)。

### "connectionType": "Documentdb" as sink
<a name="etl-connect-documentdb-as-sink"></a>

使用下列有 `"connectionType": "documentdb"` 的連線選項作為接收器：
+ `"uri"`：(必要) 寫入的 Amazon DocumentDB 主機，格式為 `mongodb://<host>:<port>`。
+ `"database"`：(必要) 要寫入的 Amazon DocumentDB 資料庫。
+ `"collection"`：(必要) 要寫入的 Amazon DocumentDB 集合。
+ `"username"`：(必要) Amazon DocumentDB 使用者名稱。
+ `"password"`：(必要) Amazon DocumentDB 密碼。
+ `"extendedBsonTypes"`：(選用) 如果 `true`，在寫入資料到 Amazon DocumentDB 時允許延伸的 BSON 類型。預設值為 `true`。
+ `"replaceDocument"`：(選用) 如果 `true`，則在儲存包含 `_id` 欄位的資料集時取代整個文件。若為 `false`，則文件中僅有與資料集中欄位相符的欄位會更新。預設值為 `true`。
+ `"maxBatchSize"`：(選用)：儲存資料時大量操作的批次大小上限。預設為 512。
+ `"retryWrites"`：（選用）：如果 AWS Glue 遇到網路錯誤，則一次自動重試特定寫入操作。