

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Arbeiten mit Iceberg-Tabellen mithilfe von Apache Spark
<a name="iceberg-spark"></a>

Dieser Abschnitt bietet einen Überblick über die Verwendung von Apache Spark für die Interaktion mit Iceberg-Tabellen. Bei den Beispielen handelt es sich um Standardcode, der auf Amazon EMR oder ausgeführt werden kann. AWS Glue

Hinweis: Die primäre Schnittstelle für die Interaktion mit Iceberg-Tabellen ist SQL, daher kombinieren die meisten Beispiele Spark SQL mit der API. DataFrames 

## Iceberg-Tabellen erstellen und schreiben
<a name="spark-create-data"></a>

Sie können Spark SQL und Spark verwenden, um Daten DataFrames zu Iceberg-Tabellen zu erstellen und hinzuzufügen.

### Verwenden von Spark SQL
<a name="spark-sql"></a>

Verwenden Sie standardmäßige Spark-SQL-Anweisungen wie `CREATE TABLE` und`INSERT INTO`, um einen Iceberg-Datensatz zu schreiben.

#### Unpartitionierte Tabellen
<a name="spark-sql-unpartitioned"></a>

Hier ist ein Beispiel für die Erstellung einer unpartitionierten Iceberg-Tabelle mit Spark SQL:

```
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions (
        c_customer_sk             int,
        c_customer_id             string,
        c_first_name              string,
        c_last_name               string,
        c_birth_country           string,
        c_email_address           string)
    USING iceberg
    OPTIONS ('format-version'='2')
""")
```

Verwenden Sie eine Standardanweisung, um Daten in eine unpartitionierte Tabelle einzufügen: `INSERT INTO`

```
spark.sql(f"""
INSERT INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions
SELECT c_customer_sk, c_customer_id, c_first_name, c_last_name, c_birth_country, c_email_address
FROM another_table
""")
```

#### Partitionierte Tabellen
<a name="spark-sql-partitioned"></a>

Hier ist ein Beispiel für die Erstellung einer partitionierten Iceberg-Tabelle mit Spark SQL:

```
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions (
        c_customer_sk             int,
        c_customer_id             string,
        c_first_name              string,
        c_last_name               string,
        c_birth_country           string,
        c_email_address           string)
    USING iceberg
    PARTITIONED BY (c_birth_country)
    OPTIONS ('format-version'='2')
""")
```

Verwenden Sie eine Standardanweisung, um Daten mit Spark SQL in eine partitionierte Iceberg-Tabelle einzufügen: `INSERT INTO`

```
spark.sql(f"""
INSERT INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions
SELECT c_customer_sk, c_customer_id, c_first_name, c_last_name, c_birth_country, c_email_address
FROM another_table
""")
```

**Anmerkung**  
Ab Iceberg 1.5.0 ist der `hash` Schreibverteilungsmodus der Standard, wenn Sie Daten in partitionierte Tabellen einfügen. Weitere Informationen finden Sie unter [Verteilungsmodi schreiben](https://iceberg.apache.org/docs/latest/spark-writes/#writing-distribution-modes) in der Iceberg-Dokumentation.

### Verwenden der API DataFrames
<a name="spark-dataframes"></a>

Um einen Iceberg-Datensatz zu schreiben, können Sie die `DataFrameWriterV2` API verwenden.

Verwenden Sie die Funktion `df.writeTo(` t), um eine Iceberg-Tabelle zu erstellen und Daten in diese zu schreiben. Wenn die Tabelle existiert, verwenden Sie die `.append()` Funktion. Ist dies nicht `.create().` der Fall`.createOrReplace()`, verwenden Sie die folgenden Beispiele use, was einer `.create()` Variante entspricht`CREATE OR REPLACE TABLE AS SELECT`.

#### Unpartitionierte Tabellen
<a name="spark-df-unpartitioned"></a>

So erstellen und füllen Sie eine unpartitionierte Iceberg-Tabelle mithilfe der API: `DataFrameWriterV2`

```
input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions") \
    .tableProperty("format-version", "2") \
    .createOrReplace()
```

So fügen Sie mithilfe der API Daten in eine bestehende unpartitionierte Iceberg-Tabelle ein: `DataFrameWriterV2`

```
input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions") \
    .append()
```

#### Partitionierte Tabellen
<a name="spark-df-partitioned"></a>

Um eine partitionierte Iceberg-Tabelle mithilfe der API zu erstellen und aufzufüllen: `DataFrameWriterV2`

```
input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions") \
    .tableProperty("format-version", "2") \
    .partitionedBy("c_birth_country") \
    .createOrReplace()
```

So fügen Sie mithilfe der API Daten in eine partitionierte Iceberg-Tabelle ein: `DataFrameWriterV2`

```
input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions") \
    .append()
```

## Daten in Iceberg-Tabellen aktualisieren
<a name="spark-update-data"></a>

Das folgende Beispiel zeigt, wie Daten in einer Iceberg-Tabelle aktualisiert werden. In diesem Beispiel werden alle Zeilen geändert, die eine gerade Zahl in der `c_customer_sk` Spalte haben.

```
spark.sql(f"""
UPDATE {CATALOG_NAME}.{db.name}.{table.name}
SET c_email_address = 'even_row' 
WHERE c_customer_sk % 2 == 0
""")
```

Dieser Vorgang verwendet die copy-on-write Standardstrategie, sodass alle betroffenen Datendateien neu geschrieben werden.

## Daten in Iceberg-Tabellen werden aktualisiert
<a name="spark-upsert-data"></a>

Daten aktualisieren bezieht sich auf das Einfügen neuer Datensätze und das Aktualisieren vorhandener Datensätze in einer einzigen Transaktion. Um Daten in eine Iceberg-Tabelle hochzuladen, verwenden Sie die Anweisung. `SQL MERGE INTO` 

Im folgenden Beispiel wird der Inhalt der Tabelle`{UPSERT_TABLE_NAME`\$1 innerhalb der Tabelle verschoben: `{TABLE_NAME}`

```
spark.sql(f"""
    MERGE INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME} t
    USING {UPSERT_TABLE_NAME} s
        ON t.c_customer_id = s.c_customer_id
    WHEN MATCHED THEN UPDATE SET t.c_email_address = s.c_email_address
    WHEN NOT MATCHED THEN INSERT *
""")
```
+ Wenn ein Kundendatensatz, der sich in `{UPSERT_TABLE_NAME}` befindet, bereits `{TABLE_NAME}` mit demselben vorhanden ist`c_customer_id`, überschreibt der `{UPSERT_TABLE_NAME}` `c_email_address` Datensatzwert den vorhandenen Wert (Aktualisierungsvorgang).
+ Wenn ein Kundendatensatz, der `{UPSERT_TABLE_NAME}` sich in befindet, nicht existiert`{TABLE_NAME}`, wird der `{UPSERT_TABLE_NAME}` Datensatz hinzugefügt `{TABLE_NAME}` (Vorgang einfügen).

## Löschen von Daten in Iceberg-Tabellen
<a name="spark-delete-data"></a>

Um Daten aus einer Iceberg-Tabelle zu löschen, verwenden Sie den `DELETE FROM` Ausdruck und geben Sie einen Filter an, der den zu löschenden Zeilen entspricht.

```
spark.sql(f"""
DELETE FROM {CATALOG_NAME}.{db.name}.{table.name}
WHERE c_customer_sk % 2 != 0
""")
```

Wenn der Filter auf eine gesamte Partition zutrifft, führt Iceberg eine reine Metadaten-Löschung durch und belässt die Datendateien an Ort und Stelle. Andernfalls werden nur die betroffenen Datendateien neu geschrieben.

Die Methode delete verwendet die Datendateien, die von der `WHERE` Klausel betroffen sind, und erstellt eine Kopie davon ohne die gelöschten Datensätze. Anschließend wird ein neuer Tabellen-Snapshot erstellt, der auf die neuen Datendateien verweist. Daher sind die gelöschten Datensätze immer noch in den älteren Snapshots der Tabelle vorhanden. Wenn Sie beispielsweise den vorherigen Snapshot der Tabelle abrufen, werden Ihnen die Daten angezeigt, die Sie gerade gelöscht haben. Informationen zum Entfernen nicht benötigter alter Snapshots mit den zugehörigen Datendateien zu Säuberungszwecken finden Sie im Abschnitt [Dateien mithilfe der Komprimierung verwalten weiter unten in diesem Handbuch](best-practices-compaction.md).

## Lesen von Daten
<a name="spark-read-data"></a>

Sie können den aktuellen Status Ihrer Iceberg-Tabellen in Spark sowohl mit Spark SQL als auch lesen. DataFrames 

Beispiel mit Spark SQL:

```
spark.sql(f"""
SELECT * FROM {CATALOG_NAME}.{db.name}.{table.name} LIMIT 5
""")
```

Beispiel mit der DataFrames API:

```
df = spark.table(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}").limit(5)
```

## Zeitreisen nutzen
<a name="spark-time-travel"></a>

Jeder Schreibvorgang (Einfügen, Aktualisieren, Hochsetzen, Löschen) in einer Iceberg-Tabelle erstellt einen neuen Snapshot. Sie können diese Snapshots dann für Zeitreisen verwenden, um in die Vergangenheit zu reisen und den Status einer Tabelle in der Vergangenheit zu überprüfen.

Informationen zum Abrufen des Verlaufs von Snapshots für Tabellen mithilfe von Werten `snapshot-id` und Zeitangaben finden Sie im Abschnitt [Zugreifen auf Metadaten](#spark-metadata) weiter unten in diesem Handbuch.

Die folgende Zeitreiseabfrage zeigt den Status einer Tabelle auf der Grundlage eines bestimmten `snapshot-id` Status an.

Verwenden von Spark SQL:

```
spark.sql(f"""
SELECT * FROM {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME} VERSION AS OF {snapshot_id}
""")
```

Mithilfe der DataFrames API:

```
df_1st_snapshot_id = spark.read.option("snapshot-id", snapshot_id) \
     .format("iceberg") \
     .load(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}") \ 
     .limit(5)
```

Die folgende Zeitreiseabfrage zeigt den Status einer Tabelle auf der Grundlage des letzten Snapshots, der vor einem bestimmten Zeitstempel erstellt wurde, in Millisekunden () an. `as-of-timestamp`

Verwenden von Spark SQL:

```
spark.sql(f"""
SELECT * FROM dev.{db.name}.{table.name} TIMESTAMP AS OF '{snapshot_ts}'
""")
```

Mithilfe der DataFrames API:

```
df_1st_snapshot_ts = spark.read.option("as-of-timestamp", snapshot_ts) \
                          .format("iceberg") \
                          .load(f"dev.{DB_NAME}.{TABLE_NAME}") \
                          .limit(5)
```

## Verwendung inkrementeller Abfragen
<a name="spark-incremental-queries"></a>

Sie können Iceberg-Snapshots auch verwenden, um angehängte Daten inkrementell zu lesen. 

Hinweis: Derzeit unterstützt dieser Vorgang das Lesen von Daten aus Snapshots. `append` Das Abrufen von Daten aus Operationen wie `replace``overwrite`, oder wird nicht unterstützt. `delete`  Darüber hinaus werden inkrementelle Lesevorgänge in der Spark-SQL-Syntax nicht unterstützt.

Im folgenden Beispiel werden alle Datensätze abgerufen, die an eine Iceberg-Tabelle zwischen dem Snapshot `start-snapshot-id` (exklusiv) und `end-snapshot-id` (inklusive) angehängt wurden.

```
df_incremental = (spark.read.format("iceberg")
    .option("start-snapshot-id", snapshot_id_start)
    .option("end-snapshot-id", snapshot_id_end)
    .load(f"glue_catalog.{DB_NAME}.{TABLE_NAME}")
)
```

## Zugreifen auf Metadaten
<a name="spark-metadata"></a>

Iceberg bietet Zugriff auf seine Metadaten über SQL. Sie können auf die Metadaten für jede beliebige Tabelle (`<table_name>`) zugreifen, indem Sie den Namespace abfragen. `<table_name>.<metadata_table>` Eine vollständige Liste der Metadatentabellen finden Sie in der [Iceberg-Dokumentation unter Tabellen](https://iceberg.apache.org/docs/latest/spark-queries/#inspecting-tables) überprüfen.

Das folgende Beispiel zeigt, wie Sie auf die Iceberg-Historien-Metadatentabelle zugreifen können, in der der Verlauf der Commits (Änderungen) für eine Iceberg-Tabelle angezeigt wird. 

Verwenden von Spark SQL (mit der `%%sql` Magie) von einem Amazon EMR Studio-Notizbuch aus:

```
Spark.sql(f"""
SELECT * FROM {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}.history LIMIT 5
""")
```

Mithilfe der DataFrames API:

```
spark.read.format("iceberg").load("{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}.history").show(5,False)
```

Beispielausgabe:

![\[Beispiel für die Ausgabe von Metadaten aus einer Iceberg-Tabelle\]](http://docs.aws.amazon.com/de_de/prescriptive-guidance/latest/apache-iceberg-on-aws/images/metadata-sample-output.png)
