

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 优化 Spark 性能
<a name="emr-spark-performance"></a>

Amazon EMR 为 Spark 提供多项性能优化功能。本主题详细介绍了各个优化功能。

有关如何设置 Spark 配置的更多信息，请参阅 [配置 Spark](emr-spark-configure.md)。

## 自适应查询执行
<a name="emr-spark-performance-aqe"></a>

自适应查询执行是一个根据运行时统计信息重新优化查询计划的框架。自 Amazon EMR 5.30.0 起，以下来自 Apache Spark 3 的自适应查询执行优化可用于 Spark 2 的 Apache Amazon EMR 运行时系统。
+ 自适应连接转换
+ 随机分区的自适应合并

**自适应连接转换**

自适应联接转换可根据查询阶段的运行时大小将 broadcast-hash-joins操作转换为 sort-merge-join操作，从而提高查询性能。 Broadcast-hash-joins当联接的一侧足够小，可以有效地向所有执行者广播其输出时，性能往往会更好，从而无需对联接的两边进行随机交换和排序。自适应联接转换扩大了 Spark 自动执行 broadcast-hash-joins的情况范围。

该功能已默认启用。可以通过将 `spark.sql.adaptive.enabled` 设置为 `false` 来禁用它，同时会禁用自适应查询执行框架。 broadcast-hash-join当其中一个 sort-merge-join联接方的运行时大小统计数据不超过（默认为 10,485,760 字节 (10 MiB)）时`spark.sql.autoBroadcastJoinThreshold`，Spark 决定将 a 转换为 a。

**随机分区的自适应合并**

随机分区的自适应合并通过合并小的连续随机分区来避免产生太多小任务的开销，从而提高查询性能。这样，您就可以预先配置更多的初始随机分区，然后在运行时将其减少到目标大小，从而提高拥有更均匀分配的随机分区的可能性。

此功能默认情况下已启用，除非 `spark.sql.shuffle.partitions` 采用显式设置。可以通过将 `spark.sql.adaptive.coalescePartitions.enabled` 设置为 `true` 来启用它。初始数量的随机分区和目标分区大小都可以分别使用 `spark.sql.adaptive.coalescePartitions.minPartitionNum` 和 `spark.sql.adaptive.advisoryPartitionSizeInBytes` 属性进行优化。有关此功能的相关 Spark 属性的详细信息，请参阅下表。


**Spark 自适应合并分区属性**  

| 属性 | 默认 值 | 说明 | 
| --- | --- | --- | 
|  `spark.sql.adaptive.coalescePartitions.enabled`  |  true，除非 `spark.sql.shuffle.partitions` 为显式设置  |  如果为 true 且 spark.sql.adaptive.enabled 为 true，则 Spark 将根据目标大小合并连续的随机分区（通过 `spark.sql.adaptive.advisoryPartitionSizeInBytes` 指定），以避免过多的小任务。  | 
|  `spark.sql.adaptive.advisoryPartitionSizeInBytes`  | 64MB |  合并时，随机分区q的指导大小（按字节计算）。此配置仅在 `spark.sql.adaptive.enabled` 和 `spark.sql.adaptive.coalescePartitions.enabled` 两者都为 `true` 时才有效。  | 
|  `spark.sql.adaptive.coalescePartitions.minPartitionNum`  | 25 |  合并后的最小随机分区数。此配置仅在 `spark.sql.adaptive.enabled` 和 `spark.sql.adaptive.coalescePartitions.enabled` 两者都为 `true` 时才有效。  | 
|  `spark.sql.adaptive.coalescePartitions.initialPartitionNum`  | 1000 |  合并前的随机分区的初始数量。此配置仅在 `spark.sql.adaptive.enabled` 和 `spark.sql.adaptive.coalescePartitions.enabled` 两者都为 `true` 时才有效。  | 

## 动态分区修剪
<a name="emr-spark-performance-dynamic"></a>

动态分区修剪通过针对特定的查询更准确地选择表中需要读取和处理的特定分区来提高作业性能。通过减少读取和处理的数据量，可节省大量的作业执行时间。对于 Amazon EMR 5.26.0，此功能已默认启用。对于 Amazon EMR 5.24.0 和 5.25.0，您可以在 Spark 中或在创建集群时，通过设置 Spark 属性 `spark.sql.dynamicPartitionPruning.enabled` 来启用此功能。


**Spark 动态分区修剪分区属性**  

| 属性 | 默认值 | 说明 | 
| --- | --- | --- | 
|  `spark.sql.dynamicPartitionPruning.enabled`  |  `true`  |  如果为 true，则启用动态分区修剪。  | 
|  `spark.sql.optimizer.dynamicPartitionPruning.enforceBroadcastReuse`  |  `true`  |  为 `true` 时，Spark 会在查询执行之前执行防御性检查，以确保动态修剪筛选条件中广播交换的重复使用不会被以后的准备规则（如用户定义的列式规则）中断。当重用被中断且此配置是 `true` 时，Spark 会删除受影响的动态修剪筛选条件，以防止发生性能和正确性问题。当动态修剪筛选条件的广播交换从相应连接操作的广播交换产生不同且不一致的结果时，可能会出现正确性问题。将此配置设置为 `false` 应谨慎执行；它允许解决如下类似场景：当用户定义的列式规则中断重用时。启用“自适应查询执行”后，将始终强制执行广播重用。  | 

这种优化功能在 Spark 2.4.2 的现有功能基础之上进行改进，只支持向下推送可以在计划时解析的静态谓词。

以下是 Spark 2.4.2 中静态谓词向下推送的示例。

```
partition_col = 5

partition_col IN (1,3,5)

partition_col between 1 and 3

partition_col = 1 + 3
```

动态分区修剪允许 Spark 引擎在运行时动态地推断哪些分区需要读取，哪些分区可以安全地消除。例如，以下查询涉及两个表：`store_sales` 表，其中包含所有店铺的全部总销售额（按区域分区）；以及 `store_regions` 表，其中包含每个国家/地区的区域映射。这些表包含有关分布于全球的存储的数据，但我们只查询北美的数据。

```
select ss.quarter, ss.region, ss.store, ss.total_sales 
from store_sales ss, store_regions sr
where ss.region = sr.region and sr.country = 'North America'
```

如果没有动态分区修剪，此查询将读取所有区域，然后过滤出与子查询的结果匹配的区域子集。使用动态分区修剪，此查询将只读取和处理子查询中返回的区域的分区。这样，通过减少数据存储和处理较少的记录，节省了时间和资源。

## 展平标量子查询
<a name="emr-spark-performance-flatten"></a>

这种优化功能通过对同一个表执行标量子查询来提高查询的性能。对于 Amazon EMR 5.26.0，此功能已默认启用。借助 Amazon EMR 5.24.0 和 5.25.0，您可以在 Spark 中或在创建集群时，通过设置 Spark 属性 `spark.sql.optimizer.flattenScalarSubqueriesWithAggregates.enabled` 来启用此功能。当此属性设置为 true 时，查询优化程序会展平使用相同关系的聚合标量子查询（如果可能）。标量子查询通过以下方法展平：将子查询中存在的任何谓词推送到聚合函数，然后执行一个聚合（针对所有聚合函数，按每个关系）。

以下示例是一个将受益于此优化的查询示例。

```
select (select avg(age) from students                    /* Subquery 1 */
                 where age between 5 and 10) as group1,
       (select avg(age) from students                    /* Subquery 2 */
                 where age between 10 and 15) as group2,
       (select avg(age) from students                    /* Subquery 3 */
                 where age between 15 and 20) as group3
```

此优化将之前的查询重写为：

```
select c1 as group1, c2 as group2, c3 as group3
from (select avg (if(age between 5 and 10, age, null)) as c1,
             avg (if(age between 10 and 15, age, null)) as c2,
             avg (if(age between 15 and 20, age, null)) as c3 from students);
```

请注意，重写的查询将只读取一次 student 表，而三个子查询的谓词将推送到 `avg` 函数中。

## DISTINCT Before INTERSECT
<a name="emr-spark-performance-distinct"></a>

这种优化可优化使用 INTERSECT 时的联接。对于 Amazon EMR 5.26.0，此功能已默认启用。借助 Amazon EMR 5.24.0 和 5.25.0，您可以在 Spark 中或在创建集群时，通过设置 Spark 属性 `spark.sql.optimizer.distinctBeforeIntersect.enabled` 来启用此功能。使用 INTERSECT 的查询会自动转换为使用左半联接。当此属性设置为 true 时，如果查询优化器检测到 DISTINCT 运算符可以将左半联接变为 a BroadcastHashJoin 而不是 a，则查询优化器会将 DISTINCT 运算符推送给 INTERSECT 的子级。 SortMergeJoin

以下示例是一个将受益于此优化的查询示例。

```
(select item.brand brand from store_sales, item
     where store_sales.item_id = item.item_id)
intersect
(select item.brand cs_brand from catalog_sales, item 
     where catalog_sales.item_id = item.item_id)
```

如果没有启用此属性 `spark.sql.optimizer.distinctBeforeIntersect.enabled`，则查询将被重写，如下所示。

```
select distinct brand from
  (select item.brand brand from store_sales, item
     where store_sales.item_id = item.item_id)
left semi join
   (select item.brand cs_brand from catalog_sales, item 
     where catalog_sales.item_id = item.item_id)
 on brand <=> cs_brand
```

当您启用此属性 `spark.sql.optimizer.distinctBeforeIntersect.enabled` 时，查询将被重写，如下所示。

```
select brand from
  (select distinct item.brand brand from store_sales, item
     where store_sales.item_id = item.item_id)
left semi join
   (select distinct item.brand cs_brand from catalog_sales, item 
     where catalog_sales.item_id = item.item_id)
 on brand <=> cs_brand
```

## Bloom 筛选条件连接
<a name="emr-spark-performance-bloom"></a>

这种优化可以通过使用从联接另一端的值生成的 [Bloom 筛选条件](https://en.wikipedia.org/wiki/Bloom_filter)对联接的一端进行预筛选，来提高部分联接的性能。对于 Amazon EMR 5.26.0，此功能已默认启用。借助 Amazon EMR 5.25.0，您可以在 Spark 中或在创建集群时，通过将 Spark 属性 `spark.sql.bloomFilterJoin.enabled` 设置为 `true` 来启用此功能。

下面是一个可以受益于 Bloom 筛选条件的示例查询。

```
select count(*)
from sales, item
where sales.item_id = item.id
and item.category in (1, 10, 16)
```

启用此功能后，Bloom 筛选条件将根据所有类别位于要查询的类别集中的项目ID构建。扫描销售表时，Bloom 筛选条件用于确定哪些销售属于肯定不在 Bloom 筛选条件定义的集中的项目。借此，可以尽早筛选出这些被标识的销售。

## 优化的连接重新排序
<a name="emr-spark-performance-join-reorder"></a>

这项优化通过将涉及带筛选条件的表的联接进行重新排序来提高查询性能。对于 Amazon EMR 5.26.0，此功能已默认启用。对于 Amazon EMR 5.25.0，您可以通过将 Spark 配置参数 `spark.sql.optimizer.sizeBasedJoinReorder.enabled` 设置为 true 来启用此功能。Spark 的默认行为是从左到右联接表，如查询中所列。此策略可能会错过首先使用筛选条件执行较小联接的机会，以便之后利用更昂贵的联接。

下面的示例查询报告了一个国家/地区所有商店的所有退回商品。如果不经过优化的联接重新排序，Spark 首先会联接两个大型表 `store_sales` 和 `store_returns`，然后将其与 `store` 联接，最终再联接 `item`。

```
select ss.item_value, sr.return_date, s.name, i.desc, 
from store_sales ss, store_returns sr, store s, item i
where ss.id = sr.id and ss.store_id = s.id and ss.item_id = i.id
and s.country = 'USA'
```

经过优化的联接重新排序，Spark 首先会联接 `store_sales` 与 `store`，因为 `store` 有一个筛选条件并且小于 `store_returns` 和 `broadcastable`。然后，Spark 会联接 `store_returns`，最后联接 `item`。如果 `item` 有一个筛选条件并且可广播，则其也符合重新排序的条件，这会使 `store_sales` 与 `store` 联接，之后联接 `item`，并在最后联接 `store_returns`。