

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 使用 PyIceberg 處理 Iceberg 資料表
<a name="iceberg-pyiceberg"></a>

本節說明如何使用 [PyIceberg](https://py.iceberg.apache.org/) 與 Iceberg 資料表互動。提供的範例是樣板程式碼，您可以在 [Amazon Linux 2023 EC2 ](https://aws.amazon.com/linux/amazon-linux-2023/)執行個體、[AWS Lambda](https://aws.amazon.com/lambda/)函數或任何具有正確設定[AWS 憑證](https://docs.aws.amazon.com/IAM/latest/UserGuide/security-creds.html)的 [Python](https://www.python.org/) 環境中執行。

## 先決條件
<a name="pyiceberg-prerequisites"></a>

**注意**  
這些範例使用 [PyIceberg 1.9.1。](https://github.com/apache/iceberg/releases/tag/apache-iceberg-1.9.1)

若要使用 PyIceberg，您需要 適用於 Python (Boto3) 的 AWS SDK 安裝 PyIceberg 和 。以下是如何設定 Python 虛擬環境以使用 PyIceberg 和 的範例 AWS Glue Data Catalog：

1. 使用 [pip python 套件安裝程式](https://pypi.org/project/pip/)下載 [PyIceberg](https://pypi.org/project/pyiceberg/)。您也需要 [Boto3](https://boto3.amazonaws.com/v1/documentation/api/latest/index.html) 才能與 互動 AWS 服務。您可以使用下列命令，設定要測試的本機 Python 虛擬環境：

   ```
   python3 -m venv my_env
   cd my_env/bin/
   source activate
   pip install "pyiceberg[pyarrow,pandas,glue]"
   pip install boto3
   ```

1. 執行`python` 以開啟 Python shell 並測試命令。

## 連線至 Data Catalog
<a name="pyiceberg-data-catalog"></a>

若要開始在 中使用 Iceberg 資料表 AWS Glue，您必須先連線到 AWS Glue Data Catalog。 

`load_catalog`函數會建立[目錄](https://py.iceberg.apache.org/reference/pyiceberg/catalog/)物件，做為所有 Iceberg 操作的主要界面，以初始化與 Data Catalog 的連線：

```
from pyiceberg.catalog import load_catalog
region = "us-east-1"

glue_catalog = load_catalog(
    'default',
    **{
        'client.region': region
    },
    type='glue'
)
```

## 列出和建立資料庫
<a name="pyiceberg-list-database"></a>

若要列出現有的資料庫，請使用 `list_namespaces`函數：

```
databases = glue_catalog.list_namespaces()
print(databases)
```

若要建立新的資料庫，請使用 `create_namespace`函數：

```
database_name="mydb"
s3_db_path=f"s3://amzn-s3-demo-bucket/{database_name}"

glue_catalog.create_namespace(database_name, properties={"location": s3_db_path})
```

## 建立和寫入 Iceberg 資料表
<a name="pyiceberg-create-table"></a>

### 未分割的資料表
<a name="unpartitioned"></a>

以下是使用 `create_table`函數建立未分割 Iceberg 資料表的範例：

```
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, DoubleType

database_name="mydb"
table_name="pyiceberg_table"
s3_table_path=f"s3://amzn-s3-demo-bucket/{database_name}/{table_name}"

schema = Schema(
    NestedField(1, "city", StringType(), required=False),
    NestedField(2, "lat", DoubleType(), required=False),
    NestedField(3, "long", DoubleType(), required=False),
)

glue_catalog.create_table(f"{database_name}.{table_name}", schema=schema, location=s3_table_path)
```

您可以使用 `list_tables`函數來檢查資料庫內的資料表清單：

```
tables = glue_catalog.list_tables(namespace=database_name)
print(tables)
```

您可以使用 `append`函數和 PyArrow 在 Iceberg 資料表中插入資料：

```
import pyarrow as pa
df = pa.Table.from_pylist(
    [
        {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},
        {"city": "San Francisco", "lat": 37.773972, "long": -122.431297},
        {"city": "Drachten", "lat": 53.11254, "long": 6.0989},
        {"city": "Paris", "lat": 48.864716, "long": 2.349014},
    ],
)

table = glue_catalog.load_table(f"{database_name}.{table_name}")
table.append(df)
```

### 分割的資料表
<a name="partitioned"></a>

以下是使用 `create_table`函數和 建立具有[隱藏分割](https://iceberg.apache.org/docs/1.4.0/partitioning/#icebergs-hidden-partitioning)[的分割](https://iceberg.apache.org/docs/1.4.0/partitioning/) Iceberg 資料表的範例`PartitionSpec`：

```
from pyiceberg.schema import Schema
from pyiceberg.types import (
    NestedField,
    StringType,
    FloatType,
    DoubleType,
    TimestampType,
)

# Define the schema
schema = Schema(
    NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=True),
    NestedField(field_id=2, name="drone_id", field_type=StringType(), required=True),
    NestedField(field_id=3, name="lat", field_type=DoubleType(), required=False),
    NestedField(field_id=4, name="lon", field_type=DoubleType(), required=False),
    NestedField(field_id=5, name="height", field_type=FloatType(), required=False),
)

from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform

partition_spec = PartitionSpec(
    PartitionField(
        source_id=1,  # Refers to "datetime"
        field_id=1000,
        transform=DayTransform(),
        name="datetime_day"
    )
)

database_name="mydb"
partitioned_table_name="pyiceberg_table_partitioned"
s3_table_path=f"s3://amzn-s3-demo-bucket/{database_name}/{partitioned_table_name}"

glue_catalog.create_table(
    identifier=f"{database_name}.{partitioned_table_name}",
    schema=schema,
    location=s3_table_path,
    partition_spec=partition_spec
)
```

您可以將資料插入分割資料表的方式與未分割資料表相同。分割會自動處理。

```
from datetime import datetime
arrow_schema = pa.schema([
    pa.field("datetime", pa.timestamp("us"), nullable=False),
    pa.field("drone_id", pa.string(), nullable=False),
    pa.field("lat", pa.float64()),
    pa.field("lon", pa.float64()),
    pa.field("height", pa.float32()),  
])

data = [
    {
        "datetime": datetime(2024, 6, 1, 12, 0, 0),
        "drone_id": "drone_001",
        "lat": 52.371807,
        "lon": 4.896029,
        "height": 120.5,
    },
    {
        "datetime": datetime(2024, 6, 1, 12, 5, 0),
        "drone_id": "drone_002",
        "lat": 37.773972,
        "lon": -122.431297,
        "height": 150.0,
    },
    {
        "datetime": datetime(2024, 6, 2, 9, 0, 0),
        "drone_id": "drone_001",
        "lat": 53.11254,
        "lon": 6.0989,
        "height": 110.2,
    },
    {
        "datetime": datetime(2024, 6, 2, 9, 30, 0),
        "drone_id": "drone_003",
        "lat": 48.864716,
        "lon": 2.349014,
        "height": 145.7,
    },
]

df = pa.Table.from_pylist(data, schema=arrow_schema)

table = glue_catalog.load_table(f"{database_name}.{partitioned_table_name}")
table.append(df)
```

## 讀取資料
<a name="pyiceberg-read-data"></a>

您可以使用 PyIceberg `scan`函數從 Iceberg 資料表讀取資料。您可以篩選資料列、選取特定資料欄，以及限制傳回的記錄數目。

```
table= glue_catalog.load_table(f"{database_name}.{table_name}")
scan_df = table.scan(
    row_filter=(
        f"city = 'Amsterdam'"
    ),
    selected_fields=("city", "lat"),
    limit=100,
).to_pandas()

print(scan_df)
```

## 刪除資料
<a name="pyiceberg-delete-data"></a>

PyIceberg `delete`函數可讓您使用 從資料表中移除記錄`delete_filter`：

```
table = glue_catalog.load_table(f"{database_name}.{table_name}")
table.delete(delete_filter="city == 'Paris'")
```

## 存取中繼資料
<a name="pyiceberg-metadata"></a>

PyIceberg 提供多種函數來存取資料表中繼資料。以下是您可以檢視資料表快照相關資訊的方式：

```
#List of snapshots
table.snapshots()

#Current snapshot
table.current_snapshot()

#Take a previous snapshot
second_last_snapshot_id=table.snapshots()[-2].snapshot_id
print(f"Second last SnapshotID: {second_last_snapshot_id}")
```

如需可用中繼資料的詳細清單，請參閱 PyIceberg 文件的[中繼資料](https://py.iceberg.apache.org/reference/pyiceberg/table/metadata/)程式碼參考區段。

## 使用時間歷程
<a name="pyiceberg-time-travel"></a>

您可以使用資料表快照進行時間歷程，以存取資料表的先前狀態。以下是在上次操作之前檢視資料表狀態的方法：

```
second_last_snapshot_id=table.snapshots()[-2].snapshot_id

time_travel_df = table.scan(
    limit=100,
    snapshot_id=second_last_snapshot_id
).to_pandas()

print(time_travel_df)
```

如需可用函數的完整清單，請參閱 PyIceberg[Python API](https://py.iceberg.apache.org/api/) 文件。