

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

# Trabalhando com tabelas Iceberg usando o Apache Spark
<a name="iceberg-spark"></a>

Esta seção fornece uma visão geral do uso do Apache Spark para interagir com tabelas Iceberg. Os exemplos são códigos padronizados que podem ser executados no Amazon EMR ou. AWS Glue

Observação: a interface principal para interagir com as tabelas do Iceberg é o SQL, então a maioria dos exemplos combinará o Spark SQL com a API. DataFrames 

## Criando e escrevendo tabelas Iceberg
<a name="spark-create-data"></a>

Você pode usar o Spark SQL e o Spark DataFrames para criar e adicionar dados às tabelas do Iceberg.

### Usando o Spark SQL
<a name="spark-sql"></a>

Para escrever um conjunto de dados do Iceberg, use instruções SQL padrão do Spark, como e. `CREATE TABLE` `INSERT INTO`

#### Tabelas não particionadas
<a name="spark-sql-unpartitioned"></a>

Aqui está um exemplo de criação de uma tabela Iceberg não particionada com o Spark SQL:

```
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions (
        c_customer_sk             int,
        c_customer_id             string,
        c_first_name              string,
        c_last_name               string,
        c_birth_country           string,
        c_email_address           string)
    USING iceberg
    OPTIONS ('format-version'='2')
""")
```

Para inserir dados em uma tabela não particionada, use uma instrução padrão`INSERT INTO`:

```
spark.sql(f"""
INSERT INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions
SELECT c_customer_sk, c_customer_id, c_first_name, c_last_name, c_birth_country, c_email_address
FROM another_table
""")
```

#### Tabelas particionadas
<a name="spark-sql-partitioned"></a>

Aqui está um exemplo de criação de uma tabela Iceberg particionada com o Spark SQL:

```
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions (
        c_customer_sk             int,
        c_customer_id             string,
        c_first_name              string,
        c_last_name               string,
        c_birth_country           string,
        c_email_address           string)
    USING iceberg
    PARTITIONED BY (c_birth_country)
    OPTIONS ('format-version'='2')
""")
```

Para inserir dados em uma tabela Iceberg particionada com o Spark SQL, use uma instrução padrão: `INSERT INTO`

```
spark.sql(f"""
INSERT INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions
SELECT c_customer_sk, c_customer_id, c_first_name, c_last_name, c_birth_country, c_email_address
FROM another_table
""")
```

**nota**  
A partir do Iceberg 1.5.0, o modo de distribuição de `hash` gravação é o padrão quando você insere dados em tabelas particionadas. Para obter mais informações, consulte [Escrevendo modos de distribuição](https://iceberg.apache.org/docs/latest/spark-writes/#writing-distribution-modes) na documentação do Iceberg.

### Usando a DataFrames API
<a name="spark-dataframes"></a>

Para escrever um conjunto de dados do Iceberg, você pode usar a `DataFrameWriterV2` API.

Para criar uma tabela Iceberg e gravar dados nela, use a função `df.writeTo(` t). Se a tabela existir, use a `.append()` função. Se isso não acontecer, use `.create().` Os exemplos a seguir usam`.createOrReplace()`, que é uma variação do `.create()` que é equivalente `CREATE OR REPLACE TABLE AS SELECT` a.

#### Tabelas não particionadas
<a name="spark-df-unpartitioned"></a>

Para criar e preencher uma tabela Iceberg não particionada usando a API: `DataFrameWriterV2`

```
input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions") \
    .tableProperty("format-version", "2") \
    .createOrReplace()
```

Para inserir dados em uma tabela Iceberg não particionada existente usando a API: `DataFrameWriterV2`

```
input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions") \
    .append()
```

#### Tabelas particionadas
<a name="spark-df-partitioned"></a>

Para criar e preencher uma tabela Iceberg particionada usando a API: `DataFrameWriterV2`

```
input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions") \
    .tableProperty("format-version", "2") \
    .partitionedBy("c_birth_country") \
    .createOrReplace()
```

Para inserir dados em uma tabela Iceberg particionada usando a `DataFrameWriterV2` API:

```
input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions") \
    .append()
```

## Atualização de dados em tabelas Iceberg
<a name="spark-update-data"></a>

O exemplo a seguir mostra como atualizar dados em uma tabela Iceberg. Este exemplo modifica todas as linhas que têm um número par na `c_customer_sk` coluna.

```
spark.sql(f"""
UPDATE {CATALOG_NAME}.{db.name}.{table.name}
SET c_email_address = 'even_row' 
WHERE c_customer_sk % 2 == 0
""")
```

Essa operação usa a copy-on-write estratégia padrão e, portanto, reescreve todos os arquivos de dados afetados.

## Atualizando dados em tabelas Iceberg
<a name="spark-upsert-data"></a>

A atualização de dados se refere à inserção de novos registros de dados e à atualização dos registros de dados existentes em uma única transação. Para inserir dados em uma tabela Iceberg, você usa a declaração. `SQL MERGE INTO` 

O exemplo a seguir altera o conteúdo da tabela`{UPSERT_TABLE_NAME`\$1 dentro da tabela: `{TABLE_NAME}`

```
spark.sql(f"""
    MERGE INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME} t
    USING {UPSERT_TABLE_NAME} s
        ON t.c_customer_id = s.c_customer_id
    WHEN MATCHED THEN UPDATE SET t.c_email_address = s.c_email_address
    WHEN NOT MATCHED THEN INSERT *
""")
```
+ Se um registro de cliente que está em `{UPSERT_TABLE_NAME}` já existir `{TABLE_NAME}` com o mesmo`c_customer_id`, o `c_email_address` valor do `{UPSERT_TABLE_NAME}` registro substituirá o valor existente (operação de atualização).
+ Se um registro de cliente que está em `{UPSERT_TABLE_NAME}` não existir em`{TABLE_NAME}`, o `{UPSERT_TABLE_NAME}` registro será adicionado à `{TABLE_NAME}` (operação de inserção).

## Excluindo dados em tabelas Iceberg
<a name="spark-delete-data"></a>

Para excluir dados de uma tabela do Iceberg, use a `DELETE FROM` expressão e especifique um filtro que corresponda às linhas a serem excluídas.

```
spark.sql(f"""
DELETE FROM {CATALOG_NAME}.{db.name}.{table.name}
WHERE c_customer_sk % 2 != 0
""")
```

Se o filtro corresponder a uma partição inteira, o Iceberg executará uma exclusão somente de metadados e deixará os arquivos de dados no lugar. Caso contrário, ele reescreve somente os arquivos de dados afetados.

O método delete pega os arquivos de dados afetados pela `WHERE` cláusula e cria uma cópia deles sem os registros excluídos. Em seguida, ele cria um novo instantâneo da tabela que aponta para os novos arquivos de dados. Portanto, os registros excluídos ainda estão presentes nos instantâneos mais antigos da tabela. Por exemplo, se você recuperar o instantâneo anterior da tabela, verá os dados que acabou de excluir. Para obter informações sobre como remover instantâneos antigos desnecessários com os arquivos de dados relacionados para fins de limpeza, consulte a seção [Manutenção de arquivos usando compactação](best-practices-compaction.md), mais adiante neste guia.

## Leitura de dados
<a name="spark-read-data"></a>

Você pode ler o status mais recente de suas tabelas Iceberg no Spark com o Spark SQL e. DataFrames 

Exemplo usando o Spark SQL:

```
spark.sql(f"""
SELECT * FROM {CATALOG_NAME}.{db.name}.{table.name} LIMIT 5
""")
```

Exemplo de uso da DataFrames API:

```
df = spark.table(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}").limit(5)
```

## Usando a viagem no tempo
<a name="spark-time-travel"></a>

Cada operação de gravação (inserção, atualização, upsert, delete) em uma tabela do Iceberg cria um novo instantâneo. Em seguida, você pode usar esses instantâneos para viajar no tempo, para voltar no tempo e verificar o status de uma tabela no passado.

Para obter informações sobre como recuperar o histórico de instantâneos para tabelas usando `snapshot-id` e cronometrando valores, consulte a seção [Acessando metadados](#spark-metadata) mais adiante neste guia.

A consulta de viagem no tempo a seguir exibe o status de uma tabela com base em uma tabela específica`snapshot-id`.

Usar o Spark SQL:

```
spark.sql(f"""
SELECT * FROM {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME} VERSION AS OF {snapshot_id}
""")
```

Usando a DataFrames API:

```
df_1st_snapshot_id = spark.read.option("snapshot-id", snapshot_id) \
     .format("iceberg") \
     .load(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}") \ 
     .limit(5)
```

A consulta de viagem no tempo a seguir exibe o status de uma tabela com base no último instantâneo criado antes de um timestamp específico, em milissegundos (). `as-of-timestamp`

Usar o Spark SQL:

```
spark.sql(f"""
SELECT * FROM dev.{db.name}.{table.name} TIMESTAMP AS OF '{snapshot_ts}'
""")
```

Usando a DataFrames API:

```
df_1st_snapshot_ts = spark.read.option("as-of-timestamp", snapshot_ts) \
                          .format("iceberg") \
                          .load(f"dev.{DB_NAME}.{TABLE_NAME}") \
                          .limit(5)
```

## Usando consultas incrementais
<a name="spark-incremental-queries"></a>

Você também pode usar instantâneos do Iceberg para ler dados anexados de forma incremental. 

Nota: Atualmente, essa operação oferece suporte à leitura de dados de `append` instantâneos. Ele não suporta a busca de dados de operações como `replace``overwrite`, ou`delete`.  Além disso, as operações de leitura incremental não são suportadas na sintaxe do Spark SQL.

O exemplo a seguir recupera todos os registros anexados a uma tabela Iceberg entre o instantâneo `start-snapshot-id` (exclusivo) e `end-snapshot-id` (inclusive).

```
df_incremental = (spark.read.format("iceberg")
    .option("start-snapshot-id", snapshot_id_start)
    .option("end-snapshot-id", snapshot_id_end)
    .load(f"glue_catalog.{DB_NAME}.{TABLE_NAME}")
)
```

## Acessando metadados
<a name="spark-metadata"></a>

O Iceberg fornece acesso aos seus metadados por meio de SQL. Você pode acessar os metadados de qualquer tabela (`<table_name>`) consultando o namespace. `<table_name>.<metadata_table>` Para obter uma lista completa das tabelas de metadados, consulte Como [inspecionar tabelas na documentação](https://iceberg.apache.org/docs/latest/spark-queries/#inspecting-tables) do Iceberg.

O exemplo a seguir mostra como acessar a tabela de metadados do histórico do Iceberg, que mostra o histórico de confirmações (alterações) de uma tabela do Iceberg. 

Usando o Spark SQL (com a `%%sql` mágica) de um notebook Amazon EMR Studio:

```
Spark.sql(f"""
SELECT * FROM {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}.history LIMIT 5
""")
```

Usando a DataFrames API:

```
spark.read.format("iceberg").load("{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}.history").show(5,False)
```

Exemplo de saída:

![\[Exemplo de saída de metadados de uma tabela Iceberg\]](http://docs.aws.amazon.com/pt_br/prescriptive-guidance/latest/apache-iceberg-on-aws/images/metadata-sample-output.png)
