

# AWS Glue での Hudi フレームワークの使用
<a name="aws-glue-programming-etl-format-hudi"></a>

AWS Glue 3.0 以降では、データレイク向けに Apache Hudi フレームワークが利用できます。Hudi は、増分データ処理とデータパイプラインの開発を簡素化する、オープンソースのデータレイク用ストレージフレームワークです。このトピックでは、AWS Glue 内でデータを Hudi テーブルに転送または保存する際に利用可能な、各機能について説明します。Hudi の詳細については、公式の[Apache Hudi ドキュメント](https://hudi.apache.org/docs/overview/)を参照してください。

AWS Glue により、Amazon S3 内にある Hudi テーブルの読み取りおよび書き込み操作を実行できます。あるいは、AWS Glue データカタログを使用して、Hudi テーブルを操作することも可能です。挿入、更新、および、すべての [Apache Spark オペレーション](https://hudi.apache.org/docs/quick-start-guide/)を含む操作も、追加でサポートされています。

**注記**  
AWS Glue 5.0 の Apache Hudi 0.15.0 実装は [HUDI-7001](https://github.com/apache/hudi/pull/9936) を内部的に元に戻します。レコードキーが 1 つのフィールドで構成されている場合、Complex Key 生成に関連するリグレッションは表示されません。ただし、この動作は OSS Apache Hudi 0.15.0 とは異なります。  
Apache Hudi 0.10.1 for AWS Glue 3.0 では、Hudi Merge on Read (MoR) テーブルはサポートされません。

次の表に、AWS Glue の各バージョンに含まれている、Hudi のバージョンを一覧で示します。


****  

| AWS Glue のバージョン | サポートされる Hudi バージョン | 
| --- | --- | 
| 5.1 | 1.0.2 | 
| 5.0 | 0.15.0 | 
| 4.0 | 0.12.1 | 
| 3.0 | 0.10.1 | 

AWS Glue がサポートするデータレイクフレームワークの詳細については、「[AWS Glue ETL ジョブでのデータレイクフレームワークの使用](aws-glue-programming-etl-datalake-native-frameworks.md)」を参照してください。

## Hudi の有効化
<a name="aws-glue-programming-etl-format-hudi-enable"></a>

AWS Glue で Hudi を有効化するには、以下のタスクを実行します。
+ `hudi` を `--datalake-formats` のジョブパラメータの値として指定します。詳細については、「[AWS Glue ジョブでジョブパラメータを使用する](aws-glue-programming-etl-glue-arguments.md)」を参照してください。
+ AWS Glue ジョブ用に、`--conf` という名前でキーを作成し、それに次の値を設定します。または、スクリプトで `SparkConf` を使用して、次の構成を設定することもできます。これらの設定は、Apache Spark が Hudi テーブルを適切に処理するために役立ちます。

  ```
  spark.serializer=org.apache.spark.serializer.KryoSerializer
  ```
+ Hudi に関する Lake Formation の許可のサポートは、AWS Glue 4.0 のためにデフォルトで有効になっています。Lake Formation に登録された Hudi テーブルの読み取り/書き込みに追加の設定は必要ありません。登録された Hudi テーブルを読み取るには、AWS Glue ジョブの IAM ロールに SELECT 許可が必要です。登録された Hudi テーブルに書き込むには、AWS Glue ジョブの IAM ロールに SUPER 許可が必要です。Lake Formation の許可の管理の詳細については、「[Data Catalog リソースに対する許可の付与と取り消し](https://docs.aws.amazon.com/lake-formation/latest/dg/granting-catalog-permissions.html)」を参照してください。

**別の Hudi バージョンの使用**

AWS Glue でサポートされないバージョンの Hudi を使用するには、`--extra-jars` job パラメーターにより独自の Hudi JAR ファイルを指定します。`--datalake-formats` ジョブパラメータの値として、`hudi` は含めないでください。AWS Glue 5.0 以降を使用する場合、`--user-jars-first true` ジョブパラメータを設定する必要があります。

## 例: Hudi テーブルを Amazon S3 に書き込み、そのテーブルを AWS Glue データカタログに登録する
<a name="aws-glue-programming-etl-format-hudi-write"></a>

このスクリプト例では、Hudi テーブルを Amazon S3 に書き込み、そのテーブルを AWS Glue データカタログに登録する方法を示します。この例では、テーブルを登録するために、Hudi [Hive Sync ツール](https://hudi.apache.org/docs/syncing_metastore/)を使用します。

**注記**  
この例では、AWS Glue データカタログを Apache Spark Hive のメタストアとして使用するために、`--enable-glue-datacatalog` ジョブパラメータの設定が必要となります。詳細については[AWS Glue ジョブでジョブパラメータを使用する](aws-glue-programming-etl-glue-arguments.md)を参照してください。

------
#### [ Python ]

```
# Example: Create a Hudi table from a DataFrame 
# and register the table to Glue Data Catalog

additional_options={
    "hoodie.table.name": "{{<your_table_name>}}",
    "hoodie.database.name": "{{<your_database_name>}}",
    "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.recordkey.field": "{{<your_recordkey_field>}}",
    "hoodie.datasource.write.precombine.field": "{{<your_precombine_field>}}",
    "hoodie.datasource.write.partitionpath.field": "{{<your_partitionkey_field>}}",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.database": "{{<your_database_name>}}",
    "hoodie.datasource.hive_sync.table": "{{<your_table_name>}}",
    "hoodie.datasource.hive_sync.partition_fields": "{{<your_partitionkey_field>}}",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.hive_sync.mode": "hms",
    "path": "s3://{{<s3Path/>}}"
}

dataFrame.write.format("hudi") \
    .options(**additional_options) \
    .mode("overwrite") \
    .save()
```

------
#### [ Scala ]

```
// Example: Example: Create a Hudi table from a DataFrame
// and register the table to Glue Data Catalog

val additionalOptions = Map(
  "hoodie.table.name" -> "{{<your_table_name>}}",
  "hoodie.database.name" -> "{{<your_database_name>}}",
  "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE",
  "hoodie.datasource.write.operation" -> "upsert",
  "hoodie.datasource.write.recordkey.field" -> "{{<your_recordkey_field>}}",
  "hoodie.datasource.write.precombine.field" -> "{{<your_precombine_field>}}",
  "hoodie.datasource.write.partitionpath.field" -> "{{<your_partitionkey_field>}}",
  "hoodie.datasource.write.hive_style_partitioning" -> "true",
  "hoodie.datasource.hive_sync.enable" -> "true",
  "hoodie.datasource.hive_sync.database" -> "{{<your_database_name>}}",
  "hoodie.datasource.hive_sync.table" -> "{{<your_table_name>}}",
  "hoodie.datasource.hive_sync.partition_fields" -> "{{<your_partitionkey_field>}}",
  "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
  "hoodie.datasource.hive_sync.use_jdbc" -> "false",
  "hoodie.datasource.hive_sync.mode" -> "hms",
  "path" -> "s3://{{<s3Path/>}}")

dataFrame.write.format("hudi")
  .options(additionalOptions)
  .mode("append")
  .save()
```

------

## 例: AWS Glue データカタログを使用した Amazon S3 からの Hudi テーブルの読み取り
<a name="aws-glue-programming-etl-format-hudi-read"></a>

この例では、「[例: Hudi テーブルを Amazon S3 に書き込み、そのテーブルを AWS Glue データカタログに登録する](#aws-glue-programming-etl-format-hudi-write)」で作成した Hudi テーブルを Amazon S3 から読み取ります。

**注記**  
この例では、AWS Glue データカタログを Apache Spark Hive のメタストアとして使用するために、`--enable-glue-datacatalog` ジョブパラメータの設定が必要となります。詳細については[AWS Glue ジョブでジョブパラメータを使用する](aws-glue-programming-etl-glue-arguments.md)を参照してください。

------
#### [ Python ]

この例では、`GlueContext.create\_data\_frame.from\_catalog()` メソッドを使用します。

```
# Example: Read a Hudi table from Glue Data Catalog

from awsglue.context import GlueContext
from pyspark.context import SparkContext

sc = SparkContext()
glueContext = GlueContext(sc)

dataFrame = glueContext.create_data_frame.from_catalog(
    database = "{{<your_database_name>}}",
    table_name = "{{<your_table_name>}}"
)
```

------
#### [ Scala ]

この例では [getCatalogSource](glue-etl-scala-apis-glue-gluecontext.md#glue-etl-scala-apis-glue-gluecontext-defs-getCatalogSource) メソッドを使用します。

```
// Example: Read a Hudi table from Glue Data Catalog

import com.amazonaws.services.glue.GlueContext
import org.apache.spark.SparkContext

object GlueApp {
  def main(sysArgs: Array[String]): Unit = {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    
    val dataFrame = glueContext.getCatalogSource(
      database = "{{<your_database_name>}}",
      tableName = "{{<your_table_name>}}"
    ).getDataFrame()
  }
}
```

------

## 例: `DataFrame` を更新して、Amazon S3 内にある Hudi テーブルに挿入する
<a name="aws-glue-programming-etl-format-hudi-update-insert"></a>

この例では、「[例: Hudi テーブルを Amazon S3 に書き込み、そのテーブルを AWS Glue データカタログに登録する](#aws-glue-programming-etl-format-hudi-write)」で作成した Hudi テーブルに対し、AWS Glue データカタログを使用して DataFrame を挿入します。

**注記**  
この例では、AWS Glue データカタログを Apache Spark Hive のメタストアとして使用するために、`--enable-glue-datacatalog` ジョブパラメータの設定が必要となります。詳細については[AWS Glue ジョブでジョブパラメータを使用する](aws-glue-programming-etl-glue-arguments.md)を参照してください。

------
#### [ Python ]

この例では、`GlueContext.write\_data\_frame.from\_catalog()` メソッドを使用します。

```
# Example: Upsert a Hudi table from Glue Data Catalog

from awsglue.context import GlueContext
from pyspark.context import SparkContext

sc = SparkContext()
glueContext = GlueContext(sc)

glueContext.write_data_frame.from_catalog(
    frame = dataFrame,
    database = "{{<your_database_name>}}",
    table_name = "{{<your_table_name>}}",
    additional_options={
        "hoodie.table.name": "{{<your_table_name>}}",
        "hoodie.database.name": "{{<your_database_name>}}",
        "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
        "hoodie.datasource.write.operation": "upsert",
        "hoodie.datasource.write.recordkey.field": "{{<your_recordkey_field>}}",
        "hoodie.datasource.write.precombine.field": "{{<your_precombine_field>}}",
        "hoodie.datasource.write.partitionpath.field": "{{<your_partitionkey_field>}}",
        "hoodie.datasource.write.hive_style_partitioning": "true",
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.database": "{{<your_database_name>}}",
        "hoodie.datasource.hive_sync.table": "{{<your_table_name>}}",
        "hoodie.datasource.hive_sync.partition_fields": "{{<your_partitionkey_field>}}",
        "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        "hoodie.datasource.hive_sync.use_jdbc": "false",
        "hoodie.datasource.hive_sync.mode": "hms"
    }
)
```

------
#### [ Scala ]

この例では [getCatalogSink](glue-etl-scala-apis-glue-gluecontext.md#glue-etl-scala-apis-glue-gluecontext-defs-getCatalogSink) メソッドを使用します。

```
// Example: Upsert a Hudi table from Glue Data Catalog

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.JsonOptions
import org.apacke.spark.SparkContext

object GlueApp {
  def main(sysArgs: Array[String]): Unit = {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    glueContext.getCatalogSink("{{<your_database_name>}}", "{{<your_table_name>}}",
      additionalOptions = JsonOptions(Map(
        "hoodie.table.name" -> "{{<your_table_name>}}",
        "hoodie.database.name" -> "{{<your_database_name>}}",
        "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE",
        "hoodie.datasource.write.operation" -> "upsert",
        "hoodie.datasource.write.recordkey.field" -> "{{<your_recordkey_field>}}",
        "hoodie.datasource.write.precombine.field" -> "{{<your_precombine_field>}}",
        "hoodie.datasource.write.partitionpath.field" -> "{{<your_partitionkey_field>}}",
        "hoodie.datasource.write.hive_style_partitioning" -> "true",
        "hoodie.datasource.hive_sync.enable" -> "true",
        "hoodie.datasource.hive_sync.database" -> "{{<your_database_name>}}",
        "hoodie.datasource.hive_sync.table" -> "{{<your_table_name>}}",
        "hoodie.datasource.hive_sync.partition_fields" -> "{{<your_partitionkey_field>}}",
        "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        "hoodie.datasource.hive_sync.use_jdbc" -> "false",
        "hoodie.datasource.hive_sync.mode" -> "hms"
      )))
      .writeDataFrame(dataFrame, glueContext)
  }
}
```

------

## 例: Spark を使用した Amazon S3 からの Hudi テーブルの読み取り
<a name="aws-glue-programming-etl-format-hudi-read-spark"></a>

この例では、Spark DataFrame API を使用して Amazon S3 から Hudi テーブルを読み取ります。

------
#### [ Python ]

```
# Example: Read a Hudi table from S3 using a Spark DataFrame

dataFrame = spark.read.format("hudi").load("s3://{{<s3path/>}}")
```

------
#### [ Scala ]

```
// Example: Read a Hudi table from S3 using a Spark DataFrame

val dataFrame = spark.read.format("hudi").load("s3://{{<s3path/>}}")
```

------

## 例: スパークを使用した Amazon S3 への Hudi テーブルの書き込み
<a name="aws-glue-programming-etl-format-hudi-write-spark"></a>

この例では、Spark を使用して Amazon S3 に Hudi テーブルを書き込みます。

------
#### [ Python ]

```
# Example: Write a Hudi table to S3 using a Spark DataFrame

dataFrame.write.format("hudi") \
    .options(**additional_options) \
    .mode("overwrite") \
    .save("s3://{{<s3Path/>}})
```

------
#### [ Scala ]

```
// Example: Write a Hudi table to S3 using a Spark DataFrame

dataFrame.write.format("hudi")
  .options(additionalOptions)
  .mode("overwrite")
  .save("s3://{{<s3path/>}}")
```

------

## 例: Lake Formation の許可のコントロールを使用した Hudi テーブルの読み取りおよび書き込み
<a name="aws-glue-programming-etl-format-hudi-read-write-lake-formation-tables"></a>

この例では、Lake Formation の許可のコントロールを使用して Hudi テーブルの読み取りと書き込みを行います。

1. Hudi テーブルを作成して Lake Formation に登録します。

   1. Lake Formation の許可のコントロールを有効にするには、まずテーブルの Amazon S3 パスを Lake Formation に登録する必要があります。詳細については、「[Registering an Amazon S3 location](https://docs.aws.amazon.com/lake-formation/latest/dg/register-location.html)」を参照してください。Lake Formation コンソールから、または AWS CLI を使用して登録できます。

      ```
      aws lakeformation register-resource --resource-arn arn:aws:s3:::<s3-bucket>/<s3-folder> --use-service-linked-role --region <REGION>
      ```

      Amazon S3 の場所が登録されると、その場所 (またはその子である場所) をポイントするすべての AWS Glue テーブルが、`GetTable` 呼び出しで `IsRegisteredWithLakeFormation` パラメータの値を true として返します。

   1. Spark DataFrame API を介して登録された Amazon S3 パスをポイントする Hudi テーブルを作成します。

      ```
      hudi_options = {
          'hoodie.table.name': table_name,
          'hoodie.database.name': database_name,
          'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE',
          'hoodie.datasource.write.recordkey.field': 'product_id',
          'hoodie.datasource.write.table.name': table_name,
          'hoodie.datasource.write.operation': 'upsert',
          'hoodie.datasource.write.precombine.field': 'updated_at',
          'hoodie.datasource.write.hive_style_partitioning': 'true',
          'hoodie.upsert.shuffle.parallelism': 2,
          'hoodie.insert.shuffle.parallelism': 2,
          'path': <S3_TABLE_LOCATION>,
          'hoodie.datasource.hive_sync.enable': 'true',
          'hoodie.datasource.hive_sync.database': database_name,
          'hoodie.datasource.hive_sync.table': table_name,
          'hoodie.datasource.hive_sync.use_jdbc': 'false',
          'hoodie.datasource.hive_sync.mode': 'hms'
      }
      
      df_products.write.format("hudi")  \
          .options(**hudi_options)  \
          .mode("overwrite")  \
          .save()
      ```

1. AWS Glue ジョブ IAM ロールに Lake Formation の許可を付与します。Lake Formation コンソールから、または AWS CLI を使用して許可を付与できます。詳細については、「[Lake Formation コンソールと名前付きリソース方式を使用したテーブル許可の付与](https://docs.aws.amazon.com/lake-formation/latest/dg/granting-table-permissions.html)」を参照してください。

1.  Lake Formation に登録されている Hudi テーブルを読み取ります。このコードは、未登録の Hudi テーブルを読み取る場合と同じです。読み取りを成功させるには、AWS Glue ジョブの IAM ロールに SELECT 許可が必要であることに留意してください。

   ```
    val dataFrame = glueContext.getCatalogSource(
         database = "<your_database_name>",
         tableName = "<your_table_name>"
       ).getDataFrame()
   ```

1. Lake Formation に登録されている Hudi テーブルに書き込みます。このコードは、未登録の Hudi テーブルに書き込む場合と同じです。書き込みを成功させるには、AWS Glue ジョブの IAM ロールに SUPER 許可が必要であることに留意してください。

   ```
   glueContext.getCatalogSink("<your_database_name>", "<your_table_name>",
         additionalOptions = JsonOptions(Map(
           "hoodie.table.name" -> "<your_table_name>",
           "hoodie.database.name" -> "<your_database_name>",
           "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE",
           "hoodie.datasource.write.operation" -> "<write_operation>",
           "hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>",
           "hoodie.datasource.write.precombine.field" -> "<your_precombine_field>",
           "hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>",
           "hoodie.datasource.write.hive_style_partitioning" -> "true",
           "hoodie.datasource.hive_sync.enable" -> "true",
           "hoodie.datasource.hive_sync.database" -> "<your_database_name>",
           "hoodie.datasource.hive_sync.table" -> "<your_table_name>",
           "hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>",
           "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
           "hoodie.datasource.hive_sync.use_jdbc" -> "false",
           "hoodie.datasource.hive_sync.mode" -> "hms"
         )))
         .writeDataFrame(dataFrame, glueContext)
   ```