

# AWS Glue Scala GlueContext API
<a name="glue-etl-scala-apis-glue-gluecontext"></a>

**程序包：com.amazonaws.services.glue**

```
class GlueContext extends SQLContext(sc) (
           @transient val sc : SparkContext,
           val defaultSourcePartitioner : PartitioningStrategy )
```

`GlueContext` 是在以下位置读取和写入 [DynamicFrame](glue-etl-scala-apis-glue-dynamicframe.md) 的入口点：Amazon Simple Storage Service（Amazon S3）、AWS Glue 数据目录、JDBC 等。此类提供实用程序函数，用于创建可用来读取和写入 `DynamicFrame` 的 [数据源特性](glue-etl-scala-apis-glue-datasource-trait.md) 和 [DataSink](glue-etl-scala-apis-glue-datasink-class.md) 对象。

如果从源创建的分区数小于分区的最小阈值（默认值为 10），您还可以使用 `GlueContext` 设置 `DynamicFrame` 中的目标分区数量（默认值为 20）。

## def addIngestionTimeColumns
<a name="glue-etl-scala-apis-glue-gluecontext-defs-addIngestionTimeColumns"></a>

```
def addIngestionTimeColumns(
         df : DataFrame, 
         timeGranularity : String = "") : dataFrame
```

将提取时间列（例如 `ingest_year`、`ingest_month`、`ingest_day`、`ingest_hour`、`ingest_minute`）附加到输入 `DataFrame`。当您指定以 Amazon S3 为目标的数据目录表时，AWS Glue 生成的脚本中会自动生成此函数。此函数使用输出表上的提取时间列自动更新分区。这允许根据提取时间自动对输出数据进行分区，而不需要在输入数据中显示提取时间列。
+ `dataFrame` – 提取时间列要附加到的 `dataFrame`。
+ `timeGranularity` – 时间列的粒度。有效值为“`day`”、“`hour`”和“`minute`”。例如，如果“`hour`”传递到函数，原始 `dataFrame` 将附加“`ingest_year`”、“`ingest_month`”、“`ingest_day`”和“`ingest_hour`”时间列。

在附加时间粒度列后返回数据框。

示例：

```
glueContext.addIngestionTimeColumns(dataFrame, "hour")
```

## def createDataFrameFromOptions
<a name="glue-etl-scala-apis-glue-gluecontext-defs-createDataFrameFromOptions"></a>

```
def createDataFrameFromOptions( connectionType : String,
                         connectionOptions : JsonOptions,
                         transformationContext : String = "",
                         format : String = null,
                         formatOptions : JsonOptions = JsonOptions.empty
                       ) : DataSource
```

返回一个使用指定连接和格式创建的 `DataFrame`。仅将此函数与 AWS Glue 流式传输源结合使用。
+ `connectionType` – 流式传输连接类型。有效值包括 `kinesis` 和 `kafka`。
+ `connectionOptions` – 连接选项，不同于 Kinesis 和 Kafka。您可以在 [AWS Glue for Spark 中适用于 ETL 的连接类型和选项](aws-glue-programming-etl-connect.md) 找到每个流式传输数据源的所有连接选项列表。请注意流式传输连接选项的以下差异：
  + Kinesis 流式传输源需要 `streamARN`、`startingPosition`、`inferSchema` 和 `classification`。
  + Kinesis 流式传输源需要 `connectionName`、`topicName`、`startingOffsets`、`inferSchema` 和 `classification`。
+ `transformationContext` – 要使用的转换上下文（可选）。
+ `format` – 格式规范（可选）。这用于 Amazon S3 或支持多种格式的 AWS Glue 连接。有关所支持格式的信息，请参阅 [AWS Glue for Spark 中的输入和输出的数据格式选项](aws-glue-programming-etl-format.md)。
+ `formatOptions` – 指定格式的格式选项。有关所支持的格式选项的信息，请参阅[数据格式选项](aws-glue-programming-etl-format.md)。

Amazon Kinesis 流式传输源示例：

```
val data_frame_datasource0 = 
glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kinesis", 
connectionOptions = JsonOptions("""{"streamName": "example_stream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json"}}"""))
```

Kafka 流式传输源示例：

```
val data_frame_datasource0 = 
glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kafka", 
connectionOptions = JsonOptions("""{"connectionName": "example_connection", "topicName": "example_topic", "startingPosition": "earliest", "inferSchema": "false", "classification": "json", "schema":"`column1` STRING, `column2` STRING"}"""))
```

## forEachBatch
<a name="glue-etl-scala-apis-glue-gluecontext-defs-forEachBatch"></a>

**`forEachBatch(frame, batch_function, options)`**

将传入的 `batch_function` 应用于从流式传输源读取的每个微批处理。
+ `frame` – 包含当前微处理的 DataFrame。
+ `batch_function` – 应用于每个微处理的函数。
+ `options` – 键值对集合，其中包含有关如何处理微批处理的信息。以下选项为必填：
  + `windowSize` – 处理每个批处理所花费的时间量。
  + `checkpointLocation` – 为流式传输 ETL 任务存储检查点的位置。
  + `batchMaxRetries` – 在该批处理失败时重试的最大次数。默认值为 3。此选项仅适用于 Glue 版本 2.0 及更高版本。

**示例：**

```
glueContext.forEachBatch(data_frame_datasource0, (dataFrame: Dataset[Row], batchId: Long) => 
   {
      if (dataFrame.count() > 0) 
        {
          val datasource0 = DynamicFrame(glueContext.addIngestionTimeColumns(dataFrame, "hour"), glueContext)
          // @type: DataSink
          // @args: [database = "tempdb", table_name = "fromoptionsoutput", stream_batch_time = "100 seconds", 
          //      stream_checkpoint_location = "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/", 
          //      transformation_ctx = "datasink1"]
          // @return: datasink1
          // @inputs: [frame = datasource0]
          val options_datasink1 = JsonOptions(
             Map("partitionKeys" -> Seq("ingest_year", "ingest_month","ingest_day", "ingest_hour"), 
             "enableUpdateCatalog" -> true))
          val datasink1 = glueContext.getCatalogSink(
             database = "tempdb", 
             tableName = "fromoptionsoutput", 
             redshiftTmpDir = "", 
             transformationContext = "datasink1", 
             additionalOptions = options_datasink1).writeDynamicFrame(datasource0)
        }
   }, JsonOptions("""{"windowSize" : "100 seconds", 
         "checkpointLocation" : "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/"}"""))
```

## def getCatalogSink
<a name="glue-etl-scala-apis-glue-gluecontext-defs-getCatalogSink"></a>

```
def getCatalogSink( database : String,
        tableName : String,
        redshiftTmpDir : String = "",
        transformationContext : String = ""
        additionalOptions: JsonOptions = JsonOptions.empty,
        catalogId: String = null   
) : DataSink
```

创建一个可向在数据目录中定义的表中指定的位置写入的 [DataSink](glue-etl-scala-apis-glue-datasink-class.md)。
+ `database` – 数据目录中的数据库名称。
+ `tableName` – 数据目录中的表名称。
+ `redshiftTmpDir` – 用于某些数据接收器的临时暂存目录。设置为 默认情况下为空。
+ `transformationContext` – 与任务书签要使用的接收器关联的转换上下文。设置为 默认情况下为空。
+ `additionalOptions` 提供给 AWS Glue 的额外选项。
+ `catalogId` – 正在访问的数据目录 ID（账户 ID）。当为空时，将使用调用方的默认账户 ID。

返回 `DataSink`。

## def getCatalogSource
<a name="glue-etl-scala-apis-glue-gluecontext-defs-getCatalogSource"></a>

```
def getCatalogSource( database : String,
                      tableName : String,
                      redshiftTmpDir : String = "",
                      transformationContext : String = ""
                      pushDownPredicate : String = " "
                      additionalOptions: JsonOptions = JsonOptions.empty,
                      catalogId: String = null
                    ) : DataSource
```

创建可从数据目录中的表定义读取数据的 [数据源特性](glue-etl-scala-apis-glue-datasource-trait.md)。
+ `database` – 数据目录中的数据库名称。
+ `tableName` – 数据目录中的表名称。
+ `redshiftTmpDir` – 用于某些数据接收器的临时暂存目录。设置为 默认情况下为空。
+ `transformationContext` – 与任务书签要使用的接收器关联的转换上下文。设置为 默认情况下为空。
+ `pushDownPredicate` – 筛选分区，而不必列出并读取数据集中的所有文件。有关更多信息，请参阅 [使用下推谓词进行预筛选](aws-glue-programming-etl-partitions.md#aws-glue-programming-etl-partitions-pushdowns)。
+ `additionalOptions` – 可选名称/值对的集合。可能选项包括 [AWS Glue for Spark 中适用于 ETL 的连接类型和选项](aws-glue-programming-etl-connect.md) 中列出的选项，但 `endpointUrl`、`streamName`、`bootstrap.servers`、`security.protocol`、`topicName`、`classification` 和 `delimiter` 除外。另一个支持的选项是 `catalogPartitionPredicate`：

  `catalogPartitionPredicate` – 要传递目录表达式以根据索引列进行筛选。这样会将筛选下推到服务器端。有关更多信息，请参阅 [AWS Glue 分区数据](https://docs.aws.amazon.com/glue/latest/dg/partition-indexes.html)。请注意，`push_down_predicate` 和 `catalogPartitionPredicate` 使用不同的语法。前者使用 Spark SQL 标准语法，后者使用 JSQL 解析器。
+ `catalogId` – 正在访问的数据目录 ID（账户 ID）。当为空时，将使用调用方的默认账户 ID。

返回 `DataSource`。

**流式传输源示例**

```
val data_frame_datasource0 = glueContext.getCatalogSource(
    database = "tempdb",
    tableName = "test-stream-input", 
    redshiftTmpDir = "", 
    transformationContext = "datasource0", 
    additionalOptions = JsonOptions("""{
        "startingPosition": "TRIM_HORIZON", "inferSchema": "false"}""")
    ).getDataFrame()
```

## def getJDBCSink
<a name="glue-etl-scala-apis-glue-gluecontext-defs-getJDBCSink"></a>

```
def getJDBCSink( catalogConnection : String,
                 options : JsonOptions,
                 redshiftTmpDir : String = "",
                 transformationContext : String = "",
                 catalogId: String = null
               ) : DataSink
```

创建一个可向在数据目录中的 `Connection` 对象指定的 JDBC 数据库写入的 [DataSink](glue-etl-scala-apis-glue-datasink-class.md)。`Connection` 对象具有用于连接 JDBC 接收器的信息，包括 URL、用户名、密码、VPC、子网和安全组。
+ `catalogConnection` – 数据目录中包含要写入的 JDBC URL 的连接名称。
+ `options` – JSON 名称-值对的字符串，提供写入 JDBC 数据存储所需的附加信息。这包括：
  + *dbtable*（必填）- JDBC 表名称。对于在数据库中支持架构的 JDBC 数据存储，指定 `schema.table-name`。如果未提供架构，则使用默认的“public”架构。以下示例显示一个指向名为 `test` 的架构的选项参数和数据库 `test_db` 中一个名为 `test_table` 的表。

    ```
    options = JsonOptions("""{"dbtable": "test.test_table", "database": "test_db"}""")
    ```
  + *database*（必填）- JDBC 数据库名称。
  + 任何其他选项直接传递到 SparkSQL JDBC 写入器。有关更多信息，请参阅 [Spark 的 Redshift 数据源](https://github.com/databricks/spark-redshift)。
+ `redshiftTmpDir` – 用于某些数据接收器的临时目录。设置为 默认情况下为空。
+ `transformationContext` – 与任务书签要使用的接收器关联的转换上下文。设置为 默认情况下为空。
+ `catalogId` – 正在访问的数据目录 ID（账户 ID）。当为空时，将使用调用方的默认账户 ID。

示例代码：

```
getJDBCSink(catalogConnection = "my-connection-name", options = JsonOptions("""{"dbtable": "my-jdbc-table", "database": "my-jdbc-db"}"""), redshiftTmpDir = "", transformationContext = "datasink4")
```

返回 `DataSink`。

## def getSink
<a name="glue-etl-scala-apis-glue-gluecontext-defs-getSink"></a>

```
def getSink( connectionType : String,
             connectionOptions : JsonOptions,
             transformationContext : String = ""
           ) : DataSink
```

创建 [DataSink](glue-etl-scala-apis-glue-datasink-class.md)，将数据写入 Amazon Simple Storage Service（Amazon S3）、JDBC、AWS Glue 数据目录、Apache Kafka 或 Amazon Kinesis 数据流等目标写入数据。
+ `connectionType` - 连接的类型。请参阅[AWS Glue for Spark 中适用于 ETL 的连接类型和选项](aws-glue-programming-etl-connect.md)。
+ `connectionOptions` – JSON 名称-值对的字符串，提供与数据接收器建立连接所需的附加信息。请参阅[AWS Glue for Spark 中适用于 ETL 的连接类型和选项](aws-glue-programming-etl-connect.md)。
+ `transformationContext` – 与任务书签要使用的接收器关联的转换上下文。设置为 默认情况下为空。

返回 `DataSink`。

## def getSinkWithFormat
<a name="glue-etl-scala-apis-glue-gluecontext-defs-getSinkWithFormat"></a>

```
def getSinkWithFormat( connectionType : String,
                       options : JsonOptions,
                       transformationContext : String = "",
                       format : String = null,
                       formatOptions : JsonOptions = JsonOptions.empty
                     ) : DataSink
```

创建 [DataSink](glue-etl-scala-apis-glue-datasink-class.md)，将数据写入 Amazon S3、JDBC、数据目录、Apache Kafka 或 Amazon Kinesis 数据流等目标写入数据。此外还将设置要写出到目标的数据格式。
+ `connectionType` - 连接的类型。请参阅[AWS Glue for Spark 中适用于 ETL 的连接类型和选项](aws-glue-programming-etl-connect.md)。
+ `options` – JSON 名称-值对的字符串，提供与数据接收器建立连接所需的附加信息。请参阅[AWS Glue for Spark 中适用于 ETL 的连接类型和选项](aws-glue-programming-etl-connect.md)。
+ `transformationContext` – 与任务书签要使用的接收器关联的转换上下文。设置为 默认情况下为空。
+ `format` – 要写出到目标的数据的格式。
+ `formatOptions` – JSON 名称-值对的字符串，提供用于设置目标处的数据格式的附加选项。请参阅[数据格式选项](aws-glue-programming-etl-format.md)。

返回 `DataSink`。

## def getSource
<a name="glue-etl-scala-apis-glue-gluecontext-defs-getSource"></a>

```
def getSource( connectionType : String,
               connectionOptions : JsonOptions,
               transformationContext : String = ""
               pushDownPredicate
             ) : DataSource
```

创建可从 Amazon S3、JDBC 或 AWS Glue 数据目录等源中读取数据的 [数据源特性](glue-etl-scala-apis-glue-datasource-trait.md)。还支持 Kafka 和 Kinesis 流式传输数据源。
+ `connectionType` – 数据源的类型。请参阅[AWS Glue for Spark 中适用于 ETL 的连接类型和选项](aws-glue-programming-etl-connect.md)。
+ `connectionOptions` – JSON 名称-值对的字符串，提供与数据源建立连接所需的附加信息。有关更多信息，请参阅 [AWS Glue for Spark 中适用于 ETL 的连接类型和选项](aws-glue-programming-etl-connect.md)。

  Kinesis 流式传输源需要以下连接选项：`streamARN`、`startingPosition`、`inferSchema` 和 `classification`。

  Kinesis 流式传输源需要以下连接选项：`connectionName`、`topicName`、`startingOffsets`、`inferSchema` 和 `classification`。
+ `transformationContext` – 与任务书签要使用的接收器关联的转换上下文。设置为 默认情况下为空。
+ `pushDownPredicate` – 预测分区列。

返回 `DataSource`。

Amazon Kinesis 流式传输源示例：

```
val kinesisOptions = jsonOptions()
data_frame_datasource0 = glueContext.getSource("kinesis", kinesisOptions).getDataFrame()

private def jsonOptions(): JsonOptions = {
    new JsonOptions(
      s"""{"streamARN": "arn:aws:kinesis:eu-central-1:123456789012:stream/fromOptionsStream",
         |"startingPosition": "TRIM_HORIZON",
         |"inferSchema": "true",
         |"classification": "json"}""".stripMargin)
}
```

Kafka 流式传输源示例：

```
val kafkaOptions = jsonOptions()
val data_frame_datasource0 = glueContext.getSource("kafka", kafkaOptions).getDataFrame()

private def jsonOptions(): JsonOptions = {
    new JsonOptions(
      s"""{"connectionName": "ConfluentKafka",
         |"topicName": "kafka-auth-topic",
         |"startingOffsets": "earliest",
         |"inferSchema": "true",
         |"classification": "json"}""".stripMargin)
 }
```

## def getSourceWithFormat
<a name="glue-etl-scala-apis-glue-gluecontext-defs-getSourceWithFormat"></a>

```
def getSourceWithFormat( connectionType : String,
                         options : JsonOptions,
                         transformationContext : String = "",
                         format : String = null,
                         formatOptions : JsonOptions = JsonOptions.empty
                       ) : DataSource
```

创建一个 [数据源特性](glue-etl-scala-apis-glue-datasource-trait.md)，从 Amazon S3, JDBC 或 AWS Glue 数据目录等源读取数据并设置源中所存储数据的格式。
+ `connectionType` – 数据源的类型。请参阅[AWS Glue for Spark 中适用于 ETL 的连接类型和选项](aws-glue-programming-etl-connect.md)。
+ `options` – JSON 名称-值对的字符串，提供与数据源建立连接所需的附加信息。请参阅[AWS Glue for Spark 中适用于 ETL 的连接类型和选项](aws-glue-programming-etl-connect.md)。
+ `transformationContext` – 与任务书签要使用的接收器关联的转换上下文。设置为 默认情况下为空。
+ `format` – 源中所存储数据的格式。当 `connectionType` 为“s3”时，您也可以指定 `format`。可以是以下值之一：“avro”、“csv”、“grokLog”、“ion”、“json”、“xml”、“parquet”或“orc”。
+ `formatOptions` – JSON 名称-值对的字符串，提供用于在源中分析数据的附加选项。请参阅[数据格式选项](aws-glue-programming-etl-format.md)。

返回 `DataSource`。

**示例**

从 Amazon S3 上逗号分隔值（CSV）文件的数据源创建 DynamicFrame：

```
val datasource0 = glueContext.getSourceWithFormat(
    connectionType="s3",
    options =JsonOptions(s"""{"paths": [ "s3://csv/nycflights.csv"]}"""),
    transformationContext = "datasource0", 
    format = "csv",
    formatOptions=JsonOptions(s"""{"withHeader":"true","separator": ","}""")
    ).getDynamicFrame()
```

使用 JDBC 连接从作为 PostgreSQL 的数据源创建 DynamicFrame：

```
val datasource0 = glueContext.getSourceWithFormat(
    connectionType="postgresql",
    options =JsonOptions(s"""{
      "url":"jdbc:postgresql://databasePostgres-1.rds.amazonaws.com:5432/testdb",
      "dbtable": "public.company",
      "redshiftTmpDir":"", 
      "user":"username", 
      "password":"password123"
    }"""),
    transformationContext = "datasource0").getDynamicFrame()
```

使用 JDBC 连接从作为 MySQL 的数据源创建 DynamicFrame：

```
 val datasource0 = glueContext.getSourceWithFormat(
    connectionType="mysql",
    options =JsonOptions(s"""{
      "url":"jdbc:mysql://databaseMysql-1.rds.amazonaws.com:3306/testdb",
      "dbtable": "athenatest_nycflights13_csv",
      "redshiftTmpDir":"", 
      "user":"username", 
      "password":"password123"
    }"""),
    transformationContext = "datasource0").getDynamicFrame()
```

## def getSparkSession
<a name="glue-etl-scala-apis-glue-gluecontext-defs-getSparkSession"></a>

```
def getSparkSession : SparkSession 
```

获取与此 GlueContext 关联的 `SparkSession` 对象。使用此 SparkSession 对象可注册要用于从 DynamicFrame 创建的 `DataFrame` 的表和 UDF。

返回 SparkSession。

## def startTransaction
<a name="glue-etl-scala-apis-glue-gluecontext-defs-start-transaction"></a>

```
def startTransaction(readOnly: Boolean):String
```

开启新事务。内部调用 Lake Formation [startTransaction](https://docs.aws.amazon.com/lake-formation/latest/dg/aws-lake-formation-api-aws-lake-formation-api-transactions.html#aws-lake-formation-api-aws-lake-formation-api-transactions-StartTransaction) API。
+ `readOnly` –（布尔值）指示此事务应为只读还是读写。使用只读事务 ID 进行的写入将被拒绝。只读事务不需要提交。

返回事务 ID。

## def commitTransaction
<a name="glue-etl-scala-apis-glue-gluecontext-defs-commit-transaction"></a>

```
def commitTransaction(transactionId: String, waitForCommit: Boolean): Boolean
```

尝试提交指定的事务。可能在事务完成提交之前返回 `commitTransaction`。内部调用 Lake Formation [commitTransaction](https://docs.aws.amazon.com/lake-formation/latest/dg/aws-lake-formation-api-aws-lake-formation-api-transactions.html#aws-lake-formation-api-aws-lake-formation-api-transactions-CommitTransaction) API。
+ `transactionId` –（字符串）要提交的事务。
+ `waitForCommit` –（布尔值）确定是否立即返回 `commitTransaction`。默认值为 true。如果为假，则轮询 `commitTransaction` 并等待事务提交。等待时间限制为 1 分钟使用指数回退，最多重试 6 次。

返回布尔值，以指示是否完成提交。

## def cancelTransaction
<a name="glue-etl-scala-apis-glue-gluecontext-defs-cancel-transaction"></a>

```
def cancelTransaction(transactionId: String): Unit
```

尝试取消指定的事务。内部调用 Lake Formation [CancelTransaction](https://docs.aws.amazon.com/lake-formation/latest/dg/aws-lake-formation-api-aws-lake-formation-api-transactions.html#aws-lake-formation-api-aws-lake-formation-api-transactions-CancelTransaction) API。
+ `transactionId` –（字符串）要取消的事务。

如果事务以前已提交，则返回 `TransactionCommittedException` 异常。

## def this
<a name="glue-etl-scala-apis-glue-gluecontext-defs-this-1"></a>

```
def this( sc : SparkContext,
          minPartitions : Int,
          targetPartitions : Int )
```

使用指定的 `SparkContext`、最小分区数和目标分区数创建 `GlueContext` 对象。
+ `sc` — 这些区域有： `SparkContext`。
+ `minPartitions` – 最小分区数。
+ `targetPartitions` – 目标分区数。

返回 `GlueContext`。

## def this
<a name="glue-etl-scala-apis-glue-gluecontext-defs-this-2"></a>

```
def this( sc : SparkContext )
```

使用提供的 `GlueContext` 创建一个 `SparkContext` 对象。将最小分区数设置为 10，将目标分区数设置为 20。
+ `sc` — 这些区域有： `SparkContext`。

返回 `GlueContext`。

## def this
<a name="glue-etl-scala-apis-glue-gluecontext-defs-this-3"></a>

```
def this( sparkContext : JavaSparkContext )
```

使用提供的 `GlueContext` 创建一个 `JavaSparkContext` 对象。将最小分区数设置为 10，将目标分区数设置为 20。
+ `sparkContext` — 这些区域有： `JavaSparkContext`。

返回 `GlueContext`。