

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# MongoDB 連線
<a name="aws-glue-programming-etl-connect-mongodb-home"></a>

您可以使用 AWS Glue for Spark 來讀取和寫入 Glue 4.0 和更新版本的 MongoDB AWS 和 MongoDB Atlas 中的資料表。您可以使用 AWS Secrets Manager 透過 Glue 連線儲存在 AWS 中的使用者名稱和密碼登入資料來連線至 MongoDB。

如需有關 MongoDB 的詳細資訊，請參閱 [MongoDB 文件](https://www.mongodb.com/docs/)。

## 設定 MongoDB 連線
<a name="aws-glue-programming-etl-connect-mongodb-configure"></a>

若要從 Glue AWS 連線至 MongoDB，您需要 MongoDB 登入資料 *mongodbUser* 和 *mongodbPass*。

若要從 Glue AWS 連線至 MongoDB，您可能需要一些先決條件：
+ 如果您的 MongoDB 執行個體位於 Amazon VPC 中，請將 Amazon VPC 設定為允許 Glue AWS 任務與 MongoDB 執行個體通訊，而不會讓流量周遊公有網際網路。

  在 Amazon VPC 中，識別或建立 Glue AWS 在執行任務時將使用的 **VPC**、**子網路**和**安全群組**。此外，您也需要確保 Amazon VPC 已完成設定，以允許 MongoDB 執行個體與此位置之間的網路流量。根據您的網路配置，這可能需要變更安全群組規則、網路 ACL、NAT 閘道及對等連線。

然後，您可以繼續設定 AWS Glue 以與 MongoDB 搭配使用。

**設定連至 MongoDB 的連線：**

1. 或者，在 中 AWS Secrets Manager，使用您的 MongoDB 登入資料建立秘密。若要在 Secrets Manager 中建立秘密，請遵循 AWS Secrets Manager 文件中[建立 AWS Secrets Manager 秘密](https://docs.aws.amazon.com//secretsmanager/latest/userguide/create_secret.html)中提供的教學課程。建立機密之後，請保留機密名稱 *secretName*，以便進行下一個步驟。
   + 在選取**鍵/值組**時，請使用 *mongodbUser* 值來建立 `username` 金鑰對。

     在選取**鍵/值組**時，請使用 *mongodbPass* 值來建立 `password` 金鑰對。

1. 在 AWS Glue 主控台中，依照中的步驟建立連線[新增 AWS Glue 連線](console-connections.md)。建立連線後，請保留連線名稱 *connectionName*，以供未來在 Glue AWS 中使用。
   + 選取**連線類型**時，請選取 **MongoDB** 或 **MongoDB Atlas**。
   + 選取 **MongoDB URL** 或 **MongoDB Atlas URL** 時，請提供 MongoDB 執行個體的主機名稱。

     MongoDB URL 會以 `mongodb://mongoHost:mongoPort/mongoDBname` 格式提供。

     MongoDB Atlas URL 會以 `mongodb+srv://mongoHost/mongoDBname` 格式提供。
   + 如果您選擇建立 Secrets Manager 秘密，請選擇 AWS Secrets Manager **憑證類型**。

     然後，在 **AWS 密碼**中提供 *secretName*。
   + 如果您選擇提供**使用者名稱和密碼**，請提供 *mongodbUser* 和 *mongodbPass*。

1. 在下列情況中，您可能需要其他組態：
   + 

     對於 Amazon VPC AWS 中託管於 的 MongoDB 執行個體
     + 您需要提供 Amazon VPC 連線資訊給定義 MongoDB 安全登入資料的 AWS Glue 連線。建立或更新連線時，請在**網路選項**中設定 **VPC**、**子網路**及**安全群組**。

建立 AWS Glue MongoDB 連線後，您需要先執行下列動作，才能呼叫連線方法：
+ 如果您選擇建立 Secrets Manager 秘密，請授予與 Glue 任務相關聯的 IAM AWS 角色讀取 *secretName* 的許可。
+ 在您的 AWS Glue 任務組態中，提供 *connectionName* 作為**其他網路連線**。

若要在 AWS Glue for Spark AWS 中使用 Glue MongoDB 連線，請在連線方法呼叫中提供 `connectionName`選項。或者，您可以遵循 中的步驟[在 ETL 任務中使用 MongoDB 連線](integrate-with-mongo-db.md)，將連線與 Glue Data Catalog AWS 搭配使用。

## 使用 Glue 連線從 MongoDB AWS 讀取
<a name="aws-glue-programming-etl-connect-mongodb-read"></a>

**先決條件：**
+ 您想要讀取的 MongoDB 集合。您將需要集合的識別資訊。

  MongoDB 集合由資料庫名稱與集合名稱 *mongodbName*、*mongodbCollection* 識別。
+ 設定為提供身分驗證資訊的 AWS Glue MongoDB 連線。完成上一個程序*設定連至 MongoDB 的連線*的步驟，以設定驗證資訊。您需要 Glue AWS 連線的名稱 *connectionName*。

例如：

```
mongodb_read = glueContext.create_dynamic_frame.from_options(
    connection_type="mongodb",
    connection_options={
        "connectionName": "connectionName",
        "database": "mongodbName",
        "collection": "mongodbCollection",
        "partitioner": "com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner",
        "partitionerOptions.partitionSizeMB": "10",
        "partitionerOptions.partitionKey": "_id",
        "disableUpdateUri": "false",
    }
)
```

## 寫入 MongoDB 資料表
<a name="aws-glue-programming-etl-connect-mongodb-write"></a>

此範例會從現有的 DynamicFrame *dynamicFrame* 將資訊寫入 MongoDB。

**先決條件：**
+ 您想要寫入的 MongoDB 集合。您將需要集合的識別資訊。

  MongoDB 集合由資料庫名稱與集合名稱 *mongodbName*、*mongodbCollection* 識別。
+ 設定為提供身分驗證資訊的 AWS Glue MongoDB 連線。完成上一個程序*設定連至 MongoDB 的連線*的步驟，以設定驗證資訊。您需要 Glue AWS 連線的名稱 *connectionName*。

例如：

```
glueContext.write_dynamic_frame.from_options(
    frame=dynamicFrame,
    connection_type="mongodb",
    connection_options={
        "connectionName": "connectionName",
        "database": "mongodbName",
        "collection": "mongodbCollection",
        "disableUpdateUri": "false",
        "retryWrites": "false", 
    },
)
```

## 讀取和寫入 MongoDB 資料表
<a name="aws-glue-programming-etl-connect-mongodb-read-write"></a>

此範例會從現有的 DynamicFrame *dynamicFrame* 將資訊寫入 MongoDB。

**先決條件：**
+ 您想要讀取的 MongoDB 集合。您將需要集合的識別資訊。

  您想要寫入的 MongoDB 集合。您將需要集合的識別資訊。

  MongoDB 集合由資料庫名稱與集合名稱 *mongodbName*、*mongodbCollection* 識別。
+ MongoDB 驗證資訊 *mongodbUser* 和 *mongodbPassword*。

例如：

------
#### [ Python ]

```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext, SparkConf
from awsglue.context import GlueContext
from awsglue.job import Job
import time

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

job = Job(glueContext)
job.init(args['JOB_NAME'], args)

output_path = "s3://some_bucket/output/" + str(time.time()) + "/"
mongo_uri = "mongodb://<mongo-instanced-ip-address>:27017"
mongo_ssl_uri = "mongodb://<mongo-instanced-ip-address>:27017"
write_uri = "mongodb://<mongo-instanced-ip-address>:27017"

read_mongo_options = {
    "uri": mongo_uri,
    "database": "mongodbName",
    "collection": "mongodbCollection",
    "username": "mongodbUsername",
    "password": "mongodbPassword",
    "partitioner": "MongoSamplePartitioner",
    "partitionerOptions.partitionSizeMB": "10",
    "partitionerOptions.partitionKey": "_id"}

ssl_mongo_options = {
    "uri": mongo_ssl_uri,
    "database": "mongodbName",
    "collection": "mongodbCollection",
    "ssl": "true",
    "ssl.domain_match": "false"
}

write_mongo_options = {
    "uri": write_uri,
    "database": "mongodbName",
    "collection": "mongodbCollection",
    "username": "mongodbUsername",
    "password": "mongodbPassword",
}

# Get DynamicFrame from MongoDB
dynamic_frame = glueContext.create_dynamic_frame.from_options(connection_type="mongodb",
                                                              connection_options=read_mongo_options)

# Write DynamicFrame to MongoDB
glueContext.write_dynamic_frame.from_options(dynamicFrame, connection_type="mongodb", connection_options=write_mongo_options)

job.commit()
```

------
#### [ Scala ]

```
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.DynamicFrame
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._

object GlueApp {
  val DEFAULT_URI: String = "mongodb://<mongo-instanced-ip-address>:27017"
  val WRITE_URI: String = "mongodb://<mongo-instanced-ip-address>:27017"
  lazy val defaultJsonOption = jsonOptions(DEFAULT_URI)
  lazy val writeJsonOption = jsonOptions(WRITE_URI)
  def main(sysArgs: Array[String]): Unit = {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    // Get DynamicFrame from MongoDB
    val dynamicFrame: DynamicFrame = glueContext.getSource("mongodb", defaultJsonOption).getDynamicFrame()

    // Write DynamicFrame to MongoDB
    glueContext.getSink("mongodb", writeJsonOption).writeDynamicFrame(dynamicFrame)

    Job.commit()
  }

  private def jsonOptions(uri: String): JsonOptions = {
    new JsonOptions(
      s"""{"uri": "${uri}",
         |"database":"mongodbName",
         |"collection":"mongodbCollection",
         |"username": "mongodbUsername",
         |"password": "mongodbPassword",
         |"ssl":"true",
         |"ssl.domain_match":"false",
         |"partitioner": "MongoSamplePartitioner",
         |"partitionerOptions.partitionSizeMB": "10",
         |"partitionerOptions.partitionKey": "_id"}""".stripMargin)
  }
}
```

------

## MongoDB 連線選項參考
<a name="aws-glue-programming-etl-connect-mongodb"></a>

指定與 MongoDB 的連線。來源連線和接收器連線的連線選項不同。

以下連線屬性可在來源連線和接收器連線之間共用：
+ `connectionName`：用於讀取/寫入。設定為向您的連線方法提供身分驗證和聯網資訊的 AWS Glue MongoDB 連線名稱。如上一節所述設定 AWS Glue 連線時，[設定 MongoDB 連線](#aws-glue-programming-etl-connect-mongodb-configure)提供 `connectionName`將取代提供 `"uri"`、 `"username"`和 `"password"`連線選項的需求。
+ `"uri"`：(必要) 讀取的 MongoDB 主機，格式為 `mongodb://<host>:<port>`。用於 AWS Glue 4.0 之前的 AWS Glue 版本。
+ `"connection.uri"`：(必要) 讀取的 MongoDB 主機，格式為 `mongodb://<host>:<port>`。在 AWS Glue 4.0 和更新版本中使用。
+ `"username"`：(必要) MongoDB 使用者名稱。
+ `"password"`：(必要) MongoDB 密碼。
+ `"database"`：(必要) 讀取的 MongoDB 資料庫。在任務指令碼中呼叫 `glue_context.create_dynamic_frame_from_catalog` 時，也可在 `additional_options` 中傳遞此選項。
+ `"collection"`：(必要) 讀取的 MongoDB 集合。在任務指令碼中呼叫 `glue_context.create_dynamic_frame_from_catalog` 時，也可在 `additional_options` 中傳遞此選項。

### "connectionType": "mongodb" as Source
<a name="etl-connect-mongodb-as-source"></a>

使用下列有 `"connectionType": "mongodb"` 的連線選項作為來源：
+ `"ssl"`：(選用) 如果 `true`，則啟用 SSL 連線。預設值為 `false`。
+ `"ssl.domain_match"`：(選用) 如果 `true` 和 `ssl` 為 `true`，則執行網域符合檢查。預設值為 `true`。
+ `"batchSize"`：(選用)：每個批次傳回的文件數目，於內部批次的游標內使用。
+ `"partitioner"`：(選用)：從 MongoDB 讀取輸入資料的分割區類別名稱。連接器提供下列分割區：
  + `MongoDefaultPartitioner` （預設） (Glue AWS 4.0 不支援）
  + `MongoSamplePartitioner` （需要 MongoDB 3.2 或更新版本） (Glue AWS 4.0 不支援）
  + `MongoShardedPartitioner` (Glue 4.0 AWS 不支援）
  + `MongoSplitVectorPartitioner` (Glue 4.0 AWS 不支援）
  + `MongoPaginateByCountPartitioner` (Glue 4.0 AWS 不支援）
  + `MongoPaginateBySizePartitioner` (Glue 4.0 AWS 不支援）
  + `com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner`
  + `com.mongodb.spark.sql.connector.read.partitioner.ShardedPartitioner`
  + `com.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner`
+ `"partitionerOptions"` (選用)：指定分割區的選項。每個分割區都支援下列選項：
  + `MongoSamplePartitioner`: `partitionKey`, `partitionSizeMB`, `samplesPerPartition`
  + `MongoShardedPartitioner`: `shardkey`
  + `MongoSplitVectorPartitioner`: `partitionKey`, `partitionSizeMB`
  + `MongoPaginateByCountPartitioner`: `partitionKey`, `numberOfPartitions`
  + `MongoPaginateBySizePartitioner`: `partitionKey`, `partitionSizeMB`

  如需有關這些選項的詳細資訊，請參閱 MongoDB 文件中的[分割區組態](https://docs.mongodb.com/spark-connector/master/configuration/#partitioner-conf)。

### "connectionType": "mongodb" as Sink
<a name="etl-connect-mongodb-as-sink"></a>

使用下列有 `"connectionType": "mongodb"` 的連線選項作為接收器：
+ `"ssl"`：(選用) 如果 `true`，則啟用 SSL 連線。預設值為 `false`。
+ `"ssl.domain_match"`：(選用) 如果 `true` 和 `ssl` 為 `true`，則執行網域符合檢查。預設值為 `true`。
+ `"extendedBsonTypes"`：(選用) 如果 `true`，則在寫入資料至 MongoDB 時允許延伸的 BSON 類型。預設值為 `true`。
+ `"replaceDocument"`：(選用) 如果 `true`，則在儲存包含 `_id` 欄位的資料集時取代整個文件。若為 `false`，則文件中僅有與資料集中欄位相符的欄位會更新。預設值為 `true`。
+ `"maxBatchSize"`：(選用)：儲存資料時大量操作的批次大小上限。預設為 512。
+ `"retryWrites"`：（選用）：如果 AWS Glue 遇到網路錯誤，則一次自動重試特定寫入操作。