

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 將資料轉換為 Apache Parquet 的三種 AWS Glue ETL 任務類型
<a name="three-aws-glue-etl-job-types-for-converting-data-to-apache-parquet"></a>

*Adnan Alvee、Nitth Govindasivan 和 Karthikeyan Ramachandran，Amazon Web Services*

## 總結
<a name="three-aws-glue-etl-job-types-for-converting-data-to-apache-parquet-summary"></a>

在 Amazon Web Services (AWS) 雲端上，AWS Glue 是全受管擷取、轉換和載入 (ETL) 服務。AWS Glue 可讓您以符合成本效益的方式分類資料、清理資料、擴充資料，並在各種資料存放區和資料串流之間可靠地移動資料。

此模式在 AWS Glue 中提供不同的任務類型，並使用三種不同的指令碼來示範撰寫 ETL 任務。

您可以使用 AWS Glue 在 Python shell 環境中寫入 ETL 任務。您也可以在受管 Apache Spark 環境中使用 Python (PySpark) 或 Scala 建立批次和串流 ETL 任務。為了協助您開始撰寫 ETL 任務，此模式著重於使用 Python shell、PySpark 和 Scala 的批次 ETL 任務。Python shell 任務適用於需要較少運算能力的工作負載。受管 Apache Spark 環境適用於需要高運算能力的工作負載。

Apache Parquet 旨在支援高效的壓縮和編碼機制。它可以加速您的分析工作負載，因為它以單欄式方式存放資料。將資料轉換為 Parquet 可以節省長時間執行時的儲存空間、成本和時間。若要進一步了解 Parquet，請參閱部落格文章 [Apache Parquet：如何使用開放原始碼單欄式資料格式成為英雄](https://blog.openbridge.com/how-to-be-a-hero-with-powerful-parquet-google-and-amazon-f2ae0f35ee04)。

## 先決條件和限制
<a name="three-aws-glue-etl-job-types-for-converting-data-to-apache-parquet-prereqs"></a>

**先決條件**
+ AWS Identity and Access Management (IAM) 角色 （如果您沒有角色，請參閱[其他資訊](#three-aws-glue-etl-job-types-for-converting-data-to-apache-parquet-additional)一節。)

## Architecture
<a name="three-aws-glue-etl-job-types-for-converting-data-to-apache-parquet-architecture"></a>

**目標技術堆疊**
+ AWS Glue
+ Amazon Simple Storage Service (Amazon S3)
+ Apache Parquet

**自動化和擴展**
+ [AWS Glue 工作流程](https://docs.aws.amazon.com/glue/latest/dg/workflows_overview.html)支援 ETL 管道的完整自動化。
+ 您可以變更資料處理單位 (DPUs或工作者類型的數量，以水平和垂直擴展。

## 工具
<a name="three-aws-glue-etl-job-types-for-converting-data-to-apache-parquet-tools"></a>

**AWS 服務**
+ [Amazon Simple Storage Service (Amazon S3)](https://docs.aws.amazon.com/AmazonS3/latest/userguide/Welcome.html) 是一種雲端型物件儲存服務，可協助您儲存、保護和擷取任何數量的資料。
+ [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) 是全受管 ETL 服務，可在各種資料存放區和資料串流之間分類、清理、擴充和移動資料。

**其他工具**
+ [Apache Parquet](https://parquet.apache.org/) 是一種開放原始碼資料欄導向的資料檔案格式，專為儲存和擷取而設計。

**組態**

使用下列設定來設定 AWS Glue ETL 的運算能力。若要降低成本，請在執行此模式中提供的工作負載時使用最小設定。 
+ **Python shell** – 您可以使用 1 個 DPU 來利用 16 GB 的記憶體，或使用 0.0625 個 DPU 來利用 1 GB 的記憶體。此模式使用 0.0625 DPU，這是 AWS Glue 主控台中的預設值。
+ **Python 或 Scala for Spark** – 如果您在主控台中選擇 Spark 相關任務類型，AWS Glue 預設會使用 10 個工作者和 G.1X 工作者類型。此模式使用兩個工作者，這是允許的最小數量，具有標準工作者類型，足夠且經濟實惠。

下表顯示 Apache Spark 環境的不同 AWS Glue 工作者類型。由於 Python shell 任務不使用 Apache Spark 環境來執行 Python，因此不會包含在資料表中。


| 
| 
|  | 標準 | G.1X | G.2X | 
| --- |--- |--- |--- |
| vCPU | 4 | 4 | 8 | 
| 記憶體 | 16 GB | 16 GB | 32 GB | 
| 磁碟空間 | 50 GB | 64 GB | 128 GB | 
| 每個工作者的執行器 | 2 | 1  | 1 | 

**Code**

如需此模式中使用的程式碼，包括 IAM 角色和參數組態，請參閱[其他資訊](#three-aws-glue-etl-job-types-for-converting-data-to-apache-parquet-additional)一節。

## 史詩
<a name="three-aws-glue-etl-job-types-for-converting-data-to-apache-parquet-epics"></a>

### 上傳資料
<a name="upload-the-data"></a>


| 任務 | Description | 所需的技能 | 
| --- | --- | --- | 
| 將資料上傳至新的或現有的 S3 儲存貯體。 | 在帳戶中建立或使用現有的 S3 儲存貯體。從[附件](#attachments-8c926709-8fa4-417f-9aaf-bcc8113d018f)區段上傳 sample\$1data.csv 檔案，並記下 S3 儲存貯體和字首位置。 | 一般 AWS | 

### 建立並執行 AWS Glue 任務
<a name="create-and-run-the-aws-glue-job"></a>


| 任務 | Description | 所需的技能 | 
| --- | --- | --- | 
| 建立 AWS Glue 任務。 | 在 AWS Glue 主控台的 ETL 區段下，新增 AWS Glue 任務。選取適當的任務類型、AWS Glue 版本，以及對應的 DPU/工作者類型和工作者數量。如需詳細資訊，請參閱*組態*一節。 | 開發人員、雲端或資料 | 
| 變更輸入和輸出位置。 | 複製對應至 AWS Glue 任務的程式碼，並變更您在**上傳資料**圖示中記下的輸入和輸出位置。 | 開發人員、雲端或資料 | 
| 設定參數。 | 您可以使用[其他資訊](#three-aws-glue-etl-job-types-for-converting-data-to-apache-parquet-additional)區段中提供的程式碼片段來設定 ETL 任務的參數。AWS Glue 在內部使用四個引數名稱：[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/prescriptive-guidance/latest/patterns/three-aws-glue-etl-job-types-for-converting-data-to-apache-parquet.html)`--JOB_NAME` 參數必須在 AWS Glue 主控台上明確輸入。選擇**任務**、**編輯任務**、**安全組態、指令碼程式庫和任務參數 （選用）**。輸入 `--JOB_NAME`做為索引鍵，並提供值。您也可以使用 AWS Command Line Interface (AWS CLI) 或 AWS Glue API 來設定此參數。此`--JOB_NAME`參數由 Spark 使用，在 Python shell 環境任務中不需要。您必須在每個參數名稱`--`之前新增 ；否則，程式碼將無法運作。例如，對於程式碼片段，位置參數必須由 `--input_loc`和 調用`--output_loc`。 | 開發人員、雲端或資料 | 
| 執行 ETL 任務。 | 執行您的任務並檢查輸出。請注意，從原始檔案減少了多少空間。 | 開發人員、雲端或資料 | 

## 相關資源
<a name="three-aws-glue-etl-job-types-for-converting-data-to-apache-parquet-resources"></a>

**參考**
+ [Apache Spark](https://spark.apache.org/)
+ [AWS Glue：運作方式](https://docs.aws.amazon.com/glue/latest/dg/how-it-works.html)
+ [AWS Glue 定價](https://aws.amazon.com/glue/pricing/)

**教學課程和影片**
+ [什麼是 AWS Glue？](https://www.youtube.com/watch?v=qgWMfNSN9f4)

## 其他資訊
<a name="three-aws-glue-etl-job-types-for-converting-data-to-apache-parquet-additional"></a>

**IAM 角色**

建立 AWS Glue 任務時，您可以使用具有下列程式碼片段中所示許可的現有 IAM 角色或新角色。

若要建立新的角色，請使用下列 YAML 程式碼。

```
# (c) 2022 Amazon Web Services, Inc. or its affiliates. All Rights Reserved. This AWS Content is provided subject to the terms of the AWS Customer
# Agreement available at https://aws.amazon.com/agreement/ or other written agreement between Customer and Amazon Web Services, Inc.

AWSTemplateFormatVersion: "2010-09-09"

Description: This template will setup IAM role for AWS Glue service.

Resources:
  rGlueRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "glue.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
      Policies:
        - PolicyName: !Sub "${AWS::StackName}-s3-limited-read-write-inline-policy"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - "s3:PutObject"
                  - "s3:GetObject"
                Resource: "arn:aws:s3:::*/*"
      Tags:
        - Key  : "Name"
          Value : !Sub "${AWS::StackName}"

Outputs:
  oGlueRoleName:
    Description: AWS Glue IAM role
    Value:
      Ref: rGlueRole
    Export:
      Name: !Join [ ":", [ !Ref "AWS::StackName", rGlueRole ] ]
```

**AWS Glue Python Shell**

Python 程式碼使用 Pandas 和 PyArrow 程式庫將資料轉換為 Parquet。Pandas 程式庫已可供使用。執行模式時會下載 PyArrow 程式庫，因為它是一次性執行。您可以使用 wheel 檔案將 PyArrow 轉換為程式庫，並以程式庫套件的形式提供檔案。如需封裝 wheel 檔案的詳細資訊，請參閱[提供您自己的 Python 程式庫](https://docs.aws.amazon.com/glue/latest/dg/add-job-python.html)。

*AWS Glue Python shell 參數*

```
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, ["input_loc", "output_loc"])
```

*AWS Glue Python shell 程式碼*

```
from io import BytesIO
import pandas as pd
import boto3
import os
import io
import site
from importlib import reload
from setuptools.command import easy_install
install_path = os.environ['GLUE_INSTALLATION']
easy_install.main( ["--install-dir", install_path, "pyarrow"] )
reload(site)
import pyarrow


input_loc = "s3://bucket-name/prefix/sample_data.csv"
output_loc = "s3://bucket-name/prefix/"


input_bucket = input_loc.split('/', 1)[0]
object_key = input_loc.split('/', 1)[1]


output_loc_bucket = output_loc.split('/', 1)[0]
output_loc_prefix = output_loc.split('/', 1)[1] 


s3 = boto3.client('s3')
obj = s3.get_object(Bucket=input_bucket, Key=object_key)
df = pd.read_csv(io.BytesIO(obj['Body'].read()))


parquet_buffer = BytesIO()
s3_resource = boto3.resource('s3')
df.to_parquet(parquet_buffer, index=False) 
s3_resource.Object(output_loc_bucket, output_loc_prefix +  'data' + '.parquet').put(Body=parquet_buffer.getvalue())
```

**使用 Python 的 AWS Glue Spark 任務**

若要搭配 Python 使用 AWS Glue Spark 任務類型，請選擇 **Spark** 做為任務類型。選擇 **Spark 3.1、Python 3 搭配改善的任務啟動時間 (Glue 3.0 版） **做為 AWS Glue 版本。

*AWS Glue Python 參數*

```
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, ["JOB_NAME", "input_loc", "output_loc"])
```

*使用 Python 程式碼的 AWS Glue Spark 任務*

```
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions
from awsglue.job import Job


sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

input_loc = "s3://bucket-name/prefix/sample_data.csv"
output_loc = "s3://bucket-name/prefix/"

inputDyf = glueContext.create_dynamic_frame_from_options(\
    connection_type = "s3", \
    connection_options = { 
        "paths": [input_loc]}, \
    format = "csv",
    format_options={
        "withHeader": True,
        "separator": ","
    })


outputDF = glueContext.write_dynamic_frame.from_options(\
    frame = inputDyf, \
    connection_type = "s3", \
    connection_options = {"path": output_loc \
        }, format = "parquet")
```

對於大量壓縮大型檔案 （例如 1，000 個檔案，每個檔案約為 3 MB)，請使用 `compressionType` 參數與 `recurse` 參數來讀取字首內可用的所有檔案，如下列程式碼所示。

```
input_loc = "bucket-name/prefix/"
output_loc = "bucket-name/prefix/"

inputDyf = glueContext.create_dynamic_frame_from_options(
                    connection_type = "s3", 
                    connection_options = {"paths": [input_loc], 
                                            "compressionType":"gzip","recurse" :"True",
                                            },
                    format = "csv",
                    format_options={"withHeader": True,"separator": ","}
                    )
```

對於大量壓縮的小型檔案 （例如，各約 133 KB 的 1，000 個檔案），請使用 `groupFiles` 參數，以及 `compressionType`和 `recurse` 參數。`groupFiles` 參數會將小型檔案分組為多個大型檔案，而 `groupSize` 參數會以位元組為單位 （例如 1 MB) 將分組控制為指定的大小。下列程式碼片段提供在程式碼中使用這些參數的範例。

```
input_loc = "bucket-name/prefix/"
output_loc = "bucket-name/prefix/"

inputDyf = glueContext.create_dynamic_frame_from_options(
                    connection_type = "s3", 
                    connection_options = {"paths": [input_loc], 
                                            "compressionType":"gzip","recurse" :"True",
                                             "groupFiles" :"inPartition",  "groupSize" :"1048576",
                                            },
                    format = "csv",
                    format_options={"withHeader": True,"separator": ","}
                    )
```

如果工作者節點沒有任何變更，這些設定可讓 AWS Glue 任務讀取多個檔案 （無論有無壓縮），並以 Parquet 格式寫入目標。

**使用 Scala 的 AWS Glue Spark 任務**

若要搭配 Scala 使用 AWS Glue Spark 任務類型，請選擇 **Spark** 做為任務類型，選擇**語言**做為 **Scala**。選擇 **Spark 3.1、Scala 2 搭配改善的任務啟動時間 (Glue 3.0 版） **做為 AWS Glue 版本。為了節省儲存空間，下列具有 Scala 範例的 AWS Glue 也會使用 `applyMapping`功能來轉換資料類型。

*AWS Glue Scala 參數*

```
import com.amazonaws.services.glue.util.GlueArgParser val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME", "inputLoc", "outputLoc").toArray)
```

*具有 Scala 程式碼的 AWS Glue Spark 任務*

```
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.DynamicFrame
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._


object GlueScalaApp {
  def main(sysArgs: Array[String]) {
    
    @transient val spark: SparkContext = SparkContext.getOrCreate()
    val glueContext: GlueContext = new GlueContext(spark)

    val inputLoc = "s3://bucket-name/prefix/sample_data.csv"
    val outputLoc = "s3://bucket-name/prefix/"

    val readCSV = glueContext.getSource("csv", JsonOptions(Map("paths" -> Set(inputLoc)))).getDynamicFrame()

    val applyMapping = readCSV.applyMapping(mappings = Seq(("_c0", "string", "date", "string"), ("_c1", "string", "sales", "long"),
    ("_c2", "string", "profit", "double")), caseSensitive = false)

    val formatPartition = applyMapping.toDF().coalesce(1)

    val dynamicFrame = DynamicFrame(formatPartition, glueContext)

    val dataSink = glueContext.getSinkWithFormat(
        connectionType = "s3", 
        options = JsonOptions(Map("path" -> outputLoc )),
        transformationContext = "dataSink", format = "parquet").writeDynamicFrame(dynamicFrame)
  }
}
```

## 附件
<a name="attachments-8c926709-8fa4-417f-9aaf-bcc8113d018f"></a>

若要存取與本文件相關聯的其他內容，請解壓縮下列檔案： [attachment.zip](samples/p-attach/8c926709-8fa4-417f-9aaf-bcc8113d018f/attachments/attachment.zip)