

# MongoDB 接続
<a name="aws-glue-programming-etl-connect-mongodb-home"></a>

AWS Glue for Spark を使用して、AWS Glue 4.0 以降のバージョンの MongoDB および MongoDB Atlas のテーブルからの読み取りとテーブルへの書き込みを行うことができます。AWS Glue 接続を介して AWS Secrets Manager で保存されているユーザー名およびパスワードの認証情報を使用して MongoDB に接続できます。

MongoDB の詳細については、[MongoDB のドキュメント](https://www.mongodb.com/docs/)を参照してください。

## MongoDB 接続の設定
<a name="aws-glue-programming-etl-connect-mongodb-configure"></a>

AWS Glue から MongoDB に接続するには、MongoDB 認証情報、*mongodbUser* および *mongodbPass* が必要です。

AWS Glue から MongoDB に接続するには、いくつかの前提条件を満たす必要がある場合があります。
+ MongoDB インスタンスが Amazon VPC 内にある場合は、トラフィックがパブリックインターネットを経由することなく、AWS Glue ジョブが MongoDB インスタンスと通信できるように Amazon VPC を設定します。

  Amazon VPC で、AWS Glue がジョブの実行中に使用する **[VPC]**、**[サブネット]**、および **[セキュリティグループ]** を特定または作成します。さらに、MongoDB インスタンスとこの場所の間のネットワークトラフィックを許可するように Amazon VPC が設定されているようにする必要があります。ネットワークレイアウトに基づいて、セキュリティグループルール、ネットワーク ACL、NAT ゲートウェイ、およびピアリング接続の変更が必要になる場合があります。

その後、MongoDB で使用できるように AWS Glue を設定する作業に進むことができます。

**MongoDB に対する接続を設定するには:**

1. 必要に応じて、AWS Secrets Manager で、MongoDB 認証情報を使用してシークレットを作成します。Secrets Manager でシークレットを作成するには、AWS Secrets Manager ドキュメントの [AWS Secrets Manager シークレットを作成する](https://docs.aws.amazon.com//secretsmanager/latest/userguide/create_secret.html) にあるチュートリアルに従ってください。シークレットを作成したら、次のステップのためにシークレット名 *secretName* を保存しておきます。
   + **[key/value ペア]** を選択する際に、*mongodbUser* という値を持つキー `username` のペアを作成します。

     **[key/value ペア]** を選択する際に、*mongodbPass* という値を持つキー `password` のペアを作成します。

1. AWS Glue コンソールで、「[AWS Glue 接続の追加](console-connections.md)」にあるステップに従って接続を作成します。接続を作成したら、将来的に AWS Glue で使用するために、接続名 *connectionName* を維持します。
   + **[接続タイプ]** を選択する際には、**[MongoDB]** または **[MongoDB Atlas]** を選択します。
   + **[MongoDB URL]** または **[MongoDB Atlas URL]** を選択する場合は、MongoDB インスタンスのホスト名を入力します。

     MongoDB URL は、`mongodb://mongoHost:mongoPort/mongoDBname` の形式で指定されます。

     MongoDB Atlas URL は、`mongodb+srv://mongoHost/mongoDBname` の形式で指定されます。
   + Secrets Manager シークレットを作成することを選択した場合は、AWS Secrets Manager の **[認証情報タイプ]** を選択します。

     その後、**[AWS シークレット]** で *secretName* を入力します。
   + **[ユーザー名とパスワード]** を入力することを選択した場合は、*mongodbUser* および *mongodbPass* を入力します。

1. 次の状況では、追加の設定が必要になる場合があります。
   + 

     Amazon VPC の AWS でホストされている MongoDB インスタンスの場合
     + MongoDB セキュリティ認証情報を定義する AWS Glue 接続に、Amazon VPC 接続に関する情報を提供する必要があります。接続を作成または更新する際に、**[ネットワークオプション]** で **[VPC]**、**[サブネット]**、および **[セキュリティグループ]** を設定します。

AWS Glue MongoDB 接続を作成した後、接続メソッドを呼び出す前に次のアクションを実行する必要があります。
+ Secrets Manager シークレットを作成することを選択した場合は、AWS Glue ジョブに関連付けられた IAM ロールに *secretName* を読み取るための許可を付与します。
+ AWS Glue ジョブ設定で、**追加のネットワーク接続**として *connectionName* を指定します。

AWS Glue for Spark で AWS Glue MongoDB 接続を使用するには、接続メソッド呼び出しで `connectionName` オプションを指定します。あるいは、[ETL ジョブでの MongoDB 接続の操作](integrate-with-mongo-db.md) のステップに従って、AWS Glue データカタログと組み合わせて接続を使用することもできます。

## AWS Glue 接続を使用した MongoDB からの読み取り
<a name="aws-glue-programming-etl-connect-mongodb-read"></a>

**前提条件:** 
+ 読み取り元とする MongoDB コレクション。コレクションの識別情報が必要になります。

  MongoDB コレクションは、データベース名とコレクション名である *mongodbName* および *mongodbCollection* によって識別されます。
+ 認証情報を提供するように設定された AWS Glue MongoDB 接続。認証情報を設定するには、前の手順「*MongoDB に対する接続を設定するには*」のステップを実行します。AWS Glue 接続、*connectionName* の名前が必要になります。

例えば、次のようになります。

```
mongodb_read = glueContext.create_dynamic_frame.from_options(
    connection_type="mongodb",
    connection_options={
        "connectionName": "connectionName",
        "database": "mongodbName",
        "collection": "mongodbCollection",
        "partitioner": "com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner",
        "partitionerOptions.partitionSizeMB": "10",
        "partitionerOptions.partitionKey": "_id",
        "disableUpdateUri": "false",
    }
)
```

## MongoDB テーブルへの書き込み
<a name="aws-glue-programming-etl-connect-mongodb-write"></a>

この例では、既存の DynamicFrame である *dynamicFrame* から MongoDB に情報を書き込みます。

**前提条件:** 
+ 書き込み先とする MongoDB コレクション。コレクションの識別情報が必要になります。

  MongoDB コレクションは、データベース名とコレクション名である *mongodbName* および *mongodbCollection* によって識別されます。
+ 認証情報を提供するように設定された AWS Glue MongoDB 接続。認証情報を設定するには、前の手順「*MongoDB に対する接続を設定するには*」のステップを実行します。AWS Glue 接続、*connectionName* の名前が必要になります。

例えば、次のようになります。

```
glueContext.write_dynamic_frame.from_options(
    frame=dynamicFrame,
    connection_type="mongodb",
    connection_options={
        "connectionName": "connectionName",
        "database": "mongodbName",
        "collection": "mongodbCollection",
        "disableUpdateUri": "false",
        "retryWrites": "false", 
    },
)
```

## MongoDB テーブルに対する読み書き
<a name="aws-glue-programming-etl-connect-mongodb-read-write"></a>

この例では、既存の DynamicFrame である *dynamicFrame* から MongoDB に情報を書き込みます。

**前提条件:** 
+ 読み取り元とする MongoDB コレクション。コレクションの識別情報が必要になります。

  書き込み先とする MongoDB コレクション。コレクションの識別情報が必要になります。

  MongoDB コレクションは、データベース名とコレクション名である *mongodbName* および *mongodbCollection* によって識別されます。
+ MongoDB 認証情報である *mongodbUser* および *mongodbPassword*。

例えば、次のようになります。

------
#### [ Python ]

```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext, SparkConf
from awsglue.context import GlueContext
from awsglue.job import Job
import time

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

job = Job(glueContext)
job.init(args['JOB_NAME'], args)

output_path = "s3://some_bucket/output/" + str(time.time()) + "/"
mongo_uri = "mongodb://<mongo-instanced-ip-address>:27017"
mongo_ssl_uri = "mongodb://<mongo-instanced-ip-address>:27017"
write_uri = "mongodb://<mongo-instanced-ip-address>:27017"

read_mongo_options = {
    "uri": mongo_uri,
    "database": "mongodbName",
    "collection": "mongodbCollection",
    "username": "mongodbUsername",
    "password": "mongodbPassword",
    "partitioner": "MongoSamplePartitioner",
    "partitionerOptions.partitionSizeMB": "10",
    "partitionerOptions.partitionKey": "_id"}

ssl_mongo_options = {
    "uri": mongo_ssl_uri,
    "database": "mongodbName",
    "collection": "mongodbCollection",
    "ssl": "true",
    "ssl.domain_match": "false"
}

write_mongo_options = {
    "uri": write_uri,
    "database": "mongodbName",
    "collection": "mongodbCollection",
    "username": "mongodbUsername",
    "password": "mongodbPassword",
}

# Get DynamicFrame from MongoDB
dynamic_frame = glueContext.create_dynamic_frame.from_options(connection_type="mongodb",
                                                              connection_options=read_mongo_options)

# Write DynamicFrame to MongoDB
glueContext.write_dynamic_frame.from_options(dynamicFrame, connection_type="mongodb", connection_options=write_mongo_options)

job.commit()
```

------
#### [ Scala ]

```
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.DynamicFrame
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._

object GlueApp {
  val DEFAULT_URI: String = "mongodb://<mongo-instanced-ip-address>:27017"
  val WRITE_URI: String = "mongodb://<mongo-instanced-ip-address>:27017"
  lazy val defaultJsonOption = jsonOptions(DEFAULT_URI)
  lazy val writeJsonOption = jsonOptions(WRITE_URI)
  def main(sysArgs: Array[String]): Unit = {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    // Get DynamicFrame from MongoDB
    val dynamicFrame: DynamicFrame = glueContext.getSource("mongodb", defaultJsonOption).getDynamicFrame()

    // Write DynamicFrame to MongoDB
    glueContext.getSink("mongodb", writeJsonOption).writeDynamicFrame(dynamicFrame)

    Job.commit()
  }

  private def jsonOptions(uri: String): JsonOptions = {
    new JsonOptions(
      s"""{"uri": "${uri}",
         |"database":"mongodbName",
         |"collection":"mongodbCollection",
         |"username": "mongodbUsername",
         |"password": "mongodbPassword",
         |"ssl":"true",
         |"ssl.domain_match":"false",
         |"partitioner": "MongoSamplePartitioner",
         |"partitionerOptions.partitionSizeMB": "10",
         |"partitionerOptions.partitionKey": "_id"}""".stripMargin)
  }
}
```

------

## MongoDB 接続オプションのリファレンス
<a name="aws-glue-programming-etl-connect-mongodb"></a>

MongoDB への接続を指定します。接続オプションは、ソース接続とシンク接続とで異なります。

これらの接続プロパティは、ソース接続とシンク接続の間で共有されます。
+ `connectionName` — 読み込み/書き込みに使用されます。認証およびネットワークの情報を接続方法に提供するように設定された AWS Glue MongoDB 接続の名前。前のセクション「[MongoDB 接続の設定](#aws-glue-programming-etl-connect-mongodb-configure)」で説明したように AWS Glue 接続が設定されている場合、`connectionName` を入力することで、`"uri"`、`"username"`、および `"password"` 接続オプションを入力する必要がなくなります。
+ `"uri"`: (必須) 読み込み元の MongoDB ホスト (形式: `mongodb://<host>:<port>`)。AWS Glue 4.0 より前のバージョンの AWS Glue で使用されます。
+ `"connection.uri"`: (必須) 読み込み元の MongoDB ホスト (形式: `mongodb://<host>:<port>`)。AWS Glue 4.0 以降のバージョンで使用されます。
+ `"username"`: (必須) MongoDB のユーザー名。
+ `"password"`: (必須) MongoDB のパスワード。
+ `"database"`: (必須) 読み込み元の MongoDB データベース。このオプションは、ジョブスクリプトで `glue_context.create_dynamic_frame_from_catalog` を呼び出す際に、`additional_options` を介して渡すことも可能です。
+ `"collection"`: (必須) 読み込み元の MongoDB コレクション。このオプションは、ジョブスクリプトで `glue_context.create_dynamic_frame_from_catalog` を呼び出す際に、`additional_options` を介して渡すことも可能です。

### "connectionType": "mongodb" ソースとする
<a name="etl-connect-mongodb-as-source"></a>

`"connectionType": "mongodb"` をソースとして、次の接続オプションを使用します。
+ `"ssl"`: (オプション) `true` の場合、SSL 接続を開始します。デフォルトは `false` です。
+ `"ssl.domain_match"`: (任意) `true` と `ssl` が `true` の場合 、ドメイン一致チェックが実行されます。デフォルトは `true` です。
+ `"batchSize"`: (オプション): 内部バッチのカーソル内で使用される、バッチごとに返されるドキュメントの数。
+ `"partitioner"`: (オプション): MongoDB から入力データを読み取るためのパーティショナーのクラス名。コネクタには、次のパーティショナーがあります。
  + `MongoDefaultPartitioner` (デフォルト) (AWS Glue 4.0 ではサポートされていません)
  + `MongoSamplePartitioner` (MongoDB 3.2 以降が必要) (AWS Glue 4.0 ではサポートされていません)
  + `MongoShardedPartitioner` (AWS Glue 4.0 ではサポートされていません)
  + `MongoSplitVectorPartitioner` (AWS Glue 4.0 ではサポートされていません)
  + `MongoPaginateByCountPartitioner` (AWS Glue 4.0 ではサポートされていません)
  + `MongoPaginateBySizePartitioner` (AWS Glue 4.0 ではサポートされていません)
  + `com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner`
  + `com.mongodb.spark.sql.connector.read.partitioner.ShardedPartitioner`
  + `com.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner`
+ `"partitionerOptions"` (オプション): 指定されたパーティショナーのオプション。各パーティショナーでは、次のオプションがサポートされています。
  + `MongoSamplePartitioner`: `partitionKey`, `partitionSizeMB`, `samplesPerPartition`
  + `MongoShardedPartitioner`: `shardkey`
  + `MongoSplitVectorPartitioner`: `partitionKey`, `partitionSizeMB`
  + `MongoPaginateByCountPartitioner`: `partitionKey`, `numberOfPartitions`
  + `MongoPaginateBySizePartitioner`: `partitionKey`, `partitionSizeMB`

  これらのオプションの詳細については、MongoDB のドキュメントの「[Partitioner Configuration](https://docs.mongodb.com/spark-connector/master/configuration/#partitioner-conf)」を参照してください。

### "connectionType": "mongodb" as Sink
<a name="etl-connect-mongodb-as-sink"></a>

`"connectionType": "mongodb"` をシンクとして、次の接続オプションを使用します。
+ `"ssl"`: (オプション) `true` の場合、SSL 接続を開始します。デフォルトは `false` です。
+ `"ssl.domain_match"`: (任意) `true` と `ssl` が `true` の場合 、ドメイン一致チェックが実行されます。デフォルトは `true` です。
+ `"extendedBsonTypes"`: (オプション) `true` が設定されている場合、MongoDB にデータを書き込む際に拡張 BSON 型を使用することを許可します。デフォルトは `true` です。
+ `"replaceDocument"`: (オプション) `true` の場合、`_id` フィールドを含むデータセットを保存するときに、ドキュメント全体を置き換えます。`false` の場合、データセットのフィールドと一致するドキュメントのフィールドのみが更新されます。デフォルトは `true` です。
+ `"maxBatchSize"`: (オプション): データを保存するときの一括オペレーションの最大バッチサイズ。デフォルトは 512 です。
+ `"retryWrites"`: (オプション): AWS Glue でネットワークエラーが発生した場合、特定の書き込みオペレーションを 1 回自動的に再試行します。