

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# AWS Glue Scala GlueContext API
<a name="glue-etl-scala-apis-glue-gluecontext"></a>

**Package: com.amazonaws.services.glue**

```
class GlueContext extends SQLContext(sc) (
           @transient val sc : SparkContext,
           val defaultSourcePartitioner : PartitioningStrategy )
```

`GlueContext` 是讀取和寫入 [DynamicFrame](glue-etl-scala-apis-glue-dynamicframe.md) 至 Amazon Simple Storage Service (Amazon S3)、 AWS Glue Data Catalog 、JDBC 等的進入點。此類別提供公用程式函數來建立 [DataSource 特徵](glue-etl-scala-apis-glue-datasource-trait.md) 和 [DataSink](glue-etl-scala-apis-glue-datasink-class.md) 物件，從而用於讀取和寫入 `DynamicFrame`。

如果從來源建立的分割區數低於分割區的閾值下限 (預設 10)，您也可以使用 `GlueContext` 來設定在 `DynamicFrame` 中的分割區目標數 (預設 20)。

## def addIngestionTimeColumns
<a name="glue-etl-scala-apis-glue-gluecontext-defs-addIngestionTimeColumns"></a>

```
def addIngestionTimeColumns(
         df : DataFrame, 
         timeGranularity : String = "") : dataFrame
```

附加擷取時間欄 (如 `ingest_year`、`ingest_month`、`ingest_day`、`ingest_hour`、`ingest_minute`) 到輸入 `DataFrame`。當您指定以 Amazon S3 為目標的 Data Catalog 資料表時，此函數會在 AWS Glue 產生的指令碼中自動產生。此函數會自動使用輸出資料表上的擷取時間欄來更新分割區。這可讓輸出資料在擷取時間自動分割，而不需要輸入資料中的明確擷取時間欄。
+ `dataFrame` – 要將擷取時間欄附加到的 `dataFrame`。
+ `timeGranularity` – 時間欄的精密程度。有效值為 "`day`"、"`hour`" 和 "`minute`"。例如：如果 "`hour`" 被傳遞給函數，原始 `dataFrame` 會附加上 "`ingest_year`"、"`ingest_month`"、"`ingest_day`" 和 "`ingest_hour`" 時間欄。

傳回附加時間粒度欄後的資料框架。

範例：

```
glueContext.addIngestionTimeColumns(dataFrame, "hour")
```

## def createDataFrameFromOptions
<a name="glue-etl-scala-apis-glue-gluecontext-defs-createDataFrameFromOptions"></a>

```
def createDataFrameFromOptions( connectionType : String,
                         connectionOptions : JsonOptions,
                         transformationContext : String = "",
                         format : String = null,
                         formatOptions : JsonOptions = JsonOptions.empty
                       ) : DataSource
```

傳回使用指定的連線和格式建立的 `DataFrame`。此函數僅適用於 Glue AWS 串流來源。
+ `connectionType` – 串流連線類型。有效值包括 `kinesis` 與 `kafka`。
+ `connectionOptions` – 連線選項，這些選項對於 Kinesis 和 Kafka 而言是不同的。您可以在 [AWS Glue for Spark 中 ETL 的連線類型和選項](aws-glue-programming-etl-connect.md) 中找到每個串流資料來源的所有連線選項清單。請注意串流連線選項的下列不同處：
  + Kinesis 串流來源需要 `streamARN`、`startingPosition`、`inferSchema` 以及 `classification`。
  + Kafka 串流來源需要 `connectionName`、`topicName`、`startingOffsets`、`inferSchema` 以及 `classification`。
+ `transformationContext` – 要使用的轉換細節 (選用)。
+ `format` – 格式化規格 (選用)。這是用於 Amazon S3 或支援多種格式的 AWS Glue 連線。如需有關支援格式的資訊，請參閱 [AWS Glue for Spark 中的輸入與輸出的資料格式選項](aws-glue-programming-etl-format.md)
+ `formatOptions` – 指定格式的格式選項。如需支援格式選項的詳細資訊，請參閱 [資料格式選項](aws-glue-programming-etl-format.md)。

Amazon Kinesis 串流來源範例：

```
val data_frame_datasource0 = 
glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kinesis", 
connectionOptions = JsonOptions("""{"streamName": "example_stream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json"}}"""))
```

Kafka 串流來源範例：

```
val data_frame_datasource0 = 
glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kafka", 
connectionOptions = JsonOptions("""{"connectionName": "example_connection", "topicName": "example_topic", "startingPosition": "earliest", "inferSchema": "false", "classification": "json", "schema":"`column1` STRING, `column2` STRING"}"""))
```

## forEachBatch
<a name="glue-etl-scala-apis-glue-gluecontext-defs-forEachBatch"></a>

**`forEachBatch(frame, batch_function, options)`**

將傳入的 `batch_function` 套用至從串流來源讀取的每個微批次。
+ `frame` – 包含目前微批次的 DataFrame。
+ `batch_function` – 將套用至每個微批次的函數。
+ `options` – 索引鍵/值配對的集合，其中包含如何處理微批次的相關資訊。下列選項是必要的：
  + `windowSize` – 處理每個批次的時間量。
  + `checkpointLocation` - 串流 ETL 任務的檢查點儲存位置。
  + `batchMaxRetries` – 如果失敗，可重試批次的次數上限。預設值為 3。此選項僅在 Glue 2.0 及以上版本上才可設定。

**範例**：

```
glueContext.forEachBatch(data_frame_datasource0, (dataFrame: Dataset[Row], batchId: Long) => 
   {
      if (dataFrame.count() > 0) 
        {
          val datasource0 = DynamicFrame(glueContext.addIngestionTimeColumns(dataFrame, "hour"), glueContext)
          // @type: DataSink
          // @args: [database = "tempdb", table_name = "fromoptionsoutput", stream_batch_time = "100 seconds", 
          //      stream_checkpoint_location = "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/", 
          //      transformation_ctx = "datasink1"]
          // @return: datasink1
          // @inputs: [frame = datasource0]
          val options_datasink1 = JsonOptions(
             Map("partitionKeys" -> Seq("ingest_year", "ingest_month","ingest_day", "ingest_hour"), 
             "enableUpdateCatalog" -> true))
          val datasink1 = glueContext.getCatalogSink(
             database = "tempdb", 
             tableName = "fromoptionsoutput", 
             redshiftTmpDir = "", 
             transformationContext = "datasink1", 
             additionalOptions = options_datasink1).writeDynamicFrame(datasource0)
        }
   }, JsonOptions("""{"windowSize" : "100 seconds", 
         "checkpointLocation" : "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/"}"""))
```

## def getCatalogSink
<a name="glue-etl-scala-apis-glue-gluecontext-defs-getCatalogSink"></a>

```
def getCatalogSink( database : String,
        tableName : String,
        redshiftTmpDir : String = "",
        transformationContext : String = ""
        additionalOptions: JsonOptions = JsonOptions.empty,
        catalogId: String = null   
) : DataSink
```

建立 [DataSink](glue-etl-scala-apis-glue-datasink-class.md)，以便寫入 Data Catalog 中定義之資料表中指定的位置。
+ `database` —  Data Catalog 中的資料庫名稱。
+ `tableName` —  Data Catalog 中的資料表名稱。
+ `redshiftTmpDir` — 要與特定資料目的地搭配使用的臨時暫存目錄。設定為 預設為空值。
+ `transformationContext` — 與由任務書籤使用之目的地關聯的轉換內容。設定為 預設為空值。
+ `additionalOptions` – 提供給 AWS Glue 的額外選項。
+ `catalogId` — 要存取之 Data Catalog 的目錄 ID (帳戶 ID)。為 null 時，會使用發起人的預設帳戶 ID。

傳回 `DataSink`。

## def getCatalogSource
<a name="glue-etl-scala-apis-glue-gluecontext-defs-getCatalogSource"></a>

```
def getCatalogSource( database : String,
                      tableName : String,
                      redshiftTmpDir : String = "",
                      transformationContext : String = ""
                      pushDownPredicate : String = " "
                      additionalOptions: JsonOptions = JsonOptions.empty,
                      catalogId: String = null
                    ) : DataSource
```

建立 [DataSource 特徵](glue-etl-scala-apis-glue-datasource-trait.md)，以便從 Data Catalog 中的資料表定義中讀取資料。
+ `database` —  Data Catalog 中的資料庫名稱。
+ `tableName` —  Data Catalog 中的資料表名稱。
+ `redshiftTmpDir` — 要與特定資料目的地搭配使用的臨時暫存目錄。設定為 預設為空值。
+ `transformationContext` — 與由任務書籤使用之目的地關聯的轉換內容。設定為 預設為空值。
+ `pushDownPredicate` – 篩選分割區，而無需列出和讀取資料集中的所有檔案。如需詳細資訊，請參閱[使用 pushdown 述詞預先篩選](aws-glue-programming-etl-partitions.md#aws-glue-programming-etl-partitions-pushdowns)。
+ `additionalOptions` – 選擇性的名稱/值對的集合。可能的選項包括 [AWS Glue for Spark 中 ETL 的連線類型和選項](aws-glue-programming-etl-connect.md) 中列出的項目，除了 `endpointUrl`、`streamName`、`bootstrap.servers`、`security.protocol`、`topicName`、`classification` 以及`delimiter`。另一個支援的選項是 `catalogPartitionPredicate`：

  `catalogPartitionPredicate` — 您可以傳遞目錄表達式以根據索引欄進行篩選。這會將篩選下推至伺服器端。如需詳細資訊，請參閱 [AWS Glue 分割區索引](https://docs.aws.amazon.com/glue/latest/dg/partition-indexes.html)。注意 `push_down_predicate` 和 `catalogPartitionPredicate` 使用不同的語法。前者使用 Spark SQL 標準語法，後者使用 JSQL 剖析器。
+ `catalogId` — 要存取之 Data Catalog 的目錄 ID (帳戶 ID)。為 null 時，會使用發起人的預設帳戶 ID。

傳回 `DataSource`。

**串流來源範例**

```
val data_frame_datasource0 = glueContext.getCatalogSource(
    database = "tempdb",
    tableName = "test-stream-input", 
    redshiftTmpDir = "", 
    transformationContext = "datasource0", 
    additionalOptions = JsonOptions("""{
        "startingPosition": "TRIM_HORIZON", "inferSchema": "false"}""")
    ).getDataFrame()
```

## def getJDBCSink
<a name="glue-etl-scala-apis-glue-gluecontext-defs-getJDBCSink"></a>

```
def getJDBCSink( catalogConnection : String,
                 options : JsonOptions,
                 redshiftTmpDir : String = "",
                 transformationContext : String = "",
                 catalogId: String = null
               ) : DataSink
```

建立 [DataSink](glue-etl-scala-apis-glue-datasink-class.md)，以便寫入 Data Catalog 中 `Connection` 物件所指定的 JDBC 資料庫。此 `Connection` 物件擁有用來對 JDBC 目的地連線的資訊 (包括 URL、使用者名稱、密碼、VPC、子網路和安全群組)。
+ `catalogConnection` —  Data Catalog 中的連線名稱，其中包含要做為寫入目的地之 JDBC URL。
+ `options` — JSON 名稱值組的字串，可提供寫入 JDBC 資料存放區所需的其他資訊。其中包含：
  + *dbtable* (必要) — JDBC 資料表的名稱。若是支援資料庫內結構描述的 JDBC 資料存放區，請指定 `schema.table-name`。如果未提供結構描述，則會使用預設的 "public" 結構描述。以下範例說明 options 參數，它會指向資料庫 `test_db` 中名為 `test` 的結構描述和名為 `test_table` 的資料表。

    ```
    options = JsonOptions("""{"dbtable": "test.test_table", "database": "test_db"}""")
    ```
  + *database* (必要) — JDBC 資料庫的名稱。
  + 任何其他選項都會直接傳遞至 SparkSQL JDBC 寫入器。如需詳細資訊，請參閱 [Spark 的 Redshift 資料來源](https://github.com/databricks/spark-redshift)。
+ `redshiftTmpDir` — 要與特定資料目的地搭配使用的臨時暫存目錄。設定為 預設為空值。
+ `transformationContext` — 與由任務書籤使用之目的地關聯的轉換內容。設定為 預設為空值。
+ `catalogId` — 要存取之 Data Catalog 的目錄 ID (帳戶 ID)。為 null 時，會使用發起人的預設帳戶 ID。

範例程式碼：

```
getJDBCSink(catalogConnection = "my-connection-name", options = JsonOptions("""{"dbtable": "my-jdbc-table", "database": "my-jdbc-db"}"""), redshiftTmpDir = "", transformationContext = "datasink4")
```

傳回 `DataSink`。

## def getSink
<a name="glue-etl-scala-apis-glue-gluecontext-defs-getSink"></a>

```
def getSink( connectionType : String,
             connectionOptions : JsonOptions,
             transformationContext : String = ""
           ) : DataSink
```

建立 [DataSink](glue-etl-scala-apis-glue-datasink-class.md)，將資料寫入 Amazon Simple Storage Service (Amazon S3)、JDBC 或 AWS Glue Data Catalog 等目的地，或 Apache Kafka 或 Amazon Kinesis 資料串流。
+ `connectionType` — 連線的類型。請參閱 [AWS Glue for Spark 中 ETL 的連線類型和選項](aws-glue-programming-etl-connect.md)。
+ `connectionOptions` — JSON 名稱值組的字串，可提供與資料目的地建立連線的額外資料。請參閱 [AWS Glue for Spark 中 ETL 的連線類型和選項](aws-glue-programming-etl-connect.md)。
+ `transformationContext` — 與由任務書籤使用之目的地關聯的轉換內容。設定為 預設為空值。

傳回 `DataSink`。

## def getSinkWithFormat
<a name="glue-etl-scala-apis-glue-gluecontext-defs-getSinkWithFormat"></a>

```
def getSinkWithFormat( connectionType : String,
                       options : JsonOptions,
                       transformationContext : String = "",
                       format : String = null,
                       formatOptions : JsonOptions = JsonOptions.empty
                     ) : DataSink
```

建立 [DataSink](glue-etl-scala-apis-glue-datasink-class.md)，以將資料寫入至目的地，如 Amazon S3、JDBC、Data Catalog、Apache Kafka 或 Amazon Kinesis 資料串流。亦設定要寫出至目的地的資料格式。
+ `connectionType` — 連線的類型。請參閱 [AWS Glue for Spark 中 ETL 的連線類型和選項](aws-glue-programming-etl-connect.md)。
+ `options` — JSON 名稱值組的字串，可提供與資料目的地建立連線的額外資料。請參閱 [AWS Glue for Spark 中 ETL 的連線類型和選項](aws-glue-programming-etl-connect.md)。
+ `transformationContext` — 與由任務書籤使用之目的地關聯的轉換內容。設定為 預設為空值。
+ `format` — 要從目的地寫出的資料格式。
+ `formatOptions` — JSON 名稱值組的字串，會提供在目的地格式化資料的其他選項。請參閱 [資料格式選項](aws-glue-programming-etl-format.md)。

傳回 `DataSink`。

## def getSource
<a name="glue-etl-scala-apis-glue-gluecontext-defs-getSource"></a>

```
def getSource( connectionType : String,
               connectionOptions : JsonOptions,
               transformationContext : String = ""
               pushDownPredicate
             ) : DataSource
```

建立從 Amazon S3、JDBC 或 Glue Data Catalog AWS 等來源[DataSource 特徵](glue-etl-scala-apis-glue-datasource-trait.md)讀取資料的 。也支援 Kafka 和 Kinesis 串流資料來源。
+ `connectionType` — 資料來源的類型。請參閱 [AWS Glue for Spark 中 ETL 的連線類型和選項](aws-glue-programming-etl-connect.md)。
+ `connectionOptions` — JSON 名稱值組的字串，可提供與資料來源建立連線的額外資料。如需詳細資訊，請參閱[AWS Glue for Spark 中 ETL 的連線類型和選項](aws-glue-programming-etl-connect.md)。

  Kinesis 串流來源需要下列連線選項：`streamARN`、`startingPosition`、`inferSchema` 及 `classification`。

  Kafka 串流來源需要以下連線選項：`connectionName`、`topicName`、`startingOffsets`、`inferSchema` 及 `classification`。
+ `transformationContext` — 與由任務書籤使用之目的地關聯的轉換內容。設定為 預設為空值。
+ `pushDownPredicate` — 分割區欄上的述詞。

傳回 `DataSource`。

Amazon Kinesis 串流來源範例：

```
val kinesisOptions = jsonOptions()
data_frame_datasource0 = glueContext.getSource("kinesis", kinesisOptions).getDataFrame()

private def jsonOptions(): JsonOptions = {
    new JsonOptions(
      s"""{"streamARN": "arn:aws:kinesis:eu-central-1:123456789012:stream/fromOptionsStream",
         |"startingPosition": "TRIM_HORIZON",
         |"inferSchema": "true",
         |"classification": "json"}""".stripMargin)
}
```

Kafka 串流來源範例：

```
val kafkaOptions = jsonOptions()
val data_frame_datasource0 = glueContext.getSource("kafka", kafkaOptions).getDataFrame()

private def jsonOptions(): JsonOptions = {
    new JsonOptions(
      s"""{"connectionName": "ConfluentKafka",
         |"topicName": "kafka-auth-topic",
         |"startingOffsets": "earliest",
         |"inferSchema": "true",
         |"classification": "json"}""".stripMargin)
 }
```

## def getSourceWithFormat
<a name="glue-etl-scala-apis-glue-gluecontext-defs-getSourceWithFormat"></a>

```
def getSourceWithFormat( connectionType : String,
                         options : JsonOptions,
                         transformationContext : String = "",
                         format : String = null,
                         formatOptions : JsonOptions = JsonOptions.empty
                       ) : DataSource
```

建立從 Amazon S3、JDBC 或 AWS Glue Data Catalog 等來源[DataSource 特徵](glue-etl-scala-apis-glue-datasource-trait.md)讀取資料的 ，也會設定存放在來源中的資料格式。
+ `connectionType` – 資料來源的類型。請參閱 [AWS Glue for Spark 中 ETL 的連線類型和選項](aws-glue-programming-etl-connect.md)。
+ `options` – JSON 名稱值組的字串，可提供與資料來源建立連線的額外資料。請參閱 [AWS Glue for Spark 中 ETL 的連線類型和選項](aws-glue-programming-etl-connect.md)。
+ `transformationContext` – 與由任務書籤使用之目的地關聯的轉換內容。設定為 預設為空值。
+ `format` – 來源中所存放資料的格式。當 `connectionType` 為「s3」時，您也可以指定 `format`。可以是「avro」、「csv」、「grokLog」、「ion」、「json」、「xml」、「parquet」或「orc」其中之一。
+ `formatOptions` – JSON 名稱值組的字串，會提供在來源剖析資料的其他選項。請參閱 [資料格式選項](aws-glue-programming-etl-format.md)。

傳回 `DataSource`。

**範例**

從 Amazon S3 上逗號分隔值 (CSV) 檔案的資料來源建立 DynamicFrame：

```
val datasource0 = glueContext.getSourceWithFormat(
    connectionType="s3",
    options =JsonOptions(s"""{"paths": [ "s3://csv/nycflights.csv"]}"""),
    transformationContext = "datasource0", 
    format = "csv",
    formatOptions=JsonOptions(s"""{"withHeader":"true","separator": ","}""")
    ).getDynamicFrame()
```

從使用 JDBC 連線的 PostgreSQL 資料來源建立 DynamicFrame：

```
val datasource0 = glueContext.getSourceWithFormat(
    connectionType="postgresql",
    options =JsonOptions(s"""{
      "url":"jdbc:postgresql://databasePostgres-1.rds.amazonaws.com:5432/testdb",
      "dbtable": "public.company",
      "redshiftTmpDir":"", 
      "user":"username", 
      "password":"password123"
    }"""),
    transformationContext = "datasource0").getDynamicFrame()
```

從使用 JDBC 連線的 MySQL 資料來源建立 DynamicFrame：

```
 val datasource0 = glueContext.getSourceWithFormat(
    connectionType="mysql",
    options =JsonOptions(s"""{
      "url":"jdbc:mysql://databaseMysql-1.rds.amazonaws.com:3306/testdb",
      "dbtable": "athenatest_nycflights13_csv",
      "redshiftTmpDir":"", 
      "user":"username", 
      "password":"password123"
    }"""),
    transformationContext = "datasource0").getDynamicFrame()
```

## def getSparkSession
<a name="glue-etl-scala-apis-glue-gluecontext-defs-getSparkSession"></a>

```
def getSparkSession : SparkSession 
```

取得與此 GlueContext 相關聯的 `SparkSession` 物件。使用此 SparkSession 物件以將資料表與 UDF 註冊為與從 DynamicFrames 建立的 `DataFrame` 搭配使用。

傳回 SparkSession。

## def startTransaction
<a name="glue-etl-scala-apis-glue-gluecontext-defs-start-transaction"></a>

```
def startTransaction(readOnly: Boolean):String
```

開始新交易。內部呼叫 Lake Formation [startTransaction](https://docs.aws.amazon.com/lake-formation/latest/dg/aws-lake-formation-api-aws-lake-formation-api-transactions.html#aws-lake-formation-api-aws-lake-formation-api-transactions-StartTransaction) API。
+ `readOnly` – (布林值) 指出此交易應該是唯讀，還是讀取和寫入。使用唯讀交易 ID 進行的寫入將被拒絕。唯讀交易不需要遞交。

傳回交易 ID。

## def commitTransaction
<a name="glue-etl-scala-apis-glue-gluecontext-defs-commit-transaction"></a>

```
def commitTransaction(transactionId: String, waitForCommit: Boolean): Boolean
```

嘗試遞交指定的交易。`commitTransaction` 可能會在交易完成遞交之前返回。內部呼叫 Lake Formation [commitTransaction](https://docs.aws.amazon.com/lake-formation/latest/dg/aws-lake-formation-api-aws-lake-formation-api-transactions.html#aws-lake-formation-api-aws-lake-formation-api-transactions-CommitTransaction) API。
+ `transactionId` – (字串) 要遞交的交易。
+ `waitForCommit` – (布林值) 決定 `commitTransaction` 是否立即傳回。預設值為 true。如為 False，`commitTransaction` 輪詢並等待，直到交易完成遞交。使用指數退避時，等待時間長度限制為 1 分鐘，最多可嘗試 6 次重試。

傳回一個布林值，指示遞交是否完成。

## def cancelTransaction
<a name="glue-etl-scala-apis-glue-gluecontext-defs-cancel-transaction"></a>

```
def cancelTransaction(transactionId: String): Unit
```

嘗試取消指定的交易。內部呼叫 Lake Formation [CancelTransaction](https://docs.aws.amazon.com/lake-formation/latest/dg/aws-lake-formation-api-aws-lake-formation-api-transactions.html#aws-lake-formation-api-aws-lake-formation-api-transactions-CancelTransaction) API。
+ `transactionId` – (字串) 要取消的交易。

如果交易先前已遞交，傳回 `TransactionCommittedException` 例外狀況。

## def 此
<a name="glue-etl-scala-apis-glue-gluecontext-defs-this-1"></a>

```
def this( sc : SparkContext,
          minPartitions : Int,
          targetPartitions : Int )
```

使用指定的 `SparkContext`、最小分割區和分割區目標來建立 `GlueContext` 物件。
+ `sc` — `SparkContext`。
+ `minPartitions` — 分割區最小數。
+ `targetPartitions` — 分割區目標數。

傳回 `GlueContext`。

## def 此
<a name="glue-etl-scala-apis-glue-gluecontext-defs-this-2"></a>

```
def this( sc : SparkContext )
```

透過提供的 `SparkContext` 建立 `GlueContext` 物件。將分割區的最小值設為 10，目標分割區設為 20。
+ `sc` — `SparkContext`。

傳回 `GlueContext`。

## def 此
<a name="glue-etl-scala-apis-glue-gluecontext-defs-this-3"></a>

```
def this( sparkContext : JavaSparkContext )
```

透過提供的 `JavaSparkContext` 建立 `GlueContext` 物件。將分割區的最小值設為 10，目標分割區設為 20。
+ `sparkContext` — `JavaSparkContext`。

傳回 `GlueContext`。