

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Trabajar con tablas Iceberg mediante Apache Spark
<a name="iceberg-spark"></a>

En esta sección se proporciona una descripción general del uso de Apache Spark para interactuar con las tablas Iceberg. Los ejemplos son código repetitivo que se puede ejecutar en Amazon EMR o. AWS Glue

Nota: La interfaz principal para interactuar con las tablas de Iceberg es SQL, por lo que la mayoría de los ejemplos combinarán Spark SQL con la API. DataFrames 

## Crear y escribir tablas Iceberg
<a name="spark-create-data"></a>

Puedes usar Spark SQL y Spark DataFrames para crear y añadir datos a las tablas de Iceberg.

### Uso de Spark SQL
<a name="spark-sql"></a>

Para escribir un conjunto de datos de Iceberg, usa sentencias SQL estándar de Spark, como `CREATE TABLE` y`INSERT INTO`.

#### Tablas sin particionar
<a name="spark-sql-unpartitioned"></a>

Este es un ejemplo de cómo crear una tabla Iceberg sin particiones con Spark SQL:

```
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions (
        c_customer_sk             int,
        c_customer_id             string,
        c_first_name              string,
        c_last_name               string,
        c_birth_country           string,
        c_email_address           string)
    USING iceberg
    OPTIONS ('format-version'='2')
""")
```

Para insertar datos en una tabla sin particiones, usa una declaración estándar: `INSERT INTO`

```
spark.sql(f"""
INSERT INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions
SELECT c_customer_sk, c_customer_id, c_first_name, c_last_name, c_birth_country, c_email_address
FROM another_table
""")
```

#### Tablas particionadas
<a name="spark-sql-partitioned"></a>

Este es un ejemplo de cómo crear una tabla Iceberg particionada con Spark SQL:

```
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions (
        c_customer_sk             int,
        c_customer_id             string,
        c_first_name              string,
        c_last_name               string,
        c_birth_country           string,
        c_email_address           string)
    USING iceberg
    PARTITIONED BY (c_birth_country)
    OPTIONS ('format-version'='2')
""")
```

Para insertar datos en una tabla Iceberg particionada con Spark SQL, usa una declaración estándar: `INSERT INTO`

```
spark.sql(f"""
INSERT INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions
SELECT c_customer_sk, c_customer_id, c_first_name, c_last_name, c_birth_country, c_email_address
FROM another_table
""")
```

**nota**  
A partir de Iceberg 1.5.0, el modo de distribución de `hash` escritura es el predeterminado al insertar datos en tablas particionadas. Para obtener más información, consulta Cómo [escribir modos de distribución](https://iceberg.apache.org/docs/latest/spark-writes/#writing-distribution-modes) en la documentación de Iceberg.

### Uso de la API DataFrames
<a name="spark-dataframes"></a>

Para escribir un conjunto de datos de Iceberg, puedes usar la `DataFrameWriterV2` API.

Para crear una tabla de iceberg y escribir datos en ella, usa la función `df.writeTo(` t). Si la tabla existe, utilice la `.append()` función. Si no es así, usa `.create().` Los siguientes ejemplos usan`.createOrReplace()`, que es una variación de lo `.create()` que equivale a`CREATE OR REPLACE TABLE AS SELECT`.

#### Tablas sin particionar
<a name="spark-df-unpartitioned"></a>

Para crear y rellenar una tabla Iceberg sin particiones mediante la API: `DataFrameWriterV2`

```
input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions") \
    .tableProperty("format-version", "2") \
    .createOrReplace()
```

Para insertar datos en una tabla de Iceberg sin particiones existente mediante la API: `DataFrameWriterV2`

```
input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions") \
    .append()
```

#### Tablas particionadas
<a name="spark-df-partitioned"></a>

Para crear y rellenar una tabla de Iceberg particionada mediante la API: `DataFrameWriterV2`

```
input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions") \
    .tableProperty("format-version", "2") \
    .partitionedBy("c_birth_country") \
    .createOrReplace()
```

Para insertar datos en una tabla de Iceberg particionada mediante la API: `DataFrameWriterV2`

```
input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions") \
    .append()
```

## Actualización de datos en tablas de iceberg
<a name="spark-update-data"></a>

El siguiente ejemplo muestra cómo actualizar los datos de una tabla de iceberg. En este ejemplo se modifican todas las filas que tienen un número par en la `c_customer_sk` columna.

```
spark.sql(f"""
UPDATE {CATALOG_NAME}.{db.name}.{table.name}
SET c_email_address = 'even_row' 
WHERE c_customer_sk % 2 == 0
""")
```

Esta operación utiliza la copy-on-write estrategia predeterminada, por lo que reescribe todos los archivos de datos afectados.

## Modificación de datos en tablas de Iceberg
<a name="spark-upsert-data"></a>

La alteración de los datos consiste en insertar nuevos registros de datos y actualizar los registros de datos existentes en una sola transacción. Para descomponer los datos en una tabla de iceberg, se utiliza la declaración. `SQL MERGE INTO` 

El siguiente ejemplo altera el contenido de la tabla\$1 dentro de la tabla`{UPSERT_TABLE_NAME`: `{TABLE_NAME}`

```
spark.sql(f"""
    MERGE INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME} t
    USING {UPSERT_TABLE_NAME} s
        ON t.c_customer_id = s.c_customer_id
    WHEN MATCHED THEN UPDATE SET t.c_email_address = s.c_email_address
    WHEN NOT MATCHED THEN INSERT *
""")
```
+ Si un registro de cliente que está `{UPSERT_TABLE_NAME}` ya existe `{TABLE_NAME}` con el mismo`c_customer_id`, el valor del `{UPSERT_TABLE_NAME}` registro anula el `c_email_address` valor existente (operación de actualización).
+ Si un registro de cliente incluido `{UPSERT_TABLE_NAME}` no existe en`{TABLE_NAME}`, el `{UPSERT_TABLE_NAME}` registro se agrega a `{TABLE_NAME}` (operación de inserción).

## Eliminar datos de las tablas de Iceberg
<a name="spark-delete-data"></a>

Para eliminar datos de una tabla de iceberg, utilice la `DELETE FROM` expresión y especifique un filtro que coincida con las filas que desee eliminar.

```
spark.sql(f"""
DELETE FROM {CATALOG_NAME}.{db.name}.{table.name}
WHERE c_customer_sk % 2 != 0
""")
```

Si el filtro coincide con una partición completa, Iceberg elimina solo los metadatos y deja los archivos de datos en su lugar. De lo contrario, solo reescribe los archivos de datos afectados.

El método de eliminación toma los archivos de datos afectados por la `WHERE` cláusula y crea una copia de los mismos sin los registros eliminados. A continuación, crea una nueva instantánea de la tabla que apunta a los nuevos archivos de datos. Por lo tanto, los registros eliminados siguen presentes en las instantáneas anteriores de la tabla. Por ejemplo, si recupera la instantánea anterior de la tabla, verá los datos que acaba de eliminar. Para obtener información sobre cómo eliminar instantáneas antiguas innecesarias con los archivos de datos relacionados con fines de limpieza, consulte la sección [Mantenimiento de archivos mediante la compactación](best-practices-compaction.md), que aparece más adelante en esta guía.

## Lectura de datos
<a name="spark-read-data"></a>

Puedes leer el estado más reciente de tus tablas de Iceberg en Spark tanto con Spark SQL como con. DataFrames 

Ejemplo de uso de Spark SQL:

```
spark.sql(f"""
SELECT * FROM {CATALOG_NAME}.{db.name}.{table.name} LIMIT 5
""")
```

Ejemplo de uso de la DataFrames API:

```
df = spark.table(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}").limit(5)
```

## Uso del viaje en el tiempo
<a name="spark-time-travel"></a>

Cada operación de escritura (insertar, actualizar, modificar, eliminar) en una tabla de Iceberg crea una nueva instantánea. A continuación, puede utilizar estas instantáneas para viajar en el tiempo, es decir, para retroceder en el tiempo y comprobar el estado de una tabla en el pasado.

Para obtener información sobre cómo recuperar el historial de las instantáneas de las tablas mediante valores de uso `snapshot-id` y temporización, consulte la sección [Acceso a los metadatos](#spark-metadata) que aparece más adelante en esta guía.

La siguiente consulta de viaje en el tiempo muestra el estado de una tabla en función de un dato específico`snapshot-id`.

Con Spark SQL:

```
spark.sql(f"""
SELECT * FROM {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME} VERSION AS OF {snapshot_id}
""")
```

Uso de la DataFrames API:

```
df_1st_snapshot_id = spark.read.option("snapshot-id", snapshot_id) \
     .format("iceberg") \
     .load(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}") \ 
     .limit(5)
```

La siguiente consulta de viaje en el tiempo muestra el estado de una tabla en función de la última instantánea que se creó antes de una marca de tiempo específica, en milisegundos ()`as-of-timestamp`.

Con Spark SQL:

```
spark.sql(f"""
SELECT * FROM dev.{db.name}.{table.name} TIMESTAMP AS OF '{snapshot_ts}'
""")
```

Uso de la API DataFrames :

```
df_1st_snapshot_ts = spark.read.option("as-of-timestamp", snapshot_ts) \
                          .format("iceberg") \
                          .load(f"dev.{DB_NAME}.{TABLE_NAME}") \
                          .limit(5)
```

## Uso de consultas incrementales
<a name="spark-incremental-queries"></a>

También puede utilizar las instantáneas de Iceberg para leer los datos adjuntos de forma incremental. 

Nota: Actualmente, esta operación admite la lectura de datos de instantáneas. `append` No admite la obtención de datos de operaciones como `replace``overwrite`, o. `delete`  Además, las operaciones de lectura incremental no se admiten en la sintaxis SQL de Spark.

En el siguiente ejemplo, se recuperan todos los registros adjuntos a una tabla de Iceberg entre la instantánea `start-snapshot-id` (exclusiva) y la `end-snapshot-id` (incluida).

```
df_incremental = (spark.read.format("iceberg")
    .option("start-snapshot-id", snapshot_id_start)
    .option("end-snapshot-id", snapshot_id_end)
    .load(f"glue_catalog.{DB_NAME}.{TABLE_NAME}")
)
```

## Acceder a los metadatos
<a name="spark-metadata"></a>

Iceberg proporciona acceso a sus metadatos a través de SQL. Puede acceder a los metadatos de cualquier tabla (`<table_name>`) consultando el espacio de nombres. `<table_name>.<metadata_table>` Para obtener una lista completa de las tablas de metadatos, consulte [Inspección de tablas](https://iceberg.apache.org/docs/latest/spark-queries/#inspecting-tables) en la documentación de Iceberg.

El siguiente ejemplo muestra cómo acceder a la tabla de metadatos del historial de Iceberg, que muestra el historial de confirmaciones (cambios) de una tabla de Iceberg. 

Uso de Spark SQL (con la `%%sql` magia) desde una libreta Amazon EMR Studio:

```
Spark.sql(f"""
SELECT * FROM {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}.history LIMIT 5
""")
```

Uso de la DataFrames API:

```
spark.read.format("iceberg").load("{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}.history").show(5,False)
```

Código de salida de ejemplo:

![\[Ejemplo de salida de metadatos de una tabla Iceberg\]](http://docs.aws.amazon.com/es_es/prescriptive-guidance/latest/apache-iceberg-on-aws/images/metadata-sample-output.png)
