

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Bekerja dengan tabel Iceberg dengan menggunakan Apache Spark
<a name="iceberg-spark"></a>

Bagian ini memberikan gambaran umum tentang penggunaan Apache Spark untuk berinteraksi dengan tabel Iceberg. Contohnya adalah kode boilerplate yang dapat berjalan di Amazon EMR atau. AWS Glue

Catatan: Antarmuka utama untuk berinteraksi dengan tabel Iceberg adalah SQL, sehingga sebagian besar contoh akan menggabungkan Spark SQL dengan API. DataFrames 

## Membuat dan menulis tabel Iceberg
<a name="spark-create-data"></a>

Anda dapat menggunakan Spark SQL dan Spark DataFrames untuk membuat dan menambahkan data ke tabel Iceberg.

### Menggunakan Spark SQL
<a name="spark-sql"></a>

Untuk menulis dataset Iceberg, gunakan pernyataan SQL Spark standar seperti dan. `CREATE TABLE` `INSERT INTO`

#### Tabel yang tidak dipartisi
<a name="spark-sql-unpartitioned"></a>

Berikut adalah contoh membuat tabel Iceberg yang tidak dipartisi dengan Spark SQL:

```
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions (
        c_customer_sk             int,
        c_customer_id             string,
        c_first_name              string,
        c_last_name               string,
        c_birth_country           string,
        c_email_address           string)
    USING iceberg
    OPTIONS ('format-version'='2')
""")
```

Untuk menyisipkan data ke dalam tabel yang tidak dipartisi, gunakan pernyataan standar: `INSERT INTO`

```
spark.sql(f"""
INSERT INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions
SELECT c_customer_sk, c_customer_id, c_first_name, c_last_name, c_birth_country, c_email_address
FROM another_table
""")
```

#### Tabel yang dipartisi
<a name="spark-sql-partitioned"></a>

Berikut adalah contoh membuat tabel Iceberg yang dipartisi dengan Spark SQL:

```
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions (
        c_customer_sk             int,
        c_customer_id             string,
        c_first_name              string,
        c_last_name               string,
        c_birth_country           string,
        c_email_address           string)
    USING iceberg
    PARTITIONED BY (c_birth_country)
    OPTIONS ('format-version'='2')
""")
```

Untuk menyisipkan data ke dalam tabel Iceberg yang dipartisi dengan Spark SQL, gunakan pernyataan standar: `INSERT INTO`

```
spark.sql(f"""
INSERT INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions
SELECT c_customer_sk, c_customer_id, c_first_name, c_last_name, c_birth_country, c_email_address
FROM another_table
""")
```

**catatan**  
Dimulai dengan Iceberg 1.5.0, mode distribusi `hash` tulis adalah default saat Anda memasukkan data ke dalam tabel yang dipartisi. Untuk informasi selengkapnya, lihat [Menulis Mode Distribusi](https://iceberg.apache.org/docs/latest/spark-writes/#writing-distribution-modes) dalam dokumentasi Gunung Es.

### Menggunakan DataFrames API
<a name="spark-dataframes"></a>

Untuk menulis kumpulan data Iceberg, Anda dapat menggunakan API. `DataFrameWriterV2`

Untuk membuat tabel Iceberg dan menulis data untuk itu, gunakan fungsi `df.writeTo(` t). Jika tabel ada, gunakan `.append()` fungsi. Jika tidak, gunakan Contoh `.create().` berikut digunakan`.createOrReplace()`, yang merupakan variasi `.create()` yang setara dengan`CREATE OR REPLACE TABLE AS SELECT`.

#### Tabel yang tidak dipartisi
<a name="spark-df-unpartitioned"></a>

Untuk membuat dan mengisi tabel Iceberg yang tidak dipartisi dengan menggunakan API: `DataFrameWriterV2`

```
input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions") \
    .tableProperty("format-version", "2") \
    .createOrReplace()
```

Untuk menyisipkan data ke dalam tabel Iceberg tak terpartisi yang sudah ada dengan menggunakan API: `DataFrameWriterV2`

```
input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions") \
    .append()
```

#### Tabel yang dipartisi
<a name="spark-df-partitioned"></a>

Untuk membuat dan mengisi tabel Iceberg yang dipartisi dengan menggunakan API: `DataFrameWriterV2`

```
input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions") \
    .tableProperty("format-version", "2") \
    .partitionedBy("c_birth_country") \
    .createOrReplace()
```

Untuk menyisipkan data ke dalam tabel Iceberg yang dipartisi dengan menggunakan API: `DataFrameWriterV2`

```
input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions") \
    .append()
```

## Memperbarui data dalam tabel Iceberg
<a name="spark-update-data"></a>

Contoh berikut menunjukkan cara memperbarui data dalam tabel Iceberg. Contoh ini memodifikasi semua baris yang memiliki nomor genap di `c_customer_sk` kolom.

```
spark.sql(f"""
UPDATE {CATALOG_NAME}.{db.name}.{table.name}
SET c_email_address = 'even_row' 
WHERE c_customer_sk % 2 == 0
""")
```

Operasi ini menggunakan copy-on-write strategi default, sehingga menulis ulang semua file data yang terkena dampak.

## Meningkatkan data dalam tabel Iceberg
<a name="spark-upsert-data"></a>

Upserting data mengacu pada memasukkan catatan data baru dan memperbarui catatan data yang ada dalam satu transaksi. Untuk meningkatkan data ke dalam tabel Iceberg, Anda menggunakan pernyataan tersebut. `SQL MERGE INTO` 

Contoh berikut meningkatkan isi tabel`{UPSERT_TABLE_NAME`\$1 di dalam tabel: `{TABLE_NAME}`

```
spark.sql(f"""
    MERGE INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME} t
    USING {UPSERT_TABLE_NAME} s
        ON t.c_customer_id = s.c_customer_id
    WHEN MATCHED THEN UPDATE SET t.c_email_address = s.c_email_address
    WHEN NOT MATCHED THEN INSERT *
""")
```
+ Jika catatan pelanggan yang `{UPSERT_TABLE_NAME}` sudah ada di dalam `{TABLE_NAME}` dengan yang sama`c_customer_id`, `c_email_address` nilai `{UPSERT_TABLE_NAME}` rekaman akan menggantikan nilai yang ada (operasi pembaruan).
+ Jika catatan pelanggan yang masuk `{UPSERT_TABLE_NAME}` tidak ada di`{TABLE_NAME}`, `{UPSERT_TABLE_NAME}` catatan ditambahkan ke `{TABLE_NAME}` (operasi sisipkan).

## Menghapus data dalam tabel Iceberg
<a name="spark-delete-data"></a>

Untuk menghapus data dari tabel Iceberg, gunakan `DELETE FROM` ekspresi dan tentukan filter yang cocok dengan baris yang akan dihapus.

```
spark.sql(f"""
DELETE FROM {CATALOG_NAME}.{db.name}.{table.name}
WHERE c_customer_sk % 2 != 0
""")
```

Jika filter cocok dengan seluruh partisi, Iceberg melakukan penghapusan metadata saja dan membiarkan file data di tempatnya. Jika tidak, ia hanya menulis ulang file data yang terpengaruh.

Metode delete mengambil file data yang dipengaruhi oleh `WHERE` klausa dan membuat salinannya tanpa catatan yang dihapus. Kemudian membuat snapshot tabel baru yang menunjuk ke file data baru. Oleh karena itu, catatan yang dihapus masih ada di snapshot tabel yang lebih lama. Misalnya, jika Anda mengambil snapshot sebelumnya dari tabel, Anda akan melihat data yang baru saja Anda hapus. Untuk informasi tentang menghapus snapshot lama yang tidak dibutuhkan dengan file data terkait untuk tujuan pembersihan, lihat bagian [Memelihara file dengan menggunakan pemadatan](best-practices-compaction.md) nanti dalam panduan ini.

## Membaca data
<a name="spark-read-data"></a>

Anda dapat membaca status terbaru dari tabel Iceberg Anda di Spark dengan Spark SQL dan. DataFrames 

Contoh menggunakan Spark SQL:

```
spark.sql(f"""
SELECT * FROM {CATALOG_NAME}.{db.name}.{table.name} LIMIT 5
""")
```

Contoh menggunakan DataFrames API:

```
df = spark.table(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}").limit(5)
```

## Menggunakan perjalanan waktu
<a name="spark-time-travel"></a>

Setiap operasi tulis (menyisipkan, memperbarui, meningkatkan, menghapus) dalam tabel Iceberg membuat snapshot baru. Anda kemudian dapat menggunakan snapshot ini untuk perjalanan waktu — untuk kembali ke masa lalu dan memeriksa status tabel di masa lalu.

Untuk informasi tentang cara mengambil riwayat snapshot untuk tabel dengan menggunakan `snapshot-id` dan nilai waktu, lihat bagian [Mengakses metadata](#spark-metadata) nanti dalam panduan ini.

Kueri perjalanan waktu berikut menampilkan status tabel berdasarkan spesifik`snapshot-id`.

Menggunakan Spark SQL:

```
spark.sql(f"""
SELECT * FROM {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME} VERSION AS OF {snapshot_id}
""")
```

Menggunakan DataFrames API:

```
df_1st_snapshot_id = spark.read.option("snapshot-id", snapshot_id) \
     .format("iceberg") \
     .load(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}") \ 
     .limit(5)
```

Kueri perjalanan waktu berikut menampilkan status tabel berdasarkan snapshot terakhir yang dibuat sebelum stempel waktu tertentu, dalam milidetik (). `as-of-timestamp`

Menggunakan Spark SQL:

```
spark.sql(f"""
SELECT * FROM dev.{db.name}.{table.name} TIMESTAMP AS OF '{snapshot_ts}'
""")
```

Menggunakan DataFrames API:

```
df_1st_snapshot_ts = spark.read.option("as-of-timestamp", snapshot_ts) \
                          .format("iceberg") \
                          .load(f"dev.{DB_NAME}.{TABLE_NAME}") \
                          .limit(5)
```

## Menggunakan kueri inkremental
<a name="spark-incremental-queries"></a>

Anda juga dapat menggunakan snapshot Iceberg untuk membaca data yang ditambahkan secara bertahap. 

Catatan: Saat ini, operasi ini mendukung pembacaan data dari `append` snapshot. Itu tidak mendukung pengambilan data dari operasi seperti`replace`,`overwrite`, atau`delete`.  Selain itu, operasi baca tambahan tidak didukung dalam sintaks Spark SQL.

Contoh berikut mengambil semua catatan ditambahkan ke tabel Iceberg antara snapshot `start-snapshot-id` (eksklusif) dan (inklusif). `end-snapshot-id`

```
df_incremental = (spark.read.format("iceberg")
    .option("start-snapshot-id", snapshot_id_start)
    .option("end-snapshot-id", snapshot_id_end)
    .load(f"glue_catalog.{DB_NAME}.{TABLE_NAME}")
)
```

## Mengakses metadata
<a name="spark-metadata"></a>

Iceberg menyediakan akses ke metadata-nya melalui SQL. Anda dapat mengakses metadata untuk setiap tabel yang diberikan (`<table_name>`) dengan menanyakan namespace. `<table_name>.<metadata_table>` Untuk daftar lengkap tabel metadata, lihat [Memeriksa tabel](https://iceberg.apache.org/docs/latest/spark-queries/#inspecting-tables) dalam dokumentasi Gunung Es.

Contoh berikut menunjukkan cara mengakses tabel metadata sejarah Gunung Es, yang menunjukkan riwayat komit (perubahan) untuk tabel Gunung Es. 

Menggunakan Spark SQL (dengan `%%sql` keajaiban) dari notebook Amazon EMR Studio:

```
Spark.sql(f"""
SELECT * FROM {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}.history LIMIT 5
""")
```

Menggunakan DataFrames API:

```
spark.read.format("iceberg").load("{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}.history").show(5,False)
```

Contoh output:

![\[Contoh keluaran metadata dari tabel Gunung Es\]](http://docs.aws.amazon.com/id_id/prescriptive-guidance/latest/apache-iceberg-on-aws/images/metadata-sample-output.png)
