

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 在 Glue 中使用 Hudi AWS 架構
<a name="aws-glue-programming-etl-format-hudi"></a>

AWS Glue 3.0 和更新版本支援資料湖的 Apache Hudi 架構。Hudi 是開放原始碼資料湖儲存架構，可簡化增量資料處理與資料管道開發。本主題涵蓋在 Hudi AWS 資料表中傳輸或存放資料時，在 Glue 中使用資料的可用功能。若要進一步了解 Hudi，請參閱官方 [Apache Hudi 文件](https://hudi.apache.org/docs/overview/)。

您可以使用 AWS Glue 在 Amazon S3 中的 Hudi 資料表上執行讀取和寫入操作，或使用 Glue Data Catalog 使用 Hudi AWS 資料表。還支援其他操作，包括插入、更新和所有 [Apache Spark 操作](https://hudi.apache.org/docs/quick-start-guide/)。

**注意**  
Glue 5.0 中的 Apache Hudi AWS 0.15.0 實作會在內部還原 [HUDI-7001](https://github.com/apache/hudi/pull/9936)。當記錄金鑰包含單一欄位時，不會顯示與複雜金鑰產生相關的迴歸。但是，此行為與 OSS Apache Hudi 0.15.0 不同。  
Apache Hudi 0.10.1 for AWS Glue 3.0 不支援 Hudi 讀取時合併 (MoR) 資料表。

下表列出每個 Glue 版本中包含的 Hudi AWS 版本。


****  

| AWS Glue 版本 | 支援的 Hudi 版本 | 
| --- | --- | 
| 5.1 | 1.0.2 | 
| 5.0 | 0.15.0 | 
| 4.0 | 0.12.1 | 
| 3.0 | 0.10.1 | 

若要進一步了解 AWS Glue 支援的資料湖架構，請參閱 [搭配 AWS Glue ETL 任務使用資料湖架構](aws-glue-programming-etl-datalake-native-frameworks.md)。

## 啟用 Hudi
<a name="aws-glue-programming-etl-format-hudi-enable"></a>

若要啟用 Hudi for AWS Glue，請完成下列任務：
+ 指定 `hudi` 作為 `--datalake-formats` 任務參數的值。如需詳細資訊，請參閱[在 Glue AWS 任務中使用任務參數](aws-glue-programming-etl-glue-arguments.md)。
+ 為您的 Glue 任務建立名為 AWS `--conf`的金鑰，並將其設定為下列值。您也可以選擇在指令碼中使用 `SparkConf` 設定以下組態。這些設定有助於 Apache Spark 正確處理 Hudi 資料表。

  ```
  spark.serializer=org.apache.spark.serializer.KryoSerializer
  ```
+ Glue 4.0 預設會啟用 Hudi AWS 的 Lake Formation 許可支援。讀取/寫入 Lake Formation 註冊的 Hudi 資料表時，不需要其他組態。若要讀取已註冊的 Hudi 資料表，Glue AWS 任務 IAM 角色必須具有 SELECT 許可。若要寫入已註冊的 Hudi 資料表，Glue AWS 任務 IAM 角色必須具有 SUPER 許可。若要進一步了解管理 Lake Formation 權限的資訊，請參閱[授予和撤銷 Data Catalog 資源的權限](https://docs.aws.amazon.com/lake-formation/latest/dg/granting-catalog-permissions.html)。

**使用不同的 Hudi 版本**

若要使用 AWS Glue 不支援的 Hudi 版本，請使用 `--extra-jars`任務參數指定您自己的 Hudi JAR 檔案。請勿包括 `hudi` 作為 `--datalake-formats` 任務參數的值。如果您使用 AWS Glue 5.0 或更新版本，則必須設定`--user-jars-first true`任務參數。

## 範例：將 Hudi 資料表寫入 Amazon S3，並在 AWS Glue Data Catalog 中註冊
<a name="aws-glue-programming-etl-format-hudi-write"></a>

此範例指令碼示範如何將 Hudi 資料表寫入 Amazon S3，並將資料表註冊至 AWS Glue Data Catalog。此範例使用 Hudi [Hive 同步工具](https://hudi.apache.org/docs/syncing_metastore/)來註冊該資料表。

**注意**  
此範例需要您設定`--enable-glue-datacatalog`任務參數，才能使用 AWS Glue Data Catalog 做為 Apache Spark Hive 中繼存放區。如需詳細資訊，請參閱 [在 Glue AWS 任務中使用任務參數](aws-glue-programming-etl-glue-arguments.md)。

------
#### [ Python ]

```
# Example: Create a Hudi table from a DataFrame 
# and register the table to Glue Data Catalog

additional_options={
    "hoodie.table.name": "{{<your_table_name>}}",
    "hoodie.database.name": "{{<your_database_name>}}",
    "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.recordkey.field": "{{<your_recordkey_field>}}",
    "hoodie.datasource.write.precombine.field": "{{<your_precombine_field>}}",
    "hoodie.datasource.write.partitionpath.field": "{{<your_partitionkey_field>}}",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.database": "{{<your_database_name>}}",
    "hoodie.datasource.hive_sync.table": "{{<your_table_name>}}",
    "hoodie.datasource.hive_sync.partition_fields": "{{<your_partitionkey_field>}}",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.hive_sync.mode": "hms",
    "path": "s3://{{<s3Path/>}}"
}

dataFrame.write.format("hudi") \
    .options(**additional_options) \
    .mode("overwrite") \
    .save()
```

------
#### [ Scala ]

```
// Example: Example: Create a Hudi table from a DataFrame
// and register the table to Glue Data Catalog

val additionalOptions = Map(
  "hoodie.table.name" -> "{{<your_table_name>}}",
  "hoodie.database.name" -> "{{<your_database_name>}}",
  "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE",
  "hoodie.datasource.write.operation" -> "upsert",
  "hoodie.datasource.write.recordkey.field" -> "{{<your_recordkey_field>}}",
  "hoodie.datasource.write.precombine.field" -> "{{<your_precombine_field>}}",
  "hoodie.datasource.write.partitionpath.field" -> "{{<your_partitionkey_field>}}",
  "hoodie.datasource.write.hive_style_partitioning" -> "true",
  "hoodie.datasource.hive_sync.enable" -> "true",
  "hoodie.datasource.hive_sync.database" -> "{{<your_database_name>}}",
  "hoodie.datasource.hive_sync.table" -> "{{<your_table_name>}}",
  "hoodie.datasource.hive_sync.partition_fields" -> "{{<your_partitionkey_field>}}",
  "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
  "hoodie.datasource.hive_sync.use_jdbc" -> "false",
  "hoodie.datasource.hive_sync.mode" -> "hms",
  "path" -> "s3://{{<s3Path/>}}")

dataFrame.write.format("hudi")
  .options(additionalOptions)
  .mode("append")
  .save()
```

------

## 範例：使用 Glue Data Catalog 從 Amazon S3 AWS 讀取 Hudi 資料表
<a name="aws-glue-programming-etl-format-hudi-read"></a>

此範例會讀取您從 Amazon S3 在 [範例：將 Hudi 資料表寫入 Amazon S3，並在 AWS Glue Data Catalog 中註冊](#aws-glue-programming-etl-format-hudi-write) 中建立的 Hudi 資料表。

**注意**  
此範例需要您設定`--enable-glue-datacatalog`任務參數，才能使用 AWS Glue Data Catalog 做為 Apache Spark Hive 中繼存放區。如需詳細資訊，請參閱 [在 Glue AWS 任務中使用任務參數](aws-glue-programming-etl-glue-arguments.md)。

------
#### [ Python ]

在此範例中，使用 `GlueContext.create\_data\_frame.from\_catalog()` 方法。

```
# Example: Read a Hudi table from Glue Data Catalog

from awsglue.context import GlueContext
from pyspark.context import SparkContext

sc = SparkContext()
glueContext = GlueContext(sc)

dataFrame = glueContext.create_data_frame.from_catalog(
    database = "{{<your_database_name>}}",
    table_name = "{{<your_table_name>}}"
)
```

------
#### [ Scala ]

在此範例中，使用 [getCatalogSource](glue-etl-scala-apis-glue-gluecontext.md#glue-etl-scala-apis-glue-gluecontext-defs-getCatalogSource) 方法。

```
// Example: Read a Hudi table from Glue Data Catalog

import com.amazonaws.services.glue.GlueContext
import org.apache.spark.SparkContext

object GlueApp {
  def main(sysArgs: Array[String]): Unit = {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    
    val dataFrame = glueContext.getCatalogSource(
      database = "{{<your_database_name>}}",
      tableName = "{{<your_table_name>}}"
    ).getDataFrame()
  }
}
```

------

## 範例：在 Amazon S3 中更新 `DataFrame` 並將其插入到 Hudi 資料表中
<a name="aws-glue-programming-etl-format-hudi-update-insert"></a>

此範例使用 AWS Glue Data Catalog 將 DataFrame 插入您在 中建立的 Hudi 資料表[範例：將 Hudi 資料表寫入 Amazon S3，並在 AWS Glue Data Catalog 中註冊](#aws-glue-programming-etl-format-hudi-write)。

**注意**  
此範例需要您設定`--enable-glue-datacatalog`任務參數，才能使用 AWS Glue Data Catalog 做為 Apache Spark Hive 中繼存放區。如需詳細資訊，請參閱 [在 Glue AWS 任務中使用任務參數](aws-glue-programming-etl-glue-arguments.md)。

------
#### [ Python ]

在此範例中，使用 `GlueContext.write\_data\_frame.from\_catalog()` 方法。

```
# Example: Upsert a Hudi table from Glue Data Catalog

from awsglue.context import GlueContext
from pyspark.context import SparkContext

sc = SparkContext()
glueContext = GlueContext(sc)

glueContext.write_data_frame.from_catalog(
    frame = dataFrame,
    database = "{{<your_database_name>}}",
    table_name = "{{<your_table_name>}}",
    additional_options={
        "hoodie.table.name": "{{<your_table_name>}}",
        "hoodie.database.name": "{{<your_database_name>}}",
        "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
        "hoodie.datasource.write.operation": "upsert",
        "hoodie.datasource.write.recordkey.field": "{{<your_recordkey_field>}}",
        "hoodie.datasource.write.precombine.field": "{{<your_precombine_field>}}",
        "hoodie.datasource.write.partitionpath.field": "{{<your_partitionkey_field>}}",
        "hoodie.datasource.write.hive_style_partitioning": "true",
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.database": "{{<your_database_name>}}",
        "hoodie.datasource.hive_sync.table": "{{<your_table_name>}}",
        "hoodie.datasource.hive_sync.partition_fields": "{{<your_partitionkey_field>}}",
        "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        "hoodie.datasource.hive_sync.use_jdbc": "false",
        "hoodie.datasource.hive_sync.mode": "hms"
    }
)
```

------
#### [ Scala ]

在此範例中，使用 [getCatalogSink](glue-etl-scala-apis-glue-gluecontext.md#glue-etl-scala-apis-glue-gluecontext-defs-getCatalogSink) 方法。

```
// Example: Upsert a Hudi table from Glue Data Catalog

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.JsonOptions
import org.apacke.spark.SparkContext

object GlueApp {
  def main(sysArgs: Array[String]): Unit = {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    glueContext.getCatalogSink("{{<your_database_name>}}", "{{<your_table_name>}}",
      additionalOptions = JsonOptions(Map(
        "hoodie.table.name" -> "{{<your_table_name>}}",
        "hoodie.database.name" -> "{{<your_database_name>}}",
        "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE",
        "hoodie.datasource.write.operation" -> "upsert",
        "hoodie.datasource.write.recordkey.field" -> "{{<your_recordkey_field>}}",
        "hoodie.datasource.write.precombine.field" -> "{{<your_precombine_field>}}",
        "hoodie.datasource.write.partitionpath.field" -> "{{<your_partitionkey_field>}}",
        "hoodie.datasource.write.hive_style_partitioning" -> "true",
        "hoodie.datasource.hive_sync.enable" -> "true",
        "hoodie.datasource.hive_sync.database" -> "{{<your_database_name>}}",
        "hoodie.datasource.hive_sync.table" -> "{{<your_table_name>}}",
        "hoodie.datasource.hive_sync.partition_fields" -> "{{<your_partitionkey_field>}}",
        "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        "hoodie.datasource.hive_sync.use_jdbc" -> "false",
        "hoodie.datasource.hive_sync.mode" -> "hms"
      )))
      .writeDataFrame(dataFrame, glueContext)
  }
}
```

------

## 範例：使用 Spark 從 Amazon S3 讀取 Hudi 資料表
<a name="aws-glue-programming-etl-format-hudi-read-spark"></a>

此範例使用 Spark DataFrame API 從 Amazon S3 讀取 Hudi 資料表。

------
#### [ Python ]

```
# Example: Read a Hudi table from S3 using a Spark DataFrame

dataFrame = spark.read.format("hudi").load("s3://{{<s3path/>}}")
```

------
#### [ Scala ]

```
// Example: Read a Hudi table from S3 using a Spark DataFrame

val dataFrame = spark.read.format("hudi").load("s3://{{<s3path/>}}")
```

------

## 範例：使用 Spark 將 Hudi 資料表寫入 Amazon S3
<a name="aws-glue-programming-etl-format-hudi-write-spark"></a>

此範例使用 Spark 將 Hudi 資料表寫入 Amazon S3。

------
#### [ Python ]

```
# Example: Write a Hudi table to S3 using a Spark DataFrame

dataFrame.write.format("hudi") \
    .options(**additional_options) \
    .mode("overwrite") \
    .save("s3://{{<s3Path/>}})
```

------
#### [ Scala ]

```
// Example: Write a Hudi table to S3 using a Spark DataFrame

dataFrame.write.format("hudi")
  .options(additionalOptions)
  .mode("overwrite")
  .save("s3://{{<s3path/>}}")
```

------

## 範例：使用 Lake Formation 權限控制讀取和寫入 Hudi 資料表
<a name="aws-glue-programming-etl-format-hudi-read-write-lake-formation-tables"></a>

此範例會使用 Lake Formation 權限控制讀取和寫入 Hudi 資料表。

1. 建立 Hudi 資料表，並在 Lake Formation 中進行註冊。

   1. 若要啟用 Lake Formation 權限控制，您將需要先在 Lake Formation 中註冊資料表 Amazon S3 路徑。如需詳細資訊，請參閱 [Registering an Amazon S3 location](https://docs.aws.amazon.com/lake-formation/latest/dg/register-location.html) (註冊 Amazon S3 位置)。您可以從 Lake Formation 主控台或使用 CLI AWS 註冊：

      ```
      aws lakeformation register-resource --resource-arn arn:aws:s3:::<s3-bucket>/<s3-folder> --use-service-linked-role --region <REGION>
      ```

      註冊 Amazon S3 位置後，指向位置 （或其任何子位置） 的任何 AWS Glue 資料表都會在`GetTable`呼叫中傳回 `IsRegisteredWithLakeFormation` 參數的值為 true。

   1. 建立透過 Spark DataFrame API 指向註冊之 Amazon S3 路徑的 Hudi 資料表：

      ```
      hudi_options = {
          'hoodie.table.name': table_name,
          'hoodie.database.name': database_name,
          'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE',
          'hoodie.datasource.write.recordkey.field': 'product_id',
          'hoodie.datasource.write.table.name': table_name,
          'hoodie.datasource.write.operation': 'upsert',
          'hoodie.datasource.write.precombine.field': 'updated_at',
          'hoodie.datasource.write.hive_style_partitioning': 'true',
          'hoodie.upsert.shuffle.parallelism': 2,
          'hoodie.insert.shuffle.parallelism': 2,
          'path': <S3_TABLE_LOCATION>,
          'hoodie.datasource.hive_sync.enable': 'true',
          'hoodie.datasource.hive_sync.database': database_name,
          'hoodie.datasource.hive_sync.table': table_name,
          'hoodie.datasource.hive_sync.use_jdbc': 'false',
          'hoodie.datasource.hive_sync.mode': 'hms'
      }
      
      df_products.write.format("hudi")  \
          .options(**hudi_options)  \
          .mode("overwrite")  \
          .save()
      ```

1. 將 Lake Formation 許可授予 AWS Glue 任務 IAM 角色。您可以從 Lake Formation 主控台授予許可，或使用 AWS CLI。如需詳細資訊，請參閱[使用 Lake Formation 主控台和具名資源方法授予資料表權限](https://docs.aws.amazon.com/lake-formation/latest/dg/granting-table-permissions.html)。

1.  讀取在 Lake Formation 中註冊的 Hudi 資料表。該程式碼與讀取未註冊之 Hudi 資料表的程式碼相同。請注意，Glue AWS 任務 IAM 角色需要具有 SELECT 許可才能成功讀取。

   ```
    val dataFrame = glueContext.getCatalogSource(
         database = "<your_database_name>",
         tableName = "<your_table_name>"
       ).getDataFrame()
   ```

1. 寫入在 Lake Formation 中註冊的 Hudi 資料表。該程式碼與寫入未註冊之 Hudi 資料表的程式碼相同。請注意，Glue AWS 任務 IAM 角色需要具有 SUPER 許可才能成功寫入。

   ```
   glueContext.getCatalogSink("<your_database_name>", "<your_table_name>",
         additionalOptions = JsonOptions(Map(
           "hoodie.table.name" -> "<your_table_name>",
           "hoodie.database.name" -> "<your_database_name>",
           "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE",
           "hoodie.datasource.write.operation" -> "<write_operation>",
           "hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>",
           "hoodie.datasource.write.precombine.field" -> "<your_precombine_field>",
           "hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>",
           "hoodie.datasource.write.hive_style_partitioning" -> "true",
           "hoodie.datasource.hive_sync.enable" -> "true",
           "hoodie.datasource.hive_sync.database" -> "<your_database_name>",
           "hoodie.datasource.hive_sync.table" -> "<your_table_name>",
           "hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>",
           "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
           "hoodie.datasource.hive_sync.use_jdbc" -> "false",
           "hoodie.datasource.hive_sync.mode" -> "hms"
         )))
         .writeDataFrame(dataFrame, glueContext)
   ```