

# 在 AWS Glue 中使用 Hudi 框架
<a name="aws-glue-programming-etl-format-hudi"></a>

AWS Glue 3.0 及更高版本支持数据湖的 Apache Hudi 框架。Hudi 是一个开源数据湖存储框架，简化增量数据处理和数据管道开发。本主题涵盖了在 Hudi 表中传输或存储数据时，在 AWS Glue 中使用数据的可用功能。要了解有关 Hudi 的更多信息，请参阅 [Apache Hudi 官方文档](https://hudi.apache.org/docs/overview/)。

您可以使用 AWS Glue 对 Amazon S3 中的 Hudi 表执行读写操作，也可以使用 AWS Glue 数据目录处理 Hudi 表。还支持其他操作，包括插入、更新和所有 [Apache Spark 操作](https://hudi.apache.org/docs/quick-start-guide/)。

**注意**  
AWS Glue 5.0 中的 Apache Hudi 0.15.0 实现会在内部回退 [HUDI-7001](https://github.com/apache/hudi/pull/9936) 变更。如果记录密钥由单个字段组成，则不会出现与复杂键生成相关的回归问题。不过，这种行为与 OSS Apache Hudi 0.15.0 不同。  
Apache Hudi 0.10.1 for AWS Glue 3.0 不支持 Read（MoR）表上的 Hudi Merge。

下表列出了 AWS Glue 每个版本中包含的 Hudi 版本。


****  

| AWS Glue 版本 | 支持的 Hudi 版本 | 
| --- | --- | 
| 5.1 | 1.0.2 | 
| 5.0 | 0.15.0 | 
| 4.0 | 0.12.1 | 
| 3.0 | 0.10.1 | 

要了解有关 AWS Glue 支持的数据湖框架的更多信息，请参阅[在 AWS Glue ETL 任务中使用数据湖框架](aws-glue-programming-etl-datalake-native-frameworks.md)。

## 启用 Hudi
<a name="aws-glue-programming-etl-format-hudi-enable"></a>

要为 AWS Glue 启用 Hudi，请完成以下任务：
+ 指定 `hudi` 作为 `--datalake-formats` 作业参数的值。有关更多信息，请参阅 [在 AWS Glue 作业中使用作业参数](aws-glue-programming-etl-glue-arguments.md)。
+ `--conf` 为 Glue 作业创建一个名为 AWS 的密钥，并将其设置为以下值。或者，您可以在脚本中使用 `SparkConf` 设置以下配置。这些设置有助于 Apache Spark 正确处理 Hudi 表。

  ```
  spark.serializer=org.apache.spark.serializer.KryoSerializer
  ```
+ AWS Glue 4.0 默认为 Hudi 表启用了 Lake Formation 权限支持。无需额外配置即可读取/写入注册到 Lake Formation 的 Hudi 表。AWS Glue 作业 IAM 角色必须具有 SELECT 权限才能读取已注册的 Hudi 表。AWS Glue 作业 IAM 角色必须具有 SUPER 权限才能写入已注册的 Hudi 表。要了解有关管理 Lake Formation 权限的更多信息，请参阅 [Granting and revoking permissions on Data Catalog resources](https://docs.aws.amazon.com/lake-formation/latest/dg/granting-catalog-permissions.html)。

**使用不同的 Hudi 版本**

要使用 AWS Glue 不支持的 Hudi 版本，请使用 `--extra-jars` 作业参数指定您自己的 Hudi JAR 文件。请勿使用 `hudi` 作为 `--datalake-formats` 作业参数的值。如果使用 AWS Glue 5.0 或更高版本，则必须设置 `--user-jars-first true` 作业参数。

## 示例：将 Hudi 表写入 Amazon S3 并将其注册到 AWS Glue 数据目录中
<a name="aws-glue-programming-etl-format-hudi-write"></a>

以下示例脚本脚本演示了如何将 Hudi 表写入 Amazon S3，并将该表注册到 AWS Glue 数据目录。该示例使用 Hudi [Hive 同步工具](https://hudi.apache.org/docs/syncing_metastore/)来注册该表。

**注意**  
此示例要求您设置 `--enable-glue-datacatalog` 作业参数，才能将 AWS Glue Data Catalog 用作 Apache Spark Hive 元存储。要了解更多信息，请参阅[在 AWS Glue 作业中使用作业参数](aws-glue-programming-etl-glue-arguments.md)。

------
#### [ Python ]

```
# Example: Create a Hudi table from a DataFrame 
# and register the table to Glue Data Catalog

additional_options={
    "hoodie.table.name": "<your_table_name>",
    "hoodie.database.name": "<your_database_name>",
    "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.recordkey.field": "<your_recordkey_field>",
    "hoodie.datasource.write.precombine.field": "<your_precombine_field>",
    "hoodie.datasource.write.partitionpath.field": "<your_partitionkey_field>",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.database": "<your_database_name>",
    "hoodie.datasource.hive_sync.table": "<your_table_name>",
    "hoodie.datasource.hive_sync.partition_fields": "<your_partitionkey_field>",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.hive_sync.mode": "hms",
    "path": "s3://<s3Path/>"
}

dataFrame.write.format("hudi") \
    .options(**additional_options) \
    .mode("overwrite") \
    .save()
```

------
#### [ Scala ]

```
// Example: Example: Create a Hudi table from a DataFrame
// and register the table to Glue Data Catalog

val additionalOptions = Map(
  "hoodie.table.name" -> "<your_table_name>",
  "hoodie.database.name" -> "<your_database_name>",
  "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE",
  "hoodie.datasource.write.operation" -> "upsert",
  "hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>",
  "hoodie.datasource.write.precombine.field" -> "<your_precombine_field>",
  "hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>",
  "hoodie.datasource.write.hive_style_partitioning" -> "true",
  "hoodie.datasource.hive_sync.enable" -> "true",
  "hoodie.datasource.hive_sync.database" -> "<your_database_name>",
  "hoodie.datasource.hive_sync.table" -> "<your_table_name>",
  "hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>",
  "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
  "hoodie.datasource.hive_sync.use_jdbc" -> "false",
  "hoodie.datasource.hive_sync.mode" -> "hms",
  "path" -> "s3://<s3Path/>")

dataFrame.write.format("hudi")
  .options(additionalOptions)
  .mode("append")
  .save()
```

------

## 示例：使用 AWS Glue Data Catalog 从 Amazon S3 读取 Hudi 表
<a name="aws-glue-programming-etl-format-hudi-read"></a>

此示例从 Amazon S3 读取您在 [示例：将 Hudi 表写入 Amazon S3 并将其注册到 AWS Glue 数据目录中](#aws-glue-programming-etl-format-hudi-write) 中创建的 Hudi 表。

**注意**  
此示例要求您设置 `--enable-glue-datacatalog` 任务参数，才能将 AWS Glue Data Catalog 用作 Apache Spark Hive 元存储。要了解更多信息，请参阅[在 AWS Glue 作业中使用作业参数](aws-glue-programming-etl-glue-arguments.md)。

------
#### [ Python ]

在本示例中，使用 `GlueContext.create\$1data\$1frame.from\$1catalog()` 方法。

```
# Example: Read a Hudi table from Glue Data Catalog

from awsglue.context import GlueContext
from pyspark.context import SparkContext

sc = SparkContext()
glueContext = GlueContext(sc)

dataFrame = glueContext.create_data_frame.from_catalog(
    database = "<your_database_name>",
    table_name = "<your_table_name>"
)
```

------
#### [ Scala ]

在本示例中，使用 [getCatalogSource](glue-etl-scala-apis-glue-gluecontext.md#glue-etl-scala-apis-glue-gluecontext-defs-getCatalogSource) 方法。

```
// Example: Read a Hudi table from Glue Data Catalog

import com.amazonaws.services.glue.GlueContext
import org.apache.spark.SparkContext

object GlueApp {
  def main(sysArgs: Array[String]): Unit = {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    
    val dataFrame = glueContext.getCatalogSource(
      database = "<your_database_name>",
      tableName = "<your_table_name>"
    ).getDataFrame()
  }
}
```

------

## 示例：更新 `DataFrame` 并将其插入到 Amazon S3 的 Hudi 表中
<a name="aws-glue-programming-etl-format-hudi-update-insert"></a>

此示例使用 AWS Glue Data Catalog 将 DataFrame 插入到您在 [示例：将 Hudi 表写入 Amazon S3 并将其注册到 AWS Glue 数据目录中](#aws-glue-programming-etl-format-hudi-write) 中创建的 Hudi 表中。

**注意**  
此示例要求您设置 `--enable-glue-datacatalog` 任务参数，才能将 AWS Glue Data Catalog 用作 Apache Spark Hive 元存储。要了解更多信息，请参阅[在 AWS Glue 作业中使用作业参数](aws-glue-programming-etl-glue-arguments.md)。

------
#### [ Python ]

在本示例中，使用 `GlueContext.write\$1data\$1frame.from\$1catalog()` 方法。

```
# Example: Upsert a Hudi table from Glue Data Catalog

from awsglue.context import GlueContext
from pyspark.context import SparkContext

sc = SparkContext()
glueContext = GlueContext(sc)

glueContext.write_data_frame.from_catalog(
    frame = dataFrame,
    database = "<your_database_name>",
    table_name = "<your_table_name>",
    additional_options={
        "hoodie.table.name": "<your_table_name>",
        "hoodie.database.name": "<your_database_name>",
        "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
        "hoodie.datasource.write.operation": "upsert",
        "hoodie.datasource.write.recordkey.field": "<your_recordkey_field>",
        "hoodie.datasource.write.precombine.field": "<your_precombine_field>",
        "hoodie.datasource.write.partitionpath.field": "<your_partitionkey_field>",
        "hoodie.datasource.write.hive_style_partitioning": "true",
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.database": "<your_database_name>",
        "hoodie.datasource.hive_sync.table": "<your_table_name>",
        "hoodie.datasource.hive_sync.partition_fields": "<your_partitionkey_field>",
        "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        "hoodie.datasource.hive_sync.use_jdbc": "false",
        "hoodie.datasource.hive_sync.mode": "hms"
    }
)
```

------
#### [ Scala ]

在本示例中，使用 [getCatalogSink](glue-etl-scala-apis-glue-gluecontext.md#glue-etl-scala-apis-glue-gluecontext-defs-getCatalogSink) 方法。

```
// Example: Upsert a Hudi table from Glue Data Catalog

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.JsonOptions
import org.apacke.spark.SparkContext

object GlueApp {
  def main(sysArgs: Array[String]): Unit = {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    glueContext.getCatalogSink("<your_database_name>", "<your_table_name>",
      additionalOptions = JsonOptions(Map(
        "hoodie.table.name" -> "<your_table_name>",
        "hoodie.database.name" -> "<your_database_name>",
        "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE",
        "hoodie.datasource.write.operation" -> "upsert",
        "hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>",
        "hoodie.datasource.write.precombine.field" -> "<your_precombine_field>",
        "hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>",
        "hoodie.datasource.write.hive_style_partitioning" -> "true",
        "hoodie.datasource.hive_sync.enable" -> "true",
        "hoodie.datasource.hive_sync.database" -> "<your_database_name>",
        "hoodie.datasource.hive_sync.table" -> "<your_table_name>",
        "hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>",
        "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        "hoodie.datasource.hive_sync.use_jdbc" -> "false",
        "hoodie.datasource.hive_sync.mode" -> "hms"
      )))
      .writeDataFrame(dataFrame, glueContext)
  }
}
```

------

## 示例：使用 Spark 从 Amazon S3 读取 Hudi 表
<a name="aws-glue-programming-etl-format-hudi-read-spark"></a>

此示例使用 Spark DataFrame API 从 Amazon S3 读取 Hudi 表。

------
#### [ Python ]

```
# Example: Read a Hudi table from S3 using a Spark DataFrame

dataFrame = spark.read.format("hudi").load("s3://<s3path/>")
```

------
#### [ Scala ]

```
// Example: Read a Hudi table from S3 using a Spark DataFrame

val dataFrame = spark.read.format("hudi").load("s3://<s3path/>")
```

------

## 示例：使用 Spark 向 Amazon S3 写入 Hudi 表
<a name="aws-glue-programming-etl-format-hudi-write-spark"></a>

示例：使用 Spark 向 Amazon S3 写入 Hudi 表

------
#### [ Python ]

```
# Example: Write a Hudi table to S3 using a Spark DataFrame

dataFrame.write.format("hudi") \
    .options(**additional_options) \
    .mode("overwrite") \
    .save("s3://<s3Path/>)
```

------
#### [ Scala ]

```
// Example: Write a Hudi table to S3 using a Spark DataFrame

dataFrame.write.format("hudi")
  .options(additionalOptions)
  .mode("overwrite")
  .save("s3://<s3path/>")
```

------

## 示例：读取和写入具有 Lake Formation 权限控制的 Hudi 表
<a name="aws-glue-programming-etl-format-hudi-read-write-lake-formation-tables"></a>

此示例将读取和写入一个具有 Lake Formation 权限控制的 Hudi 表。

1. 创建一个 Hudi 表并将其注册到 Lake Formation。

   1. 要启用 Lake Formation 权限控制，您首先需要将表的 Amazon S3 路径注册到 Lake Formation。有关更多信息，请参阅 [Registering an Amazon S3 location](https://docs.aws.amazon.com/lake-formation/latest/dg/register-location.html)（注册 Amazon S3 位置）。您可以通过 Lake Formation 控制台或使用 AWS CLI 进行注册：

      ```
      aws lakeformation register-resource --resource-arn arn:aws:s3:::<s3-bucket>/<s3-folder> --use-service-linked-role --region <REGION>
      ```

      注册了 Amazon S3 位置后，对于任何指向该位置（或其任何子位置）的 AWS Glue 表，`GetTable` 调用中的 `IsRegisteredWithLakeFormation` 参数都将返回值 true。

   1. 创建一个指向通过 Spark dataframe API 注册的 Amazon S3 路径的 Hudi 表：

      ```
      hudi_options = {
          'hoodie.table.name': table_name,
          'hoodie.database.name': database_name,
          'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE',
          'hoodie.datasource.write.recordkey.field': 'product_id',
          'hoodie.datasource.write.table.name': table_name,
          'hoodie.datasource.write.operation': 'upsert',
          'hoodie.datasource.write.precombine.field': 'updated_at',
          'hoodie.datasource.write.hive_style_partitioning': 'true',
          'hoodie.upsert.shuffle.parallelism': 2,
          'hoodie.insert.shuffle.parallelism': 2,
          'path': <S3_TABLE_LOCATION>,
          'hoodie.datasource.hive_sync.enable': 'true',
          'hoodie.datasource.hive_sync.database': database_name,
          'hoodie.datasource.hive_sync.table': table_name,
          'hoodie.datasource.hive_sync.use_jdbc': 'false',
          'hoodie.datasource.hive_sync.mode': 'hms'
      }
      
      df_products.write.format("hudi")  \
          .options(**hudi_options)  \
          .mode("overwrite")  \
          .save()
      ```

1. 向 AWS Glue 作业 IAM 角色授予 Lake Formation 权限。您可以通过 Lake Formation 控制台授予权限，也可以使用 AWS CLI 授予权限。有关更多信息，请参阅 [Granting table permissions using the Lake Formation console and the named resource method](https://docs.aws.amazon.com/lake-formation/latest/dg/granting-table-permissions.html)。

1.  读取注册到 Lake Formation 的 Hudi 表。代码与读取未注册的 Hudi 表相同。请注意，AWS Glue 作业 IAM 角色需要具有 SELECT 权限才能成功读取。

   ```
    val dataFrame = glueContext.getCatalogSource(
         database = "<your_database_name>",
         tableName = "<your_table_name>"
       ).getDataFrame()
   ```

1. 写入注册到 Lake Formation 的 Hudi 表。代码与写入未注册的 Hudi 表相同。请注意，AWS Glue 作业 IAM 角色需要具有 SUPER 权限才能成功写入。

   ```
   glueContext.getCatalogSink("<your_database_name>", "<your_table_name>",
         additionalOptions = JsonOptions(Map(
           "hoodie.table.name" -> "<your_table_name>",
           "hoodie.database.name" -> "<your_database_name>",
           "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE",
           "hoodie.datasource.write.operation" -> "<write_operation>",
           "hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>",
           "hoodie.datasource.write.precombine.field" -> "<your_precombine_field>",
           "hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>",
           "hoodie.datasource.write.hive_style_partitioning" -> "true",
           "hoodie.datasource.hive_sync.enable" -> "true",
           "hoodie.datasource.hive_sync.database" -> "<your_database_name>",
           "hoodie.datasource.hive_sync.table" -> "<your_table_name>",
           "hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>",
           "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
           "hoodie.datasource.hive_sync.use_jdbc" -> "false",
           "hoodie.datasource.hive_sync.mode" -> "hms"
         )))
         .writeDataFrame(dataFrame, glueContext)
   ```