

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用 Apache Spark 处理冰山表
<a name="iceberg-spark"></a>

本节概述了如何使用 Apache Spark 与 Iceberg 表进行交互。示例是可以在 Amazon EMR 上运行的样板代码，或者。 AWS Glue

注意：与 Iceberg 表交互的主要接口是 SQL，因此大多数示例都将 Spark SQL 与 DataFrames API 结合使用。

## 创建和写作 Iceberg 表
<a name="spark-create-data"></a>

你可以使用 Spark SQL 和 Spark DataFrames 来创建数据并将其添加到 Iceberg 表中。

### 使用 Spark SQL
<a name="spark-sql"></a>

要编写 Iceberg 数据集，请使用标准 Spark SQL 语句，例如`CREATE TABLE`和。`INSERT INTO`

#### 未分区的表
<a name="spark-sql-unpartitioned"></a>

以下是使用 Spark SQL 创建未分区的 Iceberg 表的示例：

```
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions (
        c_customer_sk             int,
        c_customer_id             string,
        c_first_name              string,
        c_last_name               string,
        c_birth_country           string,
        c_email_address           string)
    USING iceberg
    OPTIONS ('format-version'='2')
""")
```

要向未分区的表中插入数据，请使用标准`INSERT INTO`语句：

```
spark.sql(f"""
INSERT INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions
SELECT c_customer_sk, c_customer_id, c_first_name, c_last_name, c_birth_country, c_email_address
FROM another_table
""")
```

#### 分区表
<a name="spark-sql-partitioned"></a>

以下是使用 Spark SQL 创建分区 Iceberg 表的示例：

```
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions (
        c_customer_sk             int,
        c_customer_id             string,
        c_first_name              string,
        c_last_name               string,
        c_birth_country           string,
        c_email_address           string)
    USING iceberg
    PARTITIONED BY (c_birth_country)
    OPTIONS ('format-version'='2')
""")
```

要使用 Spark SQL 将数据插入分区 Iceberg 表，请使用标准`INSERT INTO`语句：

```
spark.sql(f"""
INSERT INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions
SELECT c_customer_sk, c_customer_id, c_first_name, c_last_name, c_birth_country, c_email_address
FROM another_table
""")
```

**注意**  
从 Iceberg 1.5.0 开始，向分区表中插入数据时，`hash`写入分配模式为默认模式。有关更多信息，请参阅 Iceberg 文档中的[编写分发模式](https://iceberg.apache.org/docs/latest/spark-writes/#writing-distribution-modes)。

### 使用 DataFrames API
<a name="spark-dataframes"></a>

要编写 Iceberg 数据集，你可以使用 `DataFrameWriterV2` API。

要创建 Iceberg 表并向其写入数据，请使用 `df.writeTo(` t) 函数。如果该表存在，则使用`.append()`函数。如果不是，请使用`.create().`以下示例使用`.createOrReplace()`，其变体`.create()`等效于`CREATE OR REPLACE TABLE AS SELECT`。

#### 未分区的表
<a name="spark-df-unpartitioned"></a>

要使用 API 创建和填充未分区的 Iceberg 表，请执行以下操作：`DataFrameWriterV2`

```
input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions") \
    .tableProperty("format-version", "2") \
    .createOrReplace()
```

要使用 API 将数据插入现有的未分区 Iceberg 表，请执行以下操作：`DataFrameWriterV2`

```
input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions") \
    .append()
```

#### 分区表
<a name="spark-df-partitioned"></a>

要使用 API 创建和填充分区 Iceberg 表，请执行以下操作：`DataFrameWriterV2`

```
input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions") \
    .tableProperty("format-version", "2") \
    .partitionedBy("c_birth_country") \
    .createOrReplace()
```

要使用 API 将数据插入分区的 Iceberg 表，请执行`DataFrameWriterV2`以下操作：

```
input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions") \
    .append()
```

## 更新 Iceberg 表中的数据
<a name="spark-update-data"></a>

以下示例说明如何更新 Iceberg 表中的数据。此示例修改`c_customer_sk`列中包含偶数的所有行。

```
spark.sql(f"""
UPDATE {CATALOG_NAME}.{db.name}.{table.name}
SET c_email_address = 'even_row' 
WHERE c_customer_sk % 2 == 0
""")
```

此操作使用默认 copy-on-write策略，因此它会重写所有受影响的数据文件。

## 更新 Iceberg 表中的数据
<a name="spark-upsert-data"></a>

更新数据是指在单个事务中插入新的数据记录并更新现有的数据记录。要将数据插入到 Iceberg 表中，请使用语句。`SQL MERGE INTO` 

以下示例将表格`{UPSERT_TABLE_NAME`} 的内容放入表中：`{TABLE_NAME}`

```
spark.sql(f"""
    MERGE INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME} t
    USING {UPSERT_TABLE_NAME} s
        ON t.c_customer_id = s.c_customer_id
    WHEN MATCHED THEN UPDATE SET t.c_email_address = s.c_email_address
    WHEN NOT MATCHED THEN INSERT *
""")
```
+ 如果中`{UPSERT_TABLE_NAME}`已经存在`{TABLE_NAME}`具有相同值的客户记录`c_customer_id`，则该`{UPSERT_TABLE_NAME}`记录`c_email_address`值将覆盖现有值（更新操作）。
+ 如果中的客户记录在中`{UPSERT_TABLE_NAME}`不存在`{TABLE_NAME}`，则该`{UPSERT_TABLE_NAME}`记录将添加到`{TABLE_NAME}`（插入操作）。

## 删除 Iceberg 表中的数据
<a name="spark-delete-data"></a>

要从 Iceberg 表中删除数据，请使用`DELETE FROM`表达式并指定与要删除的行匹配的筛选器。

```
spark.sql(f"""
DELETE FROM {CATALOG_NAME}.{db.name}.{table.name}
WHERE c_customer_sk % 2 != 0
""")
```

如果过滤器匹配整个分区，Iceberg 会执行仅限元数据的删除并将数据文件保留在原处。否则，它只会重写受影响的数据文件。

delete 方法获取受`WHERE`子句影响的数据文件，并在不包含已删除记录的情况下创建这些文件的副本。然后，它会创建一个指向新数据文件的新表快照。因此，已删除的记录仍存在于表的旧快照中。例如，如果您检索表的上一个快照，则会看到刚刚删除的数据。有关出于清理目的删除不必要的旧快照以及相关数据文件的信息，请参阅本指南后面[的 “使用压缩维护文件](best-practices-compaction.md)” 一节。

## 读取数据
<a name="spark-read-data"></a>

你可以使用 Spark SQL 和 Spark 在 Spark 中读取 Iceberg 表的最新状态。 DataFrames 

使用 Spark SQL 的示例：

```
spark.sql(f"""
SELECT * FROM {CATALOG_NAME}.{db.name}.{table.name} LIMIT 5
""")
```

使用 DataFrames API 的示例：

```
df = spark.table(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}").limit(5)
```

## 使用时空旅行
<a name="spark-time-travel"></a>

Iceberg 表中的每个写入操作（插入、更新、更新插入、删除）都会创建一个新快照。然后，您可以使用这些快照进行时空旅行，以回到过去，检查表的过去状态。

有关如何使用`snapshot-id`和计时值检索表快照历史记录的信息，请参阅本指南后面的[访问元数据](#spark-metadata)部分。

以下时空旅行查询根据特定内容显示表的状态`snapshot-id`。

使用 Spark SQL：

```
spark.sql(f"""
SELECT * FROM {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME} VERSION AS OF {snapshot_id}
""")
```

使用 DataFrames API：

```
df_1st_snapshot_id = spark.read.option("snapshot-id", snapshot_id) \
     .format("iceberg") \
     .load(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}") \ 
     .limit(5)
```

以下时空旅行查询根据在特定时间戳之前创建的最后一次快照显示表的状态，以毫秒 () 为单位。`as-of-timestamp`

使用 Spark SQL：

```
spark.sql(f"""
SELECT * FROM dev.{db.name}.{table.name} TIMESTAMP AS OF '{snapshot_ts}'
""")
```

使用 DataFrames API：

```
df_1st_snapshot_ts = spark.read.option("as-of-timestamp", snapshot_ts) \
                          .format("iceberg") \
                          .load(f"dev.{DB_NAME}.{TABLE_NAME}") \
                          .limit(5)
```

## 使用增量查询
<a name="spark-incremental-queries"></a>

您还可以使用 Iceberg 快照以增量方式读取附加的数据。 

注意：目前，此操作支持从`append`快照中读取数据。它不支持从`replace``overwrite`、或`delete`等操作中获取数据。  此外，Spark SQL 语法不支持增量读取操作。

以下示例检索在快照`start-snapshot-id`（不包括）和`end-snapshot-id`（含）之间附加到 Iceberg 表的所有记录。

```
df_incremental = (spark.read.format("iceberg")
    .option("start-snapshot-id", snapshot_id_start)
    .option("end-snapshot-id", snapshot_id_end)
    .load(f"glue_catalog.{DB_NAME}.{TABLE_NAME}")
)
```

## 访问元数据
<a name="spark-metadata"></a>

Iceberg 通过 SQL 提供对其元数据的访问。您可以通过查询命名空间来访问任何给定表 (`<table_name>`) 的元数据`<table_name>.<metadata_table>`。 有关元数据表的完整列表，请参阅 Iceberg 文档中的[检查表](https://iceberg.apache.org/docs/latest/spark-queries/#inspecting-tables)。

以下示例说明如何访问 Iceberg 历史元数据表，该表显示了 Iceberg 表的提交（更改）历史记录。 

使用亚马逊 EMR Studio 笔记本上的 Spark SQL（有`%%sql`魔法）：

```
Spark.sql(f"""
SELECT * FROM {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}.history LIMIT 5
""")
```

使用 DataFrames API：

```
spark.read.format("iceberg").load("{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}.history").show(5,False)
```

示例输出：

![冰山表的元数据输出示例](http://docs.aws.amazon.com/zh_cn/prescriptive-guidance/latest/apache-iceberg-on-aws/images/metadata-sample-output.png)
