

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用带 Iceberg 的集群
<a name="emr-iceberg-use-cluster"></a>

本节包含将 Iceberg 与 Spark、Trino、Flink 和 Hive 结合使用的信息。

# 将 Iceberg 集群与 Spark 结合使用
<a name="emr-iceberg-use-spark-cluster"></a>

从 Amazon EMR 版本 6.5.0 开始，您可以将 Iceberg 用于您的 Spark 集群，无需包含引导操作。对于 Amazon EMR 版本 6.4.0 及更早版本，您可以使用引导操作来预装所有需要的依赖项。

在本教程中，您将使用在 Amazon EMR Spark 集群上使用 Iceberg。 AWS CLI 要使用控制台创建安装了 Iceberg 的集群，请按照[使用 Amazon Athena、Amazon EMR 和 AWS Glue 构建 Apache Iceberg 数据湖](https://aws.amazon.com/blogs//big-data/build-an-apache-iceberg-data-lake-using-amazon-athena-amazon-emr-and-aws-glue/)中的步骤操作。

## 创建 Iceberg 集群
<a name="emr-iceberg-create-cluster"></a>

您可以使用 AWS 管理控制台、 AWS CLI 或 Amazon EMR API 创建安装了 Iceberg 的集群。在本教程中，您将使用在 AWS CLI Amazon EMR 集群上使用 Iceberg。要使用控制台创建安装了 Iceberg 的集群，请按照[使用 Amazon Athena、Amazon EMR 和 AWS Glue 构建 Apache Iceberg 数据湖](https://aws.amazon.com/blogs//big-data/build-an-apache-iceberg-data-lake-using-amazon-athena-amazon-emr-and-aws-glue/)中的步骤操作。

要将 Amazon EMR 上的 Iceberg 与一起 AWS CLI使用，请先按照以下步骤创建一个集群。有关使用指定 Iceberg 分类的信息 AWS CLI，请参阅[创建集群 AWS CLI 时使用提供配置](emr-configure-apps-create-cluster.md#emr-configure-apps-create-cluster-cli)或[在创建集群时，使用 Java SDK 提供配置](emr-configure-apps-create-cluster.md#emr-configure-apps-create-cluster-sdk)。

1. 创建 `configurations.json` 文件并输入以下内容：

   ```
   [{
       "Classification":"iceberg-defaults",
       "Properties":{"iceberg.enabled":"true"}
   }]
   ```

1. 接下来，使用以下配置创建集群。将实例 Amazon S3 桶路径和子网 ID 替换为您自己的值。

   ```
   aws emr create-cluster --release-label emr-6.5.0 \
   --applications Name=Spark \
   --configurations file://configurations.json \
   --region us-east-1 \
   --name My_Spark_Iceberg_Cluster \
   --log-uri s3://amzn-s3-demo-bucket/ \
   --instance-type m5.xlarge \
   --instance-count 2 \
   --service-role EMR_DefaultRole_V2 \ 
   --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0
   ```

您还可以创建一个包含 Spark 应用程序的 Amazon EMR 集群，并且将文件 `/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar` 作为 Spark 任务中的 JAR 依赖关系包含在内。有关更多信息，请参阅[提交应用程序](https://spark.apache.org/docs/latest/submitting-applications.html#submitting-applications)。

要将 jar 作为 Spark 作业中的依赖项包含在内，请将以下配置属性添加到 Spark 应用程序中：

```
--conf "spark.jars=/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar"
```

有关 Spark 作业依赖项的更多信息，请参阅 Apache Spark 文档 [Running Spark on Kubernetes](https://spark.apache.org/docs/latest/running-on-kubernetes.html)（在 Kubernetes 上运行 Spark）中的 [Dependency Management](https://spark.apache.org/docs/latest/running-on-kubernetes.html#dependency-management)（依赖项管理）。

## 为 Iceberg 初始化 Spark 会话
<a name="emr-iceberg-initialize-spark-session"></a>

以下示例演示如何启动交互式 Spark Shell、使用 Spark 提交，或如何使用 Amazon EMR Notebooks 在 Amazon EMR 上使用 Iceberg。

------
#### [ spark-shell ]

1. 使用 SSH 连接主节点。有关更多信息，请参阅《Amazon EMR 管理指南》**中的[使用 SSH 连接到主节点](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html)。

1. 输入以下命令以启动 Spark shell。要使用 PySpark 外壳，请`spark-shell`替换为`pyspark`。

   ```
   spark-shell \
       --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.my_catalog.warehouse=s3://amzn-s3-demo-bucket/prefix/
       --conf spark.sql.catalog.my_catalog.type=glue \
       --conf spark.sql.defaultCatalog=my_catalog \
       --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
   ```

------
#### [ spark-submit ]

1. 使用 SSH 连接主节点。有关更多信息，请参阅《Amazon EMR 管理指南》**中的[使用 SSH 连接到主节点](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html)。

1. 输入以下命令以为 Iceberg 启动 Spark 会话。

   ```
   spark-submit \
   --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
   --conf spark.sql.catalog.my_catalog.warehouse=s3://amzn-s3-demo-bucket1/prefix \
   --conf spark.sql.catalog.my_catalog.type=glue \
   --conf spark.sql.defaultCatalog=my_catalog \
   --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
   ```

------
#### [ EMR Studio notebooks ]

要使用 EMR Studio notebooks 初始化 Spark 会话，请使用 Amazon EMR notebook 中的 `%%configure` 魔法命令配置 Spark 会话，如以下示例所示。有关更多信息，请参阅 *Amazon EMR 管理指南*中的[使用 EMR Notebooks 魔法命令](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-studio-magics.html#emr-magics)。

```
%%configure -f{
"conf":{
    "spark.sql.catalog.my_catalog":"org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.my_catalog.type":"glue",
    "spark.sql.catalog.my_catalog.warehouse":"s3://amzn-s3-demo-bucket1/prefix/",
    "spark.sql.defaultCatalog":"my_catalog",
    "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
    }
}
```

------
#### [ CLI ]

要使用 CLI 初始化 Spark 集群并设置所有 Spark Iceberg 会话默认配置，请运行下面的示例。有关使用 AWS CLI 和 Amazon EMR API 指定配置分类的更多信息，请参阅[配置](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps.html)应用程序。

```
[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.sql.catalog.my_catalog":"org.apache.iceberg.spark.SparkCatalog",
      "spark.sql.catalog.my_catalog.type":"glue",
      "spark.sql.catalog.my_catalog.warehouse":"s3://amzn-s3-demo-bucket1/prefix/",
      "spark.sql.defaultCatalog":"my_catalog",
      "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
    }
  }
]
```

------

## 写入 Iceberg 表
<a name="emr-iceberg-write-to-table"></a>

以下示例说明如何创建 DataFrame 并将其写为 Iceberg 数据集。这些示例演示使用 Spark Shell 处理数据集，同时使用 SSH 作为原定设置将 hadoop 用户连接到主节点（master node）。

**注意**  
要将代码示例粘贴到 Spark Shell 中，请在提示符处键入 `:paste`，粘贴示例，然后按 `CTRL+D`。

------
#### [ PySpark ]

Spark 包含一个基于 Python 的 Shell `pyspark`，您可以用它来设计以 Python 编写的 Spark 程序的原型。在主节点上调用 `pyspark`。

```
## Create a DataFrame.
data = spark.createDataFrame([
 ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
 ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
 ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
 ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")
],["id", "creation_date", "last_update_time"])

## Write a DataFrame as a Iceberg dataset to the Amazon S3 location.
spark.sql("""CREATE TABLE IF NOT EXISTS dev.db.iceberg_table (id string,
creation_date string,
last_update_time string)
USING iceberg
location 's3://amzn-s3-demo-bucket/example-prefix/db/iceberg_table'""")

data.writeTo("dev.db.iceberg_table").append()
```

------
#### [ Scala ]

```
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._

// Create a DataFrame.
val data = Seq(
("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")
).toDF("id", "creation_date", "last_update_time")

// Write a DataFrame as a Iceberg dataset to the Amazon S3 location.
spark.sql("""CREATE TABLE IF NOT EXISTS dev.db.iceberg_table (id string,
creation_date string,
last_update_time string)
USING iceberg
location 's3://amzn-s3-demo-bucket/example-prefix/db/iceberg_table'""")

data.writeTo("dev.db.iceberg_table").append()
```

------

## 从 Iceberg 表读取
<a name="emr-iceberg-read-from-table"></a>

------
#### [ PySpark ]

```
df = spark.read.format("iceberg").load("dev.db.iceberg_table")
df.show()
```

------
#### [ Scala ]

```
val df = spark.read.format("iceberg").load("dev.db.iceberg_table")
df.show()
```

------
#### [ Spark SQL ]

```
SELECT * from dev.db.iceberg_table LIMIT 10
```

------

## 将 AWS Glue 数据目录与 Spark Iceberg 配合使用
<a name="emr-iceberg-glue-catalog-config-spark"></a>

你可以从 Spark Iceberg 连接到 AWS Glue 数据目录。本节介绍了不同的连接命令。

### 连接到默认区域中的默认 AWS Glue 目录
<a name="emr-iceberg-glue-catalog-config-spark"></a>

此示例展示了如何使用 Glue 目录类型进行连接。如果您未指定目录 ID，则使用默认值：

```
spark-submit \
    --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.my_catalog.warehouse=s3://amzn-s3-demo-bucket1/prefix \
    --conf spark.sql.catalog.my_catalog.type=glue \
    --conf spark.sql.defaultCatalog=my_catalog \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
```

### 使用特定目录 AWS ID 连接到 Glue 目录
<a name="emr-iceberg-glue-catalog-config-spark"></a>

此示例展示了如何使用目录 ID 进行连接：

```
spark-submit \
    --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.my_catalog.warehouse=s3://amzn-s3-demo-bucket1/prefix \
    --conf spark.sql.catalog.my_catalog.type=glue \
    --conf spark.sql.catalog.my_catalog.glue.id=AWS Glue catalog ID \
    --conf spark.sql.defaultCatalog=my_catalog \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
```

此命令可用于连接到其他账户中的 AWS Glue 目录、RMS 目录或联合目录。

## 将 Iceberg REST Catalog (IRC) 与 Spark Iceberg 结合使用
<a name="emr-iceberg-rest-catalog-config"></a>

以下各节详细介绍了如何配置 Iceberg 与目录的集成。

### Connect 到 AWS Glue 数据目录 IRC 端点
<a name="emr-iceberg-rest-catalog-config-gdc"></a>

下面展示了使用 Iceberg REST 的示例 `spark-submit` 命令：

```
spark-submit \
    --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.my_catalog.warehouse=glue catalog ID \
    --conf spark.sql.catalog.my_catalog.type=rest \
    --conf spark.sql.catalog.my_catalog.uri=glue endpoint URI/iceberg \
    --conf spark.sql.catalog.my_catalog.rest.sigv4-enabled=true \
    --conf spark.sql.catalog.my_catalog.rest.signing-name=glue \
    --conf spark.sql.defaultCatalog=my_catalog \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
```

要在启用运行时角色的集群上使用它，还需要以下额外的 Spark 配置设置：

```
"spark.hadoop.fs.s3.credentialsResolverClass": "software.amazon.glue.GlueTableCredentialsResolver",
"spark.hadoop.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
"spark.hadoop.glue.id": glue catalog ID
"spark.hadoop.glue.endpoint": "glue endpoint"
```

有关每个区域的 AWS Glue 端点网址列表，请参阅 [AWS Glue 端点和配额](https://docs.aws.amazon.com/general/latest/gr/glue.html)。

### 连接到任意 IRC 端点
<a name="emr-iceberg-rest-catalog-config-arbitrary"></a>

下面展示了使用 IRC 端点的示例 `spark-submit` 命令：

```
spark-submit \
    --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.my_catalog.warehouse=warehouse name \
    --conf spark.sql.catalog.my_catalog.type=rest \
    --conf spark.sql.catalog.my_catalog.uri=your rest endpoint \
    --conf spark.sql.defaultCatalog=my_catalog \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
```

## 使用 Iceberg 和 Iceberg SparkCatalog 时的配置差异 SparkSessionCatalog
<a name="emr-iceberg-spark-catalog"></a>

Iceberg 提供了两种创建 Spark Iceberg 目录的方法。您可以将 Spark 配置设置为 `SparkCatalog` 或 `SparkSessionCatalog`。

### 使用冰山 SparkCatalog
<a name="emr-iceberg-spark-catalog-spark-catalog"></a>

以下显示了用**SparkCatalog**作 Spark Iceberg 目录的命令：

```
spark-shell \
--conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.my_catalog.warehouse=s3://amzn-s3-demo-bucket1/prefix \
--conf spark.sql.catalog.my_catalog.type=glue \
--conf spark.sql.defaultCatalog=my_catalog
```

此方法的注意事项：
+ 您可以访问 Iceberg 表，但无法访问其他表。
+ 目录名称不能是 **spark\$1catalog**。这是 Spark 中初始目录的名称。它始终连接到 Hive 元存储。这是 Spark 中的默认目录，除非用户使用 `spark.sql.defaultCatalog` 覆盖它。
+ 您可以将 `spark.sql.defaultCatalog` 设置为您的目录名称，使其成为默认目录。

### 使用冰山 SparkSessionCatalog
<a name="emr-iceberg-spark-catalog-spark-session"></a>

以下显示了用**SparkSessionCatalog**作 Spark Iceberg 目录的命令：

```
spark-shell \
    --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
    --conf spark.sql.catalog.spark_catalog.warehouse=s3://amzn-s3-demo-bucket1/prefix \
    --conf spark.sql.catalog.spark_catalog.type=glue
```

此方法的注意事项：
+ 如果找不到作为 Iceberg 表的表，Spark 将尝试查看它是否是 Hive 元存储中的表。有关更多信息[，请参阅使用 AWS Glue 数据目录作为 Hive](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hive-metastore-glue.html) 的目录。
+ 目录名称必须是 **spark\$1catalog**。

## 使用 Iceberg Spark 扩展
<a name="emr-iceberg-spark-catalog-extensions"></a>

Iceberg 提供 Spark 扩展 `org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions`，用户可以通过 Spark 扩展配置 `spark.sql.extensions` 进行设置。这些扩展支持 Iceberg 的关键功能（例如行级 DELETE、UPDATE 和 MERGE）以及 Iceberg 特有的 Spark 数据定义语言语句和过程（例如压缩、快照过期、分支和标记等）。有关更多详细信息，请参阅以下内容：
+ Iceberg Spark 写入扩展：[Spark 写入](https://iceberg.apache.org/docs/nightly/spark-writes/)
+ Iceberg Spark DDL 扩展：[ALTER TABLE SQL 扩展](https://iceberg.apache.org/docs/nightly/spark-ddl/#alter-table-sql-extensions/)
+ Iceberg Spark 过程扩展：[Spark 过程](https://iceberg.apache.org/docs/nightly/spark-ddl/#alter-table-sql-extensions/)

## 将 Iceberg 与 Spark 结合使用的注意事项
<a name="spark-considerations-catalog"></a>
+ 原定设置下，Amazon EMR 6.5.0 不支持 Iceberg 在 Amazon EMR on EKS 上运行。Amazon EMR 6.5.0 自定义映像可供您传递 `--jars local:///usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar` 作为 `spark-submit` 参数，用于在 Amazon EMR on EKS 上创建 Iceberg 表。有关更多信息，请参阅《Amazon EMR on EKS 开发指南》**中的[使用自定义映像在 Amazon EMR 中提交 Spark 工作负载](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/docker-custom-images-steps.html#docker-custom-images-submit)。您也可以联系 支持 获取帮助。从 Amazon EMR 6.6.0 开始，Amazon EMR on EKS 支持 Iceberg。
+ 使用 AWS Glue 作为 Iceberg 的目录时，请确保要在其中创建表的数据库存在于 Glue 中 AWS 。如果您正在使用诸如的服务 AWS Lake Formation ，但无法加载目录，请确保您拥有对服务的正确访问权限以执行命令。
+ 如果按照中所述使用 Iceberg SparkSessionCatalog，则除了[使用 Iceberg 和 Iceberg SparkCatalog 时的配置差异 SparkSessionCatalog](#emr-iceberg-spark-catalog)配置 Spark Iceberg [AWS Glue 数据目录设置外，还必须按照将 Glue 数据目录配置为 Apache Hive 元存储](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hive-metastore-glue.html)中描述的配置步骤进行操作。 AWS 

# 将 Iceberg 集群与 Trino 结合使用
<a name="emr-iceberg-use-trino-cluster"></a>

从 Amazon EMR 版本 6.6.0 开始，您可以将 Iceberg 用于您的 Trino 集群。

在本教程中，您将使用在 Amazon EMR Trino 集群上使用 Iceberg。 AWS CLI 要使用控制台创建安装了 Iceberg 的集群，请按照[使用 Amazon Athena、Amazon EMR 和 AWS Glue 构建 Apache Iceberg 数据湖](https://aws.amazon.com/blogs//big-data/build-an-apache-iceberg-data-lake-using-amazon-athena-amazon-emr-and-aws-glue/)中的步骤操作。

## 创建 Iceberg 集群
<a name="emr-iceberg-create-cluster-trino"></a>

要将 Amazon EMR 上的 Iceberg 与一起 AWS CLI使用，请先按照以下步骤创建一个集群。有关使用指定 Iceberg 分类的信息 AWS CLI，请参阅[创建集群 AWS CLI 时使用提供配置](emr-configure-apps-create-cluster.md#emr-configure-apps-create-cluster-cli)或[在创建集群时，使用 Java SDK 提供配置](emr-configure-apps-create-cluster.md#emr-configure-apps-create-cluster-sdk)。

1. 创建包含以下内容的 `configurations.json` 文件。例如，假设您想将 Hive 元存储作为目录使用，则您的文件应包含以下内容。

   ```
   [
     {
       "Classification": "trino-connector-iceberg",
       "Properties": {
         "connector.name": "iceberg",
         "hive.metastore.uri": "thrift://localhost:9083"
       }
     }
   ]
   ```

   如果您想使用 AWS Glue 数据目录作为存储，则您的文件应包含以下内容。

   ```
   [
     {
       "Classification": "trino-connector-iceberg",
       "Properties": {
         "connector.name": "iceberg",
         "iceberg.catalog.type": "glue"
       }
     }
   ]
   ```

   从 Amazon EMR 7.7.0 开始，包括房产 *fs.native-s3.enabled=true*

   ```
   [
     { 
       "Classification": "trino-connector-iceberg",
       "Properties": {
         "connector.name": "iceberg",
         "iceberg.catalog.type": "glue",
         "fs.native-s3.enabled": "true"
       }           
     }                 
   ]
   ```

1. 使用以下配置创建集群，将示例 Amazon S3 存储桶路径和件名称替换为您自己的值。

   ```
   aws emr create-cluster --release-label emr-6.7.0 \
   --applications Name=Trino \
   --region us-east-1 \
   --name My_Trino_Iceberg_Cluster \
   --log-uri s3://amzn-s3-demo-bucket \
   --configurations file://configurations.json \
   --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=c3.4xlarge InstanceGroupType=CORE,InstanceCount=3,InstanceType=c3.4xlarge \ 
   --use-default-roles \
   --ec2-attributes KeyName=<key-name>
   ```

## 为 Iceberg 初始化 Trino 会话
<a name="emr-iceberg-initialize-trino"></a>

要初始化 Trino 会话，请运行以下命令。

```
trino-cli --catalog iceberg
```

## 写入 Iceberg 表
<a name="emr-iceberg-write-to-table-trino"></a>

使用以下 SQL 命令创建并写入您的表。

```
trino> SHOW SCHEMAS;
trino> CREATE TABLE default.iceberg_table (
            id int,
            data varchar,
            category varchar)
       WITH (
            format = 'PARQUET',
            partitioning = ARRAY['category', 'bucket(id, 16)'],
            location = 's3://amzn-s3-demo-bucket/<prefix>')
          
trino> INSERT INTO default.iceberg_table VALUES (1,'a','c1'), (2,'b','c2'), (3,'c','c3');
```

## 从 Iceberg 表读取
<a name="emr-iceberg-read-from-table-trino"></a>

要从 Iceberg 表读取，请运行以下命令。

```
trino> SELECT * from default.iceberg_table;
```

## 将 Iceberg 与 Trino 结合使用的注意事项
<a name="trino-considerations"></a>
+ Amazon EMR 6.5 不提供对 Iceberg 的原生 Trino Iceberg Catalog 支持。Trino 需要使用 Iceberg v0.11，因此我们建议为 Trino 启动独立于 Spark 集群的 Amazon EMR 集群，并在该集群上包括 Iceberg v0.11。
+ 使用 AWS Glue 作为 Iceberg 的目录时，请确保要在其中创建表的数据库存在于 Glue 中 AWS 。如果您正在使用诸如的服务 AWS Lake Formation ，但无法加载目录，请确保您拥有对服务的正确访问权限以执行命令。
+ Iceberg Glue 集成不适用于 Redshift 托管存储目录。

# 将 Iceberg 集群与 Flink 结合使用
<a name="emr-iceberg-use-flink-cluster"></a>

从 Amazon EMR 版本 6.9.0 开始，您可以将 Iceberg 与 Flink 集群结合使用，而无需使用开源 Iceberg Flink 集成时所需的设置步骤。

## 创建 Iceberg 集群
<a name="creating-iceberg-cluster"></a>

您可以使用 AWS 管理控制台、 AWS CLI或 Amazon EMR API 创建安装了 Iceberg 的集群。在本教程中，您将使用在 AWS CLI Amazon EMR 集群上使用 Iceberg。要使用控制台创建安装了 Iceberg 的集群，请按照[使用 Amazon Athena、Amazon EMR 和 AWS Glue 构建 Apache Iceberg 数据湖](https://aws.amazon.com/blogs/big-data/build-an-apache-iceberg-data-lake-using-amazon-athena-amazon-emr-and-aws-glue/)中的步骤操作。

要将 Amazon EMR 上的 Iceberg 与一起 AWS CLI使用，请先按照以下步骤创建一个集群。有关使用指定 Iceberg 分类的信息 AWS CLI，请参阅[创建集群 AWS CLI 时使用提供配置](emr-configure-apps-create-cluster.md#emr-configure-apps-create-cluster-cli)或[在创建集群时，使用 Java SDK 提供配置](emr-configure-apps-create-cluster.md#emr-configure-apps-create-cluster-sdk)。使用以下内容创建名为 `configurations.json` 的文件：

```
[{
"Classification":"iceberg-defaults",
    "Properties":{"iceberg.enabled":"true"}
}]
```

接下来，使用以下配置创建集群，将示例 Amazon S3 桶路径和子网 ID 替换为您自己的值：

```
aws emr create-cluster --release-label emr-6.9.0 \
--applications Name=Flink \
--configurations file://iceberg_configurations.json \
--region us-east-1 \
--name My_flink_Iceberg_Cluster \
--log-uri s3://amzn-s3-demo-bucket/ \
--instance-type m5.xlarge \
--instance-count 2 \
--service-role EMR_DefaultRole \ 
--ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef
```

您还可以创建一个其中包含 Flink 应用程序的 Amazon EMR 6.9.0 集群，并且将文件 `/usr/share/aws/iceberg/lib/iceberg-flink-runtime.jar` 用作 Flink 作业中的 JAR 依赖项。

## 使用 Flink SQL 客户端
<a name="using-flink-sql-client"></a>

SQL 客户端脚本位于 `/usr/lib/flink/bin` 下。您可以使用以下命令运行脚本：

```
flink-yarn-session -d # starting the Flink YARN Session in detached mode
./sql-client.sh
```

这将启动 Flink SQL Shell。

## Flink 示例
<a name="flink-examples"></a>

### 创建 Iceberg 表
<a name="create-iceberg-table"></a>

**Flink SQL**

```
CREATE CATALOG glue_catalog WITH (
   'type'='iceberg',
   'warehouse'='<WAREHOUSE>',
   'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',
    'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'
 );

USE CATALOG  glue_catalog;

CREATE DATABASE IF NOT EXISTS <DB>;

USE <DB>;

CREATE TABLE IF NOT EXISTS `glue_catalog`.`<DB>`.`sample` (id int, data string);
```

**表 API**

```
EnvironmentSettings settings =
                EnvironmentSettings.newInstance().inBatchMode().build();

TableEnvironment tEnv = TableEnvironment.create(settings);

String warehouse = "<WAREHOUSE>";
String db = "<DB>";

tEnv.executeSql(
                "CREATE CATALOG glue_catalog WITH (\n"
                        + "   'type'='iceberg',\n"
                        + "   'warehouse'='"
                        + warehouse
                        + "',\n"
                        + "   'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',\n"
                        + "   'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'\n"
                        + " );");

tEnv.executeSql("USE CATALOG  glue_catalog;");
tEnv.executeSql("CREATE DATABASE IF NOT EXISTS " + db + ";");
tEnv.executeSql("USE " + db + ";");
tEnv.executeSql(
        "CREATE TABLE `glue_catalog`.`" + db + "`.`sample` (id bigint, data string);");
```

### 写入 Iceberg 表
<a name="write-to-iceberg-table"></a>

**Flink SQL**

```
INSERT INTO `glue_catalog`.`<DB>`.`sample` values (1, 'a'),(2,'b'),(3,'c');
```

**表 API**

```
tEnv.executeSql(
        "INSERT INTO `glue_catalog`.`"
                + db
                + "`.`sample` values (1, 'a'),(2,'b'),(3,'c');");
```

**数据流 API**

```
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

String db = "<DB Name>";

String warehouse = "<Warehouse Path>";

GenericRowData rowData1 = new GenericRowData(2);
rowData1.setField(0, 1L);
rowData1.setField(1, StringData.fromString("a"));

DataStream<RowData> input = env.fromElements(rowData1);

Map<String, String> props = new HashMap<();
props.put("type", "iceberg");
props.put("warehouse", warehouse);
props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");

CatalogLoader glueCatlogLoader =
        CatalogLoader.custom(
                "glue",
                props,
                new Configuration(),
                "org.apache.iceberg.aws.glue.GlueCatalog");

TableLoader tableLoader =
        TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample"));

DataStreamSink<Void> dataStreamSink =
        FlinkSink.forRowData(input).tableLoader(tableLoader).append();

env.execute("Datastream Write");
```

### 从 Iceberg 表读取
<a name="read-from-iceberg-table"></a>

**Flink SQL**

```
SELECT * FROM `glue_catalog`.`<DB>`.`sample`;
```

**表 API**

```
Table result = tEnv.sqlQuery("select * from `glue_catalog`.`" + db + "`.`sample`;");
```

**数据流 API**

```
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

String db = "<DB Name>";

String warehouse = "<Warehouse Path>";

Map<String, String> props = new HashMap<>();
props.put("type", "iceberg");
props.put("warehouse", warehouse);
props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");

CatalogLoader glueCatlogLoader =
        CatalogLoader.custom(
                "glue",
                props,
                new Configuration(),
                "org.apache.iceberg.aws.glue.GlueCatalog");
                
TableLoader tableLoader =
        TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample"));

DataStream<RowData> batch =
                FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build();

batch.print().name("print-sink");
```

## 使用 Hive 目录
<a name="using-hive-catalog"></a>

确保如 [使用 Hive 元存储和 Glue 目录配置 Flink](flink-configure.md#flink-configure-hive) 中所述解析 Flink 和 Hive 依赖项。

## 运行 Flink 作业
<a name="running-flink-job"></a>

向 Flink 提交作业的一种方法是使用每个作业的 Flink YARN 会话。这可以通过以下命令启动：

```
sudo flink run -m yarn-cluster -p 4 -yjm 1024m -ytm 4096m $JAR_FILE_NAME
```

## 将 Iceberg 与 Flink 结合使用的注意事项
<a name="flink-considerations"></a>
+ 使用 AWS Glue 作为 Iceberg 的目录时，请确保要在其中创建表的数据库存在于 Glue 中 AWS 。如果您正在使用诸如的服务 AWS Lake Formation ，但无法加载目录，请确保您拥有对服务的正确访问权限以执行命令。
+ Iceberg Glue 集成不适用于 Redshift 托管存储目录。

# 将 Iceberg 集群与 Hive 结合使用
<a name="emr-iceberg-use-hive-cluster"></a>

在 Amazon EMR 发行版 6.9.0 及更高版本中，您可以将 Iceberg 与 Hive 集群结合使用，而无需执行开源 Iceberg Hive 集成所需的设置步骤。对于 Amazon EMR 版本 6.8.0 及更早版本，您可以使用引导操作安装 `iceberg-hive-runtime` jar 来配置 Hive for Iceberg 支持。

Amazon EMR 6.9.0 包括 [Hive 3.1.3 与 Iceberg 0.14.1 集成](https://iceberg.apache.org/releases/#0140-release)的所有功能，还包括 Amazon EMR 增加的功能，例如在运行时自动选择支持的执行引擎（EKS 6.9.0 上的 Amazon EMR）。

## 创建 Iceberg 集群
<a name="create-iceberg-cluster"></a>

您可以使用 AWS 管理控制台、 AWS CLI 或 Amazon EMR API 创建安装了 Iceberg 的集群。在本教程中，您将使用在 AWS CLI Amazon EMR 集群上使用 Iceberg。要使用控制台创建安装了 Iceberg 的集群，请按照[使用 Amazon Athena、Amazon EMR 和 AWS Glue 构建 Iceberg 数据湖](https://aws.amazon.com/blogs/big-data/build-an-apache-iceberg-data-lake-using-amazon-athena-amazon-emr-and-aws-glue/)中的步骤操作。

要将 Amazon EMR 上的 Iceberg 与一起 AWS CLI使用，请先使用以下步骤创建一个集群。有关使用 AWS CLI 或 Java SDK 指定 Iceberg 分类的信息，请参阅[创建集群 AWS CLI 时使用提供配置](emr-configure-apps-create-cluster.md#emr-configure-apps-create-cluster-cli)或[在创建集群时，使用 Java SDK 提供配置](emr-configure-apps-create-cluster.md#emr-configure-apps-create-cluster-sdk)。使用以下内容创建名为 `configurations.json` 的文件：

```
[{
    "Classification":"iceberg-defaults",
    "Properties":{"iceberg.enabled":"true"}
}]
```

接下来，使用以下配置创建集群，将示例 Amazon S3 桶路径和子网 ID 替换为您自己的值：

```
aws emr create-cluster --release-label emr-6.9.0 \
--applications Name=Hive \
--configurations file://iceberg_configurations.json \
--region us-east-1 \
--name My_hive_Iceberg_Cluster \
--log-uri s3://amzn-s3-demo-bucket/ \
--instance-type m5.xlarge \
--instance-count 2 \
--service-role EMR_DefaultRole \ 
--ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef
```

Hive Iceberg 集群执行以下操作：
+ 在 Hive 中加载 Iceberg Hive 运行时 jar 并为 Hive 引擎启用 Iceberg 相关配置。
+ 启用 Amazon EMR Hive 的动态执行引擎选择，以防止用户设置支持的执行引擎实现 Iceberg 兼容性。

**注意**  
Hive Iceberg 集群目前不支持 Glue AWS 数据目录。默认 Iceberg 目录为 `HiveCatalog`，它对应于为 Hive 环境配置的元存储。有关目录管理的更多信息，请参阅 [Apache Hiv](https://cwiki.apache.org/confluence/display/HIVE) e 文档 HCatalog中的[使用](https://cwiki.apache.org/confluence/display/Hive/HCatalog+UsingHCat#HCatalogUsingHCat-UsingHCatalog)。

## 功能支持
<a name="feature-support"></a>

Amazon EMR 6.9.0 支持 Hive 3.1.3 和 Iceberg 0.14.1。该功能支持仅限于 Hive 3.1.2 和 3.1.3 的 Iceberg 兼容功能。支持以下命令：
+ 在 Amazon EMR 发行版 6.9.0 到 6.12.x 版本中，您必须将 `libfb303` jar 包含在 Hive `auxlib` 目录中。使用以下命令将其包含在内：

  ```
  sudo /usr/bin/ln -sf /usr/lib/hive/lib/libfb303-*.jar /usr/lib/hive/auxlib/libfb303.jar
  ```

  在 Amazon EMR 6.13 及更高版本中，`libfb303` jar 会自动符号链接到 Hive `auxlib` 目录。
+ **创建表**
  + **非分区表** - 可以通过提供存储处理程序在 Hive 中创建外部表，如下所示：

    ```
    CREATE EXTERNAL TABLE x (i int) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
    ```
  + **分区表** - 可以在 Hive 中创建外部分区表，如下所示：

    ```
    CREATE EXTERNAL TABLE x (i int) PARTITIONED BY (j int) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
    ```
**注意**  
Hive 3 ORC/AVRO/PARQUET 不支持`STORED AS`文件格式。默认且唯一的选项是 Parquet。
+ **删除表** - `DROP TABLE` 命令用于删除表，如以下示例中所示：

  ```
  DROP TABLE [IF EXISTS] table_name [PURGE];
  ```
+ **读取表** - `SELECT` 语句可用于读取 Hive 中的 Iceberg 表，如以下示例中所示。支持的执行引擎为 MR 和 Tez。

  ```
  SELECT * FROM table_name
  ```

  有关 Hive 的选择语法的信息，请参阅[LanguageManual 选择](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Select)。有关在 Hive 中使用 Iceberg 表的选择语句的信息，请参阅 [Apache Iceberg Select](https://iceberg.apache.org/docs/latest/hive/#select)。
+ **插入到表中** - HiveQL 的 `INSERT INTO` 语句仅适用于支持 Map Reduce 执行引擎的 Iceberg 表。Amazon EMR 用户无需显式设置执行引擎，因为 Amazon EMR Hive 会在运行时为 Iceberg 表选择引擎。
  + **单表插入** - 例如：

    ```
    INSERT INTO table_name VALUES ('a', 1);
    INSERT INTO table_name SELECT...;
    ```
  + **多表插入** - 支持在语句中插入非原子多表。示例：

    ```
    FROM source
     INSERT INTO table_1 SELECT a, b
     INSERT INTO table_2 SELECT c,d;
    ```

从亚马逊 EMR 7.3.0 开始，Hive with Iceberg 支持 Glue 数据目录 AWS 作为元存储库。要使用 AWS Glue 数据目录作为元数据库，请设置以下属性。

```
SET iceberg.catalog.<catalog_name>.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog;
```

或者，您也可以设置以下属性。

```
SET iceberg.catalog.<catalog_name>.type=glue;
```

您可以使用以下示例创建表。

```
CREATE EXTERNAL TABLE table_name (col1 type1, col2 type2,..)
ROW FORMAT SERDE 'org.apache.iceberg.mr.hive.HiveIcebergSerDe'
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
location '<location>'
TBLPROPERTIES ('table_type'='iceberg', 'iceberg.catalog'='<catalog_name>');
```

## 将 Iceberg 与 Hive 结合使用的注意事项
<a name="hive-considerations"></a>
+ Iceberg 支持以下查询类型：
  + 创建表
  + 删除表
  + 插入到表中
  + 读取表
+ DML（数据操作语言MapReduce）操作仅支持 MR () 执行引擎，Hive 3.1.3 中已弃用 MR。
+ 对于 7.3.0 之前的亚马逊 EMR，带有 Hive 的 Iceberg 目前不支持 AWS Glue 数据目录。
+ 错误处理不够强大。在配置错误的情况下，插入查询可能会成功完成。但是，无法更新元数据可能会导致数据丢失。
+ Iceberg Glue 集成不适用于 Redshift 托管存储目录。