

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Apache Spark を使用した Iceberg テーブルの操作
<a name="iceberg-spark"></a>

このセクションでは、Apache Spark を使用して Iceberg テーブルを操作する方法の概要を説明します。例は、Amazon EMR または で実行できる定型コードです AWS Glue。

注: Iceberg テーブルを操作するためのプライマリインターフェイスは SQL であるため、ほとんどの例では Spark SQL と DataFrames API が組み合わされます。

## Iceberg テーブルの作成と書き込み
<a name="spark-create-data"></a>

Spark SQL と Spark DataFrames を使用して、Iceberg テーブルにデータを作成して追加できます。

### Spark SQL の使用
<a name="spark-sql"></a>

Iceberg データセットを記述するには、 `CREATE TABLE`や などの標準の Spark SQL ステートメントを使用します`INSERT INTO`。

#### パーティション分割されていないテーブル
<a name="spark-sql-unpartitioned"></a>

Spark SQL を使用してパーティション分割されていない Iceberg テーブルを作成する例を次に示します。

```
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions (
        c_customer_sk             int,
        c_customer_id             string,
        c_first_name              string,
        c_last_name               string,
        c_birth_country           string,
        c_email_address           string)
    USING iceberg
    OPTIONS ('format-version'='2')
""")
```

パーティション化されていないテーブルにデータを挿入するには、標準`INSERT INTO`ステートメントを使用します。

```
spark.sql(f"""
INSERT INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions
SELECT c_customer_sk, c_customer_id, c_first_name, c_last_name, c_birth_country, c_email_address
FROM another_table
""")
```

#### パーティションテーブル
<a name="spark-sql-partitioned"></a>

Spark SQL を使用してパーティション化された Iceberg テーブルを作成する例を次に示します。

```
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions (
        c_customer_sk             int,
        c_customer_id             string,
        c_first_name              string,
        c_last_name               string,
        c_birth_country           string,
        c_email_address           string)
    USING iceberg
    PARTITIONED BY (c_birth_country)
    OPTIONS ('format-version'='2')
""")
```

Spark SQL を使用してパーティション化された Iceberg テーブルにデータを挿入するには、標準`INSERT INTO`ステートメントを使用します。

```
spark.sql(f"""
INSERT INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions
SELECT c_customer_sk, c_customer_id, c_first_name, c_last_name, c_birth_country, c_email_address
FROM another_table
""")
```

**注記**  
Iceberg 1.5.0 以降では、パーティションテーブルにデータを挿入する場合、`hash`書き込み分散モードがデフォルトになります。詳細については、Iceberg ドキュメントの「[Writing Distribution Modes](https://iceberg.apache.org/docs/latest/spark-writes/#writing-distribution-modes)」を参照してください。

### DataFrames API の使用
<a name="spark-dataframes"></a>

Iceberg データセットを記述するには、 `DataFrameWriterV2` API を使用できます。

Iceberg テーブルを作成し、そのテーブルにデータを書き込むには、`df.writeTo(`t) 関数を使用します。テーブルが存在する場合は、 `.append()`関数を使用します。そうでない場合は、`.create().`次の例で を使用します。これは `.createOrReplace()``.create()`に相当する のバリエーションです`CREATE OR REPLACE TABLE AS SELECT`。

#### パーティション分割されていないテーブル
<a name="spark-df-unpartitioned"></a>

`DataFrameWriterV2` API を使用してパーティション分割されていない Iceberg テーブルを作成して入力するには:

```
input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions") \
    .tableProperty("format-version", "2") \
    .createOrReplace()
```

`DataFrameWriterV2` API を使用して 、パーティション分割されていない既存の Iceberg テーブルにデータを挿入するには:

```
input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions") \
    .append()
```

#### パーティションテーブル
<a name="spark-df-partitioned"></a>

`DataFrameWriterV2` API を使用してパーティション化された Iceberg テーブルを作成して入力するには:

```
input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions") \
    .tableProperty("format-version", "2") \
    .partitionedBy("c_birth_country") \
    .createOrReplace()
```

`DataFrameWriterV2` API を使用してパーティション化された Iceberg テーブルにデータを挿入するには:

```
input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions") \
    .append()
```

## Iceberg テーブルのデータの更新
<a name="spark-update-data"></a>

次の例は、Iceberg テーブルのデータを更新する方法を示しています。この例では、`c_customer_sk`列に偶数を持つすべての行を変更します。

```
spark.sql(f"""
UPDATE {CATALOG_NAME}.{db.name}.{table.name}
SET c_email_address = 'even_row' 
WHERE c_customer_sk % 2 == 0
""")
```

このオペレーションはデフォルトのcopy-on-write戦略を使用するため、影響を受けるすべてのデータファイルを書き換えます。

## Iceberg テーブルのデータの更新
<a name="spark-upsert-data"></a>

データの更新とは、新しいデータレコードを挿入し、既存のデータレコードを 1 回のトランザクションで更新することです。Iceberg テーブルにデータをアップサートするには、 `SQL MERGE INTO`ステートメントを使用します。 

次の例では、テーブル 内のテーブル `{UPSERT_TABLE_NAME`\$1 の内容をアップサートします`{TABLE_NAME}`。

```
spark.sql(f"""
    MERGE INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME} t
    USING {UPSERT_TABLE_NAME} s
        ON t.c_customer_id = s.c_customer_id
    WHEN MATCHED THEN UPDATE SET t.c_email_address = s.c_email_address
    WHEN NOT MATCHED THEN INSERT *
""")
```
+ にある顧客レコードが同じ `{TABLE_NAME}`に`{UPSERT_TABLE_NAME}`既に存在する場合`c_customer_id`、`{UPSERT_TABLE_NAME}`レコード`c_email_address`値は既存の値を上書きします (更新オペレーション）。
+ にある顧客レコード`{UPSERT_TABLE_NAME}`が に存在しない場合`{TABLE_NAME}`、`{UPSERT_TABLE_NAME}`レコードは `{TABLE_NAME}` (オペレーションを挿入) に追加されます。

## Iceberg テーブルのデータの削除
<a name="spark-delete-data"></a>

Iceberg テーブルからデータを削除するには、 `DELETE FROM`式を使用して、削除する行に一致するフィルターを指定します。

```
spark.sql(f"""
DELETE FROM {CATALOG_NAME}.{db.name}.{table.name}
WHERE c_customer_sk % 2 != 0
""")
```

フィルターがパーティション全体と一致する場合、Iceberg はメタデータのみの削除を実行し、データファイルを配置したままにします。それ以外の場合は、影響を受けるデータファイルのみを書き換えます。

delete メソッドは、 `WHERE`句の影響を受けるデータファイルを取得し、削除されたレコードなしでそれらのコピーを作成します。次に、新しいデータファイルを指す新しいテーブルスナップショットを作成します。したがって、削除されたレコードはテーブルの古いスナップショットにまだ存在します。たとえば、テーブルの前のスナップショットを取得すると、先ほど削除したデータが表示されます。クリーンアップの目的で関連データファイルを使用して不要な古いスナップショットを削除する方法については、このガイドの後半の[「圧縮を使用してファイルを維持する](best-practices-compaction.md)」セクションを参照してください。

## データの読み込み
<a name="spark-read-data"></a>

Spark SQL と DataFrames の両方で、Spark の Iceberg テーブルの最新ステータスを読み取ることができます。 

Spark SQL の使用例:

```
spark.sql(f"""
SELECT * FROM {CATALOG_NAME}.{db.name}.{table.name} LIMIT 5
""")
```

DataFrames API の使用例:

```
df = spark.table(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}").limit(5)
```

## タイムトラベルの使用
<a name="spark-time-travel"></a>

Iceberg テーブルの各書き込みオペレーション (挿入、更新、アップサート、削除) は、新しいスナップショットを作成します。その後、これらのスナップショットをタイムトラベルに使用できます。タイムトラベルをさかのぼって、過去のテーブルのステータスを確認できます。

`snapshot-id` およびタイミング値を使用してテーブルのスナップショットの履歴を取得する方法については、このガイドの後半にある[「メタデータへのアクセス](#spark-metadata)」セクションを参照してください。

次のタイムトラベルクエリは、特定の に基づいてテーブルのステータスを表示します `snapshot-id`。

Spark SQL の使用:

```
spark.sql(f"""
SELECT * FROM {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME} VERSION AS OF {snapshot_id}
""")
```

DataFrames API の使用:

```
df_1st_snapshot_id = spark.read.option("snapshot-id", snapshot_id) \
     .format("iceberg") \
     .load(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}") \ 
     .limit(5)
```

次のタイムトラベルクエリは、特定のタイムスタンプの前に作成された最後のスナップショットに基づいて、テーブルのステータスをミリ秒 () 単位で表示します`as-of-timestamp`。

Spark SQL の使用:

```
spark.sql(f"""
SELECT * FROM dev.{db.name}.{table.name} TIMESTAMP AS OF '{snapshot_ts}'
""")
```

DataFrames API の使用:

```
df_1st_snapshot_ts = spark.read.option("as-of-timestamp", snapshot_ts) \
                          .format("iceberg") \
                          .load(f"dev.{DB_NAME}.{TABLE_NAME}") \
                          .limit(5)
```

## 増分クエリの使用
<a name="spark-incremental-queries"></a>

Iceberg スナップショットを使用して、追加したデータを段階的に読み取ることもできます。 

注：  現在、このオペレーションは`append`スナップショットからのデータの読み取りをサポートしています。`replace`、、 `overwrite`などのオペレーションからのデータの取得はサポートされていません`delete`。  さらに、増分読み取りオペレーションは Spark SQL 構文ではサポートされていません。

次の の例では、スナップショット `start-snapshot-id` (排他的) と `end-snapshot-id` (包括的) の間に Iceberg テーブルに追加されたすべてのレコードを取得します。

```
df_incremental = (spark.read.format("iceberg")
    .option("start-snapshot-id", snapshot_id_start)
    .option("end-snapshot-id", snapshot_id_end)
    .load(f"glue_catalog.{DB_NAME}.{TABLE_NAME}")
)
```

## メタデータへのアクセス
<a name="spark-metadata"></a>

Iceberg は SQL を介してメタデータへのアクセスを提供します。名前空間 をクエリすることで、任意のテーブル (`<table_name>`)  のメタデータにアクセスできます`<table_name>.<metadata_table>`。メタデータテーブルの完全なリストについては、Iceberg ドキュメントの[「テーブルの検査](https://iceberg.apache.org/docs/latest/spark-queries/#inspecting-tables)」を参照してください。

次の例は、Iceberg テーブルのコミット (変更) の履歴を示す Iceberg 履歴メタデータテーブルにアクセスする方法を示しています。 

Amazon EMR Studio ノートブックからの Spark SQL (`%%sql`マジックを使用) の使用:

```
Spark.sql(f"""
SELECT * FROM {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}.history LIMIT 5
""")
```

DataFrames API の使用:

```
spark.read.format("iceberg").load("{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}.history").show(5,False)
```

サンプル出力:

![\[Iceberg テーブルからのメタデータ出力例\]](http://docs.aws.amazon.com/ja_jp/prescriptive-guidance/latest/apache-iceberg-on-aws/images/metadata-sample-output.png)
