

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 三种用于将数据转换为 Apache Parquet 的 AWS Glue ETL 作业类型
<a name="three-aws-glue-etl-job-types-for-converting-data-to-apache-parquet"></a>

*Adnan Alvee、Nith Govindasivan 和 Karthikeyan Ramachandran，Amazon Web Services*

## Summary
<a name="three-aws-glue-etl-job-types-for-converting-data-to-apache-parquet-summary"></a>

在 Amazon Web Services（AWS）Cloud 上，AWS Glue 是一项完全托管的提取、转换、加载（ETL）服务。AWS Glue 使您能够经济高效地对数据进行分类、清理和扩充，并在各种数据存储和数据流之间可靠地移动数据。

此模式在 AWS Glue 中提供了不同的作业类型，并使用三种不同的脚本来演示 ETL 作业的创作。

您可以使用 AWS Glue 在 Python Shell 环境中编写 ETL 作业。您还可以在托管 Apache Spark 环境中使用 Python (PySpark) 或 Scala 创建批处理和流式处理 ETL 作业。为了开始创作 ETL 作业，此模式侧重于使用 Python shell、和 Scala 的批处理 ETL 作业。 PySparkPython Shell 作业适用于需要较低计算能力的工作负载。托管 Apache Spark 环境适用于需要高计算能力的工作负载。

Apache Parquet 旨在支持高效的压缩和编码方案。它可以加快分析工作负载的速度，因为它以列式方式存储数据。从长远来看，将数据转换为 Parquet 可以为您节省存储空间、成本和时间。要详细了解 Parquet，请参阅博客文章 [Apache Parquet：如何使用开源列式数据格式成为大侠](https://blog.openbridge.com/how-to-be-a-hero-with-powerful-parquet-google-and-amazon-f2ae0f35ee04)。

## 先决条件和限制
<a name="three-aws-glue-etl-job-types-for-converting-data-to-apache-parquet-prereqs"></a>

**先决条件**
+ AWS Identity and Access Management（IAM）角色（如果您没有角色，请参阅[其他信息](#three-aws-glue-etl-job-types-for-converting-data-to-apache-parquet-additional)部分）。

## 架构
<a name="three-aws-glue-etl-job-types-for-converting-data-to-apache-parquet-architecture"></a>

**目标技术堆栈 **
+ AWS Glue
+ Amazon Simple Storage Service（Amazon S3）
+ Apache Parquet

**自动化和扩缩**
+ [AWS Glue 工作流程](https://docs.aws.amazon.com/glue/latest/dg/workflows_overview.html)支持 ETL 管线的完全自动化。
+ 您可以将数据处理单元的数量 (DPUs) 或工作器类型更改为水平和垂直扩展。

## 工具
<a name="three-aws-glue-etl-job-types-for-converting-data-to-apache-parquet-tools"></a>

**Amazon Web Services**
+ [Amazon Simple Storage Service（Amazon S3）](https://docs.aws.amazon.com/AmazonS3/latest/userguide/Welcome.html)是一项基于云的对象存储服务，可帮助您存储、保护和检索任意数量的数据。
+ [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) 是一项完全托管的 ETL 服务，用于在各种数据存储和数据流之间对数据进行分类、清理、扩充和移动。

**其他工具**
+ [Apache Parquet](https://parquet.apache.org/) 是一种专为存储和检索而设计的开源列式数据文件格式。

**配置**

使用以下设置来配置 AWS Glue ETL 的计算能力。要降低成本，请在运行此模式中提供的工作负载时使用最低设置。 
+ **Python Shell** – 您可以使用 1 个 DPU 来利用 16 GB 的内存，也可以使用 0.0625 DPU 来使用 1 GB 的内存。此模式使用 0.0625 DPU，这是 AWS Glue 控制台中的默认设置。
+ **Python 或 Scala for Spark** – 如果您在控制台中选择与 Spark 相关的作业类型，AWS Glue 默认使用 10 个 Worker 和 G.1X Worker 类型。这种模式使用两个 Worker（这是允许的最小数量），标准 Worker 类型足够且具有成本效益。

下表显示了 Apache Spark 环境的不同 AWS Glue Worker 类型。由于 Python Shell 作业不使用 Apache Spark 环境来运行 Python，因此它未包含在表中。


| 
| 
|  | 标准 | G.1X | G.2X | 
| --- |--- |--- |--- |
| vCPU | 4 | 4 | 8 | 
| 内存 | 16 GB | 16 GB | 32 GB | 
| 磁盘空间 | 50 GB | 64 GB | 128 GB | 
| 每个工作线程的执行程序 | 2 | 1  | 1 | 

**代码**

有关此模式中使用的代码，包括 IAM 角色和参数配置，请参阅[其他信息](#three-aws-glue-etl-job-types-for-converting-data-to-apache-parquet-additional)部分。

## 操作说明
<a name="three-aws-glue-etl-job-types-for-converting-data-to-apache-parquet-epics"></a>

### 上传数据
<a name="upload-the-data"></a>


| Task | 说明 | 所需技能 | 
| --- | --- | --- | 
| 将数据上传到新的或现有 S3 存储桶。 | 在账户中创建 S3 存储桶或使用现有 S3 存储桶。从[附件](#attachments-8c926709-8fa4-417f-9aaf-bcc8113d018f)部分上传 sample\$1data.csv 文件，并记下 S3 存储桶和前缀位置。 | 常规 AWS | 

### 创建并运行 AWS Glue 作业
<a name="create-and-run-the-aws-glue-job"></a>


| Task | 说明 | 所需技能 | 
| --- | --- | --- | 
| 创建 AWS Glue 作业。 | 在 AWS Glue 控制台的 ETL 部分下方，添加一个 AWS Glue 作业。选择相应的工作类型、AWS Glue 版本以及相应的工作人员 DPU/Worker 类型和数量。有关详细信息，请参阅*配置*部分。 | 开发人员、云或数据 | 
| 更改输入和输出位置。 | 复制与 AWS Glue 作业对应的代码，然后更改您在**上传数据**操作说明中记下的输入和输出位置。 | 开发人员、云或数据 | 
| 配置参数。 | 您可以使用[其他信息](#three-aws-glue-etl-job-types-for-converting-data-to-apache-parquet-additional)部分中提供的片段为 ETL 作业设置参数。AWS Glue 在内部使用四个参数名称：[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/prescriptive-guidance/latest/patterns/three-aws-glue-etl-job-types-for-converting-data-to-apache-parquet.html)必须在 AWS Glue 控制台上明确输入该 `--JOB_NAME`参数。选择**作业**、**编辑作业**、**安全配置、脚本库和作业参数（可选）**。输入 `--JOB_NAME` 作为密钥并提供一个值。您也可以使用 AWS 命令行界面（AWS CLI）或 AWS Glue API 来设置此参数。Spark 使用该 `--JOB_NAME` 参数，在 Python Shell 环境作业中不需要该参数。必须在每个参数名称之前添加 `--`；否则，代码将无法运行。例如，对于代码片段，位置参数必须由 `--input_loc` 和 `--output_loc` 调用。 | 开发人员、云或数据 | 
| 运行 ETL 作业。 | 运行作业并检查输出。注意与原始文件相比减少了多少空间。 | 开发人员、云服务或数据 | 

## 相关资源
<a name="three-aws-glue-etl-job-types-for-converting-data-to-apache-parquet-resources"></a>

**参考**
+ [Apache Spark](https://spark.apache.org/)
+ [AWS Glue：工作原理](https://docs.aws.amazon.com/glue/latest/dg/how-it-works.html)
+ [AWS Glue 定价](https://aws.amazon.com/glue/pricing/)

**教程和视频**
+ [什么是 AWS Glue？](https://www.youtube.com/watch?v=qgWMfNSN9f4)

## 附加信息
<a name="three-aws-glue-etl-job-types-for-converting-data-to-apache-parquet-additional"></a>

**IAM 角色**

创建 AWS Glue 作业时，您可以使用具有以下代码片段所示权限的现有 IAM 角色或新角色。

要创建新角色，请使用以下 YAML 代码。

```
# (c) 2022 Amazon Web Services, Inc. or its affiliates. All Rights Reserved. This AWS Content is provided subject to the terms of the AWS Customer
# Agreement available at https://aws.amazon.com/agreement/ or other written agreement between Customer and Amazon Web Services, Inc.

AWSTemplateFormatVersion: "2010-09-09"

Description: This template will setup IAM role for AWS Glue service.

Resources:
  rGlueRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "glue.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
      Policies:
        - PolicyName: !Sub "${AWS::StackName}-s3-limited-read-write-inline-policy"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - "s3:PutObject"
                  - "s3:GetObject"
                Resource: "arn:aws:s3:::*/*"
      Tags:
        - Key  : "Name"
          Value : !Sub "${AWS::StackName}"

Outputs:
  oGlueRoleName:
    Description: AWS Glue IAM role
    Value:
      Ref: rGlueRole
    Export:
      Name: !Join [ ":", [ !Ref "AWS::StackName", rGlueRole ] ]
```

**AWS Glue (Python Shell)**

Python 代码使用 Pandas 和 PyArrow 库将数据转换为 Parquet。Pandas 库已经可用。该 PyArrow 库是在你运行模式时下载的，因为这是一次性运行。您可以使用 wheel 文件 PyArrow 转换为库并将该文件作为库包提供。有关打包 Wheel 文件的更多信息，请参阅[提供您自己的 Python 库](https://docs.aws.amazon.com/glue/latest/dg/add-job-python.html)。

*AWS Glue Python Shell 参数*

```
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, ["input_loc", "output_loc"])
```

*AWS Glue（Python Shell）代码*

```
from io import BytesIO
import pandas as pd
import boto3
import os
import io
import site
from importlib import reload
from setuptools.command import easy_install
install_path = os.environ['GLUE_INSTALLATION']
easy_install.main( ["--install-dir", install_path, "pyarrow"] )
reload(site)
import pyarrow


input_loc = "s3://bucket-name/prefix/sample_data.csv"
output_loc = "s3://bucket-name/prefix/"


input_bucket = input_loc.split('/', 1)[0]
object_key = input_loc.split('/', 1)[1]


output_loc_bucket = output_loc.split('/', 1)[0]
output_loc_prefix = output_loc.split('/', 1)[1] 


s3 = boto3.client('s3')
obj = s3.get_object(Bucket=input_bucket, Key=object_key)
df = pd.read_csv(io.BytesIO(obj['Body'].read()))


parquet_buffer = BytesIO()
s3_resource = boto3.resource('s3')
df.to_parquet(parquet_buffer, index=False) 
s3_resource.Object(output_loc_bucket, output_loc_prefix +  'data' + '.parquet').put(Body=parquet_buffer.getvalue())
```

**使用 Python 的 AWS Glue Spark 作业**

要在 Python 中使用 AWS Glue Spark 作业类型，请选择 **Spark** 作为作业类型。选择**作业启动时间缩短的 Spark 3.1、Python 3（Glue 版本 3.0）**作为 AWS Glue 版本。

*AWS Glue Python 参数*

```
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, ["JOB_NAME", "input_loc", "output_loc"])
```

*使用 Python 代码的 AWS Glue Spark 作业*

```
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions
from awsglue.job import Job


sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

input_loc = "s3://bucket-name/prefix/sample_data.csv"
output_loc = "s3://bucket-name/prefix/"

inputDyf = glueContext.create_dynamic_frame_from_options(\
    connection_type = "s3", \
    connection_options = { 
        "paths": [input_loc]}, \
    format = "csv",
    format_options={
        "withHeader": True,
        "separator": ","
    })


outputDF = glueContext.write_dynamic_frame.from_options(\
    frame = inputDyf, \
    connection_type = "s3", \
    connection_options = {"path": output_loc \
        }, format = "parquet")
```

对于大量压缩的大文件（例如，1,000 个文件，每个文件约为 3 MB），请使用带有 `recurse` 参数的 `compressionType` 参数读取前缀内的所有可用文件，如以下代码所示。

```
input_loc = "bucket-name/prefix/"
output_loc = "bucket-name/prefix/"

inputDyf = glueContext.create_dynamic_frame_from_options(
                    connection_type = "s3", 
                    connection_options = {"paths": [input_loc], 
                                            "compressionType":"gzip","recurse" :"True",
                                            },
                    format = "csv",
                    format_options={"withHeader": True,"separator": ","}
                    )
```

对于大量压缩的小文件（例如，1,000 个文件，每个文件大小约为 133 KB），请使用 `groupFiles` 参数以及 `compressionType` 和 `recurse` 参数。`groupFiles` 参数将小文件分组为多个大文件，`groupSize` 参数将分组控制为以字节为单位的指定大小（例如，1 MB）。以下代码片段提供了在代码中使用这些参数的示例。

```
input_loc = "bucket-name/prefix/"
output_loc = "bucket-name/prefix/"

inputDyf = glueContext.create_dynamic_frame_from_options(
                    connection_type = "s3", 
                    connection_options = {"paths": [input_loc], 
                                            "compressionType":"gzip","recurse" :"True",
                                             "groupFiles" :"inPartition",  "groupSize" :"1048576",
                                            },
                    format = "csv",
                    format_options={"withHeader": True,"separator": ","}
                    )
```

无需对 Worker 节点进行任何更改，这些设置就使 AWS Glue 作业可以读取多个文件（大或小，有或没有压缩），然后以 Parquet 格式将它们写入目标。

**使用 Scala 的 AWS Glue Spark 作业**

要在 Scala 中使用 AWS Glue Spark 作业类型，请选择 **Spark** 作为作业类型，选择**语言**作为 **Scala**。选择**作业启动时间缩短的 Spark 3.1、Scala 2（Glue 版本 3.0）**作为 AWS Glue 版本。为了节省存储空间，以下 Scala 示例中的 AWS Glue 还使用该 `applyMapping` 功能来转换数据类型。

*AWS Glue Scala 参数*

```
import com.amazonaws.services.glue.util.GlueArgParser val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME", "inputLoc", "outputLoc").toArray)
```

*使用 Scala 代码的 AWS Glue Spark 作业*

```
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.DynamicFrame
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._


object GlueScalaApp {
  def main(sysArgs: Array[String]) {
    
    @transient val spark: SparkContext = SparkContext.getOrCreate()
    val glueContext: GlueContext = new GlueContext(spark)

    val inputLoc = "s3://bucket-name/prefix/sample_data.csv"
    val outputLoc = "s3://bucket-name/prefix/"

    val readCSV = glueContext.getSource("csv", JsonOptions(Map("paths" -> Set(inputLoc)))).getDynamicFrame()

    val applyMapping = readCSV.applyMapping(mappings = Seq(("_c0", "string", "date", "string"), ("_c1", "string", "sales", "long"),
    ("_c2", "string", "profit", "double")), caseSensitive = false)

    val formatPartition = applyMapping.toDF().coalesce(1)

    val dynamicFrame = DynamicFrame(formatPartition, glueContext)

    val dataSink = glueContext.getSinkWithFormat(
        connectionType = "s3", 
        options = JsonOptions(Map("path" -> outputLoc )),
        transformationContext = "dataSink", format = "parquet").writeDynamicFrame(dynamicFrame)
  }
}
```

## 附件
<a name="attachments-8c926709-8fa4-417f-9aaf-bcc8113d018f"></a>

要访问与此文档相关联的其他内容，请解压以下文件：[attachment.zip](samples/p-attach/8c926709-8fa4-417f-9aaf-bcc8113d018f/attachments/attachment.zip)