

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# EMRFS S3 최적화 커밋 프로토콜의 요구 사항
<a name="emr-spark-commit-protocol-reqs"></a>

EMRFS S3 최적화 커밋 프로토콜은 다음 조건을 충족하는 경우에 사용됩니다.
+ Spark, DataFrame 또는 데이터 세트를 사용하여 파티셔닝된 테이블을 덮어쓰는 Spark 작업을 실행할 경우.
+ 파티션 덮어쓰기 모드가 `dynamic`인 Spark 작업을 실행합니다.
+ Amazon EMR에서 멀티파트 업로드는 활성화됩니다. 기본값입니다. 자세한 내용은 [EMRFS S3 최적화 커밋 프로토콜 및 멀티파트 업로드](emr-spark-commit-protocol-multipart.md) 단원을 참조하십시오.
+ EMRFS의 파일 시스템 캐시가 활성화되었습니다. 기본값입니다. `fs.s3.impl.disable.cache` 설정이 `false`로 설정되어 있는지 확인합니다.
+ Spark의 기본 제공 데이터 소스 지원이 사용됩니다. 기본 제공 데이터 소스 지원은 다음 상황에서 사용됩니다.
  + 작업에서 기본 제공 데이터 소스 또는 테이블에 쓰는 경우.
  + 작업에서 Hive 메타스토어 Parquet 테이블에 쓰는 경우. `spark.sql.hive.convertInsertingPartitionedTable` 및 `spark.sql.hive.convertMetastoreParquet` 모두 true로 설정된 경우에 수행됩니다. 기본 설정입니다.
  + 작업에서 Hive 메타스토어 ORC 테이블에 쓰는 경우. `spark.sql.hive.convertInsertingPartitionedTable` 및 `spark.sql.hive.convertMetastoreOrc` 모두 `true`로 설정된 경우에 수행됩니다. 기본 설정입니다.
+ 기본 파티션 위치에 쓰는 Spark 작업(예: `${table_location}/k1=v1/k2=v2/`)에서는 커밋 프로토콜을 사용합니다. 작업이 사용자 지정 파티션 위치에 쓰는 경우(예: `ALTER TABLE SQL` 명령을 사용하여 사용자 지정 파티션 위치를 설정하는 경우) 프로토콜은 사용되지 않습니다.
+ Spark에는 다음 값을 사용해야 합니다.
  + `spark.sql.sources.commitProtocolClass`를 `org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol`로 설정해야 합니다. 이는 Amazon EMR 릴리스 5.30.0 이상 및 6.2.0 이상 릴리스에서 기본 설정입니다.
  + `partitionOverwriteMode` 쓰기 옵션 또는 `spark.sql.sources.partitionOverwriteMode`를 `dynamic`으로 설정해야 합니다. 기본 설정은 `static`입니다.
**참고**  
`partitionOverwriteMode` 쓰기 옵션은 Spark 2.4.0에서 소개되었습니다. Amazon EMR 릴리스 5.19.0에 포함된 Spark 버전 2.3.2의 경우 `spark.sql.sources.partitionOverwriteMode` 속성을 설정합니다.
  + Spark 작업이 Hive 메타스토어 Parquet 테이블을 덮어쓰는 경우 `spark.sql.hive.convertMetastoreParquet`, `spark.sql.hive.convertInsertingPartitionedTable` 및 `spark.sql.hive.convertMetastore.partitionOverwriteMode`를 `true`로 설정해야 합니다. 기본 설정이 있습니다.
  + Spark 작업이 Hive 메타스토어 ORC 테이블을 덮어쓰는 경우 `spark.sql.hive.convertMetastoreOrc`, `spark.sql.hive.convertInsertingPartitionedTable` 및 `spark.sql.hive.convertMetastore.partitionOverwriteMode`를 `true`로 설정해야 합니다. 기본 설정이 있습니다.

**Example - 동적 파티션 덮어쓰기 모드**  
이 Scala 예제에서는 최적화가 트리거됩니다. 먼저 `partitionOverwriteMode` 속성을 `dynamic`으로 설정합니다. 이렇게 하면 데이터를 쓰는 대상 파티션만 덮어씁니다. 그런 다음, `partitionBy`를 사용하여 동적 파티션 열을 지정하고 쓰기 모드를 `overwrite`로 설정합니다.  

```
val dataset = spark.range(0, 10)
  .withColumn("dt", expr("date_sub(current_date(), id)"))

dataset.write.mode("overwrite")                 // "overwrite" instead of "insert"
  .option("partitionOverwriteMode", "dynamic")  // "dynamic" instead of "static"  
  .partitionBy("dt")                            // partitioned data instead of unpartitioned data
  .parquet("s3://amzn-s3-demo-bucket1/output")    // "s3://" to use Amazon EMR file system, instead of "s3a://" or "hdfs://"
```

## EMRFS S3 최적화 커밋 프로토콜을 사용하지 않는 경우
<a name="emr-spark-commit-protocol-reqs-anti"></a>

일반적으로 EMRFS S3 최적화 커밋 프로토콜은 오픈 소스 기본 Spark 커밋 프로토콜(`org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol`)과 동일하게 작동합니다. 다음과 같은 상황에서는 최적화가 수행되지 않습니다.


****  

| 상황 | 커밋 프로토콜이 사용되지 않는 이유 | 
| --- | --- | 
| HDFS에 기록하는 경우 | 커밋 프로토콜은 EMRFS를 사용하는 Amazon S3에 대한 쓰기만 지원합니다. | 
| S3A 파일 시스템을 사용하는 경우 | 커밋 프로토콜은 EMRFS만 지원합니다. | 
| MapReduce 또는 Spark의 RDD API를 사용하는 경우 | 커밋 프로토콜은 SparkSQL, DataFrame 또는 데이터 세트 API 사용만 지원합니다. | 
| 동적 파티션 덮어쓰기가 트리거되지 않는 경우 | 커밋 프로토콜은 동적 파티션 덮어쓰기 사례만 최적화합니다. 그 밖의 사례는 [EMRFS S3 최적화 커미터 사용](emr-spark-s3-optimized-committer.md) 섹션을 참조하세요. | 

다음 Scala 예제에서는 EMRFS S3 최적화 커밋 프로토콜이 `SQLHadoopMapReduceCommitProtocol`로 위임하는 몇 가지 추가 상황을 보여줍니다.

**Example - 사용자 지정 파티션 위치가 있는 동적 파티션 덮어쓰기 모드**  
이 예제에서 Scala 프로그램은 동적 파티션 덮어쓰기 모드에서 두 개의 파티션을 덮어씁니다. 한 파티션은 사용자 지정 파티션 위치를 갖습니다. 다른 파티션은 기본 파티션 위치를 사용합니다. EMRFS S3 최적화 커미터는 기본 파티션 위치를 사용하는 파티션만 개선합니다.  

```
val table = "dataset"
val inputView = "tempView"
val location = "s3://bucket/table"
                            
spark.sql(s"""
  CREATE TABLE $table (id bigint, dt date) 
  USING PARQUET PARTITIONED BY (dt) 
  LOCATION '$location'
""")

// Add a partition using a custom location
val customPartitionLocation = "s3://bucket/custom"
spark.sql(s"""
  ALTER TABLE $table ADD PARTITION (dt='2019-01-28') 
  LOCATION '$customPartitionLocation'
""")

// Add another partition using default location
spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')")

def asDate(text: String) = lit(text).cast("date")   
                       
spark.range(0, 10)
  .withColumn("dt",
    when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29")))
  .createTempView(inputView)
  
// Set partition overwrite mode to 'dynamic'
spark.sql(s"SET spark.sql.sources.partitionOverwriteMode=dynamic")
  
spark.sql(s"INSERT OVERWRITE TABLE $table SELECT * FROM $inputView")
```
Scala 코드는 다음 Amazon S3 객체를 생성합니다.  

```
custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
custom_$folder$
table/_SUCCESS
table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
table/dt=2019-01-29_$folder$
table_$folder$
```
이전 Spark 버전에서 사용자 지정 파티션 위치에 데이터를 쓰면 데이터가 손실될 수 있습니다. 이 예제에서는 `dt='2019-01-28'` 파티션이 손실됩니다. 자세한 내용은 [SPARK-35106](https://issues.apache.org/jira/browse/SPARK-35106)을 참조하세요. 이 문제는 Amazon EMR 릴리스 5.33.0 이상(6.0.x 및 6.1.x 제외)에서 수정되었습니다 .

사용자 지정 위치의 파티션에 쓸 때 Spark는 앞의 예와 비슷한 커밋 알고리즘을 사용합니다. 이에 대해서는 아래에서 간단히 설명합니다. 앞의 예제와 같이 이 알고리즘은 이름을 순차적으로 변경하여 성능에 부정적인 영향을 줄 수 있습니다.

Spark 2.4.0의 알고리즘은 다음 단계를 수행합니다.

1. 사용자 지정 위치의 파티션에 출력을 쓸 경우 작업은 최종 출력 위치에 생성된 Spark의 스테이징 디렉터리에 있는 파일에 씁니다. 파일 이름에는 파일 충돌을 방지하기 위해 무작위 UUID가 포함됩니다. 작업 시도는 각 파일의 추적과, 필요한 최종 출력 경로를 보존합니다.

1. 작업이 성공적으로 완료되면 드라이버에 파일과 해당 출력 경로를 제공합니다.

1. 모든 작업이 완료된 후 작업 커밋 단계에서는 사용자 지정 위치에 파티션에 대해 써진 모든 파일의 이름을 최종 출력 경로로 순차적으로 변경합니다.

1. 스테이징 디렉터리는 작업 커밋 단계가 완료되기 전에 삭제됩니다.