

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 使用 Amazon S3 提升 Spark 效能
<a name="emr-spark-s3-performance"></a>

Amazon EMR 提供的功能，可在使用 Spark 查詢、讀取和寫入儲存於 Amazon S3 的資料時協助使效能優化。

[S3 Select](https://aws.amazon.com/blogs/aws/s3-glacier-select/) 可藉由將處理「下推」至 Amazon S3 改善某些應用程式中 CSV 和 JSON 檔案的查詢效能。

EMRFS S3-optimized遞交者是 [OutputCommitter](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/OutputCommitter.html) 類別的替代方案，其使用 EMRFS 的分段上傳功能來改善使用 Spark、DataFrames 和資料集將 Parquet 檔案寫入 Amazon S3 時的效能。

**Topics**
+ [使用 S3 Select 搭配 Spark 以提升查詢效能](emr-spark-s3select.md)
+ [EMR Spark MagicCommitProtocol](emr-spark-magic-commit-protocol.md)
+ [使用 EMRFS S3 優化遞交者](emr-spark-s3-optimized-committer.md)
+ [使用 EMRFS S3 優化遞交通訊協定](emr-spark-s3-optimized-commit-protocol.md)
+ [使用 EMRFS 的重試 Amazon S3 請求](emr-spark-emrfs-retry.md)

# 使用 S3 Select 搭配 Spark 以提升查詢效能
<a name="emr-spark-s3select"></a>

**重要**  
Amazon S3 Select 不再提供給新客戶。Amazon S3 Select 的現有客戶可以繼續照常使用此功能。[進一步了解](https://aws.amazon.com/blogs/storage/how-to-optimize-querying-your-data-in-amazon-s3/) 

在 Amazon EMR 5.17.0 版及更高版本中，您可以將 [S3 Select](https://aws.amazon.com/blogs/aws/s3-glacier-select/) 與 Amazon EMR 上的 Spark 搭配使用。*S3 Select* 可讓應用程式從物件只擷取資料子集。針對 Amazon EMR，篩選大型資料集以進行處理的運算工作，會從叢集「下推」到 Amazon S3 處理，因而在某些應用程式中可改善效能，並減少 Amazon EMR 和 Amazon S3 之間傳輸的資料量。

S3 Select 支援使用 `s3selectCSV` 和 `s3selectJSON` 值指定資料格式的 CSV 和 JSON 檔案。如需詳細資訊和範例，請參閱 [在您的程式碼中指定 S3 Select](#emr-spark-s3select-specify)。

## S3 Select 是否適合我的應用程式？
<a name="emr-spark-s3select-apps"></a>

建議您在使用和不使用 S3 Select 的狀態下，對應用程式進行基準分析，以確認其是否適合您的應用程式。

利用下列的準則，來判斷您的應用程式是否可能使用 S3 Select：
+ 您的查詢會篩選掉原始資料集一半以上的資料。
+ Amazon S3 與 Amazon EMR 叢集之間的網路連線具有良好的傳輸速度和可用頻寬。Amazon S3 不會壓縮 HTTP 回應，因此所壓縮輸入檔案的回應大小可能會增加。

## 考量和限制
<a name="emr-spark-s3select-considerations"></a>
+ 不支援使用客戶所提供加密金鑰 (SSE-C) 的 Amazon S3 伺服器端加密，也不支援用戶端加密。
+ 不支援 `AllowQuotedRecordDelimiters` 屬性。如果指定此屬性，查詢會失敗。
+ 僅支援採用 UTF-8 格式的 CSV 和 JSON 檔案。不支援多行 CSV。
+ 僅支援未壓縮的檔案或 gzip 檔案。
+ 不支援 Spark CSV 和 JSON 選項 (例如 `nanValue`、`positiveInf`、`negativeInf`) 以及與損毀記錄有關的選項 (例如 failfast 和 dropmalformed 模式)。
+ 不支援在十進位使用逗號 (,)。例如，不支援 `10,000` 但支援 `10000`。
+ 不支援在最後一行的註解字元。
+ 不會處理檔案尾端的空白行。
+ 以下篩選條件不會下推至 Amazon S3：
  + 彙總函數，例如 `COUNT()` 和 `SUM()`。
  + `CAST()` 屬性的篩選條件。例如 `CAST(stringColumn as INT) = 1`。
  + 含屬性的篩選條件為物件或很複雜。例如 `intArray[1] = 1, objectColumn.objectNumber = 1`。
  + 篩選條件的值不是常值。例如 `intColumn1 = intColumn2`
  + 僅在載明限制的情況下支援 [S3 Select 支援的資料類型](https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-glacier-select-sql-reference-data-types.html)。

## 在您的程式碼中指定 S3 Select
<a name="emr-spark-s3select-specify"></a>

以下範例示範如何使用 Scala、SQL、R 和 PySpark 指定適用於 CSV 的 S3 Select。您可透過相同方式使用適用於 JSON 的 S3 Select。如需了解選項清單、其預設值和限制，請參閱 [選項](#emr-spark-s3select-specify-options)。

------
#### [ PySpark ]

```
spark
  .read
  .format("s3selectCSV") // "s3selectJson" for Json
  .schema(...) // optional, but recommended
  .options(...) // optional
  .load("s3://path/to/my/datafiles")
```

------
#### [ R ]

```
read.df("s3://path/to/my/datafiles", "s3selectCSV", schema, header = "true", delimiter = "\t")
```

------
#### [ Scala ]

```
spark
  .read
  .format("s3selectCSV") // "s3selectJson" for Json
  .schema(...) // optional, but recommended
  .options(...) // optional. Examples:  
  // .options(Map("quote" -> "\'", "header" -> "true")) or
  // .option("quote", "\'").option("header", "true")
  .load("s3://path/to/my/datafiles")
```

------
#### [ SQL ]

```
CREATE TEMPORARY VIEW MyView (number INT, name STRING) USING s3selectCSV OPTIONS (path "s3://path/to/my/datafiles", header "true", delimiter "\t")
```

------

### 選項
<a name="emr-spark-s3select-specify-options"></a>

使用 `s3selectCSV` 和 `s3selectJSON` 時，有以下可用選項。若未指定，則會使用預設值。

#### S3selectCSV 的選項
<a name="emr-spark-s3select-specify-options-csv"></a>


| 選項 | 預設 | Usage | 
| --- | --- | --- | 
|  `compression`  |  `"none"`  |  指出是否使用了壓縮。`"gzip"` 是除了 `"none"` 之外唯一支援的設定。  | 
|  `delimiter`  |  ","  |  指定欄位分隔符號。  | 
|  `quote`  |  `'\"'`  |  指定引號字元。不支援指定空白字串，這會導致格式不正確的 XML 錯誤。  | 
|  `escape`  |  `'\\'`  |  指定逸出字元。  | 
|  `header`  |  `"false"`  |  `"false"` 指定沒有標頭。`"true"` 指定標頭位於第一行。僅支援標頭位於第一行，不支援在標頭前有空白行。  | 
|  comment  |  `"#"`  |  指定註解字元。註解指標無法停用。換言之，不支援 `\u0000` 的值。  | 
|  `nullValue`  |  ""  |    | 

#### S3selectJSON 的選項
<a name="emr-spark-s3select-specify-options-json"></a>


| 選項 | 預設 | Usage | 
| --- | --- | --- | 
|  `compression`  |  `"none"`  |  指出是否使用了壓縮。`"gzip"` 是除了 `"none"` 之外唯一支援的設定。  | 
|  `multiline`  |  "false"  |  `"false"` 指定 JSON 為 S3 Select `LINES` 格式，這表示輸入資料中的每一行都包含了單一 JSON 物件。`"true"` 指定 JSON 為 S3 Select `DOCUMENT` 格式，這表示一個 JSON 物件可在輸入資料中跨越多行。  | 

# EMR Spark MagicCommitProtocol
<a name="emr-spark-magic-commit-protocol"></a>

從 EMR 6.15.0 之後，使用 S3A 檔案系統時，MagicCommitProtocol 會成為 Spark 的預設 FileCommitProtocol。

## MagicCommitProtocol
<a name="magic-commit-protocol"></a>

MagicCommitProtocol 是 [FileCommitProtocol](https://dlcdn.apache.org/spark/docs/2.4.2/api/java/org/apache/spark/internal/io/FileCommitProtocol.html) 的替代實作，已針對使用 Amazon S3 S3A 進行最佳化。此通訊協定旨在透過避免在任務和任務遞交階段期間在 Amazon S3 中使用重新命名操作來改善應用程式效能。

MagicCommitProtocol 是使用 S3A 檔案系統時，在 Amazon Elastic Map Reduce (EMR) 上執行的 Spark 使用的預設 FileCommitProtocol 實作。MagicCommitProtocol 在內部使用 [MagicV2Committer ](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/s3a-magicv2-committer.html) 執行檔案寫入 Amazon S3。

對於靜態插入操作，MagicCommitProtocol 在任務遞交階段將檔案寫入任務的輸出位置。相反地，對於動態插入覆寫操作，任務嘗試寫入的檔案只會在任務遞交時出現在任務的輸出位置。這可透過在任務遞交呼叫時將遞交中繼資料匯出回 Spark 驅動程式來實現。

## 啟用 MagicCommitProtocol
<a name="enabling-magic-commit-protocol"></a>

使用 S3A 檔案系統時，在 Amazon Elastic Map Reduce (EMR) 上執行的 Spark 預設會啟用 MagicCommitProtocol。

若要使用 S3A 檔案系統，您可以：

1. 使用定義資料表、分割區或目錄`s3a://`時所用的檔案配置。

1. 在 core-site.xml `fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem`中設定組態。

## 停用 MagicCommitProtocol
<a name="disabling-magic-commit-protocol"></a>

1. 您可以透過在 中對其進行硬式編碼`SparkConf`、在 Spark shell 或 `spark-submit`和 `spark-sql`工具中將其作為`--conf`參數傳遞，或在 中將其`spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol.leverageMagicCommitProtocol`設定為 false`conf/spark-defaults.conf`。如需詳細資訊，請參閱 Apache [Spark 文件中的 Spark 組態](https://spark.apache.org/docs/latest/configuration.html)。

   下列範例示範如何在執行`spark-sql`命令時停用 MagicCommitProtocol。

   ```
   spark-sql \
     --conf spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol.leverageMagicCommitProtocol=false \
   -e "INSERT OVERWRITE TABLE target_table SELECT * FROM source_table;"
   ```

1. 使用`spark-defaults`組態分類將 `spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol.leverageMagicCommitProtocol` 屬性設定為 false。如需詳細資訊，請參閱[設定應用程式](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps.html)。

## MagicCommitProtocol 考量事項
<a name="magic-commit-considerations"></a>
+ 對於靜態分割區插入，在 Spark 執行器上，MagicCommitProtocol 會對任務嘗試寫入的每個檔案耗用少量記憶體，直到任務遞交或中止為止。在大多數任務中，記憶體的消耗量極少。Spark 驅動程式沒有額外的記憶體需求
+ 對於動態分割區插入，在 Spark 驅動程式上，MagicCommitProtocol 需要記憶體來存放每個遞交檔案的中繼資料資訊，直到任務遞交或中止為止。在大多數作業中，會忽略預設 Spark 驅動程式記憶體設定。

  對於具有寫入大量檔案之長時間執行任務的作業，遞交通訊協定耗用的記憶體量可能會很明顯，並需要調整配置給 Spark，特別是 Spark 執行器的記憶體。您可以使用 `spark.driver.memory` 屬性調整 Spark 驅動程式的記憶體，或使用 `spark.executor.memory` 屬性調整 Spark 執行器的記憶體。作為準則，撰寫 100，000 個檔案的單一任務通常需要額外的 200MB 記憶體。如需詳細資訊，請參閱 Apache Spark 組態文件中的[應用程式屬性](https://spark.apache.org/docs/latest/configuration.html#application-properties)。

# 使用 EMRFS S3 優化遞交者
<a name="emr-spark-s3-optimized-committer"></a>

EMRFS S3 優化遞交者是 [OutputCommitter](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/OutputCommitter.html) 實作的替代方案，針對使用 EMRFS 時寫入檔案至 Amazon S3 的操作優化。EMRFS S3 優化遞交者透過避免在作業和任務遞交階段於 Amazon S3 執行清單和重新命名操作，藉此提升應用程式效能。遞交者在 Amazon EMR 5.19.0 版和更高版本中提供，並在 Amazon EMR 5.20.0 和更高版本預設為啟用。遞交者用於使用 Spark、DataFrames或資料集的 Spark 任務。從 Amazon EMR 6.4.0 開始，此遞交者可用於所有常見格式，包括 parquet、ORC 和文字類型格式 (例如 CSV 和 JSON)。對於 Amazon EMR 6.4.0 之前的版本，僅支援 Parquet 格式。在某些情況下將不會使用遞交者。如需詳細資訊，請參閱[EMRFS S3 優化遞交者要求](emr-spark-committer-reqs.md)。

**Topics**
+ [EMRFS S3 優化遞交者要求](emr-spark-committer-reqs.md)
+ [EMRFS S3 優化遞交者和分段上傳](emr-spark-committer-multipart.md)
+ [作業調校考量](emr-spark-committer-tuning.md)
+ [為 Amazon EMR 5.19.0 啟用 EMRFS S3 優化遞交者](emr-spark-committer-enable.md)

# EMRFS S3 優化遞交者要求
<a name="emr-spark-committer-reqs"></a>

符合下列條件時會使用 EMRFS S3 最交化遞交者：
+ 您可以執行使用 Spark、DataFrames或資料集將檔案寫入 Amazon S3 的 Spark 任務。從 Amazon EMR 6.4.0 開始，此遞交者可用於所有常見格式，包括 parquet、ORC 和文字類型格式 (例如 CSV 和 JSON)。對於 Amazon EMR 6.4.0 之前的版本，僅支援 Parquet 格式。
+ 分段上傳會於 Amazon EMR 中啟用。這是預設值。如需詳細資訊，請參閱[EMRFS S3 優化遞交者和分段上傳](emr-spark-committer-multipart.md)。
+ 使用 Spark 的內置檔案格式支援。在以下情況會使用內建的檔案格式支援：
  + 對於 Hive 中繼存放區資料表，當使用 Amazon EMR 6.4.0 或更高版本將 Parquet 資料表的 `spark.sql.hive.convertMetastoreParquet` 設定為 `true`，或將 Orc 資料表的 `spark.sql.hive.convertMetastoreOrc` 設定為 `true` 時。這些是預設設定。
  + 當作業寫入至檔案格式資料來源或資料表時，例如使用 `USING parquet` 子句建立目標資料表時。
  + 當任務寫入至未分割 Hive 中繼存放區 Parquet 資料表時。已知限制是 Spark 的內建 Parquet 支援並不支援已分割的 Hive 資料表。如需詳細資訊，請參閱《Apache Spark、DataFrames 和資料集指南》中的 [Hive 中繼存放區 Parquet 資料表轉換](https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#hive-metastore-parquet-table-conversion)。
+ 寫入至預設分割區位置 Spark 作業操作，例如 `${table_location}/k1=v1/k2=v2/`，使用遞交者。如果作業操作寫入至自訂分割區位置，則不使用遞交者，例如使用 `ALTER TABLE SQL` 命令設定自訂分割區位置。
+ 必須使用下列用於 Spark 的值：
  + `spark.sql.parquet.fs.optimized.committer.optimization-enabled` 屬性必須設為 `true`。這是 Amazon EMR 5.20.0 和更高版本的預設設定。若使用 Amazon EMR 5.19.0，預設值為 `false`。如需如何設定此值的詳細資訊，請參閱 [為 Amazon EMR 5.19.0 啟用 EMRFS S3 優化遞交者](emr-spark-committer-enable.md)。
  + 如果寫入至非分割的 Hive 中繼存放區資料表，則僅支援 Parquet 和 Orc 檔案格式。`true`如果寫入至非分割的 Parquet Hive 中繼存放區資料表，`spark.sql.hive.convertMetastoreParquet`則必須將 設定為 。`true`如果寫入至非分割的 Orc Hive 中繼存放區資料表，`spark.sql.hive.convertMetastoreOrc`則必須將 設定為 。這些是預設設定。
  + `spark.sql.parquet.output.committer.class` 必須設定為 `com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter`。這是預設設定。
  + `spark.sql.sources.commitProtocolClass` 必須設定為 `org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol` 或 `org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol`。`org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol` 是 Amazon EMR 5.x 系列版本 5.30.0 及更高版本和 Amazon EMR 6.x 系列版本 6.2.0 及更高版本的預設設定。先前 Amazon EMR 版本的預設設定為 `org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol`。
  + 如果 Spark 任務使用動態分割區欄覆寫分割的 Parquet 資料集，則 `partitionOverwriteMode` 寫入選項和 `spark.sql.sources.partitionOverwriteMode` 必須設為 `static`。這是預設設定。
**注意**  
`partitionOverwriteMode` 寫入選項已導入至 Spark 2.4.0。針對包含於 Amazon EMR 5.19.0 版的 Spark 版本 2.3.2，請設定 `spark.sql.sources.partitionOverwriteMode` 屬性。

## 不使用 EMRFS S3 優化遞交者時的場合
<a name="emr-spark-committer-reqs-anti"></a>

一般而言，EMRFS S3 優化遞交者不用於下列情形。


****  

| 情形 | 為什麼不使用遞交者 | 
| --- | --- | 
| 當您寫入到 HDFS 時 | 遞交者僅支援使用 EMRFS 寫入至 Amazon S3。 | 
| 當您使用 S3A 檔案系統時 | 遞交者僅支援 EMRFS。 | 
| 當您使用 MapReduce 或 Spark 的 RDD API 時 | 遞交者僅支援使用 SparkSQL、DataFrame 或資料集 API。 | 

下列 Scala 範例示範了某些其他情形會阻止使用整個 (第一個範例) 和部分 (第二個範例) EMRFS S3 優化遞交者。

**Example – 動態分割區覆寫模式**  
以下 Scala 範例說明 Spark 使用不同的遞交演算法，從而完全阻止使用 EMRFS S3 優化遞交者。程式碼將 `partitionOverwriteMode` 屬性設為 `dynamic`，僅覆寫您要寫入資料的分割區。然後，由 `partitionBy` 指定動態分割區資料欄，且寫入模式被設為 `overwrite`。  

```
val dataset = spark.range(0, 10)
  .withColumn("dt", expr("date_sub(current_date(), id)"))

dataset.write.mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .partitionBy("dt")
  .parquet("s3://amzn-s3-demo-bucket1/output")
```
您必須執行全部三項設定，以避免使用 EMRFS S3 優化遞交者。當您這樣做時，Spark 會執行在 Spark 遞交通訊協定中指定的其他遞交演算法。對於早於 5.30.0 的 Amazon EMR 5.x 版本，以及早於 6.2.0 的 Amazon EMR 6.x 版本，遞交通訊協定使用 Spark 的暫存目錄，它是以 `.spark-staging` 開頭，在輸出位置下建立的暫時目錄。該演算法會按順序重新命名分割區目錄，這可能對效能產生負面影響。如需有關 Amazon EMR 5.30.0 版及更高版本，和 6.2.0 版及更高版本的詳細資訊，請參閱 [使用 EMRFS S3 優化遞交通訊協定](emr-spark-s3-optimized-commit-protocol.md)。  
Spark 2.4.0 中的演算法遵循以下步驟：  

1. 任務會試圖將輸出寫入 Spark 暫存目錄下的分割區目錄，例如 `${outputLocation}/spark-staging-${jobID}/k1=v1/k2=v2/`。

1. 針對每個寫入的分割區，該任務會試圖保持相對分割區路徑的追蹤，例如 `k1=v1/k2=v2`。

1. 在任務成功完成後，它會將所有追蹤的相對分割區路徑提供給驅動程式。

1. 完成所有任務後，該任務遞交階段將收集在 Spark 臨時目錄下，所有成功任務嘗試寫入的分割區目錄。Spark 使用目錄樹狀圖重新命名操作，按順序將每個目錄重新命名為其最終輸出位置。

1. 在任務遞交階段完成之前刪除臨時目錄。

**Example – 自訂分割區位置**  
在此範例中，該 Scala 程式碼將插入至兩個分割區。其中一個分割區有自訂的分割區位置。另一個分割區使用預設分割區位置。EMRFS S3 最佳化遞交者僅用於寫入任務輸出至使用預設分割區位置的分割區。  

```
val table = "dataset"
val location = "s3://bucket/table"
                            
spark.sql(s"""
  CREATE TABLE $table (id bigint, dt date) 
  USING PARQUET PARTITIONED BY (dt) 
  LOCATION '$location'
""")
                            
// Add a partition using a custom location
val customPartitionLocation = "s3://bucket/custom"
spark.sql(s"""
  ALTER TABLE $table ADD PARTITION (dt='2019-01-28') 
  LOCATION '$customPartitionLocation'
""")
                            
// Add another partition using default location
spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')")
                            
def asDate(text: String) = lit(text).cast("date")
                            
spark.range(0, 10)
  .withColumn("dt",
    when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29")))
  .write.insertInto(table)
```
Scala 程式碼會建立以下 Amazon S3 物件：  

```
custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
custom_$folder$
table/_SUCCESS
table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
table/dt=2019-01-29_$folder$
table_$folder$
```
在寫入至自訂位置的分割區時，Spark 會使用一個和上一個範例相似的遞交演算法，如下所述。如之前的範例所示，該演算法會導致順序重新命名，這可能會對效能產生負面影響。  

1. 在將輸出寫入自訂位置的分割區時，任務會寫入至 Spark 臨時目錄下的檔案 (該目錄建立在最終輸出位置下)。該檔案名稱會包含隨機的 UUID，以防止檔案衝突。該任務會嘗試追蹤每個檔案以及最終所要的輸出路徑。

1. 在任務成功完成後，它會將檔案和其最終所要之輸出路徑提供給驅動程式。

1. 完成所有任務後，任務遞交階段會依序將所有為分割區寫入至自訂位置的檔案，重新命名為其最終輸出路徑。

1. 在任務遞交階段完成之前刪除臨時目錄。

# EMRFS S3 優化遞交者和分段上傳
<a name="emr-spark-committer-multipart"></a>

若要使用 EMRFS S3 優化遞交者，必須為 Amazon EMR 啟用分段上傳。分段上傳預設為啟用。您可以視需要重新啟用。如需詳細資訊，請參閱《Amazon EMR 管理指南》**中的[設定適用於 Amazon S3 的分段上傳](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-upload-s3.html#Config_Multipart)。

EMRFS S3 最佳化遞交器利用分段上傳類似交易的特性，確保由任務嘗試寫入的檔案只會在任務遞交時顯示於工作的輸出位置。透過此方式使用分段上傳，遞交器可提升預設 FileOutputCommitter 演算法版本 2 的任務遞交效能。使用 EMRFS S3 最佳化遞交器時，應考量傳統分段上傳行為的某些主要差異：
+ 無論檔案大小，分段上傳會一律執行。這不同於 EMRFS 的預設行為 (`fs.s3n.multipart.uploads.split.size` 屬性會控制分段上傳觸發時的檔案大小)。
+ 分段上傳會在不完整狀態維持較長的期間，直到任務遞交或中止。這不同於 EMRFS 的預設行為 (分段上傳會在任務完成寫入特定檔案時完成)。

由於這些差異，如果 Spark Executor JVM 在任務執行和寫入資料至 Amazon S3 時當機或終止，則更可能會留下不完整的分段上傳。因此，在您使用 EMRFS S3 最佳化遞交器時，請務必依照最佳實務來管理失敗的分段上傳。如需詳細資訊，請參閱《Amazon EMR 管理指南》**中有關使用 Amazon S3 儲存貯體的[最佳實務](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-upload-s3.html#emr-bucket-bestpractices)。

# 作業調校考量
<a name="emr-spark-committer-tuning"></a>

針對由任務嘗試寫入的每個檔案，EMRFS S3 最佳化遞交器會耗用少量記憶體，直到任務遞交或中止。在大多數任務中，記憶體的消耗量極少。對於具有寫入大量檔案之長期執行任務的工作，遞交器耗用的記憶體量可能會很明顯，並需要調整配置給 Spark 執行器的記憶體。您可以使用 `spark.executor.memory` 屬性調校執行器記憶體。根據準則，寫入 100,000 個檔案的單一任務通常需要額外 100 MB 的記憶體。如需詳細資訊，請參閱 Apache Spark 組態文件中的[應用程式屬性](https://spark.apache.org/docs/latest/configuration.html#application-properties)。

# 為 Amazon EMR 5.19.0 啟用 EMRFS S3 優化遞交者
<a name="emr-spark-committer-enable"></a>

如果您使用的是 Amazon EMR 5.19.0，您可以在使用 Amazon EMR 建立叢集時或從 Spark 中手動將此 `spark.sql.parquet.fs.optimized.committer.optimization-enabled` 屬性設為 `true`。

## 在建立叢集時啟用 EMRFS S3 優化遞交者
<a name="w2aac62c61c17c13b5"></a>

使用 `spark-defaults` 組態分類將 `spark.sql.parquet.fs.optimized.committer.optimization-enabled` 屬性設為 `true`。如需詳細資訊，請參閱[設定應用程式](emr-configure-apps.md)。

## 從 Spark 啟用 EMRFS S3 優化遞交者
<a name="w2aac62c61c17c13b7"></a>

您可以將 `spark.sql.parquet.fs.optimized.committer.optimization-enabled` 設為 `true`，方式是於 `SparkConf` 將其硬編碼，在 Spark shell 或 `spark-submit` 和 `spark-sql` 工具中或在 `conf/spark-defaults.conf` 將其做為 `--conf` 參數傳遞。如需詳細資訊，請參閱 Apache Spark 文件中的 [Spark 組態](https://spark.apache.org/docs/latest/configuration.html)。

以下範例示範如何在執行 spark-sql 命令時啟用遞交器。

```
spark-sql \
  --conf spark.sql.parquet.fs.optimized.committer.optimization-enabled=true \
  -e "INSERT OVERWRITE TABLE target_table SELECT * FROM source_table;"
```

# 使用 EMRFS S3 優化遞交通訊協定
<a name="emr-spark-s3-optimized-commit-protocol"></a>

EMRFS S3 優化遞交通訊協定是替代性 [FileCommitProtocol](https://spark.apache.org/docs/2.2.0//api/java/org/apache/spark/internal/io/FileCommitProtocol.html) 實作，針對使用 EMRFS 將 Spark 動態分割區覆寫檔案寫入至 Amazon S3 的操作進行過優化。該通訊協定透過在 Spark 動態分割區覆寫作業遞交階段避免 Amazon S3 中的重新命名操作，從而提升應用程式的效能。

請注意，[EMRFS S3-optimized遞交者](emr-spark-s3-optimized-committer.html)也透過避免重新命名操作來改善效能。不過，它不適用於動態分割區覆寫的案例，而遞交通訊協定的改進僅針對動態分割區覆寫案例。

遞交通訊協定在 Amazon EMR 5.30.0 版和更高版本以及 6.2.0 版和更高版本中提供，並預設為啟用。Amazon EMR 從 5.31.0 版開始新增了平行處理改進。此通訊協定用於使用 Spark、DataFrames或資料集的 Spark 任務。在某些情況下將不會使用遞交通訊協定。如需詳細資訊，請參閱[EMRFS S3 優化遞交通訊協定要求](emr-spark-committer-reqs.md)。

**Topics**
+ [EMRFS S3 優化遞交通訊協定要求](emr-spark-commit-protocol-reqs.md)
+ [EMRFS S3 優化遞交通訊協定和分段上傳](emr-spark-commit-protocol-multipart.md)
+ [作業調校考量](emr-spark-commit-protocol-tuning.md)

# EMRFS S3 優化遞交通訊協定要求
<a name="emr-spark-commit-protocol-reqs"></a>

符合下列條件時會使用 EMRFS S3 優化遞交通訊協定：
+ 您可以執行使用 Spark、DataFrames或資料集覆寫分割資料表的 Spark 任務。
+ 您將執行分割區覆寫模式為 `dynamic` 的 Spark 作業。
+ 分段上傳會於 Amazon EMR 中啟用。這是預設值。如需詳細資訊，請參閱[EMRFS S3 優化遞交通訊協定和分段上傳](emr-spark-commit-protocol-multipart.md)。
+ 適用於 EMRFS 的檔案系統快取已啟用。這是預設值。檢查設定 `fs.s3.impl.disable.cache` 是否設為 `false`。
+ 使用 Spark 的內建資料來源支援。在以下情況會使用內建資料來源支援：
  + 當作業寫入至內建的資料來源或資料表時。
  + 當作業寫入至 Hive 中繼存放區 Parquet 資料表時。當 `spark.sql.hive.convertInsertingPartitionedTable` 和 `spark.sql.hive.convertMetastoreParquet` 都設為 true 時會發生這種情況。這些是預設設定。
  + 當作業寫入至 Hive 中繼存放區 ORC 資料表時。當 `spark.sql.hive.convertInsertingPartitionedTable` 和 `spark.sql.hive.convertMetastoreOrc` 都設為 `true` 時會發生這種情況。這些是預設設定。
+ 當 Spark 作業操作寫入至預設的分割區位置時，例如 `${table_location}/k1=v1/k2=v2/`，會使用遞交通訊協定。如果作業操作寫入至自訂分割區位置，則不使用該通訊協定，例如使用 `ALTER TABLE SQL` 命令設定自訂分割區位置。
+ 必須使用下列用於 Spark 的值：
  + `spark.sql.sources.commitProtocolClass` 必須設定為 `org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol`。這是 Amazon EMR 5.30.0 版及更高版本、6.2.0 及更高版本的預設設定。
  + `partitionOverwriteMode` 寫入選項或 `spark.sql.sources.partitionOverwriteMode` 必須設為 `dynamic`。預設設定為 `static`。
**注意**  
`partitionOverwriteMode` 寫入選項已導入至 Spark 2.4.0。針對包含於 Amazon EMR 5.19.0 版的 Spark 版本 2.3.2，請設定 `spark.sql.sources.partitionOverwriteMode` 屬性。
  + 如果 Spark 作業寫入至 Hive 中繼存放區 Parquet 資料表，則 `spark.sql.hive.convertMetastoreParquet`、`spark.sql.hive.convertInsertingPartitionedTable` 和 `spark.sql.hive.convertMetastore.partitionOverwriteMode` 必須設為 `true`。系統有預設的設定。
  + 如果 Spark 作業寫入至 Hive 中繼存放區 ORC 資料表，則 `spark.sql.hive.convertMetastoreOrc`、`spark.sql.hive.convertInsertingPartitionedTable` 和 `spark.sql.hive.convertMetastore.partitionOverwriteMode` 必須設為 `true`。系統有預設的設定。

**Example – 動態分割區覆寫模式**  
在這個 Scala 範例中，將觸發優化。首先，將 `partitionOverwriteMode` 屬性設定為 `dynamic`。這僅會覆寫您正寫入資料的分割區。然後，您要使用 `partitionBy` 指定動態分割區資料欄，並將寫入模式設為 `overwrite`。  

```
val dataset = spark.range(0, 10)
  .withColumn("dt", expr("date_sub(current_date(), id)"))

dataset.write.mode("overwrite")                 // "overwrite" instead of "insert"
  .option("partitionOverwriteMode", "dynamic")  // "dynamic" instead of "static"  
  .partitionBy("dt")                            // partitioned data instead of unpartitioned data
  .parquet("s3://amzn-s3-demo-bucket1/output")    // "s3://" to use Amazon EMR file system, instead of "s3a://" or "hdfs://"
```

## 當不使用 EMRFS S3 優化遞交通訊協定時
<a name="emr-spark-commit-protocol-reqs-anti"></a>

一般而言，EMRFS S3-optimized遞交通訊協定的運作方式與開放原始碼預設 Spark 遞交通訊協定 相同`org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol`。在下列情形中不會發生優化。


****  

| 情形 | 為什麼不使用遞交通訊協定 | 
| --- | --- | 
| 當您寫入到 HDFS 時 | 遞交通訊協定僅支援使用 EMRFS 寫入至 Amazon S3。 | 
| 當您使用 S3A 檔案系統時 | 遞交通訊協定僅支援 EMRFS。 | 
| 當您使用 MapReduce 或 Spark 的 RDD API 時 | 遞交通訊協定僅支援使用 SparkSQL、DataFrame 或 Dataset API。 | 
| 當未觸發動態分割區覆寫時 | 遞交通訊協定僅對動態分割區覆寫案例進行優化。如需了解其他案例，請參閱 [使用 EMRFS S3 優化遞交者](emr-spark-s3-optimized-committer.md)。 | 

下列 Scala 範例示範了 EMRFS S3 優化遞交通訊協定委派給 `SQLHadoopMapReduceCommitProtocol` 的其他一些情形。

**Example - 具有自訂分割區位置的動態分割區覆寫模式**  
在此範例中，Scala 程式會以動態分割區覆寫模式覆寫兩個分割區。其中一個分割區有自訂的分割區位置。另一個分割區使用預設分割區位置。EMRFS S3 優化遞交通訊協定僅改進使用預設分割區位置的分割區。  

```
val table = "dataset"
val inputView = "tempView"
val location = "s3://bucket/table"
                            
spark.sql(s"""
  CREATE TABLE $table (id bigint, dt date) 
  USING PARQUET PARTITIONED BY (dt) 
  LOCATION '$location'
""")

// Add a partition using a custom location
val customPartitionLocation = "s3://bucket/custom"
spark.sql(s"""
  ALTER TABLE $table ADD PARTITION (dt='2019-01-28') 
  LOCATION '$customPartitionLocation'
""")

// Add another partition using default location
spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')")

def asDate(text: String) = lit(text).cast("date")   
                       
spark.range(0, 10)
  .withColumn("dt",
    when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29")))
  .createTempView(inputView)
  
// Set partition overwrite mode to 'dynamic'
spark.sql(s"SET spark.sql.sources.partitionOverwriteMode=dynamic")
  
spark.sql(s"INSERT OVERWRITE TABLE $table SELECT * FROM $inputView")
```
Scala 程式碼會建立以下 Amazon S3 物件：  

```
custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
custom_$folder$
table/_SUCCESS
table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
table/dt=2019-01-29_$folder$
table_$folder$
```
早期 Spark 版本中寫入到自訂分割區位置可能導致資料遺失。在此範例中，分割區 `dt='2019-01-28'` 會遺失。如需詳細資料，請參閱 [SPARK-35106](https://issues.apache.org/jira/browse/SPARK-35106)。此問題已在 Amazon EMR 5.33.0 版及更高版本中修正，6.0.x 和 6.1.x 除外。

在寫入至自訂位置的分割區時，Spark 會使用一個和上一個範例相似的遞交演算法，如下所述。如之前的範例所示，該演算法會導致順序重新命名，這可能會對效能產生負面影響。

Spark 2.4.0 中的演算法遵循以下步驟：

1. 在將輸出寫入自訂位置的分割區時，任務會寫入至 Spark 臨時目錄下的檔案 (該目錄建立在最終輸出位置下)。該檔案名稱會包含隨機的 UUID，以防止檔案衝突。該任務會嘗試追蹤每個檔案以及最終所要的輸出路徑。

1. 在任務成功完成後，它會將檔案和其最終所要之輸出路徑提供給驅動程式。

1. 完成所有任務後，任務遞交階段會依序將所有為分割區寫入至自訂位置的檔案，重新命名為其最終輸出路徑。

1. 在任務遞交階段完成之前刪除臨時目錄。

# EMRFS S3 優化遞交通訊協定和分段上傳
<a name="emr-spark-commit-protocol-multipart"></a>

若要利用 EMRFS S3 優化遞交通訊協定中的動態分割區覆寫優化，則必須在 Amazon EMR 中啟用多段上傳。分段上傳預設為啟用。您可以視需要重新啟用。如需詳細資訊，請參閱《Amazon EMR 管理指南》**中的[設定適用於 Amazon S3 的分段上傳](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-upload-s3.html#Config_Multipart)。

動態分割區覆寫期間，EMRFS S3 優化遞交通訊協定利用分段上傳類似交易的特性，確保由任務嘗試寫入的檔案只會在作業遞交時顯示於作業的輸出位置。透過以此方式使用多段上傳，遞交通訊協定將提升作業遞交效能，讓其比預設 `SQLHadoopMapReduceCommitProtocol` 更佳。使用 EMRFS S3 優化遞交通訊協定時，應考量傳統分段上傳行為的某些主要差異：
+ 無論檔案大小，分段上傳會一律執行。這不同於 EMRFS 的預設行為 (`fs.s3n.multipart.uploads.split.size` 屬性會控制分段上傳觸發時的檔案大小)。
+ 分段上傳會在不完整狀態維持較長的期間，直到任務遞交或中止。這不同於 EMRFS 的預設行為 (分段上傳會在任務完成寫入特定檔案時完成)。

由於這些差異，如果 Spark Executor JVM 在任務執行和寫入資料至 Amazon S3 時當機或終止，或者 Spark 驅動程式 JVM 在作業執行時當機或終止，則更可能會留下不完整的分段上傳。因此，在您使用 EMRFS S3 優化遞交通訊協定時，請務必依照最佳實務來管理失敗的分段上傳。如需詳細資訊，請參閱《Amazon EMR 管理指南》**中有關使用 Amazon S3 儲存貯體的[最佳實務](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-upload-s3.html#emr-bucket-bestpractices)。

# 作業調校考量
<a name="emr-spark-commit-protocol-tuning"></a>

在執行器上，針對由任務嘗試寫入的每個檔案，EMRFS S3 優化遞交通訊協定會耗用少量記憶體，直到任務遞交或中止。在大多數任務中，記憶體的消耗量極少。

在 Spark 驅動程式上，EMRFS S3 優化遞交通訊協定需要記憶體來儲存每個遞交檔案的中繼資料資訊，直至作業被遞交或中止為止。在大多數作業中，會忽略預設 Spark 驅動程式記憶體設定。

對於具有寫入大量檔案之長時間執行任務的作業，遞交通訊協定耗用的記憶體量可能會很明顯，並需要調整配置給 Spark，特別是 Spark 執行器的記憶體。您可以使用 `spark.driver.memory` 屬性調整 Spark 驅動程式的記憶體，或使用 `spark.executor.memory` 屬性調整 Spark 執行器的記憶體。根據準則，寫入 100,000 個檔案的單一任務通常需要額外 100 MB 的記憶體。如需詳細資訊，請參閱 Apache Spark 組態文件中的[應用程式屬性](https://spark.apache.org/docs/latest/configuration.html#application-properties)。

# 使用 EMRFS 的重試 Amazon S3 請求
<a name="emr-spark-emrfs-retry"></a>

本主題提供使用 EMRFS 向 Amazon S3 發出請求時，您可以使用的重試策略相關資訊。當您的請求速率加快時，S3 嘗試擴展以支援新的速率。在此過程中，S3 可能對請求限流並傳回 `503 Slow Down` 錯誤。若要提高您的 S3 請求的成功率，您可以透過在 `emrfs-site` 組態中設定屬性，調整您的重試策略。

您可以採用下列方式調整您的重試策略。
+ 提高預設指數退避重試策略的重試上限。
+ 啟用並設定和性增長/乘性降低 (AIMD) 重試策略。Amazon EMR 6.4.0 版和更高版本支援 AIMD。

## 使用預設指數退避策略
<a name="emr-spark-emrfs-retry-exponential-backoff"></a>

依預設，EMRFS 會使用指數退避策略重試 Amazon S3 請求。預設 EMRFS 重試限制為 15。若要避免 S3 `503 Slow Down` 錯誤，您可以在建立新叢集時、執行中叢集上或應用程式執行期調高重試限制。

如果想要調高重試限制，您必須在 `emrfs-site` 組態中變更 `fs.s3.maxRetries` 的值。下列範例組態將 `fs.s3.maxRetries` 設為自訂值 30。

```
[
    {
      "Classification": "emrfs-site",
      "Properties": {
        "fs.s3.maxRetries": "30"
      }
    }
]
```

如需使用組態物件的詳細資訊，請參閱 [設定應用程式](emr-configure-apps.md)。

## 使用 AIMD 重試策略
<a name="emr-spark-emrfs-retry-aimd"></a>

在 Amazon EMR 6.4.0 版及更高版本中，EMRFS 支援依據和性增長/乘性降低 (AIMD) 模型的替代性重試策略。當您使用大型 Amazon EMR 叢集時，AIMD 重試策略變得特別有用。

AIMD 會使用有關近期成功請求的資料計算自訂請求速率。此策略會減少限流請求的數量和每個請求所需的嘗試總次數。

若要啟用 AIMD 重試策略，您必須將 `emrfs-site` 組態中的 `fs.s3.aimd.enabled` 屬性設為 `true`，如下例所示。

```
[
    {
      "Classification": "emrfs-site",
      "Properties": {
        "fs.s3.aimd.enabled": "true"
      }
    }
]
```

如需使用組態物件的詳細資訊，請參閱 [設定應用程式](emr-configure-apps.md)。

## 進階 AIMD 重試設定
<a name="emr-spark-emrfs-retry-advanced-properties"></a>

在使用 AIMD 重試策略時，您可以設定下表中所列的屬性，以完善重試行為。對於大多數使用案例，建議您使用預設值。


**進階 AIMD 重試策略屬性**  

| 屬性 | 預設值 | Description | 
| --- | --- | --- | 
| fs.s3.aimd.increaseIncrement | 0.1 | 控制當連續請求成功時，請求速率以多快速度加快。 | 
| fs.s3.aimd.reductionFactor | 2 | 控制當 Amazon S3 傳回 503 回應時，請求速率以多快速度減緩。預設係數 2 會使請求速率減半。 | 
| fs.s3.aimd.minRate | 0.1 | 當 S3 請求遇到持續限流時，設定請求速率下限。 | 
| fs.s3.aimd.initialRate | 5500 | 設定初始請求速率，然後它會依據您為 fs.s3.aimd.increaseIncrement 和 fs.s3.aimd.reductionFactor 指定的值變更。初始速率還用於 GET 請求，並且會針對 PUT 請求按比例擴展 (3500/5500)。 | 
| fs.s3.aimd.adjustWindow | 2 | 控制調整請求速率的頻率，使用回應數量進行衡量。 | 
| fs.s3.aimd.maxAttempts | 100 | 設定嘗試請求的最大次數。 | 