

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Iceberg でクラスターを使用する
<a name="emr-iceberg-use-cluster"></a>

このセクションでは、Spark、Trino、Flink、および Hive で Iceberg を使用する方法について説明します。

# Spark で Iceberg クラスターを使用する
<a name="emr-iceberg-use-spark-cluster"></a>

Amazon EMR バージョン 6.5.0 以降では、ブートストラップアクションを追加しなくても Spark クラスターで Iceberg を使用できます。Amazon EMR バージョン 6.4.0 以前の場合、ブートストラップアクションを使用して必要なすべての依存関係を事前インストールできます。

このチュートリアルでは、 AWS CLI を使用して Amazon EMR Spark クラスターで Iceberg を操作します。コンソールを使用して Iceberg がインストールされたクラスターを作成するには、「[Build an Apache Iceberg data lake using Amazon Athena, Amazon EMR, and AWS Glue](https://aws.amazon.com/blogs//big-data/build-an-apache-iceberg-data-lake-using-amazon-athena-amazon-emr-and-aws-glue/)」の手順に従ってください。

## Iceberg クラスターの作成
<a name="emr-iceberg-create-cluster"></a>

Iceberg がインストールされたクラスターは、、 AWS マネジメントコンソール、 AWS CLI または Amazon EMR API を使用して作成できます。このチュートリアルでは、 AWS CLI を使用して Amazon EMR クラスターで Iceberg を操作します。コンソールを使用して Iceberg がインストールされたクラスターを作成するには、「[Build an Apache Iceberg data lake using Amazon Athena, Amazon EMR, and AWS Glue](https://aws.amazon.com/blogs//big-data/build-an-apache-iceberg-data-lake-using-amazon-athena-amazon-emr-and-aws-glue/)」の手順に従ってください。

で Amazon EMR で Iceberg を使用するには AWS CLI、まず次のステップでクラスターを作成します。を使用して Iceberg 分類を指定する方法については AWS CLI、[クラスターの作成 AWS CLI 時に を使用して設定を指定する](emr-configure-apps-create-cluster.md#emr-configure-apps-create-cluster-cli)「」または「」を参照してください[クラスター作成時に Java SDK を使用して設定を指定する](emr-configure-apps-create-cluster.md#emr-configure-apps-create-cluster-sdk)。

1. 以下のコンテンツを含む `configurations.json` ファイルを作成します。

   ```
   [{
       "Classification":"iceberg-defaults",
       "Properties":{"iceberg.enabled":"true"}
   }]
   ```

1. 次に、以下の設定でクラスターを作成します。この例の Amazon S3 バケットパスとサブネット ID は、実際の値に置き換えてください。

   ```
   aws emr create-cluster --release-label emr-6.5.0 \
   --applications Name=Spark \
   --configurations file://configurations.json \
   --region us-east-1 \
   --name My_Spark_Iceberg_Cluster \
   --log-uri s3://amzn-s3-demo-bucket/ \
   --instance-type m5.xlarge \
   --instance-count 2 \
   --service-role EMR_DefaultRole_V2 \ 
   --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0
   ```

または、Spark アプリケーションを含む Amazon EMR クラスターを作成し、`/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar` ファイルを Spark ジョブの JAR 依存関係として追加することもできます。詳細については、「[Submitting Applications](https://spark.apache.org/docs/latest/submitting-applications.html#submitting-applications)」を参照してください。

この jar を Spark ジョブの依存関係として含めるには、以下の設定プロパティを Spark アプリケーションに追加します。

```
--conf "spark.jars=/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar"
```

Spark ジョブの依存関係の詳細については、Apache Spark ドキュメント「[Running Spark on Kubernetes](https://spark.apache.org/docs/latest/running-on-kubernetes.html)」の「[Dependency Management](https://spark.apache.org/docs/latest/running-on-kubernetes.html#dependency-management)」を参照してください。

## Iceberg の Spark セッションを初期化する
<a name="emr-iceberg-initialize-spark-session"></a>

以下の例では、インタラクティブな Spark シェルを起動し、Spark submit を使用するか、Amazon EMR Notebooks を使用して、Amazon EMR で Iceberg を操作する方法を示します。

------
#### [ spark-shell ]

1. SSH を使用してマスターノードに接続します。詳細については、「*Amazon EMR 管理ガイド*」の「[SSH を使用してマスターノードに接続する](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html)」を参照してください。

1. 以下のコマンドを入力して、Spark シェルを起動します。PySpark シェルを使用するには、`spark-shell` を `pyspark` に置き換えます。

   ```
   spark-shell \
       --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.my_catalog.warehouse=s3://amzn-s3-demo-bucket/prefix/
       --conf spark.sql.catalog.my_catalog.type=glue \
       --conf spark.sql.defaultCatalog=my_catalog \
       --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
   ```

------
#### [ spark-submit ]

1. SSH を使用してマスターノードに接続します。詳細については、「*Amazon EMR 管理ガイド*」の「[SSH を使用してマスターノードに接続する](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html)」を参照してください。

1. 以下のコマンドを入力して、Iceberg の Spark セッションを起動します。

   ```
   spark-submit \
   --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
   --conf spark.sql.catalog.my_catalog.warehouse=s3://amzn-s3-demo-bucket1/prefix \
   --conf spark.sql.catalog.my_catalog.type=glue \
   --conf spark.sql.defaultCatalog=my_catalog \
   --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
   ```

------
#### [ EMR Studio notebooks ]

EMR Studio ノートブックを使用して Spark セッションを初期化するには、次の例のように、Amazon EMR Notebooks で `%%configure` マジックコマンドを使用して Spark セッションを設定します。詳細については、「*Amazon EMR 管理ガイド*」の「[Use EMR Notebooks magics](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-studio-magics.html#emr-magics)」を参照してください。

```
%%configure -f{
"conf":{
    "spark.sql.catalog.my_catalog":"org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.my_catalog.type":"glue",
    "spark.sql.catalog.my_catalog.warehouse":"s3://amzn-s3-demo-bucket1/prefix/",
    "spark.sql.defaultCatalog":"my_catalog",
    "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
    }
}
```

------
#### [ CLI ]

CLI を使用して Spark クラスターを初期化し、Spark Iceberg セッションのデフォルト設定をすべて設定するには、以下のサンプルを実行します。 AWS CLI と Amazon EMR API を使用して設定分類を指定する方法の詳細については、[「アプリケーションの設定](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps.html)」を参照してください。

```
[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.sql.catalog.my_catalog":"org.apache.iceberg.spark.SparkCatalog",
      "spark.sql.catalog.my_catalog.type":"glue",
      "spark.sql.catalog.my_catalog.warehouse":"s3://amzn-s3-demo-bucket1/prefix/",
      "spark.sql.defaultCatalog":"my_catalog",
      "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
    }
  }
]
```

------

## Iceberg テーブルへの書き込み
<a name="emr-iceberg-write-to-table"></a>

以下の例では、DataFrame を作成し、それを Iceberg データセットとして書き込む方法を示します。この例では、デフォルトの Hadoop ユーザーとして SSH を使用してマスターノードに接続しながら、Spark シェルを使用してデータセットを操作する方法を示しています。

**注記**  
コードサンプルを Spark シェルに貼り付けるには、プロンプトで「`:paste`」と入力し、例を貼り付けて、[`CTRL+D`] を押します。

------
#### [ PySpark ]

Spark には、Python ベースのシェルである `pyspark` 用意されており、Python で記述された Spark プログラムのプロトタイプを作成するために使用できます。マスターノードで `pyspark` を起動します。

```
## Create a DataFrame.
data = spark.createDataFrame([
 ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
 ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
 ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
 ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")
],["id", "creation_date", "last_update_time"])

## Write a DataFrame as a Iceberg dataset to the Amazon S3 location.
spark.sql("""CREATE TABLE IF NOT EXISTS dev.db.iceberg_table (id string,
creation_date string,
last_update_time string)
USING iceberg
location 's3://amzn-s3-demo-bucket/example-prefix/db/iceberg_table'""")

data.writeTo("dev.db.iceberg_table").append()
```

------
#### [ Scala ]

```
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._

// Create a DataFrame.
val data = Seq(
("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")
).toDF("id", "creation_date", "last_update_time")

// Write a DataFrame as a Iceberg dataset to the Amazon S3 location.
spark.sql("""CREATE TABLE IF NOT EXISTS dev.db.iceberg_table (id string,
creation_date string,
last_update_time string)
USING iceberg
location 's3://amzn-s3-demo-bucket/example-prefix/db/iceberg_table'""")

data.writeTo("dev.db.iceberg_table").append()
```

------

## Iceberg テーブルからの読み込み
<a name="emr-iceberg-read-from-table"></a>

------
#### [ PySpark ]

```
df = spark.read.format("iceberg").load("dev.db.iceberg_table")
df.show()
```

------
#### [ Scala ]

```
val df = spark.read.format("iceberg").load("dev.db.iceberg_table")
df.show()
```

------
#### [ Spark SQL ]

```
SELECT * from dev.db.iceberg_table LIMIT 10
```

------

## Spark Iceberg AWS での Glue データカタログの使用
<a name="emr-iceberg-glue-catalog-config-spark"></a>

Spark Iceberg から AWS Glue データカタログに接続できます。このセクションでは、接続するためのさまざまなコマンドを示します。

### デフォルトのリージョンのデフォルトの AWS Glue カタログに接続する
<a name="emr-iceberg-glue-catalog-config-spark"></a>

このサンプルは、 Glue カタログタイプを使用して接続する方法を示しています。カタログ ID を指定しない場合、デフォルトが使用されます:

```
spark-submit \
    --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.my_catalog.warehouse=s3://amzn-s3-demo-bucket1/prefix \
    --conf spark.sql.catalog.my_catalog.type=glue \
    --conf spark.sql.defaultCatalog=my_catalog \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
```

### 特定のカタログ ID AWS を使用して Glue カタログに接続する
<a name="emr-iceberg-glue-catalog-config-spark"></a>

このサンプルは、カタログ ID を使用して接続する方法を示しています:

```
spark-submit \
    --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.my_catalog.warehouse=s3://amzn-s3-demo-bucket1/prefix \
    --conf spark.sql.catalog.my_catalog.type=glue \
    --conf spark.sql.catalog.my_catalog.glue.id=AWS Glue catalog ID \
    --conf spark.sql.defaultCatalog=my_catalog \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
```

このコマンドは、別のアカウントの AWS Glue カタログ、RMS カタログ、またはフェデレーティッドカタログに接続するために使用できます。

## Spark Iceberg での Iceberg REST Catalog (IRC) の使用
<a name="emr-iceberg-rest-catalog-config"></a>

以下のセクションでは、Iceberg とカタログの統合を設定する方法について詳しく説明します。

### Glue Data Catalog IRC AWS エンドポイントに接続する
<a name="emr-iceberg-rest-catalog-config-gdc"></a>

Iceberg REST を使用するための `spark-submit` コマンドの例を以下に示します:

```
spark-submit \
    --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.my_catalog.warehouse=glue catalog ID \
    --conf spark.sql.catalog.my_catalog.type=rest \
    --conf spark.sql.catalog.my_catalog.uri=glue endpoint URI/iceberg \
    --conf spark.sql.catalog.my_catalog.rest.sigv4-enabled=true \
    --conf spark.sql.catalog.my_catalog.rest.signing-name=glue \
    --conf spark.sql.defaultCatalog=my_catalog \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
```

ランタイムロールが有効なクラスターで使用するには、次の追加の spark 設定が必要です:

```
"spark.hadoop.fs.s3.credentialsResolverClass": "software.amazon.glue.GlueTableCredentialsResolver",
"spark.hadoop.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
"spark.hadoop.glue.id": glue catalog ID
"spark.hadoop.glue.endpoint": "glue endpoint"
```

各リージョンの AWS Glue エンドポイント URL リストについては、「 [AWS Glue エンドポイントとクォータ](https://docs.aws.amazon.com/general/latest/gr/glue.html)」を参照してください。

### 任意の IRC エンドポイントに接続する
<a name="emr-iceberg-rest-catalog-config-arbitrary"></a>

IRC エンドポイントを使用するための `spark-submit` コマンドの例を以下に示します:

```
spark-submit \
    --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.my_catalog.warehouse=warehouse name \
    --conf spark.sql.catalog.my_catalog.type=rest \
    --conf spark.sql.catalog.my_catalog.uri=your rest endpoint \
    --conf spark.sql.defaultCatalog=my_catalog \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
```

## Iceberg SparkCatalog と SparkSessionCatalog を使用する場合の設定の違い
<a name="emr-iceberg-spark-catalog"></a>

Iceberg では、Spark Iceberg カタログを作成する 2 つの方法を利用できます。Spark 設定は、`SparkCatalog` または `SparkSessionCatalog` のいずれかに設定できます。

### Iceberg SparkCatalog の使用
<a name="emr-iceberg-spark-catalog-spark-catalog"></a>

Spark Iceberg カタログとして **SparkCatalog** を使用するためのコマンドを以下に示します:

```
spark-shell \
--conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.my_catalog.warehouse=s3://amzn-s3-demo-bucket1/prefix \
--conf spark.sql.catalog.my_catalog.type=glue \
--conf spark.sql.defaultCatalog=my_catalog
```

このアプローチに関する考慮事項:
+ Iceberg テーブルにはアクセスできますが、他のテーブルにはアクセスできません。
+ カタログ名を「**spark\$1catalog**」にすることはできません。これは Spark の初期カタログの名前です。常に Hive メタストアに接続します。これは、ユーザーが `spark.sql.defaultCatalog` を使用して上書きしない限り、Spark のデフォルトカタログです。
+ `spark.sql.defaultCatalog` をカタログ名に設定して、デフォルトのカタログにができます。

### Iceberg SparkSessionCatalog の使用
<a name="emr-iceberg-spark-catalog-spark-session"></a>

Spark Iceberg カタログとして **[SparkSessionCatalog]** を使用するためのコマンドを以下に示します。

```
spark-shell \
    --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
    --conf spark.sql.catalog.spark_catalog.warehouse=s3://amzn-s3-demo-bucket1/prefix \
    --conf spark.sql.catalog.spark_catalog.type=glue
```

このアプローチに関する考慮事項:
+ テーブルが Iceberg テーブルとして見つからない場合、Spark は Hive メタストア内のテーブルであるかどうかを確認しようとします。詳細については、[「Hive AWS のカタログとしての Glue データカタログの使用](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hive-metastore-glue.html)」を参照してください。
+ カタログ名は「**spark\$1catalog**」である必要があります。

## Iceberg Spark 拡張機能の使用
<a name="emr-iceberg-spark-catalog-extensions"></a>

Iceberg は、ユーザーが Spark 拡張機能設定 `spark.sql.extensions` を通じて設定できる Spark 拡張機能 `org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions` を提供します。拡張機能により、行レベルの DELETE、UPDATE、MERGE などの Iceberg の主要な機能、圧縮、スナップショットの有効期限、分岐とタグ付けなどの Iceberg 固有の Spark データ定義言語ステートメントと手順が有効になります。詳細については、以下をご参照ください。
+ Iceberg Spark 書き込み拡張機能:「[Spark Writes](https://iceberg.apache.org/docs/nightly/spark-writes/)」
+ Iceberg Spark DDL 拡張機能:「[ALTER TABLE SQL extensions](https://iceberg.apache.org/docs/nightly/spark-ddl/#alter-table-sql-extensions/)」
+ Iceberg Spark プロシージャ拡張機能:「[Spark Procedures](https://iceberg.apache.org/docs/nightly/spark-ddl/#alter-table-sql-extensions/)」

## Spark で Iceberg を使用するための考慮事項
<a name="spark-considerations-catalog"></a>
+ Amazon EMR 6.5.0 は、デフォルトでは Amazon EMR on EKS での Iceberg の実行をサポートしていません。Amazon EMR 6.5.0 カスタムイメージが用意されているため、`--jars local:///usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar` を `spark-submit` パラメータとして渡して Amazon EMR on EKS に Iceberg テーブルを作成できます。詳細については、「*Amazon EMR on EKS 開発ガイド*」の「[Submit a Spark workload in Amazon EMR using a custom image](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/docker-custom-images-steps.html#docker-custom-images-submit)」を参照してください。 サポート に問い合わせることもできます。Amazon EMR 6.6.0 以降、Iceberg は Amazon EMR on EKS でサポートされます。
+ Iceberg のカタログとして AWS Glue を使用する場合は、テーブルを作成するデータベースが Glue AWS に存在することを確認してください。などのサービスを使用して AWS Lake Formation いて、カタログをロードできない場合は、 コマンドを実行するためのサービスへの適切なアクセス権があることを確認してください。
+ 「」で説明されているように Iceberg SparkSessionCatalog を使用する場合は[Iceberg SparkCatalog と SparkSessionCatalog を使用する場合の設定の違い](#emr-iceberg-spark-catalog)、Spark Iceberg [AWS Glue データカタログの設定に加えて、「Apache Hive メタストアとして](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hive-metastore-glue.html) AWS Glue データカタログを設定する」で説明されている設定ステップに従う必要があります。

# Trino での Iceberg クラスターの使用
<a name="emr-iceberg-use-trino-cluster"></a>

Amazon EMR バージョン 6.6.0 以降では、Iceberg を Trino クラスターで使用できます。

このチュートリアルでは、 AWS CLI を使用して Amazon EMR Trino クラスターで Iceberg を操作します。コンソールを使用して Iceberg がインストールされたクラスターを作成するには、「[Build an Apache Iceberg data lake using Amazon Athena, Amazon EMR, and AWS Glue](https://aws.amazon.com/blogs//big-data/build-an-apache-iceberg-data-lake-using-amazon-athena-amazon-emr-and-aws-glue/)」の手順に従ってください。

## Iceberg クラスターの作成
<a name="emr-iceberg-create-cluster-trino"></a>

で Amazon EMR で Iceberg を使用するには AWS CLI、まず次のステップでクラスターを作成します。を使用して Iceberg 分類を指定する方法については AWS CLI、[クラスターの作成 AWS CLI 時に を使用して設定を指定する](emr-configure-apps-create-cluster.md#emr-configure-apps-create-cluster-cli)「」または「」を参照してください[クラスター作成時に Java SDK を使用して設定を指定する](emr-configure-apps-create-cluster.md#emr-configure-apps-create-cluster-sdk)。

1. 以下のコンテンツを含む `configurations.json` ファイルを作成します。例えば、Hive メタストアをカタログとして使用する場合、ファイルには次の内容が含まれている必要があります。

   ```
   [
     {
       "Classification": "trino-connector-iceberg",
       "Properties": {
         "connector.name": "iceberg",
         "hive.metastore.uri": "thrift://localhost:9083"
       }
     }
   ]
   ```

    AWS Glue データカタログをストアとして使用する場合は、ファイルに次のコンテンツが含まれている必要があります。

   ```
   [
     {
       "Classification": "trino-connector-iceberg",
       "Properties": {
         "connector.name": "iceberg",
         "iceberg.catalog.type": "glue"
       }
     }
   ]
   ```

   Amazon EMR 7.7.0 以降、プロパティ *fs.native-s3.enabled=true* を含めます。

   ```
   [
     { 
       "Classification": "trino-connector-iceberg",
       "Properties": {
         "connector.name": "iceberg",
         "iceberg.catalog.type": "glue",
         "fs.native-s3.enabled": "true"
       }           
     }                 
   ]
   ```

1. 次のように設定してクラスターを作成し、サンプルの Amazon S3 バケットパスとキー名を実際の値に置き換えます。

   ```
   aws emr create-cluster --release-label emr-6.7.0 \
   --applications Name=Trino \
   --region us-east-1 \
   --name My_Trino_Iceberg_Cluster \
   --log-uri s3://amzn-s3-demo-bucket \
   --configurations file://configurations.json \
   --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=c3.4xlarge InstanceGroupType=CORE,InstanceCount=3,InstanceType=c3.4xlarge \ 
   --use-default-roles \
   --ec2-attributes KeyName=<key-name>
   ```

## Iceberg の Trino セッションの初期化
<a name="emr-iceberg-initialize-trino"></a>

Trino セッションを初期化するには、次のコマンドを実行します。

```
trino-cli --catalog iceberg
```

## Iceberg テーブルへの書き込み
<a name="emr-iceberg-write-to-table-trino"></a>

次の SQL コマンドを使用してテーブルを作成し、書き込みます。

```
trino> SHOW SCHEMAS;
trino> CREATE TABLE default.iceberg_table (
            id int,
            data varchar,
            category varchar)
       WITH (
            format = 'PARQUET',
            partitioning = ARRAY['category', 'bucket(id, 16)'],
            location = 's3://amzn-s3-demo-bucket/<prefix>')
          
trino> INSERT INTO default.iceberg_table VALUES (1,'a','c1'), (2,'b','c2'), (3,'c','c3');
```

## Iceberg のテーブルからの読み込み
<a name="emr-iceberg-read-from-table-trino"></a>

Iceberg テーブルから読み込むには、次のコマンドを実行します。

```
trino> SELECT * from default.iceberg_table;
```

## Trino で Iceberg を使用するための考慮事項
<a name="trino-considerations"></a>
+ Amazon EMR 6.5 では、Iceberg の Trino Iceberg Catalog ネイティブサポートは提供されていません。Trino には Iceberg v0.11 が必要です。そのため Trino 用の Amazon EMR クラスターを Spark クラスターとは別に起動し、そのクラスターに Iceberg v0.11 をインストールすることをお勧めします。
+ Iceberg のカタログとして AWS Glue を使用する場合は、テーブルを作成するデータベースが Glue AWS に存在することを確認してください。などのサービスを使用して AWS Lake Formation いて、カタログをロードできない場合は、 コマンドを実行するためのサービスへの適切なアクセス権があることを確認してください。
+ Iceberg Glue 統合は、Redshift Managed Storage カタログでは機能しません。

# Flink での Iceberg クラスターの使用
<a name="emr-iceberg-use-flink-cluster"></a>

Amazon EMR バージョン 6.9.0 以降では、オープンソースの Iceberg Flink 統合を使用する際に必要なセットアップ手順なしに Flink クラスターで Iceberg を使用できます。

## Iceberg クラスターの作成
<a name="creating-iceberg-cluster"></a>

Iceberg がインストールされたクラスターは、 AWS マネジメントコンソール、 AWS CLI、または Amazon EMR API を使用して作成できます。このチュートリアルでは、 AWS CLI を使用して Amazon EMR クラスターで Iceberg を操作します。コンソールを使用して Iceberg がインストールされたクラスターを作成するには、「[Build an Apache Iceberg data lake using Amazon Athena, Amazon EMR, and AWS Glue](https://aws.amazon.com/blogs/big-data/build-an-apache-iceberg-data-lake-using-amazon-athena-amazon-emr-and-aws-glue/)」の手順に従ってください。

で Amazon EMR で Iceberg を使用するには AWS CLI、まず次のステップでクラスターを作成します。を使用して Iceberg 分類を指定する方法については AWS CLI、[クラスターの作成 AWS CLI 時に を使用して設定を指定する](emr-configure-apps-create-cluster.md#emr-configure-apps-create-cluster-cli)「」または「」を参照してください[クラスター作成時に Java SDK を使用して設定を指定する](emr-configure-apps-create-cluster.md#emr-configure-apps-create-cluster-sdk)。`configurations.json` というファイルを次の内容で作成します。

```
[{
"Classification":"iceberg-defaults",
    "Properties":{"iceberg.enabled":"true"}
}]
```

次に、以下の設定でクラスターを作成し、この例の Amazon S3 バケットパスとサブネット ID を独自の値に置き換えます。

```
aws emr create-cluster --release-label emr-6.9.0 \
--applications Name=Flink \
--configurations file://iceberg_configurations.json \
--region us-east-1 \
--name My_flink_Iceberg_Cluster \
--log-uri s3://amzn-s3-demo-bucket/ \
--instance-type m5.xlarge \
--instance-count 2 \
--service-role EMR_DefaultRole \ 
--ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef
```

または、Flink アプリケーションを含む Amazon EMR 6.9.0 クラスターを作成し、`/usr/share/aws/iceberg/lib/iceberg-flink-runtime.jar` ファイルを Flink ジョブの JAR 依存関係として使用することもできます。

## Flink SQL クライアントの使用
<a name="using-flink-sql-client"></a>

SQL クライアントスクリプトは `/usr/lib/flink/bin` にあります。次のコマンドを使用してスクリプトを実行します。

```
flink-yarn-session -d # starting the Flink YARN Session in detached mode
./sql-client.sh
```

これにより Flink SQL シェルが起動します。

## Flink の例
<a name="flink-examples"></a>

### Iceberg テーブルの作成
<a name="create-iceberg-table"></a>

**Flink SQL**

```
CREATE CATALOG glue_catalog WITH (
   'type'='iceberg',
   'warehouse'='<WAREHOUSE>',
   'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',
    'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'
 );

USE CATALOG  glue_catalog;

CREATE DATABASE IF NOT EXISTS <DB>;

USE <DB>;

CREATE TABLE IF NOT EXISTS `glue_catalog`.`<DB>`.`sample` (id int, data string);
```

**Table API**

```
EnvironmentSettings settings =
                EnvironmentSettings.newInstance().inBatchMode().build();

TableEnvironment tEnv = TableEnvironment.create(settings);

String warehouse = "<WAREHOUSE>";
String db = "<DB>";

tEnv.executeSql(
                "CREATE CATALOG glue_catalog WITH (\n"
                        + "   'type'='iceberg',\n"
                        + "   'warehouse'='"
                        + warehouse
                        + "',\n"
                        + "   'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',\n"
                        + "   'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'\n"
                        + " );");

tEnv.executeSql("USE CATALOG  glue_catalog;");
tEnv.executeSql("CREATE DATABASE IF NOT EXISTS " + db + ";");
tEnv.executeSql("USE " + db + ";");
tEnv.executeSql(
        "CREATE TABLE `glue_catalog`.`" + db + "`.`sample` (id bigint, data string);");
```

### Iceberg テーブルへの書き込み
<a name="write-to-iceberg-table"></a>

**Flink SQL**

```
INSERT INTO `glue_catalog`.`<DB>`.`sample` values (1, 'a'),(2,'b'),(3,'c');
```

**Table API**

```
tEnv.executeSql(
        "INSERT INTO `glue_catalog`.`"
                + db
                + "`.`sample` values (1, 'a'),(2,'b'),(3,'c');");
```

**Datastream API**

```
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

String db = "<DB Name>";

String warehouse = "<Warehouse Path>";

GenericRowData rowData1 = new GenericRowData(2);
rowData1.setField(0, 1L);
rowData1.setField(1, StringData.fromString("a"));

DataStream<RowData> input = env.fromElements(rowData1);

Map<String, String> props = new HashMap<();
props.put("type", "iceberg");
props.put("warehouse", warehouse);
props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");

CatalogLoader glueCatlogLoader =
        CatalogLoader.custom(
                "glue",
                props,
                new Configuration(),
                "org.apache.iceberg.aws.glue.GlueCatalog");

TableLoader tableLoader =
        TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample"));

DataStreamSink<Void> dataStreamSink =
        FlinkSink.forRowData(input).tableLoader(tableLoader).append();

env.execute("Datastream Write");
```

### Iceberg テーブルからの読み込み
<a name="read-from-iceberg-table"></a>

**Flink SQL**

```
SELECT * FROM `glue_catalog`.`<DB>`.`sample`;
```

**Table API**

```
Table result = tEnv.sqlQuery("select * from `glue_catalog`.`" + db + "`.`sample`;");
```

**Datastream API**

```
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

String db = "<DB Name>";

String warehouse = "<Warehouse Path>";

Map<String, String> props = new HashMap<>();
props.put("type", "iceberg");
props.put("warehouse", warehouse);
props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");

CatalogLoader glueCatlogLoader =
        CatalogLoader.custom(
                "glue",
                props,
                new Configuration(),
                "org.apache.iceberg.aws.glue.GlueCatalog");
                
TableLoader tableLoader =
        TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample"));

DataStream<RowData> batch =
                FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build();

batch.print().name("print-sink");
```

## Hive カタログの使用
<a name="using-hive-catalog"></a>

Flink と Hive の依存関係が「[Hive Metastore と Glue Catalog を使用して Flink を設定する](flink-configure.md#flink-configure-hive)」の説明に従って解決されていることを確認します。

## Flink ジョブの実行
<a name="running-flink-job"></a>

Flink にジョブを送信する方法の 1 つは、ジョブ単位の Flink YARN セッションを使用することです。これを実行するには、次のコマンドを使用します。

```
sudo flink run -m yarn-cluster -p 4 -yjm 1024m -ytm 4096m $JAR_FILE_NAME
```

## Flink で Iceberg を使用するための考慮事項
<a name="flink-considerations"></a>
+ Iceberg のカタログとして AWS Glue を使用する場合は、テーブルを作成するデータベースが Glue AWS に存在することを確認してください。などのサービスを使用して AWS Lake Formation いて、カタログをロードできない場合は、 コマンドを実行するためのサービスへの適切なアクセス権があることを確認してください。
+ Iceberg Glue 統合は、Redshift Managed Storage カタログでは機能しません。

# Hive での Iceberg クラスターの使用
<a name="emr-iceberg-use-hive-cluster"></a>

Amazon EMR リリース 6.9.0 以降では、オープンソースの Iceberg Hive 統合に必要なセットアップ手順を実行しなくても、Hive クラスターで Iceberg を使用できます。Amazon EMR バージョン 6.8.0 以前の場合は、ブートストラップアクションを使用して `iceberg-hive-runtime` jar をインストールし Hive での Iceberg サポートを設定できます。

Amazon EMR 6.9.0 には、[Hive 3.1.3 を Iceberg 0.14.1 と統合する](https://iceberg.apache.org/releases/#0140-release)ためのすべての機能が含まれています。また、サポートされている実行エンジンを実行時に自動選択するなどの Amazon EMR の追加機能も含まれています (Amazon EMR on EKS 6.9.0)。

## Iceberg クラスターの作成
<a name="create-iceberg-cluster"></a>

Iceberg がインストールされたクラスターは、、 AWS マネジメントコンソール、 AWS CLI または Amazon EMR API を使用して作成できます。このチュートリアルでは、 AWS CLI を使用して Amazon EMR クラスターで Iceberg を操作します。コンソールを使用して Iceberg がインストールされたクラスターを作成するには、「[Build an Iceberg data lake using Amazon Athena, Amazon EMR, and AWS Glue](https://aws.amazon.com/blogs/big-data/build-an-apache-iceberg-data-lake-using-amazon-athena-amazon-emr-and-aws-glue/)」の手順を実行します。

で Amazon EMR で Iceberg を使用するには AWS CLI、まず以下のステップを使用してクラスターを作成します。 AWS CLI または Java SDK を使用して Iceberg 分類を指定する方法については、[クラスターの作成 AWS CLI 時に を使用して設定を指定する](emr-configure-apps-create-cluster.md#emr-configure-apps-create-cluster-cli)「」または「」を参照してください[クラスター作成時に Java SDK を使用して設定を指定する](emr-configure-apps-create-cluster.md#emr-configure-apps-create-cluster-sdk)。以下の内容で `configurations.json` という名前のファイルを作成します。

```
[{
    "Classification":"iceberg-defaults",
    "Properties":{"iceberg.enabled":"true"}
}]
```

次に、以下の設定でクラスターを作成し、この例の Amazon S3 バケットパスとサブネット ID を自分のものに置き換えます。

```
aws emr create-cluster --release-label emr-6.9.0 \
--applications Name=Hive \
--configurations file://iceberg_configurations.json \
--region us-east-1 \
--name My_hive_Iceberg_Cluster \
--log-uri s3://amzn-s3-demo-bucket/ \
--instance-type m5.xlarge \
--instance-count 2 \
--service-role EMR_DefaultRole \ 
--ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef
```

Hive Iceberg クラスターは以下のことを行います。
+ Iceberg Hive ランタイム jar を Hive に読み込み、Hive エンジンの Iceberg 関連の設定を有効にします。
+ Amazon EMR Hive の動的実行エンジン選択を有効にして、ユーザーが Iceberg と互換性のある実行エンジンを設定できないようにします。

**注記**  
Hive Iceberg クラスターは現在 AWS Glue データカタログをサポートしていません。デフォルトの Iceberg カタログは `HiveCatalog` で、これは Hive 環境用に設定されたメタストアに対応しています。カタログ管理の詳細については、[Apache Hive ドキュメント](https://cwiki.apache.org/confluence/display/HIVE)の「[Using HCatalog](https://cwiki.apache.org/confluence/display/Hive/HCatalog+UsingHCat#HCatalogUsingHCat-UsingHCatalog)」を参照してください。

## 機能のサポート
<a name="feature-support"></a>

Amazon EMR 6.9.0 は Hive 3.1.3 と Iceberg 0.14.1 をサポートしています。機能のサポートについては Hive 3.1.2 と 3.1.3 の Iceberg 互換機能に限定されています。以下の   コマンドがサポートされています。
+ Amazon EMR リリース 6.9.0～6.12.x では、`libfb303` jar を Hive の `auxlib` ディレクトリに配置する必要があります。これを行うには、次のコマンドを使用します。

  ```
  sudo /usr/bin/ln -sf /usr/lib/hive/lib/libfb303-*.jar /usr/lib/hive/auxlib/libfb303.jar
  ```

  Amazon EMR リリース 6.13 以降では、`libfb303` jar は自動的に Hive の `auxlib` ディレクトリにシンボリックリンクされます。
+ **テーブルの作成**
  + **非パーティションテーブル** - Hive の外部テーブルは、次のようにストレージハンドラーを指定することで作成できます。

    ```
    CREATE EXTERNAL TABLE x (i int) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
    ```
  + **パーティションテーブル** - Hive の外部パーティションテーブルは次のように作成できます。

    ```
    CREATE EXTERNAL TABLE x (i int) PARTITIONED BY (j int) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
    ```
**注記**  
ORC/AVRO/PARQUET の `STORED AS` ファイル形式は Hive 3 ではサポートされていません。デフォルトで、唯一のオプションは Parquet です。
+ **テーブルの削除** - `DROP TABLE` コマンドは、次の例のようにテーブルを削除するために使用されます。

  ```
  DROP TABLE [IF EXISTS] table_name [PURGE];
  ```
+ **テーブルの読み込み** - 次の例のように、`SELECT` ステートメントを使用して Hive の Iceberg テーブルを読み込むことができます。サポートされている実行エンジンは MR と Tez です。

  ```
  SELECT * FROM table_name
  ```

  Hive の select 構文の詳細については、「[LanguageManual Select](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Select)」を参照してください。Hive で Iceberg テーブルを使用する select ステートメントの詳細については、「[Apache Iceberg Select](https://iceberg.apache.org/docs/latest/hive/#select)」を参照してください。
+ **テーブルへの挿入** - HiveQL の `INSERT INTO` ステートメントは Map Reduce 実行エンジンのみをサポートする Iceberg テーブルで動作します。Amazon EMR Hive は実行時に Iceberg テーブルのエンジンを選択するため、Amazon EMR ユーザーは実行エンジンを明示的に設定する必要はありません。
  + **単一テーブルの insert into** - 例:

    ```
    INSERT INTO table_name VALUES ('a', 1);
    INSERT INTO table_name SELECT...;
    ```
  + **複数テーブルの insert into** - 非アトミック複数テーブル insert into ステートメントがサポートされています。例:

    ```
    FROM source
     INSERT INTO table_1 SELECT a, b
     INSERT INTO table_2 SELECT c,d;
    ```

Amazon EMR 7.3.0 以降、Hive with Iceberg は AWS Glue データカタログをメタストアとしてサポートしています。 AWS Glue データカタログをメタストアとして使用するには、次のプロパティを設定します。

```
SET iceberg.catalog.<catalog_name>.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog;
```

または、次のプロパティを設定することもできます。

```
SET iceberg.catalog.<catalog_name>.type=glue;
```

次に、以下の例を使用してテーブルを作成できます。

```
CREATE EXTERNAL TABLE table_name (col1 type1, col2 type2,..)
ROW FORMAT SERDE 'org.apache.iceberg.mr.hive.HiveIcebergSerDe'
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
location '<location>'
TBLPROPERTIES ('table_type'='iceberg', 'iceberg.catalog'='<catalog_name>');
```

## Hive で Iceberg を使用するための考慮事項
<a name="hive-considerations"></a>
+ Iceberg は以下のクエリタイプをサポートしています。
  + テーブルの作成
  + Drop table
  + Insert into table
  + Read table
+ DML (データ操作言語) 操作では MR (MapReduce) 実行エンジンのみがサポートされており、MR は Hive 3.1.3 で廃止されました。
+ 7.3.0 より前の Amazon EMR では、Hive を使用した Iceberg では AWS Glue Data Catalog は現在サポートされていません。
+ エラー処理の堅牢性が不十分です。設定に誤りがあっても、insert into クエリが正常に完了する可能性があります。ただし、メタデータの更新に失敗によりデータが失われる可能性があります。
+ Iceberg Glue 統合は、Redshift Managed Storage カタログでは機能しません。