

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 使用 EMRFS S3 優化遞交通訊協定
<a name="emr-spark-s3-optimized-commit-protocol"></a>

EMRFS S3 優化遞交通訊協定是替代性 [FileCommitProtocol](https://spark.apache.org/docs/2.2.0//api/java/org/apache/spark/internal/io/FileCommitProtocol.html) 實作，針對使用 EMRFS 將 Spark 動態分割區覆寫檔案寫入至 Amazon S3 的操作進行過優化。該通訊協定透過在 Spark 動態分割區覆寫作業遞交階段避免 Amazon S3 中的重新命名操作，從而提升應用程式的效能。

請注意，[EMRFS S3-optimized遞交者](emr-spark-s3-optimized-committer.html)也透過避免重新命名操作來改善效能。不過，它不適用於動態分割區覆寫的案例，而遞交通訊協定的改進僅針對動態分割區覆寫案例。

遞交通訊協定在 Amazon EMR 5.30.0 版和更高版本以及 6.2.0 版和更高版本中提供，並預設為啟用。Amazon EMR 從 5.31.0 版開始新增了平行處理改進。此通訊協定用於使用 Spark、DataFrames或資料集的 Spark 任務。在某些情況下將不會使用遞交通訊協定。如需詳細資訊，請參閱[EMRFS S3 優化遞交通訊協定要求](emr-spark-committer-reqs.md)。

**Topics**
+ [EMRFS S3 優化遞交通訊協定要求](emr-spark-commit-protocol-reqs.md)
+ [EMRFS S3 優化遞交通訊協定和分段上傳](emr-spark-commit-protocol-multipart.md)
+ [作業調校考量](emr-spark-commit-protocol-tuning.md)

# EMRFS S3 優化遞交通訊協定要求
<a name="emr-spark-commit-protocol-reqs"></a>

符合下列條件時會使用 EMRFS S3 優化遞交通訊協定：
+ 您可以執行使用 Spark、DataFrames或資料集覆寫分割資料表的 Spark 任務。
+ 您將執行分割區覆寫模式為 `dynamic` 的 Spark 作業。
+ 分段上傳會於 Amazon EMR 中啟用。這是預設值。如需詳細資訊，請參閱[EMRFS S3 優化遞交通訊協定和分段上傳](emr-spark-commit-protocol-multipart.md)。
+ 適用於 EMRFS 的檔案系統快取已啟用。這是預設值。檢查設定 `fs.s3.impl.disable.cache` 是否設為 `false`。
+ 使用 Spark 的內建資料來源支援。在以下情況會使用內建資料來源支援：
  + 當作業寫入至內建的資料來源或資料表時。
  + 當作業寫入至 Hive 中繼存放區 Parquet 資料表時。當 `spark.sql.hive.convertInsertingPartitionedTable` 和 `spark.sql.hive.convertMetastoreParquet` 都設為 true 時會發生這種情況。這些是預設設定。
  + 當作業寫入至 Hive 中繼存放區 ORC 資料表時。當 `spark.sql.hive.convertInsertingPartitionedTable` 和 `spark.sql.hive.convertMetastoreOrc` 都設為 `true` 時會發生這種情況。這些是預設設定。
+ 當 Spark 作業操作寫入至預設的分割區位置時，例如 `${table_location}/k1=v1/k2=v2/`，會使用遞交通訊協定。如果作業操作寫入至自訂分割區位置，則不使用該通訊協定，例如使用 `ALTER TABLE SQL` 命令設定自訂分割區位置。
+ 必須使用下列用於 Spark 的值：
  + `spark.sql.sources.commitProtocolClass` 必須設定為 `org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol`。這是 Amazon EMR 5.30.0 版及更高版本、6.2.0 及更高版本的預設設定。
  + `partitionOverwriteMode` 寫入選項或 `spark.sql.sources.partitionOverwriteMode` 必須設為 `dynamic`。預設設定為 `static`。
**注意**  
`partitionOverwriteMode` 寫入選項已導入至 Spark 2.4.0。針對包含於 Amazon EMR 5.19.0 版的 Spark 版本 2.3.2，請設定 `spark.sql.sources.partitionOverwriteMode` 屬性。
  + 如果 Spark 作業寫入至 Hive 中繼存放區 Parquet 資料表，則 `spark.sql.hive.convertMetastoreParquet`、`spark.sql.hive.convertInsertingPartitionedTable` 和 `spark.sql.hive.convertMetastore.partitionOverwriteMode` 必須設為 `true`。系統有預設的設定。
  + 如果 Spark 作業寫入至 Hive 中繼存放區 ORC 資料表，則 `spark.sql.hive.convertMetastoreOrc`、`spark.sql.hive.convertInsertingPartitionedTable` 和 `spark.sql.hive.convertMetastore.partitionOverwriteMode` 必須設為 `true`。系統有預設的設定。

**Example – 動態分割區覆寫模式**  
在這個 Scala 範例中，將觸發優化。首先，將 `partitionOverwriteMode` 屬性設定為 `dynamic`。這僅會覆寫您正寫入資料的分割區。然後，您要使用 `partitionBy` 指定動態分割區資料欄，並將寫入模式設為 `overwrite`。  

```
val dataset = spark.range(0, 10)
  .withColumn("dt", expr("date_sub(current_date(), id)"))

dataset.write.mode("overwrite")                 // "overwrite" instead of "insert"
  .option("partitionOverwriteMode", "dynamic")  // "dynamic" instead of "static"  
  .partitionBy("dt")                            // partitioned data instead of unpartitioned data
  .parquet("s3://amzn-s3-demo-bucket1/output")    // "s3://" to use Amazon EMR file system, instead of "s3a://" or "hdfs://"
```

## 當不使用 EMRFS S3 優化遞交通訊協定時
<a name="emr-spark-commit-protocol-reqs-anti"></a>

一般而言，EMRFS S3-optimized遞交通訊協定的運作方式與開放原始碼預設 Spark 遞交通訊協定 相同`org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol`。在下列情形中不會發生優化。


****  

| 情形 | 為什麼不使用遞交通訊協定 | 
| --- | --- | 
| 當您寫入到 HDFS 時 | 遞交通訊協定僅支援使用 EMRFS 寫入至 Amazon S3。 | 
| 當您使用 S3A 檔案系統時 | 遞交通訊協定僅支援 EMRFS。 | 
| 當您使用 MapReduce 或 Spark 的 RDD API 時 | 遞交通訊協定僅支援使用 SparkSQL、DataFrame 或 Dataset API。 | 
| 當未觸發動態分割區覆寫時 | 遞交通訊協定僅對動態分割區覆寫案例進行優化。如需了解其他案例，請參閱 [使用 EMRFS S3 優化遞交者](emr-spark-s3-optimized-committer.md)。 | 

下列 Scala 範例示範了 EMRFS S3 優化遞交通訊協定委派給 `SQLHadoopMapReduceCommitProtocol` 的其他一些情形。

**Example - 具有自訂分割區位置的動態分割區覆寫模式**  
在此範例中，Scala 程式會以動態分割區覆寫模式覆寫兩個分割區。其中一個分割區有自訂的分割區位置。另一個分割區使用預設分割區位置。EMRFS S3 優化遞交通訊協定僅改進使用預設分割區位置的分割區。  

```
val table = "dataset"
val inputView = "tempView"
val location = "s3://bucket/table"
                            
spark.sql(s"""
  CREATE TABLE $table (id bigint, dt date) 
  USING PARQUET PARTITIONED BY (dt) 
  LOCATION '$location'
""")

// Add a partition using a custom location
val customPartitionLocation = "s3://bucket/custom"
spark.sql(s"""
  ALTER TABLE $table ADD PARTITION (dt='2019-01-28') 
  LOCATION '$customPartitionLocation'
""")

// Add another partition using default location
spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')")

def asDate(text: String) = lit(text).cast("date")   
                       
spark.range(0, 10)
  .withColumn("dt",
    when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29")))
  .createTempView(inputView)
  
// Set partition overwrite mode to 'dynamic'
spark.sql(s"SET spark.sql.sources.partitionOverwriteMode=dynamic")
  
spark.sql(s"INSERT OVERWRITE TABLE $table SELECT * FROM $inputView")
```
Scala 程式碼會建立以下 Amazon S3 物件：  

```
custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
custom_$folder$
table/_SUCCESS
table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
table/dt=2019-01-29_$folder$
table_$folder$
```
早期 Spark 版本中寫入到自訂分割區位置可能導致資料遺失。在此範例中，分割區 `dt='2019-01-28'` 會遺失。如需詳細資料，請參閱 [SPARK-35106](https://issues.apache.org/jira/browse/SPARK-35106)。此問題已在 Amazon EMR 5.33.0 版及更高版本中修正，6.0.x 和 6.1.x 除外。

在寫入至自訂位置的分割區時，Spark 會使用一個和上一個範例相似的遞交演算法，如下所述。如之前的範例所示，該演算法會導致順序重新命名，這可能會對效能產生負面影響。

Spark 2.4.0 中的演算法遵循以下步驟：

1. 在將輸出寫入自訂位置的分割區時，任務會寫入至 Spark 臨時目錄下的檔案 (該目錄建立在最終輸出位置下)。該檔案名稱會包含隨機的 UUID，以防止檔案衝突。該任務會嘗試追蹤每個檔案以及最終所要的輸出路徑。

1. 在任務成功完成後，它會將檔案和其最終所要之輸出路徑提供給驅動程式。

1. 完成所有任務後，任務遞交階段會依序將所有為分割區寫入至自訂位置的檔案，重新命名為其最終輸出路徑。

1. 在任務遞交階段完成之前刪除臨時目錄。

# EMRFS S3 優化遞交通訊協定和分段上傳
<a name="emr-spark-commit-protocol-multipart"></a>

若要利用 EMRFS S3 優化遞交通訊協定中的動態分割區覆寫優化，則必須在 Amazon EMR 中啟用多段上傳。分段上傳預設為啟用。您可以視需要重新啟用。如需詳細資訊，請參閱《Amazon EMR 管理指南》**中的[設定適用於 Amazon S3 的分段上傳](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-upload-s3.html#Config_Multipart)。

動態分割區覆寫期間，EMRFS S3 優化遞交通訊協定利用分段上傳類似交易的特性，確保由任務嘗試寫入的檔案只會在作業遞交時顯示於作業的輸出位置。透過以此方式使用多段上傳，遞交通訊協定將提升作業遞交效能，讓其比預設 `SQLHadoopMapReduceCommitProtocol` 更佳。使用 EMRFS S3 優化遞交通訊協定時，應考量傳統分段上傳行為的某些主要差異：
+ 無論檔案大小，分段上傳會一律執行。這不同於 EMRFS 的預設行為 (`fs.s3n.multipart.uploads.split.size` 屬性會控制分段上傳觸發時的檔案大小)。
+ 分段上傳會在不完整狀態維持較長的期間，直到任務遞交或中止。這不同於 EMRFS 的預設行為 (分段上傳會在任務完成寫入特定檔案時完成)。

由於這些差異，如果 Spark Executor JVM 在任務執行和寫入資料至 Amazon S3 時當機或終止，或者 Spark 驅動程式 JVM 在作業執行時當機或終止，則更可能會留下不完整的分段上傳。因此，在您使用 EMRFS S3 優化遞交通訊協定時，請務必依照最佳實務來管理失敗的分段上傳。如需詳細資訊，請參閱《Amazon EMR 管理指南》**中有關使用 Amazon S3 儲存貯體的[最佳實務](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-upload-s3.html#emr-bucket-bestpractices)。

# 作業調校考量
<a name="emr-spark-commit-protocol-tuning"></a>

在執行器上，針對由任務嘗試寫入的每個檔案，EMRFS S3 優化遞交通訊協定會耗用少量記憶體，直到任務遞交或中止。在大多數任務中，記憶體的消耗量極少。

在 Spark 驅動程式上，EMRFS S3 優化遞交通訊協定需要記憶體來儲存每個遞交檔案的中繼資料資訊，直至作業被遞交或中止為止。在大多數作業中，會忽略預設 Spark 驅動程式記憶體設定。

對於具有寫入大量檔案之長時間執行任務的作業，遞交通訊協定耗用的記憶體量可能會很明顯，並需要調整配置給 Spark，特別是 Spark 執行器的記憶體。您可以使用 `spark.driver.memory` 屬性調整 Spark 驅動程式的記憶體，或使用 `spark.executor.memory` 屬性調整 Spark 執行器的記憶體。根據準則，寫入 100,000 個檔案的單一任務通常需要額外 100 MB 的記憶體。如需詳細資訊，請參閱 Apache Spark 組態文件中的[應用程式屬性](https://spark.apache.org/docs/latest/configuration.html#application-properties)。