

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 将 Iceberg 集群与 Spark 结合使用
<a name="emr-iceberg-use-spark-cluster"></a>

从 Amazon EMR 版本 6.5.0 开始，您可以将 Iceberg 用于您的 Spark 集群，无需包含引导操作。对于 Amazon EMR 版本 6.4.0 及更早版本，您可以使用引导操作来预装所有需要的依赖项。

在本教程中，您将使用在 Amazon EMR Spark 集群上使用 Iceberg。 AWS CLI 要使用控制台创建安装了 Iceberg 的集群，请按照[使用 Amazon Athena、Amazon EMR 和 AWS Glue 构建 Apache Iceberg 数据湖](https://aws.amazon.com/blogs//big-data/build-an-apache-iceberg-data-lake-using-amazon-athena-amazon-emr-and-aws-glue/)中的步骤操作。

## 创建 Iceberg 集群
<a name="emr-iceberg-create-cluster"></a>

您可以使用 AWS 管理控制台、 AWS CLI 或 Amazon EMR API 创建安装了 Iceberg 的集群。在本教程中，您将使用在 AWS CLI Amazon EMR 集群上使用 Iceberg。要使用控制台创建安装了 Iceberg 的集群，请按照[使用 Amazon Athena、Amazon EMR 和 AWS Glue 构建 Apache Iceberg 数据湖](https://aws.amazon.com/blogs//big-data/build-an-apache-iceberg-data-lake-using-amazon-athena-amazon-emr-and-aws-glue/)中的步骤操作。

要将 Amazon EMR 上的 Iceberg 与一起 AWS CLI使用，请先按照以下步骤创建一个集群。有关使用指定 Iceberg 分类的信息 AWS CLI，请参阅[创建集群 AWS CLI 时使用提供配置](emr-configure-apps-create-cluster.md#emr-configure-apps-create-cluster-cli)或[在创建集群时，使用 Java SDK 提供配置](emr-configure-apps-create-cluster.md#emr-configure-apps-create-cluster-sdk)。

1. 创建 `configurations.json` 文件并输入以下内容：

   ```
   [{
       "Classification":"iceberg-defaults",
       "Properties":{"iceberg.enabled":"true"}
   }]
   ```

1. 接下来，使用以下配置创建集群。将实例 Amazon S3 桶路径和子网 ID 替换为您自己的值。

   ```
   aws emr create-cluster --release-label emr-6.5.0 \
   --applications Name=Spark \
   --configurations file://configurations.json \
   --region us-east-1 \
   --name My_Spark_Iceberg_Cluster \
   --log-uri s3://amzn-s3-demo-bucket/ \
   --instance-type m5.xlarge \
   --instance-count 2 \
   --service-role EMR_DefaultRole_V2 \ 
   --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0
   ```

您还可以创建一个包含 Spark 应用程序的 Amazon EMR 集群，并且将文件 `/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar` 作为 Spark 任务中的 JAR 依赖关系包含在内。有关更多信息，请参阅[提交应用程序](https://spark.apache.org/docs/latest/submitting-applications.html#submitting-applications)。

要将 jar 作为 Spark 作业中的依赖项包含在内，请将以下配置属性添加到 Spark 应用程序中：

```
--conf "spark.jars=/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar"
```

有关 Spark 作业依赖项的更多信息，请参阅 Apache Spark 文档 [Running Spark on Kubernetes](https://spark.apache.org/docs/latest/running-on-kubernetes.html)（在 Kubernetes 上运行 Spark）中的 [Dependency Management](https://spark.apache.org/docs/latest/running-on-kubernetes.html#dependency-management)（依赖项管理）。

## 为 Iceberg 初始化 Spark 会话
<a name="emr-iceberg-initialize-spark-session"></a>

以下示例演示如何启动交互式 Spark Shell、使用 Spark 提交，或如何使用 Amazon EMR Notebooks 在 Amazon EMR 上使用 Iceberg。

------
#### [ spark-shell ]

1. 使用 SSH 连接主节点。有关更多信息，请参阅《Amazon EMR 管理指南》**中的[使用 SSH 连接到主节点](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html)。

1. 输入以下命令以启动 Spark shell。要使用 PySpark 外壳，请`spark-shell`替换为`pyspark`。

   ```
   spark-shell \
       --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.my_catalog.warehouse=s3://amzn-s3-demo-bucket/prefix/
       --conf spark.sql.catalog.my_catalog.type=glue \
       --conf spark.sql.defaultCatalog=my_catalog \
       --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
   ```

------
#### [ spark-submit ]

1. 使用 SSH 连接主节点。有关更多信息，请参阅《Amazon EMR 管理指南》**中的[使用 SSH 连接到主节点](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html)。

1. 输入以下命令以为 Iceberg 启动 Spark 会话。

   ```
   spark-submit \
   --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
   --conf spark.sql.catalog.my_catalog.warehouse=s3://amzn-s3-demo-bucket1/prefix \
   --conf spark.sql.catalog.my_catalog.type=glue \
   --conf spark.sql.defaultCatalog=my_catalog \
   --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
   ```

------
#### [ EMR Studio notebooks ]

要使用 EMR Studio notebooks 初始化 Spark 会话，请使用 Amazon EMR notebook 中的 `%%configure` 魔法命令配置 Spark 会话，如以下示例所示。有关更多信息，请参阅 *Amazon EMR 管理指南*中的[使用 EMR Notebooks 魔法命令](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-studio-magics.html#emr-magics)。

```
%%configure -f{
"conf":{
    "spark.sql.catalog.my_catalog":"org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.my_catalog.type":"glue",
    "spark.sql.catalog.my_catalog.warehouse":"s3://amzn-s3-demo-bucket1/prefix/",
    "spark.sql.defaultCatalog":"my_catalog",
    "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
    }
}
```

------
#### [ CLI ]

要使用 CLI 初始化 Spark 集群并设置所有 Spark Iceberg 会话默认配置，请运行下面的示例。有关使用 AWS CLI 和 Amazon EMR API 指定配置分类的更多信息，请参阅[配置](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps.html)应用程序。

```
[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.sql.catalog.my_catalog":"org.apache.iceberg.spark.SparkCatalog",
      "spark.sql.catalog.my_catalog.type":"glue",
      "spark.sql.catalog.my_catalog.warehouse":"s3://amzn-s3-demo-bucket1/prefix/",
      "spark.sql.defaultCatalog":"my_catalog",
      "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
    }
  }
]
```

------

## 写入 Iceberg 表
<a name="emr-iceberg-write-to-table"></a>

以下示例说明如何创建 DataFrame 并将其写为 Iceberg 数据集。这些示例演示使用 Spark Shell 处理数据集，同时使用 SSH 作为原定设置将 hadoop 用户连接到主节点（master node）。

**注意**  
要将代码示例粘贴到 Spark Shell 中，请在提示符处键入 `:paste`，粘贴示例，然后按 `CTRL+D`。

------
#### [ PySpark ]

Spark 包含一个基于 Python 的 Shell `pyspark`，您可以用它来设计以 Python 编写的 Spark 程序的原型。在主节点上调用 `pyspark`。

```
## Create a DataFrame.
data = spark.createDataFrame([
 ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
 ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
 ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
 ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")
],["id", "creation_date", "last_update_time"])

## Write a DataFrame as a Iceberg dataset to the Amazon S3 location.
spark.sql("""CREATE TABLE IF NOT EXISTS dev.db.iceberg_table (id string,
creation_date string,
last_update_time string)
USING iceberg
location 's3://amzn-s3-demo-bucket/example-prefix/db/iceberg_table'""")

data.writeTo("dev.db.iceberg_table").append()
```

------
#### [ Scala ]

```
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._

// Create a DataFrame.
val data = Seq(
("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")
).toDF("id", "creation_date", "last_update_time")

// Write a DataFrame as a Iceberg dataset to the Amazon S3 location.
spark.sql("""CREATE TABLE IF NOT EXISTS dev.db.iceberg_table (id string,
creation_date string,
last_update_time string)
USING iceberg
location 's3://amzn-s3-demo-bucket/example-prefix/db/iceberg_table'""")

data.writeTo("dev.db.iceberg_table").append()
```

------

## 从 Iceberg 表读取
<a name="emr-iceberg-read-from-table"></a>

------
#### [ PySpark ]

```
df = spark.read.format("iceberg").load("dev.db.iceberg_table")
df.show()
```

------
#### [ Scala ]

```
val df = spark.read.format("iceberg").load("dev.db.iceberg_table")
df.show()
```

------
#### [ Spark SQL ]

```
SELECT * from dev.db.iceberg_table LIMIT 10
```

------

## 将 AWS Glue 数据目录与 Spark Iceberg 配合使用
<a name="emr-iceberg-glue-catalog-config-spark"></a>

你可以从 Spark Iceberg 连接到 AWS Glue 数据目录。本节介绍了不同的连接命令。

### 连接到默认区域中的默认 AWS Glue 目录
<a name="emr-iceberg-glue-catalog-config-spark"></a>

此示例展示了如何使用 Glue 目录类型进行连接。如果您未指定目录 ID，则使用默认值：

```
spark-submit \
    --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.my_catalog.warehouse=s3://amzn-s3-demo-bucket1/prefix \
    --conf spark.sql.catalog.my_catalog.type=glue \
    --conf spark.sql.defaultCatalog=my_catalog \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
```

### 使用特定目录 AWS ID 连接到 Glue 目录
<a name="emr-iceberg-glue-catalog-config-spark"></a>

此示例展示了如何使用目录 ID 进行连接：

```
spark-submit \
    --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.my_catalog.warehouse=s3://amzn-s3-demo-bucket1/prefix \
    --conf spark.sql.catalog.my_catalog.type=glue \
    --conf spark.sql.catalog.my_catalog.glue.id=AWS Glue catalog ID \
    --conf spark.sql.defaultCatalog=my_catalog \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
```

此命令可用于连接到其他账户中的 AWS Glue 目录、RMS 目录或联合目录。

## 将 Iceberg REST Catalog (IRC) 与 Spark Iceberg 结合使用
<a name="emr-iceberg-rest-catalog-config"></a>

以下各节详细介绍了如何配置 Iceberg 与目录的集成。

### Connect 到 AWS Glue 数据目录 IRC 端点
<a name="emr-iceberg-rest-catalog-config-gdc"></a>

下面展示了使用 Iceberg REST 的示例 `spark-submit` 命令：

```
spark-submit \
    --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.my_catalog.warehouse=glue catalog ID \
    --conf spark.sql.catalog.my_catalog.type=rest \
    --conf spark.sql.catalog.my_catalog.uri=glue endpoint URI/iceberg \
    --conf spark.sql.catalog.my_catalog.rest.sigv4-enabled=true \
    --conf spark.sql.catalog.my_catalog.rest.signing-name=glue \
    --conf spark.sql.defaultCatalog=my_catalog \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
```

要在启用运行时角色的集群上使用它，还需要以下额外的 Spark 配置设置：

```
"spark.hadoop.fs.s3.credentialsResolverClass": "software.amazon.glue.GlueTableCredentialsResolver",
"spark.hadoop.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
"spark.hadoop.glue.id": glue catalog ID
"spark.hadoop.glue.endpoint": "glue endpoint"
```

有关每个区域的 AWS Glue 端点网址列表，请参阅 [AWS Glue 端点和配额](https://docs.aws.amazon.com/general/latest/gr/glue.html)。

### 连接到任意 IRC 端点
<a name="emr-iceberg-rest-catalog-config-arbitrary"></a>

下面展示了使用 IRC 端点的示例 `spark-submit` 命令：

```
spark-submit \
    --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.my_catalog.warehouse=warehouse name \
    --conf spark.sql.catalog.my_catalog.type=rest \
    --conf spark.sql.catalog.my_catalog.uri=your rest endpoint \
    --conf spark.sql.defaultCatalog=my_catalog \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
```

## 使用 Iceberg 和 Iceberg SparkCatalog 时的配置差异 SparkSessionCatalog
<a name="emr-iceberg-spark-catalog"></a>

Iceberg 提供了两种创建 Spark Iceberg 目录的方法。您可以将 Spark 配置设置为 `SparkCatalog` 或 `SparkSessionCatalog`。

### 使用冰山 SparkCatalog
<a name="emr-iceberg-spark-catalog-spark-catalog"></a>

以下显示了用**SparkCatalog**作 Spark Iceberg 目录的命令：

```
spark-shell \
--conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.my_catalog.warehouse=s3://amzn-s3-demo-bucket1/prefix \
--conf spark.sql.catalog.my_catalog.type=glue \
--conf spark.sql.defaultCatalog=my_catalog
```

此方法的注意事项：
+ 您可以访问 Iceberg 表，但无法访问其他表。
+ 目录名称不能是 **spark\$1catalog**。这是 Spark 中初始目录的名称。它始终连接到 Hive 元存储。这是 Spark 中的默认目录，除非用户使用 `spark.sql.defaultCatalog` 覆盖它。
+ 您可以将 `spark.sql.defaultCatalog` 设置为您的目录名称，使其成为默认目录。

### 使用冰山 SparkSessionCatalog
<a name="emr-iceberg-spark-catalog-spark-session"></a>

以下显示了用**SparkSessionCatalog**作 Spark Iceberg 目录的命令：

```
spark-shell \
    --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
    --conf spark.sql.catalog.spark_catalog.warehouse=s3://amzn-s3-demo-bucket1/prefix \
    --conf spark.sql.catalog.spark_catalog.type=glue
```

此方法的注意事项：
+ 如果找不到作为 Iceberg 表的表，Spark 将尝试查看它是否是 Hive 元存储中的表。有关更多信息[，请参阅使用 AWS Glue 数据目录作为 Hive](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hive-metastore-glue.html) 的目录。
+ 目录名称必须是 **spark\$1catalog**。

## 使用 Iceberg Spark 扩展
<a name="emr-iceberg-spark-catalog-extensions"></a>

Iceberg 提供 Spark 扩展 `org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions`，用户可以通过 Spark 扩展配置 `spark.sql.extensions` 进行设置。这些扩展支持 Iceberg 的关键功能（例如行级 DELETE、UPDATE 和 MERGE）以及 Iceberg 特有的 Spark 数据定义语言语句和过程（例如压缩、快照过期、分支和标记等）。有关更多详细信息，请参阅以下内容：
+ Iceberg Spark 写入扩展：[Spark 写入](https://iceberg.apache.org/docs/nightly/spark-writes/)
+ Iceberg Spark DDL 扩展：[ALTER TABLE SQL 扩展](https://iceberg.apache.org/docs/nightly/spark-ddl/#alter-table-sql-extensions/)
+ Iceberg Spark 过程扩展：[Spark 过程](https://iceberg.apache.org/docs/nightly/spark-ddl/#alter-table-sql-extensions/)

## 将 Iceberg 与 Spark 结合使用的注意事项
<a name="spark-considerations-catalog"></a>
+ 原定设置下，Amazon EMR 6.5.0 不支持 Iceberg 在 Amazon EMR on EKS 上运行。Amazon EMR 6.5.0 自定义映像可供您传递 `--jars local:///usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar` 作为 `spark-submit` 参数，用于在 Amazon EMR on EKS 上创建 Iceberg 表。有关更多信息，请参阅《Amazon EMR on EKS 开发指南》**中的[使用自定义映像在 Amazon EMR 中提交 Spark 工作负载](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/docker-custom-images-steps.html#docker-custom-images-submit)。您也可以联系 支持 获取帮助。从 Amazon EMR 6.6.0 开始，Amazon EMR on EKS 支持 Iceberg。
+ 使用 AWS Glue 作为 Iceberg 的目录时，请确保要在其中创建表的数据库存在于 Glue 中 AWS 。如果您正在使用诸如的服务 AWS Lake Formation ，但无法加载目录，请确保您拥有对服务的正确访问权限以执行命令。
+ 如果按照中所述使用 Iceberg SparkSessionCatalog，则除了[使用 Iceberg 和 Iceberg SparkCatalog 时的配置差异 SparkSessionCatalog](#emr-iceberg-spark-catalog)配置 Spark Iceberg [AWS Glue 数据目录设置外，还必须按照将 Glue 数据目录配置为 Apache Hive 元存储](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hive-metastore-glue.html)中描述的配置步骤进行操作。 AWS 