

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# Apache Spark 中的關鍵主題
<a name="key-topics-apache-spark"></a>

本節說明用於調校 Apache Spark 效能 AWS Glue 的 Apache Spark 基本概念和關鍵主題。在討論真實世界調校策略之前，請務必了解這些概念和主題。

## 架構
<a name="spark-architecture"></a>

Spark 驅動程式主要負責將您的 Spark 應用程式分割為可在個別工作者上完成的任務。Spark 驅動程式有下列責任：
+ 在程式碼`main()`中執行
+ 產生執行計畫
+ 搭配叢集管理員佈建 Spark 執行器，以管理叢集上的資源
+ 排程任務並請求 Spark 執行器的任務
+ 管理任務進度和復原

您可以使用 `SparkContext` 物件來與 Spark 驅動程式互動，以進行任務執行。

Spark 執行器是一種工作者，用於保存從 Spark 驅動程式傳遞的資料和執行中的任務。Spark 執行器的數量會隨著叢集的大小而增加和減少。



![Spark 驅動程式、叢集管理員和工作者節點與工作者節點中 JVM 執行器的連線。](http://docs.aws.amazon.com/zh_tw/prescriptive-guidance/latest/tuning-aws-glue-for-apache-spark/images/architecture-driver-cluster-worker.png)


**注意**  
Spark 執行器具有多個槽，以便平行處理多個任務。根據預設，Spark 支援每個虛擬 CPU (vCPU) 核心一個任務。例如，如果執行器有四個 CPU 核心，則可以執行四個並行任務。

## 彈性分散式資料集
<a name="rdd"></a>

Spark 會執行複雜任務，以跨 Spark 執行器存放和追蹤大型資料集。當您編寫 Spark 任務的程式碼時，您不需要考慮儲存體的詳細資訊。Spark 提供*彈性分散式資料集 (RDD)* 抽象，這是可以平行操作的元素集合，並可分割叢集的 Spark 執行器。

下圖顯示 Python 指令碼在一般環境中執行時，以及在 Spark 架構 (*PySpark*) 中執行時，如何在記憶體中存放資料的差異。



![Python val 【1，2，3 N】、Apache Spark rdd = sc.parallelize【1，2，3 N】。](http://docs.aws.amazon.com/zh_tw/prescriptive-guidance/latest/tuning-aws-glue-for-apache-spark/images/store-data-memory.png)

+ **Python** – 在 Python 指令碼`val = [1,2,3...N]`中寫入 會將資料保留在執行程式碼的單一電腦上的記憶體中。
+ **PySpark** – Spark 提供 RDD 資料結構，以載入和處理分散在多個 Spark 執行器上的記憶體的資料。您可以使用 等程式碼產生 RDD`rdd = sc.parallelize[1,2,3...N]`，Spark 可以在多個 Spark 執行器之間自動分配和保留記憶體中的資料。

  在許多 AWS Glue 任務中，您可以透過 *DynamicFrames* 和 Spark *DataFrames* 使用 RDDs AWS Glue 。這些摘要可讓您定義 RDD 中的資料結構描述，並使用該額外資訊執行更高階的任務。由於它們在內部使用 RDDs，因此資料會以透明的方式分佈並載入到下列程式碼中的多個節點：
  + DynamicFrame

    ```
    dyf= glueContext.create_dynamic_frame.from_options(
        's3', {"paths": [ "s3://<YourBucket>/<Prefix>/"]},
        format="parquet",
        transformation_ctx="dyf"
    )
    ```
  + DataFrame

    ```
    df = spark.read.format("parquet")
        .load("s3://<YourBucket>/<Prefix>")
    ```

RDD 具有下列功能：
+ RDDs由分成多個部分的資料組成，稱為*分割區*。每個 Spark 執行器都會在記憶體中存放一或多個分割區，而資料會分散到多個執行器。
+ RDDs 是*不可變的*，這表示它們在建立之後無法變更。若要變更 DataFrame，您可以使用*轉換*，如下節所定義。
+ RDDs會跨可用節點複寫資料，以便從節點故障中自動復原。

### 延遲評估
<a name="lazy-eval"></a>

RDDs支援兩種類型的操作：從現有資料集建立新資料集*的轉換*，以及在資料集上執行運算後將值傳回驅動程式的 *動作*。
+ **轉換** – 由於 RDDs *不可變，*因此您只能使用轉換來變更它們。

  例如， `map` 是一種轉換，透過 函數傳遞每個資料集元素，並傳回代表結果的新 RDD。請注意， `map`方法不會傳回輸出。Spark 會存放未來的抽象轉換，而不是讓您與結果互動。在您呼叫 動作之前，Spark 不會對轉換採取行動。
+ **動作** – 使用轉換，您可以建立邏輯轉換計畫。若要啟動運算，請執行 動作，例如 `write`、`show`、 `count`或 `collect`。

  Spark 中的所有轉換都是*延遲*的，因為它們不會立即運算其結果。反之，Spark 會記住套用至某些基本資料集的一系列轉換，例如 Amazon Simple Storage Service (Amazon S3) 物件。只有在動作需要將結果傳回給驅動程式時，才會計算轉換。此設計可讓 Spark 更有效率地執行。例如，假設透過`map`轉換建立的資料集僅由大幅減少資料列數量的轉換耗用，例如 `reduce`。然後，您可以將經過兩個轉換的較小資料集傳遞給驅動程式，而不是傳遞較大的映射資料集。

## Spark 應用程式術語
<a name="terms"></a>

本節涵蓋 Spark 應用程式術語。Spark 驅動程式會建立*執行計畫*，並以數個抽象概念控制應用程式的行為。下列術語對於使用 Spark UI 進行開發、偵錯和效能調校非常重要。
+ ***應用程式*** – 根據 Spark 工作階段 (Spark 內容）。由唯一 ID 識別，例如 `<application_XXX>`。
+ ***任務*** – 根據** **為 RDD ** **建立的動作。任務包含一或多個*階段*。
+ ***階段*** – 根據** **為 RDD 建立的*隨機播放*** **。階段包含一或多個任務。隨機播放是 Spark 重新分配資料的機制，因此會在 RDD 分割區之間以不同的方式分組。某些轉換，例如 `join()`，需要隨機播放。最佳化隨機播放調校實務中[會更詳細地討論隨機播放](optimize-shuffles.md)。
+ ***任務*** – 任務是 Spark 排定的最小處理單位。系統會為每個 RDD 分割區建立任務，而任務數量是階段中同時執行的最大數量。



![具有任務、階段、隨機播放和任務的執行計畫。](http://docs.aws.amazon.com/zh_tw/prescriptive-guidance/latest/tuning-aws-glue-for-apache-spark/images/spark-execution-plan.png)


**注意**  
最佳化平行處理時，任務是最重要的考量事項。任務數量隨 RDD 數量而擴展

### 平行處理
<a name="parallelism"></a>

Spark 會平行處理載入和轉換資料的任務。

請考慮您在 Amazon S3 上執行存取日誌檔案 （名為 `accesslog1 ... accesslogN`) 分散式處理的範例。下圖顯示分散式處理流程。

![""](http://docs.aws.amazon.com/zh_tw/prescriptive-guidance/latest/tuning-aws-glue-for-apache-spark/images/distributed-processing-flow.png)


1. Spark 驅動程式會建立執行計畫，以便在多個 Spark 執行器之間進行分散式處理。

1. Spark 驅動程式會根據執行計畫指派每個執行器的任務。根據預設，Spark 驅動程式會為每個 S3 物件 () 建立 RDD 分割區 （每個分割區對應於 Spark 任務）`Part1 ... N`。然後，Spark 驅動程式會將任務指派給每個執行器。

1. 每個 Spark 任務都會下載其指派的 S3 物件，並將其存放在 RDD 分割區的記憶體中。如此一來，多個 Spark 執行器會平行下載和處理其指派的任務。

如需分割區初始數量和最佳化的詳細資訊，請參閱[平行處理任務](parallelize-tasks.md)一節。

### Catalyst 最佳化工具
<a name="catalyst-optimizer"></a>

在內部，Spark 使用稱為 [Catalyst 最佳化工具](https://www.databricks.com/glossary/catalyst-optimizer)的引擎來最佳化執行計畫。Catalyst 具有查詢最佳化工具，您可以在執行高階 Spark APIs 時使用，例如 [Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html#sql)[、DataFrame 和資料集](https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes)，如下圖所述。



![邏輯計劃採用 throuh Catalyst 最佳化工具，這會輸出傳送到 RDDs最佳化計劃。](http://docs.aws.amazon.com/zh_tw/prescriptive-guidance/latest/tuning-aws-glue-for-apache-spark/images/catalyst-optimizer.png)


由於 Catalyst 最佳化工具無法直接與 RDD API 搭配使用，因此高階 APIs 通常比低階 RDD API 更快。對於複雜的聯結，Catalyst 最佳化工具可以透過最佳化任務執行計畫來大幅改善效能。您可以在 Spark UI 的 **SQL** 索引標籤上查看 Spark 任務的最佳化計劃。

*自適應查詢執行*

Catalyst 最佳化工具會透過稱為*自適應查詢執行*的程序來執行執行期最佳化。自適應查詢執行會使用執行時間統計資料，在任務執行時重新最佳化查詢的執行計畫。自適應查詢執行提供數種效能挑戰的解決方案，包括合併隨機播放後分割區、將排序合併聯結轉換為廣播聯結，以及扭曲聯結最佳化，如下列各節所述。

自適應查詢執行可在 AWS Glue 3.0 和更新版本中使用，且預設會在 AWS Glue 4.0 (Spark 3.3.0) 和更新版本中啟用。您可以在程式碼`spark.conf.set("spark.sql.adaptive.enabled", "true")`中使用 來開啟和關閉自適應查詢執行。

*Coalescing 隨機播放後分割區*

此功能會根據`map`輸出統計資料，減少每次隨機播放後的 RDD 分割區 （餘音）。其可簡化執行查詢時隨機播放分割區編號的調校。您不需要設定隨機分割號碼以符合您的資料集。在您擁有足夠多的隨機播放分割區初始數量之後，Spark 可以在執行時間選擇適當的隨機播放分割區編號。

當 `spark.sql.adaptive.enabled`和 都設為 true 時`spark.sql.adaptive.coalescePartitions.enabled`，會啟用並行隨機播放後分割區。如需詳細資訊，請參閱 [Apache Spark 文件](https://spark.apache.org/docs/latest/sql-performance-tuning.html#coalescing-post-shuffle-partitions)。

*將排序合併聯結轉換為廣播聯結*

此功能會辨識您何時聯結兩個大小大致不同的資料集，並根據該資訊採用更有效率的聯結演算法。如需詳細資訊，請參閱 [Apache Spark 文件](https://spark.apache.org/docs/latest/sql-performance-tuning.html#converting-sort-merge-join-to-broadcast-join)。[最佳化隨機播放](optimize-shuffles.md)區段會討論聯結策略。

*扭曲聯結最佳化*

資料扭曲是 Spark 任務最常見的瓶頸之一。它描述了資料偏向特定 RDD 分割區 （因此是特定任務） 的情況，這會延遲應用程式的整體處理時間。這通常會降低聯結操作的效能。扭曲聯結最佳化功能會動態處理排序合併聯結中的扭曲，方法是將扭曲的任務分割 （並視需要複寫） 為大致均勻大小的任務。

當 設為 true `spark.sql.adaptive.skewJoin.enabled` 時，會啟用此功能。如需詳細資訊，請參閱 [Apache Spark 文件](https://spark.apache.org/docs/latest/sql-performance-tuning.html#optimizing-skew-join)。[最佳化隨機播放](optimize-shuffles.md)區段會進一步討論資料扭曲。