

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# PyIceberg 테이블 작업
<a name="iceberg-pyiceberg"></a>

이 섹션에서는 [PyIceberg](https://py.iceberg.apache.org/)를 사용하여 Iceberg 테이블과 상호 작용하는 방법을 설명합니다. 제공된 예제는 [Amazon Linux 2023 EC2](https://aws.amazon.com/linux/amazon-linux-2023/) 인스턴스, [AWS Lambda](https://aws.amazon.com/lambda/) 함수 또는 자격 [AWS 증명](https://docs.aws.amazon.com/IAM/latest/UserGuide/security-creds.html)이 올바르게 구성된 [Python](https://www.python.org/) 환경에서 실행할 수 있는 표준 문안 코드입니다.

## 사전 조건
<a name="pyiceberg-prerequisites"></a>

**참고**  
이 예제에서는 [PyIceberg 1.9.1](https://github.com/apache/iceberg/releases/tag/apache-iceberg-1.9.1)을 사용합니다.

PyIceberg를 사용하려면 PyIceberg 및가 AWS SDK for Python (Boto3) 설치되어 있어야 합니다. 다음은 PyIceberg 및와 함께 작동하도록 Python 가상 환경을 설정하는 방법의 예입니다. AWS Glue Data Catalog

1. [pip python 패키지 설치](https://pypi.org/project/pip/) 관리자를 사용하여 [PyIceberg](https://pypi.org/project/pyiceberg/)를 다운로드합니다. 또한와 상호 작용하려면 [Boto3](https://boto3.amazonaws.com/v1/documentation/api/latest/index.html)가 필요합니다 AWS 서비스. 다음 명령을 사용하여 테스트하도록 로컬 Python 가상 환경을 구성할 수 있습니다.

   ```
   python3 -m venv my_env
   cd my_env/bin/
   source activate
   pip install "pyiceberg[pyarrow,pandas,glue]"
   pip install boto3
   ```

1. 를 실행`python`하여 Python 셸을 열고 명령을 테스트합니다.

## 데이터 카탈로그에 연결
<a name="pyiceberg-data-catalog"></a>

에서 Iceberg 테이블 작업을 시작하려면 AWS Glue먼저에 연결해야 합니다 AWS Glue Data Catalog. 

`load_catalog`함수는 모든 Iceberg 작업에 대한 기본 인터페이스 역할을 하는 [카탈로그](https://py.iceberg.apache.org/reference/pyiceberg/catalog/) 객체를 생성하여 데이터 카탈로그에 대한 연결을 초기화합니다.

```
from pyiceberg.catalog import load_catalog
region = "us-east-1"

glue_catalog = load_catalog(
    'default',
    **{
        'client.region': region
    },
    type='glue'
)
```

## 데이터베이스 나열 및 생성
<a name="pyiceberg-list-database"></a>

기존 데이터베이스를 나열하려면 `list_namespaces` 함수를 사용합니다.

```
databases = glue_catalog.list_namespaces()
print(databases)
```

새 데이터베이스를 생성하려면 `create_namespace` 함수를 사용합니다.

```
database_name="mydb"
s3_db_path=f"s3://amzn-s3-demo-bucket/{database_name}"

glue_catalog.create_namespace(database_name, properties={"location": s3_db_path})
```

## Iceberg 테이블 생성 및 작성
<a name="pyiceberg-create-table"></a>

### 분할되지 않은 테이블
<a name="unpartitioned"></a>

다음은 `create_table` 함수를 사용하여 파티셔닝되지 않은 Iceberg 테이블을 생성하는 예제입니다.

```
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, DoubleType

database_name="mydb"
table_name="pyiceberg_table"
s3_table_path=f"s3://amzn-s3-demo-bucket/{database_name}/{table_name}"

schema = Schema(
    NestedField(1, "city", StringType(), required=False),
    NestedField(2, "lat", DoubleType(), required=False),
    NestedField(3, "long", DoubleType(), required=False),
)

glue_catalog.create_table(f"{database_name}.{table_name}", schema=schema, location=s3_table_path)
```

`list_tables` 함수를 사용하여 데이터베이스 내의 테이블 목록을 확인할 수 있습니다.

```
tables = glue_catalog.list_tables(namespace=database_name)
print(tables)
```

함수와 PyArrow를`append` 사용하여 Iceberg 테이블 내에 데이터를 삽입할 수 있습니다.

```
import pyarrow as pa
df = pa.Table.from_pylist(
    [
        {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},
        {"city": "San Francisco", "lat": 37.773972, "long": -122.431297},
        {"city": "Drachten", "lat": 53.11254, "long": 6.0989},
        {"city": "Paris", "lat": 48.864716, "long": 2.349014},
    ],
)

table = glue_catalog.load_table(f"{database_name}.{table_name}")
table.append(df)
```

### 분할된 테이블
<a name="partitioned"></a>

다음은 `create_table` 함수 및를 사용하여 [숨겨진](https://iceberg.apache.org/docs/1.4.0/partitioning/#icebergs-hidden-partitioning) [파티셔닝이 있는 파티셔닝된](https://iceberg.apache.org/docs/1.4.0/partitioning/) Iceberg 테이블을 생성하는 예제입니다`PartitionSpec`.

```
from pyiceberg.schema import Schema
from pyiceberg.types import (
    NestedField,
    StringType,
    FloatType,
    DoubleType,
    TimestampType,
)

# Define the schema
schema = Schema(
    NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=True),
    NestedField(field_id=2, name="drone_id", field_type=StringType(), required=True),
    NestedField(field_id=3, name="lat", field_type=DoubleType(), required=False),
    NestedField(field_id=4, name="lon", field_type=DoubleType(), required=False),
    NestedField(field_id=5, name="height", field_type=FloatType(), required=False),
)

from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform

partition_spec = PartitionSpec(
    PartitionField(
        source_id=1,  # Refers to "datetime"
        field_id=1000,
        transform=DayTransform(),
        name="datetime_day"
    )
)

database_name="mydb"
partitioned_table_name="pyiceberg_table_partitioned"
s3_table_path=f"s3://amzn-s3-demo-bucket/{database_name}/{partitioned_table_name}"

glue_catalog.create_table(
    identifier=f"{database_name}.{partitioned_table_name}",
    schema=schema,
    location=s3_table_path,
    partition_spec=partition_spec
)
```

파티셔닝되지 않은 테이블과 동일한 방식으로 파티셔닝된 테이블에 데이터를 삽입할 수 있습니다. 파티셔닝은 자동으로 처리됩니다.

```
from datetime import datetime
arrow_schema = pa.schema([
    pa.field("datetime", pa.timestamp("us"), nullable=False),
    pa.field("drone_id", pa.string(), nullable=False),
    pa.field("lat", pa.float64()),
    pa.field("lon", pa.float64()),
    pa.field("height", pa.float32()),  
])

data = [
    {
        "datetime": datetime(2024, 6, 1, 12, 0, 0),
        "drone_id": "drone_001",
        "lat": 52.371807,
        "lon": 4.896029,
        "height": 120.5,
    },
    {
        "datetime": datetime(2024, 6, 1, 12, 5, 0),
        "drone_id": "drone_002",
        "lat": 37.773972,
        "lon": -122.431297,
        "height": 150.0,
    },
    {
        "datetime": datetime(2024, 6, 2, 9, 0, 0),
        "drone_id": "drone_001",
        "lat": 53.11254,
        "lon": 6.0989,
        "height": 110.2,
    },
    {
        "datetime": datetime(2024, 6, 2, 9, 30, 0),
        "drone_id": "drone_003",
        "lat": 48.864716,
        "lon": 2.349014,
        "height": 145.7,
    },
]

df = pa.Table.from_pylist(data, schema=arrow_schema)

table = glue_catalog.load_table(f"{database_name}.{partitioned_table_name}")
table.append(df)
```

## 데이터 읽기
<a name="pyiceberg-read-data"></a>

PyIceberg `scan` 함수를 사용하여 Iceberg 테이블에서 데이터를 읽을 수 있습니다. 행을 필터링하고, 특정 열을 선택하고, 반환된 레코드 수를 제한할 수 있습니다.

```
table= glue_catalog.load_table(f"{database_name}.{table_name}")
scan_df = table.scan(
    row_filter=(
        f"city = 'Amsterdam'"
    ),
    selected_fields=("city", "lat"),
    limit=100,
).to_pandas()

print(scan_df)
```

## 데이터 삭제
<a name="pyiceberg-delete-data"></a>

PyIceberg `delete` 함수를 사용하면 `delete_filter`를 사용하여 테이블에서 레코드를 제거할 수 있습니다.

```
table = glue_catalog.load_table(f"{database_name}.{table_name}")
table.delete(delete_filter="city == 'Paris'")
```

## 메타데이터 액세스
<a name="pyiceberg-metadata"></a>

PyIceberg는 테이블 메타데이터에 액세스하는 여러 함수를 제공합니다. 테이블 스냅샷에 대한 정보를 보는 방법은 다음과 같습니다.

```
#List of snapshots
table.snapshots()

#Current snapshot
table.current_snapshot()

#Take a previous snapshot
second_last_snapshot_id=table.snapshots()[-2].snapshot_id
print(f"Second last SnapshotID: {second_last_snapshot_id}")
```

사용 가능한 메타데이터의 자세한 목록은 PyIceberg 설명서의 [메타데이터](https://py.iceberg.apache.org/reference/pyiceberg/table/metadata/) 코드 참조 섹션을 참조하세요.

## 시간 이동 사용
<a name="pyiceberg-time-travel"></a>

테이블 스냅샷을 시간 이동에 사용하여 테이블의 이전 상태에 액세스할 수 있습니다. 다음은 마지막 작업 이전의 테이블 상태를 보는 방법입니다.

```
second_last_snapshot_id=table.snapshots()[-2].snapshot_id

time_travel_df = table.scan(
    limit=100,
    snapshot_id=second_last_snapshot_id
).to_pandas()

print(time_travel_df)
```

사용 가능한 함수의 전체 목록은 PyIceberg[Python API](https://py.iceberg.apache.org/api/) 설명서를 참조하세요.