

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# 커넥터 및 유틸리티
<a name="emr-connectors"></a>

Amazon EMR은 다른 AWS 서비스에 데이터 소스로 액세스할 수 있는 다양한 커넥터와 유틸리티를 제공합니다. 일반적으로 프로그램 내에서 이러한 서비스의 데이터에 액세스할 수 있습니다. 예를 들면 Hive 쿼리, Pig 스크립트 또는 MapReduce 애플리케이션에서 Kinesis 스트림을 지정하여 이러한 데이터에 대한 작업을 수행할 수 있습니다.

**Topics**
+ [Amazon EMR을 사용하여 DynamoDB에서 테이블 내보내기, 가져오기, 쿼리 및 조인](EMRforDynamoDB.md)
+ [Kinesis](emr-kinesis.md)
+ [S3DistCp(s3-dist-cp)](UsingEMR_s3distcp.md)
+ [실패한 S3DistCp 작업 후 정리](#s3distcp-cleanup)

# Amazon EMR을 사용하여 DynamoDB에서 테이블 내보내기, 가져오기, 쿼리 및 조인
<a name="EMRforDynamoDB"></a>

**참고**  
Amazon EMR-DynamoDB 커넥터는 GitHub에서 오픈 소스로 제공됩니다. 자세한 내용은 [https://github.com/awslabs/emr-dynamodb-connector](https://github.com/awslabs/emr-dynamodb-connector)를 참조하세요.

DynamoDB는 완전관리형 NoSQL 데이터베이스 서비스로, 원활한 확장성과 함께 빠르고 예측 가능한 성능을 제공합니다. 개발자는 데이터베이스 테이블을 만들고 해당 요청 트래픽 또는 스토리지를 제한 없이 확장할 수 있습니다. DynamoDB는 테이블의 데이터와 트래픽을 충분한 수의 서버로 자동 분산하여 고객이 지정한 요청 용량과 저장된 데이터 규모를 처리하면서도 일관되고 빠른 성능을 발휘합니다. 여기에 Amazon EMR 및 Hive를 사용하면 DynamoDB에 저장된 데이터와 같은 대용량 데이터도 빠르고 효율적으로 처리할 수 있습니다. DynamoDB에 대한 자세한 내용은 [Amazon DynamoDB 개발자 안내서](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/)를 참조하세요.

Apache Hive는 사용이 쉬우면서 SQL과 유사한 쿼리 언어(HiveQL)로 map reduce 클러스터에 대해 쿼리를 실행할 때 사용하는 소프트웨어 계층입니다. 이 소프트웨어는 Hadoop 아키텍처를 기반으로 실행됩니다. Hive 및 HiveQL에 대한 자세한 내용은 [HiveQL language manual](https://cwiki.apache.org/confluence/display/Hive/LanguageManual)을 참조하세요. Hive 및 Amazon EMR에 대한 자세한 내용은 [Apache Hive](emr-hive.md) 섹션을 참조하세요.

DynamoDB 연결 기능이 포함된 사용자 지정 버전의 Hive와 함께 Amazon EMR 사용하여 DynamoDB에 저장된 데이터에 대해 다음 작업을 수행할 수 있습니다.
+ DynamoDB 데이터를 Hadoop 분산 파일 시스템(HDFS)에 로드한 후 Amazon EMR 클러스터에 대한 입력으로 사용.
+ 유사 SQL 문(HiveQL)을 사용하여 실시간 DynamoDB 데이터 쿼리.
+ DynamoDB에 저장된 데이터 조인 및 내보내기 또는 조인된 데이터 쿼리.
+ DynamoDB에 저장된 데이터를 Amazon S3로 내보내기.
+ Amazon S3에서 DynamoDB로 데이터 가져오기.

**참고**  
Amazon EMR-DynamoDB 커넥터는 [Kerberos 인증](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-kerberos.html)을 사용하도록 구성된 클러스터를 지원하지 않습니다.

다음 각 작업을 수행하려면 먼저 Amazon EMR 클러스터를 시작하고 DynamoDB에서 데이터 위치를 지정한 다음, Hive 명령을 실행하여 DynamoDB에 저장된 데이터를 조작합니다.

Amazon EMR 클러스터를 시작하는 방법에는 여러 가지가 있습니다. Amazon EMR 콘솔, 명령줄 인터페이스(CLI)를 사용하거나 AWS SDK 또는 Amazon EMR API를 사용하여 클러스터를 프로그래밍할 수 있습니다. 그 밖에 Hive 클러스터를 대화식으로 실행할지, 아니면 스크립트에서 실행할지 선택할 수도 있습니다. 이 섹션에서는 Amazon EMR 콘솔과 CLI를 사용하여 대화형 Hive 클러스터를 시작하는 방법에 대해 살펴보겠습니다.

대화식 Hive 사용은 쿼리 성능을 테스트하고 애플리케이션을 조정하는 데 효과적인 방법입니다. 정기적으로 실행할 Hive 명령 세트를 설정한 후 Amazon EMR에서 자동으로 실행할 수 있는 Hive 스크립트를 생성할 수 있습니다.

**주의**  
DynamoDB 테이블에서 Amazon EMR 읽기 또는 쓰기 작업은 구성된 프로비저닝된 처리량을 소모하여 프로비저닝된 처리량 예외의 발생 주기가 늘어날 가능성이 큽니다. 그래서 Amazon EMR은 대용량 요청을 대비해 지수 백오프 알고리즘을 통한 재시도를 구현함으로써 DynamoDB 테이블에 대한 요청 로드를 관리합니다. Amazon EMR 작업을 다른 트래픽과 동시에 실행하면 할당된 프로비저닝된 처리량 수준을 초과할 수 있습니다. Amazon CloudWatch의 **ThrottleRequests** 지표를 확인하여 이 상태를 모니터링할 수 있습니다. 요청 로드가 너무 높으면 클러스터를 다시 시작한 후 [읽기 비율 설정](EMR_Hive_Optimizing.md#ReadPercent) 또는 [쓰기 비율 설정](EMR_Hive_Optimizing.md#WritePercent)을 더 낮은 값으로 설정하여 Amazon EMR 작업을 조절할 수 있습니다. DynamoDB 처리량 설정에 대한 자세한 내용은 [프로비저닝된 처리량](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithDDTables.html#ProvisionedThroughput)을 참조하세요.  
테이블이 [온디맨드 모드](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html#HowItWorks.OnDemand)로 구성된 경우 내보내기 또는 가져오기 작업을 실행하기 전에 테이블을 프로비저닝된 모드로 다시 변경해야 합니다. DynamoDB 테이블에서 사용할 리소스를 계산하려면 파이프라인에 처리량 비율이 필요합니다. 온디맨드 모드는 프로비저닝된 처리량을 제거합니다. 처리량 용량을 프로비저닝하려는 경우 Amazon CloudWatch Events 지표를 사용하여 테이블에서 사용하는 총 처리량을 평가할 수 있습니다.

**Topics**
+ [Hive 테이블을 설정하여 Hive 명령 실행](EMR_Interactive_Hive.md)
+ [DynamoDB에서 데이터 내보내기, 가져오기 및 쿼리를 위한 Hive 명령 예제](EMR_Hive_Commands.md)
+ [DynamoDB에서 Amazon EMR 작업 성능 최적화](EMR_Hive_Optimizing.md)

# Hive 테이블을 설정하여 Hive 명령 실행
<a name="EMR_Interactive_Hive"></a>

데이터 웨어하우스 애플리케이션인 Apache Hive는 유사 SQL 언어를 사용하여 Amazon EMR 클러스터에 포함된 데이터를 쿼리하는 데 사용할 수 있습니다. Hive에 대한 자세한 내용은 [http://hive.apache.org/](http://hive.apache.org/)를 참조하세요.

다음 절차에서는 이미 클러스터를 생성하고 Amazon EC2 키 페어를 지정했다고 가정합니다. 클러스터 생성을 시작하는 방법에 대한 자세한 내용은* Amazon EMR 관리 안내서*에서 [Amazon EMR 시작하기](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-gs)를 참조하세요.

## MapReduce를 사용하도록 Hive 구성
<a name="hive-mapreduce"></a>

DynamoDB 테이블을 쿼리하기 위해 Amazon EMR에서 Hive를 사용하는 경우 Hive에 기본 실행 엔진인 Tez가 사용된다면 오류가 발생할 수 있습니다. 이런 이유로 이 섹션에서 설명한 대로 DynamoDB와 통합되는 Hive를 사용해 클러스터를 생성하는 경우 MapReduce를 사용하도록 Hive를 설정하는 구성 분류를 사용하는 것이 좋습니다. 자세한 내용은 [애플리케이션 구성](emr-configure-apps.md) 단원을 참조하십시오.

다음 조각은 MapReduce를 Hive용 실행 엔진으로 설정하기 위해 사용하는 구성 분류 및 속성을 보여줍니다.

```
[
                {
                    "Classification": "hive-site",
                    "Properties": {
                        "hive.execution.engine": "mr"
                    }
                }
             ]
```<a name="EMR_Interactive_Hive_session"></a>

**Hive 명령을 대화식으로 실행하려면**

1. 마스터 노드에 연결합니다. 자세한 내용은 *Amazon EMR 관리 안내서*에서 [SSH를 사용하여 프라이머리 노드에 연결](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html)을 참조하세요.

1. 현재 마스터 노드에 대한 명령 프롬프트에 `hive`를 입력합니다.

   Hive 프롬프트 `hive>`가 표시됩니다.

1.  Hive 애플리케이션의 테이블을 DynamoDB의 데이터에 매핑하는 Hive 명령을 입력합니다. 이 테이블은 Amazon DynamoDB에 저장된 데이터에 대한 참조로 사용되며, 데이터는 Hive에 로컬로 저장되지 않습니다. 따라서 이 테이블을 사용하는 모든 쿼리는 DynamoDB의 실시간 데이터에 대해 실행되므로 명령을 실행할 때마다 테이블의 읽기 또는 쓰기 용량이 소비됩니다. 동일한 데이터 세트에 대해 여러 Hive 명령을 실행하려면 먼저 해당 데이터 세트를 내보내는 것을 고려해 보세요.

    다음은 Hive 테이블을 DynamoDB 테이블에 매핑하는 구문을 보여 줍니다.

   ```
   CREATE EXTERNAL TABLE hive_tablename (hive_column1_name column1_datatype, hive_column2_name column2_datatype...)
   STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   TBLPROPERTIES ("dynamodb.table.name" = "dynamodb_tablename", 
   "dynamodb.column.mapping" = "hive_column1_name:dynamodb_attribute1_name,hive_column2_name:dynamodb_attribute2_name...");
   ```

    DynamoDB에서 Hive에 테이블을 생성하는 경우 `EXTERNAL` 키워드를 사용하여 외부 테이블로 생성해야 합니다. 외부 테이블과 내부 테이블의 차이점은 내부 테이블을 삭제하면 내부 테이블의 데이터가 삭제된다는 점입니다. 이는 Amazon DynamoDB에 연결된 경우에 바람직한 동작이 아니므로 외부 테이블만 지원됩니다.

    예를 들어, 다음 Hive 명령은 Hive에서 DynamoDB 테이블* dynamodbtable1*을 참조하는 *hivetable1* 테이블을 생성합니다. DynamoDB 테이블 *dynamodbtable1*에는 해시 및 범위 프라이머리 키 스키마가 있습니다. 해시 키 요소는 `name`(문자열 형식)이고, 범위 키 요소는 `year`(숫자 형식)이고, 각 항목에는 `holidays`(문자열 세트 형식)에 대한 속성 값이 있습니다.

   ```
   CREATE EXTERNAL TABLE hivetable1 (col1 string, col2 bigint, col3 array<string>)
   STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
   "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");
   ```

    라인 1에서는 HiveQL `CREATE EXTERNAL TABLE` 문을 사용합니다. *hivetable1*에 대해 DynamoDB 테이블의 각 속성 이름 및 값 페어에 대한 열을 설정하고 데이터 형식을 제공해야 합니다. 이러한 값은 대소문자를 구분하지 않으므로 열에 예약어를 제외한 모든 이름을 지정할 수 있습니다.

    라인 2에서는 `STORED BY` 문을 사용합니다. `STORED BY`의 값은 Hive와 DynamoDB 사이의 연결을 처리하는 클래스의 이름입니다. `'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'`로 설정되어야 합니다.

    행 3에서는 `TBLPROPERTIES` 문을 사용하여 'hivetable1'을 DynamoDB의 올바른 테이블 및 스키마에 연결합니다. `TBLPROPERTIES`에 `dynamodb.table.name` 및 `dynamodb.column.mapping` 파라미터의 값을 제공합니다. 이러한 값은 대소문자를 *구분합니다*.
**참고**  
 테이블에 대한 모든 DynamoDB 속성 이름은 Hive 테이블에 해당 열이 있어야 합니다. Amazon EMR 버전에 따라 다음 시나리오는 일대일 매핑이 존재하지 않는 경우 발생합니다.  
Amazon EMR 5.27.0 이상 버전의 경우, 커넥터에는 DynamoDB 속성 이름과 Hive 테이블에 있는 열 간의 일대일 매핑을 확인하는 검증이 포함되어 있습니다. 일대일 매핑이 존재하지 않으면 오류가 발생합니다.
Amazon EMR 5.26.0 이하 버전의 경우, Hive 테이블에는 DynamoDB의 이름 및 값 페어가 포함되지 않습니다. DynamoDB 프라이머리 키 속성을 매핑하지 않을 경우 Hive에서 오류가 발생합니다. 비기본 키 속성을 매핑하지 않는 경우에는 오류가 발생하지 않지만 Hive 테이블에 데이터가 표시되지 않습니다. 데이터 형식이 일치하지 않으면 값은 널입니다.

그런 다음 *hivetable1*에서 Hive 작업을 실행할 수 있습니다. *hivetable1*에 대해 실행되는 쿼리는 DynamoDB 계정의 DynamoDB 테이블 *dynamodbtable1*에 대해 내부적으로 실행되며, 실행할 때마다 읽기 또는 쓰기 유닛을 소비합니다.

DynamoDB 테이블에 대해 Hive 쿼리를 실행하는 경우 충분한 양의 읽기 용량 유닛을 프로비저닝했는지 확인해야 합니다.

예를 들어, DynamoDB 테이블에 읽기 용량 단위를 100개 프로비저닝하였다고 가정하겠습니다. 그러면 초당 100회의 읽기(또는 409,600바이트)를 수행할 수 있습니다. 테이블에 20GB의 데이터(21,474,836,480바이트)가 포함되어 있고 Hive 쿼리에서 전체 테이블 스캔을 수행하는 경우 쿼리를 실행하는 데 걸리는 시간을 추산할 수 있습니다.

 * 21,474,836,480 / 409,600 = 52,429초 = 14.56시간 * 

필요한 시간을 줄일 수 있는 유일한 방법은 원본 DynamoDB 테이블에서 읽기 용량 유닛을 조정하는 것입니다. Amazon EMR 노드를 더 추가하더라도 효과가 없습니다.

Hive 출력에서 하나 이상의 매퍼 프로세스가 완료되면 진행률이 업데이트됩니다. 하지만 대용량 DynamoDB 테이블에서 프로비저닝된 읽기 용량이 낮게 설정된 경우에는 진행률 출력이 오랫동안 업데이트되지 않을 수도 있습니다. 위 경우에는 작업 진행률이 몇 시간 0%로 표시됩니다. 작업 진행률에 대한 자세한 상태를 보려면 Amazon EMR 콘솔로 이동하여 개별 매퍼 작업 상태와 데이터 읽기에 대한 통계를 볼 수 있습니다. 마스터 노드에서 Hadoop 인터페이스에 로그온하여 Hadoop 통계를 볼 수도 있습니다. 여기에는 개별 맵 작업 상태와 일부 데이터 읽기 통계가 표시됩니다. 자세한 내용은 다음 항목을 참조하세요.
+ [프라이머리 노드에 호스팅된 웹 인터페이스](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-web-interfaces.html)
+ [Hadoop 웹 인터페이스 보기](https://docs.aws.amazon.com/emr/latest/ManagementGuide/UsingtheHadoopUserInterface.html)

DynamoDB에서 데이터 내보내기 또는 가져오기 및 테이블 조인과 같은 작업을 수행하는 샘플 HiveQL 문에 대한 자세한 내용은 [DynamoDB에서 데이터 내보내기, 가져오기 및 쿼리를 위한 Hive 명령 예제](EMR_Hive_Commands.md) 섹션을 참조하세요.<a name="EMR_Hive_Cancel"></a>

**Hive 요청을 취소하려면**

Hive 쿼리를 실행할 때 서버의 초기 응답에는 요청을 취소하는 명령이 포함되어 있습니다. 프로세스 중에 언제든지 요청을 취소하려면 서버 응답에서 **Kill Command(Kill 명령)**를 사용합니다.

1. 명령줄 클라이언트를 종료하려면 `Ctrl+C`를 입력합니다.

1.  셸 프롬프트에서 요청에 대한 초기 서버 응답의 **Kill Command(Kill 명령)**를 입력합니다.

    또는 마스터 노드의 명령줄에서 다음 명령을 실행하여 Hadoop 작업을 종료할 수 있습니다. 여기서 *job-id*는 Hadoop 작업의 식별자이며 Hadoop 사용자 인터페이스에서 검색할 수 있습니다.

   ```
   hadoop job -kill job-id
   ```

## Hive 및 DynamoDB에 대한 데이터 유형
<a name="EMR_Hive_Properties"></a>

다음 테이블에는 사용 가능한 Hive 데이터 유형, 해당하는 기본 DynamoDB 유형, 매핑할 수 있는 대체 DynamoDB 유형이 나와 있습니다.


| Hive 형식 | 기본 DynamoDB 유형 | 대체 DynamoDB 유형 | 
| --- | --- | --- | 
| 문자열 | 문자열(S) |  | 
| bigint 또는 double | 숫자(N) |  | 
| 이진수 | 이진수(B) |  | 
| 부울 | BOOLEAN (BOOL) |  | 
| 배열 | list (L) | 숫자 세트(NS), 문자열 세트(SS) 또는 이진수 세트(BS) | 
| map<string,string> | item | map (M) | 
| map<string,?> | map (M) |  | 
|  | null (NULL) |  | 

Hive 데이터를 해당하는 대체 DynamoDB 유형으로 쓰거나 DynamoDB 데이터에 대체 DynamoDB 유형의 대체값이 포함된 경우 `dynamodb.type.mapping` 파라미터를 이용해 열 및 DynamoDB 유형을 지정할 수 있습니다. 다음 예제는 대체 형식 매핑을 지정하기 위한 구문을 보여줍니다.

```
CREATE EXTERNAL TABLE hive_tablename (hive_column1_name column1_datatype, hive_column2_name column2_datatype...)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES ("dynamodb.table.name" = "dynamodb_tablename",
"dynamodb.column.mapping" = "hive_column1_name:dynamodb_attribute1_name,hive_column2_name:dynamodb_attribute2_name...",
"dynamodb.type.mapping" = "hive_column1_name:dynamodb_attribute1_datatype");
```

형식 매핑 파라미터는 선택 사항이며 대체 형식을 사용하는 열에 대해서만 지정해야 합니다.

예를 들어, 다음 Hive 명령은 DynamoDB 테이블 `dynamodbtable2`를 참조하는 `hivetable2`라는 테이블을 생성합니다. `col3` 열을 문자열 집합(SS) 형식으로 매핑하는 것을 제외하고는 `hivetable1`와 유사합니다.

```
CREATE EXTERNAL TABLE hivetable2 (col1 string, col2 bigint, col3 array<string>)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable2",
"dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays",
"dynamodb.type.mapping" = "col3:SS");
```

Hive에서 `hivetable1` 및 `hivetable2`는 동일합니다. 하지만 이러한 테이블에 있는 데이터는 해당하는 DynamoDB 테이블에 기록되며, `dynamodbtable1`은 목록을 포함하고 `dynamodbtable2`는 문자열 세트를 포함합니다.

Hive `null` 값을 DynamoDB `null` 유형의 속성으로 작성하려는 경우 `dynamodb.null.serialization` 파라미터를 사용하면 됩니다. 다음 예제는 `null` 직렬화를 지정하기 위한 구문을 보여줍니다.

```
CREATE EXTERNAL TABLE hive_tablename (hive_column1_name column1_datatype, hive_column2_name column2_datatype...)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES ("dynamodb.table.name" = "dynamodb_tablename",
"dynamodb.column.mapping" = "hive_column1_name:dynamodb_attribute1_name,hive_column2_name:dynamodb_attribute2_name...",
"dynamodb.null.serialization" = "true");
```

Null 직렬화 파라미터는 선택 사항이며 지정하지 않으면 `false`로 설정됩니다. DynamoDB `null` 속성은 파라미터 설정에 상관없이 Hive의 `null` 값으로 읽힙니다. `null` 값을 가진 Hive 컬렉션은 null 직렬화 파라미터가 `true`로 설정된 경우에만 DynamoDB에 기록됩니다. 그렇지 않으면 Hive 오류가 발생합니다.

Hive의 bigint 형식은 Java long 형식과 동일하고, Hive double 형식은 정밀도의 측면에서 Java double 형식과 동일합니다. 즉, Hive 데이터 유형에서 사용할 수 있는 것보다 더 높은 정밀도를 가진 숫자 데이터가 DynamoDB에 저장되어 있는 경우 Hive를 사용하여 DynamoDB 데이터를 내보내거나, 가져오거나, 참조하면 정밀도가 손실되거나 Hive 쿼리 오류가 발생할 수 있습니다.

 DynamoDB에서 Amazon Simple Storage Service(S3) 또는 HDFS로 내보낸 이진수 유형은 Base64로 인코딩된 문자열로 저장됩니다. 데이터를 Amazon S3 또는 HDFS에서 DynamoDB 이진수 유형으로 가져올 경우 데이터를 Base64 문자열로 인코딩해야 합니다.

## Hive 옵션
<a name="EMR_Hive_Options"></a>

 다음 Hive 옵션을 설정하여 Amazon DynamoDB에서의 데이터 전송을 관리할 수 있습니다. 이러한 옵션은 현재 Hive 세션 동안만 지속됩니다. Hive 명령 프롬프트를 닫았다가 나중에 클러스터에서 다시 열면 해당 설정은 기본값으로 되돌아갑니다.


| Hive 옵션 | 설명 | 
| --- | --- | 
| dynamodb.throughput.read.percent |   DynamoDB 프로비저닝 처리 속도를 테이블에 할당된 범위로 유지하도록 읽기 작업 속도를 설정합니다. 값은 `0.1`과 `1.5` 사이입니다.  값 0.5는 기본 읽기 속도이며, 이 설정에서 Hive는 테이블에서 읽기 프로비저닝된 처리량 리소스의 1/2을 소비하려고 시도합니다. 이 값을 0.5보다 크게 설정하면 읽기 요청 속도가 빨라집니다. 이 값을 0.5보다 작게 설정하면 읽기 요청 속도가 느려집니다. 이 읽기 속도는 근사값입니다. 실제 읽기 속도는 DynamoDB의 키 분포가 균일한지 여부와 같은 요소에 따라 달라집니다.  Hive 작업에서 프로비저닝된 처리량을 자주 초과하거나 실시간 읽기 트래픽 병목 현상이 너무 자주 발생하는 경우 이 값을 `0.5`보다 작게 줄이세요. 용량이 충분하고 Hive 작업을 더 빠르게 수행하려면 이 값을 `0.5`보다 크게 설정하세요. 사용되지 않은 입력/출력 작업이 사용 가능한 경우 이 값을 최대 1.5까지 초과 설정할 수도 있습니다.  | 
| dynamodb.throughput.write.percent |   DynamoDB 프로비저닝 처리 속도를 테이블에 할당된 범위로 유지하도록 쓰기 작업 속도를 설정합니다. 값은 `0.1`과 `1.5` 사이입니다.  값 0.5는 기본 쓰기 속도이며, 이 설정에서 Hive는 테이블에서 쓰기 프로비저닝된 처리량 리소스의 1/2을 소비하려고 시도합니다. 이 값을 0.5보다 크게 설정하면 쓰기 요청 속도가 빨라집니다. 이 값을 0.5보다 작게 설정하면 쓰기 요청 속도가 느려집니다. 이 쓰기 속도는 근사값입니다. 실제 쓰기 속도는 DynamoDB의 키 분포가 균일한지 여부와 같은 요소에 따라 달라집니다.  Hive 작업에서 프로비저닝된 처리량을 자주 초과하거나 실시간 쓰기 트래픽 병목 현상이 너무 자주 발생하는 경우 이 값을 `0.5`보다 작게 줄이세요. 용량이 충분하고 Hive 작업을 더 빠르게 수행하려면 이 값을 `0.5`보다 크게 설정하세요. 사용되지 않은 입력/출력 작업이 사용 가능한 경우나 테이블에 데이터를 처음으로 업로드하고 아직 실시간 트래픽이 없는 경우 이 값을 최대 1.5까지 초과 설정할 수도 있습니다.  | 
| dynamodb.endpoint | DynamoDB 서비스에 대한 엔드포인트를 지정합니다. 사용 가능한 DynamoDB 엔드포인트에 대한 자세한 내용은 [리전 및 엔드포인트](https://docs.aws.amazon.com/general/latest/gr/rande.html#ddb_region)를 참조하세요.  | 
| dynamodb.max.map.tasks |   DynamoDB에서 데이터를 읽을 때 최대 맵 작업 수를 지정합니다. 이 값은 1보다 크거나 같아야 합니다.  | 
| dynamodb.retry.duration |   Hive 명령 재시도를 위한 제한 시간으로 사용할 시간(분)을 지정합니다. 이 값은 0보다 크거나 같은 정수입니다. 기본 제한 시간은 1분입니다.  | 

 다음 예제에 표시된 것처럼 이러한 값은 `SET` 명령을 사용하여 설정합니다.

```
SET dynamodb.throughput.read.percent=1.0; 

INSERT OVERWRITE TABLE s3_export SELECT * 
FROM hiveTableName;
```

# DynamoDB에서 데이터 내보내기, 가져오기 및 쿼리를 위한 Hive 명령 예제
<a name="EMR_Hive_Commands"></a>

다음 예제에서는 Hive 명령을 사용하여 Amazon S3 또는 HDFS로 데이터 내보내기, DynamoDB로 데이터 가져오기, 테이블 조인, 테이블 쿼리 등의 작업을 수행합니다.

Hive 테이블에서의 작업은 DynamoDB에 저장된 데이터를 참조합니다. Hive 명령에는 DynamoDB 테이블의 프로비저닝된 처리량 설정이 적용되며 가져오는 데이터에는 DynamoDB에서 Hive 작업 요청을 처리할 때 DynamoDB 테이블에 쓰여진 데이터가 포함됩니다. 데이터 검색 프로세스가 오래 걸리면 Hive 명령 시작 이후 Hive 명령이 반환하는 일부 데이터가 DynamoDB에서 업데이트된 것일 수 있습니다.

Hive 명령 `DROP TABLE` 및 `CREATE TABLE`은 Hive의 로컬 테이블에만 적용되며 DynamoDB에서 테이블을 생성하거나 삭제하지 않습니다. Hive 쿼리가 DynamoDB의 테이블을 참조할 경우 쿼리를 실행하기 전에 해당 테이블이 이미 있어야 합니다. DynamoDB에서 테이블을 생성하고 삭제하는 방법에 대한 자세한 내용은 *Amazon DynamoDB 개발자 안내서*에서 [DynamoDB의 테이블 작업](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithTables.html)을 참조하세요.

**참고**  
 Hive 테이블을 Amazon S3의 위치에 매핑하는 경우 버킷의 루트 경로인 s3://amzn-s3-demo-bucket에 매핑하지 않습니다. 이 경우 Hive에서 Amazon S3에 데이터를 쓸 때 오류가 발생할 수 있습니다. 대신, 버킷의 하위 경로인 s3://amzn-s3-demo-bucket/mypath에 테이블을 매핑합니다.

## DynamoDB에서 데이터 내보내기
<a name="EMR_Hive_Commands_exporting"></a>

 Hive를 사용하여 DynamoDB에서 데이터를 내보낼 수 있습니다.

**Amazon S3 버킷으로 DynamoDB 테이블을 내보내는 방법**
+  DynamoDB에 저장된 데이터를 참조하는 Hive 테이블을 생성합니다. 그러면 INSERT OVERWRITE 명령을 호출하여 외부 디렉터리에 데이터를 쓸 수 있습니다. 다음 예제에서 *s3://amzn-s3-demo-bucket/path/subpath/*는 Amazon S3의 유효한 경로입니다. DynamoDB의 값에 맞게 CREATE 명령에서 열과 데이터 형식을 조정합니다. 이 항목을 사용하여 Amazon S3에서 DynamoDB 데이터의 아카이브를 생성할 수 있습니다.

  ```
  1. CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
  2. STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
  3. TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
  4. "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");                   
  5.                     
  6. INSERT OVERWRITE DIRECTORY 's3://amzn-s3-demo-bucket/path/subpath/' SELECT * 
  7. FROM hiveTableName;
  ```

**형식을 사용하여 DynamoDB 테이블을 Amazon S3 버킷으로 내보내는 방법**
+  Amazon S3 내 위치를 참조하는 외부 테이블을 생성합니다. 이 작업은 s3\$1export로 아래에 표시됩니다. CREATE 호출 중에 테이블의 행 형식을 지정합니다. 그러면 INSERT OVERWRITE를 사용하여 DynamoDB에서 s3\$1export로 데이터를 내보낼 때 지정된 형식으로 데이터가 쓰여집니다. 다음 예제에서는 쉼표로 구분된 값(CSV)으로 데이터가 쓰여집니다.

  ```
   1. CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
   2. STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   3. TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
   4. "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");                      
   5.                     
   6. CREATE EXTERNAL TABLE s3_export(a_col string, b_col bigint, c_col array<string>)
   7. ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
   8. LOCATION 's3://amzn-s3-demo-bucket/path/subpath/';
   9.                     
  10. INSERT OVERWRITE TABLE s3_export SELECT * 
  11. FROM hiveTableName;
  ```

**열 매핑을 지정하지 않고 DynamoDB 테이블을 Amazon S3 버킷으로 내보내는 방법**
+  DynamoDB에 저장된 데이터를 참조하는 Hive 테이블을 생성합니다. 열 매핑을 지정하지 않는다는 점을 제외하면 앞의 예제와 비슷합니다. 테이블에는 `map<string, string>` 형식의 열 1개만 있어야 합니다. Amazon S3에 `EXTERNAL` 테이블을 생성하면 `INSERT OVERWRITE` 명령을 직접 호출하여 DynamoDB에서 에 데이터를 쓸 수 있습니다. 이 항목을 사용하여 Amazon S3에서 DynamoDB 데이터의 아카이브를 생성할 수 있습니다. 열 매핑이 없으므로 이 방법으로 내보낸 테이블은 쿼리할 수 없습니다. Amazon EMR AMI 2.2.*x* 이상에서 지원되는 Hive 0.8.1.5 이상에서는 열 매핑을 지정하지 않고 데이터를 내보낼 수 있습니다.

  ```
   1. CREATE EXTERNAL TABLE hiveTableName (item map<string,string>)
   2. STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   3. TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1");  
   4.     
   5. CREATE EXTERNAL TABLE s3TableName (item map<string, string>)
   6. ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
   7. LOCATION 's3://amzn-s3-demo-bucket/path/subpath/'; 
   8.                 
   9. INSERT OVERWRITE TABLE s3TableName SELECT * 
  10. FROM hiveTableName;
  ```

**데이터 압축을 사용하여 DynamoDB 테이블을 Amazon S3 버킷으로 내보내는 방법**
+  Hive는 Hive 세션 중에 설정할 수 있는 여러 압축 코덱을 제공합니다. 그러면 내보낸 데이터가 지정된 형식으로 압축됩니다. 다음 예제에서는 Lempel-Ziv-Oberhumer(LZO) 알고리즘을 사용하여 내보낸 파일을 압축합니다.

  ```
   1. SET hive.exec.compress.output=true;
   2. SET io.seqfile.compression.type=BLOCK;
   3. SET mapred.output.compression.codec = com.hadoop.compression.lzo.LzopCodec;                    
   4.                     
   5. CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
   6. STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   7. TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
   8. "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");                    
   9.                     
  10. CREATE EXTERNAL TABLE lzo_compression_table (line STRING)
  11. ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
  12. LOCATION 's3://amzn-s3-demo-bucket/path/subpath/';
  13.                     
  14. INSERT OVERWRITE TABLE lzo_compression_table SELECT * 
  15. FROM hiveTableName;
  ```

   다음과 같은 압축 코덱을 사용할 수 있습니다.
  +  org.apache.hadoop.io.compress.GzipCodec 
  +  org.apache.hadoop.io.compress.DefaultCodec 
  +  com.hadoop.compression.lzo.LzoCodec 
  +  com.hadoop.compression.lzo.LzopCodec 
  +  org.apache.hadoop.io.compress.BZip2Codec 
  +  org.apache.hadoop.io.compress.SnappyCodec 

**DynamoDB 테이블을 HDFS로 내보내는 방법**
+  다음 Hive 명령을 사용합니다. 여기서 *hdfs:///directoryName*은 유효한 HDFS 경로이고 *hiveTableName*은 DynamoDB를 참조하는 Hive의 테이블입니다. Amazon S3로 데이터를 내보낼 때 Hive 0.7.1.1에서 HDFS를 중간 단계로 사용하므로 이 내보내기 작업은 DynamoDB 테이블을 Amazon S3로 내보내는 것보다 빠릅니다. 또한 다음 예제에서는 `dynamodb.throughput.read.percent`를 1.0으로 설정하여 읽기 요청 빈도를 높이는 방법을 보여 줍니다.

  ```
  1. CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
  2. STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
  3. TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
  4. "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays"); 
  5.                     
  6. SET dynamodb.throughput.read.percent=1.0;                    
  7.                     
  8. INSERT OVERWRITE DIRECTORY 'hdfs:///directoryName' SELECT * FROM hiveTableName;
  ```

   또한 Amazon S3로 내보내기 위해 위에 나온 대로 형식 및 압축을 사용하여 HDFS로 데이터를 내보낼 수 있습니다. 그러려면 위 예제의 Amazon S3 디렉터리를 HDFS 디렉터리로 바꾸기만 하면 됩니다.<a name="EMR_Hive_non-printable-utf8"></a>

**인쇄할 수 없는 UTF-8 문자 데이터를 Hive로 읽으려면**
+ 테이블을 생성할 때 `STORED AS SEQUENCEFILE` 절을 사용하여 인쇄할 수 없는 UTF-8 문자를 Hive로 읽고 쓸 수 있습니다. SequenceFile은 Hadoop 이진 파일 형식입니다. 이 파일을 읽으려면 Hadoop을 사용해야 합니다. 다음 예제에서는 DynamoDB에서 Amazon S3로 데이터를 내보내는 방법을 보여 줍니다. 이 기능을 사용하여 인쇄할 수 없는 UTF-8 인코딩 문자를 처리할 수 있습니다.

  ```
   1. CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
   2. STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   3. TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
   4. "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");                      
   5.                     
   6. CREATE EXTERNAL TABLE s3_export(a_col string, b_col bigint, c_col array<string>)
   7. STORED AS SEQUENCEFILE
   8. LOCATION 's3://amzn-s3-demo-bucket/path/subpath/';
   9.                     
  10. INSERT OVERWRITE TABLE s3_export SELECT * 
  11. FROM hiveTableName;
  ```

## DynamoDB로 데이터 가져오기
<a name="EMR_Hive_Commands_importing"></a>

 Hive를 사용하여 DynamoDB에 데이터를 쓸 때 쓰기 용량 유닛 수가 클러스터의 매퍼 수보다 큰지 확인해야 합니다. 예를 들어, m1.xlarge EC2 인스턴스에서 실행되는 클러스터는 인스턴스당 8개의 매퍼를 생성합니다. 클러스터의 인스턴스가 10개일 경우 매퍼는 총 80개입니다. 쓰기 용량 단위가 클러스터의 매퍼 수보다 크지 않으면 Hive 쓰기 작업이 쓰기 처리량을 모두 소비하거나 프로비저닝된 것보다 많은 처리량을 소비하려고 합니다. 각 EC2 인스턴스 유형에서 생성되는 매퍼 수에 대한 자세한 내용은 [Hadoop 구성](emr-hadoop-config.md) 섹션을 참조하세요.

 Hadoop의 매퍼 수는 입력 분할로 제어됩니다. 분할이 너무 적으면 쓰기 명령이 사용 가능한 쓰기 처리량을 모두 사용하지 못할 수도 있습니다.

 동일한 키를 가진 항목이 대상 DynamoDB 테이블에 있으면 이 항목을 덮어씁니다. 해당 키를 가진 항목이 대상 DynamoDB 테이블에 없으면 항목이 삽입됩니다.

**Amazon S3에서 DynamoDB로 데이터를 가져오는 방법**
+  Amazon EMR(Amazon EMR) 및 Hive를 사용하여 Amazon S3에서 DynamoDB에 데이터를 쓸 수 있습니다.

  ```
  CREATE EXTERNAL TABLE s3_import(a_col string, b_col bigint, c_col array<string>)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
  LOCATION 's3://amzn-s3-demo-bucket/path/subpath/';                    
                      
  CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
  TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
  "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");  
                      
  INSERT OVERWRITE TABLE hiveTableName SELECT * FROM s3_import;
  ```

**열 매핑을 지정하지 않고 Amazon S3 버킷에서 DynamoDB로 테이블을 가져오는 방법**
+  전에 DynamoDB에서 내보내어 Amazon S3에 저장된 데이터를 참조하는 `EXTERNAL` 테이블을 생성합니다. 가져오기 전에 테이블이 DynamoDB에 있으며 전에 내보낸 DynamoDB 테이블과 동일한 키 스키마를 가지고 있는지 확인합니다. 또한 테이블에는 `map<string, string>` 형식의 열 1개만 있어야 합니다. DynamoDB에 연결된 Hive 테이블을 생성하면 `INSERT OVERWRITE` 명령을 호출하여 Amazon S3에서 DynamoDB에 데이터를 쓸 수 있습니다. 열 매핑이 없으므로 이 방법으로 가져온 테이블은 쿼리할 수 없습니다. Amazon EMR AMI 2.2.3 이상에서 지원되는 Hive 0.8.1.5 이상에서는 열 매핑을 지정하지 않고 데이터를 가져올 수 있습니다.

  ```
  CREATE EXTERNAL TABLE s3TableName (item map<string, string>)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
  LOCATION 's3://amzn-s3-demo-bucket/path/subpath/'; 
                          
  CREATE EXTERNAL TABLE hiveTableName (item map<string,string>)
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
  TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1");  
                   
  INSERT OVERWRITE TABLE hiveTableName SELECT * 
  FROM s3TableName;
  ```

**HDFS에서 DynamoDB로 테이블을 가져오는 방법**
+  Amazon EMR 및 Hive를 사용하여 HDFS에서 DynamoDB에 데이터를 쓸 수 있습니다.

  ```
  CREATE EXTERNAL TABLE hdfs_import(a_col string, b_col bigint, c_col array<string>)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
  LOCATION 'hdfs:///directoryName';                    
                      
  CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
  TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
  "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");  
                      
  INSERT OVERWRITE TABLE hiveTableName SELECT * FROM hdfs_import;
  ```

## DynamoDB의 데이터 쿼리
<a name="EMR_Hive_Commands_querying"></a>

 다음 예제에서는 Amazon EMR을 사용하여 DynamoDB에 저장된 데이터를 쿼리할 수 있는 여러 가지 방법을 보여줍니다.

**매핑된 열의 가장 큰 값을 찾으려면(`max`)**
+  다음과 같은 Hive 명령을 사용합니다. 첫 번째 명령에서 CREATE 문은 DynamoDB에 저장된 데이터를 참조하는 Hive 테이블을 생성합니다. 그러면 SELECT 문이 해당 테이블을 사용하여 DynamoDB에 저장된 데이터를 쿼리합니다. 다음 예제에서는 제공된 고객이 한 가장 큰 주문을 찾습니다.

  ```
  CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) 
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "Purchases",
  "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items");
  
  SELECT max(total_cost) from hive_purchases where customerId = 717;
  ```

**`GROUP BY` 절을 사용하여 데이터를 집계하려면**
+  `GROUP BY` 절을 사용하여 여러 레코드에서 데이터를 수집할 수 있습니다. 이 절은 종종 sum, count, min 또는 max와 같은 집계 함수와 함께 사용됩니다. 다음 예제에서는 4번 이상 주문한 고객의 가장 큰 주문 목록을 반환합니다.

  ```
  CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) 
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "Purchases",
  "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items");
  
  SELECT customerId, max(total_cost) from hive_purchases GROUP BY customerId HAVING count(*) > 3;
  ```

**두 DynamoDB 테이블을 조인하는 방법**
+  다음 예제에서는 Hive 테이블 2개를 DynamoDB에 저장된 데이터에 매핑합니다. 그러면 두 테이블에 조인이 호출됩니다. 조인은 클러스터에서 계산되어 반환됩니다. DynamoDB에서는 조인이 일어나지 않습니다. 이 예제에서는 고객 및 3번 이상 주문한 고객의 구매 목록이 반환합니다.

  ```
  CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) 
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "Purchases",
  "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items");
  
  CREATE EXTERNAL TABLE hive_customers(customerId bigint, customerName string, customerAddress array<String>) 
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "Customers",
  "dynamodb.column.mapping" = "customerId:CustomerId,customerName:Name,customerAddress:Address");
  
  Select c.customerId, c.customerName, count(*) as count from hive_customers c 
  JOIN hive_purchases p ON c.customerId=p.customerId 
  GROUP BY c.customerId, c.customerName HAVING count > 2;
  ```

**서로 다른 소스의 두 테이블을 조인하려면**
+  다음 예제에서 Customer\$1S3는 Amazon S3에 저장된 CSV 파일을 로드하는 Hive 테이블이며, hive\$1purchases는 DynamoDB의 데이터를 참조하는 테이블입니다. 다음 예제에서는 Amazon S3에 CSV 파일로 저장된 고객 데이터를 DynamoDB에 저장된 주문 데이터와 조인하여 이름에 'Miller'가 포함된 고객이 한 주문을 나타내는 데이터 세트를 반환합니다.

  ```
  CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) 
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "Purchases",
  "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items");
  
  CREATE EXTERNAL TABLE Customer_S3(customerId bigint, customerName string, customerAddress array<String>)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
  LOCATION 's3://amzn-s3-demo-bucket/path/subpath/';
  
  Select c.customerId, c.customerName, c.customerAddress from 
  Customer_S3 c 
  JOIN hive_purchases p 
  ON c.customerid=p.customerid 
  where c.customerName like '%Miller%';
  ```

**참고**  
 위의 예제에서 명확성과 완성도를 위해 CREATE TABLE 문이 각 예제에 포함되었습니다. 제공된 Hive 테이블에 대해 여러 쿼리나 내보내기 작업을 실행할 때는 Hive 세션을 시작할 때 한 번만 테이블을 생성해야 합니다.

# DynamoDB에서 Amazon EMR 작업 성능 최적화
<a name="EMR_Hive_Optimizing"></a>

 DynamoDB 테이블에서 Amazon EMR 작업은 읽기 작업으로 간주되어 테이블의 프로비저닝 처리량 설정이 적용됩니다. Amazon EMR은 프로비저닝된 처리량을 초과할 가능성을 최소화하기 위해 DynamoDB 테이블의 부하를 균형 있게 분산시키려는 자체 로직을 구현합니다. 각 Hive 쿼리가 끝날 때마다 Amazon EMR이 프로비저닝 처리량의 초과 횟수를 포함해 쿼리 처리에 사용된 클러스터 정보를 반환합니다. 이 정보는 DynamoDB 처리량에 대한 CloudWatch 지표와 함께 후속 요청에서 DynamoDB 테이블에 대한 로드를 효과적으로 관리하는 데 사용됩니다.

 다음은 DynamoDB 테이블 작업 시 Hive 쿼리 성능에 영향을 미치는 요인입니다.

## 프로비저닝된 읽기 용량 유닛
<a name="ProvisionedReadCapacityUnits"></a>

 DynamoDB 테이블에 대해 Hive 쿼리를 실행하는 경우 충분한 양의 읽기 용량 유닛을 프로비저닝했는지 확인해야 합니다.

 예를 들어, DynamoDB 테이블에 읽기 용량 단위를 100개 프로비저닝하였다고 가정하겠습니다. 그러면 초당 100회의 읽기(또는 409,600바이트)를 수행할 수 있습니다. 테이블에 20GB의 데이터(21,474,836,480바이트)가 포함되어 있고 Hive 쿼리에서 전체 테이블 스캔을 수행하는 경우 쿼리를 실행하는 데 걸리는 시간을 추산할 수 있습니다.

 * 21,474,836,480 / 409,600 = 52,429초 = 14.56시간 * 

 필요한 시간을 줄일 수 있는 유일한 방법은 원본 DynamoDB 테이블에서 읽기 용량 유닛을 조정하는 것입니다. 노드를 Amazon EMR 클러스터에 더 추가하더라도 효과가 없습니다.

 Hive 출력에서 하나 이상의 매퍼 프로세스가 완료되면 진행률이 업데이트됩니다. 하지만 대용량 DynamoDB 테이블에서 프로비저닝된 읽기 용량이 낮게 설정된 경우에는 진행률 출력이 오랫동안 업데이트되지 않을 수도 있습니다. 위 경우에는 작업 진행률이 몇 시간 0%로 표시됩니다. 작업 진행률에 대한 자세한 상태를 보려면 Amazon EMR 콘솔로 이동하여 개별 매퍼 작업 상태와 데이터 읽기에 대한 통계를 볼 수 있습니다.

 마스터 노드에서 Hadoop 인터페이스에 로그온하여 Hadoop 통계를 볼 수도 있습니다. 여기에는 개별 맵 작업 상태와 일부 데이터 읽기 통계가 표시됩니다. 자세한 내용은 *Amazon EMR 관리 안내서*에서 [프라이머리 노드에 호스팅된 웹 인터페이스](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-web-interfaces.html)를 참조하세요.

## 읽기 비율 설정
<a name="ReadPercent"></a>

 기본적으로 Amazon EMR은 현재 프로비저닝된 처리량에 따라 DynamoDB 테이블에 대한 요청 로드를 관리합니다. 하지만 Amazon EMR이 반환하는 작업 정보에 프로비저닝된 처리량을 초과하는 높은 응답 수가 포함된 경우 Hive 테이블을 설정하면서 `dynamodb.throughput.read.percent` 파라미터를 사용해 기본 읽기 비율을 조정할 수 있습니다. 읽기 비율 파라미터 설정에 대한 자세한 내용은 [Hive 옵션](EMR_Interactive_Hive.md#EMR_Hive_Options) 섹션을 참조하세요.

## 쓰기 비율 설정
<a name="WritePercent"></a>

 기본적으로 Amazon EMR은 현재 프로비저닝된 처리량에 따라 DynamoDB 테이블에 대한 요청 로드를 관리합니다. 하지만 Amazon EMR이 반환하는 작업 정보에 프로비저닝된 처리량을 초과하는 높은 응답 수가 포함된 경우 Hive 테이블을 설정하면서 `dynamodb.throughput.write.percent` 파라미터를 사용해 기본 쓰기 비율을 조정할 수 있습니다. 쓰기 비율 파라미터 설정에 대한 자세한 내용은 [Hive 옵션](EMR_Interactive_Hive.md#EMR_Hive_Options) 섹션을 참조하세요.

## 재시도 지속 시간 설정
<a name="emr-ddb-retry-duration"></a>

 기본적으로 Amazon EMR은 기본 재시도 주기인 2분 내에 결과를 반환하지 않으면 Hive 쿼리를 다시 실행합니다. 이 주기는 Hive 쿼리 실행 시 `dynamodb.retry.duration` 파라미터를 설정하면 조정 가능합니다. 쓰기 비율 파라미터 설정에 대한 자세한 내용은 [Hive 옵션](EMR_Interactive_Hive.md#EMR_Hive_Options) 섹션을 참조하세요.

## 맵 작업 수
<a name="NumberMapTasks"></a>

 DynamoDB에 저장된 데이터 내보내기 및 쿼리 요청을 처리할 목적으로 Hadoop에서 실행하는 매퍼 대몬(daemon)은 최대 읽기 비율이 초당 1MiB이기 때문에 사용할 수 있는 읽기 용량도 제한됩니다. 이때 DynamoDB에서 사용할 수 있는 프로비저닝된 처리량을 추가하면 매퍼 대몬(daemon) 수를 늘려 Hive 내보내기 및 쿼리 작업 성능을 높일 수 있습니다. 이를 위해 클러스터에 속한 EC2 인스턴스 수를 늘릴 수 있습니다. *또는* 각 EC2 인스턴스에서 실행되는 매퍼 대몬(daemon) 수를 늘릴 수 있습니다.

 클러스터의 EC2 인스턴스 수를 늘리려면 현재 클러스터를 중단하고 EC2 인스턴스 수를 늘려 재실행하면 됩니다. Amazon EMR 콘솔을 사용해 클러스터를 시작한 경우에는 **EC2 인스턴스 구성** 대화 상자에서 EC2 인스턴스 수를 지정합니다. 또는 CLI에서 클러스터를 시작한 경우에는 `‑‑num-instances` 옵션을 통해 EC2 인스턴스 수를 지정합니다.

 인스턴스에서 실행되는 맵 작업 수는 EC2 인스턴스 유형에 따라 다릅니다. 지원되는 EC2 인스턴스 유형과 각 인스턴스가 제공하는 매퍼 수에 대한 자세한 내용은 [작업 구성](emr-hadoop-task-config.md) 섹션을 참조하세요. 여기에서 "작업 구성(Task Configuration)" 섹션을 보면 각각 지원되는 구성을 알 수 있습니다.

 매퍼 대몬의 수를 늘리는 또 한 가지 방법은 Hadoop의 `mapreduce.tasktracker.map.tasks.maximum` 구성 파라미터 값을 높이는 것입니다. 이 방법은 EC2 인스턴스 수나 크기를 늘리지 않고 매퍼를 추가함으로써 비용을 절감한다는 점에서 이점이 있습니다. 반면에 이 값을 너무 높게 설정하면 클러스터에 속한 EC2 인스턴스의 메모리가 부족할 수도 있다는 것이 단점입니다. `mapreduce.tasktracker.map.tasks.maximum`을 설정하려면 클러스터를 시작하고 mapred-site 구성 분류의 속성으로 `mapreduce.tasktracker.map.tasks.maximum`의 값을 지정합니다. 방법은 다음 예제와 같습니다. 자세한 내용은 [애플리케이션 구성](emr-configure-apps.md) 단원을 참조하십시오.

```
{
    "configurations": [
    {
        "classification": "mapred-site",
        "properties": {
            "mapred.tasktracker.map.tasks.maximum": "10"
        }
    }
    ]
}
```

## 병렬 데이터 요청
<a name="ParallelDataRequests"></a>

 한 명 이상의 사용자 또는 하나 이상의 애플리케이션에서 단일 테이블로 데이터 요청이 다수 이루어지면 읽기 프로비저닝 처리량이 한 번에 소모되어 성능이 느려질 수 있습니다.

## 프로세스 기간
<a name="ProcessDuration"></a>

 DynamoDB의 데이터 일관성은 각 노드의 읽기 및 쓰기 작업 순서에 따라 달라집니다. Hive 쿼리가 진행 중일 때는 다른 애플리케이션이 새로운 데이터를 DynamoDB 테이블에 로드하거나 기존 데이터를 변경 또는 삭제하기도 합니다. 이 경우 쿼리 실행 중 데이터 변경 사항은 Hive 쿼리 결과에 반영되지 않습니다.

## 처리량 초과 주의
<a name="AvoidExceedingThroughput"></a>

 DynamoDB에 대해 Hive 쿼리를 실행할 때는 프로비저닝된 처리량을 초과하지 않도록 주의해야 합니다. 그렇지 않으면 애플리케이션이 `DynamoDB::Get`을 직접 호출하는 데 필요한 용량이 고갈되기 때문입니다. 이러한 용량 고갈을 방지하려면 Amazon CloudWatch의 로그를 점검하고 메트릭을 관찰하여 `DynamoDB::Get` 애플리케이션 호출에 따른 읽기 볼륨 및 병목 현상을 정기적으로 모니터링해야 합니다.

## 요청 시간
<a name="RequestTime"></a>

 DynamoDB 테이블에 대한 수요가 낮은 시간에 Hive 쿼리가 DynamoDB 테이블에 액세스하도록 일정을 조정하면 성능이 향상됩니다. 예를 들어, 애플리케이션 사용자 대부분이 샌프란시스코에 거주한다면 대다수가 잠드는 시간인 새벽 4시(PST)에 DynamoDB 데이터베이스의 레코드 업데이트 없이 일일 데이터를 내보내도록 선택할 수 있습니다.

## 시간 기반 테이블
<a name="TimeBasedTables"></a>

 데이터가 1일 1개 테이블처럼 시간 기반 DynamoDB 테이블의 형태로 구성되는 경우에는 테이블이 활성 상태가 아닐 때 데이터를 내보낼 수 있습니다. 이 기술은 데이터를 Amazon S3에 지속적으로 백업할 때 사용됩니다.

## 아카이브된 데이터
<a name="ArchivedData"></a>

 DynamoDB에 저장된 데이터에 대해 Hive 쿼리를 여러 차례 실행할 계획이면서 애플리케이션이 데이터 아카이브를 허용하는 경우에는 데이터를 HDFS 또는 Amazon S3로 내보내서 DynamoDB가 아닌 데이터 사본에 대해 Hive 쿼리를 실행할 수 있습니다. 그러면 읽기 작업과 프로비저닝 처리량이 절약됩니다.

# Kinesis
<a name="emr-kinesis"></a>

Amazon EMR 클러스터는 Hive, Pig, MapReduce, Hadoop Streaming API, Cascading 등 Hadoop 에코시스템에서 익숙한 도구를 사용해 Amazon Kinesis 스트림을 직접 읽고 처리할 수 있습니다. 실행 중인 클러스터에서 Amazon S3, Amazon DynamoDB 및 HDFS의 기존 데이터와 Amazon Kinesis의 실시간 데이터를 조인할 수도 있습니다. 사후 처리 활동을 위해 Amazon EMR에서 Amazon S3 또는 DynamoDB로 데이터를 직접 로드할 수 있습니다. Amazon Kinesis 서비스 주요 기능 및 요금에 대한 자세한 내용은 [Amazon Kinesis](https://aws.amazon.com//kinesis) 페이지를 참조하세요.

## Amazon EMR과 Amazon Kinesis 통합으로 어떤 작업을 수행할 수 있나요?
<a name="kinesis-use-cases"></a>

 다음과 같은 특정 시나리오는 Amazon EMR과 Amazon Kinesis를 통합할 경우 훨씬 쉬워집니다.
+ **스트리밍 로그 분석** - 스트리밍 웹 로그를 분석하여 리전, 브라우저 및 액세스 도메인에 따라 몇 분마다 상위 10개의 오류 유형 목록을 생성할 수 있습니다.
+ **고객 참여** - Amazon Kinesis의 클릭스트림 데이터를 DynamoDB 테이블에 저장된 광고 캠페인 정보와 조인하여 특정 웹 사이트에 표시되는 가장 효과적인 광고 카테고리를 식별하는 쿼리를 작성할 수 있습니다.
+ **애드혹 대화형 쿼리** - Amazon Kinesis 스트림에서 HDFS로 주기적으로 데이터를 로드하고 빠른 대화형 분석 쿼리를 위한 로컬 Impala 테이블로 데이터를 사용하게 할 수 있습니다.

## Amazon Kinesis 스트림의 분석 검사
<a name="kinesis-checkpoint"></a>

사용자는 Amazon Kinesis 스트림에 대해 *반복*이라고 하는 주기적인 배치 분석을 실행할 수 있습니다. Amazon Kinesis 스트림 데이터 레코드는 시퀀스 번호를 사용하여 검색되기 때문에 반복 경계는 Amazon EMR이 DynamoDB 테이블에 저장하는 시작 및 종료 시퀀스 번호로 정의됩니다. 예를 들어, `iteration0`이 끝나면 DynamoDB에 종료 시퀀스 번호를 저장하므로 `iteration1` 작업이 시작되면 스트림에서 후속 데이터를 검색할 수 있습니다. 스트림 데이터의 이 반복 매핑을 *검사*라고 합니다. 자세한 내용은 [Kinesis 커넥터](https://aws.amazon.com/elasticmapreduce/faqs/#kinesis-connector)를 참조하세요.

반복을 검사했지만 반복을 처리하는 작업에 실패한 경우 Amazon EMR은 해당 반복의 레코드 재처리를 시도합니다.

검사는 다음을 수행할 수 있는 기능입니다.
+ 동일한 스트림 및 논리 이름에서 실행된 이전 쿼리에서 시퀀스 번호가 처리된 후에 데이터 처리를 시작합니다.
+ 이전 쿼리에서 처리한 Kinesis의 동일한 데이터 배치를 재처리합니다.

 검사를 활성화하려면 스크립트에서 `kinesis.checkpoint.enabled` 파라미터를 `true`로 설정합니다. 또한 다음 파라미터를 구성합니다.


| 구성 설정 | 설명 | 
| --- | --- | 
| kinesis.checkpoint.metastore.table.name | 검사 정보가 저장되는 DynamoDB 테이블 이름 | 
| kinesis.checkpoint.metastore.hash.key.name | DynamoDB 테이블의 해시 키 이름 | 
| kinesis.checkpoint.metastore.hash.range.name | DynamoDB 테이블의 범위 키 이름 | 
| kinesis.checkpoint.logical.name | 현재 처리의 논리적 이름 | 
| kinesis.checkpoint.iteration.no | 논리적 이름과 연관된 처리의 반복 번호 | 
| kinesis.rerun.iteration.without.wait | 실패한 반복이 시간 초과를 기다리지 않고 다시 실행할 수 있는지 여부를 나타내는 부울 값입니다. 기본값은 false입니다. | 

### Amazon DynamoDB 테이블의 프로비저닝된 IOPS 권장 사항
<a name="kinesis-checkpoint-DDB"></a>

Amazon Kinesis용 Amazon EMR 커넥터는 메타데이터 검사를 위한 백업으로 DynamoDB 데이터베이스를 사용합니다. 검사 간격으로 Amazon EMR 클러스터가 포함된 Amazon Kinesis 스트림의 데이터를 사용하기 전에 DynamoDB에서 테이블을 생성해야 합니다. 테이블은 Amazon EMR 클러스터와 같은 리전에 있어야 합니다. 다음은 DynamoDB 테이블에 대해 프로비저닝해야 하는 IOPS 수에 대한 일반적인 권장사항입니다. `j`는 동시에 실행할 수 있는 Hadoop 작업의 최대 수(다른 논리적 이름\$1반복 수 조합 포함)이고 `s`는 작업이 처리하는 샤드의 최대 수입니다.

**읽기 용량 유닛**: `j`\$1`s`/`5`

**쓰기 용량 유닛**: `j`\$1`s`

## 성능 고려 사항
<a name="performance"></a>

Amazon Kinesis 샤드 처리량은 Amazon EMR 클러스터를 구성하는 노드의 인스턴스 크기 및 스트림의 레코드 크기에 정비례합니다. 프라이머리 및 코어 노드에는 m5.xlarge 또는 더 큰 인스턴스를 사용하는 것이 좋습니다.

## Amazon EMR을 사용하여 Amazon Kinesis 분석 예약
<a name="schedule"></a>

제한 시간과 최대 반복 기간 제한이 있는 활성 Amazon Kinesis 스트림에서 데이터를 분석하는 경우, 분석을 자주 실행하여 스트림에서 정기적으로 세부 정보를 수집해야 합니다. 이러한 스크립트와 쿼리를 주기적으로 실행하는 방법에는 여러 가지가 있지만 이처럼 반복적인 작업에는 AWS Data Pipeline 을 사용하는 것이 좋습니다. 자세한 내용은 *AWS Data Pipeline 개발자 안내서*에서 [AWS Data Pipeline PigActivity](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-pigactivity.html) 및 [AWS Data Pipeline HiveActivity](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-hiveactivity.html)를 참조하세요.

# Spark Kinesis 커넥터를 Amazon EMR 7.0용 SDK 2.x로 마이그레이션
<a name="migrating-spark-kinesis"></a>

 AWS SDK는 자격 증명 관리, S3 및 Kinesis 서비스 연결과 같은 클라우드 컴퓨팅 서비스와 상호 작용할 수 있는 AWS 풍부한 APIs 및 라이브러리 세트를 제공합니다. Spark Kinesis 커넥터는 Kinesis Data Streams의 데이터를 소비하는 데 사용되며 수신된 데이터는 Spark의 실행 엔진에서 변환 및 처리됩니다. 현재이 커넥터는 1.x SDK AWS 및 Kinesis-client-library(KCL)를 기반으로 구축되었습니다.

 AWS SDK 2.x 마이그레이션의 일환으로 Spark Kinesis 커넥터도 SDK 2.x로 실행되도록 적절히 업데이트됩니다. Amazon EMR 7.0 릴리스의 Spark에는 Apache Spark의 커뮤니티 버전에서 아직 사용할 수 없는 SDK 2.x 업그레이드가 포함되어 있습니다. 7.0 미만의 릴리스에서 Spark Kinesis 커넥터를 사용하는 경우 Amazon EMR 7.0으로 마이그레이션하기 전에 SDK 2.x에서 실행되도록 애플리케이션 코드를 마이그레이션해야 합니다.

## 마이그레이션 안내서
<a name="migrating-spark-kinesis-migration-guides"></a>

이 섹션에서는 업그레이드된 Spark Kinesis 커넥터로 애플리케이션을 마이그레이션하는 단계를 설명합니다. 여기에는 AWS SDK 2.x의 Kinesis Client Library(KCL) 2.x, AWS 자격 증명 공급자 및 AWS 서비스 클라이언트로 마이그레이션하는 가이드가 포함되어 있습니다. 참고로 Kinesis 커넥터를 사용하는 샘플 [WordCount](https://github.com/apache/spark/blob/v3.5.0/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala) 프로그램도 포함되어 있습니다.

**Topics**
+ [1.x에서 2.x로 KCL 마이그레이션](#migrating-spark-kinesis-KCL-from-1.x-to-2.x)
+ [AWS SDK 1.x에서 2.x로 AWS 자격 증명 공급자 마이그레이션](#migrating-spark-kinesis-creds-from-1.x-to-2.x)
+ [AWS SDK 1.x에서 2.x로 AWS 서비스 클라이언트 마이그레이션](#migrating-spark-kinesis-service-from-1.x-to-2.x)
+ [스트리밍 애플리케이션의 코드 예제](#migrating-spark-kinesis-streaming-examples)
+ [업그레이드된 Spark Kinesis 커넥터 사용 시 고려 사항](#migrating-spark-kinesis-considerations)

### 1.x에서 2.x로 KCL 마이그레이션
<a name="migrating-spark-kinesis-KCL-from-1.x-to-2.x"></a>
+ **`KinesisInputDStream`의 지표 수준 및 차원**

  `KinesisInputDStream`을 인스턴스화할 때 스트림의 지표 수준 및 차원을 제어할 수 있습니다. 다음 예제는 KCL 1.x로 이러한 파라미터를 사용자 지정하는 방법을 보여줍니다.

  ```
  import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
  import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
   
  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
    .build()
  ```

  KCL 2.x에서는 이러한 구성 설정의 패키지 이름이 다릅니다. 2.x로 마이그레이션:

  1. 가져오기 명령문을 각각 `com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration` 및 `com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel`에서 `software.amazon.kinesis.metrics.MetricsLevel` 및 `software.amazon.kinesis.metrics.MetricsUtil`로 변경하세요.

     ```
     // import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
     import software.amazon.kinesis.metrics.MetricsLevel
      
     // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
     import software.amazon.kinesis.metrics.MetricsUtil
     ```

  1. `metricsEnabledDimensionsKinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet` 줄을 `metricsEnabledDimensionsSet(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)`로 바꿉니다.

  다음은 지표 수준 및 지표 차원을 사용자 지정한 `KinesisInputDStream`의 업데이트된 버전입니다.

  ```
  import software.amazon.kinesis.metrics.MetricsLevel
  import software.amazon.kinesis.metrics.MetricsUtil
   
  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
    .build()
  ```
+ `KinesisInputDStream`의 메시지 핸들러 함수

  파티션 키와 같이 레코드에 포함된 다른 데이터를 사용하려는 경우 `KinesisInputDStream`를 인스턴스화할 때 Kinesis Record를 사용하여 일반 객체 T를 반환하는 “메시지 핸들러 함수”를 제공할 수도 있습니다.

  KCL 1.x에서 메시지 핸들러 함수 서명은 `Record => T`이고. 여기서 Record는 `com.amazonaws.services.kinesis.model.Record`입니다. KCL 2.x에서는 핸들러의 서명이 `KinesisClientRecord => T`로 변경됩니다. 여기서 KinesisClientRecord는 `software.amazon.kinesis.retrieval.KinesisClientRecord`입니다.

  다음은 KCL 1.x에서 메시지 핸들러를 제공하는 예제입니다.

  ```
  import com.amazonaws.services.kinesis.model.Record
   
   
  def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_ONLY)
    .buildWithMessageHandler(addFive)
  ```

  메시지 핸들러를 마이그레이션하는 방법:

  1. `com.amazonaws.services.kinesis.model.Record`의 가져오기 명령문을 `software.amazon.kinesis.retrieval.KinesisClientRecord`로 변경하세요.

     ```
     // import com.amazonaws.services.kinesis.model.Record
     import software.amazon.kinesis.retrieval.KinesisClientRecord
     ```

  1. 메시지 핸들러의 메서드 서명을 업데이트하세요.

     ```
     //def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
     def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
     ```

  다음은 KCL 2.x에서 메시지 핸들러를 제공하는 업데이트된 예입니다.

  ```
  import software.amazon.kinesis.retrieval.KinesisClientRecord
   
   
  def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_ONLY)
    .buildWithMessageHandler(addFive)
  ```

  KCL 1.x에서 2.x로 마이그레이션에 대한 자세한 내용은 [KCL 1.x에서 KCL 2.x로 소비자 마이그레이션](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html)을 참조하세요.

### AWS SDK 1.x에서 2.x로 AWS 자격 증명 공급자 마이그레이션
<a name="migrating-spark-kinesis-creds-from-1.x-to-2.x"></a>

자격 증명 공급자는와 상호 작용하기 위한 AWS 자격 증명을 얻는 데 사용됩니다 AWS. SDK 2.x의 보안 인증 공급자와 관련된 몇 가지 인터페이스 및 클래스 변경 사항은 [여기](https://github.com/aws/aws-sdk-java-v2/blob/master/docs/LaunchChangelog.md#122-client-credentials)에서 확인할 수 있습니다. Spark Kinesis 커넥터는 1.x 버전의 AWS 자격 증명 공급자를 반환하는 인터페이스(`org.apache.spark.streaming.kinesis.SparkAWSCredentials`) 및 구현 클래스를 정의했습니다. Kinesis 클라이언트를 초기화할 때 이러한 보안 인증 공급자가 필요합니다. 예를 들어 `SparkAWSCredentials.provider` 애플리케이션에서 메서드를 사용하는 경우 2.x 버전의 AWS 자격 증명 공급자를 사용하도록 코드를 업데이트해야 합니다.

다음은 AWS SDK 1.x에서 자격 증명 공급자를 사용하는 예입니다.

```
import org.apache.spark.streaming.kinesis.SparkAWSCredentials
import com.amazonaws.auth.AWSCredentialsProvider
 
val basicSparkCredentials = SparkAWSCredentials.builder
    .basicCredentials("accessKey", "secretKey")
    .build()
                                     
val credentialProvider = basicSparkCredentials.provider
assert(credentialProvider.isInstanceOf[AWSCredentialsProvider], "Type should be AWSCredentialsProvider")
```

**SDK 2.x로 마이그레이션하는 방법:**

1. `com.amazonaws.auth.AWSCredentialsProvider`의 가져오기 명령문을 `software.amazon.awssdk.auth.credentials.AwsCredentialsProvider`로 변경

   ```
   //import com.amazonaws.auth.AWSCredentialsProvider
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
   ```

1. 이 클래스를 사용하는 나머지 코드를 업데이트하세요.

   ```
   import org.apache.spark.streaming.kinesis.SparkAWSCredentials
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
    
   val basicSparkCredentials = SparkAWSCredentials.builder
       .basicCredentials("accessKey", "secretKey")
       .build()
                                             
   val credentialProvider = basicSparkCredentials.provider
   assert (credentialProvider.isInstanceOf[AwsCredentialsProvider], "Type should be AwsCredentialsProvider")
   ```

### AWS SDK 1.x에서 2.x로 AWS 서비스 클라이언트 마이그레이션
<a name="migrating-spark-kinesis-service-from-1.x-to-2.x"></a>

AWS 서비스 클라이언트는 2.x(예: `software.amazon.awssdk`)에서 패키지 이름이 다른 반면, SDK 1.x는를 사용합니다`com.amazonaws`. 클라이언트 변경에 대한 자세한 내용은 [여기](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-whats-different.html)를 참조하세요. 코드에서 이러한 서비스 클라이언트를 사용하는 경우 그에 따라 클라이언트를 마이그레이션해야 합니다.

다음은 SDK 1.x에서 클라이언트를 생성하는 예제입니다.

```
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.document.DynamoDB
 
AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
```

**2.x로 마이그레이션:**

1. 서비스 클라이언트의 가져오기 명령문을 변경하세요. DynamoDB 클라이언트를 예로 들어 보겠습니다. `com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient` 또는 `com.amazonaws.services.dynamodbv2.document.DynamoDB`를 `software.amazon.awssdk.services.dynamodb.DynamoDbClient`로 변경해야 할 것입니다.

   ```
   // import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
   // import com.amazonaws.services.dynamodbv2.document.DynamoDB
   import software.amazon.awssdk.services.dynamodb.DynamoDbClient
   ```

1. 클라이언트를 초기화하는 코드를 업데이트하세요.

   ```
   // AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
   // AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
    
   DynamoDbClient ddbClient = DynamoDbClient.create();
   DynamoDbClient ddbClient = DynamoDbClient.builder().build();
   ```

    AWS SDK를 1.x에서 2.x로 마이그레이션하는 방법에 대한 자세한 내용은 [AWS SDK for Java 1.x와 2.x의 차이점을 참조하세요.](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-whats-different.html)

### 스트리밍 애플리케이션의 코드 예제
<a name="migrating-spark-kinesis-streaming-examples"></a>

```
import java.net.URI
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.http.apache.ApacheHttpClient
import software.amazon.awssdk.services.kinesis.KinesisClient
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest
import software.amazon.awssdk.regions.Region
import software.amazon.kinesis.metrics.{MetricsLevel, MetricsUtil}
 
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
import org.apache.spark.streaming.kinesis.KinesisInputDStream
 
 
object KinesisWordCountASLSDKV2 {
 
  def main(args: Array[String]): Unit = {
    val appName = "demo-app"
    val streamName = "demo-kinesis-test"
    val endpointUrl = "https://kinesis.us-west-2.amazonaws.com"
    val regionName = "us-west-2"
 
    // Determine the number of shards from the stream using the low-level Kinesis Client
    // from the AWS Java SDK.
    val credentialsProvider = DefaultCredentialsProvider.create
    require(credentialsProvider.resolveCredentials() != null,
      "No AWS credentials found. Please specify credentials using one of the methods specified " +
        "in https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html")
    val kinesisClient = KinesisClient.builder()
      .credentialsProvider(credentialsProvider)
      .region(Region.US_WEST_2)
      .endpointOverride(URI.create(endpointUrl))
      .httpClientBuilder(ApacheHttpClient.builder())
      .build()
    val describeStreamRequest = DescribeStreamRequest.builder()
      .streamName(streamName)
      .build()
    val numShards = kinesisClient.describeStream(describeStreamRequest)
      .streamDescription
      .shards
      .size
 
 
    // In this example, we are going to create 1 Kinesis Receiver/input DStream for each shard.
    // This is not a necessity; if there are less receivers/DStreams than the number of shards,
    // then the shards will be automatically distributed among the receivers and each receiver
    // will receive data from multiple shards.
    val numStreams = numShards
 
    // Spark Streaming batch interval
    val batchInterval = Milliseconds(2000)
 
    // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information
    // on sequence number of records that have been received. Same as batchInterval for this
    // example.
    val kinesisCheckpointInterval = batchInterval
 
    // Setup the SparkConfig and StreamingContext
    val sparkConfig = new SparkConf().setAppName("KinesisWordCountASLSDKV2")
    val ssc = new StreamingContext(sparkConfig, batchInterval)
 
    // Create the Kinesis DStreams
    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisInputDStream.builder
        .streamingContext(ssc)
        .streamName(streamName)
        .endpointUrl(endpointUrl)
        .regionName(regionName)
        .initialPosition(new Latest())
        .checkpointAppName(appName)
        .checkpointInterval(kinesisCheckpointInterval)
        .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
        .metricsLevel(MetricsLevel.DETAILED)
        .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
        .build()
    }
 
    // Union all the streams
    val unionStreams = ssc.union(kinesisStreams)
 
    // Convert each line of Array[Byte] to String, and split into words
    val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))
 
    // Map each word to a (word, 1) tuple so we can reduce by key to count the words
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
 
    // Print the first 10 wordCounts
    wordCounts.print()
 
    // Start the streaming context and await termination
    ssc.start()
    ssc.awaitTermination()
  }
}
```

### 업그레이드된 Spark Kinesis 커넥터 사용 시 고려 사항
<a name="migrating-spark-kinesis-considerations"></a>
+ 애플리케이션에서 JDK 버전 11 미만의 `Kinesis-producer-library`를 사용하는 경우 `java.lang.NoClassDefFoundError: javax/xml/bind/DatatypeConverter`와 같은 예외가 발생할 수 있습니다. 이는 EMR 7.0이 기본적으로 JDK 17과 함께 제공되고 Java 11 이상부터 J2EE 모듈이 표준 라이브러리에서 제거되었기 때문에 발생합니다. 이 문제는 pom 파일에 다음 종속성을 추가하여 해결할 수 있습니다. 상황에 맞게 라이브러리 버전을 교체합니다.

  ```
  <dependency>
        <groupId>javax.xml.bind</groupId>
        <artifactId>jaxb-api</artifactId>
        <version>${jaxb-api.version}</version>
      </dependency>
  ```
+ Spark Kinesis 커넥터 jar는 EMR 클러스터가 생성된 후 `/usr/lib/spark/connector/lib/` 경로에서 찾을 수 있습니다.

# S3DistCp(s3-dist-cp)
<a name="UsingEMR_s3distcp"></a>

Apache DistCp는 대량의 데이터를 복사하는 데 사용할 수 있는 오픈 소스 도구입니다. *S3DistCp*는 DistCp와 비슷하지만 AWS, 특히 Amazon S3에서 함께 작동하도록 최적화되었습니다. Amazon EMR 버전 4.0 및 이후에서 S3DistCp 명령은 `s3-dist-cp`이며, 클러스터 단계로 혹은 명령줄에 추가됩니다. S3DistCp를 사용하여 Amazon S3의 대량 데이터를 HDFS로 효율적으로 복사할 수 있습니다. HDFS에서는 Amazon EMR 클러스터의 후속 단계를 통해 데이터를 처리할 수 있습니다. 또한 S3DistCp를 사용하여 Amazon S3 버킷 간에 또는 HDFS에서 Amazon S3로 데이터를 복사할 수도 있습니다. S3DistCp는 버킷과 AWS 계정 간에 많은 수의 객체를 병렬로 복사하는 데 더 확장 가능하고 효율적입니다.

실제 시나리오에서 S3DistCP의 유연성을 보여주는 특정 명령은 AWS 빅 데이터 블로그의 [ S3DistCp 사용에 대한 7가지 팁을](https://aws.amazon.com/blogs/big-data/seven-tips-for-using-s3distcp-on-amazon-emr-to-move-data-efficiently-between-hdfs-and-amazon-s3/) 참조하세요.

DistCp처럼 S3DistCp도 MapReduce를 사용하여 분산 방식으로 복사합니다. 여러 서버 간에 복사, 오류 처리, 복구 및 보고 작업을 공유합니다. Apache DistCp 오픈 소스 프로젝트에 대한 자세한 내용은 Apache Hadoop 설명서에서 [DistCp guide](http://hadoop.apache.org/docs/stable/hadoop-distcp/DistCp.html)를 참조하세요.

S3DistCp가 지정된 파일의 일부 또는 전체를 복사할 수 없는 경우 클러스터 단계가 실패하고 0 이외의 오류 코드를 반환합니다. 이러한 상황이 발생할 경우 S3DistCp는 부분적으로 복사된 파일을 지우지 않습니다.

**중요**  
S3DistCp는 밑줄 문자를 포함하는 Amazon S3 버킷 이름을 지원하지 않습니다.  
S3DistCp는 Parquet 파일의 연결을 지원하지 않습니다. 대신 PySpark를 사용하세요. 자세한 내용은 [Amazon EMR에서 parquet 파일 연결](https://aws.amazon.com/premiumsupport/knowledge-center/emr-concatenate-parquet-files/)을 참조하세요.  
S3DistCP를 사용하여 단일 파일(디렉터리 대신)을 S3에서 HDFS로 복사할 때 복사 오류를 방지하려면 Amazon EMR 버전 5.33.0 이상 또는 Amazon EMR 버전 6.3.0 이상을 사용합니다.

## S3DistCp 옵션
<a name="UsingEMR_s3distcp.options"></a>

S3DistCp는 DistCp와 비슷하지만 데이터를 복사하고 압축하는 방법을 변경하는 다양한 옵션 세트를 지원합니다.

S3DistCp를 직접 호출하면 다음 테이블에서 설명하는 옵션을 지정할 수 있습니다. 이 옵션은 인수 목록을 사용하여 단계에 추가됩니다. 다음 표에는 S3DistCp 인수 예가 나와 있습니다.


| 옵션  | 설명  | 필수  | 
| --- | --- | --- | 
| ‑‑src=LOCATION  |  복사할 데이터의 위치입니다. 이 위치는 HDFS 또는 Amazon S3 위치일 수 있습니다. 예시: `‑‑src=s3://amzn-s3-demo-bucket/logs/j-3GYXXXXXX9IOJ/node`   S3DistCp는 밑줄 문자를 포함하는 Amazon S3 버킷 이름을 지원하지 않습니다.   | 예  | 
| ‑‑dest=LOCATION  |  데이터의 대상입니다. 이 위치는 HDFS 또는 Amazon S3 위치일 수 있습니다. 예시: `‑‑dest=hdfs:///output`   S3DistCp는 밑줄 문자를 포함하는 Amazon S3 버킷 이름을 지원하지 않습니다.   | 예  | 
| ‑‑srcPattern=PATTERN  |  `‑‑src`에서 데이터의 하위 세트로 복사 작업을 필터링하는 [정규식](http://en.wikipedia.org/wiki/Regular_expression)입니다. `‑‑srcPattern`과 `‑‑groupBy`를 둘 다 지정하지 않으면 `‑‑src`의 모든 데이터가 `‑‑dest`로 복사됩니다. 정규식 인수에 별표(\$1) 같은 특수 문자가 포함되어 있는 경우 정규식 또는 전체 `‑‑args` 문자열을 작은따옴표(')에 포함해야 합니다. 예시: `‑‑srcPattern=.*daemons.*-hadoop-.*`   | 아니요  | 
| ‑‑groupBy=PATTERN  |  S3DistCp에서 식과 일치하는 파일을 연결하는 [정규식](http://en.wikipedia.org/wiki/Regular_expression)입니다. 예를 들면 이 옵션을 사용하여 한 시간 동안 작성된 모든 로그 파일을 단일 파일로 결합할 수 있습니다. 연결된 파일 이름은 그룹화 정규식과 일치하는 값입니다. 괄호는 파일을 그룹화하는 방식을 나타내며, 괄호 안에 지정된 문과 일치하는 모든 항목이 단일 출력 파일로 결합됩니다. 괄호에 지정된 문이 정규식에 포함되지 않는 경우 S3DistCp 단계에 대해 클러스터가 실패하며 오류가 발생합니다. 정규식 인수에 별표(\$1) 같은 특수 문자가 포함되어 있는 경우 정규식 또는 전체 `‑‑args` 문자열을 작은따옴표(')에 포함해야 합니다. `‑‑groupBy`를 지정하면 지정한 패턴과 일치하는 파일만 복사됩니다. `‑‑groupBy`와 `‑‑srcPattern`을 동시에 지정할 필요는 없습니다. 예시: `‑‑groupBy=.*subnetid.*([0-9]+-[0-9]+-[0-9]+-[0-9]+).*`  | 아니요  | 
| ‑‑targetSize=SIZE  |  `‑‑groupBy` 옵션에 따라 생성할 파일의 크기로, 단위는 메비바이트(MiB)입니다. 이 값은 정수여야 합니다. `‑‑targetSize`를 설정하는 경우 S3DistCp에서는 이 크기에 맞추려고 합니다. 복사한 파일의 실제 크기는 이 값보다 크거나 작을 수도 있습니다. 작업이 데이터 파일의 크기에 따라 집계되므로 대상 파일 크기가 소스 데이터 파일 크기와 일치할 수 있습니다. `‑‑groupBy`에 의해 연결된 파일이 `‑‑targetSize`의 값보다 크면 부분 파일로 나눠지고 이름 끝에 숫자 값이 순차적으로 지정됩니다. 예를 들어, `myfile.gz`에 연결된 파일은 `myfile0.gz`, `myfile1.gz` 등 여러 부분으로 나눠집니다. 예시: `‑‑targetSize=2`   | 아니요  | 
| ‑‑appendToLastFile |  이미 있는 파일을 Amazon S3에서 HDFS로 복사하는 경우 S3DistCp의 동작을 지정합니다. 이 옵션은 새 파일 데이터를 기존 파일에 추가합니다. `‑‑appendToLastFile`을 `‑‑groupBy`와 함께 사용할 경우 동일 그룹과 일치하는 파일에 새 데이터가 추가됩니다. 이 옵션은 `‑‑targetSize`와 함께 사용하는 경우 `‑‑groupBy.` 동작과 관련이 있습니다.  | 아니요  | 
| ‑‑outputCodec=CODEC  |  복사한 파일에 사용할 압축 코덱을 지정합니다. 이때 `gzip`, `gz`, `lzo`, `snappy` 또는 `none` 값을 사용할 수 있습니다. 예를 들어, 이 옵션을 사용하여 Gzip으로 압축된 입력 파일을 LZO로 압축된 출력 파일로 변환하거나 복사 작업의 일부로서 파일의 압축을 풀 수 있습니다. 출력 코덱을 선택하는 경우 파일 이름에 해당 확장명(예: `gz` 및 `gzip`의 경우, 확장명은 `.gz`)이 추가됩니다. `‑‑outputCodec`의 값을 지정하지 않으면 파일이 압축 변화 없이 복사됩니다. 예시: `‑‑outputCodec=lzo`   | 아니요  | 
| ‑‑s3ServerSideEncryption  |  대상 데이터가 SSL을 사용하여 전송되고 AWS 서비스 측 키를 사용하여 Amazon S3에서 자동으로 암호화되는지 확인합니다. S3DistCp를 사용하여 데이터를 검색할 때 객체가 자동으로 암호화 해제됩니다. 암호화되지 않은 객체를 암호화가 요구되는 Amazon S3 버킷으로 복사하려는 경우 작업이 실패합니다. 자세한 내용은 [데이터 암호화 사용](https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingEncryption.html)을 참조하세요. 예시: `‑‑s3ServerSideEncryption`   | 아니요  | 
| ‑‑deleteOnSuccess  |  이 옵션을 지정한 경우 복사 작업이 성공적으로 수행되면 S3DistCp가 복사한 파일이 소스 위치에서 삭제됩니다. 이 옵션은 예약 작업으로서 로그 파일 같은 출력 파일을 한 위치에서 다른 위치로 복사하려 하며 동일한 파일을 두 번 복사하기를 원치 않는 경우에 유용합니다. 예시: `‑‑deleteOnSuccess`   | 아니요  | 
| ‑‑disableMultipartUpload  |  멀티파트 업로드 사용을 비활성화합니다. 예시: `‑‑disableMultipartUpload`   | 아니요  | 
| ‑‑multipartUploadChunkSize=SIZE  |  Amazon S3 멀티파트 업로드의 각 파트 크기(MiB). S3DistCp는 `multipartUploadChunkSize`보다 큰 데이터를 복사할 때 멀티파트 업로드를 사용합니다. 작업 성능을 향상시키기 위해 각 부품 크기를 늘릴 수 있습니다. 기본 크기는 128MiB입니다. 예시: `‑‑multipartUploadChunkSize=1000`   | 아니요  | 
| ‑‑numberFiles  |  출력 파일 앞에 순차 번호를 붙입니다. `‑‑startingIndex`에 다른 값을 지정하는 경우가 아니면 번호는 0부터 시작합니다. 예시: `‑‑numberFiles`   | 아니요  | 
| ‑‑startingIndex=INDEX  |  `‑‑numberFiles`와 함께 사용되어 순서에서 첫 번째 번호를 지정합니다. 예시: `‑‑startingIndex=1`   | 아니요  | 
| ‑‑outputManifest=FILENAME  |  S3DistCp에서 복사한 모든 파일의 목록을 포함하는 텍스트 파일(Gzip으로 압축됨)을 생성합니다. 예시: `‑‑outputManifest=manifest-1.gz`   | 아니요  | 
| ‑‑previousManifest=PATH  |  `‑‑outputManifest` 플래그를 사용하여 이전 S3DistCp 호출 중에 생성된 매니페스트 파일을 읽습니다. `‑‑previousManifest` 플래그를 설정한 경우 S3DistCp가 매니페스트에 나열된 파일을 복사 작업에서 제외시킵니다. `‑‑outputManifest`를 `‑‑previousManifest`와 함께 지정하는 경우 복사되지 않았더라도 이전 매니페스트에 나열된 파일이 새 매니페스트 파일에도 나타납니다. 예시: `‑‑previousManifest=/usr/bin/manifest-1.gz`   | 아니요  | 
| ‑‑requirePreviousManifest |  이전 S3DistCp 호출 중에 생성된 이전 매니페스트가 필요합니다. false로 설정된 경우 이전 매니페스트가 지정되지 않았을 때 오류가 발생하지 않습니다. 기본값은 true입니다.  | 아니요  | 
| ‑‑copyFromManifest  |  S3DistCp에서 파일 목록을 복사에서 제외하는 것이 아니라 복사할 파일 목록으로서 지정한 매니페스트 파일을 사용하도록 `‑‑previousManifest`의 동작을 반대로 설정합니다. 예시: `‑‑copyFromManifest ‑‑previousManifest=/usr/bin/manifest-1.gz`   | 아니요  | 
| ‑‑s3Endpoint=ENDPOINT |  파일 업로드 시 사용할 Amazon S3 엔드포인트를 지정합니다. 이 옵션은 소스 및 대상 모두의 엔드포인트를 설정합니다. 설정하지 않은 경우 기본 엔드포인트는 `s3.amazonaws.com`입니다. Amazon S3 엔드포인트 목록은 [리전 및 엔드포인트](https://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region)를 참조하세요. 예시: `‑‑s3Endpoint=s3.eu-west-1.amazonaws.com`   | 아니요  | 
| ‑‑storageClass=CLASS |  대상이 Amazon S3일 때 사용할 스토리지 클래스입니다. 유효한 값은 STANDARD 및 REDUCED\$1REDUNDANCY입니다. 이 옵션을 지정하지 않은 경우 S3DistCp에서 스토리지 클래스를 보존합니다. 예시: `‑‑storageClass=STANDARD`  | 아니요  | 
| ‑‑srcPrefixesFile=PATH |  Amazon S3(s3://), HDFS(hdfs:///) 또는 로컬 파일 시스템(file:/)에 있는 `src` 접두사 목록(줄당 접두사 한 개)을 포함하는 텍스트 파일입니다. `srcPrefixesFile`을 제공하는 경우 S3DistCp에서 src 경로를 나열하지 않습니다. 대신에 결합된 결과로서 소스 목록을 생성하여 이 파일에 지정된 모든 접두사를 나열합니다. 이러한 접두사를 사용하는 src 경로와는 달리, 상대 경로를 사용하여 대상 경로를 생성합니다. `srcPattern`도 지정한 경우 이 패턴이 소스 접두사의 결합된 목록 결과에 적용되어 입력을 추가로 필터링합니다. `copyFromManifest`를 사용하는 경우 매니페스트의 객체가 복사되고 `srcPrefixesFile`이 무시됩니다. 예시: `‑‑srcPrefixesFile=PATH`  | 아니요  | 

위의 옵션 외에도, S3DistCp는 [도구 인터페이스](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/util/Tool.html)를 구현하므로 일반 옵션이 지원됩니다.

## S3DistCp를 클러스터에 단계로 추가
<a name="UsingEMR_s3distcp.step"></a>

클러스터에서 단계로 추가하여 S3DistCp를 호출할 수 있습니다. 콘솔, CLI 또는 API를 사용하여 시작 시 클러스터에 또는 실행 중인 클러스터에 단계를 추가할 수 있습니다. 다음 예제에서는 S3DistCp를 사용하여 실행 중인 클러스터에 추가하는 방법을 보여 줍니다. 클러스터에 단계를 추가하는 방법에 대한 자세한 내용은 *Amazon EMR 관리 안내서*에서 [클러스터에 작업 제출](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-work-with-steps.html)을 참조하세요.

**를 사용하여 실행 중인 클러스터에 S3DistCp 단계를 추가하려면 AWS CLI**

에서 Amazon EMR 명령을 사용하는 방법에 대한 자세한 AWS CLI내용은 [AWS CLI 명령 참조](https://docs.aws.amazon.com/cli/latest/reference/emr)를 참조하세요.
+ S3DistCp를 호출하는 클러스터에 단계를 추가하려면 S3DistCp에서 복사 작업을 수행하는 방식을 지정하는 파라미터를 인수로 전달합니다.

  다음 예제에서는 Amazon S3에서 `hdfs:///output`으로 대몬(daemon) 로그를 복사합니다. 다음 명령을 실행합니다.
  + `‑‑cluster-id`는 클러스터를 지정합니다.
  + `Jar`은 S3DistCp JAR 파일의 위치입니다. command-runner.jar을 사용하여 클러스터에서 명령을 실행하는 방법의 예제는 [사용자 지정 JAR 단계를 제출하여 스크립트 또는 명령 실행](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-commandrunner.html#emr-commandrunner-examples)을 참조하세요.
  + `Args`는 S3DistCp로 전달할 옵션 이름-값 페어의 쉼표로 구분된 목록입니다. 사용 가능한 옵션의 전체 목록은 [S3DistCp 옵션](#UsingEMR_s3distcp.options) 섹션을 참조하세요.

  S3DistCp 복사 단계를 실행 중인 클러스터에 추가하려면 Amazon S3 또는 로컬 파일 시스템에 저장된 JSON 파일(이 예제의 경우 `myStep.json`)에 다음을 추가합니다. *j-3GYXXXXXX9IOK*를 클러스터 ID로 바꾸고 *amzn-s3-demo-bucket*을 Amazon S3 버킷 이름으로 바꿉니다.

  ```
  [
      {
          "Name":"S3DistCp step",
          "Args":["s3-dist-cp","‑‑s3Endpoint=s3.amazonaws.com","‑‑src=s3://amzn-s3-demo-bucket/logs/j-3GYXXXXXX9IOJ/node/","‑‑dest=hdfs:///output","‑‑srcPattern=.*[a-zA-Z,]+"],
          "ActionOnFailure":"CONTINUE",
          "Type":"CUSTOM_JAR",
          "Jar":"command-runner.jar"        
      }
  ]
  ```

  ```
  aws emr add-steps ‑‑cluster-id j-3GYXXXXXX9IOK ‑‑steps file://./myStep.json
  ```

**Example Amazon S3에서 HDFS로 로그 파일 복사**  
이 예제에서는 실행 중인 클러스터에 단계를 추가하여 Amazon S3 버킷에 저장된 로그 파일을 HDFS로 복사하는 방법을 보여줍니다. 이 예제에서 `‑‑srcPattern` 옵션은 복사된 데이터를 대몬 로그로 제한하는 데 사용됩니다.  
`‑‑srcPattern` 옵션을 사용하여 Amazon S3에서 HDFS로 로그 파일을 복사하려면 Amazon S3 또는 로컬 파일 시스템에 저장된 JSON 파일(이 예제의 경우 `myStep.json`)에 다음을 추가합니다. *j-3GYXXXXXX9IOK*를 클러스터 ID로 바꾸고 *amzn-s3-demo-bucket*을 Amazon S3 버킷 이름으로 바꿉니다.  

```
[
    {
        "Name":"S3DistCp step",
        "Args":["s3-dist-cp","‑‑s3Endpoint=s3.amazonaws.com","‑‑src=s3://amzn-s3-demo-bucket/logs/j-3GYXXXXXX9IOJ/node/","‑‑dest=hdfs:///output","‑‑srcPattern=.*daemons.*-hadoop-.*"],
        "ActionOnFailure":"CONTINUE",
        "Type":"CUSTOM_JAR",
        "Jar":"command-runner.jar"        
    }
]
```

## 실패한 S3DistCp 작업 후 정리
<a name="s3distcp-cleanup"></a>

S3DistCp가 지정된 파일의 일부 또는 전체를 복사할 수 없는 경우 명령 또는 클러스터 단계가 실패하고 0 이외의 오류 코드를 반환합니다. 이러한 상황이 발생할 경우 S3DistCp는 부분적으로 복사된 파일을 지우지 않습니다. 수동으로 삭제해야 합니다.

부분적으로 복사된 파일은 S3DistCP 작업의 고유 식별자를 사용하여 하위 디렉터리의 HDFS `tmp` 디렉터리에 저장됩니다. 이 ID는 작업의 표준 출력에서 찾을 수 있습니다.

예를 들어, ID가 `4b1c37bb-91af-4391-aaf8-46a6067085a6`인 S3DistCP 작업의 경우 클러스터의 마스터 노드에 연결하고 다음 명령을 실행하여 작업과 연관된 출력 파일을 볼 수 있습니다.

```
hdfs dfs -ls /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output
```

이 명령은 다음과 유사한 파일 목록을 반환합니다.

```
Found 8 items
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:03 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/_SUCCESS
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:02 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00000
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:02 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00001
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:02 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00002
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:03 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00003
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:03 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00004
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:03 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00005
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:03 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00006
```

그 후에 다음 명령을 실행하여 디렉터리와 모든 내용을 삭제할 수 있습니다.

```
hdfs dfs rm -rf /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6
```