

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# Apache Spark 中的关键主题
<a name="key-topics-apache-spark"></a>

本节说明 Spark 基本概念和调优 AWS Glue for Apache Spark 性能的关键主题。在讨论现实世界调优策略之前，了解这些概念和主题非常重要。

## 架构
<a name="spark-architecture"></a>

Spark 驱动程序主要负责将您的 Spark 应用程序拆分为可在各个 Worker 完成的任务。Spark 驱动程序要承担以下职责：
+ 在您的代码中运行 `main()`
+ 生成执行计划
+ 与管理集群中资源的集群管理器协同预调配 Spark 执行程序
+ 为 Spark 执行程序安排任务和请求任务
+ 管理任务进度和恢复

您可以使用 `SparkContext` 对象与 Spark 驱动程序交互以运行作业。

Spark 执行程序是用于保存数据和运行从 Spark 驱动程序传递的任务的 Worker。Spark 执行程序的数量将随着集群的规模而增加和减少。



![\[Spark 驱动程序、集群管理器以及 Worker 节点与 Worker 节点中 JVM 执行程序的连接。\]](http://docs.aws.amazon.com/zh_cn/prescriptive-guidance/latest/tuning-aws-glue-for-apache-spark/images/architecture-driver-cluster-worker.png)


**注意**  
Spark 执行程序有多个槽位，因此可以并行处理多项任务。默认情况下，Spark 支持每个虚拟 CPU（vCPU）核心执行一项任务。例如，如果执行程序有四个 CPU 核心，则它可以同时运行四个任务。

## 弹性分布式数据集
<a name="rdd"></a>

Spark 执行存储和跟踪跨 Spark 执行程序大型数据集的复杂工作。在为 Spark 作业编写代码时，您无需考虑存储的细节。Spark 提供*弹性分布式数据集（RDD）*抽象层，这是一组可以并行操作的元素，可以跨集群的 Spark 执行程序进行分区。

下图显示 Python 脚本在典型环境中和在 Spark 框架（*PySpark*）中运行时，数据在内存中存储方式的差异。



![\[Python val [1,2,3 N], Apache Spark rdd = sc.parallelize[1,2,3 N].\]](http://docs.aws.amazon.com/zh_cn/prescriptive-guidance/latest/tuning-aws-glue-for-apache-spark/images/store-data-memory.png)

+ **Python**：在 Python 脚本中编写 `val = [1,2,3...N]` 可将数据保存在运行代码的单台计算机上的内存中。
+ **PySpark**：Spark 提供 RDD 数据结构，用于加载和处理跨多个 Spark 执行程序内存分布的数据。您可以使用 `rdd = sc.parallelize[1,2,3...N]` 等代码生成 RDD，而 Spark 可以跨多个 Spark 执行程序的内存自动分发和保留数据。

  在众多 AWS Glue 作业中，您可通过 AWS Glue *DynamicFrame* 和 Spark *DataFrame* 使用 RDD。这些抽象层允许您定义 RDD 中的数据架构，并使用这些附加信息执行更高层级的任务。由于其在内部使用 RDD，因此在以下代码中，数据会透明地分发并加载到多个节点：
  + DynamicFrame

    ```
    dyf= glueContext.create_dynamic_frame.from_options(
        's3', {"paths": [ "s3://<YourBucket>/<Prefix>/"]},
        format="parquet",
        transformation_ctx="dyf"
    )
    ```
  + DataFrame

    ```
    df = spark.read.format("parquet")
        .load("s3://<YourBucket>/<Prefix>")
    ```

RDD 具有以下功能：
+ RDD 由被划分为多个部分（称为*分区*）的数据组成。每个 Spark 执行程序都会在内存中存储一个或多个分区，数据跨多个执行程序分布。
+ RDD *不可变*，这意味着其在创建后无法更改。要更改 DataFrame，您可以使用下一节中定义的*转换*。
+ RDD 跨可用节点复制数据，因此其可以自动从节点故障中恢复。

### 惰性求值
<a name="lazy-eval"></a>

RDD 支持两种类型的运算：*转换*（从现有数据集创建新数据集）和*操作*（在对数据集运行计算后向驱动程序返回值）。
+ **转换**：由于 RDD *不可变*，因此只能通过使用转换进行更改。

  例如，`map` 是一种转换，它将每个数据集元素传递给一个函数，然后返回一个表示结果的新 RDD。请注意，`map` 方法不返回输出。Spark 存储抽象转换以供未来使用，而不是让您与结果进行交互。在您调用操作之前，Spark 不会对转换执行操作。
+ **操作**：使用转换，您可以制定逻辑转换计划。要启动计算，请运行 `write`、`count`、`show` 或 `collect` 等操作。

  Spark 中的所有转换都是*惰性的*，即不会立即计算结果。相反，Spark 会记住应用到 Amazon Simple Storage Service（Amazon S3）对象等某些基础数据集的一系列转换。仅当操作要求将结果返回给驱动程序时，才会计算转换。这种设计可以更高效地运行 Spark。例如，假设通过 `map` 转换创建的数据集仅被 `reduce` 等大幅减少行数的转换所使用。那么，您可以将经过这两次转换的较小数据集传递给驱动程序，而不是传递较大的映射数据集。

## Spark 应用程序的术语
<a name="terms"></a>

本节介绍 Spark 应用程序术语。Spark 驱动程序创建*执行计划*，并在多个抽象层中控制应用程序的行为。以下术语对于使用 Spark UI 进行开发、调试和性能调优非常重要。
+ ***应用程序***：基于 Spark 会话（Spark 上下文）。通过 `<application_XXX>` 等唯一 ID 表示。
+ ***作业***：基于为 RDD** **创建****的操作。作业包含一个或多个*阶段*。
+ ***阶段***：基于为 RDD** **创建****的*随机排序*。阶段包含一个或多个任务。随机排序是 Spark 的机制，用于重新分配数据以跨 RDD 分区对数据进行不同的分组。`join()` 等某些转换需要随机排序。在[优化随机排序](optimize-shuffles.md)调优实践中更详细地介绍了随机排序。
+ ***任务***：任务是 Spark 计划的最小处理单位。为每个 RDD 分区创建任务，任务数是阶段中可同时执行的最大数量。



![\[包含作业、阶段、随机排序和任务的执行计划。\]](http://docs.aws.amazon.com/zh_cn/prescriptive-guidance/latest/tuning-aws-glue-for-apache-spark/images/spark-execution-plan.png)


**注意**  
任务是优化并行度时需要考虑的最重要因素。任务数量与 RDD 数量成正比

### 并行
<a name="parallelism"></a>

Spark 可以并行处理加载和转换数据的任务。

例如，假设您需要对 Amazon S3 上的访问日志文件（名为 `accesslog1 ... accesslogN`）进行分布式处理。下图显示分布式处理流程。

![\[""\]](http://docs.aws.amazon.com/zh_cn/prescriptive-guidance/latest/tuning-aws-glue-for-apache-spark/images/distributed-processing-flow.png)


1. Spark 驱动程序创建执行计划，用于跨众多 Spark 执行程序的分布式处理。

1. Spark 驱动程序根据执行计划为每个执行程序分配任务。默认情况下，Spark 驱动程序为每个 S3 对象（`Part1 ... N`）创建 RDD 分区（每个分区对应一个 Spark 任务）。然后，Spark 驱动程序会将任务分配给每个执行程序。

1. 每个 Spark 任务都会下载其分配的 S3 对象，并将其存储在 RDD 分区的内存中。这样，多个 Spark 执行程序就可以并行下载和处理其分配的任务。

有关初始分区数量和优化的更多详细信息，请参阅[并行处理任务](parallelize-tasks.md)一节。

### Catalyst 优化器
<a name="catalyst-optimizer"></a>

Spark 在内部使用名为 [Catalyst 优化器](https://www.databricks.com/glossary/catalyst-optimizer)的引擎来优化执行计划。Catalyst 有一个查询优化器，您可以在运行 [Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html#sql)、[DataFrame 和数据集](https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes)等高级 Spark API 时使用该优化器，如下图所示。



![\[逻辑计划会经过 Catalyst 优化器，后者输出优化的计划发送给 RDD。\]](http://docs.aws.amazon.com/zh_cn/prescriptive-guidance/latest/tuning-aws-glue-for-apache-spark/images/catalyst-optimizer.png)


由于 Catalyst 优化器不能直接与 RDD API 配合使用，因此高级 API 通常比低级 RDD API 更快。对于复杂联接，Catalyst 优化器可以通过优化作业运行计划来显著提升性能。您可以在 Spark UI 的 **SQL** 选项卡上查看 Spark 作业的优化计划。

*自适应查询执行*

Catalyst 优化器通过名为*自适应查询执行*的进程来执行运行时优化。自适应查询执行使用运行时统计信息在作业运行时重新优化查询的运行计划。自适应查询执行为性能挑战提供了多种解决方案，包括合并随机排序后的分区、将排序合并联接转换为广播联接以及偏斜联接优化，如以下各节所述。

自适应查询执行在 AWS Glue 3.0 及更高版本中可用，在 AWS Glue 4.0（Spark 3.3.0）及更高版本中默认启用。可以在代码中使用 `spark.conf.set("spark.sql.adaptive.enabled", "true")` 开启和关闭自适应查询执行。

*合并随机排序后的分区*

此功能根据 `map` 输出统计信息在每次随机排序后减少 RDD 分区（合并）。它简化了运行查询时对随机排序分区编号的调优。您无需设置适合数据集的随机排序分区编号。在您有足够大的初始随机排序分区编号之后，Spark 可以在运行时挑选适当的随机排序分区编号。

当 `spark.sql.adaptive.enabled` 和 `spark.sql.adaptive.coalescePartitions.enabled` 均设置为 true 时，将启用合并随机排序后的分区。有关更多信息，请参阅 [Apache Spark 文档](https://spark.apache.org/docs/latest/sql-performance-tuning.html#coalescing-post-shuffle-partitions)。

*将排序合并联接转换为广播联接*

此功能可识别您何时连接两个规模截然不同的数据集，并根据该信息采用更高效的联接算法。有关更多详细信息，请参阅 [Apache Spark 文档](https://spark.apache.org/docs/latest/sql-performance-tuning.html#converting-sort-merge-join-to-broadcast-join)。联接策略在[优化随机排序](optimize-shuffles.md)一节中介绍。

*偏斜联接优化*

数据偏斜是 Spark 作业最常见的瓶颈之一。它描述了一种情况，即数据偏向特定 RDD 分区（以及因此产生的特定任务），这会延迟应用程序的总处理时间。这通常会降低联接操作的性能。偏斜联接优化功能通过将偏斜的任务拆分（并在需要时复制）为大小大致相等的任务，动态处理排序合并联接中的偏斜。

此功能在 `spark.sql.adaptive.skewJoin.enabled` 设置为 true 时启用。有关更多详细信息，请参阅 [Apache Spark 文档](https://spark.apache.org/docs/latest/sql-performance-tuning.html#optimizing-skew-join)。数据偏斜将在[优化随机排序](optimize-shuffles.md)一节中进一步讨论。