

# AWS Glue での ETL 出力のパーティションの管理
<a name="aws-glue-programming-etl-partitions"></a>

パーティション分割は、データセットを整理して効率的にクエリを実行可能にする重要な手法です。1 つまたは複数の列の個別の値に基づいて、データを階層形式のディレクトリ構造に整理します。

たとえば、Amazon Simple Storage Service (Amazon S3) のアプリケーションログを、年、月、日で分類しながら、日付についてパーティション化できます。次に 1 日分のデータに対応するファイルを `s3://my_bucket/logs/year=2018/month=01/day=23/` などのプレフィックス別に配置します。Amazon Athena、Amazon Redshift Spectrum、そして AWS Glue などのシステムでは、基になるデータ全体を Amazon S3 から読み取ることなく、これらのパーティションを使用してパーティション値でデータをフィルタリングできます。

クローラは、ファイルタイプとスキーマを推定するだけでなく、AWS Glue Data Catalog を構成する際に、データセットのパーティション構造も自動的に特定します。これにより出力されるパーティション列に対しては、AWS Glue ETL ジョブやクエリエンジン (Amazon Athena など) からクエリを実行できます。

テーブルのクロールが完了すると、クローラが作成したパーティションを表示できます。AWS Glue コンソールの左のナビゲーションペインで、[**Tables**] (テーブル) をクリックします。クローラで作成されたテーブルを選択した後、[**View Partitions**] (パーティション) の表示をクリックします。

Apache Hive 形式のパーティション分割されたパス (`key=val` 形式) の場合、クローラはキー名を使用して自動的に列名を事前設定します。それ以外の場合は、`partition_0`、`partition_1` などのデフォルト名が使用されます。コンソールでデフォルトの名前を変更できます。そのためには、テーブルに移動します。**[インデックス]** タブにインデックスが存在するかどうかを確認します。ある場合は、削除してから続行する必要があります (後で新しい列名を使用して再作成できます)。次に、**[スキーマの編集]** を選択し、そこでパーティション列の名前を変更します。

次に、ETL スクリプトでパーティション列をフィルタリングできます。パーティション情報は Data Catalog に格納されるため、パーティション列を `DynamicFrame` に含めるには `from_catalog` API 呼び出しを使用します。例えば、`create_dynamic_frame.from_options` ではなく `create_dynamic_frame.from_catalog` を使用します。

パーティション化は、データスキャンを減らす最適化手法です。この手法が適切であることを特定するプロセスの詳細については、「AWS 規範的ガイダンス」の「Best practices for performance tuning AWS Glue for Apache Spark jobs」ガイドにある「[Reduce the amount of data scan](https://docs.aws.amazon.com/prescriptive-guidance/latest/tuning-aws-glue-for-apache-spark/reduce-data-scan.html)」を参照してください。

## プッシュダウン述語を使用した事前フィルタ処理
<a name="aws-glue-programming-etl-partitions-pushdowns"></a>

多くの場合、プッシュダウン述語を使用してパーティションをフィルタリングできます。データセットのすべてのファイルをリストアップして読み取る必要はありません。データセット全体を読み取って DynamicFrame でフィルタリングする代わりに、Data Catalog 内でパーティションのメタデータに直接フィルターを適用できます。次に、実際に必要なものだけをリストアップして DynamicFrame 内に読み取ることができます。

たとえば、Python では以下のように記述できます。

```
glue_context.create_dynamic_frame.from_catalog(
    database = "my_S3_data_set",
    table_name = "catalog_data_table",
    push_down_predicate = my_partition_predicate)
```

これによって作成される DynamicFrame では、Data Catalog のパーティションのうち、述語式を満たすものだけがロードされます。ロードするデータのサブセットを絞り込む度合いに応じて、処理時間を大幅に短縮できる場合があります。

述語式として、Spark SQL でサポートされている任意のブール式を使用できます。Spark SQL クエリで `WHERE` 句に指定できる条件は、すべて正常に動作します。例えば述語式 `pushDownPredicate = "(year=='2017' and month=='04')"` では、Data Catalog 内で `year` が「2017」に等しく、また `month` が「04」に等しいパーティションのみロードされます。詳細については、[Apache Spark SQL のドキュメント](https://spark.apache.org/docs/2.1.1/sql-programming-guide.html)を参照してください。特に [Scala SQL 関数リファレンス](https://spark.apache.org/docs/2.1.1/api/scala/index.html#org.apache.spark.sql.functions$)が参考になります。

## カタログのパーティション述語を使用したサーバー側のフィルタリング
<a name="aws-glue-programming-etl-partitions-cat-predicates"></a>

`push_down_predicate` オプションは、カタログからすべてのパーティションを一覧表示した後、Amazon S3 にあるそれらのパーティションからファイルをリストする前に適用されます。テーブルに多数のパーティションがある場合、カタログパーティションのリストに、時間的なオーバーヘッドが余計に発生する可能性があります。このオーバーヘッドに対処するには、`catalogPartitionPredicate` オプションを指定して AWS Glue Data Catalog の[パーティションインデックス](https://docs.aws.amazon.com/glue/latest/dg/partition-indexes.html)を使用しながら、サーバー側のパーティションでプルーニングを行います。1 つのテーブルに数百万のパーティションがある場合、これにより、パーティションのフィルタリングを大幅に高速化できます。カタログのパーティションインデックスではまだサポートされていない述語構文が、`catalogPartitionPredicate` で必要となる場合には、`additional_options` の中で `push_down_predicate` と `catalogPartitionPredicate` の両方を使用することもできます

Python:

```
dynamic_frame = glueContext.create_dynamic_frame.from_catalog(
    database=dbname, 
    table_name=tablename,
    transformation_ctx="datasource0",
    push_down_predicate="day>=10 and customer_id like '10%'",
    additional_options={"catalogPartitionPredicate":"year='2021' and month='06'"}
)
```

Scala:

```
val dynamicFrame = glueContext.getCatalogSource(
    database = dbname,
    tableName = tablename, 
    transformationContext = "datasource0",
    pushDownPredicate="day>=10 and customer_id like '10%'",
    additionalOptions = JsonOptions("""{
        "catalogPartitionPredicate": "year='2021' and month='06'"}""")
    ).getDynamicFrame()
```

**注記**  
`push_down_predicate` と `catalogPartitionPredicate` では、使用される構文が異なります。前者では Spark SQL の標準構文を使用し、後者では JSQL パーサーを使用します。

## パーティションの書き込み
<a name="aws-glue-programming-etl-partitions-writing"></a>

デフォルトでは、DynamicFrame は書き込むときにパーティション分割されません。すべての出力ファイルは、指定した出力パスの最上位レベルに書き込まれます。最近まで、DynamicFrame をパーティションに書き込む唯一の方法は、書き込む前に Spark SQL DataFrame に変換することでした。

ただし、DynamicFrames ではキーのシーケンスを使用したネイティブのパーティション分割がサポートされるようになりました。この場合、シンクの作成時に `partitionKeys` オプションを使用します。例えば、次の Python コードではデータセットを Parquet 形式で Amazon S3 のディレクトリに書き込みます。これらのディレクトリでは、型フィールドごとにパーティション分割されています。これらのパーティションに対しては、他のシステム (Amazon Athena など) を使用しての処理が行えます。

```
glue_context.write_dynamic_frame.from_options(
    frame = projectedEvents,
    connection_type = "s3",    
    connection_options = {"path": "$outpath", "partitionKeys": ["type"]},
    format = "parquet")
```