

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 連接器與公用程式
<a name="emr-connectors"></a>

Amazon EMR 提供多種連接器和公用程式，以存取其他 AWS 服務做為資料來源。您通常可以在某個程式裡存取多項服務的資料。例如，您可以在 Hive 查詢、Pig 指令碼或 MapReduce 應用程式內指定某個 Kinesis 串流，然後對該資料執行操作。

**Topics**
+ [使用 Amazon EMR 在 DynamoDB 中匯出、匯入、查詢和聯結資料表](EMRforDynamoDB.md)
+ [Kinesis](emr-kinesis.md)
+ [S3DistCp (s3-dist-cp)](UsingEMR_s3distcp.md)
+ [S3DistCp 作業失敗之後清除](#s3distcp-cleanup)

# 使用 Amazon EMR 在 DynamoDB 中匯出、匯入、查詢和聯結資料表
<a name="EMRforDynamoDB"></a>

**注意**  
Amazon EMR-DynamoDB 連接器是 GitHub 上的開放原始碼。如需詳細資訊，請參閱 [https://github.com/awslabs/emr-dynamodb-connector](https://github.com/awslabs/emr-dynamodb-connector)。

DynamoDB 是全受管 NoSQL 資料庫服務，提供快速且可預期的效能，以及無縫的可擴展性。開發人員可建立資料庫資料表，不受限制地增加請求流量或儲存空間。DynamoDB 會自動將資料表的資料與傳輸流分散到足夠數量的伺服器上，以因應客戶所指定的請求處理容量和儲存的資料量，同時保持快速、一致的效能。透過 Amazon EMR 和 Hive，您可以迅速且有效地處理大量資料，例如儲存在 DynamoDB 中的資料。如需有關 DynamoDB 的詳細資訊，請參閱 [Amazon DynamoDB 開發人員指南](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/)。

Apache Hive 為軟體層的一種，可使用 HiveQL 這種類似 SQL 的簡化查詢語言來對 Map Reduce 叢集進行查詢。其是在 Hadoop 架構上執行。如需有關 Hive 與 HiveQL 的詳細資訊，請參閱 [HiveQL 語言手冊](https://cwiki.apache.org/confluence/display/Hive/LanguageManual)。如需有關 Hive 與 Amazon EMR 的詳細資訊，請參閱 [Apache Hive](emr-hive.md)。

您可以將 Amazon EMR 與具有 DynamoDB 連線能力的 Hive 自訂版本搭配使用，對儲存在 DynamoDB 中的資料執行下列操作：
+ 將 DynamoDB 資料載入 Hadoop 分散式檔案系統 (HDFS)，並將其作為 Amazon EMR 叢集的輸入。
+ 使用 SQL 式的陳述式 (HiveQL) 查詢即時 DynamoDB 資料。
+ 聯結儲存在 DynamoDB 內的資料並將之匯出，或對已聯結的資料進行查詢。
+ 將儲存在 DynamoDB 中的資料匯出至 Amazon S3。
+ 將儲存在 Amazon S3 中的資料匯入至 DynamoDB。

**注意**  
Amazon EMR-DynamoDB 連接器不支援設定為使用 [Kerberos 身分驗證](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-kerberos.html)的叢集。

若要執行以下各項任務，您必須啟動 Amazon EMR 叢集並指定資料在 DynamoDB 中的位置，然後發出 Hive 命令，藉此控制 DynamoDB 內的資料。

有多種方式可以啟動 Amazon EMR 叢集：您可以使用 Amazon EMR 主控台、命令列界面 (CLI)，也可以使用 AWS SDK 或 Amazon EMR API 來編寫叢集的程式。您也可以選擇要以互動方式執行 Hive 叢集，或是以指令碼執行。在本章節中，我們會示範如何從 Amazon EMR 主控台和 CLI 啟動互動式的 Hive 叢集。

以互動方式使用 Hive 相當適合用於查詢效能的測試和應用程式的微調工作。建立好一組會定期執行的 Hive 命令後，可以考慮建立能交由 Amazon EMR 執行的 Hive 指令碼。

**警告**  
Amazon EMR 在 DynamoDB 資料表上的讀取或寫入操作會影響佈建的輸送量，這有可能會導致佈建輸送量出現例外狀況的頻率上升。處理大量請求時，Amazon EMR 會採取指數退避的策略進行重試，以管理 DynamoDB 資料表上的請求負載。與其他流量同時執行 Amazon EMR 作業，可能會導致超出所分配的佈建的輸送量。您可以在 Amazon CloudWatch 中檢查 **ThrottleRequests (ThrottleRequests)** 指標，藉此監控輸送量。在請求負載過高的情況下，您可以重新啟動叢集，並將 [讀取百分比設定](EMR_Hive_Optimizing.md#ReadPercent) 或 [寫入百分比設定](EMR_Hive_Optimizing.md#WritePercent) 設定為較低的值，進而調節 Amazon EMR 操作。如需有關 DynamoDB 輸送量設定的資訊，請參閱[佈建輸送量](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithDDTables.html#ProvisionedThroughput)。  
如果資料表設定為[隨需模式](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html#HowItWorks.OnDemand)，您應先將資料表變更回佈建模式，然後再執行匯出或匯入操作。管道需要輸送量比率，才能計算 DynamoDBtable 中要使用的資源。隨需模式會移除佈建輸送量。若要佈建輸送容量，您可以使用 Amazon CloudWatch Events 指標來評估資料表已使用的彙總輸送量。

**Topics**
+ [設定 Hive 資料表以執行 Hive 命令](EMR_Interactive_Hive.md)
+ [在 DynamoDB 中匯出、匯入和查詢資料的 Hive 命令範例](EMR_Hive_Commands.md)
+ [優化 DynamoDB 中的 Amazon EMR 操作效能](EMR_Hive_Optimizing.md)

# 設定 Hive 資料表以執行 Hive 命令
<a name="EMR_Interactive_Hive"></a>

Apache Hive 是一種資料倉儲應用程式，可使用類似 SQL 的語言來查詢 Amazon EMR 叢集中包含的資料。如需關於 Hive 的詳細資訊，請參閱 [http://hive.apache.org/](http://hive.apache.org/)。

下列程序假設您已建立一個叢集並指定了一組 Amazon EC2 金鑰對。若要了解如何開始建立叢集，請參閱《Amazon EMR 管理指南》**中的 [Amazon EMR 入門](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-gs)。

## 設定 Hive 以使用 MapReduce
<a name="hive-mapreduce"></a>

在 Amazon EMR 上使用 Hive 查詢 DynamoDB 資料表時，如果 Hive 使用預設執行引擎 Tez，可能會發生錯誤。因此，當您透過 Hive 建立可與 DynamoDB 整合的叢集時 (如本章節所述)，建議採用可設定 Hive 的組態分類，以妥善使用 MapReduce。如需詳細資訊，請參閱[設定應用程式](emr-configure-apps.md)。

下列程式碼片段會顯示將 MapReduce 設定成 Hive 適用之執行引擎時，所使用的組態分類和屬性。

```
[
                {
                    "Classification": "hive-site",
                    "Properties": {
                        "hive.execution.engine": "mr"
                    }
                }
             ]
```<a name="EMR_Interactive_Hive_session"></a>

**以互動方式執行 Hive 命令**

1. 連接至主節點。如需詳細資訊，請參閱《Amazon EMR 管理指南》**中的[使用 SSH 連接至主節點](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html)。

1. 目前的主節點出現命令提示時，請輸入 `hive`。

   請查看 Hive 提示：`hive>`

1.  輸入 Hive 命令，將 Hive 應用程式中的資料表映射至 DynamoDB 中的資料。此資料表的用途等同於儲存在 Amazon DynamoDB 中的資料的參考；資料並非本機儲存在 Hive 內，而任何使用此資料表的查詢在執行時均會依據 DynamoDB 內的即時資料，每次執行命令時皆會耗用資料表的讀取或寫入容量。若希望對相同的資料集執行多個 Hive 命令，請考慮先將之匯出。

    將 Hive 資料表映射至 DynamoDB 資料表的語法如下所示。

   ```
   CREATE EXTERNAL TABLE hive_tablename (hive_column1_name column1_datatype, hive_column2_name column2_datatype...)
   STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   TBLPROPERTIES ("dynamodb.table.name" = "dynamodb_tablename", 
   "dynamodb.column.mapping" = "hive_column1_name:dynamodb_attribute1_name,hive_column2_name:dynamodb_attribute2_name...");
   ```

    在 DynamoDB 的 Hive 內建立資料表時，請務必使用關鍵字 `EXTERNAL` 將其建立為外部資料表。外部資料表與內部資料表的差異在於，捨棄內部資料表時，內部資料表內的資料會隨之刪除。在連結至 Amazon DynamoDB 時，不會希望出現此種行為，因此僅支援外部資料表。

    例如，下列 Hive 命令會在 Hive 內建立一個名為 *hivetable1* 的資料表，其所參考的是名為 *dynamodbtable1* 的 DynamoDB 資料表。DynamoDB 資料表 *dynamodbtable1* 具有雜湊與範圍主索引鍵結構描述。雜湊金鑰元素為 `name` (字串類型)，範圍金鑰元素為 `year` (數字類型)，而各項均有 `holidays` 的屬性值 (字串集類型)。

   ```
   CREATE EXTERNAL TABLE hivetable1 (col1 string, col2 bigint, col3 array<string>)
   STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
   "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");
   ```

    行 1 使用了 HiveQL `CREATE EXTERNAL TABLE` 陳述式。對於 *hivetable1*，您需要在 DynamoDB 資料表內為每個屬性名稱/值對建立資料欄，並提供資料類型。這些值並不區分大小寫，且可自由選擇欄位名稱 (保留字除外)。

    行 2 使用了 `STORED BY` 陳述式。`STORED BY` 的值為負責處理 Hive 與 DynamoDB 之間連線的類別名稱，應設為 `'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'`。

    行 3 則是使用 `TBLPROPERTIES` 陳述式，可將 "hivetable1" 與 DynamoDB 內的正確資料表和結構描述關聯。請將 `TBLPROPERTIES` 參數和 `dynamodb.table.name` 參數的值提供給 `dynamodb.column.mapping`。這些值*會*區分大小寫。
**注意**  
 資料表的所有 DynamoDB 屬性名稱都必須在 Hive 資料表中具有對應的資料欄。取決於您的 Amazon EMR 版本而定，如果一對一映射不存在，則會發生下列情況：  
在 Amazon EMR 5.27.0 版及更新版本上，連接器具有驗證，可確保 DynamoDB 屬性名稱和 Hive 資料表中的資料欄之間具有一對一映射。如果一對一映射不存在，則會發生錯誤。
在 Amazon EMR 5.26.0 版及較早版本上，Hive 資料表不會包含來自 DynamoDB 的名稱值對。如果未映射 DynamoDB 的主索引鍵屬性，Hive 會發生錯誤。若並未映射非主要的金鑰屬性，就不會產生錯誤，但也無法在 Hive 資料表內看見資料。若資料類型不符合，值會為 null。

接著，您就能開始在 *hivetable1* 上執行 Hive 操作。針對 *hivetable1* 執行的查詢會於內部根據您 DynamoDB 帳戶的 DynamoDB 資料表 *dynamodbtable1* 執行，每次執行都會耗用讀取或寫入的單位。

在針對 DynamoDB 資料表執行 Hive 查詢時，需要確保已先佈建好充足的讀取容量單位。

例如，假設您已為 DynamoDB 資料表佈建 100 單位的讀取容量。這會讓您每秒執行 100 次讀取或是 409,600 位元組。若資料表含有 20 GB 的資料 (21,474,836,480 位元組)，而您的 Hive 查詢要執行一次完整的資料表掃描，就可以估算查詢所需的執行時間：

 「21,474,836,480 / 409,600 = 52,429 秒 = 14.56 小時」** 

減少所需時間的唯一方法，便是調整來源 DynamoDB 資料表的讀取容量單位。增加更多 Amazon EMR 節點並無效果。

在 Hive 輸出中，當一或多個映射器程序結束後，即會更新完成的百分比。若為大型 DynamoDB 資料表，而採用低佈建讀取容量設定，則輸出完成的百分比可能在相當長一段時間內不會更新；在上述情況下，任務會有好幾個小時皆顯示為 0% 完成。如需作業進度詳細的狀態資訊，請移至 Amazon EMR 主控台，即可檢視個別映射器的作業狀態，以及資料讀取的統計數據。您也可在主節點上登入 Hadoop 界面並查看 Hadoop 的統計數據。其會顯示個別的映射任務狀態，以及一切資料讀取的統計數據。如需詳細資訊，請參閱下列主題：
+ [主節點上託管的 Web 介面](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-web-interfaces.html)
+ [檢視 Hadoop Web 介面](https://docs.aws.amazon.com/emr/latest/ManagementGuide/UsingtheHadoopUserInterface.html)

如需有關 HiveQL 陳述式範例的詳細資訊，以了解從 DynamoDB 匯出或匯入資料和聯結資料表等任務的執行方法，請參閱 [在 DynamoDB 中匯出、匯入和查詢資料的 Hive 命令範例](EMR_Hive_Commands.md)。<a name="EMR_Hive_Cancel"></a>

**取消 Hive 請求**

執行 Hive 查詢時，來自伺服器的初始回應會含有取消請求的命令。若要在程序的任一時間點取消請求，請在伺服器回應內使用 **Kill Command (Kill 命令)**。

1. 輸入 `Ctrl+C` 離開命令列用戶端。

1.  出現 Shell 提示時，您必須在請求的初始伺服器回應中輸入 **Kill Command (Kill 命令)**。

    或者，您也可以在主節點的命令列中執行下列命令，以終止 Hadoop 任務。Hadoop 任務的識別碼為 *job-id*，其可擷取自 Hadoop 使用者界面。

   ```
   hadoop job -kill job-id
   ```

## Hive 與 DynamoDB 的資料類型
<a name="EMR_Hive_Properties"></a>

下表顯示可用的 Hive 資料類型、其對應至的預設 DynamoDB 類型，以及它們也可以映射至的替代 DynamoDB 類型。


| Hive 類型 | 預設 DynamoDB 類型 | 替代 DynamoDB 類型 | 
| --- | --- | --- | 
| string | string (S) |  | 
| bigint 或 double | number (N) |  | 
| binary | binary (B) |  | 
| boolean | boolean (BOOL) |  | 
| 陣列 | list (L) | number set (NS)、string set (SS) 或 binary set (BS) | 
| map<string,string> | 項目 | map (M) | 
| map<string,?> | map (M) |  | 
|  | null (NULL) |  | 

如果您希望將 Hive 資料寫入為對應的替代 DynamoDB 類型，或如果您的 DynamoDB 資料包含替代 DynamoDB 類型的屬性值，則您可以使用 `dynamodb.type.mapping` 參數指定資料欄和 DynamoDB 類型。下列範例顯示指定替代類型映射的語法。

```
CREATE EXTERNAL TABLE hive_tablename (hive_column1_name column1_datatype, hive_column2_name column2_datatype...)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES ("dynamodb.table.name" = "dynamodb_tablename",
"dynamodb.column.mapping" = "hive_column1_name:dynamodb_attribute1_name,hive_column2_name:dynamodb_attribute2_name...",
"dynamodb.type.mapping" = "hive_column1_name:dynamodb_attribute1_datatype");
```

類型映射參數是選用的，而且只需要針對使用替代類型的資料欄指定。

例如，下列 Hive 命令會建立名為 `hivetable2` 的資料表，其會參考 DynamoDB 資料表 `dynamodbtable2`。它類似 `hivetable1`，只是它會將 `col3` 資料欄映射至字串集 (SS) 類型。

```
CREATE EXTERNAL TABLE hivetable2 (col1 string, col2 bigint, col3 array<string>)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable2",
"dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays",
"dynamodb.type.mapping" = "col3:SS");
```

在 Hive 中，`hivetable1` 和 `hivetable2` 是相同的。不過，當這些資料表中的資料寫入至其對應的 DynamoDB 資料表時，`dynamodbtable1` 將包含清單，而 `dynamodbtable2` 將包含字串集。

如果您想要將 Hive `null` 值寫入為 DynamoDB `null` 類型的屬性，則可以使用 `dynamodb.null.serialization` 參數來執行此操作。下列範例顯示指定 `null` 序列化的語法。

```
CREATE EXTERNAL TABLE hive_tablename (hive_column1_name column1_datatype, hive_column2_name column2_datatype...)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES ("dynamodb.table.name" = "dynamodb_tablename",
"dynamodb.column.mapping" = "hive_column1_name:dynamodb_attribute1_name,hive_column2_name:dynamodb_attribute2_name...",
"dynamodb.null.serialization" = "true");
```

null 序列化參數是選用的，若未指定，則會設為 `false`。請注意，無論參數設定為何，Hive 中的 DynamoDB `null` 屬性都會讀取為 `null` 值。只有在 null 序列化參數指定為 `true` 時，具有 `null` 值的 Hive 集合才能寫入 DynamoDB 中。否則，會發生 Hive 錯誤。

Hive 中的 bigint 類型與 Java long 類型相同，且 Hive double 類型在精準度上也等同於 Java double 類型。換言之，如果您在 DynamoDB 中儲存的數值資料精確度高於 Hive 所提供的資料類型，則使用 Hive 匯出、匯入或參考 DynamoDB 資料皆會導致精確度下降，或 Hive 查詢失敗。

 當您將 DynamoDB 的二進位類型匯出至 Amazon Simple Storage Service (Amazon S3) 或 HDFS 時，系統會將其儲存為 Base64 編碼字串。如果要將 Amazon S3 或 HDFS 的資料匯入 DynamoDB 二進位類型，則需確保該資料已編碼為 Base64 字串。

## Hive 選項
<a name="EMR_Hive_Options"></a>

 您可以設定以下 Hive 選項，以管理 Amazon DynamoDB 向外的資料傳輸。這些選僅持續用於目前的 Hive 工作階段。如果將 Hive 命令提示關閉，稍後再於叢集重新開啟，這些設定均會還原為預設值。


| Hive 選項 | Description | 
| --- | --- | 
| dynamodb.throughput.read.percent |   設定讀取操作的比率，以將 DynamoDB 佈建的輸送量比率維持在您資料表分配到的範圍內。該值介於 `0.1` 和 `1.5` (含) 之間。  0.5 的值為預設讀取率，表示 Hive 會嘗試使用資料表內佈建的輸送量讀取資源的一半。將此值提高到 0.5 以上即會提高讀取要求率。將值降到 0.5 以下就會降低讀取要求率。此讀取率為近似值。根據不同因素，例如在 DynamoDB 資料表中是否有統一金鑰分佈等，實際讀取率可能會有所不同。  若您發現 Hive 操作常會超出佈建的輸送量，或者即時讀取流量受到多次調節，請將此值減少到低於 `0.5`。若您擁有充足的容量，並希望 Hive 操作的速度更快，請將該值設為 `0.5` 以上。若您認為有未使用的輸入/輸出操作可供使用，您也可以設到 1.5 以上以達成超額訂閱。  | 
| dynamodb.throughput.write.percent |   設定寫入操作的比率，以將 DynamoDB 佈建的輸送量比率維持在您資料表分配到的範圍內。該值介於 `0.1` 和 `1.5` (含) 之間。  0.5 的值為預設寫入率，表示 Hive 會嘗試使用資料表內佈建的輸送量寫入資源的一半。將此值提高到 0.5 以上即會提高寫入要求率。將值降到 0.5 以下就會降低寫入要求率。寫入率為近似值。根據不同因素，例如在 DynamoDB 資料表中是否有統一金鑰分佈等，實際寫入率可能會有所不同   若您發現 Hive 操作常會超出佈建的輸送量，或者即時寫入流量受到多次調節，請將此值減少到低於 `0.5`。若您擁有充足的容量，並希望 Hive 操作的速度更快，請將該值設為 `0.5` 以上。若您認為有未使用的輸入/輸出操作可供使用，或是此為初次上傳資料到資料表上而尚無任何即時流量，您也可以設到 1.5 以上，以達成超額訂閱。  | 
| dynamodb.endpoint | 指定 DynamoDB 服務的端點。如需有關可用 DynamoDB 端點的詳細資訊，請參閱[區域與端點](https://docs.aws.amazon.com/general/latest/gr/rande.html#ddb_region)。  | 
| dynamodb.max.map.tasks |   指定從 DynamoDB 讀取資料時映射任務的最大數量。此數值必須等於或大於 1。  | 
| dynamodb.retry.duration |   指定做為逾時期間的分鐘數，以供重試 Hive 命令。該值必須為大於或等於 0 的整數。預設逾時期間為兩分鐘。  | 

 以下範例中顯示了如何使用 `SET` 命令設定這類選項。

```
SET dynamodb.throughput.read.percent=1.0; 

INSERT OVERWRITE TABLE s3_export SELECT * 
FROM hiveTableName;
```

# 在 DynamoDB 中匯出、匯入和查詢資料的 Hive 命令範例
<a name="EMR_Hive_Commands"></a>

下列範例會使用 Hive 命令來執行各種操作，例如將資料匯出至 Amazon S3 或 HDFS、將資料匯入至 DynamoDB、聯結資料表、查詢資料表等。

Hive 資料表上的操作會參考儲存在 DynamoDB 中的資料。Hive 命令需遵守 DynamoDB 資料表的佈建輸送量設定，而其擷取的資料將包含 DynamoDB 在處理 Hive 操作請求時寫入 DynamoDB 資料表的資料。如果資料擷取程序花費的時間較長，Hive 命令傳回的部分資料可能會在 Hive 命令開始後又已於 DynamoDB 內更新。

Hive 命令 `DROP TABLE` 和 `CREATE TABLE` 僅對 Hive 中的本機資料表有效，因此無法在 DynamoDB 中建立或捨棄資料表。如果您的 Hive 查詢參考的是 DynamoDB 內的資料表，該資料表在您執行查詢前必須已經存在。如需有關在 DynamoDB 中建立和刪除資料表的詳細資訊，請參閱《Amazon DynamoDB 開發人員指南》**中的[使用 DynamoDB 中的資料表](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithTables.html)。

**注意**  
 當您將 Hive 資料表映射至 Amazon S3 中的位置時，請勿將其映射至儲存貯體 s3：//amzn-s3-demo-bucket 的根路徑，因為這可能會在 Hive 將資料寫入 Amazon S3 時導致錯誤。而是將資料表對應至儲存貯體的子路徑，s3：//amzn-s3-demo-bucket/mypath。

## 從 DynamoDB 匯出資料
<a name="EMR_Hive_Commands_exporting"></a>

 您可以使用 Hive 從 DynamoDB 匯出資料。

**將 DynamoDB 資料表匯出至 Amazon S3 儲存貯體**
+  請建立用於參考儲存在 DynamoDB 中的資料的 Hive 資料表。接下來可以呼叫 INSERT OVERWRITE 命令，將資料寫入至外部目錄。在下列範例中，*s3：//amzn-s3-demo-bucket/path/subpath/* 是 Amazon S3 中的有效路徑。請調整 CREATE 命令中的資料欄和資料類型，以符合您 DynamoDB 中的值。您可以使用此方法在 Amazon S3 中建立 DynamoDB 資料的存檔。

  ```
  1. CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
  2. STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
  3. TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
  4. "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");                   
  5.                     
  6. INSERT OVERWRITE DIRECTORY 's3://amzn-s3-demo-bucket/path/subpath/' SELECT * 
  7. FROM hiveTableName;
  ```

**使用格式化將 DynamoDB 資料表匯出至 Amazon S3 儲存貯體**
+  建立用於參考 Amazon S3 中的位置的外部資料表。於下方顯示為 s3\$1export。在呼叫 CREATE 期間，請指定資料表列的格式編排。然後，當您使用 INSERT OVERWRITE 從 DynamoDB 匯出資料至 s3\$1export 時，資料就會以指定的格式寫出。在以下範例中，資料是以逗號分隔值 (CSV) 的格式寫出。

  ```
   1. CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
   2. STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   3. TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
   4. "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");                      
   5.                     
   6. CREATE EXTERNAL TABLE s3_export(a_col string, b_col bigint, c_col array<string>)
   7. ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
   8. LOCATION 's3://amzn-s3-demo-bucket/path/subpath/';
   9.                     
  10. INSERT OVERWRITE TABLE s3_export SELECT * 
  11. FROM hiveTableName;
  ```

**將 DynamoDB 資料表匯出至 Amazon S3 儲存貯體而不指定資料欄映射**
+  請建立用於參考儲存在 DynamoDB 中的資料的 Hive 資料表。與前述的範例類似，只是不用指定欄位的映射。此資料表必須僅有 `map<string, string>` 類型的一欄。如果您隨後在 Amazon S3 中建立 `EXTERNAL` 資料表，即可呼叫 `INSERT OVERWRITE` 命令，將資料從 DynamoDB 寫入至 Amazon S3。您可以使用此方法在 Amazon S3 中建立 DynamoDB 資料的存檔。由於並未映射欄位，您無法查詢以此方式匯出的資料表。Hive 0.8.1.5 或更新版本能夠不指定資料欄映射即匯出資料，而 Amazon EMR AMI 2.2.*x* 及更新版本也可支援此功能。

  ```
   1. CREATE EXTERNAL TABLE hiveTableName (item map<string,string>)
   2. STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   3. TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1");  
   4.     
   5. CREATE EXTERNAL TABLE s3TableName (item map<string, string>)
   6. ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
   7. LOCATION 's3://amzn-s3-demo-bucket/path/subpath/'; 
   8.                 
   9. INSERT OVERWRITE TABLE s3TableName SELECT * 
  10. FROM hiveTableName;
  ```

**使用資料壓縮將 DynamoDB 資料表匯出至 Amazon S3 儲存貯體**
+  Hive 提供了多種壓縮轉碼器，您可以在 Hive 工作階段期間設定。如此會將匯出的資料以指定格式壓縮。以下範例會使用 Lempel-Ziv-Oberhumer (LZO) 演算法壓縮匯出的檔案。

  ```
   1. SET hive.exec.compress.output=true;
   2. SET io.seqfile.compression.type=BLOCK;
   3. SET mapred.output.compression.codec = com.hadoop.compression.lzo.LzopCodec;                    
   4.                     
   5. CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
   6. STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   7. TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
   8. "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");                    
   9.                     
  10. CREATE EXTERNAL TABLE lzo_compression_table (line STRING)
  11. ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
  12. LOCATION 's3://amzn-s3-demo-bucket/path/subpath/';
  13.                     
  14. INSERT OVERWRITE TABLE lzo_compression_table SELECT * 
  15. FROM hiveTableName;
  ```

   可用的壓縮轉碼器為：
  +  org.apache.hadoop.io.compress.GzipCodec 
  +  org.apache.hadoop.io.compress.DefaultCodec 
  +  com.hadoop.compression.lzo.LzoCodec 
  +  com.hadoop.compression.lzo.LzopCodec 
  +  org.apache.hadoop.io.compress.BZip2Codec 
  +  org.apache.hadoop.io.compress.SnappyCodec 

**將 DynamoDB 資料表匯出至 HDFS**
+  請使用下列 Hive 命令。其中 *hdfs:///directoryName* 是有效的 HDFS 路徑，*hiveTableName* 則是 Hive 中參考 DynamoDB 的資料表。Hive 0.7.1.1 在將資料匯出至 Amazon S3 時，會採用 HDFS 做為中間步驟，因此這個匯出操作的速度比匯出 DynamoDB 資料表至 Amazon S3 更快速。以下範例也說明了如何將 `dynamodb.throughput.read.percent` 設為 1.0，以提高讀取請求率。

  ```
  1. CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
  2. STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
  3. TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
  4. "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays"); 
  5.                     
  6. SET dynamodb.throughput.read.percent=1.0;                    
  7.                     
  8. INSERT OVERWRITE DIRECTORY 'hdfs:///directoryName' SELECT * FROM hiveTableName;
  ```

   您也可以使用前述匯出至 Amazon S3 的格式編排和壓縮等方式，將資料匯出至 HDFS。若要如此，只需將上述範例中的 Amazon S3 目錄變更為 HDFS 目錄即可。<a name="EMR_Hive_non-printable-utf8"></a>

**在 Hive 中讀取不可列印的 UTF-8 字元資料**
+ 您可以在建資料表時使用 `STORED AS SEQUENCEFILE` 子句，以 Hive 讀取和寫入不可列印的 UTF-8 字元資料。SequenceFile 為 Hadoop 二進位檔案格式，您需要使用 Hadoop 來讀取此檔。下列範例顯示如何將資料從 DynamoDB 匯出到 Amazon S3。您可以使用此功能處理不可列印的 UTF-8 編碼字元。

  ```
   1. CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
   2. STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   3. TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
   4. "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");                      
   5.                     
   6. CREATE EXTERNAL TABLE s3_export(a_col string, b_col bigint, c_col array<string>)
   7. STORED AS SEQUENCEFILE
   8. LOCATION 's3://amzn-s3-demo-bucket/path/subpath/';
   9.                     
  10. INSERT OVERWRITE TABLE s3_export SELECT * 
  11. FROM hiveTableName;
  ```

## 將資料匯入 DynamoDB
<a name="EMR_Hive_Commands_importing"></a>

 在使用 Hive 將資料寫入 DynamoDB 時，您應確保寫入容量單位數大於叢集中的映射器數量。例如，在 m1.xlarge EC2 執行個體上執行的叢集會在每個執行個體上產生 8 個映射器。而含有 10 個執行個體的叢集，則代表總共會有 80 個映射器。如果您寫入容量單位未超過叢集中的映射器數量，Hive 寫入操作可能會耗用所有寫入輸送量，或企圖使用比所佈建的量更多的輸送量。如需各 EC2 執行個體類型產生的映射器數量的詳細資訊，請參閱 [設定 Hadoop](emr-hadoop-config.md)。

 Hadoop 中的映射器數量會受到輸入分割的控制。若輸入分割的數量過少，則您的寫入命令可能會無法使用資料表中所有可用的寫入輸送量。

 如果目標 DynamoDB 資料表中已存在具有相同金鑰的項目，則會覆寫此項目。如果目標 DynamoDB 資料表中不存在具有金鑰的項目，則會插入此項目。

**將資料表從 Amazon S3 匯入 DynamoDB**
+  您可以使用 Amazon EMR 和 Hive 將資料從 Amazon S3 寫入至 DynamoDB。

  ```
  CREATE EXTERNAL TABLE s3_import(a_col string, b_col bigint, c_col array<string>)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
  LOCATION 's3://amzn-s3-demo-bucket/path/subpath/';                    
                      
  CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
  TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
  "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");  
                      
  INSERT OVERWRITE TABLE hiveTableName SELECT * FROM s3_import;
  ```

**將資料表從 Amazon S3 儲存貯體匯入 DynamoDB 而不指定資料欄映射**
+  請建立一個 `EXTERNAL` 資料表，用於參考先前從 DynamoDB 匯出並儲存在 Amazon S3 中的資料。在開始匯入前，確保 DynamoDB 中已有該資料表，且其具備的索引鍵結構描述與先前匯出的 DynamoDB 資料表相同。此外，該資料表僅能有 `map<string, string>` 類型的一欄。如果您隨後建立了連結到 DynamoDB 的 Hive 資料表，即可呼叫 `INSERT OVERWRITE` 命令，將資料從 Amazon S3 寫入至 DynamoDB。由於並未映射欄位，您無法查詢以此方式匯入的資料表。Hive 0.8.1.5 或更新版本能夠不指定資料欄映射即匯入資料，Amazon EMR AMI 2.2.3 和更新版本可支援。

  ```
  CREATE EXTERNAL TABLE s3TableName (item map<string, string>)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
  LOCATION 's3://amzn-s3-demo-bucket/path/subpath/'; 
                          
  CREATE EXTERNAL TABLE hiveTableName (item map<string,string>)
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
  TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1");  
                   
  INSERT OVERWRITE TABLE hiveTableName SELECT * 
  FROM s3TableName;
  ```

**將資料表從 HDFS 匯入 DynamoDB**
+  您可以使用 Amazon EMR 和 Hive，以將資料從 HDFS 寫入至 DynamoDB。

  ```
  CREATE EXTERNAL TABLE hdfs_import(a_col string, b_col bigint, c_col array<string>)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
  LOCATION 'hdfs:///directoryName';                    
                      
  CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
  TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
  "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");  
                      
  INSERT OVERWRITE TABLE hiveTableName SELECT * FROM hdfs_import;
  ```

## 查詢 DynamoDB 中的資料
<a name="EMR_Hive_Commands_querying"></a>

 下列範例會顯示數種方式，以供您使用 Amazon EMR 查詢儲存在 DynamoDB 中的資料。

**尋找映射欄 (`max`) 中的最大值**
+  請使用 Hive 命令，如下所示。在第一個命令中，CREATE 陳述式建立了一個參考儲存在 DynamoDB 內資料的 Hive 資料表。SELECT 陳述式接下來會使用該資料表查詢儲存在 DynamoDB 內的資料。以下範例會尋找指定客戶所下的最大訂單。

  ```
  CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) 
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "Purchases",
  "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items");
  
  SELECT max(total_cost) from hive_purchases where customerId = 717;
  ```

**使用 `GROUP BY` 子句彙總資料**
+  您可以使用 `GROUP BY` 子句跨多筆記錄收集資料。這通常會用於 sum、count、min 或 max 等彙總函數。下方範例會以購買三次以上的客戶為主，傳回最大訂單清單。

  ```
  CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) 
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "Purchases",
  "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items");
  
  SELECT customerId, max(total_cost) from hive_purchases GROUP BY customerId HAVING count(*) > 3;
  ```

**聯結兩個 DynamoDB 資料表**
+  下列範例會將兩個 Hive 資料表映射至儲存在 DynamoDB 中的資料。然後會呼叫這兩個資料表的聯結。聯結會在叢集上運算並傳回。聯結不會在 DynamoDB 中發生。此範例會傳回下訂兩筆以上訂單的客戶和其購買項目的清單。

  ```
  CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) 
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "Purchases",
  "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items");
  
  CREATE EXTERNAL TABLE hive_customers(customerId bigint, customerName string, customerAddress array<String>) 
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "Customers",
  "dynamodb.column.mapping" = "customerId:CustomerId,customerName:Name,customerAddress:Address");
  
  Select c.customerId, c.customerName, count(*) as count from hive_customers c 
  JOIN hive_purchases p ON c.customerId=p.customerId 
  GROUP BY c.customerId, c.customerName HAVING count > 2;
  ```

**聯節來自不同來源的兩個資料表**
+  在下列範例中，Customer\$1S3 是 Hive 資料表，其可載入儲存在 Amazon S3 中的 CSV 檔案；hive\$1purchases 則是用於參考 DynamoDB 中的資料的資料表。下列範例會將 Amazon S3 中儲存為 CSV 檔案的客戶資料與 DynamoDB 中儲存的訂單資料互相聯結，然後傳回一組訂單資料，其為姓名中有 "Miller" 一字的客戶所下訂。

  ```
  CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) 
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "Purchases",
  "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items");
  
  CREATE EXTERNAL TABLE Customer_S3(customerId bigint, customerName string, customerAddress array<String>)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
  LOCATION 's3://amzn-s3-demo-bucket/path/subpath/';
  
  Select c.customerId, c.customerName, c.customerAddress from 
  Customer_S3 c 
  JOIN hive_purchases p 
  ON c.customerid=p.customerid 
  where c.customerName like '%Miller%';
  ```

**注意**  
 在上述每個範例中，均加入了 CREATE TABLE 陳述式，以求清晰明瞭和完整性。針對指定的 Hive 資料表執行多向查詢或匯出操作時，僅需在 Hive 工作階段開始時建立一次資料表。

# 優化 DynamoDB 中的 Amazon EMR 操作效能
<a name="EMR_Hive_Optimizing"></a>

 DynamoDB 資料表上的 Amazon EMR 操作算作讀取操作，會受到資料表佈建輸送量設定的限制。Amazon EMR 會採用自己的邏輯來試圖平衡 DynamoDB 資料表上的負載，盡可能降低超出佈建輸送量的可能性。在每一次 Hive 查詢的最後，Amazon EMR 都會用於處理查詢的叢集資訊，包含超出佈建輸送量的次數。您能夠運用此資訊並參考與 DynamoDB 輸送量相關的 CloudWatch 指標，藉此在後續請求中妥善管理 DynamoDB 資料表上的負載。

 下列因素會在搭配 DynamoDB 資料表使用時，影響 Hive 查詢的效能。

## 佈建的讀取容量單位
<a name="ProvisionedReadCapacityUnits"></a>

 在針對 DynamoDB 資料表執行 Hive 查詢時，需要確保已先佈建好充足的讀取容量單位。

 例如，假設您已為 DynamoDB 資料表佈建 100 單位的讀取容量。這會讓您每秒執行 100 次讀取或是 409,600 位元組。若資料表含有 20 GB 的資料 (21,474,836,480 位元組)，而您的 Hive 查詢要執行一次完整的資料表掃描，就可以估算查詢所需的執行時間：

 「21,474,836,480 / 409,600 = 52,429 秒 = 14.56 小時」** 

 減少所需時間的唯一方法，便是調整來源 DynamoDB 資料表的讀取容量單位。將更多節點新增至 Amazon EMR 叢集並沒有效果。

 在 Hive 輸出中，當一或多個映射器程序結束後，即會更新完成的百分比。若為大型 DynamoDB 資料表，而採用低佈建讀取容量設定，則輸出完成的百分比可能在相當長一段時間內不會更新；在上述情況下，任務會有好幾個小時皆顯示為 0% 完成。如需作業進度詳細的狀態資訊，請移至 Amazon EMR 主控台，即可檢視個別映射器的作業狀態，以及資料讀取的統計數據。

 您也可在主節點上登入 Hadoop 界面並查看 Hadoop 的統計數據。此會顯示您個別映射任務的狀態，以及一些資料讀取的統計數據。如需詳細資訊，請參閱《Amazon EMR 管理指南》**中的[主節點上託管的 Web 介面](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-web-interfaces.html)。

## 讀取百分比設定
<a name="ReadPercent"></a>

 依預設，Amazon EMR 會根據目前佈建的輸送量來管理 DynamoDB 資料表的請求負載。然而，當 Amazon EMR 傳回的作業資訊含有大量超出佈建輸送量限制的回應時，您可以在設定 Hive 資料表期間利用 `dynamodb.throughput.read.percent` 參數調整預設讀取率。如需設定讀取百分比參數的詳細資訊，請參閱 [Hive 選項](EMR_Interactive_Hive.md#EMR_Hive_Options)。

## 寫入百分比設定
<a name="WritePercent"></a>

 依預設，Amazon EMR 會根據目前佈建的輸送量來管理 DynamoDB 資料表的請求負載。不過，當 Amazon EMR 傳回的作業資訊含有大量超出佈建輸送量限制的回應時，您可以在設定 Hive 資料表期間利用 `dynamodb.throughput.write.percent` 參數調整預設寫入率。如需有關設定寫入百分比參數的詳細資訊，請參閱 [Hive 選項](EMR_Interactive_Hive.md#EMR_Hive_Options)。

## 重試期間設定
<a name="emr-ddb-retry-duration"></a>

 依預設，如果 Amazon EMR 在預設的兩分鐘重試間隔內未傳回結果，即會重新執行 Hive 查詢。您可以在執行 Hive 查詢時，藉由修改 `dynamodb.retry.duration` 參數，調整此間隔。如需有關設定寫入百分比參數的詳細資訊，請參閱 [Hive 選項](EMR_Interactive_Hive.md#EMR_Hive_Options)。

## 映射任務數量
<a name="NumberMapTasks"></a>

 Hadoop 為了處理您匯出和查詢儲存在 DynamoDB 內資料的請求所啟動的映射器精靈，其上限為每秒 1 MiB 的最大讀取速率，以限制使用的讀取容量。如果在 DynamoDB 上有其他可用的佈建輸送量，您可以提高映射器精靈的數量，藉此改善 Hive 匯出和查詢操作的效能。若要如此，您可以增加叢集中 EC2 執行個體的數量，*或*增加在各個 EC2 執行個體上執行的映射器常駐程式的數量。

 您可以停止目前叢集，再以更多的 EC2 執行個體數目重新啟動，即可增加叢集中 EC2 執行個體的數量。如果您是於 Amazon EMR 主控台啟動叢集，則於**設定 EC2 執行個體**對話方塊指定 EC2 執行個體的數量；若是從 CLI 啟動叢集，則以選項 `‑‑num-instances` 進行。

 在執行個體上執行的映射任務數量視 EC2 執行個體類型的差異而定。如需支援的 EC2 執行個體類型和各類型提供的映射器數量的詳細資訊，請參閱 [任務組態](emr-hadoop-task-config.md)。可在此找到各支援設定的「任務設定」一節。

 另一種增加映射器精靈數目的方式，便是將 Hadoop 的 `mapreduce.tasktracker.map.tasks.maximum` 組態參數變更為更大的值。這種作法的優點在於，可提供更多映射器，但不會增加 EC2 執行個體的數量或大小，更為省錢。缺點則是，若您將數值設得太高，叢集中的 EC2 執行個體讀記憶體可能不足。若要設定 `mapreduce.tasktracker.map.tasks.maximum`，請啟動叢集，並指定將 `mapreduce.tasktracker.map.tasks.maximum` 的值指定為 mapred-site 組態分類的屬性。如以下範例所示。如需詳細資訊，請參閱[設定應用程式](emr-configure-apps.md)。

```
{
    "configurations": [
    {
        "classification": "mapred-site",
        "properties": {
            "mapred.tasktracker.map.tasks.maximum": "10"
        }
    }
    ]
}
```

## 平行資料請求
<a name="ParallelDataRequests"></a>

 針對單一資料表的多個資料請求，無論是來自一名以上的使用者還是一個以上的應用程式，都會耗盡讀取佈建輸送量，降低效能。

## 處理持續時間
<a name="ProcessDuration"></a>

 DynamoDB 中的資料一致性取決於每個節點讀取和寫入操作的順序。當 Hive 查詢正在進行中時，另一個應用程式可以將新資料載入 DynamoDB 資料表，或修改或刪除現有的資料。在此案例中，Hive 查詢的結果有可能無法反映執行查詢的過程中所做的資料變更。

## 避免超出輸送量
<a name="AvoidExceedingThroughput"></a>

 對 DynamoDB 執行 Hive 查詢時，請注意不要超過您佈建的輸送量，否則會耗盡所需容量，導致應用程式無法進行 `DynamoDB::Get` 呼叫。為了確保此種狀況不會發生，應定期監控讀取量，並到 Amazon CloudWatch 檢查記錄和監控指標，藉以調節應用程式至 `DynamoDB::Get` 的呼叫。

## 請求時間
<a name="RequestTime"></a>

 當對 DynamoDB 資料表的需求較低時，排程存取 DynamoDB 資料表的 Hive 查詢可改善效能。舉例而言，若大多數的應用程式使用者住在舊金山，您可以選擇在太平洋標準時間 (PST) 上午 4 點匯出每日資料。當時大部分的使用者仍在睡眠中，因此無需更新 DynamoDB 資料庫中的記錄。

## 時間型資料表
<a name="TimeBasedTables"></a>

 如果資料是整理為一系列時間型的 DynamoDB 資料表，例如一天一份資料表，則您可以在資料表不再處於作用中狀態時，將資料匯出。您可使用這種技巧，以持續進行中的方式將資料備份至 Amazon S3。

## 封存的資料
<a name="ArchivedData"></a>

 如果您打算對儲存在 DynamoDB 中的資料執行多次 Hive 查詢，則在應用程式可接受封存資料的情況下，您可能需要將資料匯出至 HDFS 或 Amazon S3，並對資料複本執行 Hive 查詢，而非 DynamoDB。如此可保留您的讀取操作和佈建的輸送量。

# Kinesis
<a name="emr-kinesis"></a>

Amazon EMR 叢集可使用 Hive、Pig、MapReduce、Hadoop Streaming API 和 Cascading 等 Hadoop 生態系統內常見的工具，直接讀取與處理 Amazon Kinesis 串流。另外，您也可以將 Amazon Kinesis 的即時資料與執行中叢集內的 Amazon S3、Amazon DynamoDB 和 HDFS 上的現有資料互相聯結。您可以直接將資料從 Amazon EMR 載入 Amazon S3 或 DynamoDB，以供後續處理活動使用。如需有關 Amazon Kinesis 服務特色和定價的資訊，請參閱 [Amazon Kinesis](https://aws.amazon.com//kinesis) 頁面。

## 我可以使用 Amazon EMR 和 Amazon Kinesis 整合做什麼？
<a name="kinesis-use-cases"></a>

 在某些情況下，Amazon EMR 與 Amazon Kinesis 之間的整合能讓操作變得更加輕鬆，例如：
+ **分析日誌串流** – 您可以分析 Web 日誌的串流，如此系統每隔幾分鐘便會根據區域、瀏覽器和存取域產生 10 大錯誤類型清單。
+ **客戶參與度** – 您可以撰寫查詢，然後聯結 Amazon Kinesis 的點擊串流資料與儲存在 DynamoDB 資料表內的廣告活動資訊，藉此找出特定網站上成效最佳的廣告類別。
+ **臨機互動式查詢** – 您能夠定期從 Amazon Kinesis 串流將資料載入至 HDFS，並將該資料作為本機 Impala 資料表，以達到快速的互動式分析查詢。

## Amazon Kinesis 串流的檢查點分析
<a name="kinesis-checkpoint"></a>

使用者能定期執行 Amazon Kinesis 串流的批次分析，這稱為*反覆運算*。系統會使用序號來擷取 Amazon Kinesis 串流資料記錄，因此反覆運算範圍是由 Amazon EMR 儲存在 DynamoDB 資料表內的開始和結束序號所定義。例如，當 `iteration0` 結束時，其會將結束序號儲存在 DynamoDB 中。如此一來，`iteration1` 作業開始時，就能從串流中擷取序列資料。在串流資料中，這種疊代映射稱為*設置檢查點*。如需詳細資訊，請參閱 [Kinesis 連接器](https://aws.amazon.com/elasticmapreduce/faqs/#kinesis-connector)。

如果對反覆運算設定了檢查點且作業處理反覆運算失敗，則 Amazon EMR 會嘗試重新處理該反覆運算中的記錄。

設置檢查點的功能能夠：
+ 開始處理在上一次在相同串流和邏輯名稱上執行的查詢所處理過的序號後的資料
+ 重新處理 Kinesis 先前由更早的查詢處理過的相同資料批次

 若要啟用設置檢查點功能，請在指令碼中將 `kinesis.checkpoint.enabled` 參數設為 `true`。還可以設定以下參數：


| 組態設定 | Description | 
| --- | --- | 
| kinesis.checkpoint.metastore.table.name | 將儲存檢查點資訊的 DynamoDB 資料表名稱 | 
| kinesis.checkpoint.metastore.hash.key.name | DynamoDB 資料表的雜湊金鑰名稱 | 
| kinesis.checkpoint.metastore.hash.range.name | DynamoDB 資料表的範圍金鑰名稱 | 
| kinesis.checkpoint.logical.name | 目前處理中的邏輯名稱 | 
| kinesis.checkpoint.iteration.no | 邏輯名稱相關的處理疊代數目 | 
| kinesis.rerun.iteration.without.wait | 布林值，表示是否可以不等待逾時及重新執行失敗的疊代，預設值為 false | 

### 建議 Amazon DynamoDB 資料表使用的佈建 IOPS
<a name="kinesis-checkpoint-DDB"></a>

Amazon Kinesis 的 Amazon EMR 連接器會使用 DynamoDB 資料庫來備份檢查點的中繼資料。您必須在 DynamoDB 內建立資料表，才能在檢查點間隔中以 Amazon EMR 叢集取用 Amazon Kinesis 串流的資料。資料表必須與您的 Amazon EMR 叢集位於相同的區域中。一般而言，建議您參考下方資訊來設定 DynamoDB 資料表佈建的 IOPS 數量。`j` 表示可同時執行的 Hadoop 作業數量上限 (使用不同的邏輯名稱 \$1 反覆運算數目組合)，而 `s` 則是所有作業能處理的碎片最大數量：

若為 **Read Capacity Units (讀取容量單位)**：`j`\$1`s`/`5`

若為 **Write Capacity Units (寫入容量單位)**：`j`\$1`s`

## 效能考量
<a name="performance"></a>

Amazon Kinesis 碎片輸送量會與 Amazon EMR 叢集內的節點執行個體大小和串流中的記錄大小成正比。建議在主節點和核心節點上使用 m5.xlarge 或更大的執行個體。

## 使用 Amazon EMR 排程 Amazon Kinesis 分析
<a name="schedule"></a>

當您分析使用中的 Amazon Kinesis 串流上的資料時，會受到任何反覆運算的逾時和最大期間的限制，因此定期執行分析相當重要，如此才能從串流中定期收集詳細資訊。有很多方式能夠以定期間隔執行這類指令碼和查詢，我們推薦使用 AWS Data Pipeline 處理這類重複的任務。如需詳細資訊，請參閱《AWS Data Pipeline 開發人員指南》**中的 [AWS Data Pipeline PigActivity](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-pigactivity.html) 和 [AWS Data Pipeline HiveActivity](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-hiveactivity.html)。

# 將 Spark Kinesis 連接器遷移至適用於 Amazon EMR 7.0 的 SDK 2.x
<a name="migrating-spark-kinesis"></a>

 AWS 開發套件提供一組豐富的 APIs 和程式庫，可與雲端運算服務互動 AWS ，例如管理登入資料、連線至 S3 和 Kinesis 服務。Spark Kinesis 連接器可用於取用 Kinesis Data Streams 中的資料，接收到的資料會在 Spark 的執行引擎中經過轉換和處理。此連接器目前建置在 AWS SDK 和 Kinesis-client-library (KCL) 的 1.x 之上。

作為 AWS SDK 2.x 遷移的一部分，Spark Kinesis 連接器也會隨之更新，以使用 SDK 2.x 執行。在 Amazon EMR 7.0 版本中，Spark 包含尚不可用於 Apache Spark 社群版本中的 SDK 2.x 升級。如果您使用版本低於 7.0 版的 Spark Kinesis 連接器，必須先遷移應用程式程式碼以在 SDK 2.x 上執行，才能遷移到 Amazon EMR 7.0。

## 遷移指南
<a name="migrating-spark-kinesis-migration-guides"></a>

本節說明將應用程式遷移至升級後之 Spark Kinesis 連接器的步驟。它包含遷移至 AWS SDK 2.x 中 Kinesis Client Library (KCL) 2.x、 AWS 憑證提供者 AWS 和服務用戶端的指南。作為參考，其中也包括使用 Kinesis 連接器的 [WordCount](https://github.com/apache/spark/blob/v3.5.0/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala) 範本計畫。

**Topics**
+ [將 KCL 從 1.x 遷移到 2.x](#migrating-spark-kinesis-KCL-from-1.x-to-2.x)
+ [將 AWS 登入資料提供者從 AWS SDK 1.x 遷移至 2.x](#migrating-spark-kinesis-creds-from-1.x-to-2.x)
+ [將 AWS 服務用戶端從 AWS SDK 1.x 遷移至 2.x](#migrating-spark-kinesis-service-from-1.x-to-2.x)
+ [串流應用程式的程式碼範例](#migrating-spark-kinesis-streaming-examples)
+ [使用升級後的 Spark Kinesis 連接器時的考量事項](#migrating-spark-kinesis-considerations)

### 將 KCL 從 1.x 遷移到 2.x
<a name="migrating-spark-kinesis-KCL-from-1.x-to-2.x"></a>
+ **`KinesisInputDStream` 中的指標層級和維度**

  在您具現化 `KinesisInputDStream` 時，可控制串流的指標層級和維度。以下範例示範如何使用 KCL 1.x 自訂這些參數：

  ```
  import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
  import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
   
  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
    .build()
  ```

  在 KCL 2.x 中，這些組態設定會有不同的套件名稱。遷移至 2.x：

  1. 分別將 `com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration` 和 `com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel` 的匯入陳述式變更為 `software.amazon.kinesis.metrics.MetricsLevel` 和 `software.amazon.kinesis.metrics.MetricsUtil`。

     ```
     // import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
     import software.amazon.kinesis.metrics.MetricsLevel
      
     // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
     import software.amazon.kinesis.metrics.MetricsUtil
     ```

  1. 使用 `metricsEnabledDimensionsSet(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)` 取代行 `metricsEnabledDimensionsKinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet`

  以下是具有自訂指標層級和指標維度的 `KinesisInputDStream` 更新版本：

  ```
  import software.amazon.kinesis.metrics.MetricsLevel
  import software.amazon.kinesis.metrics.MetricsUtil
   
  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
    .build()
  ```
+ `KinesisInputDStream` 中的訊息處理常式函數

  在具現化 `KinesisInputDStream` 時，您也可以提供一個取得 Kinesis Record 並傳回一般物件 T 的「訊息處理常式函數」，以防萬一您要使用 Record 中包含的其他資料 (例如分割區索引鍵)。

  在 KCL 1.x 中，訊息處理常式函數簽章是：`Record => T`，其中，Record 是 `com.amazonaws.services.kinesis.model.Record`。在 KCL 2.x 中，處理常式的簽章會變更為：`KinesisClientRecord => T`，其中，KinesisClientRecord 是 `software.amazon.kinesis.retrieval.KinesisClientRecord`。

  以下是在 KCL 1.x 中提供訊息處理常式的範例：

  ```
  import com.amazonaws.services.kinesis.model.Record
   
   
  def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_ONLY)
    .buildWithMessageHandler(addFive)
  ```

  若要遷移訊息處理常式：

  1. 將匯入陳述式從 `com.amazonaws.services.kinesis.model.Record` 變更為 `software.amazon.kinesis.retrieval.KinesisClientRecord`。

     ```
     // import com.amazonaws.services.kinesis.model.Record
     import software.amazon.kinesis.retrieval.KinesisClientRecord
     ```

  1. 更新訊息處理常式的方法簽章。

     ```
     //def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
     def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
     ```

  以下是在 KCL 2.x 中提供訊息處理常式的更新範例：

  ```
  import software.amazon.kinesis.retrieval.KinesisClientRecord
   
   
  def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_ONLY)
    .buildWithMessageHandler(addFive)
  ```

  如需有關從 KCL 1.x 遷移至 2.x 的詳細資訊，請參閱[將消費者從 KCL 1.x 遷移到 KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html)。

### 將 AWS 登入資料提供者從 AWS SDK 1.x 遷移至 2.x
<a name="migrating-spark-kinesis-creds-from-1.x-to-2.x"></a>

登入資料提供者用來取得與 互動的 AWS 登入資料 AWS。SDK 2.x 中有數個與憑證提供者相關的介面和類別變更，可在[此處](https://github.com/aws/aws-sdk-java-v2/blob/master/docs/LaunchChangelog.md#122-client-credentials)了解。Spark Kinesis 連接器已定義傳回 1.x 版 AWS 登入資料提供者的介面 (`org.apache.spark.streaming.kinesis.SparkAWSCredentials`) 和實作類別。在初始化 Kinesis 用戶端時會需要這些憑證提供者。例如，如果您`SparkAWSCredentials.provider`在應用程式中使用 方法，則需要更新程式碼以使用 2.x 版的 AWS 登入資料提供者。

以下是在 AWS SDK 1.x 中使用登入資料提供者的範例：

```
import org.apache.spark.streaming.kinesis.SparkAWSCredentials
import com.amazonaws.auth.AWSCredentialsProvider
 
val basicSparkCredentials = SparkAWSCredentials.builder
    .basicCredentials("accessKey", "secretKey")
    .build()
                                     
val credentialProvider = basicSparkCredentials.provider
assert(credentialProvider.isInstanceOf[AWSCredentialsProvider], "Type should be AWSCredentialsProvider")
```

**若要遷移至 SDK 2.x：**

1. 將匯入陳述式從 `com.amazonaws.auth.AWSCredentialsProvider` 變更為 `software.amazon.awssdk.auth.credentials.AwsCredentialsProvider`

   ```
   //import com.amazonaws.auth.AWSCredentialsProvider
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
   ```

1. 更新使用此類別的剩餘程式碼。

   ```
   import org.apache.spark.streaming.kinesis.SparkAWSCredentials
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
    
   val basicSparkCredentials = SparkAWSCredentials.builder
       .basicCredentials("accessKey", "secretKey")
       .build()
                                             
   val credentialProvider = basicSparkCredentials.provider
   assert (credentialProvider.isInstanceOf[AwsCredentialsProvider], "Type should be AwsCredentialsProvider")
   ```

### 將 AWS 服務用戶端從 AWS SDK 1.x 遷移至 2.x
<a name="migrating-spark-kinesis-service-from-1.x-to-2.x"></a>

AWS 服務用戶端在 2.x （即 `software.amazon.awssdk`) 中有不同的套件名稱，而 SDK 1.x 使用 。 `com.amazonaws`如需有關用戶端變更的詳細資訊，請參閱[此處](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-whats-different.html)。如果您在程式碼中使用這些服務用戶端，則需相應地遷移用戶端。

以下是在 SDK 1.x 中建立用戶端的範例：

```
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.document.DynamoDB
 
AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
```

**遷移至 2.x：**

1. 變更服務用戶端的匯入陳述式。以 DynamoDB 用戶端為例。您需將 `com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient` 或 `com.amazonaws.services.dynamodbv2.document.DynamoDB` 變更為 `software.amazon.awssdk.services.dynamodb.DynamoDbClient`。

   ```
   // import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
   // import com.amazonaws.services.dynamodbv2.document.DynamoDB
   import software.amazon.awssdk.services.dynamodb.DynamoDbClient
   ```

1. 更新初始化用戶端的程式碼

   ```
   // AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
   // AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
    
   DynamoDbClient ddbClient = DynamoDbClient.create();
   DynamoDbClient ddbClient = DynamoDbClient.builder().build();
   ```

   如需將 AWS SDK 從 1.x 遷移至 2.x 的詳細資訊，請參閱[適用於 Java 的 AWS SDK 1.x 和 2.x 之間的差異](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-whats-different.html)

### 串流應用程式的程式碼範例
<a name="migrating-spark-kinesis-streaming-examples"></a>

```
import java.net.URI
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.http.apache.ApacheHttpClient
import software.amazon.awssdk.services.kinesis.KinesisClient
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest
import software.amazon.awssdk.regions.Region
import software.amazon.kinesis.metrics.{MetricsLevel, MetricsUtil}
 
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
import org.apache.spark.streaming.kinesis.KinesisInputDStream
 
 
object KinesisWordCountASLSDKV2 {
 
  def main(args: Array[String]): Unit = {
    val appName = "demo-app"
    val streamName = "demo-kinesis-test"
    val endpointUrl = "https://kinesis.us-west-2.amazonaws.com"
    val regionName = "us-west-2"
 
    // Determine the number of shards from the stream using the low-level Kinesis Client
    // from the AWS Java SDK.
    val credentialsProvider = DefaultCredentialsProvider.create
    require(credentialsProvider.resolveCredentials() != null,
      "No AWS credentials found. Please specify credentials using one of the methods specified " +
        "in https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html")
    val kinesisClient = KinesisClient.builder()
      .credentialsProvider(credentialsProvider)
      .region(Region.US_WEST_2)
      .endpointOverride(URI.create(endpointUrl))
      .httpClientBuilder(ApacheHttpClient.builder())
      .build()
    val describeStreamRequest = DescribeStreamRequest.builder()
      .streamName(streamName)
      .build()
    val numShards = kinesisClient.describeStream(describeStreamRequest)
      .streamDescription
      .shards
      .size
 
 
    // In this example, we are going to create 1 Kinesis Receiver/input DStream for each shard.
    // This is not a necessity; if there are less receivers/DStreams than the number of shards,
    // then the shards will be automatically distributed among the receivers and each receiver
    // will receive data from multiple shards.
    val numStreams = numShards
 
    // Spark Streaming batch interval
    val batchInterval = Milliseconds(2000)
 
    // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information
    // on sequence number of records that have been received. Same as batchInterval for this
    // example.
    val kinesisCheckpointInterval = batchInterval
 
    // Setup the SparkConfig and StreamingContext
    val sparkConfig = new SparkConf().setAppName("KinesisWordCountASLSDKV2")
    val ssc = new StreamingContext(sparkConfig, batchInterval)
 
    // Create the Kinesis DStreams
    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisInputDStream.builder
        .streamingContext(ssc)
        .streamName(streamName)
        .endpointUrl(endpointUrl)
        .regionName(regionName)
        .initialPosition(new Latest())
        .checkpointAppName(appName)
        .checkpointInterval(kinesisCheckpointInterval)
        .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
        .metricsLevel(MetricsLevel.DETAILED)
        .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
        .build()
    }
 
    // Union all the streams
    val unionStreams = ssc.union(kinesisStreams)
 
    // Convert each line of Array[Byte] to String, and split into words
    val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))
 
    // Map each word to a (word, 1) tuple so we can reduce by key to count the words
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
 
    // Print the first 10 wordCounts
    wordCounts.print()
 
    // Start the streaming context and await termination
    ssc.start()
    ssc.awaitTermination()
  }
}
```

### 使用升級後的 Spark Kinesis 連接器時的考量事項
<a name="migrating-spark-kinesis-considerations"></a>
+ 如果您的應用程式使用 JDK 版本低於 11 的 `Kinesis-producer-library`，則可能會遇到異常情況，如 `java.lang.NoClassDefFoundError: javax/xml/bind/DatatypeConverter`。發生這種情況是因為依預設，EMR 7.0 會搭配 JDK 17，且自 Java 11\$1 起，J2EE 模組已從標準程式庫中移除。這可以藉由在 pom 檔案中新增以下相依性來解決。請使用適合的版本取代程式庫版本。

  ```
  <dependency>
        <groupId>javax.xml.bind</groupId>
        <artifactId>jaxb-api</artifactId>
        <version>${jaxb-api.version}</version>
      </dependency>
  ```
+ 在建立 EMR 叢集之後，您可在此路徑下找到 Spark Kinesis 連接器 jar：`/usr/lib/spark/connector/lib/`

# S3DistCp (s3-dist-cp)
<a name="UsingEMR_s3distcp"></a>

Apache DistCp 是一款開源工具，可用來複製大量資料。*S3DistCp* 與 DistCp 類似，但經優化可與 AWS(尤其是 Amazon S3) 搭配使用。Amazon EMR 4.0 版及更新版本內的 S3DistCp 命令為 `s3-dist-cp`，您可以在叢集或命令列中將其新增為步驟之一。透過 S3DistCp，即可有效地將大量 Amazon S3 資料複製到 HDFS，並交由 Amazon EMR 叢集處理後續步驟。您也能利用 S3DistCp 在多個 Amazon S3 儲存貯體間複製資料，或將資料從 HDFS 複製到 Amazon S3。S3DistCp 更具可擴展性和效率。 AWS 

如需在真實世界案例中示範 S3DistCP 彈性的特定命令，請參閱 AWS 大數據部落格上的[使用 S3DistCp 的七個秘訣](https://aws.amazon.com/blogs/big-data/seven-tips-for-using-s3distcp-on-amazon-emr-to-move-data-efficiently-between-hdfs-and-amazon-s3/)。

如同 DistCp，S3DistCp 會使用 MapReduce 以分散式的方式進行複製。它會跨多個伺服器分享複本、處理錯誤、進行復原，以及回報任務。如需有關 Apache DistCp 開放原始碼專案的詳細資訊，請參閱 Apache Hadoop 文件中的 [DistCp 指南](http://hadoop.apache.org/docs/stable/hadoop-distcp/DistCp.html)。

若 S3DistCp 無法複製部分或全部的指定檔案，叢集步驟會失敗，並傳回非零的錯誤代碼。若發生此種狀況，S3DistCp 不會清除掉部分複製的檔案。

**重要**  
S3DistCp 不支援含有底線字元的 Amazon S3 儲存貯體名稱。  
S3DistCp 不支援 Parquet 檔案的連接。改用 PySpark。如需詳細資訊，請參閱 [在 Amazon EMR 中串連 parquet 檔案](https://aws.amazon.com/premiumsupport/knowledge-center/emr-concatenate-parquet-files/)。  
為了避免使用 S3DistCP 將單一檔案 (而非目錄) 從 S3 複製到 HDFS 時出現複製錯誤，請使用 Amazon EMR 5.33.0 版或更新版本，或者 Amazon EMR 6.3.0 版或更高版本。

## S3DistCp 選項
<a name="UsingEMR_s3distcp.options"></a>

雖然與 DistCp 類似，但 S3DistCp 支援一組不同的選項來變更其複製和壓縮資料的方式。

呼叫 S3DistCp 時，您可以指定下表所述的選項。這些選項會透過引數清單加入步驟中。下表會列出 S3DistCp 引數的範例。


| 選項  | Description  | 必要  | 
| --- | --- | --- | 
| ‑‑src=LOCATION  |  複製資料的位置。可能為 HDFS 或 Amazon S3 的位置。 範例：`‑‑src=s3://amzn-s3-demo-bucket/logs/j-3GYXXXXXX9IOJ/node`  S3DistCp 不支援含有底線字元的 Amazon S3 儲存貯體名稱。   | 是  | 
| ‑‑dest=LOCATION  |  資料的目的地。可能為 HDFS 或 Amazon S3 的位置。 範例：`‑‑dest=hdfs:///output`  S3DistCp 不支援含有底線字元的 Amazon S3 儲存貯體名稱。   | 是  | 
| ‑‑srcPattern=PATTERN  |  [規則運算式](http://en.wikipedia.org/wiki/Regular_expression)，能夠篩選以 `‑‑src` 資料子集為目標的複製操作。若並未指定 `‑‑srcPattern` 或 `‑‑groupBy`，`‑‑src` 的所有資料均會複製到 `‑‑dest`。 若規則表達式引數含有星號 (\$1) 等特殊字元，則需將規則表達式或整個 `‑‑args` 字串以單引號 (') 括起來。 範例：`‑‑srcPattern=.*daemons.*-hadoop-.*`  | 否  | 
| ‑‑groupBy=PATTERN  |  [規則運算式](http://en.wikipedia.org/wiki/Regular_expression)，S3DistCp 能藉此串連符合該運算式的檔案。舉例而言，您可以使用此選項將所有在一小時內寫入的記錄檔結合成單個檔案。串連的檔案名稱是與分組用的規則表達式相符的該值。 括號代表檔案的分組方式，所有與括號中的陳述相符的項目均會結合成同一個輸出檔案。若規則表達式並無括號內的陳述式，則叢集會在 S3DistCp 步驟失敗，並傳回錯誤。 若規則表達式引數含有星號 (\$1) 等特殊字元，則需將規則表達式或整個 `‑‑args` 字串以單引號 (') 括起來。 指定 `‑‑groupBy` 時，僅會複製符合指定模式的檔案。不需要同時指定 `‑‑groupBy` 和 `‑‑srcPattern`。 範例：`‑‑groupBy=.*subnetid.*([0-9]+-[0-9]+-[0-9]+-[0-9]+).*`  | 否  | 
| ‑‑targetSize=SIZE  |  根據 `‑‑groupBy` 選項建立檔案大小，單位為 mebibytes (MiB)。該值必須為整數。若有設定 `‑‑targetSize`，S3DistCp 會常識符合該大小，而實際複製檔案的大小可能會大於或小於該值。根據資料檔案大小彙總的作業，因此目標檔案大小可能會與來源資料檔案大小相符。 若由 `‑‑groupBy` 串連的檔案大於 `‑‑targetSize` 的值，會將檔案分解為多個分割檔，並按順序在名稱結尾加上數字編號。舉例而言，串連為 `myfile.gz` 的檔案會分割為：`myfile0.gz`、`myfile1.gz` 等。 範例：`‑‑targetSize=2`  | 否  | 
| ‑‑appendToLastFile |  在將檔案從 Amazon S3 複製到已存在的 HDFS 時，指定 S3DistCp 的行為。會將新的檔案資料附加到既有檔案中。如果使用 `‑‑appendToLastFile` 搭配 `‑‑groupBy`，新資料會附加到符合相同群組的檔案上。搭配 `‑‑targetSize` 使用時，此操作也會遵守 `‑‑groupBy.` 的行為  | 否  | 
| ‑‑outputCodec=CODEC  |  指定要用複製檔案的壓縮代碼。可能會使用到這些值：`gzip`、`gz`、`lzo`、`snappy` 或 `none`。舉例而言，您可以使用此選項將輸入檔案透過 Gzip 轉換為 LZO 壓縮格式的輸出檔，或在複製操作的過程終將檔案解壓縮。若選擇了輸出代碼，檔案名稱會附加上合適的副檔名 (例如 `gz` 和 `gzip` 的副檔名為 `.gz`)。若不指定 `‑‑outputCodec` 的值，將檔案複製過去時就不會變更壓縮方式。 範例：`‑‑outputCodec=lzo`  | 否  | 
| ‑‑s3ServerSideEncryption  |  確保使用 SSL 傳輸目標資料，並使用 AWS 服務端金鑰在 Amazon S3 中自動加密。當使用 S3DistCp 擷取資料時，物件會自動處於未加密的狀態。如果嘗試將未加密的物件複製到需要加密的 Amazon S3 儲存貯體，複製操作會失敗。如需詳細資訊，請參閱[使用資料加密](https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingEncryption.html)。 範例：`‑‑s3ServerSideEncryption`  | 否  | 
| ‑‑deleteOnSuccess  |  如果複製操作成功，此選項會導致 S3DistCp 將複製檔案從來源位置刪除。若是要將記錄檔這類輸出檔案當做排程任務處理，從一個位置複製到另個位置，且您不想重複複製相同的檔案，此功能便相當實用。 範例：`‑‑deleteOnSuccess`  | 否  | 
| ‑‑disableMultipartUpload  |  停用分段上傳。 範例：`‑‑disableMultipartUpload`  | 否  | 
| ‑‑multipartUploadChunkSize=SIZE  |  Amazon S3 分段上傳中每個部分的大小 (MiB)。S3DistCp 在複製大於 `multipartUploadChunkSize` 的資料時使用分段上傳。若要提高作業效能，您可以增加每個部分的大小。預設大小為 128 MiB。 範例：`‑‑multipartUploadChunkSize=1000`  | 否  | 
| ‑‑numberFiles  |  會在輸出檔案的開頭加上順序編號。數字會從 0 開始，除非由 `‑‑startingIndex` 指定不同的值。 範例：`‑‑numberFiles`  | 否  | 
| ‑‑startingIndex=INDEX  |  搭配 `‑‑numberFiles` 使用，以指令序列中的第一個編號。 範例：`‑‑startingIndex=1`  | 否  | 
| ‑‑outputManifest=FILENAME  |  建立一個以 Gzip 壓縮的文字檔，內有 S3DistCp 複製的所有檔案的清單。 範例：`‑‑outputManifest=manifest-1.gz`  | 否  | 
| ‑‑previousManifest=PATH  |  讀取先前使用 `‑‑outputManifest` 旗標呼叫 S3DistCp 時建立的資訊清單檔案。設定 `‑‑previousManifest` 旗標後，S3DistCp 會將列於資訊清單內的檔案從複製操作中排除出去。若與 `‑‑outputManifest` 一起指定 `‑‑previousManifest`，列在之前的資料清單中的檔案也匯出現在新的資訊清單檔案中，但不會複製檔案。 範例：`‑‑previousManifest=/usr/bin/manifest-1.gz`  | 否  | 
| ‑‑requirePreviousManifest |  需要先前呼叫 S3DistCp 期間所建立的資訊清單。若設定為 false，未指定先前的資訊清單，也不會產生任何錯誤。預設值為 true。  | 否  | 
| ‑‑copyFromManifest  |  將 `‑‑previousManifest` 的行為反轉，讓 S3DistCp 使用指定的資訊清單檔案做為複製檔案的清單，而非當做不予複製的檔案清單。 範例：`‑‑copyFromManifest ‑‑previousManifest=/usr/bin/manifest-1.gz`  | 否  | 
| ‑‑s3Endpoint=ENDPOINT |  指定上傳檔案時要使用的 Amazon S3 端點。此選項會設定來源和目的地兩邊的端點。若未設定，預設端點則為 `s3.amazonaws.com`。如需 Amazon S3 端點的清單，請參閱[區域與端點](https://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region)。 範例：`‑‑s3Endpoint=s3.eu-west-1.amazonaws.com`  | 否  | 
| ‑‑storageClass=CLASS |  目的地是 Amazon S3 時所用的儲存類別。有效值為 STANDARD 和 REDUCED\$1REDUNDANCY。如果未指定此選項，S3DistCp 會嘗試保留儲存類別。 範例：`‑‑storageClass=STANDARD`  | 否  | 
| ‑‑srcPrefixesFile=PATH |  Amazon S3 (s3://)、HDFS (hdfs:///) 或本機檔案系統 (file:/) 中的文字檔，其中包含一系列 `src` 字首，每行一個字首。 若有提供 `srcPrefixesFile`，S3DistCp 不會列出 src 路徑。反之會產生一份來源清單做為合併的結果，其中列出此檔案中指定的所有字首。會使用相對於 src 路徑的相對路徑來產生目的地路徑，而非這些字首。若也有指定 `srcPattern`，會將之套用到來源字首的合併清單結果，以進一步篩選輸入結果。若有使用 `copyFromManifest`，則會複製資訊清單中的物件，並略過 `srcPrefixesFile`。 範例：`‑‑srcPrefixesFile=PATH`  | 否  | 

除了上述選項之外，S3DistCp 也採用了 [Tool interface](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/util/Tool.html)，表示其可支援通用選項。

## 將 S3DistCp 新增為叢集中的步驟
<a name="UsingEMR_s3distcp.step"></a>

您可以將 S3DistCp 新增為叢集中的步驟，便能呼叫 S3DistCp。可在啟動時將步驟新增至叢集，或使用主控台、CLI 或 API 將步驟加入至執行中的叢集。以下範例示範了如何將 S3DistCp 步驟新增至執行中的叢集。如需有關將步驟新增至叢集的詳細資訊，請參閱《Amazon EMR 管理指南》**中的[將工作提交至叢集](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-work-with-steps.html)。

**使用 將 S3DistCp 步驟新增至執行中的叢集 AWS CLI**

如需在 中使用 Amazon EMR 命令的詳細資訊 AWS CLI，請參閱 [AWS CLI 命令參考](https://docs.aws.amazon.com/cli/latest/reference/emr)。
+ 若要將呼叫 S3DistCp 的步驟新增至叢集，請將指定 S3DistCp 如何將複製操作當做引數執行的參數傳遞過去。

  下列範例會將常駐程式日誌從 Amazon S3 複製到 `hdfs:///output`。在下列命令中：
  + `‑‑cluster-id` 指定了叢集
  + `Jar` 是 S3DistCp JAR 檔案的位置。如需有關如何使用 command-runner.jar 在叢集上執行命令的範例，請參閱[提交自訂 JAR 步驟以執行指令碼或命令](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-commandrunner.html#emr-commandrunner-examples)。
  + `Args` 是要傳遞到 S3DistCp 的選項名稱值組的逗號分隔清單。如需可用選項的完整清單，請參閱 [S3DistCp 選項](#UsingEMR_s3distcp.options)。

  若要將 S3DistCp 複製步驟新增到執行中的叢集，請將以下內容放置在儲存於 Amazon S3 中的 JSON 檔案或您的本機檔案系統中，在此範例為 `myStep.json`。將 *j-3GYXXXXXX9IOK* 取代為您的叢集 ID，並將 *amzn-s3-demo-bucket* 取代為您的 Amazon S3 儲存貯體名稱。

  ```
  [
      {
          "Name":"S3DistCp step",
          "Args":["s3-dist-cp","‑‑s3Endpoint=s3.amazonaws.com","‑‑src=s3://amzn-s3-demo-bucket/logs/j-3GYXXXXXX9IOJ/node/","‑‑dest=hdfs:///output","‑‑srcPattern=.*[a-zA-Z,]+"],
          "ActionOnFailure":"CONTINUE",
          "Type":"CUSTOM_JAR",
          "Jar":"command-runner.jar"        
      }
  ]
  ```

  ```
  aws emr add-steps ‑‑cluster-id j-3GYXXXXXX9IOK ‑‑steps file://./myStep.json
  ```

**Example 將日誌檔案從 Amazon S3 複製到 HDFS**  
此範例也會說明如何透過將步驟新增至執行中的叢集，來將儲存在 Amazon S3 儲存貯體內的日誌檔案複製到 HDFS。在此範例中，`‑‑srcPattern` 選項適用於限制複製到精靈記錄檔的資料。  
若要使用 `‑‑srcPattern` 選項將日誌檔案從 Amazon S3 複製到 HDFS，請將以下內容放置在儲存於 Amazon S3 中的 JSON 檔案或您的本機檔案系統中，在此範例為 `myStep.json`。將 *j-3GYXXXXXX9IOK* 取代為您的叢集 ID，並將 *amzn-s3-demo-bucket* 取代為您的 Amazon S3 儲存貯體名稱。  

```
[
    {
        "Name":"S3DistCp step",
        "Args":["s3-dist-cp","‑‑s3Endpoint=s3.amazonaws.com","‑‑src=s3://amzn-s3-demo-bucket/logs/j-3GYXXXXXX9IOJ/node/","‑‑dest=hdfs:///output","‑‑srcPattern=.*daemons.*-hadoop-.*"],
        "ActionOnFailure":"CONTINUE",
        "Type":"CUSTOM_JAR",
        "Jar":"command-runner.jar"        
    }
]
```

## S3DistCp 作業失敗之後清除
<a name="s3distcp-cleanup"></a>

若 S3DistCp 無法複製部分或全部的指定檔案，命令或叢集步驟會失敗，並傳回非零的錯誤代碼。若發生此種狀況，S3DistCp 不會清除掉部分複製的檔案。您必須手動刪除這些檔案。

部分複製的檔案會儲存到子目錄 (具有 S3DistCp 任務的唯一識別符) 的 HDFS `tmp` 目錄中。您可以在任務的標準輸出中找到此 ID。

例如，對於 ID 為 `4b1c37bb-91af-4391-aaf8-46a6067085a6` 的 S3DistCp 任務，您可以連接到叢集的主節點，並執行以下命令以檢視與該任務相關聯的輸出檔案。

```
hdfs dfs -ls /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output
```

該命令會傳回類似如下的檔案清單：

```
Found 8 items
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:03 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/_SUCCESS
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:02 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00000
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:02 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00001
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:02 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00002
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:03 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00003
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:03 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00004
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:03 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00005
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:03 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00006
```

然後，您可以執行以下命令來刪除目錄和所有內容。

```
hdfs dfs rm -rf /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6
```