

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# AWS Glue for Spark 中 ETL 的連線類型和選項
<a name="aws-glue-programming-etl-connect"></a>

在 AWS Glue for Spark 中，可採用多種 PySpark 和 Scala 方法及轉換使用 `connectionType` 參數指定連線類型。這些使用 `connectionOptions` 或 `options` 參數指定連線選項。

`connectionType` 參數可以接受如下表所示的值。以下各節將說明每種類型的相關 `connectionOptions` (或 `options`) 參數值。除非另有說明，否則當連線做為來源或接收器使用時，參數即會套用。

如需有關示範設定和使用連線選項的範例程式碼，請參閱每個連線類型的首頁。


| `connectionType` | 連線到 | 
| --- | --- | 
| [dynamodb](aws-glue-programming-etl-connect-dynamodb-home.md) | [Amazon DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/) 資料庫 | 
| [kinesis](aws-glue-programming-etl-connect-kinesis-home.md) | [Amazon Kinesis Data Streams](https://docs.aws.amazon.com/streams/latest/dev/introduction.html) | 
| [s3](aws-glue-programming-etl-connect-s3-home.md) | [Amazon S3](https://docs.aws.amazon.com/AmazonS3/latest/dev/) | 
| [documentdb](aws-glue-programming-etl-connect-documentdb-home.md#aws-glue-programming-etl-connect-documentdb) | [Amazon DocumentDB (with MongoDB compatibility)](https://docs.aws.amazon.com/documentdb/latest/developerguide/) 資料庫 | 
| [opensearch](aws-glue-programming-etl-connect-opensearch-home.md) | [Amazon OpenSearch Service](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/)。 | 
| [redshift](aws-glue-programming-etl-connect-redshift-home.md) | [Amazon Redshift](https://aws.amazon.com/redshift/) 資料庫 | 
| [kafka](aws-glue-programming-etl-connect-kafka-home.md) |  [Kafka](https://kafka.apache.org/) 或 [Amazon Managed Streaming for Apache Kafka](https://docs.aws.amazon.com/msk/latest/developerguide/what-is-msk.html) | 
| [azurecosmos](aws-glue-programming-etl-connect-azurecosmos-home.md) | Azure Cosmos for NoSQL。 | 
| [azuresql](aws-glue-programming-etl-connect-azuresql-home.md) | Azure SQL。 | 
| [bigquery](aws-glue-programming-etl-connect-bigquery-home.md) | Google BigQuery。 | 
| [mongodb](aws-glue-programming-etl-connect-mongodb-home.md) | [MongoDB](https://www.mongodb.com/what-is-mongodb) 資料庫 (包含 MongoDB Atlas)。 | 
| [sqlserver](aws-glue-programming-etl-connect-jdbc-home.md) |  Microsoft SQL 伺服器資料庫 (請參閱 [JDBC 連線](aws-glue-programming-etl-connect-jdbc-home.md)) | 
| [mysql](aws-glue-programming-etl-connect-jdbc-home.md) | [MySQL](https://www.mysql.com/) 資料庫 (請參閱 [JDBC 連線](aws-glue-programming-etl-connect-jdbc-home.md)) | 
| [oracle](aws-glue-programming-etl-connect-jdbc-home.md) | [Oracle](https://www.oracle.com/database/) 資料庫 (請參閱 [JDBC 連線](aws-glue-programming-etl-connect-jdbc-home.md)) | 
| [postgresql](aws-glue-programming-etl-connect-jdbc-home.md) |  [PostgreSQL](https://www.postgresql.org/) 資料庫 (請參閱 [JDBC 連線](aws-glue-programming-etl-connect-jdbc-home.md)) | 
| [saphana](aws-glue-programming-etl-connect-saphana-home.md) | SAP HANA。 | 
| [Snowflake](aws-glue-programming-etl-connect-snowflake-home.md) | [Snowflake](https://www.snowflake.com/) 資料湖 | 
| [teradata](aws-glue-programming-etl-connect-teradata-home.md) | Teradata Vantage。 | 
| [vertica](aws-glue-programming-etl-connect-vertica-home.md) | Vertica。 | 
| [custom.\*](#aws-glue-programming-etl-connect-market) | Spark、Athena 或 JDBC 資料存放區 (請參閱 [自訂 and AWS Marketplace connectionType 值](#aws-glue-programming-etl-connect-market)  | 
| [marketplace.\*](#aws-glue-programming-etl-connect-market) | Spark、Athena 或 JDBC 資料存放區 (請參閱 [自訂 and AWS Marketplace connectionType 值](#aws-glue-programming-etl-connect-market))  | 

## Glue 5.0 for Spark 中 ETL AWS 的 DataFrame 選項
<a name="aws-glue-programming-etl-connect-dataframe"></a>

DataFrame 是一種類似於資料表的、以具名資料欄組織的資料集，支援功能樣式 (對應/減少/篩選/等) 操作和 SQL 操作 (選取、專案、彙總)。

若要為 Glue 支援的資料來源建立 DataFrame，需要下列項目：
+ 資料來源連接器 `ClassName`
+ 資料來源連線 `Options`

同樣地，若要將 DataFrame 寫入 Glue 支援的資料接收器，也需要相同的資料：
+ 資料接收器連接器 `ClassName`
+ 資料接收器連線 `Options`

請注意，DataFrame `connectionName` 不支援 Glue AWS 功能，例如任務書籤和 DynamicFrame 選項，例如 。如需 DataFrame 和支援操作的詳細資訊，請參閱 [DataFrame](https://spark.apache.org/docs/3.5.2/api/python/reference/pyspark.sql/dataframe.html) 的 Spark 文件。

### 指定連接器 ClassName
<a name="aws-glue-programming-etl-connect-dataframe-classname"></a>

若要指定資料來源/接收器的 `ClassName`，請使用 `.format` 選項來提供定義資料來源/接收器的對應連接器 `ClassName`。

**JDBC 連接器**  
對於 JDBC 連接器，將 `jdbc` 指定為 `.format` 選項的值，並在 `driver` 選項中提供 JDBC 驅動程式 `ClassName`。

```
df = spark.read.format("jdbc").option("driver", "<DATA SOURCE JDBC DRIVER CLASSNAME>")...

df.write.format("jdbc").option("driver", "<DATA SINK JDBC DRIVER CLASSNAME>")...
```

下表列出 Glue for DataFrames `ClassName` 中支援資料來源的 JDBC AWS 驅動程式。

| 資料來源 | 驅動程式類別名稱 | 
| --- |--- |
| PostgreSQL | org.postgresql.Driver | 
| Oracle | oracle.jdbc.driver.OracleDriver | 
| SQLServer | com.microsoft.sqlserver.jdbc.SQLServerDriver | 
| MySQL | com.mysql.jdbc.Driver | 
| SAPHana | com.sap.db.jdbc.Driver | 
| Teradata | com.teradata.jdbc.TeraDriver | 

**Spark 連接器**  
對於 Spark 連接器，將連接器的 `ClassName` 指定為 `.format` 選項的值。

```
df = spark.read.format("<DATA SOURCE CONNECTOR CLASSNAME>")...

df.write.format("<DATA SINK CONNECTOR CLASSNAME>")...
```

下表列出 Glue for DataFrames AWS 中支援資料來源`ClassName`的 Spark 連接器。

| 資料來源 | ClassName | 
| --- |--- |
| MongoDB/DocumentDB | glue.spark.mongodb | 
| Redshift | io.github.spark\_redshift\_community.spark.redshift | 
| AzureCosmos | cosmos.oltp | 
| AzureSQL | com.microsoft.sqlserver.jdbc.spark | 
| BigQuery | com.google.cloud.spark.bigquery | 
| OpenSearch | org.opensearch.spark.sql | 
| Snowflake | net.snowflake.spark.snowflake | 
| Vertica | com.vertica.spark.datasource.VerticaSource | 

### 指定連線選項
<a name="aws-glue-programming-etl-connect-dataframe-connection-options"></a>

若要指定資料來源/接收器連線的 `Options`，請使用 `.option(<KEY>, <VALUE>)` 提供個別選項或 `.options(<MAP>)`，以提供多個選項作為金鑰值映射。

每個資料來源/接收器都支援自己的一組連線 `Options`。如需可用 `Options` 的詳細資訊，請參閱下表所列特定資料來源/接收器 Spark 連接器的公有文件。
+ [JDBC](https://spark.apache.org/docs/3.5.2/sql-data-sources-jdbc.html)
+ [MongoDB/DocumentDB](https://www.mongodb.com/docs/spark-connector/v10.4/)
+ [Redshift](https://docs.aws.amazon.com/redshift/latest/mgmt/spark-redshift-connector.html)
+ [AzureCosmos](https://github.com/Azure/azure-cosmosdb-spark)
+ [AzureSQL](https://learn.microsoft.com/en-us/sql/connect/spark/connector?view=sql-server-ver16)
+ [BigQuery](https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example)
+ [OpenSearch](https://github.com/opensearch-project/opensearch-hadoop/blob/main/USER_GUIDE.md#apache-spark)
+ [Snowflake](https://docs.snowflake.com/en/user-guide/spark-connector-use#setting-configuration-options-for-the-connector)
+ [Vertica](https://github.com/vertica/spark-connector)

### 範例
<a name="aws-glue-programming-etl-connect-dataframe-examples"></a>

下列範例是從 PostgreSQL 讀取並寫入 SnowFlake：

**Python**  
範例：

```
from awsglue.context import GlueContext
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

dataSourceClassName = "jdbc"
dataSourceOptions = {
  "driver": "org.postgresql.Driver",
  "url": "<url>",
  "user": "<user>",
  "password": "<password>",
  "dbtable": "<dbtable>",
}

dataframe = spark.read.format(className).options(**options).load()

dataSinkClassName = "net.snowflake.spark.snowflake"
dataSinkOptions = {
  "sfUrl": "<url>",
  "sfUsername": "<username>",
  "sfPassword": "<password>",
  "sfDatabase" -> "<database>",                              
  "sfSchema" -> "<schema>",                       
  "sfWarehouse" -> "<warehouse>"  
}

dataframe.write.format(dataSinkClassName).options(**dataSinkOptions).save()
```

**Scala**  
範例：

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

val dataSourceClassName = "jdbc"
val dataSourceOptions = Map(
  "driver" -> "org.postgresql.Driver",
  "url" -> "<url>",
  "user" -> "<user>",
  "password" -> "<password>",
  "dbtable" -> "<dbtable>"
)

val dataframe = spark.read.format(dataSourceClassName).options(dataSourceOptions).load()

val dataSinkClassName = "net.snowflake.spark.snowflake"
val dataSinkOptions = Map(
  "sfUrl" -> "<url>",
  "sfUsername" -> "<username>",
  "sfPassword" -> "<password>",
  "sfDatabase" -> "<database>",
  "sfSchema" -> "<schema>",
  "sfWarehouse" -> "<warehouse>"
)

dataframe.write.format(dataSinkClassName).options(dataSinkOptions).save()
```

## 自訂 and AWS Marketplace connectionType 值
<a name="aws-glue-programming-etl-connect-market"></a>

這些索引標籤包括以下項目：
+ `"connectionType": "marketplace.athena"`：指定連至 Amazon Athena 資料存放區的連線。連線使用來自 的連接器 AWS Marketplace。
+ `"connectionType": "marketplace.spark"`：指定連至 Apache Spark 資料存放區的連線。連線使用來自 的連接器 AWS Marketplace。
+ `"connectionType": "marketplace.jdbc"`：指定連至 JDBC 資料存放區的連線。連線使用來自 的連接器 AWS Marketplace。
+ `"connectionType": "custom.athena"`：指定連至 Amazon Athena 資料存放區的連線。連線會使用您上傳至 AWS Glue Studio 的自訂連接器。
+ `"connectionType": "custom.spark"`：指定連至 Apache Spark 資料存放區的連線。連線會使用您上傳至 AWS Glue Studio 的自訂連接器。
+ `"connectionType": "custom.jdbc"`：指定連至 JDBC 資料存放區的連線。連線會使用您上傳至 AWS Glue Studio 的自訂連接器。

### 類型 custom.jdbc 或 marketplace.jdbc 的連線選項
<a name="marketplace-jdbc-connect-options"></a>
+ `className` – 字串，必要，JDBC 驅動程式的完整 Java 類別名稱 （例如 `com.mysql.cj.jdbc.Driver`)。使用自訂或 AWS Marketplace JDBC 連接器時，您必須指定此選項。如果您省略此選項，任務可能會失敗並出現`No suitable driver`錯誤，特別是在使用參數化 JDBC URLs時。
+ `connectionName` – 字串、必要，與連接器相關聯之連線的名稱。
+ `url` – 字串、必要，具有預留位置 (`${}`) 的 JDBC URL，用來建立與資料來源的連線。預留位置 `${secretKey}` 會替換為 AWS Secrets Manager中相同名稱的秘密。如需有關建構 URL 的更多資訊，請參閱資料存放區文件。
+ `secretId` 或 `user/password` – 字串、必要，用於擷取 URL 憑證。
+ `dbTable` 或 `query` – 字串、必要，要從中取得資料的資料表或 SQL 查詢。您可以指定 `dbTable` 或 `query`，但不能同時指定兩者。
+ `partitionColumn` - 字串、選用，用於分割的整數欄名稱。此選項僅適用於包含在 `lowerBound`、`upperBound` 以及 `numPartitions` 中。此選項的運作方式與 Spark SQL JDBC 讀取器相同。如需詳細資訊，請參閱 *Apache Spark SQL、DataFrames 和資料集指南*中的 [JDBC 至其他資料庫](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html)。

  `lowerBound` 和 `upperBound` 值用於決定分割區步幅，而不是用於篩選資料表中的列。資料表中的所有列都會進行分割並傳回。
**注意**  
使用查詢而不是資料表名稱時，您應該驗證查詢是否適用於指定的分割條件。例如：  
如果您的查詢格式為 `"SELECT col1 FROM table1"`，則透過在使用分割區欄的查詢結尾附加 `WHERE` 子句來測試查詢。
如果您的查詢格式為 "`SELECT col1 FROM table1 WHERE col2=val"`，則透過使用 `AND` 和使用分割區欄的表達式擴展 `WHERE` 子句來測試查詢。
+ `lowerBound` - 整數、選用，用來決定分割區步幅的 `partitionColumn` 最小值。
+ `upperBound` - 整數、選用，用來決定分割區步幅的 `partitionColumn` 最大值。
+ `numPartitions` - 整數、選用，分割區數目。這個值，搭配 `lowerBound` (包含) 及 `upperBound` (不含)，形成用於分割 `partitionColumn` 而產生之 `WHERE` 子句表達式的分割區步幅。
**重要**  
請小心分割區的數目，因為太多的分割區可能會造成外部資料庫系統的問題。
+ `filterPredicate` - 字串、選用，額外條件子句，用於篩選來源的資料。例如：

  ```
  BillingCity='Mountain View'
  ```

  當您使用*查詢*，而不是*資料表*名稱，您應該驗證查詢是否適用於指定的 `filterPredicate`。例如：
  + 如果您的查詢格式為 `"SELECT col1 FROM table1"`，則透過在使用篩選述詞的查詢結尾附加 `WHERE` 子句來測試查詢。
  + 如果您的查詢格式為 `"SELECT col1 FROM table1 WHERE col2=val"`，則透過使用 `AND` 和使用篩選述詞的表達式擴展 `WHERE` 子句來測試查詢。
+ `dataTypeMapping` - 字典、選用、自訂資料類型映射，用於建構從 **JDBC** 資料類型到 **Glue** 資料類型的映射。例如，選項 `"dataTypeMapping":{"FLOAT":"STRING"}` 會將 JDBC 類型 `FLOAT` 的資料欄位映射至 Java `String` 類型中，方法是呼叫驅動程式的 `ResultSet.getString()` 方法，並使用它來建構 AWS Glue 記錄。`ResultSet` 物件是由每個驅動程式實作，因此行為是特定於您使用的驅動程式。請參閱 JDBC 驅動程式的文件，瞭解驅動程式如何執行轉換。
+ 目前支援的 AWS Glue 資料類型為：
  + DATE
  + STRING
  + TIMESTAMP
  + INT
  + FLOAT
  + LONG
  + BIGDECIMAL
  + BYTE
  + SHORT
  + DOUBLE

   支援的 JDBC 資料類型為 [Java8 java.sql.types](https://docs.oracle.com/javase/8/docs/api/java/sql/Types.html)。

  預設資料類型映射 (從 JDBC 到 AWS Glue) 是：
  +  DATE -> DATE
  +  VARCHAR -> STRING
  +  CHAR -> STRING
  +  LONGNVARCHAR -> STRING
  +  TIMESTAMP -> TIMESTAMP
  +  INTEGER -> INT
  +  FLOAT -> FLOAT
  +  REAL -> FLOAT
  +  BIT -> BOOLEAN
  +  BOOLEAN -> BOOLEAN
  +  BIGINT -> LONG
  +  DECIMAL -> BIGDECIMAL
  +  NUMERIC -> BIGDECIMAL
  +  TINYINT -> SHORT
  +  SMALLINT -> SHORT
  +  DOUBLE -> DOUBLE

  如果您使用自訂資料類型映射搭配選項 `dataTypeMapping`，那麼您可以覆寫預設的資料類型映射。只有 `dataTypeMapping` 選項中的 JDBC 資料類型會受到影響；預設映射會用於所有其他 JDBC 資料類型。如果需要，您可以為其他 JDBC 資料類型新增映射。如果 JDBC 資料類型未包含在預設映射或自訂映射中，則資料類型預設會轉換為 AWS Glue `STRING` 資料類型。

下列 Python 程式碼範例示範如何使用 JDBC 驅動程式從 AWS Marketplace JDBC 資料庫讀取。它演示了從資料庫讀取和寫入 S3 位置。

```
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
     
    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
     
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    ## @type: DataSource
    ## @args: [connection_type = "marketplace.jdbc", connection_options = 
     {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, 
       name, department from department where id < 200","numPartitions":"4",
       "partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"},
        transformation_ctx = "DataSource0"]
    ## @return: DataSource0
    ## @inputs: []
    DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = 
      "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},
      "upperBound":"200","query":"select id, name, department from department where 
       id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0",
       "connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0")
    ## @type: ApplyMapping
    ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string",
      "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"]
    ## @return: Transform0
    ## @inputs: [frame = DataSource0]
    Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string",
      "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], 
       transformation_ctx = "Transform0")
    ## @type: DataSink
    ## @args: [connection_type = "s3", format = "json", connection_options = {"path": 
     "s3://{{<S3 path>}}/", "partitionKeys": []}, transformation_ctx = "DataSink0"]
    ## @return: DataSink0
    ## @inputs: [frame = Transform0]
    DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, 
      connection_type = "s3", format = "json", connection_options = {"path": 
      "s3://{{<S3 path>}}/", "partitionKeys": []}, transformation_ctx = "DataSink0")
    job.commit()
```

### 類型 custom.athena 或 marketplace.athena 的連線選項
<a name="marketplace-athena-connect-options"></a>
+ `className` – 字串、必要，驅動程式類別名稱。當您使用 Athena-CloudWatch 連接器時，此參數值就是類別名稱的字首 (例如 `"com.amazonaws.athena.connectors"`)。Athena-CloudWatch 連接器由兩個類別組成：中繼資料處理常式和記錄處理常式。如果您在此處提供通用字首，則 API 會根據該字首載入正確的類別。
+ `tableName` – 字串、必要，要讀取的 CloudWatch 日誌串流名稱。此程式碼片段使用特殊檢視名稱 `all_log_streams`，這表示傳回的動態資料框架將包含來自日誌群組中所有日誌資料串流的資料。
+ `schemaName` – 字串、必要，要讀取的 CloudWatch 日誌群組。例如 `/aws-glue/jobs/output`。
+ `connectionName` – 字串、必要，與連接器相關聯之連線的名稱。

如需此連接器的其他選項，請參閱 [Amazon Athena CloudWatch 連接器讀我](https://github.com/awslabs/aws-athena-query-federation/tree/master/athena-cloudwatch)檔案。

下列 Python 程式碼範例示範如何使用 AWS Marketplace 連接器從 Athena 資料存放區讀取。它展示了從 Athena 讀取和寫入 S3 位置。

```
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
     
    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
     
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    ## @type: DataSource
    ## @args: [connection_type = "marketplace.athena", connection_options = 
     {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output",
      "connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"]
    ## @return: DataSource0
    ## @inputs: []
    DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = 
      "marketplace.athena", connection_options = {"tableName":"all_log_streams",,
      "schemaName":"/aws-glue/jobs/output","connectionName":
      "test-connection-athena"}, transformation_ctx = "DataSource0")
    ## @type: ApplyMapping
    ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string",
      "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"]
    ## @return: Transform0
    ## @inputs: [frame = DataSource0]
    Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string",
      "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], 
       transformation_ctx = "Transform0")
    ## @type: DataSink
    ## @args: [connection_type = "s3", format = "json", connection_options = {"path": 
     "s3://{{<S3 path>}}/", "partitionKeys": []}, transformation_ctx = "DataSink0"]
    ## @return: DataSink0
    ## @inputs: [frame = Transform0]
    DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, 
      connection_type = "s3", format = "json", connection_options = {"path": 
      "s3://{{<S3 path>}}/", "partitionKeys": []}, transformation_ctx = "DataSink0")
    job.commit()
```

### 類型 custom.spark 或 marketplace.spark 的連接選項
<a name="marketplace-spark-connect-options"></a>
+ `className` – 字串、必要，連接器類別名稱。
+ `secretId` – 字串、選用，用來擷取連接器連線的憑證。
+ `connectionName` – 字串、必要，與連接器相關聯之連線的名稱。
+ 其他選項視資料存放區而定。例如，OpenSearch 組態選項以字首 `es` 開頭，如 [Elasticsearch for Apache Hadoop](https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html) 文件中所述。Spark 連接到 Snowflake 使用選項，例如 `sfUser` 和 `sfPassword`，如*連接到 Snowflake* 指南中的[使用 Spark Connector](https://docs.snowflake.com/en/user-guide/spark-connector-use.html) 所述。

下列 Python 程式碼範例示範如何使用 `marketplace.spark` 連線從 OpenSearch 資料存放區讀取。

```
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
     
    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
     
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    ## @type: DataSource
    ## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test",
      "es.nodes.wan.only":"true","es.nodes":"https://{{<AWS endpoint>}}",
      "connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"]
    ## @return: DataSource0
    ## @inputs: []
    DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = 
      "marketplace.spark", connection_options = {"path":"test","es.nodes.wan.only":
      "true","es.nodes":"https://{{<AWS endpoint>}}","connectionName":
      "test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0")
    ## @type: DataSink
    ## @args: [connection_type = "s3", format = "json", connection_options = {"path": 
         "s3://{{<S3 path>}}/", "partitionKeys": []}, transformation_ctx = "DataSink0"]
    ## @return: DataSink0
    ## @inputs: [frame = DataSource0]
    DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, 
       connection_type = "s3", format = "json", connection_options = {"path": 
       "s3://{{<S3 path>}}/", "partitionKeys": []}, transformation_ctx = "DataSink0")
    job.commit()
```

## 一般選項
<a name="aws-glue-programming-etl-connect-general-options"></a>

本節中的選項作為 `connection_options` 提供，但並不特別套用於一個連接器。

設定書籤時，通常會使用下列參數。它們可能適用於 Amazon S3 或 JDBC 工作流程。如需詳細資訊，請參閱[使用任務書籤](programming-etl-connect-bookmarks.md)。
+ `jobBookmarkKeys`：資料欄名稱陣列。
+ `jobBookmarkKeysSortOrder`：定義如何根據排序順序比較值的字串。有效值：`"asc"`、`"desc"`。
+ `useS3ListImplementation`：用於在列出 Amazon S3 儲存貯體內容時管理記憶體效能。如需詳細資訊，請參閱[最佳化 Glue AWS 中的記憶體管理](https://aws.amazon.com/blogs/big-data/optimize-memory-management-in-aws-glue/)。