

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# EMRFS S3 優化遞交者要求
<a name="emr-spark-committer-reqs"></a>

符合下列條件時會使用 EMRFS S3 最交化遞交者：
+ 您可以執行使用 Spark、DataFrames或資料集將檔案寫入 Amazon S3 的 Spark 任務。從 Amazon EMR 6.4.0 開始，此遞交者可用於所有常見格式，包括 parquet、ORC 和文字類型格式 (例如 CSV 和 JSON)。對於 Amazon EMR 6.4.0 之前的版本，僅支援 Parquet 格式。
+ 分段上傳會於 Amazon EMR 中啟用。這是預設值。如需詳細資訊，請參閱[EMRFS S3 優化遞交者和分段上傳](emr-spark-committer-multipart.md)。
+ 使用 Spark 的內置檔案格式支援。在以下情況會使用內建的檔案格式支援：
  + 對於 Hive 中繼存放區資料表，當使用 Amazon EMR 6.4.0 或更高版本將 Parquet 資料表的 `spark.sql.hive.convertMetastoreParquet` 設定為 `true`，或將 Orc 資料表的 `spark.sql.hive.convertMetastoreOrc` 設定為 `true` 時。這些是預設設定。
  + 當作業寫入至檔案格式資料來源或資料表時，例如使用 `USING parquet` 子句建立目標資料表時。
  + 當任務寫入至未分割 Hive 中繼存放區 Parquet 資料表時。已知限制是 Spark 的內建 Parquet 支援並不支援已分割的 Hive 資料表。如需詳細資訊，請參閱《Apache Spark、DataFrames 和資料集指南》中的 [Hive 中繼存放區 Parquet 資料表轉換](https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#hive-metastore-parquet-table-conversion)。
+ 寫入至預設分割區位置 Spark 作業操作，例如 `${table_location}/k1=v1/k2=v2/`，使用遞交者。如果作業操作寫入至自訂分割區位置，則不使用遞交者，例如使用 `ALTER TABLE SQL` 命令設定自訂分割區位置。
+ 必須使用下列用於 Spark 的值：
  + `spark.sql.parquet.fs.optimized.committer.optimization-enabled` 屬性必須設為 `true`。這是 Amazon EMR 5.20.0 和更高版本的預設設定。若使用 Amazon EMR 5.19.0，預設值為 `false`。如需如何設定此值的詳細資訊，請參閱 [為 Amazon EMR 5.19.0 啟用 EMRFS S3 優化遞交者](emr-spark-committer-enable.md)。
  + 如果寫入至非分割的 Hive 中繼存放區資料表，則僅支援 Parquet 和 Orc 檔案格式。`true`如果寫入至非分割的 Parquet Hive 中繼存放區資料表，`spark.sql.hive.convertMetastoreParquet`則必須將 設定為 。`true`如果寫入至非分割的 Orc Hive 中繼存放區資料表，`spark.sql.hive.convertMetastoreOrc`則必須將 設定為 。這些是預設設定。
  + `spark.sql.parquet.output.committer.class` 必須設定為 `com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter`。這是預設設定。
  + `spark.sql.sources.commitProtocolClass` 必須設定為 `org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol` 或 `org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol`。`org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol` 是 Amazon EMR 5.x 系列版本 5.30.0 及更高版本和 Amazon EMR 6.x 系列版本 6.2.0 及更高版本的預設設定。先前 Amazon EMR 版本的預設設定為 `org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol`。
  + 如果 Spark 任務使用動態分割區欄覆寫分割的 Parquet 資料集，則 `partitionOverwriteMode` 寫入選項和 `spark.sql.sources.partitionOverwriteMode` 必須設為 `static`。這是預設設定。
**注意**  
`partitionOverwriteMode` 寫入選項已導入至 Spark 2.4.0。針對包含於 Amazon EMR 5.19.0 版的 Spark 版本 2.3.2，請設定 `spark.sql.sources.partitionOverwriteMode` 屬性。

## 不使用 EMRFS S3 優化遞交者時的場合
<a name="emr-spark-committer-reqs-anti"></a>

一般而言，EMRFS S3 優化遞交者不用於下列情形。


****  

| 情形 | 為什麼不使用遞交者 | 
| --- | --- | 
| 當您寫入到 HDFS 時 | 遞交者僅支援使用 EMRFS 寫入至 Amazon S3。 | 
| 當您使用 S3A 檔案系統時 | 遞交者僅支援 EMRFS。 | 
| 當您使用 MapReduce 或 Spark 的 RDD API 時 | 遞交者僅支援使用 SparkSQL、DataFrame 或資料集 API。 | 

下列 Scala 範例示範了某些其他情形會阻止使用整個 (第一個範例) 和部分 (第二個範例) EMRFS S3 優化遞交者。

**Example – 動態分割區覆寫模式**  
以下 Scala 範例說明 Spark 使用不同的遞交演算法，從而完全阻止使用 EMRFS S3 優化遞交者。程式碼將 `partitionOverwriteMode` 屬性設為 `dynamic`，僅覆寫您要寫入資料的分割區。然後，由 `partitionBy` 指定動態分割區資料欄，且寫入模式被設為 `overwrite`。  

```
val dataset = spark.range(0, 10)
  .withColumn("dt", expr("date_sub(current_date(), id)"))

dataset.write.mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .partitionBy("dt")
  .parquet("s3://amzn-s3-demo-bucket1/output")
```
您必須執行全部三項設定，以避免使用 EMRFS S3 優化遞交者。當您這樣做時，Spark 會執行在 Spark 遞交通訊協定中指定的其他遞交演算法。對於早於 5.30.0 的 Amazon EMR 5.x 版本，以及早於 6.2.0 的 Amazon EMR 6.x 版本，遞交通訊協定使用 Spark 的暫存目錄，它是以 `.spark-staging` 開頭，在輸出位置下建立的暫時目錄。該演算法會按順序重新命名分割區目錄，這可能對效能產生負面影響。如需有關 Amazon EMR 5.30.0 版及更高版本，和 6.2.0 版及更高版本的詳細資訊，請參閱 [使用 EMRFS S3 優化遞交通訊協定](emr-spark-s3-optimized-commit-protocol.md)。  
Spark 2.4.0 中的演算法遵循以下步驟：  

1. 任務會試圖將輸出寫入 Spark 暫存目錄下的分割區目錄，例如 `${outputLocation}/spark-staging-${jobID}/k1=v1/k2=v2/`。

1. 針對每個寫入的分割區，該任務會試圖保持相對分割區路徑的追蹤，例如 `k1=v1/k2=v2`。

1. 在任務成功完成後，它會將所有追蹤的相對分割區路徑提供給驅動程式。

1. 完成所有任務後，該任務遞交階段將收集在 Spark 臨時目錄下，所有成功任務嘗試寫入的分割區目錄。Spark 使用目錄樹狀圖重新命名操作，按順序將每個目錄重新命名為其最終輸出位置。

1. 在任務遞交階段完成之前刪除臨時目錄。

**Example – 自訂分割區位置**  
在此範例中，該 Scala 程式碼將插入至兩個分割區。其中一個分割區有自訂的分割區位置。另一個分割區使用預設分割區位置。EMRFS S3 最佳化遞交者僅用於寫入任務輸出至使用預設分割區位置的分割區。  

```
val table = "dataset"
val location = "s3://bucket/table"
                            
spark.sql(s"""
  CREATE TABLE $table (id bigint, dt date) 
  USING PARQUET PARTITIONED BY (dt) 
  LOCATION '$location'
""")
                            
// Add a partition using a custom location
val customPartitionLocation = "s3://bucket/custom"
spark.sql(s"""
  ALTER TABLE $table ADD PARTITION (dt='2019-01-28') 
  LOCATION '$customPartitionLocation'
""")
                            
// Add another partition using default location
spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')")
                            
def asDate(text: String) = lit(text).cast("date")
                            
spark.range(0, 10)
  .withColumn("dt",
    when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29")))
  .write.insertInto(table)
```
Scala 程式碼會建立以下 Amazon S3 物件：  

```
custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
custom_$folder$
table/_SUCCESS
table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
table/dt=2019-01-29_$folder$
table_$folder$
```
在寫入至自訂位置的分割區時，Spark 會使用一個和上一個範例相似的遞交演算法，如下所述。如之前的範例所示，該演算法會導致順序重新命名，這可能會對效能產生負面影響。  

1. 在將輸出寫入自訂位置的分割區時，任務會寫入至 Spark 臨時目錄下的檔案 (該目錄建立在最終輸出位置下)。該檔案名稱會包含隨機的 UUID，以防止檔案衝突。該任務會嘗試追蹤每個檔案以及最終所要的輸出路徑。

1. 在任務成功完成後，它會將檔案和其最終所要之輸出路徑提供給驅動程式。

1. 完成所有任務後，任務遞交階段會依序將所有為分割區寫入至自訂位置的檔案，重新命名為其最終輸出路徑。

1. 在任務遞交階段完成之前刪除臨時目錄。