

# Amazon DocumentDB 连接
<a name="aws-glue-programming-etl-connect-documentdb-home"></a>

您可以使用 AWS Glue for Spark 读取和写入 Amazon DocumentDB 中的表。您可以通过 AWS Glue 连接使用存储在 AWS Secrets Manager 中的凭证连接到 Amazon DocumentDB。

有关 Amazon DocumentDB 的更多信息，请参阅 [Amazon DocumentDB 文档](https://docs.aws.amazon.com/documentdb/latest/developerguide/what-is.html)。

**注意**  
使用 AWS Glue 连接器时，目前不支持 Amazon DocumentDB 弹性集群。有关弹性集群的更多信息，请参阅 [Using Amazon DocumentDB elastic clusters](https://docs.aws.amazon.com/documentdb/latest/developerguide/docdb-using-elastic-clusters.html)。

## 读取和写入 Amazon DocumentDB 集合
<a name="aws-glue-programming-etl-connect-documentdb-read-write"></a>

**注意**  
当您创建连接到 Amazon DocumentDB 的 ETL 任务时，对于 `Connections` 任务属性，您必须指定一个连接对象，用于指定在其中运行 Amazon DocumentDB 的 Virtual Private Cloud（VPC）。对于该连接对象，连接类型必须为 `JDBC`，且 `JDBC URL` 必须为 `mongo://<DocumentDB_host>:27017`。

**注意**  
这些代码示例是为 AWS Glue 3.0 开发的。要迁移到 AWS Glue 4.0，请参阅 [MongoDB](migrating-version-40.md#migrating-version-40-connector-driver-migration-mongodb)。`uri` 参数已更改。

**注意**  
使用 Amazon DocumentDB 时，在某些情况下必须将 `retryWrites` 设置为 false，例如编写的文档指定 `_id` 时。有关更多信息，请参阅 Amazon DocumentDB 文档中的 [与 MongoDB 之间的功能差异](https://docs.aws.amazon.com/documentdb/latest/developerguide/functional-differences.html#functional-differences.retryable-writes)。

以下 Python 脚本展示了如何使用连接类型和连接选项来读写 Amazon DocumentDB。

```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext, SparkConf
from awsglue.context import GlueContext
from awsglue.job import Job
import time

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

job = Job(glueContext)
job.init(args['JOB_NAME'], args)

output_path = "s3://some_bucket/output/" + str(time.time()) + "/"
documentdb_uri = "mongodb://<mongo-instanced-ip-address>:27017"
documentdb_write_uri = "mongodb://<mongo-instanced-ip-address>:27017"

read_docdb_options = {
    "uri": documentdb_uri,
    "database": "test",
    "collection": "coll",
    "username": "username",
    "password": "1234567890",
    "ssl": "true",
    "ssl.domain_match": "false",
    "partitioner": "MongoSamplePartitioner",
    "partitionerOptions.partitionSizeMB": "10",
    "partitionerOptions.partitionKey": "_id"
}

write_documentdb_options = {
    "retryWrites": "false",
    "uri": documentdb_write_uri,
    "database": "test",
    "collection": "coll",
    "username": "username",
    "password": "pwd"
}

# Get DynamicFrame from  DocumentDB
dynamic_frame2 = glueContext.create_dynamic_frame.from_options(connection_type="documentdb",
                                                               connection_options=read_docdb_options)

# Write DynamicFrame to MongoDB and DocumentDB
glueContext.write_dynamic_frame.from_options(dynamic_frame2, connection_type="documentdb",
                                             connection_options=write_documentdb_options)

job.commit()
```

以下 Scala 脚本展示了如何使用连接类型和连接选项来读写 Amazon DocumentDB。

```
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.DynamicFrame
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._

object GlueApp {
  val DOC_URI: String = "mongodb://<mongo-instanced-ip-address>:27017"
  val DOC_WRITE_URI: String = "mongodb://<mongo-instanced-ip-address>:27017"
  lazy val documentDBJsonOption = jsonOptions(DOC_URI)
  lazy val writeDocumentDBJsonOption = jsonOptions(DOC_WRITE_URI)
  def main(sysArgs: Array[String]): Unit = {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    // Get DynamicFrame from DocumentDB
    val resultFrame2: DynamicFrame = glueContext.getSource("documentdb", documentDBJsonOption).getDynamicFrame()

    // Write DynamicFrame to DocumentDB
    glueContext.getSink("documentdb", writeJsonOption).writeDynamicFrame(resultFrame2)

    Job.commit()
  }

  private def jsonOptions(uri: String): JsonOptions = {
    new JsonOptions(
      s"""{"uri": "${uri}",
         |"database":"test",
         |"collection":"coll",
         |"username": "username",
         |"password": "pwd",
         |"ssl":"true",
         |"ssl.domain_match":"false",
         |"partitioner": "MongoSamplePartitioner",
         |"partitionerOptions.partitionSizeMB": "10",
         |"partitionerOptions.partitionKey": "_id"}""".stripMargin)
  }
}
```

## Amazon DocumentDB 连接选项参考
<a name="aws-glue-programming-etl-connect-documentdb"></a>

指定与 Amazon DocumentDB (with MongoDB compatibility) 的连接。

源连接和接收器连接的连接选项不同。

### "connectionType": "documentdb" as Source
<a name="etl-connect-documentdb-as-source"></a>

将 `"connectionType": "documentdb"` 用作源时可使用以下连接选项：
+ `"uri"`：（必需）要从中读取数据的 Amazon DocumentDB 主机，格式为 `mongodb://<host>:<port>`。
+ `"database"`：（必需）要从中读取数据的 Amazon DocumentDB 数据库。
+ `"collection"`：（必需）要从中读取数据的 Amazon DocumentDB 连接。
+ `"username"`：（必需）Amazon DocumentDB 用户名。
+ `"password"`：（必需）Amazon DocumentDB 密码。
+ `"ssl"`：（如果使用 SSL，则必需）如果您的连接使用 SSL，则必须包含此选项且值为 `"true"`。
+ `"ssl.domain_match"`：（如果使用 SSL，则必需）如果您的连接使用 SSL，则必须包含此选项且值为 `"false"`。
+ `"batchSize"`：（可选）每个批处理返回的文档数量，在内部批处理的游标中使用。
+ `"partitioner"`：（可选）从 Amazon DocumentDB 中读取输入数据的分区器的类名称。该连接器提供以下分区器：
  + `MongoDefaultPartitioner`（默认）（AWS Glue 4.0 不支持）
  + `MongoSamplePartitioner`（AWS Glue 4.0 不支持）
  + `MongoShardedPartitioner`
  + `MongoSplitVectorPartitioner`
  + `MongoPaginateByCountPartitioner`
  + `MongoPaginateBySizePartitioner`（AWS Glue 4.0 不支持）
+ `"partitionerOptions"`：（可选）指定分区器的选项。各个分区器支持的选项如下：
  + `MongoSamplePartitioner`: `partitionKey`, `partitionSizeMB`, `samplesPerPartition`
  + `MongoShardedPartitioner`: `shardkey`
  + `MongoSplitVectorPartitioner`：`partitionKey`、partitionSizeMB
  + `MongoPaginateByCountPartitioner`: `partitionKey`, `numberOfPartitions`
  + `MongoPaginateBySizePartitioner`：`partitionKey`、partitionSizeMB

  有关这些选项的更多信息，请参阅 MongoDB 文档中的[分区器配置](https://docs.mongodb.com/spark-connector/master/configuration/#partitioner-conf)。

### "connectionType": "documentdb" as Sink
<a name="etl-connect-documentdb-as-sink"></a>

将 `"connectionType": "documentdb"` 用作连接器时可使用以下连接选项：
+ `"uri"`：（必需）要在其中写入数据的 Amazon DocumentDB 主机，格式为 `mongodb://<host>:<port>`。
+ `"database"`：（必需）要在其中写入数据的 Amazon DocumentDB 数据库。
+ `"collection"`：（必需）要在其中写入数据的 Amazon DocumentDB 连接。
+ `"username"`：（必需）Amazon DocumentDB 用户名。
+ `"password"`：（必需）Amazon DocumentDB 密码。
+ `"extendedBsonTypes"`：（可选）如果为 `true`，则在 Amazon DocumentDB 中写入数据时会启用扩展 BSON 类型。默认值为 `true`。
+ `"replaceDocument"`：（可选）如果为 `true`，则在保存包含 `_id` 字段的数据集时会替换整个文档。如果为 `false`，则只会更新文档中与数据集中的字段匹配的字段。默认值为 `true`。
+ `"maxBatchSize"`：（可选）保存数据时的批量操作的最大批次大小。默认值为 512。
+ `"retryWrites"`：（可选）：如果 AWS Glue 遇到网络错误，则会自动重试某些写入操作一次。