

# AWS Glue for Spark에서 ETL에 대한 연결 유형 및 옵션
<a name="aws-glue-programming-etl-connect"></a>

AWS Glue for Spark에서 다양한 PySpark와 Scala 메서드 및 변환은 `connectionType` 파라미터를 사용하여 연결 유형을 지정합니다. `connectionOptions` 또는 `options` 파라미터를 사용하여 연결 옵션을 지정합니다.

`connectionType` 파라미터는 다음 표에 표시된 값을 사용할 수 있습니다. 각 유형에 대한 연관된 `connectionOptions`(또는 `options`) 파라미터 값은 다음 섹션에 설명되어 있습니다. 별도의 언급이 없으면 이 파라미터는 연결이 소스 또는 싱크로 사용될 때 적용됩니다.

연결 옵션을 설정하고 사용하는 방법을 보여주는 샘플 코드는 각 연결 유형별 홈페이지를 참조하세요.


| `connectionType` | 연결 대상 | 
| --- | --- | 
| [dynamodb](aws-glue-programming-etl-connect-dynamodb-home.md) | [Amazon DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/) 데이터베이스 | 
| [kinesis](aws-glue-programming-etl-connect-kinesis-home.md) | [Amazon Kinesis Data Streams](https://docs.aws.amazon.com/streams/latest/dev/introduction.html) | 
| [s3](aws-glue-programming-etl-connect-s3-home.md) | [Amazon S3](https://docs.aws.amazon.com/AmazonS3/latest/dev/) | 
| [documentdb](aws-glue-programming-etl-connect-documentdb-home.md#aws-glue-programming-etl-connect-documentdb) | [Amazon DocumentDB(MongoDB 호환)](https://docs.aws.amazon.com/documentdb/latest/developerguide/) 데이터베이스 | 
| [OpenSearch](aws-glue-programming-etl-connect-opensearch-home.md) | [Amazon OpenSearch Service](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/). | 
| [redshift](aws-glue-programming-etl-connect-redshift-home.md) | [Amazon Redshift](https://aws.amazon.com/redshift/) 데이터베이스 | 
| [kafka](aws-glue-programming-etl-connect-kafka-home.md) |  [Kafka](https://kafka.apache.org/) 또는 [Amazon Managed Streaming for Apache Kafka](https://docs.aws.amazon.com/msk/latest/developerguide/what-is-msk.html) | 
| [azurecosmos](aws-glue-programming-etl-connect-azurecosmos-home.md) | NoSQL용 Azure Cosmos. | 
| [azuresql](aws-glue-programming-etl-connect-azuresql-home.md) | Azure SQL. | 
| [bigquery](aws-glue-programming-etl-connect-bigquery-home.md) | Google BigQuery. | 
| [mongodb](aws-glue-programming-etl-connect-mongodb-home.md) | MongoDB Atlas를 포함한 [MongoDB](https://www.mongodb.com/what-is-mongodb) 데이터베이스. | 
| [sqlserver](aws-glue-programming-etl-connect-jdbc-home.md) |  Microsoft SQL Server 데이터베이스([JDBC 연결](aws-glue-programming-etl-connect-jdbc-home.md) 참조) | 
| [mysql](aws-glue-programming-etl-connect-jdbc-home.md) | [MySQL](https://www.mysql.com/) 데이터베이스([JDBC 연결](aws-glue-programming-etl-connect-jdbc-home.md) 참조) | 
| [oracle](aws-glue-programming-etl-connect-jdbc-home.md) | [Oracle](https://www.oracle.com/database/) Database([JDBC 연결](aws-glue-programming-etl-connect-jdbc-home.md) 참조) | 
| [postgresql](aws-glue-programming-etl-connect-jdbc-home.md) |  [PostgreSQL](https://www.postgresql.org/) 데이터베이스([JDBC 연결](aws-glue-programming-etl-connect-jdbc-home.md) 참조) | 
| [saphana](aws-glue-programming-etl-connect-saphana-home.md) | SAP HANA. | 
| [snowflake](aws-glue-programming-etl-connect-snowflake-home.md) | [Snowflake](https://www.snowflake.com/) 데이터 레이크 | 
| [Teradata](aws-glue-programming-etl-connect-teradata-home.md) | Teradata Vantage. | 
| [vertica](aws-glue-programming-etl-connect-vertica-home.md) | Vertica. | 
| [custom.\*](#aws-glue-programming-etl-connect-market) | Spark, Athena 또는 JDBC 데이터 원본([사용자 정의 및 AWS Marketplace 연결 유형 값](#aws-glue-programming-etl-connect-market) 참조)  | 
| [marketplace.\*](#aws-glue-programming-etl-connect-market) | Spark, Athena 또는 JDBC 데이터 원본([사용자 정의 및 AWS Marketplace 연결 유형 값](#aws-glue-programming-etl-connect-market) 참조)  | 

## Spark용 AWS Glue 5.0의 ETL에 대한 DataFrame 옵션
<a name="aws-glue-programming-etl-connect-dataframe"></a>

DataFrame은 이름이 지정된 열로 구성되는 데이터세트이며, 테이블과 비슷한 기능적 스타일(맵/줄임/필터 등) 작업과 SQL 작업(선택, 계획, 집계)을 지원합니다.

Glue에서 지원하는 데이터 소스에 대한 DataFrame을 생성하려면 다음이 필요합니다.
+ 데이터 소스 커넥터 `ClassName`
+ 데이터 소스 연결 `Options`

마찬가지로 Glue에서 지원하는 데이터 싱크에 DataFrame을 작성하려면 동일한 작업이 필요합니다.
+ 데이터 싱크 커넥터 `ClassName`
+ 데이터 싱크 연결 `Options`

작업 북마크와 같은 AWS Glue 기능 및 `connectionName`과 같은 DynamicFrame 옵션은 DataFrame에서 지원되지 않습니다. DataFrame 및 지원되는 작업에 대한 자세한 내용은 [DataFrame](https://spark.apache.org/docs/3.5.2/api/python/reference/pyspark.sql/dataframe.html)용 Spark 설명서를 참조하세요.

### 커넥터 ClassName 지정
<a name="aws-glue-programming-etl-connect-dataframe-classname"></a>

데이터 소스/싱크의 `ClassName`을 지정하려면 `.format` 옵션을 사용하여 데이터 소스/싱크를 정의하는 해당 커넥터 `ClassName`을 제공합니다.

**JDBC 커넥터**  
JDBC 커넥터의 경우 `.format` 옵션의 값으로 `jdbc`를 지정하고 `driver` 옵션에 JDBC 드라이버 `ClassName`을 제공합니다.

```
df = spark.read.format("jdbc").option("driver", "<DATA SOURCE JDBC DRIVER CLASSNAME>")...

df.write.format("jdbc").option("driver", "<DATA SINK JDBC DRIVER CLASSNAME>")...
```

다음 표에는 DataFrames용 AWS Glue에서 지원되는 데이터 소스의 JDBC 드라이버 `ClassName`이 나열되어 있습니다.

| 데이터 소스 | Driver ClassName | 
| --- |--- |
| PostgreSQL | org.postgresql.Driver | 
| Oracle | oracle.jdbc.driver.OracleDriver | 
| SQLServer | com.microsoft.sqlserver.jdbc.SQLServerDriver | 
| MySQL | com.mysql.jdbc.Driver | 
| SAPHana | com.sap.db.jdbc.Driver | 
| Teradata | com.teradata.jdbc.TeraDriver | 

**Spark 커넥터**  
Spark 커넥터의 경우 커넥터의 `ClassName`을 `.format` 옵션 값으로 지정합니다.

```
df = spark.read.format("<DATA SOURCE CONNECTOR CLASSNAME>")...

df.write.format("<DATA SINK CONNECTOR CLASSNAME>")...
```

다음 표에는 DataFrames용 AWS Glue에서 지원되는 데이터 소스의 Spark 커넥터 `ClassName`이 나열되어 있습니다.

| 데이터 소스 | ClassName | 
| --- |--- |
| MongoDB/DocumentDB | glue.spark.mongodb | 
| Redshift | io.github.spark\_redshift\_community.spark.redshift | 
| AzureCosmos | cosmos.oltp | 
| AzureSQL | com.microsoft.sqlserver.jdbc.spark | 
| BigQuery | com.google.cloud.spark.bigquery | 
| OpenSearch | org.opensearch.spark.sql | 
| Snowflake | net.snowflake.spark.snowflake | 
| Vertica | com.vertica.spark.datasource.VerticaSource | 

### 연결 옵션 지정
<a name="aws-glue-programming-etl-connect-dataframe-connection-options"></a>

데이터 소스/싱크에 대한 연결의 `Options`를 지정하려면 `.option(<KEY>, <VALUE>)`을 사용하여 개별 옵션을 제공하거나 `.options(<MAP>)`를 사용하여 여러 옵션을 키-값 맵으로 제공합니다.

각 데이터 소스/싱크는 자체 연결 `Options` 세트를 지원합니다. 사용 가능한 `Options`에 대한 자세한 내용은 다음 표에 나열된 특정 데이터 소스/싱크 Spark 커넥터의 퍼블릭 설명서를 참조하세요.
+ [JDBC](https://spark.apache.org/docs/3.5.2/sql-data-sources-jdbc.html)
+ [MongoDB/DocumentDB](https://www.mongodb.com/docs/spark-connector/v10.4/)
+ [Redshift](https://docs.aws.amazon.com/redshift/latest/mgmt/spark-redshift-connector.html)
+ [AzureCosmos](https://github.com/Azure/azure-cosmosdb-spark)
+ [AzureSQL](https://learn.microsoft.com/en-us/sql/connect/spark/connector?view=sql-server-ver16)
+ [BigQuery](https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example)
+ [OpenSearch](https://github.com/opensearch-project/opensearch-hadoop/blob/main/USER_GUIDE.md#apache-spark)
+ [Snowflake](https://docs.snowflake.com/en/user-guide/spark-connector-use#setting-configuration-options-for-the-connector)
+ [Vertica](https://github.com/vertica/spark-connector)

### 예제
<a name="aws-glue-programming-etl-connect-dataframe-examples"></a>

다음 예에서는 PostgreSQL에서 읽고 SnowFlake에 씁니다.

**Python**  
예제:

```
from awsglue.context import GlueContext
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

dataSourceClassName = "jdbc"
dataSourceOptions = {
  "driver": "org.postgresql.Driver",
  "url": "<url>",
  "user": "<user>",
  "password": "<password>",
  "dbtable": "<dbtable>",
}

dataframe = spark.read.format(className).options(**options).load()

dataSinkClassName = "net.snowflake.spark.snowflake"
dataSinkOptions = {
  "sfUrl": "<url>",
  "sfUsername": "<username>",
  "sfPassword": "<password>",
  "sfDatabase" -> "<database>",                              
  "sfSchema" -> "<schema>",                       
  "sfWarehouse" -> "<warehouse>"  
}

dataframe.write.format(dataSinkClassName).options(**dataSinkOptions).save()
```

**Scala**  
예제:

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

val dataSourceClassName = "jdbc"
val dataSourceOptions = Map(
  "driver" -> "org.postgresql.Driver",
  "url" -> "<url>",
  "user" -> "<user>",
  "password" -> "<password>",
  "dbtable" -> "<dbtable>"
)

val dataframe = spark.read.format(dataSourceClassName).options(dataSourceOptions).load()

val dataSinkClassName = "net.snowflake.spark.snowflake"
val dataSinkOptions = Map(
  "sfUrl" -> "<url>",
  "sfUsername" -> "<username>",
  "sfPassword" -> "<password>",
  "sfDatabase" -> "<database>",
  "sfSchema" -> "<schema>",
  "sfWarehouse" -> "<warehouse>"
)

dataframe.write.format(dataSinkClassName).options(dataSinkOptions).save()
```

## 사용자 정의 및 AWS Marketplace 연결 유형 값
<a name="aws-glue-programming-etl-connect-market"></a>

여기에는 다음이 포함됩니다.
+ `"connectionType": "marketplace.athena"`: Amazon Athena 데이터 스토어에 대한 연결을 지정합니다. 연결은 AWS Marketplace의 커넥터를 사용합니다.
+ `"connectionType": "marketplace.spark"`: Apache Spark 데이터 스토어에 대한 연결을 지정합니다. 연결은 AWS Marketplace의 커넥터를 사용합니다.
+ `"connectionType": "marketplace.jdbc"`: JDBC 데이터 스토어에 대한 연결을 지정합니다. 연결은 AWS Marketplace의 커넥터를 사용합니다.
+ `"connectionType": "custom.athena"`: Amazon Athena 데이터 스토어에 대한 연결을 지정합니다. 연결은 AWS Glue Studio에 업로드하는 사용자 지정 커넥터를 사용합니다.
+ `"connectionType": "custom.spark"`: Apache Spark 데이터 스토어에 대한 연결을 지정합니다. 연결은 AWS Glue Studio에 업로드하는 사용자 지정 커넥터를 사용합니다.
+ `"connectionType": "custom.jdbc"`: JDBC 데이터 스토어에 대한 연결을 지정합니다. 연결은 AWS Glue Studio에 업로드하는 사용자 지정 커넥터를 사용합니다.

### custom.jdbc 또는 marketplace.jdbc 유형에 대한 연결 옵션
<a name="marketplace-jdbc-connect-options"></a>
+ `className` – 문자열, 필수. JDBC 드라이버의 정규화된 Java 클래스 이름입니다(예: `com.mysql.cj.jdbc.Driver`). 사용자 지정 AWS Marketplace 또는 JDBC 커넥터를 사용하는 경우 이 옵션을 지정해야 합니다. 이 옵션을 생략하면, 특히 파라미터화된 JDBC URL을 사용하는 경우 작업이 `No suitable driver` 오류와 함께 실패할 수 있습니다.
+ `connectionName` - 문자열, 필수, 커넥터와 연결된 연결 이름입니다.
+ `url`- 문자열, 필수, 데이터 원본에 대한 연결을 구축하는 데 사용되는 자리 표시자(`${}`)가 있는 JDBC URL입니다. 자리 표시자 `${secretKey}`는 AWS Secrets Manager에서 같은 이름의 보안 암호로 대체됩니다. URL 구성에 대한 자세한 내용은 데이터 스토어 설명서를 참조하세요.
+ `secretId` 또는 `user/password` – 문자열, 필수, URL에 대한 자격 증명을 검색하는 데 사용됩니다.
+ `dbTable` 또는 `query` - 문자열, 필수, 데이터를 가져올 테이블 또는 SQL 쿼리입니다. `dbTable` 또는 `query`을 지정할 수 있지만 둘 다 함께 지정할 수는 없습니다.
+ `partitionColumn`- 문자열, 선택 사항, 파티셔닝에 사용되는 정수 열의 이름입니다. 이 옵션은 `lowerBound`, `upperBound` 및 `numPartitions`에 포함되는 경우에만 작동합니다. 이 옵션은 Spark SQL JDBC 리더에서와 같은 방식으로 작동합니다. 자세한 내용은 *Apache Spark SQL, DataFrames and Datasets Guide*의 [JDBC To Other Databases](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html)를 참조하세요.

  `lowerBound` 및 `upperBound` 값은 테이블의 행을 필터링하는 것이 아니라 파티션 스트라이드를 결정하는 데 사용됩니다. 테이블의 모든 행이 분할되어 반환됩니다.
**참고**  
테이블 이름 대신 쿼리를 사용하는 경우 쿼리가 지정된 분할 조건에서 작동하는지 확인해야 합니다. 예:   
쿼리 포맷이 `"SELECT col1 FROM table1"`이면 파티션 열을 사용하는 쿼리 끝에 `WHERE` 절을 추가하여 쿼리를 테스트합니다.
쿼리 포맷이 "`SELECT col1 FROM table1 WHERE col2=val"`이면 `AND`와 파티션 열을 사용하는 표현식으로 `WHERE` 절을 확장하여 쿼리를 테스트합니다.
+ `lowerBound` - 정수, 선택 사항, 파티션 스트라이드를 결정하는 데 사용되는 `partitionColumn`의 최소값입니다.
+ `upperBound` - 정수, 선택 사항, 파티션 스트라이드를 결정하는 데 사용되는 `partitionColumn`의 최대값입니다.
+ `numPartitions`- 정수, 선택 사항, 파티션 수입니다. 이 값은 `lowerBound`(포함) 및 `upperBound`(배타)와 함께 `partitionColumn`을 분할하는 데 사용되는 생성된 `WHERE` 절 표현에 대한 파티션 스트라이드를 형성합니다.
**중요**  
파티션이 너무 많으면 외부 데이터베이스 시스템에 문제가 발생할 수 있으므로 파티션 수에 주의합니다.
+ `filterPredicate` - 문자열, 선택 사항, 소스에서 데이터를 필터링하기 위한 추가 조건 절입니다. 예: 

  ```
  BillingCity='Mountain View'
  ```

  *테이블* 이름 대신 *쿼리*를 사용하는 경우 쿼리가 지정된 `filterPredicate`에서 작동하는지 검증해야 합니다. 예: 
  + 쿼리 포맷이 `"SELECT col1 FROM table1"`이면 필터 조건자를 사용하는 쿼리 끝에 `WHERE` 절을 추가하여 쿼리를 테스트합니다.
  + 쿼리 포맷이 `"SELECT col1 FROM table1 WHERE col2=val"`이면 `AND`로 `WHERE` 절을 확장하고 필터 조건자를 사용하는 표현식을 사용하여 쿼리를 테스트합니다.
+ `dataTypeMapping` – 딕셔너리, 선택 사항, **JDBC** 데이터 유형에서 **Glue** 데이터 유형으로의 매핑을 구축하는 사용자 정의 데이터 유형 매핑입니다. 예를 들어 `"dataTypeMapping":{"FLOAT":"STRING"}` 옵션은 드라이버의 `ResultSet.getString()` 메서드를 호출하여 JDBC 유형 `FLOAT`의 데이터 필드를 Java `String` 유형으로 매핑하고 이를 AWS Glue 레코드를 구축하는 데 사용합니다. `ResultSet` 객체는 각 드라이버에 의해 구현되므로 동작은 사용하는 드라이버에 따라 다릅니다. 드라이버가 변환을 수행하는 방법을 이해하려면 JDBC 드라이버에 대한 설명서를 참조하세요.
+ 현재 지원되는 AWS Glue 데이터 유형은 다음과 같습니다.
  + DATE
  + STRING
  + TIMESTAMP
  + INT
  + FLOAT
  + LONG
  + BIGDECIMAL
  + BYTE
  + SHORT
  + DOUBLE

   지원되는 JDBC 데이터 유형은 [Java8 java.sql.types](https://docs.oracle.com/javase/8/docs/api/java/sql/Types.html)입니다.

  기본 데이터 유형 매핑(JDBC에서 AWS Glue로)은 다음과 같습니다.
  +  DATE -> DATE
  +  VARCHAR -> STRING
  +  CHAR -> STRING
  +  LONGNVARCHAR -> STRING
  +  TIMESTAMP -> TIMESTAMP
  +  INTEGER -> INT
  +  FLOAT -> FLOAT
  +  REAL -> FLOAT
  +  BIT -> BOOLEAN
  +  BOOLEAN -> BOOLEAN
  +  BIGINT -> LONG
  +  DECIMAL -> BIGDECIMAL
  +  NUMERIC -> BIGDECIMAL
  +  TINYINT -> SHORT
  +  SMALLINT -> SHORT
  +  DOUBLE -> DOUBLE

  옵션 `dataTypeMapping`과 함께 사용자 정의 데이터 유형 매핑을 사용하는 경우 기본 데이터 유형 매핑을 재정의할 수 있습니다. `dataTypeMapping` 옵션에 나열된 JDBC 데이터 유형만 영향을 받습니다. 기본 매핑은 다른 모든 JDBC 데이터 유형에 사용됩니다. 필요한 경우 추가 JDBC 데이터 유형에 대한 매핑을 추가할 수 있습니다. JDBC 데이터 유형이 기본 매핑이나 사용자 지정 매핑에 포함되지 않은 경우 데이터 유형은 기본값으로 AWS Glue `STRING` 데이터 유형으로 변환됩니다.

다음 Python 코드 예제는 AWS Marketplace JDBC 드라이버를 사용하여 JDBC 데이터베이스에서 읽는 방법을 보여줍니다. 데이터베이스에서 읽고 S3 위치에 쓰는 방법을 보여줍니다.

```
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
     
    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
     
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    ## @type: DataSource
    ## @args: [connection_type = "marketplace.jdbc", connection_options = 
     {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, 
       name, department from department where id < 200","numPartitions":"4",
       "partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"},
        transformation_ctx = "DataSource0"]
    ## @return: DataSource0
    ## @inputs: []
    DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = 
      "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},
      "upperBound":"200","query":"select id, name, department from department where 
       id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0",
       "connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0")
    ## @type: ApplyMapping
    ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string",
      "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"]
    ## @return: Transform0
    ## @inputs: [frame = DataSource0]
    Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string",
      "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], 
       transformation_ctx = "Transform0")
    ## @type: DataSink
    ## @args: [connection_type = "s3", format = "json", connection_options = {"path": 
     "s3://{{<S3 path>}}/", "partitionKeys": []}, transformation_ctx = "DataSink0"]
    ## @return: DataSink0
    ## @inputs: [frame = Transform0]
    DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, 
      connection_type = "s3", format = "json", connection_options = {"path": 
      "s3://{{<S3 path>}}/", "partitionKeys": []}, transformation_ctx = "DataSink0")
    job.commit()
```

### custom.athena 또는 marketplace.athena 유형에 대한 연결 옵션
<a name="marketplace-athena-connect-options"></a>
+ `className` - 문자열, 필수, 드라이버 클래스 이름입니다. Athena-CloudWatch 커넥터를 사용하는 경우 이 파라미터 값은 클래스 이름의 접두사(예: `"com.amazonaws.athena.connectors"`)입니다. Athena-CloudWatch 커넥터는 메타데이터 핸들러와 레코드 핸들러의 두 가지 클래스로 구성됩니다. 여기에 공통 접두사를 제공하면 API가 해당 접두사를 기반으로 올바른 클래스를 로드합니다.
+ `tableName` - 문자열, 필수, 읽을 CloudWatch 로그 스트림의 이름입니다. 이 코드 조각은 특수 보기 이름 `all_log_streams`을 사용합니다. 즉, 반환된 동적 데이터 프레임에는 로그 그룹의 모든 로그 스트림 데이터가 포함됩니다.
+ `schemaName` - 문자열, 필수, 읽을 CloudWatch 로그 그룹의 이름입니다. 예를 들어 `/aws-glue/jobs/output`입니다.
+ `connectionName` - 문자열, 필수, 커넥터와 연결된 연결 이름입니다.

이 커넥터에 대한 추가 옵션은 GitHub의 [Amazon Athena CloudWatch 커넥터 README](https://github.com/awslabs/aws-athena-query-federation/tree/master/athena-cloudwatch)를 참조하세요.

다음 Python 코드 예제는 AWS Marketplace 커넥터를 사용하여 Athena 데이터 스토어에서 읽는 방법을 보여줍니다. Athena에서 읽고 S3 위치에 쓰는 방법을 보여줍니다.

```
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
     
    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
     
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    ## @type: DataSource
    ## @args: [connection_type = "marketplace.athena", connection_options = 
     {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output",
      "connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"]
    ## @return: DataSource0
    ## @inputs: []
    DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = 
      "marketplace.athena", connection_options = {"tableName":"all_log_streams",,
      "schemaName":"/aws-glue/jobs/output","connectionName":
      "test-connection-athena"}, transformation_ctx = "DataSource0")
    ## @type: ApplyMapping
    ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string",
      "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"]
    ## @return: Transform0
    ## @inputs: [frame = DataSource0]
    Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string",
      "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], 
       transformation_ctx = "Transform0")
    ## @type: DataSink
    ## @args: [connection_type = "s3", format = "json", connection_options = {"path": 
     "s3://{{<S3 path>}}/", "partitionKeys": []}, transformation_ctx = "DataSink0"]
    ## @return: DataSink0
    ## @inputs: [frame = Transform0]
    DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, 
      connection_type = "s3", format = "json", connection_options = {"path": 
      "s3://{{<S3 path>}}/", "partitionKeys": []}, transformation_ctx = "DataSink0")
    job.commit()
```

### custom.spark 또는 marketplace.spark 유형에 대한 연결 옵션
<a name="marketplace-spark-connect-options"></a>
+ `className` - 문자열, 필수, 커넥터 클래스 이름입니다.
+ `secretId` - 문자열, 선택 사항, 커넥터 연결에 대한 자격 증명을 검색하는 데 사용됩니다.
+ `connectionName` - 문자열, 필수, 커넥터와 연결된 연결 이름입니다.
+ 다른 옵션은 데이터 스토어에 따라 다릅니다. 예를 들어 OpenSearch 구성 옵션은 [Apache Hadoop용 Elasticsearch](https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html) 설명서에 설명된 대로 접두사 `es`로 시작합니다. Snowflake에 대한 Spark 연결은 *Connecting to Snowflake* 가이드의 [Using the Spark Connector](https://docs.snowflake.com/en/user-guide/spark-connector-use.html)에 설명된 대로 `sfUser` 및 `sfPassword`와 같은 옵션을 사용합니다.

다음 Python 코드 예제는 `marketplace.spark` 연결을 사용하여 OpenSearch 데이터 스토어에서 읽는 방법을 보여줍니다.

```
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
     
    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
     
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    ## @type: DataSource
    ## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test",
      "es.nodes.wan.only":"true","es.nodes":"https://{{<AWS endpoint>}}",
      "connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"]
    ## @return: DataSource0
    ## @inputs: []
    DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = 
      "marketplace.spark", connection_options = {"path":"test","es.nodes.wan.only":
      "true","es.nodes":"https://{{<AWS endpoint>}}","connectionName":
      "test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0")
    ## @type: DataSink
    ## @args: [connection_type = "s3", format = "json", connection_options = {"path": 
         "s3://{{<S3 path>}}/", "partitionKeys": []}, transformation_ctx = "DataSink0"]
    ## @return: DataSink0
    ## @inputs: [frame = DataSource0]
    DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, 
       connection_type = "s3", format = "json", connection_options = {"path": 
       "s3://{{<S3 path>}}/", "partitionKeys": []}, transformation_ctx = "DataSink0")
    job.commit()
```

## 일반 옵션
<a name="aws-glue-programming-etl-connect-general-options"></a>

이 섹션의 옵션은 `connection_options`로 제공되지만 특별히 하나의 커넥터에만 적용되지는 않습니다.

다음 파라미터는 보통 북마크를 구성할 때 사용됩니다. Amazon S3 또는 JDBC 워크플로에 적용될 수도 있습니다. 자세한 내용은 [작업 북마크 사용](programming-etl-connect-bookmarks.md) 섹션을 참조하세요.
+ `jobBookmarkKeys` - 열 이름의 배열입니다.
+ `jobBookmarkKeysSortOrder` - 정렬 순서에 따라 값을 비교하는 방법을 정의하는 문자열입니다. 유효한 값: `"asc"`, `"desc"`.
+ `useS3ListImplementation` - Amazon S3 버킷 콘텐츠를 나열할 때 메모리 성능을 관리하는 데 사용됩니다. 자세한 내용은 [Optimize memory management in AWS Glue](https://aws.amazon.com/blogs/big-data/optimize-memory-management-in-aws-glue/)를 참조하세요.