

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Lavorare con le tabelle Iceberg utilizzando Apache Spark
<a name="iceberg-spark"></a>

Questa sezione fornisce una panoramica sull'uso di Apache Spark per interagire con le tabelle Iceberg. Gli esempi sono codice standard che può essere eseguito su Amazon EMR o. AWS Glue

Nota: l'interfaccia principale per l'interazione con le tabelle Iceberg è SQL, quindi la maggior parte degli esempi combinerà Spark SQL con l'API. DataFrames 

## Creazione e scrittura di tabelle Iceberg
<a name="spark-create-data"></a>

Puoi usare Spark SQL e Spark DataFrames per creare e aggiungere dati alle tabelle Iceberg.

### Usare Spark SQL
<a name="spark-sql"></a>

Per scrivere un set di dati Iceberg, usa istruzioni SQL Spark standard come e. `CREATE TABLE` `INSERT INTO`

#### Tabelle non partizionate
<a name="spark-sql-unpartitioned"></a>

Ecco un esempio di creazione di una tabella Iceberg non partizionata con Spark SQL:

```
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions (
        c_customer_sk             int,
        c_customer_id             string,
        c_first_name              string,
        c_last_name               string,
        c_birth_country           string,
        c_email_address           string)
    USING iceberg
    OPTIONS ('format-version'='2')
""")
```

Per inserire dati in una tabella non partizionata, usa un'istruzione standard: `INSERT INTO`

```
spark.sql(f"""
INSERT INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions
SELECT c_customer_sk, c_customer_id, c_first_name, c_last_name, c_birth_country, c_email_address
FROM another_table
""")
```

#### Tabelle partizionate
<a name="spark-sql-partitioned"></a>

Ecco un esempio di creazione di una tabella Iceberg partizionata con Spark SQL:

```
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions (
        c_customer_sk             int,
        c_customer_id             string,
        c_first_name              string,
        c_last_name               string,
        c_birth_country           string,
        c_email_address           string)
    USING iceberg
    PARTITIONED BY (c_birth_country)
    OPTIONS ('format-version'='2')
""")
```

Per inserire dati in una tabella Iceberg partizionata con Spark SQL, usa un'istruzione standard: `INSERT INTO`

```
spark.sql(f"""
INSERT INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions
SELECT c_customer_sk, c_customer_id, c_first_name, c_last_name, c_birth_country, c_email_address
FROM another_table
""")
```

**Nota**  
A partire da Iceberg 1.5.0, la modalità di distribuzione di `hash` scrittura è l'impostazione predefinita quando si inseriscono dati in tabelle partizionate. Per ulteriori informazioni, consulta [Writing Distribution Modes](https://iceberg.apache.org/docs/latest/spark-writes/#writing-distribution-modes) nella documentazione di Iceberg.

### Utilizzo dell'API DataFrames
<a name="spark-dataframes"></a>

Per scrivere un set di dati Iceberg, puoi utilizzare l'`DataFrameWriterV2`API.

Per creare una tabella Iceberg e scrivere dati su di essa, usa la funzione `df.writeTo(` t). Se la tabella esiste, usa la `.append()` funzione. In caso contrario, usa `.create().` Gli esempi seguenti usano`.createOrReplace()`, che è una variante di `.create()` che è equivalente a`CREATE OR REPLACE TABLE AS SELECT`.

#### Tabelle non partizionate
<a name="spark-df-unpartitioned"></a>

Per creare e popolare una tabella Iceberg non partizionata utilizzando l'API: `DataFrameWriterV2`

```
input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions") \
    .tableProperty("format-version", "2") \
    .createOrReplace()
```

Per inserire dati in una tabella Iceberg non partizionata esistente utilizzando l'API: `DataFrameWriterV2`

```
input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions") \
    .append()
```

#### Tabelle partizionate
<a name="spark-df-partitioned"></a>

Per creare e popolare una tabella Iceberg partizionata utilizzando l'API: `DataFrameWriterV2`

```
input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions") \
    .tableProperty("format-version", "2") \
    .partitionedBy("c_birth_country") \
    .createOrReplace()
```

Per inserire dati in una tabella Iceberg partizionata utilizzando l'API: `DataFrameWriterV2`

```
input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions") \
    .append()
```

## Aggiornamento dei dati nelle tabelle Iceberg
<a name="spark-update-data"></a>

L'esempio seguente mostra come aggiornare i dati in una tabella Iceberg. Questo esempio modifica tutte le righe che hanno un numero pari nella `c_customer_sk` colonna.

```
spark.sql(f"""
UPDATE {CATALOG_NAME}.{db.name}.{table.name}
SET c_email_address = 'even_row' 
WHERE c_customer_sk % 2 == 0
""")
```

Questa operazione utilizza la copy-on-write strategia predefinita, quindi riscrive tutti i file di dati interessati.

## Sconvolgimento dei dati nelle tabelle Iceberg
<a name="spark-upsert-data"></a>

L'alterazione dei dati si riferisce all'inserimento di nuovi record di dati e all'aggiornamento dei record di dati esistenti in un'unica transazione. Per trasformare i dati in una tabella Iceberg, si utilizza l'istruzione. `SQL MERGE INTO` 

L'esempio seguente inverte il contenuto della tabella\$1 all'interno della tabella`{UPSERT_TABLE_NAME`: `{TABLE_NAME}`

```
spark.sql(f"""
    MERGE INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME} t
    USING {UPSERT_TABLE_NAME} s
        ON t.c_customer_id = s.c_customer_id
    WHEN MATCHED THEN UPDATE SET t.c_email_address = s.c_email_address
    WHEN NOT MATCHED THEN INSERT *
""")
```
+ Se un record cliente presente in esiste `{UPSERT_TABLE_NAME}` già in `{TABLE_NAME}` con lo stesso`c_customer_id`, il valore del `{UPSERT_TABLE_NAME}` record sostituisce il `c_email_address` valore esistente (operazione di aggiornamento).
+ Se un record del cliente presente in `{UPSERT_TABLE_NAME}` non esiste in`{TABLE_NAME}`, il `{UPSERT_TABLE_NAME}` record viene aggiunto a `{TABLE_NAME}` (operazione di inserimento).

## Eliminazione dei dati nelle tabelle Iceberg
<a name="spark-delete-data"></a>

Per eliminare i dati da una tabella Iceberg, utilizzate l'`DELETE FROM`espressione e specificate un filtro che corrisponda alle righe da eliminare.

```
spark.sql(f"""
DELETE FROM {CATALOG_NAME}.{db.name}.{table.name}
WHERE c_customer_sk % 2 != 0
""")
```

Se il filtro corrisponde a un'intera partizione, Iceberg elimina solo i metadati e lascia i file di dati al loro posto. Altrimenti, riscrive solo i file di dati interessati.

Il metodo delete prende i file di dati interessati dalla `WHERE` clausola e ne crea una copia senza i record eliminati. Quindi crea una nuova istantanea della tabella che punta ai nuovi file di dati. Pertanto, i record eliminati sono ancora presenti nelle istantanee precedenti della tabella. Ad esempio, se recuperi l'istantanea precedente della tabella, vedrai i dati che hai appena eliminato. Per informazioni sulla rimozione di vecchie istantanee non necessarie con i relativi file di dati per scopi di pulizia, consulta la sezione [Manutenzione dei file utilizzando la compattazione](best-practices-compaction.md) più avanti in questa guida.

## Lettura dei dati
<a name="spark-read-data"></a>

Puoi leggere lo stato più recente delle tue tabelle Iceberg in Spark sia con Spark SQL che. DataFrames 

Esempio di utilizzo di Spark SQL:

```
spark.sql(f"""
SELECT * FROM {CATALOG_NAME}.{db.name}.{table.name} LIMIT 5
""")
```

Esempio di utilizzo dell' DataFrames API:

```
df = spark.table(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}").limit(5)
```

## Utilizzo del viaggio nel tempo
<a name="spark-time-travel"></a>

Ogni operazione di scrittura (inserimento, aggiornamento, annullamento, eliminazione) in una tabella Iceberg crea una nuova istantanea. È quindi possibile utilizzare queste istantanee per viaggiare nel tempo, per tornare indietro nel tempo e controllare lo stato di una tabella nel passato.

Per informazioni su come recuperare la cronologia delle istantanee per le tabelle utilizzando `snapshot-id` e cronometrando i valori, consultate la sezione [Accesso ai metadati](#spark-metadata) più avanti in questa guida.

La seguente interrogazione sui viaggi nel tempo mostra lo stato di una tabella in base a uno specifico. `snapshot-id`

Usando Spark SQL:

```
spark.sql(f"""
SELECT * FROM {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME} VERSION AS OF {snapshot_id}
""")
```

Utilizzo dell' DataFrames API:

```
df_1st_snapshot_id = spark.read.option("snapshot-id", snapshot_id) \
     .format("iceberg") \
     .load(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}") \ 
     .limit(5)
```

La seguente query sul viaggio nel tempo mostra lo stato di una tabella in base all'ultima istantanea creata prima di un timestamp specifico, in millisecondi (). `as-of-timestamp`

Usando Spark SQL:

```
spark.sql(f"""
SELECT * FROM dev.{db.name}.{table.name} TIMESTAMP AS OF '{snapshot_ts}'
""")
```

Utilizzo dell' DataFrames API:

```
df_1st_snapshot_ts = spark.read.option("as-of-timestamp", snapshot_ts) \
                          .format("iceberg") \
                          .load(f"dev.{DB_NAME}.{TABLE_NAME}") \
                          .limit(5)
```

## Utilizzo di query incrementali
<a name="spark-incremental-queries"></a>

È inoltre possibile utilizzare le istantanee Iceberg per leggere i dati aggiunti in modo incrementale. 

Nota: attualmente, questa operazione supporta la lettura di dati da istantanee. `append` Non supporta il recupero di dati da operazioni come `replace``overwrite`, o. `delete`  Inoltre, le operazioni di lettura incrementali non sono supportate nella sintassi SQL di Spark.

L'esempio seguente recupera tutti i record aggiunti a una tabella Iceberg compresi tra l'istantanea `start-snapshot-id` (esclusiva) e (inclusa). `end-snapshot-id`

```
df_incremental = (spark.read.format("iceberg")
    .option("start-snapshot-id", snapshot_id_start)
    .option("end-snapshot-id", snapshot_id_end)
    .load(f"glue_catalog.{DB_NAME}.{TABLE_NAME}")
)
```

## Accesso ai metadati
<a name="spark-metadata"></a>

Iceberg fornisce l'accesso ai propri metadati tramite SQL. È possibile accedere ai metadati per ogni tabella (`<table_name>`) interrogando il namespace. `<table_name>.<metadata_table>` Per un elenco completo delle tabelle di metadati, consulta [Ispezione](https://iceberg.apache.org/docs/latest/spark-queries/#inspecting-tables) delle tabelle nella documentazione di Iceberg.

L'esempio seguente mostra come accedere alla tabella dei metadati della cronologia di Iceberg, che mostra la cronologia dei commit (modifiche) per una tabella Iceberg. 

Utilizzando Spark SQL (con la `%%sql` magia) da un notebook Amazon EMR Studio:

```
Spark.sql(f"""
SELECT * FROM {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}.history LIMIT 5
""")
```

Utilizzo dell'API: DataFrames 

```
spark.read.format("iceberg").load("{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}.history").show(5,False)
```

Output di esempio:

![\[Esempio di metadati in uscita da una tabella Iceberg\]](http://docs.aws.amazon.com/it_it/prescriptive-guidance/latest/apache-iceberg-on-aws/images/metadata-sample-output.png)
