

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 连接器和实用工具
<a name="emr-connectors"></a>

Amazon EMR 提供了许多连接器和实用工具，用于将其他 AWS 服务作为数据源进行访问。您通常可在一个程序内访问这些服务中的数据。例如，您可以在 Hive 查询、Pig 脚本或 MapReduce 应用程序中指定 Kinesis 流，然后对该数据进行操作。

**Topics**
+ [使用 Amazon EMR 导出、导入、查询和连接 DynamoDB 中的表格](EMRforDynamoDB.md)
+ [Kinesis](emr-kinesis.md)
+ [S3 DistCp (s3-dist-cp)](UsingEMR_s3distcp.md)
+ [在 S3 DistCp 任务失败后进行清理](#s3distcp-cleanup)

# 使用 Amazon EMR 导出、导入、查询和连接 DynamoDB 中的表格
<a name="EMRforDynamoDB"></a>

**注意**  
亚马逊 EMR-DynamoDB 连接器已在上开源。 GitHub有关更多信息，请参阅 [https://github.com/awslabs/emr-dynamodb-connector](https://github.com/awslabs/emr-dynamodb-connector)。

DynamoDB 是一项完全托管式 NoSQL 数据库服务，提供快速且可预测的性能，能够实现无缝扩展。开发人员可以创建数据库表，并可以不受限制地增加请求流量或存储空间。DynamoDB 可自动将表的数据和流量分布到足够多的服务器中，以便处理客户指定的容量请求和数据存储量，同时还能保持性能一致、访问高效。使用 Amazon EMR 和 Hive 可以快速有效地处理大量数据，如 DynamoDB 中存储的数据。有关 DynamoDB 的更多信息，请参阅 [Amazon DynamoDB 开发人员指南](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/)。

Apache Hive 是一种可用于查询 map reduce 集群的软件层，使用的是一种名为 HiveQL 的类似 SQL 的简化查询语言。它在 Hadoop 架构的顶层运行。有关 Hive 和 HiveQL 的更多信息，请转至 [HiveQL 语言手册](https://cwiki.apache.org/confluence/display/Hive/LanguageManual)。有关 Hive 和 Amazon EMR 的更多信息，请参阅[Apache Hive](emr-hive.md)。

您可将 Amazon EMR 与自定义版本的 Hive（包含与 DynamoDB 的连接）配合使用来对存储在 DynamoDB 中的数据执行操作：
+ 将 DynamoDB 数据加载到 Hadoop Distributed File System（HDFS）中并用作输入 Amazon EMR 集群的数据。
+ 使用类似 SQL 语句（HiveQL）查询实时 DynamoDB 数据。
+ 连接 DynamoDB 中存储的数据并导出这些数据或查询连接的数据。
+ 将存储在 DynamoDB 中的数据导出到 Amazon S3。
+ 将存储在 Amazon S3 中的数据导入到 DynamoDB。

**注意**  
Amazon EMR-DynamoDB 连接器不支持配置为使用 [Kerberos 身份验证](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-kerberos.html)的集群。

为了执行以下每项任务，您将启动 Amazon EMR 集群，指定 DynamoDB 中数据的位置，并发出 Hive 命令来操作 DynamoDB 中的数据。

有多种方法可以启动 Amazon EMR 集群：您可以使用亚马逊 EMR 控制台、命令行界面 (CLI)，也可以使用软件开发工具包 AWS 或亚马逊 EMR API 对集群进行编程。您还可以选择交互运行 Hive 集群还是从脚本运行。在本节中，我们将介绍如何从 Amazon EMR 控制台和 CLI 启动交互式 Hive 集群。

通过交互的方式使用 Hive 是测试查询性能和调试应用程序的良好方式。确定将定期运行的 Hive 命令集之后，请考虑创建一个 Hive 脚本，让 Amazon EMR 来运行。

**警告**  
DynamoDB 表上的 Amazon EMR 读取或写入操作不利于既定的预置吞吐量，有可能增加预置吞吐量例外情况出现的频率。对于大量请求，Amazon EMR 会使用指数回退实施重试，以管理 DynamoDB 表中的请求负载。如果您与其它流量同时运行 Amazon EMR 任务，就可能导致超出分配的预置吞吐量级别。您可以通过查看 Amazon 中的**ThrottleRequests**指标来监控这一点 CloudWatch。如果请求负载过高，您可以重新启动集群，将[读取百分比设置](EMR_Hive_Optimizing.md#ReadPercent)或[写入百分比设置](EMR_Hive_Optimizing.md#WritePercent)设置为较低的值，从而限制 Amazon EMR 操作。有关 DynamoDB 吞吐量设置的详细信息，请参阅[预置吞吐量](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithDDTables.html#ProvisionedThroughput)。  
如果表配置为[按需模式](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html#HowItWorks.OnDemand)，则应先将表更改回预配置模式，再运行导出或导入操作。管道需要吞吐量比率才能计算出要从 Dynamo DBtable 中使用的资源。按需模式删除预置的吞吐量。要预配置吞吐容量，您可以使用 Amazon Ev CloudWatch ents 指标来评估表使用的总吞吐量。

**Topics**
+ [设置 Hive 表来运行 Hive 命令](EMR_Interactive_Hive.md)
+ [用于在 DynamoDB 中导出、导入和查询数据的 Hive 命令示例](EMR_Hive_Commands.md)
+ [在 DynamoDB 中优化 Amazon EMR 操作的性能](EMR_Hive_Optimizing.md)

# 设置 Hive 表来运行 Hive 命令
<a name="EMR_Interactive_Hive"></a>

Apache Hive 是一种可用于使用类似 SQL 的语言查询 Amazon EMR 集群中包含的数据的数据仓库应用程序。有关 Hive 的更多信息，请参阅 [http://hive.apache.org/](http://hive.apache.org/)。

下面的程序假定您已创建集群并指定了 Amazon EC2 密钥对。要了解如何开始创建集群，请参阅《Amazon EMR 管理指南》**中的 [Amazon EMR 入门](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-gs)。

## 配置 Hive 以供使用 MapReduce
<a name="hive-mapreduce"></a>

在您使用 Amazon EMR 上的 Hive 查询 DynamoDB 表时，如果 Hive 使用默认执行引擎 Tez，则可能会出现错误。因此，当您按本节所述使用 Hive 创建与 DynamoDB 集成的集群时，我们建议您使用将 Hive 设置为使用的配置分类。 MapReduce有关更多信息，请参阅 [配置应用程序](emr-configure-apps.md)。

以下代码段显示了用于设置 MapReduce 为 Hive 执行引擎的配置分类和属性：

```
[
                {
                    "Classification": "hive-site",
                    "Properties": {
                        "hive.execution.engine": "mr"
                    }
                }
             ]
```<a name="EMR_Interactive_Hive_session"></a>

**以交互方式运行 Hive 命令**

1. 连接到主节点。有关更多信息，请参阅《Amazon EMR 管理指南》**中的[使用 SSH 连接到主节点](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html)。

1. 当命令提示输入当前主节点时，键入 `hive`。

   您应看到 Hive 提示符：`hive>`

1.  输入用于将 Hive 应用程序中的表映射到 DynamoDB 中的数据的 Hive 命令。该表充当对 Amazon DynamoDB 中存储的数据的引用；数据未存储在本地的 Hive 中，每次运行命令时，使用此表的任何查询将针对 DynamoDB 中的实时数据运行，从而占用此表的读取或写入容量。如果您需要对同一数据集运行多个 Hive 命令，请考虑先将其导出。

    下面说明将 Hive 表映射到 DynamoDB 表的语法。

   ```
   CREATE EXTERNAL TABLE hive_tablename (hive_column1_name column1_datatype, hive_column2_name column2_datatype...)
   STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   TBLPROPERTIES ("dynamodb.table.name" = "dynamodb_tablename", 
   "dynamodb.column.mapping" = "hive_column1_name:dynamodb_attribute1_name,hive_column2_name:dynamodb_attribute2_name...");
   ```

    当您在 Hive 中从 DynamoDB 创建表时，必须使用关键字 `EXTERNAL` 将该表创建为外部表。外部表与内部表之间的区别是：删除内部表时，将删除内部表中的数据。当连接 Amazon DynamoDB 时，这不是所需行为，因此仅支持外部表。

    例如，以下 Hive 命令在 Hive 中创建名为 *hivetable1* 的表，该表引用名为 *dynamodbtable1* 的 DynamoDB 表。DynamoDB 表 dynamodbt *able1* 具有主键架构。 hash-and-range哈希键元素是 `name` (字符串类型)。范围键元素是 `year` (数字类型)。每个项目都有 `holidays` (字符串集类型) 的属性值。

   ```
   CREATE EXTERNAL TABLE hivetable1 (col1 string, col2 bigint, col3 array<string>)
   STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
   "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");
   ```

    第 1 行使用 HiveQL `CREATE EXTERNAL TABLE` 语句。对于 *hivetable1*，您需要为 DynamoDB 表中的每个属性名称/值对建立一列，并提供数据类型。这些值不区分大小写，并且您可以为列提供任何名称 (保留字除外)。

    第 2 行使用 `STORED BY` 语句。`STORED BY` 的值是用于处理 Hive 与 DynamoDB 之间连接的类的名称。该值应设置为 `'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'`。

    第 3 行使用 `TBLPROPERTIES` 语句将“hivetable1”与 DynamoDB 中相应的表和架构相关联。为 `TBLPROPERTIES` 提供 `dynamodb.table.name` 参数和 `dynamodb.column.mapping` 参数的值。这些值*是*区分大小写的。
**注意**  
 此表的所有 DynamoDB 属性名称必须在 Hive 表中有对应的列。根据您的 Amazon EMR 版本，如果 one-to-one映射不存在，则会出现以下情况：  
在 Amazon EMR 5.27.0 及更高版本上，连接器经过验证，可确保 DynamoDB 属性名称与 Hive 表中的列 one-to-one之间存在映射。如果 one-to-one映射不存在，则会发生错误。
在 Amazon EMR 5.26.0 及更低版本上，Hive 表将不包含来自 DynamoDB 的名称/值对。如果您未映射 DynamoDB 主键属性，则 Hive 将生成错误。如果您未映射非主键属性，则不会生成错误，但您将无法查看 Hive 表中的数据。如果数据类型不匹配，则值为空。

然后，您可以开始对 *hivetable1* 运行 Hive 操作。根据 *hivetable1* 运行的查询也根据 DynamoDB 账户的 DynamoDB 表 *dynamodbtable1* 在内部运行，在每次执行运行时消耗读取或写入单位。

对 DynamoDB 表运行 Hive 查询时，您需要确保已预置足量的读取容量单位。

例如，假设您为 DynamoDB 表预配置了 100 个读取容量单位。这将允许您每秒执行 100 次读取或读取 409600 字节。如果该表包含 20GB 的数据（21474836480 字节）并且您的 Hive 查询执行全表扫描，则可以估算执行查询将花费多长时间：

 * 21474836480/409600 = 52429 秒 = 14.56 小时 * 

减少所需时间的唯一方法是调整源 DynamoDB 表的读取容量单位。添加更多 Amazon EMR 节点将不会有帮助。

在 Hive 输出中，当一个或多个映射器进程已完成时，将更新完成百分比。对于预配置的读取容量设置较低的大型 DynamoDB 表，完成百分比输出可能会很长时间不更新；在上面的示例中，作业将在几个小时内显示为完成 0%。有关作业进度的详细状态，请转到 Amazon EMR 控制台；您将可以查看单个映射器任务状态和数据读取统计数据。您还可以登录主节点的 Hadoop 界面，查看 Hadoop 统计数据。该界面将向您显示单个映射任务状态和一些数据读取统计数据。有关更多信息，请参阅以下主题：
+ [托管在主节点 (master node) 上的 Web 界面](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-web-interfaces.html)
+ [查看 Hadoop Web 界面](https://docs.aws.amazon.com/emr/latest/ManagementGuide/UsingtheHadoopUserInterface.html)

有关用于执行从 DynamoDB 导出或导入数据和联接表等任务的示例 HiveQL 语句的更多信息，请参阅[用于在 DynamoDB 中导出、导入和查询数据的 Hive 命令示例](EMR_Hive_Commands.md)。<a name="EMR_Hive_Cancel"></a>

**取消 Hive 请求**

执行 Hive 查询时，来自服务器的初始响应包含用于取消请求的命令。要在此过程中随时取消请求，请使用服务器响应中的 **Kill 命令**。

1. 输入 `Ctrl+C` 可退出命令行客户端。

1.  在 Shell 提示符下，输入服务器对您的请求的初始响应中的 **Kill 命令**。

    或者，您可以从主节点的命令行运行以下命令来终止 Hadoop 作业，其中*job-id*是 Hadoop 作业的标识符，可以从 Hadoop 用户界面中检索。

   ```
   hadoop job -kill job-id
   ```

## Hive 和 DynamoDB 的数据类型
<a name="EMR_Hive_Properties"></a>

下表显示了可用的 Hive 数据类型、它们对应的默认 DynamoDB 类型以及它们也可以映射到的备用 DynamoDB 类型。


| Hive 类型 | 默认 DynamoDB 类型 | 备用 DynamoDB 类型 | 
| --- | --- | --- | 
| 字符串 | 字符串 |  | 
| bigint 或 double | 数字 (N) |  | 
| binary | 二进制 (B) |  | 
| 布尔值 | boolean (BOOL) |  | 
| array | list (L) | 数字集 (NS)、字符串集 (SS) 或二进制集 (BS) | 
| map<string,string> | 项目 | map (M) | 
| map<string,?> | map (M) |  | 
|  | null (NULL) |  | 

如果要将 Hive 数据作为对应的备用 DynamoDB 类型写入，或者您的 DynamoDB 数据包含备用 DynamoDB 类型的属性值，则可以使用 `dynamodb.type.mapping` 参数指定列和 DynamoDB 类型。以下示例显示了用于指定备用类型映射的语法。

```
CREATE EXTERNAL TABLE hive_tablename (hive_column1_name column1_datatype, hive_column2_name column2_datatype...)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES ("dynamodb.table.name" = "dynamodb_tablename",
"dynamodb.column.mapping" = "hive_column1_name:dynamodb_attribute1_name,hive_column2_name:dynamodb_attribute2_name...",
"dynamodb.type.mapping" = "hive_column1_name:dynamodb_attribute1_datatype");
```

类型映射参数是可选的，仅必须为使用备用类型的列指定它。

例如，以下 Hive 命令在 Hive 中创建名为 `hivetable2` 的表，该表引用 DynamoDB 表 `dynamodbtable2`。它与 `hivetable1` 相似，不同之处在于它将 `col3` 列映射到字符串集 (SS) 类型。

```
CREATE EXTERNAL TABLE hivetable2 (col1 string, col2 bigint, col3 array<string>)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable2",
"dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays",
"dynamodb.type.mapping" = "col3:SS");
```

在 Hive 中，`hivetable1` 和 `hivetable2` 是相同的。但是，将这些表中的数据写入其对应的 DynamoDB 表时，`dynamodbtable1` 将包含列表，而 `dynamodbtable2` 将包含字符串集。

如果要将 Hive `null` 值作为 DynamoDB `null` 类型的属性写入，则您可以使用 `dynamodb.null.serialization` 参数来写入。以下示例显示了用于指定 `null` 序列化的语法。

```
CREATE EXTERNAL TABLE hive_tablename (hive_column1_name column1_datatype, hive_column2_name column2_datatype...)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES ("dynamodb.table.name" = "dynamodb_tablename",
"dynamodb.column.mapping" = "hive_column1_name:dynamodb_attribute1_name,hive_column2_name:dynamodb_attribute2_name...",
"dynamodb.null.serialization" = "true");
```

空序列化参数是可选的，如果未指定，则设置为 `false`。请注意，无论参数设置如何，DynamoDB `null` 属性都将作为 Hive 中的 `null` 值进行读取。仅当将空序列化参数指定为 `true` 时，才能将具有 `null` 值的 Hive 集合写入 DynamoDB。否则，将出现 Hive 错误。

就精度而言，Hive 中的 bigint 类型与 Java long 类型相同，而 Hive double 类型与 Java double 类型相同。这意味着，如果您有精度高于 Hive 数据类型所提供精度的数值数据存储在 DynamoDB 中，则使用 Hive 导出、导入或引用 DynamoDB 数据会导致精度损失或 Hive 查询失败。

 从 DynamoDB 导出到 Amazon Simple Storage Service（Amazon S3）或 HDFS 的二进制类型作为 Base64 编码的字符串进行存储。如果您要将 Amazon S3 或 HDFS 中的数据导入到 DynamoDB 二进制类型，则该数据应编码为 Base64 字符串。

## Hive 选项
<a name="EMR_Hive_Options"></a>

 您可以设置以下 Hive 选项来管理从 Amazon DynamoDB 的数据传出。这些选项只针对当前 Hive 会话保留。如果您在集群上关闭 Hive 命令提示符并稍后重新打开，则这些设置将恢复为默认值。


| Hive 选项 | 说明 | 
| --- | --- | 
| dynamodb.throughput.read.percent |   设置读取操作的速率，在为您的表分配的范围内保持 DynamoDB 预配置的吞吐速率。该值介于 `0.1` 到 `1.5` 之间（包含端点）。  值 0.5 是默认读取速率，这意味着，Hive 将在表的整个资源中尝试占用一半的预配读取量。增加此值使之高于 0.5 将提高读取请求速率。减少此值使之低于 0.5 将降低读取请求速率。此读取速率是近似值。实际读取速率取决于 DynamoDB 中是否存在统一分配的键等因素。  如果您发现 Hive 操作经常超出您预配的吞吐量，或者如果过多限制了实时读取流量，则可以减少此值使之低于 `0.5`。如果您有足够的容量并希望 Hive 操作的速度更快，请将此值设置为高于 `0.5`。如果您认为有可用的输入/输出操作未使用，则还可以通过将此值设置到最高 1.5 来进行超额预订。  | 
| dynamodb.throughput.write.percent |   设置写入操作的速率，在为您的表分配的范围内保持 DynamoDB 预配置的吞吐速率。该值介于 `0.1` 到 `1.5` 之间（包含端点）。  值 0.5 是默认写入速率，这意味着，Hive 将在表的整个资源中尝试占用一半的预配写入量。增加此值使之高于 0.5 将提高写入请求速率。减少此值使之低于 0.5 将降低写入请求速率。此写入速率是近似值。实际写入速率取决于 DynamoDB 中是否存在统一分配的键等因素   如果您发现 Hive 操作经常超出您预配的吞吐量，或者如果过多限制了实时写入流量，则可以减少此值使之低于 `0.5`。如果您有足够的容量并希望 Hive 操作的速度更快，请将此值设置为高于 `0.5`。如果您认为有可用的输入/输出操作未使用或者这是到表的初始数据上载，还没有实时流量，则还可以通过将此值设置到最高 1.5 来进行超额预订。  | 
| dynamodb.endpoint | 为 DynamoDB 服务指定终端节点。有关可用 DynamoDB 终端节点的更多信息，请参阅[区域和终端节点](https://docs.aws.amazon.com/general/latest/gr/rande.html#ddb_region)。  | 
| dynamodb.max.map.tasks |   指定在从 DynamoDB 读取数据时，映射任务的最大数量。此值必须等于或大于 1。  | 
| dynamodb.retry.duration |   指定要用作重试 Hive 命令的超时时间的分钟数。此值必须是大于或等于 0 的整数。默认超时持续时间为 2 分钟。  | 

 这些选项是使用 `SET` 命令设置的，如以下示例所示。

```
SET dynamodb.throughput.read.percent=1.0; 

INSERT OVERWRITE TABLE s3_export SELECT * 
FROM hiveTableName;
```

# 用于在 DynamoDB 中导出、导入和查询数据的 Hive 命令示例
<a name="EMR_Hive_Commands"></a>

以下示例使用 Hive 命令执行将数据导出到 Amazon S3 或 HDFS、将数据导入到 DynamoDB、连接表、查询表等操作。

对 Hive 表执行的操作将引用 DynamoDB 中存储的数据。Hive 命令受到 DynamoDB 表预置的吞吐量设置约束，并且检索的数据包括 DynamoDB 处理 Hive 操作请求时写入到 DynamoDB 表的数据。如果数据检索过程需要很长一段时间，则自 Hive 命令开始执行以来，Hive 命令返回的某些数据可能已在 DynamoDB 中更新。

Hive 命令 `DROP TABLE` 和 `CREATE TABLE` 仅对 Hive 中的本地表进行操作，而不会在 DynamoDB 中创建或删除表。如果 Hive 查询引用 DynamoDB 中的表，则在您运行查询之前，该表必须已存在。有关在 DynamoDB 中创建和删除表的更多信息，请参阅 *Amazon DynamoDB 开发人员指南*中的[在 DynamoDB 中处理表](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithTables.html)。

**注意**  
 当您将 Hive 表映射到 Amazon S3 中的某个位置时，请勿将其映射到存储桶的根路径 s3://amzn-s3-demo-bucket，因为这可能会导致 Hive 将数据写入 Amazon S3 时出错。而是将表映射到存储桶的子路径 s3://amzn-s3-demo-bucket/mypath。

## 从 DynamoDB 中导出数据
<a name="EMR_Hive_Commands_exporting"></a>

 可以使用 Hive 从 DynamoDB 中导出数据。

**将 DynamoDB 表导出到 Amazon S3 存储桶**
+  创建一个引用 DynamoDB 中存储的数据的 Hive 表。然后，您可以调用 INSERT OVERWRITE 命令将数据写入到外部目录。在以下示例中，*s3://amzn-s3-demo-bucket/path/subpath/*是 Amazon S3 中的有效路径。调整 CREATE 命令中的列和数据类型来匹配 DynamoDB 中的值。可以使用此命令在 Amazon S3 中创建 DynamoDB 数据的存档。

  ```
  1. CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
  2. STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
  3. TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
  4. "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");                   
  5.                     
  6. INSERT OVERWRITE DIRECTORY 's3://amzn-s3-demo-bucket/path/subpath/' SELECT * 
  7. FROM hiveTableName;
  ```

**使用格式设置将 DynamoDB 表导出到 Amazon S3 存储桶**
+  创建引用 Amazon S3 中的位置的外部表。此表在下面显示为 s3\$1export。在调用 CREATE 期间，为此表指定行格式设置。然后，当您使用 INSERT OVERWRITE 将数据从 DynamoDB 导出到 s3\$1export 时，数据将以指定的格式写出。在以下示例中，数据以逗号分隔值 (CSV) 的格式写出。

  ```
   1. CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
   2. STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   3. TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
   4. "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");                      
   5.                     
   6. CREATE EXTERNAL TABLE s3_export(a_col string, b_col bigint, c_col array<string>)
   7. ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
   8. LOCATION 's3://amzn-s3-demo-bucket/path/subpath/';
   9.                     
  10. INSERT OVERWRITE TABLE s3_export SELECT * 
  11. FROM hiveTableName;
  ```

**在不指定列映射的情况下将 DynamoDB 表导出到 Amazon S3 存储桶**
+  创建一个引用 DynamoDB 中存储的数据的 Hive 表。此例与前面的示例类似，只是不指定列映射。该表必须正好具有类型为 `map<string, string>` 的一个列。如果您随后在 Amazon S3 中创建 `EXTERNAL` 表，可以调用 `INSERT OVERWRITE` 命令将数据从 DynamoDB 写入到 Amazon S3。可以使用此命令在 Amazon S3 中创建 DynamoDB 数据的存档。由于没有列映射，因此您无法查询以此方式导出的表。在 Hive 0.8.1.5 或更高版本（在 Amazon EMR AMI 2.2.*x* 及其更高版本上受支持）中导出数据而不指定列映射。

  ```
   1. CREATE EXTERNAL TABLE hiveTableName (item map<string,string>)
   2. STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   3. TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1");  
   4.     
   5. CREATE EXTERNAL TABLE s3TableName (item map<string, string>)
   6. ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
   7. LOCATION 's3://amzn-s3-demo-bucket/path/subpath/'; 
   8.                 
   9. INSERT OVERWRITE TABLE s3TableName SELECT * 
  10. FROM hiveTableName;
  ```

**使用数据压缩将 DynamoDB 表导出到 Amazon S3 存储桶**
+  Hive 提供多个可以在 Hive 会话期间设置的压缩编解码器。这样做会导致导出的数据以指定的格式进行压缩。以下示例使用 Lempel-Ziv-Oberhumer (LZO) 算法压缩导出的文件。

  ```
   1. SET hive.exec.compress.output=true;
   2. SET io.seqfile.compression.type=BLOCK;
   3. SET mapred.output.compression.codec = com.hadoop.compression.lzo.LzopCodec;                    
   4.                     
   5. CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
   6. STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   7. TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
   8. "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");                    
   9.                     
  10. CREATE EXTERNAL TABLE lzo_compression_table (line STRING)
  11. ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
  12. LOCATION 's3://amzn-s3-demo-bucket/path/subpath/';
  13.                     
  14. INSERT OVERWRITE TABLE lzo_compression_table SELECT * 
  15. FROM hiveTableName;
  ```

   可用的压缩编解码器包括：
  +  org.apache.hadoop.io.compress。 GzipCodec 
  +  org.apache.hadoop.io.compress。 DefaultCodec 
  +  com.hadoop.compression.lzo。 LzoCodec 
  +  com.hadoop.compression.lzo。 LzopCodec 
  +  org.apache.hadoop.io.compress。 BZip2编解码器 
  +  org.apache.hadoop.io.compress。 SnappyCodec 

**将 DynamoDB 表导出到 HDFS**
+  使用以下 Hive 命令，其中*hdfs:///directoryName*是有效的 HDFS 路径，是 Hive 中引用 Dynam *hiveTableName* oDB 的表。此导出操作比将 DynamoDB 表导出到 Amazon S3 速度快，因为将数据导出到 Amazon S3 时，Hive 0.7.1.1 将 HDFS 用作中间步骤。以下示例还显示了如何将 `dynamodb.throughput.read.percent` 设置为 1.0 以提高读取请求速率。

  ```
  1. CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
  2. STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
  3. TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
  4. "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays"); 
  5.                     
  6. SET dynamodb.throughput.read.percent=1.0;                    
  7.                     
  8. INSERT OVERWRITE DIRECTORY 'hdfs:///directoryName' SELECT * FROM hiveTableName;
  ```

   您还可以按上面所示的用于导出到 Amazon S3 的方法，使用格式设置和压缩将数据导出到 HDFS。为此，只需将上面示例中的 Amazon S3 目录替换为 HDFS 目录。<a name="EMR_Hive_non-printable-utf8"></a>

**在 Hive 中读取不可打印的 UTF-8 字符数据**
+ 创建表时，您可以使用 `STORED AS SEQUENCEFILE` 子句在 Hive 中读取和写入不可打印的 UTF-8 字符数据。A SequenceFile 是 Hadoop 二进制文件格式；你需要使用 Hadoop 来读取这个文件。以下示例显示了如何将数据从 DynamoDB 导出到 Amazon S3 中。可以使用此功能处理不可打印的 UTF-8 编码字符。

  ```
   1. CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
   2. STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   3. TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
   4. "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");                      
   5.                     
   6. CREATE EXTERNAL TABLE s3_export(a_col string, b_col bigint, c_col array<string>)
   7. STORED AS SEQUENCEFILE
   8. LOCATION 's3://amzn-s3-demo-bucket/path/subpath/';
   9.                     
  10. INSERT OVERWRITE TABLE s3_export SELECT * 
  11. FROM hiveTableName;
  ```

## 将数据导入到 DynamoDB
<a name="EMR_Hive_Commands_importing"></a>

 使用 Hive 将数据写入到 DynamoDB 中时，应确保写入容量单位数大于集群中的映射器数。例如，在 m1.xlarge EC2 实例上运行的集群在每个实例上生成 8 个映射器。对于具有 10 个实例的集群，这意味着生成 80 个映射器。如果写入容量单位数不大于集群中的映射器数，则 Hive 写入操作可能会占用所有写入吞吐量，或者尝试占用超过预配置值的吞吐量。有关每种 EC2 实例类型生成的映射器数的更多信息，请参阅 [配置 Hadoop](emr-hadoop-config.md)。

 Hadoop 中的映射器数由输入的拆分数控制。如果拆分数过小，写入命令可能无法占用所有可用的写入吞吐量。

 如果具有相同键的项目在目标 DynamoDB 表中存在，则将覆盖该项目。如果目标 DynamoDB 表中不存在具有该键的项目，则将插入该项目。

**要将数据从 Amazon S3 导入 DynamoDB**
+  您可以使用 Amazon EMR（Amazon EMR）和 Hive 将数据从 Amazon S3 写入到 DynamoDB。

  ```
  CREATE EXTERNAL TABLE s3_import(a_col string, b_col bigint, c_col array<string>)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
  LOCATION 's3://amzn-s3-demo-bucket/path/subpath/';                    
                      
  CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
  TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
  "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");  
                      
  INSERT OVERWRITE TABLE hiveTableName SELECT * FROM s3_import;
  ```

**在不指定列映射时将表从 Amazon S3 存储桶导入到 DynamoDB 中**
+  创建一个引用 Amazon S3 中存储数据的 `EXTERNAL` 表，该数据是以前从 DynamoDB 中导出的。在导入之前，请确保该表存在于 DynamoDB 中，并且该表与以前导出的 DynamoDB 表具有相同的键架构。此外，该表还必须正好具有类型为 `map<string, string>` 的一个列。如果您随后创建一个链接到 DynamoDB 的 Hive 表，则可以调用 `INSERT OVERWRITE` 命令将数据从 Amazon S3 写入到 DynamoDB 中。由于没有列映射，因此您无法查询以此方式导入的表。在 Hive 0.8.1.5 或更高版本（在 Amazon EMR AMI 2.2.3 及其更高版本上受支持）中可以在不指定列映射时导入数据。

  ```
  CREATE EXTERNAL TABLE s3TableName (item map<string, string>)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
  LOCATION 's3://amzn-s3-demo-bucket/path/subpath/'; 
                          
  CREATE EXTERNAL TABLE hiveTableName (item map<string,string>)
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
  TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1");  
                   
  INSERT OVERWRITE TABLE hiveTableName SELECT * 
  FROM s3TableName;
  ```

**将表从 HDFS 导入到 DynamoDB 中**
+  可以使用 Amazon EMR 和 Hive 将数据从 HDFS 写入到 DynamoDB 中。

  ```
  CREATE EXTERNAL TABLE hdfs_import(a_col string, b_col bigint, c_col array<string>)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
  LOCATION 'hdfs:///directoryName';                    
                      
  CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
  TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
  "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");  
                      
  INSERT OVERWRITE TABLE hiveTableName SELECT * FROM hdfs_import;
  ```

## 查询 DynamoDB 中的数据
<a name="EMR_Hive_Commands_querying"></a>

 以下示例显示了您可以使用 Amazon EMR 查询 DynamoDB 中存储数据的各种方式。

**查找映射列的最大值 (`max`)**
+  使用如下 Hive 命令。在第一个命令中，CREATE 语句创建了一个引用 DynamoDB 中存储数据的 Hive 表。然后，SELECT 语句使用该表查询 DynamoDB 中存储的数据。以下示例查找给定客户提交的最大订单。

  ```
  CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) 
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "Purchases",
  "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items");
  
  SELECT max(total_cost) from hive_purchases where customerId = 717;
  ```

**使用 `GROUP BY` 子句聚合数据**
+  可以使用 `GROUP BY` 子句收集多条记录的数据。此子句通常与聚合函数 (如 sum、count、min 或 max) 一起使用。以下示例返回提交了三个以上订单的客户的最大订单列表。

  ```
  CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) 
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "Purchases",
  "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items");
  
  SELECT customerId, max(total_cost) from hive_purchases GROUP BY customerId HAVING count(*) > 3;
  ```

**连接两个 DynamoDB 表**
+  以下示例将两个 Hive 表映射到 DynamoDB 中存储的数据。然后，它对这两个表调用联接。连接在集群上计算并返回。连接不在 DynamoDB 中进行。此示例返回提交了两个以上订单的客户及其购买物的列表。

  ```
  CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) 
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "Purchases",
  "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items");
  
  CREATE EXTERNAL TABLE hive_customers(customerId bigint, customerName string, customerAddress array<String>) 
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "Customers",
  "dynamodb.column.mapping" = "customerId:CustomerId,customerName:Name,customerAddress:Address");
  
  Select c.customerId, c.customerName, count(*) as count from hive_customers c 
  JOIN hive_purchases p ON c.customerId=p.customerId 
  GROUP BY c.customerId, c.customerName HAVING count > 2;
  ```

**联接来自不同源的两个表**
+  在以下示例中，Customer\$1S3 是加载了 Amazon S3 中存储的 CSV 文件的 Hive 表，而 hive\$1purchases 是引用了 DynamoDB 中的数据的表。以下示例将 Amazon S3 中以 CSV 文件格式存储的客户数据与 DynamoDB 中存储的订单数据连接在一起，以返回一组数据，这些数据表示名称中包含“Miller”的客户提交的订单。

  ```
  CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) 
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "Purchases",
  "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items");
  
  CREATE EXTERNAL TABLE Customer_S3(customerId bigint, customerName string, customerAddress array<String>)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
  LOCATION 's3://amzn-s3-demo-bucket/path/subpath/';
  
  Select c.customerId, c.customerName, c.customerAddress from 
  Customer_S3 c 
  JOIN hive_purchases p 
  ON c.customerid=p.customerid 
  where c.customerName like '%Miller%';
  ```

**注意**  
 在上述示例中，为了提高清晰性和完整性，在每个示例中均包括了 CREATE TABLE 语句。针对给定 Hive 表运行多个查询或执行导出操作时，只需在 Hive 会话的开始创建表一次即可。

# 在 DynamoDB 中优化 Amazon EMR 操作的性能
<a name="EMR_Hive_Optimizing"></a>

 对 DynamoDB 表进行的 Amazon EMR 操作作为读取操作，并受表预置的吞吐量设置约束。Amazon EMR 实施其逻辑来努力平衡 DynamoDB 表上的负载，从而最大程度降低超出您预置的吞吐量的可能性。每个 Hive 查询结束时，Amazon EMR 均返回有关用于处理查询的集群的信息，包括超出预置的吞吐量的次数。您可以使用这些信息以及有关您的 DynamoDB 吞吐量的 CloudWatch 指标，在后续请求中更好地管理 DynamoDB 表上的负载。

 以下因素会影响 Hive 在处理 DynamoDB 表时的查询性能。

## 预置的读取容量单位
<a name="ProvisionedReadCapacityUnits"></a>

 对 DynamoDB 表运行 Hive 查询时，您需要确保已预置足量的读取容量单位。

 例如，假设您为 DynamoDB 表预配置了 100 个读取容量单位。这将允许您每秒执行 100 次读取或读取 409600 字节。如果该表包含 20GB 的数据（21474836480 字节）并且您的 Hive 查询执行全表扫描，则可以估算执行查询将花费多长时间：

 * 21474836480/409600 = 52429 秒 = 14.56 小时 * 

 减少所需时间的唯一方法是调整源 DynamoDB 表的读取容量单位。将更多节点添加到 Amazon EMR 集群不会有所帮助。

 在 Hive 输出中，当一个或多个映射器进程已完成时，将更新完成百分比。对于预配置的读取容量设置较低的大型 DynamoDB 表，完成百分比输出可能会很长时间不更新；在上面的示例中，作业将在几个小时内显示为完成 0%。有关作业进度的详细状态，请转到 Amazon EMR 控制台；您将可以查看单个映射器任务状态和数据读取统计数据。

 您还可以登录主节点的 Hadoop 界面，查看 Hadoop 统计数据。该界面将向您显示单个映射任务状态和一些数据读取统计数据。有关更多信息，请参阅《Amazon EMR 管理指南》**中的[托管在主节点上的 Web 页面](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-web-interfaces.html)。

## 读取百分比设置
<a name="ReadPercent"></a>

 默认情况下，Amazon EMR 根据当前的预置吞吐量管理对您的 DynamoDB 表的请求负载。但是，当 Amazon EMR 返回的作业相关信息中包括预置的吞吐量远远超出响应数时，您可以在设置 Hive 表时使用 `dynamodb.throughput.read.percent` 参数调整默认读取速率。有关设置读取百分比参数的更多信息，请参阅 [Hive 选项](EMR_Interactive_Hive.md#EMR_Hive_Options)。

## 写入百分比设置
<a name="WritePercent"></a>

 默认情况下，Amazon EMR 根据当前的预置吞吐量管理对您的 DynamoDB 表的请求负载。但是，当 Amazon EMR 返回的作业相关信息中所含预置的吞吐量远远超出响应数时，您可以在设置 Hive 表时使用 `dynamodb.throughput.write.percent` 参数调整默认写入速率。有关设置写入百分比参数的更多信息，请参阅[Hive 选项](EMR_Interactive_Hive.md#EMR_Hive_Options)。

## 重试持续时间设置
<a name="emr-ddb-retry-duration"></a>

 默认情况下，如果在两分钟（默认重试时间间隔）内没有返回结果，Amazon EMR 将重新运行 Hive 查询。在运行 Hive 查询时，您可以通过设置 `dynamodb.retry.duration` 参数来调整此时间间隔。有关设置写入百分比参数的更多信息，请参阅[Hive 选项](EMR_Interactive_Hive.md#EMR_Hive_Options)。

## 映射任务数
<a name="NumberMapTasks"></a>

 Hadoop 为了处理导出和查询 DynamoDB 中所存储数据的请求而启动的映射器守护进程的读取速率控制在每秒最多 1 MiB 之内，以限制所用的读取容量。如果在 DynamoDB 上有更多预置的吞吐量可用，则可以通过增加映射器守护进程数来改善 Hive 导出和查询操作的性能。为此，您可以增加中的 EC2 实例数，*或者*增加每个 EC2 实例上运行的映射器守护进程数。

 您可以通过停止当前集群，然后使用更大的 EC2 实例数重新启动它，来增加该集群中的 EC2 实例数。可在 **Configure EC2 Instances (配置 EC2 实例)** 对话框中指定 EC2 实例数（如果您从 Amazon EMR 控制台启动集群），也可以使用 `‑‑num-instances` 选项指定 EC2 实例数（如果您从 CLI 启动）。

 实例上运行的映射任务数取决于 EC2 实例类型。有关受支持 EC2 实例类型及每种实例类型提供的映射器数的更多信息，请参阅 [任务配置](emr-hadoop-task-config.md)。其中，每个受支持的配置都有一个“任务配置”部分。

 增加映射器守护程序数的另一个方法是，将 Hadoop 的 `mapreduce.tasktracker.map.tasks.maximum` 配置参数更改为更大的值。此方法的优点是无需增加 EC2 实例的数量或大小即可为您提供更多映射器，从而为您节省资金。缺点是将此值设置得过大可能导致集群中的 EC2 实例用尽内存。要设置 `mapreduce.tasktracker.map.tasks.maximum`，请启动集群并为 `mapreduce.tasktracker.map.tasks.maximum` 指定一个值，作为 mapred-site 配置分类的属性。如以下示例所示。有关更多信息，请参阅[配置应用程序](emr-configure-apps.md)。

```
{
    "configurations": [
    {
        "classification": "mapred-site",
        "properties": {
            "mapred.tasktracker.map.tasks.maximum": "10"
        }
    }
    ]
}
```

## 并行数据请求
<a name="ParallelDataRequests"></a>

 从多个用户或多个应用程序向单个表发出的多个数据请求可能会耗尽预配置的读取吞吐量并降低性能。

## 处理持续时间
<a name="ProcessDuration"></a>

 DynamoDB 中的数据一致性取决于在每个节点上执行读取和写入操作的顺序。当正在进行 Hive 查询时，其它应用程序可能会将新数据加载到 DynamoDB 表，或者修改或删除现有数据。在这种情况下，Hive 查询的结果可能无法反映查询运行时对数据所做的更改。

## 避免超出吞吐量
<a name="AvoidExceedingThroughput"></a>

 针对 DynamoDB 运行 Hive 查询时，请注意不要超出您的预置吞吐量，因为这会用尽应用程序调用 `DynamoDB::Get` 时所需的容量。为确保不会发生这种情况，您应`DynamoDB::Get`通过查看 Amazon 中的日志和监控指标，定期监控读取量和应用程序调用的限制。 CloudWatch

## 请求时间
<a name="RequestTime"></a>

 调度 Hive 查询以便在对 DynamoDB 表的需求较低时访问 DynamoDB 表，可以改善性能。举例来说，如果应用程序的大多数用户住在旧金山，您可以选择在太平洋标准时间凌晨 4 点导出每日数据。（此时，大多数用户都已睡着且未在更新 DynamoDB 数据库中的记录） 

## 基于时间的表
<a name="TimeBasedTables"></a>

 如果将数据组织为一系列基于时间的 DynamoDB 表（例如，每天一个表），您可以在该表不再处于活动状态时导出数据。您可以利用此方法将数据持续备份到 Amazon S3 中。

## 已存档数据
<a name="ArchivedData"></a>

 如果您计划针对 DynamoDB 中存储的数据运行多个 Hive 查询，并且您的应用程序可以接纳已存档数据，那么您可能会希望将数据导出到 HDFS 或 Amazon S3，然后针对数据的副本（而非 DynamoDB）运行 Hive 查询。这将节省读取操作和预配置的吞吐量。

# Kinesis
<a name="emr-kinesis"></a>

亚马逊 EMR 集群可以使用 Hadoop 生态系统中熟悉的工具（例如 Hive、Pig、Hadoop Streaming API 和 Cascading MapReduce）直接读取和处理 Amazon Kinesis 直播。您还可以将 Amazon Kinesis 中的实时数据与正在运行的集群中 Amazon S3、Amazon DynamoDB 和 HDFS 上的现有数据进行连接。您可以直接将 Amazon EMR 中的数据加载到 Amazon S3 或 DynamoDB 来进行后处理。有关 Amazon Kinesis 服务亮点和定价的信息，请参阅 [Amazon Kinesis](https://aws.amazon.com//kinesis) 页面。

## 可以对 Amazon EMR 和 Amazon Kinesis 集成执行哪些操作？
<a name="kinesis-use-cases"></a>

 Amazon EMR 和 Amazon Kinesis 之间的集成使某些方案更简单，例如：
+ **流式处理日志分析**：您可以分析流式处理 Web 日志，以便每隔几分钟按区域、浏览器和访问域生成前 10 个错误类型的列表。
+ **客户参与**：您可以编写查询将 Amazon Kinesis 中的点击流数据与存储在 DynamoDB 表中的广告活动信息进行连接，以确定显示在特定网站上的最有效广告类别。
+ **即席交互式查询**：您可以定期将 Amazon Kinesis 流中的数据加载到 HDFS 中，并以本地 Impala 表的形式提供该数据以进行快速的交互式分析查询。

## 对 Amazon Kinesis 流进行检查点分析
<a name="kinesis-checkpoint"></a>

用户可以定期对 Amazon Kinesis 流进行批量分析，这些分析称为*迭代*。因为使用序列号检索 Amazon Kinesis 流数据记录，所以，可通过 Amazon EMR 在 DynamoDB 表中存储的开始和结束序列号来定义迭代边界。例如，当 `iteration0` 结束时，它在 DynamoDB 中存储结束序列号，这样在 `iteration1` 作业开始时，它可以检索流的后续数据。迭代在流数据中的这种映射称为*检查点操作*。有关更多信息，请参阅 [Kinesis 连接器](https://aws.amazon.com/elasticmapreduce/faqs/#kinesis-connector)。

如果对迭代进行了检查点操作且作业未能处理某个迭代，则 Amazon EMR 会尝试重新处理该迭代中的记录。

通过检查点功能，您可以：
+ 从运行于相同的流和逻辑名称之上的前一个查询处理的序列号之后，开始数据处理
+ 重新处理 Kinesis 中由之前的查询处理的同一批数据

 要启用检查点操作，请在脚本中将 `kinesis.checkpoint.enabled` 参数设置为 `true`。此外，请配置以下参数：


| 配置设置 | 说明 | 
| --- | --- | 
| kinesis.checkpoint.metastore.table.name | 用于存储检查点信息的 DynamoDB 表名称 | 
| kinesis.checkpoint.metastore.hash.key.name | DynamoDB 表的哈希键名称 | 
| kinesis.checkpoint.metastore.hash.range.name | DynamoDB 表的范围键名称 | 
| kinesis.checkpoint.logical.name | 当前处理的逻辑名称 | 
| kinesis.checkpoint.iteration.no | 与逻辑名称关联的处理的迭代编号 | 
| kinesis.rerun.iteration.without.wait | 用来指示是否可以重新运行失败的迭代而不等待超时的布尔值；默认值为 false | 

### Amazon DynamoDB 表的预置 IOPS 建议
<a name="kinesis-checkpoint-DDB"></a>

Amazon Kinesis 的 Amazon EMR 连接器使用 DynamoDB 数据库作为对元数据进行检查点操作的支持。必须先在 DynamoDB 中创建表，才能以检查点时间间隔使用 Amazon EMR 集群的 Amazon Kinesis 流中的数据。该表必须与 Amazon EMR 集群位于相同区域中。以下是为您应当为 DynamoDB 表预置的 IOPS 数的一般建议；`j` 应当是可同时运行的最大 Hadoop 任务数（具有不同的逻辑名称\$1迭代编号组合），`s` 是任何作业将处理的最大分片数：

对于 **Read Capacity Units**：`j`\$1`s`/`5`

对于 **Write Capacity Units**：`j`\$1`s`

## 性能注意事项
<a name="performance"></a>

Amazon Kinesis 分片吞吐量与 Amazon EMR 集群中节点的实例大小以及流中的记录大小成正比。建议在主节点和核心节点上使用 m5.xlarge 或更大的实例。

## 借助 Amazon EMR 安排 Amazon Kinesis 分析
<a name="schedule"></a>

如果要对活动 Amazon Kinesis 流分析数据，由于任何迭代都受超时和最长持续时间限制，您应经常运行分析，以便从流定期收集详细信息，这十分重要。可以通过多种方式定期执行该类脚本和查询；但建议针对此类周期性任务使用 AWS Data Pipeline 。有关更多信息，请参阅《*AWS Data Pipeline 开发人员指南*》[AWS Data Pipeline HiveActivity](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-hiveactivity.html)中的[AWS Data Pipeline PigActivity](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-pigactivity.html)和。

# 将 Spark Kinesis 连接器迁移到适用于 Amazon EMR 7.0 的 SDK 2.x
<a name="migrating-spark-kinesis"></a>

该 AWS 软件开发工具包提供了一组丰富的库来与 AWS 云计算服务进行交互，例如管理凭据、连接到 S3 和 Kinesis 服务。 APIs Spark Kinesis 连接器用于使用来自 Kinesis Data Streams 的数据，且接收到的数据将在 Spark 的执行引擎中进行转换和处理。目前，此连接器是在 AWS SDK 和 Kinesis-client-library (KCL) 的 1.x 基础上构建的。

作为 AWS SDK 2.x 迁移的一部分，Spark Kinesis 连接器也相应进行了更新，使其可以与 SDK 2.x 一起运行。在 Amazon EMR 7.0 发行版中，Spark 包含 SDK 2.x 升级，该升级尚不可在 Apache Spark 的社区版本中使用。如果您使用低于 7.0 的版本中的 Spark Kinesis 连接器，则必须先将应用程序代码迁移到在 SDK 2.x 上运行，然后才能迁移到 Amazon EMR 7.0。

## 迁移指南
<a name="migrating-spark-kinesis-migration-guides"></a>

本部分介绍将应用程序迁移到升级后的 Spark Kinesis 连接器的步骤。它包括迁移到 Kinesis 客户端库 (KCL) 2.x、 AWS 证书提供程序和 SDK 2.x 中的 AWS 服务客户端的指南。 AWS 作为参考，它还包括一个使用 Kinesis 连接器的示例[WordCount](https://github.com/apache/spark/blob/v3.5.0/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala)程序。

**Topics**
+ [将 KCL 从 1.x 迁移到 2.x](#migrating-spark-kinesis-KCL-from-1.x-to-2.x)
+ [将 AWS 凭证提供程序从 AWS SDK 1.x 迁移到 2.x](#migrating-spark-kinesis-creds-from-1.x-to-2.x)
+ [将 AWS 服务客户端从 AWS SDK 1.x 迁移到 2.x](#migrating-spark-kinesis-service-from-1.x-to-2.x)
+ [流式传输应用程序的代码示例](#migrating-spark-kinesis-streaming-examples)
+ [使用升级后的 Spark Kinesis 连接器时的注意事项](#migrating-spark-kinesis-considerations)

### 将 KCL 从 1.x 迁移到 2.x
<a name="migrating-spark-kinesis-KCL-from-1.x-to-2.x"></a>
+ **`KinesisInputDStream` 中的指标级别和维度**

  当您实例化 `KinesisInputDStream` 时，您可以控制流的指标级别和维度。以下示例演示了如何使用 KCL 1.x 自定义这些参数：

  ```
  import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
  import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
   
  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
    .build()
  ```

  在 KCL 2.x 中，这些配置设置具有不同的包名称。要迁移到 2.x：

  1. 分别将 `com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration` 和 `com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel` 的导入语句更改为 `software.amazon.kinesis.metrics.MetricsLevel` 和 `software.amazon.kinesis.metrics.MetricsUtil`。

     ```
     // import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
     import software.amazon.kinesis.metrics.MetricsLevel
      
     // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
     import software.amazon.kinesis.metrics.MetricsUtil
     ```

  1. 将行 `metricsEnabledDimensionsKinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet` 替换为 `metricsEnabledDimensionsSet(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)`

  以下是包含自定义指标级别和指标维度的 `KinesisInputDStream` 的更新版本：

  ```
  import software.amazon.kinesis.metrics.MetricsLevel
  import software.amazon.kinesis.metrics.MetricsUtil
   
  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
    .build()
  ```
+ `KinesisInputDStream` 中的消息处理程序函数

  在实例化 `KinesisInputDStream` 时，您还可以提供一个“消息处理程序函数”，该函数接收 Kinesis 记录并返回通用对象 T，以备您想使用记录中包含的其他数据（例如分区键）。

  在 KCL 1.x 中，消息处理程序函数签名为：`Record => T`，其中记录为 `com.amazonaws.services.kinesis.model.Record`。在 KCL 2.x 中，处理程序的签名更改为:`KinesisClientRecord => T`，其中 KinesisClientRecord。`software.amazon.kinesis.retrieval.KinesisClientRecord`

  下面是在 KCL 1.x 中提供消息处理程序的示例：

  ```
  import com.amazonaws.services.kinesis.model.Record
   
   
  def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_ONLY)
    .buildWithMessageHandler(addFive)
  ```

  要迁移消息处理程序：

  1. 将 `com.amazonaws.services.kinesis.model.Record` 的导入语句更改为 `software.amazon.kinesis.retrieval.KinesisClientRecord`。

     ```
     // import com.amazonaws.services.kinesis.model.Record
     import software.amazon.kinesis.retrieval.KinesisClientRecord
     ```

  1. 更新消息处理程序的方法签名。

     ```
     //def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
     def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
     ```

  下面是在 KCL 2.x 中提供消息处理程序的更新示例：

  ```
  import software.amazon.kinesis.retrieval.KinesisClientRecord
   
   
  def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_ONLY)
    .buildWithMessageHandler(addFive)
  ```

  有关从 KCL 1.x 迁移到 2.x 的更多信息，请参阅[将使用者从 KCL 1.x 迁移到 KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html)。

### 将 AWS 凭证提供程序从 AWS SDK 1.x 迁移到 2.x
<a name="migrating-spark-kinesis-creds-from-1.x-to-2.x"></a>

凭证提供者用于获取与之交互的 AWS 凭证 AWS。SDK 2.x 中有几项与凭证提供程序相关的接口和类更改，可参见[此处](https://github.com/aws/aws-sdk-java-v2/blob/master/docs/LaunchChangelog.md#122-client-credentials)。Spark Kinesis 连接器定义了一个接口 (`org.apache.spark.streaming.kinesis.SparkAWSCredentials`) 和实现类，用于返回 1.x 版本的 AWS 凭据提供程序。初始化 Kinesis 客户端时需要这些凭证提供程序。例如，如果您在应用程序`SparkAWSCredentials.provider`中使用该方法，则需要更新代码以使用 2.x 版本的 AWS 凭据提供程序。

以下是在 S AWS DK 1.x 中使用凭证提供程序的示例：

```
import org.apache.spark.streaming.kinesis.SparkAWSCredentials
import com.amazonaws.auth.AWSCredentialsProvider
 
val basicSparkCredentials = SparkAWSCredentials.builder
    .basicCredentials("accessKey", "secretKey")
    .build()
                                     
val credentialProvider = basicSparkCredentials.provider
assert(credentialProvider.isInstanceOf[AWSCredentialsProvider], "Type should be AWSCredentialsProvider")
```

**要迁移到 SDK 2.x：**

1. 将 `com.amazonaws.auth.AWSCredentialsProvider` 的导入语句更改为 `software.amazon.awssdk.auth.credentials.AwsCredentialsProvider`

   ```
   //import com.amazonaws.auth.AWSCredentialsProvider
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
   ```

1. 更新使用此类的其余代码。

   ```
   import org.apache.spark.streaming.kinesis.SparkAWSCredentials
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
    
   val basicSparkCredentials = SparkAWSCredentials.builder
       .basicCredentials("accessKey", "secretKey")
       .build()
                                             
   val credentialProvider = basicSparkCredentials.provider
   assert (credentialProvider.isInstanceOf[AwsCredentialsProvider], "Type should be AwsCredentialsProvider")
   ```

### 将 AWS 服务客户端从 AWS SDK 1.x 迁移到 2.x
<a name="migrating-spark-kinesis-service-from-1.x-to-2.x"></a>

AWS 服务客户端在 2.x（即`software.amazon.awssdk`）中具有不同的软件包名称，而 SDK 1.x 则使用。`com.amazonaws`有关客户端更改的更多信息，请参阅[此处](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-whats-different.html)。如果您在代码中使用这些服务客户端，则需要相应地迁移客户端。

下面是在 SDK 1.x 中创建客户端的示例：

```
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.document.DynamoDB
 
AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
```

**要迁移到 2.x：**

1. 请更改服务客户端的导入语句。以 DynamoDB 客户端为例。您需要将 `com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient` 或 `com.amazonaws.services.dynamodbv2.document.DynamoDB` 更改为 `software.amazon.awssdk.services.dynamodb.DynamoDbClient`。

   ```
   // import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
   // import com.amazonaws.services.dynamodbv2.document.DynamoDB
   import software.amazon.awssdk.services.dynamodb.DynamoDbClient
   ```

1. 更新用初始化客户端的代码

   ```
   // AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
   // AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
    
   DynamoDbClient ddbClient = DynamoDbClient.create();
   DynamoDbClient ddbClient = DynamoDbClient.builder().build();
   ```

   有关将 AWS SDK 从 1.x 迁移到 2.x 的更多信息，请参阅适用于 [Java 的 AWS SDK 1.x 和 2.x 有什么区别](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-whats-different.html)

### 流式传输应用程序的代码示例
<a name="migrating-spark-kinesis-streaming-examples"></a>

```
import java.net.URI
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.http.apache.ApacheHttpClient
import software.amazon.awssdk.services.kinesis.KinesisClient
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest
import software.amazon.awssdk.regions.Region
import software.amazon.kinesis.metrics.{MetricsLevel, MetricsUtil}
 
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
import org.apache.spark.streaming.kinesis.KinesisInputDStream
 
 
object KinesisWordCountASLSDKV2 {
 
  def main(args: Array[String]): Unit = {
    val appName = "demo-app"
    val streamName = "demo-kinesis-test"
    val endpointUrl = "https://kinesis.us-west-2.amazonaws.com"
    val regionName = "us-west-2"
 
    // Determine the number of shards from the stream using the low-level Kinesis Client
    // from the AWS Java SDK.
    val credentialsProvider = DefaultCredentialsProvider.create
    require(credentialsProvider.resolveCredentials() != null,
      "No AWS credentials found. Please specify credentials using one of the methods specified " +
        "in https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html")
    val kinesisClient = KinesisClient.builder()
      .credentialsProvider(credentialsProvider)
      .region(Region.US_WEST_2)
      .endpointOverride(URI.create(endpointUrl))
      .httpClientBuilder(ApacheHttpClient.builder())
      .build()
    val describeStreamRequest = DescribeStreamRequest.builder()
      .streamName(streamName)
      .build()
    val numShards = kinesisClient.describeStream(describeStreamRequest)
      .streamDescription
      .shards
      .size
 
 
    // In this example, we are going to create 1 Kinesis Receiver/input DStream for each shard.
    // This is not a necessity; if there are less receivers/DStreams than the number of shards,
    // then the shards will be automatically distributed among the receivers and each receiver
    // will receive data from multiple shards.
    val numStreams = numShards
 
    // Spark Streaming batch interval
    val batchInterval = Milliseconds(2000)
 
    // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information
    // on sequence number of records that have been received. Same as batchInterval for this
    // example.
    val kinesisCheckpointInterval = batchInterval
 
    // Setup the SparkConfig and StreamingContext
    val sparkConfig = new SparkConf().setAppName("KinesisWordCountASLSDKV2")
    val ssc = new StreamingContext(sparkConfig, batchInterval)
 
    // Create the Kinesis DStreams
    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisInputDStream.builder
        .streamingContext(ssc)
        .streamName(streamName)
        .endpointUrl(endpointUrl)
        .regionName(regionName)
        .initialPosition(new Latest())
        .checkpointAppName(appName)
        .checkpointInterval(kinesisCheckpointInterval)
        .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
        .metricsLevel(MetricsLevel.DETAILED)
        .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
        .build()
    }
 
    // Union all the streams
    val unionStreams = ssc.union(kinesisStreams)
 
    // Convert each line of Array[Byte] to String, and split into words
    val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))
 
    // Map each word to a (word, 1) tuple so we can reduce by key to count the words
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
 
    // Print the first 10 wordCounts
    wordCounts.print()
 
    // Start the streaming context and await termination
    ssc.start()
    ssc.awaitTermination()
  }
}
```

### 使用升级后的 Spark Kinesis 连接器时的注意事项
<a name="migrating-spark-kinesis-considerations"></a>
+ 如果您的应用程序将 `Kinesis-producer-library` 用于低于 11 版本的 JDK，则可能会遇到异常，例如 `java.lang.NoClassDefFoundError: javax/xml/bind/DatatypeConverter`。之所以发生这种情况，是因为 EMR 7.0 默认附带 JDK 17，而自 Java 11\$1 版本以来，J2EE 模块就已从标准库中移除。此问题可以通过在 pom 文件中添加以下依赖项来解决。将库版本替换为您认为合适的版本。

  ```
  <dependency>
        <groupId>javax.xml.bind</groupId>
        <artifactId>jaxb-api</artifactId>
        <version>${jaxb-api.version}</version>
      </dependency>
  ```
+ EMR 集群被创建后，可以在此路径下找到 Spark Kinesis 连接器 jar：`/usr/lib/spark/connector/lib/`

# S3 DistCp (s3-dist-cp)
<a name="UsingEMR_s3distcp"></a>

Apache DistCp 是一个开源工具，你可以用来复制大量数据。S3 与 *DistCpAmazon S3* 类似 DistCp，但经过了优化 AWS，尤其是 Amazon S3。DistCp 在 Amazon EMR 4.0 及更高版本中，S3 的命令是`s3-dist-cp`，您可以将其作为集群中的一个步骤或在命令行中添加。使用 S3DistCp，您可以高效地将大量数据从 Amazon S3 复制到 HDFS 中，然后通过您的 Amazon EMR 集群中的后续步骤进行处理。您也可以使用 S3 DistCp 在亚马逊 S3 存储桶之间复制数据，或者从 HDFS 复制数据到 Amazon S3。S3 DistCp 在跨存储桶和跨 AWS 账户并行复制大量对象方面更具可扩展性和效率。

有关在现实场景中演示 S3DistCp 灵活性的特定命令，请参阅 AWS 大数据博客DistCp上[的 S3 使用七个技巧](https://aws.amazon.com/blogs/big-data/seven-tips-for-using-s3distcp-on-amazon-emr-to-move-data-efficiently-between-hdfs-and-amazon-s3/)。

比如 DistCp，S3 DistCp MapReduce 使用分布式方式进行复制。它在几个服务器之间共享复制、错误处理、恢复和报告任务。有关 Apache DistCp 开源项目的更多信息，请参阅 Apache Hadoop 文档中的[DistCp指南](http://hadoop.apache.org/docs/stable/hadoop-distcp/DistCp.html)。

如果 DistCp S3 无法复制部分或全部指定文件，则集群步骤将失败并返回非零错误代码。如果发生这种情况，S3 DistCp 不会清理部分复制的文件。

**重要**  
S3 DistCp 不支持包含下划线字符的 Amazon S3 存储桶名称。  
S3 DistCp 不支持 Parquet 文件的串联。 PySpark 改用。有关更多信息，请参阅[在 Amazon EMR 中串连 parquet 文件](https://aws.amazon.com/premiumsupport/knowledge-center/emr-concatenate-parquet-files/)。  
为避免在使用 S3DistCP 将单个文件（而不是目录）从 S3 复制到 HDFS 时出现复制错误，请使用 Amazon EMR 版本 5.33.0 或更高版本或 Amazon EMR 6.3.0 或更高版本。

## S3 DistCp 选项
<a name="UsingEMR_s3distcp.options"></a>

尽管与之类似 DistCp，但 S3 DistCp 支持一组不同的选项来更改其复制和压缩数据的方式。

调用 S3 时DistCp，您可以指定下表中描述的选项。选项是用参数列表添加到步骤的。下表显示了 S3 DistCp 参数的示例。


| 选项  | 描述  | 必填  | 
| --- | --- | --- | 
| ‑‑src=LOCATION  |  待复制数据的位置。可以是 HDFS 或 Amazon S3 位置。 示例：`‑‑src=s3://amzn-s3-demo-bucket/logs/j-3GYXXXXXX9IOJ/node`  S3 DistCp 不支持包含下划线字符的 Amazon S3 存储桶名称。   | 是  | 
| ‑‑dest=LOCATION  |  数据的目标位置。可以是 HDFS 或 Amazon S3 位置。 示例：`‑‑dest=hdfs:///output`  S3 DistCp 不支持包含下划线字符的 Amazon S3 存储桶名称。   | 是  | 
| ‑‑srcPattern=PATTERN  |  该[正则表达式](http://en.wikipedia.org/wiki/Regular_expression)会筛选 `‑‑src` 的部分数据的复制操作。如果指定的既不是 `‑‑srcPattern` 也不是 `‑‑groupBy`，那么会将 `‑‑src` 的所有数据复制到 `‑‑dest`。 如果正则表达式参数包含特殊字符，如星号（\$1），那么必须将正则表达式或整个 `‑‑args` 字符串括在单引号（'）中。 示例：`‑‑srcPattern=.*daemons.*-hadoop-.*`  | 否  | 
| ‑‑groupBy=PATTERN  |  一种[正则表达式](http://en.wikipedia.org/wiki/Regular_expression)，它使 S3 DistCp 连接与该表达式匹配的文件。例如，您可以使用此选项把一个小时内写入的所有日志文件组合成为单个文件。已连接文件的文件名是与该分组的正则表达式相匹配的值。 圆括号表明文件应使用的分组方式，与圆括号内的语句匹配的所有项目都组合成单个输出文件。如果正则表达式不包含带括号的语句，则集群将在 S3 DistCp 步骤上失败并返回错误。 如果正则表达式参数包含特殊字符，如星号（\$1），那么必须将正则表达式或整个 `‑‑args` 字符串括在单引号（'）中。 如果指定 `‑‑groupBy`，则仅复制与指定的模式匹配的文件。您不需要同时指定 `‑‑groupBy` 和 `‑‑srcPattern`。 示例：`‑‑groupBy=.*subnetid.*([0-9]+-[0-9]+-[0-9]+-[0-9]+).*`  | 否  | 
| ‑‑targetSize=SIZE  |  要根据 `‑‑groupBy` 选项创建的文件的大小，以兆字节 (MiB) 为单位。此值必须是整数。设置后`‑‑targetSize`，S3 DistCp 会尝试匹配此大小；复制文件的实际大小可能大于或小于此值。基于数据文件大小聚合任务，因此目标文件大小将可能匹配源数据文件大小。 如果由 `‑‑groupBy` 连接的文件大于 `‑‑targetSize` 的值，则将这些文件分解成部分文件，并在末尾附加一个数值按顺序命名。例如，连接组成 `myfile.gz` 的一个文件将被分解成部分文件，如：`myfile0.gz`、`myfile1.gz` 等。 示例：`‑‑targetSize=2`  | 否  | 
| ‑‑appendToLastFile |  指定 S3 在从 Amazon S3 复制到 HDFS 中已存在的文件DistCp 时的行为。它向现有文件附加新文件数据。如果将 `‑‑appendToLastFile` 与 `‑‑groupBy` 结合使用，则将新数据附加到与相同的组匹配的文件。与 `‑‑targetSize` 结合使用时，此选项也具有 `‑‑groupBy.` 行为  | 否  | 
| ‑‑outputCodec=CODEC  |  指定用于所复制文件的压缩编解码器。取值可以是：`gzip`、`gz`、`lzo`、`snappy` 或 `none`。您可以使用此选项，例如，把 Gzip 压缩的输入文件转换为 LZO 压缩的输出文件，或对文件进行解压缩，作为复制操作的一部分。如果选择输出编解码器，则系统会在文件名末尾追加适当的扩展名 (例如，对于 `gz` 和 `gzip`，将追加扩展名 `.gz`)。如果不为 `‑‑outputCodec` 指定值，则复制的文件不会出现压缩方面的变化。 示例：`‑‑outputCodec=lzo`  | 否  | 
| ‑‑s3ServerSideEncryption  |  确保使用 SSL 传输目标数据，并使用 AWS 服务端密钥在 Amazon S3 中自动加密。使用 S3 检索数据时DistCp，对象会自动解密。如果尝试将未加密的对象复制到需要加密的 Amazon S3 存储桶，则操作将失败。有关更多信息，请参阅[使用数据加密](https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingEncryption.html)。 示例：`‑‑s3ServerSideEncryption`  | 否  | 
| ‑‑deleteOnSuccess  |  如果复制操作成功，则此选项会导致 S3 DistCp 从源位置删除复制的文件。如果您以计划任务的形式将输出文件 (如日志文件) 从一个位置复制到另一个位置，又不想复制两次相同的文件，那么这个选项会非常有用。 示例：`‑‑deleteOnSuccess`  | 否  | 
| ‑‑disableMultipartUpload  |  禁用分段上载。 示例：`‑‑disableMultipartUpload`  | 否  | 
| ‑‑multipartUploadChunkSize=SIZE  |  Amazon S3 分段上传中每个分段的大小，以 MiB 为单位。当 S3 复制的数据大于时，它会DistCp 使用分段上传。`multipartUploadChunkSize`要提高作业性能，可以增加每个分段的大小。默认大小为 128 MiB。 示例：`‑‑multipartUploadChunkSize=1000`  | 否  | 
| ‑‑numberFiles  |  在输出文件之前加上序号。计数从 0 开始，除非 `‑‑startingIndex` 指定一个不同的值。 示例：`‑‑numberFiles`  | 否  | 
| ‑‑startingIndex=INDEX  |  使用 `‑‑numberFiles` 指定序列中的第一个数字。 示例：`‑‑startingIndex=1`  | 否  | 
| ‑‑outputManifest=FILENAME  |  创建使用 Gzip 压缩的文本文件，其中包含 S3 DistCp 复制的所有文件的列表。 示例：`‑‑outputManifest=manifest-1.gz`  | 否  | 
| ‑‑previousManifest=PATH  |  DistCp 使用`‑‑outputManifest`标志读取上次调用 S3 期间创建的清单文件。设置该`‑‑previousManifest`标志后，S3 会将清单中列出的文件DistCp 排除在复制操作之外。如果同时指定 `‑‑outputManifest` 和 `‑‑previousManifest`，则之前清单中列出的文件也会出现在新的清单文件中，但不会复制这些文件。 示例：`‑‑previousManifest=/usr/bin/manifest-1.gz`  | 否  | 
| ‑‑requirePreviousManifest |  需要先前在调用 S3 时创建的清单DistCp。如果将它设置为 false，则不指定之前的清单也不会生成错误。默认值为 true。  | 否  | 
| ‑‑copyFromManifest  |  反转的行为`‑‑previousManifest`，使 S3 DistCp 使用指定的清单文件作为要复制的文件列表，而不是要从复制中排除的文件列表。 示例：`‑‑copyFromManifest ‑‑previousManifest=/usr/bin/manifest-1.gz`  | 否  | 
| ‑‑s3Endpoint=ENDPOINT |  指定上载文件时要使用的 Amazon S3 终端节点。此选项会同时设置源位置和目标位置的终端节点。如果未设置，则默认终端节点是 `s3.amazonaws.com`。有关 Amazon S3 终端节点的列表，请参阅[区域和终端节点](https://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region)。 示例：`‑‑s3Endpoint=s3.eu-west-1.amazonaws.com`  | 否  | 
| ‑‑storageClass=CLASS |  目标为 Amazon S3 时要使用的存储类。有效值是 STANDARD 和 REDUCED\$1REDUNDANCY。如果未指定此选项，S3 将DistCp 尝试保留存储类别。 示例：`‑‑storageClass=STANDARD`  | 否  | 
| ‑‑srcPrefixesFile=PATH |  Amazon S3（s3://）、HDFS（hdfs:///）或本地文件系统（file:/）中包含 `src` 前缀的文本文件（每行一个前缀）。 如果提供`srcPrefixesFile`，S3 DistCp 将不会列出 src 路径。相反，它将生成一个源列表，以作为列出在此文件中指定的所有前缀的组合结果。与 src 相比较的相对路径 (而不是这些前缀) 将用于生成目标路径。如果还指定了 `srcPattern`，则会将它应用于源前缀的组合列表结果以进一步筛选输入。如果使用了 `copyFromManifest`，则会复制清单中的对象并忽略 `srcPrefixesFile`。 示例：`‑‑srcPrefixesFile=PATH`  | 否  | 

除了上述选项外，S3 还DistCp 实现了[工具接口](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/util/Tool.html)，这意味着它支持通用选项。

## 将 S3 DistCp 作为集群中的步骤添加
<a name="UsingEMR_s3distcp.step"></a>

您可以DistCp 通过将 S3 作为步骤添加到集群中来调用 S3。可以使用控制台、CLI 或 API 在启动时向集群或是向正在运行的集群添加步骤。以下示例演示如何向正在运行的集群添加 S3 DistCp 步骤。有关向集群添加步骤的更多信息，请参阅《Amazon EMR 管理指南》**中的[向集群提交工作](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-work-with-steps.html)。

**要向正在运行的集群添加 S3 DistCp 步骤，请使用 AWS CLI**

有关在中使用 Amazon EMR 命令的更多信息 AWS CLI，请参阅[AWS CLI 命令](https://docs.aws.amazon.com/cli/latest/reference/emr)参考。
+ 要向调用 S3 的集群添加步骤DistCp，请将指定 S3 DistCp 应如何执行复制操作的参数作为参数传递。

  以下示例将守护进程日志从 Amazon S3 复制到 `hdfs:///output`。在以下命令中：
  + `‑‑cluster-id` 指定集群
  + `Jar`是 S3 DistCp JAR 文件的位置。如需有关如何使用 command-runner.jar 在群集上运行命令的示例，请参阅[提交自定义 JAR 步骤以运行脚本或命令](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-commandrunner.html#emr-commandrunner-examples)。
  + `Args`是要传入 S3 的选项名称/值对的逗号分隔列表。DistCp有关可用选项的完整列表，请参阅[S3 DistCp 选项](#UsingEMR_s3distcp.options)。

  要向正在运行的集群添加 S3 DistCp 复制步骤，请将以下内容放入保存在 Amazon S3 或本地文件系统中的 JSON 文件中，如`myStep.json`本示例所示。*j-3GYXXXXXX9IOK*替换为您的集群 ID，然后*amzn-s3-demo-bucket*替换为您的 Amazon S3 存储桶名称。

  ```
  [
      {
          "Name":"S3DistCp step",
          "Args":["s3-dist-cp","‑‑s3Endpoint=s3.amazonaws.com","‑‑src=s3://amzn-s3-demo-bucket/logs/j-3GYXXXXXX9IOJ/node/","‑‑dest=hdfs:///output","‑‑srcPattern=.*[a-zA-Z,]+"],
          "ActionOnFailure":"CONTINUE",
          "Type":"CUSTOM_JAR",
          "Jar":"command-runner.jar"        
      }
  ]
  ```

  ```
  aws emr add-steps ‑‑cluster-id j-3GYXXXXXX9IOK ‑‑steps file://./myStep.json
  ```

**Example 从 Amazon S3 向 HDFS 复制日志文件**  
此示例还说明如何通过向正在运行的集群添加步骤来将 Amazon S3 存储桶中存储的日志文件复制到 HDFS 中。在此示例中，`‑‑srcPattern` 选项用于限制复制到守护程序日志的数据。  
要使用 `‑‑srcPattern` 选项将日志文件从 Amazon S3 复制到 HDFS，请在某个 JSON 文件（本例中为 `myStep.json`）中输入以下内容并保存到 Amazon S3 或本地文件系统中。*j-3GYXXXXXX9IOK*替换为您的集群 ID，然后*amzn-s3-demo-bucket*替换为您的 Amazon S3 存储桶名称。  

```
[
    {
        "Name":"S3DistCp step",
        "Args":["s3-dist-cp","‑‑s3Endpoint=s3.amazonaws.com","‑‑src=s3://amzn-s3-demo-bucket/logs/j-3GYXXXXXX9IOJ/node/","‑‑dest=hdfs:///output","‑‑srcPattern=.*daemons.*-hadoop-.*"],
        "ActionOnFailure":"CONTINUE",
        "Type":"CUSTOM_JAR",
        "Jar":"command-runner.jar"        
    }
]
```

## 在 S3 DistCp 任务失败后进行清理
<a name="s3distcp-cleanup"></a>

如果 S3 DistCp 无法复制部分或全部指定文件，则命令或集群步骤将失败并返回非零错误代码。如果发生这种情况，S3 DistCp 不会清理部分复制的文件。您必须手动删除它们。

部分复制的文件以 S3 DistCp 作业的唯一标识符保存到 HDFS `tmp` 目录中的子目录中。您可以在作业的标准输出中找到此 ID。

例如，对于 ID 为 S3 的DistCp 作业`4b1c37bb-91af-4391-aaf8-46a6067085a6`，您可以连接到集群的主节点并运行以下命令来查看与该任务关联的输出文件。

```
hdfs dfs -ls /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output
```

该命令将返回与以下类似的文件列表：

```
Found 8 items
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:03 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/_SUCCESS
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:02 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00000
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:02 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00001
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:02 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00002
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:03 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00003
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:03 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00004
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:03 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00005
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:03 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00006
```

然后，您可以运行以下命令来删除目录和所有内容。

```
hdfs dfs rm -rf /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6
```