

# MongoDB 연결
<a name="aws-glue-programming-etl-connect-mongodb-home"></a>

AWS Glue for Spark를 사용하여 AWS Glue 4.0 이상 버전에서 MongoDB와 MongoDB Atlas의 테이블에서 읽고 쓸 수 있습니다. AWS Glue 연결을 통해 AWS Secrets Manager에 저장된 사용자 이름 및 암호 보안 인증 정보를 사용하여 MongoDB에 연결할 수 있습니다.

MongoDB 대한 자세한 내용은 [MongoDB 설명서](https://www.mongodb.com/docs/)를 참조하십시오.

## MongoDB 연결 구성
<a name="aws-glue-programming-etl-connect-mongodb-configure"></a>

AWS Glue에서 MongoDB에 연결하려면 MongoDB 보안 인증 정보와 *mongodbUser*, *mongodbPass*가 필요합니다.

AWS Glue에서 MongoDB에 연결하려면 몇 가지 전제 조건이 필요할 수 있습니다.
+ MongoDB 인스턴스가 Amazon VPC에 있는 경우, 퍼블릭 인터넷을 통과하는 트래픽 없이 AWS Glue 작업이 MongoDB 인스턴스와 통신할 수 있도록 Amazon VPC를 구성하십시오.

  Amazon VPC에서 AWS Glue가 작업을 실행하는 동안 사용할 **VPC**, **서브넷** 및 **보안 그룹**을 식별하거나 생성합니다. 또한 MongoDB 인스턴스와 이 위치 간의 네트워크 트래픽을 허용하도록 Amazon VPC를 구성해야 합니다. 네트워크 레이아웃에 따라 보안 그룹 규칙, 네트워크 ACL, NAT 게이트웨이 및 피어링 연결을 변경해야 할 수도 있습니다.

그런 다음 MongoDB와 함께 사용할 수 있도록 AWS Glue를 구성할 수 있습니다.

**MongoDB 연결 구성 방법:**

1. 대안으로 AWS Secrets Manager에서 MongoDB 보안 인증을 사용하여 보안 암호를 생성할 수 있습니다. Secrets Manager에서 보안 암호를 생성하려면 AWS Secrets Manager 설명서의 [Create an AWS Secrets Manager secret](https://docs.aws.amazon.com//secretsmanager/latest/userguide/create_secret.html)에서 제공하는 자습서를 따릅니다. 보안 암호를 생성한 후에는 다음 단계를 위해 보안 암호 이름, *secretName*을 유지합니다.
   + **키/값 페어**를 선택하면 값 *mongodbUser*이 포함된 키 `username`에 대한 페어를 생성합니다.

     **키/값 페어**를 선택하면 값 *mongodbPass*가 포함된 키 `password`에 대한 페어를 생성합니다.

1. AWS Glue 콘솔에서 [AWS Glue 연결 추가](console-connections.md)의 단계에 따라 연결을 생성합니다. 연결을 생성한 후에는 AWS Glue에서 이용하기 위해 연결 이름 *connectionName*을 유지합니다.
   + **연결 유형**을 선택할 때에는 **MongoDB** 또는 **MongoDB Atlas**를 선택합니다.
   + **MongoDB URL** 또는 **MongoDB Atlas URL**을 선택할 때에는 MongoDB 인스턴스의 호스트 이름을 제공하십시오.

     MongoDB URL은 `mongodb://mongoHost:mongoPort/mongoDBname` 형식으로 제공됩니다.

     MongoDB Atlas URL은 `mongodb+srv://mongoHost/mongoDBname` 형식으로 제공됩니다.
   + Secrets Manager 암호를 생성하기로 선택한 경우 AWS Secrets Manager **보안 인증 정보 유형**을 선택합니다.

     그런 다음 **AWS 암호**에 *secretName*을 입력합니다.
   + **사용자 이름과 비밀번호**를 제공하기로 선택한 경우 *mongodbUser*와 *mongodbPass*를 제공하십시오.

1. 다음과 같은 상황에서는 추가 구성이 필요할 수도 있습니다.
   + 

     Amazon VPC에서 AWS에 호스팅된 MongoDB 인스턴스의 경우
     + MongoDB 보안 보안 인증 정보를 정의하는 AWS Glue 연결에 Amazon VPC 연결 정보를 제공해야 합니다. 연결을 만들거나 업데이트할 때 **네트워크 옵션**에서 **VPC**, **서브넷** 및 **보안 그룹**을 설정합니다.

AWS Glue MongoDB 연결을 생성한 후에는 연결 방법을 호출하기 전에 다음 조치를 수행해야 합니다.
+ Secrets Manager 암호을 생성하기로 선택한 경우, AWS Glue 작업과 연결된 IAM 역할에 *secretName*을 읽을 수 있는 권한을 부여하십시오.
+ AWS Glue 작업 구성에서 **추가 네트워크 연결**로 *connectionName*을 제공합니다.

AWS Glue for Spark에서 AWS Glue MongoDB 연결을 사용하려면 연결 방법 호출에서 `connectionName` 옵션을 제공하십시오. 또는, [ETL 작업의 MondoDB 연결 작업](integrate-with-mongo-db.md)의 단계에 따라 AWS Glue 데이터 카탈로그와 함께 연결을 사용할 수도 있습니다.

## Glue AWS 연결을 사용하여 MongoDB에서 읽기
<a name="aws-glue-programming-etl-connect-mongodb-read"></a>

**사전 조건**: 
+ 읽으려는 MongoDB 컬렉션. 컬렉션에 대한 식별 정보가 필요합니다.

  MongoDB 컬렉션은 데이터베이스 이름 *mongodbName* 및 컬렉션 이름 *mongodbCollection*으로 식별됩니다.
+ 인증 정보를 제공하도록 구성된 AWS Glue MongoDB 연결입니다. 인증 정보를 구성하려면 앞 절차인 *MongoDB에 대한 연결을 구성하는 방법*의 단계를 완료하십시오. AWS Glue 연결의 이름인 *connectionName*이 필요합니다.

예제: 

```
mongodb_read = glueContext.create_dynamic_frame.from_options(
    connection_type="mongodb",
    connection_options={
        "connectionName": "connectionName",
        "database": "mongodbName",
        "collection": "mongodbCollection",
        "partitioner": "com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner",
        "partitionerOptions.partitionSizeMB": "10",
        "partitionerOptions.partitionKey": "_id",
        "disableUpdateUri": "false",
    }
)
```

## MongoDB 테이블에 쓰기
<a name="aws-glue-programming-etl-connect-mongodb-write"></a>

이 예제에서는 기존 DynamicFrame, *dynamicFrame*의 정보를 MongoDB에 씁니다.

**사전 조건**: 
+ 쓰려는 MongoDB 컬렉션. 컬렉션에 대한 식별 정보가 필요합니다.

  MongoDB 컬렉션은 데이터베이스 이름 *mongodbName* 및 컬렉션 이름 *mongodbCollection*으로 식별됩니다.
+ 인증 정보를 제공하도록 구성된 AWS Glue MongoDB 연결입니다. 인증 정보를 구성하려면 앞 절차인 *MongoDB에 대한 연결을 구성하는 방법*의 단계를 완료하십시오. AWS Glue 연결의 이름인 *connectionName*이 필요합니다.

예제: 

```
glueContext.write_dynamic_frame.from_options(
    frame=dynamicFrame,
    connection_type="mongodb",
    connection_options={
        "connectionName": "connectionName",
        "database": "mongodbName",
        "collection": "mongodbCollection",
        "disableUpdateUri": "false",
        "retryWrites": "false", 
    },
)
```

## MongoDB에서 테이블에 쓰고 쓰는 중
<a name="aws-glue-programming-etl-connect-mongodb-read-write"></a>

이 예제에서는 기존 DynamicFrame, *dynamicFrame*의 정보를 MongoDB에 씁니다.

**사전 조건**: 
+ 읽으려는 MongoDB 컬렉션. 컬렉션에 대한 식별 정보가 필요합니다.

  쓰려는 MongoDB 컬렉션. 컬렉션에 대한 식별 정보가 필요합니다.

  MongoDB 컬렉션은 데이터베이스 이름 *mongodbName* 및 컬렉션 이름 *mongodbCollection*으로 식별됩니다.
+ MongoDB 인증 정보, *mongodbUser*, *mongodbPassword*.

예제: 

------
#### [ Python ]

```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext, SparkConf
from awsglue.context import GlueContext
from awsglue.job import Job
import time

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

job = Job(glueContext)
job.init(args['JOB_NAME'], args)

output_path = "s3://some_bucket/output/" + str(time.time()) + "/"
mongo_uri = "mongodb://<mongo-instanced-ip-address>:27017"
mongo_ssl_uri = "mongodb://<mongo-instanced-ip-address>:27017"
write_uri = "mongodb://<mongo-instanced-ip-address>:27017"

read_mongo_options = {
    "uri": mongo_uri,
    "database": "mongodbName",
    "collection": "mongodbCollection",
    "username": "mongodbUsername",
    "password": "mongodbPassword",
    "partitioner": "MongoSamplePartitioner",
    "partitionerOptions.partitionSizeMB": "10",
    "partitionerOptions.partitionKey": "_id"}

ssl_mongo_options = {
    "uri": mongo_ssl_uri,
    "database": "mongodbName",
    "collection": "mongodbCollection",
    "ssl": "true",
    "ssl.domain_match": "false"
}

write_mongo_options = {
    "uri": write_uri,
    "database": "mongodbName",
    "collection": "mongodbCollection",
    "username": "mongodbUsername",
    "password": "mongodbPassword",
}

# Get DynamicFrame from MongoDB
dynamic_frame = glueContext.create_dynamic_frame.from_options(connection_type="mongodb",
                                                              connection_options=read_mongo_options)

# Write DynamicFrame to MongoDB
glueContext.write_dynamic_frame.from_options(dynamicFrame, connection_type="mongodb", connection_options=write_mongo_options)

job.commit()
```

------
#### [ Scala ]

```
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.DynamicFrame
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._

object GlueApp {
  val DEFAULT_URI: String = "mongodb://<mongo-instanced-ip-address>:27017"
  val WRITE_URI: String = "mongodb://<mongo-instanced-ip-address>:27017"
  lazy val defaultJsonOption = jsonOptions(DEFAULT_URI)
  lazy val writeJsonOption = jsonOptions(WRITE_URI)
  def main(sysArgs: Array[String]): Unit = {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    // Get DynamicFrame from MongoDB
    val dynamicFrame: DynamicFrame = glueContext.getSource("mongodb", defaultJsonOption).getDynamicFrame()

    // Write DynamicFrame to MongoDB
    glueContext.getSink("mongodb", writeJsonOption).writeDynamicFrame(dynamicFrame)

    Job.commit()
  }

  private def jsonOptions(uri: String): JsonOptions = {
    new JsonOptions(
      s"""{"uri": "${uri}",
         |"database":"mongodbName",
         |"collection":"mongodbCollection",
         |"username": "mongodbUsername",
         |"password": "mongodbPassword",
         |"ssl":"true",
         |"ssl.domain_match":"false",
         |"partitioner": "MongoSamplePartitioner",
         |"partitionerOptions.partitionSizeMB": "10",
         |"partitionerOptions.partitionKey": "_id"}""".stripMargin)
  }
}
```

------

## MongoDB 연결 옵션 참조
<a name="aws-glue-programming-etl-connect-mongodb"></a>

MongoDB에 대한 연결을 지정합니다. 연결 옵션은 소스 연결 및 싱크 연결에 따라 다릅니다.

다음과 같은 연결 속성은 소스 연결과 싱크 연결 간에 공유됩니다.
+ `connectionName` - 읽기/쓰기에 사용됩니다. 연결 방법에 인증 및 네트워킹 정보를 제공하도록 구성된 AWS Glue MongoDB 연결의 이름입니다. [MongoDB 연결 구성](#aws-glue-programming-etl-connect-mongodb-configure)에서 설명한 대로 AWS Glue 연결을 구성하면 `connectionName`을 제공했을 때`"uri"`, `"username"` 및 `"password"` 연결 옵션을 제공할 필요가 없어집니다.
+ `"uri"`: (필수 사항) 읽을 소스 MongoDB 호스트로, `mongodb://<host>:<port>` 형식입니다. AWS Glue 4.0 이전의 AWS Glue 버전에서 사용됩니다.
+ `"connection.uri"`: (필수 사항) 읽을 소스 MongoDB 호스트로, `mongodb://<host>:<port>` 형식입니다. AWS Glue 4.0 이상 버전에서 사용됩니다.
+ `"username"`: (필수 사항) MongoDB 사용자 이름입니다.
+ `"password"`: (필수 사항) MongoDB 암호입니다.
+ `"database"`: (필수 사항) 읽을 MongoDB 데이터베이스입니다. 이 옵션은 작업 스크립트에서 `glue_context.create_dynamic_frame_from_catalog`를 호출할 때 `additional_options`로 전달할 수도 있습니다.
+ `"collection"`: (필수 사항) 읽을 MongoDB 컬렉션입니다. 이 옵션은 작업 스크립트에서 `glue_context.create_dynamic_frame_from_catalog`를 호출할 때 `additional_options`로 전달할 수도 있습니다.

### 소스로서의 "connectionType": "mongodb"
<a name="etl-connect-mongodb-as-source"></a>

소스로서의 `"connectionType": "mongodb"`에는 다음 연결 옵션을 사용합니다.
+ `"ssl"`: (선택 사항) `true`이면 SSL 연결을 시작합니다. 기본값은 `false`입니다.
+ `"ssl.domain_match"`: (선택 사항) `true` 및 `ssl`이 `true`이면 도메인 일치 검사가 수행됩니다. 기본값은 `true`입니다.
+ `"batchSize"`: (선택 사항) 내부 배치의 커서 내에서 사용되는 배치당 반환할 문서 수입니다.
+ `"partitioner"`: (선택 사항) MongoDB에서 입력 데이터를 읽는 파티셔너의 클래스 이름입니다. 커넥터는 다음 파티셔너를 제공합니다.
  + `MongoDefaultPartitioner`(기본값)(AWS Glue 4.0에서는 지원되지 않음)
  + `MongoSamplePartitioner`(MongoDB 3.2 이상 필요)(AWS Glue 4.0에서는 지원되지 않음)
  + `MongoShardedPartitioner`(AWS Glue 4.0에서는 지원되지 않음)
  + `MongoSplitVectorPartitioner`(AWS Glue 4.0에서는 지원되지 않음)
  + `MongoPaginateByCountPartitioner`(AWS Glue 4.0에서는 지원되지 않음)
  + `MongoPaginateBySizePartitioner`(AWS Glue 4.0에서는 지원되지 않음)
  + `com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner`
  + `com.mongodb.spark.sql.connector.read.partitioner.ShardedPartitioner`
  + `com.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner`
+ `"partitionerOptions"`: (선택 사항) 지정된 파티셔너에 대한 옵션입니다. 각 파티셔너에 대해 다음 옵션이 지원됩니다.
  + `MongoSamplePartitioner`: `partitionKey`, `partitionSizeMB`, `samplesPerPartition`
  + `MongoShardedPartitioner`: `shardkey`
  + `MongoSplitVectorPartitioner`: `partitionKey`, `partitionSizeMB`
  + `MongoPaginateByCountPartitioner`: `partitionKey`, `numberOfPartitions`
  + `MongoPaginateBySizePartitioner`: `partitionKey`, `partitionSizeMB`

  이러한 옵션에 대한 자세한 내용은 MongoDB 설명서의 [파티셔너 구성](https://docs.mongodb.com/spark-connector/master/configuration/#partitioner-conf)을 참조하십시오.

### 싱크로서의 "connectionType": "mongodb"
<a name="etl-connect-mongodb-as-sink"></a>

싱크로서의 `"connectionType": "mongodb"`에는 다음 연결 옵션을 사용합니다.
+ `"ssl"`: (선택 사항) `true`이면 SSL 연결을 시작합니다. 기본값은 `false`입니다.
+ `"ssl.domain_match"`: (선택 사항) `true` 및 `ssl`이 `true`이면 도메인 일치 검사가 수행됩니다. 기본값은 `true`입니다.
+ `"extendedBsonTypes"`: (선택 사항) `true`이면 데이터를 MongoDB에 쓸 때 확장 BSON 유형을 허용합니다. 기본값은 `true`입니다.
+ `"replaceDocument"`: (선택 사항) `true`이면 `_id` 필드가 포함된 데이터 세트를 저장할 때 전체 문서를 대체합니다. `false`이면 데이터 세트의 필드와 일치하는 문서의 필드만 업데이트됩니다. 기본값은 `true`입니다.
+ `"maxBatchSize"`: (선택 사항) 데이터를 저장할 때 대량 작업의 최대 배치 크기입니다. 기본값은 512입니다.
+ `"retryWrites"`: (선택 사항): AWS Glue에서 네트워크 오류가 발생하는 경우 특정 쓰기 작업을 한 번 자동으로 재시도합니다.