

# Connectors and utilities
<a name="emr-connectors"></a>

Amazon EMR provides a number of connectors and utilities to access other AWS services as data sources. You can usually access data in these services within a program. For example, you can specify a Kinesis stream in a Hive query, Pig script, or MapReduce application and then operate on that data.

**Topics**
+ [Export, import, query, and join tables in DynamoDB using Amazon EMR](EMRforDynamoDB.md)
+ [Kinesis](emr-kinesis.md)
+ [S3DistCp (s3-dist-cp)](UsingEMR_s3distcp.md)
+ [Cleaning up after failed S3DistCp jobs](#s3distcp-cleanup)

# Export, import, query, and join tables in DynamoDB using Amazon EMR
<a name="EMRforDynamoDB"></a>

**Note**  
The Amazon EMR-DynamoDB Connector is open-sourced on GitHub. For more information, see [https://github.com/awslabs/emr-dynamodb-connector](https://github.com/awslabs/emr-dynamodb-connector).

DynamoDB is a fully managed NoSQL database service that provides fast and predictable performance with seamless scalability. Developers can create a database table and grow its request traffic or storage without limit. DynamoDB automatically spreads the data and traffic for the table over a sufficient number of servers to handle the request capacity specified by the customer and the amount of data stored, while maintaining consistent, fast performance. Using Amazon EMR and Hive you can quickly and efficiently process large amounts of data, such as data stored in DynamoDB. For more information about DynamoDB, see [Amazon DynamoDB Developer Guide](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/).

Apache Hive is a software layer that you can use to query MapReduce clusters using a simplified, SQL-like query language called HiveQL. It runs on top of the Hadoop architecture. For more information about Hive and HiveQL, go to the [HiveQL language manual](https://cwiki.apache.org/confluence/display/Hive/LanguageManual). For more information about Hive and Amazon EMR, see [Apache Hive](emr-hive.md).

You can use Amazon EMR with a customized version of Hive that includes connectivity to DynamoDB to perform operations on data stored in DynamoDB:
+ Loading DynamoDB data into the Hadoop Distributed File System (HDFS) and using it as input into an Amazon EMR cluster.
+ Querying live DynamoDB data using SQL-like statements (HiveQL).
+ Joining data stored in DynamoDB and exporting it or querying against the joined data.
+ Exporting data stored in DynamoDB to Amazon S3.
+ Importing data stored in Amazon S3 to DynamoDB.

**Note**  
The Amazon EMR-DynamoDB Connector does not support clusters configured to use [Kerberos authentication](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-kerberos.html).

To perform each of the following tasks, you'll launch an Amazon EMR cluster, specify the location of the data in DynamoDB, and issue Hive commands to manipulate the data in DynamoDB. 

There are several ways to launch an Amazon EMR cluster: you can use the Amazon EMR console, the command line interface (CLI), or you can program your cluster using an AWS SDK or the Amazon EMR API. You can also choose whether to run a Hive cluster interactively or from a script. In this section, we will show you how to launch an interactive Hive cluster from the Amazon EMR console and the CLI. 

Using Hive interactively is a great way to test query performance and tune your application. After you have established a set of Hive commands that will run on a regular basis, consider creating a Hive script that Amazon EMR can run for you. 

**Warning**  
Amazon EMR read or write operations on a DynamoDB table count against your established provisioned throughput, potentially increasing the frequency of provisioned throughput exceptions. For large requests, Amazon EMR implements retries with exponential backoff to manage the request load on the DynamoDB table. Running Amazon EMR jobs concurrently with other traffic may cause you to exceed the allocated provisioned throughput level. You can monitor this by checking the **ThrottledRequests** metric in Amazon CloudWatch. If the request load is too high, you can relaunch the cluster and set the [Read percent setting](EMR_Hive_Optimizing.md#ReadPercent) or [Write percent setting](EMR_Hive_Optimizing.md#WritePercent) to a lower value to throttle the Amazon EMR operations. For information about DynamoDB throughput settings, see [Provisioned throughput](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithDDTables.html#ProvisionedThroughput).   
If a table is configured for [On-Demand mode](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html#HowItWorks.OnDemand), change the table back to provisioned mode before running an export or import operation. Export and import pipelines need a throughput ratio to calculate the resources to use from a DynamoDB table, and on-demand mode removes provisioned throughput. To choose a provisioned throughput capacity, you can use Amazon CloudWatch metrics to evaluate the aggregate throughput that the table has used.

**Topics**
+ [Set up a Hive table to run Hive commands](EMR_Interactive_Hive.md)
+ [Hive command examples for exporting, importing, and querying data in DynamoDB](EMR_Hive_Commands.md)
+ [Optimizing performance for Amazon EMR operations in DynamoDB](EMR_Hive_Optimizing.md)

# Set up a Hive table to run Hive commands
<a name="EMR_Interactive_Hive"></a>

Apache Hive is a data warehouse application you can use to query data contained in Amazon EMR clusters using a SQL-like language. For more information about Hive, see [http://hive.apache.org/](http://hive.apache.org/).

The following procedure assumes you have already created a cluster and specified an Amazon EC2 key pair. To learn how to get started creating clusters, see [Getting started with Amazon EMR](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-gs) in the *Amazon EMR Management Guide*.

## Configure Hive to use MapReduce
<a name="hive-mapreduce"></a>

When you use Hive on Amazon EMR to query DynamoDB tables, errors can occur if Hive uses the default execution engine, Tez. For this reason, when you create a cluster with Hive that integrates with DynamoDB as described in this section, we recommend that you use a configuration classification that sets Hive to use MapReduce. For more information, see [Configure applications](emr-configure-apps.md).

The following snippet shows the configuration classification and property to use to set MapReduce as the execution engine for Hive:

```
[
  {
    "Classification": "hive-site",
    "Properties": {
      "hive.execution.engine": "mr"
    }
  }
]
```

<a name="EMR_Interactive_Hive_session"></a>

**To run Hive commands interactively**

1. Connect to the master node. For more information, see [Connect to the master node using SSH](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html) in the *Amazon EMR Management Guide*.

1. At the command prompt for the current master node, type `hive`.

   You should see a hive prompt: `hive>`

1.  Enter a Hive command that maps a table in the Hive application to the data in DynamoDB. This table acts as a reference to the data stored in Amazon DynamoDB; the data is not stored locally in Hive and any queries using this table run against the live data in DynamoDB, consuming the table's read or write capacity every time a command is run. If you expect to run multiple Hive commands against the same dataset, consider exporting it first. 

    The following shows the syntax for mapping a Hive table to a DynamoDB table. 

   ```
   CREATE EXTERNAL TABLE hive_tablename (hive_column1_name column1_datatype, hive_column2_name column2_datatype...)
   STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   TBLPROPERTIES ("dynamodb.table.name" = "dynamodb_tablename", 
   "dynamodb.column.mapping" = "hive_column1_name:dynamodb_attribute1_name,hive_column2_name:dynamodb_attribute2_name...");
   ```

    When you create a table in Hive from DynamoDB, you must create it as an external table using the keyword `EXTERNAL`. The difference between external and internal tables is that the data in internal tables is deleted when an internal table is dropped. This is not the desired behavior when connected to Amazon DynamoDB, and thus only external tables are supported. 

    For example, the following Hive command creates a table named *hivetable1* in Hive that references the DynamoDB table named *dynamodbtable1*. The DynamoDB table *dynamodbtable1* has a hash-and-range primary key schema. The hash key element is `name` (string type), the range key element is `year` (numeric type), and each item has an attribute value for `holidays` (string set type). 

   ```
   CREATE EXTERNAL TABLE hivetable1 (col1 string, col2 bigint, col3 array<string>)
   STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
   "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");
   ```

    Line 1 uses the HiveQL `CREATE EXTERNAL TABLE` statement. For *hivetable1*, you need to establish a column for each attribute name-value pair in the DynamoDB table, and provide the data type. These values are not case-sensitive, and you can give the columns any name (except reserved words). 

    Line 2 uses the `STORED BY` statement. The value of `STORED BY` is the name of the class that handles the connection between Hive and DynamoDB. It should be set to `'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'`. 

    Line 3 uses the `TBLPROPERTIES` statement to associate "hivetable1" with the correct table and schema in DynamoDB. Provide `TBLPROPERTIES` with values for the `dynamodb.table.name` parameter and `dynamodb.column.mapping` parameter. These values *are* case-sensitive.
**Note**  
 All DynamoDB attribute names for the table must have corresponding columns in the Hive table. Depending on your Amazon EMR version, the following scenarios occur if the one-to-one mapping does not exist:  
On Amazon EMR version 5.27.0 and later, the connector has validations that ensure a one-to-one mapping between DynamoDB attribute names and columns in the Hive table. An error will occur if the one-to-one mapping does not exist.
On Amazon EMR version 5.26.0 and earlier, the Hive table won't contain the name-value pair from DynamoDB. If you do not map the DynamoDB primary key attributes, Hive generates an error. If you do not map a non-primary key attribute, no error is generated, but you won't see the data in the Hive table. If the data types do not match, the value is null. 
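
The `dynamodb.column.mapping` value is a single comma-separated string and is easy to mistype by hand. As an illustration only (the `column_mapping` helper below is hypothetical, not part of the connector), it can be generated from a Python dict:

```python
def column_mapping(pairs):
    """Build a dynamodb.column.mapping value from
    {hive_column_name: dynamodb_attribute_name} pairs."""
    return ",".join(f"{hive}:{attr}" for hive, attr in pairs.items())

# Reproduces the mapping used by hivetable1 above
print(column_mapping({"col1": "name", "col2": "year", "col3": "holidays"}))
# col1:name,col2:year,col3:holidays
```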

Then you can start running Hive operations on *hivetable1*. Queries run against *hivetable1* are internally run against the DynamoDB table *dynamodbtable1* of your DynamoDB account, consuming read or write units with each execution.

When you run Hive queries against a DynamoDB table, you need to ensure that you have provisioned a sufficient number of read capacity units.

For example, suppose that you have provisioned 100 units of read capacity for your DynamoDB table. This lets you perform 100 reads of up to 4 KB each, or 409,600 bytes, per second. If that table contains 20 GB of data (21,474,836,480 bytes), and your Hive query performs a full table scan, you can estimate how long the query will take to run:

*21,474,836,480 / 409,600 = 52,429 seconds = 14.56 hours*

The only way to decrease the time required would be to adjust the read capacity units on the source DynamoDB table. Adding more Amazon EMR nodes will not help.
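The estimate above generalizes to any table size and read capacity. Here is a minimal sketch (the `estimate_scan_seconds` helper is hypothetical; the 4,096-bytes-per-read-unit figure follows the example's arithmetic):

```python
def estimate_scan_seconds(table_size_bytes, read_capacity_units,
                          bytes_per_read_unit=4096):
    """Rough time for a full table scan that consumes all of the
    table's provisioned read capacity, as in the example above."""
    bytes_per_second = read_capacity_units * bytes_per_read_unit
    return table_size_bytes / bytes_per_second

seconds = estimate_scan_seconds(20 * 1024**3, 100)  # 20 GB, 100 RCUs
print(round(seconds))            # 52429 seconds
print(round(seconds / 3600, 2))  # 14.56 hours
```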

In the Hive output, the completion percentage is updated when one or more mapper processes finish. For a large DynamoDB table with a low provisioned read capacity setting, the completion percentage output might not be updated for a long time; in the case above, the job will appear to be 0% complete for several hours. For more detailed status on your job's progress, go to the Amazon EMR console, where you can view the individual mapper task status and statistics for data reads. You can also log on to the Hadoop interface on the master node to see the same Hadoop statistics. For more information, see the following topics:
+ [Web interfaces hosted on the master node](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-web-interfaces.html)
+ [View the Hadoop web interfaces](https://docs.aws.amazon.com/emr/latest/ManagementGuide/UsingtheHadoopUserInterface.html)

For more information about sample HiveQL statements to perform tasks such as exporting or importing data from DynamoDB and joining tables, see [Hive command examples for exporting, importing, and querying data in DynamoDB](EMR_Hive_Commands.md).

<a name="EMR_Hive_Cancel"></a>

**To cancel a Hive request**

When you execute a Hive query, the initial response from the server includes the command to cancel the request. To cancel the request at any time in the process, use the **Kill Command** from the server response.

1. Enter `Ctrl+C` to exit the command line client.

1.  At the shell prompt, enter the **Kill Command** from the initial server response to your request. 

    Alternatively, you can run the following command from the command line of the master node to kill the Hadoop job, where *job-id* is the identifier of the Hadoop job and can be retrieved from the Hadoop user interface.

   ```
   hadoop job -kill job-id
   ```

## Data types for Hive and DynamoDB
<a name="EMR_Hive_Properties"></a>

The following table shows the available Hive data types, the default DynamoDB type that they correspond to, and the alternate DynamoDB types that they can also map to. 


| Hive type | Default DynamoDB type | Alternate DynamoDB type(s) | 
| --- | --- | --- | 
| string | string (S) |  | 
| bigint or double | number (N) |  | 
| binary | binary (B) |  | 
| boolean | boolean (BOOL) |  | 
| array | list (L) | number set (NS), string set (SS), or binary set (BS) | 
| map<string,string> | item | map (M) | 
| map<string,?> | map (M) |  | 
|  | null (NULL) |  | 

If you want to write your Hive data as a corresponding alternate DynamoDB type, or if your DynamoDB data contains attribute values of an alternate DynamoDB type, you can specify the column and the DynamoDB type with the `dynamodb.type.mapping` parameter. The following example shows the syntax for specifying an alternate type mapping.

```
CREATE EXTERNAL TABLE hive_tablename (hive_column1_name column1_datatype, hive_column2_name column2_datatype...)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES ("dynamodb.table.name" = "dynamodb_tablename",
"dynamodb.column.mapping" = "hive_column1_name:dynamodb_attribute1_name,hive_column2_name:dynamodb_attribute2_name...",
"dynamodb.type.mapping" = "hive_column1_name:dynamodb_attribute1_datatype");
```

The type mapping parameter is optional, and only has to be specified for the columns that use alternate types.

For example, the following Hive command creates a table named `hivetable2` that references the DynamoDB table `dynamodbtable2`. It is similar to `hivetable1`, except that it maps the `col3` column to the string set (SS) type. 

```
CREATE EXTERNAL TABLE hivetable2 (col1 string, col2 bigint, col3 array<string>)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable2",
"dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays",
"dynamodb.type.mapping" = "col3:SS");
```

In Hive, `hivetable1` and `hivetable2` are identical. However, when data from those tables are written to their corresponding DynamoDB tables, `dynamodbtable1` will contain lists, while `dynamodbtable2` will contain string sets.
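The distinction shows up in DynamoDB's low-level attribute-value representation. The following sketch shows how the same Hive `array<string>` value would be stored for each table, using the wire format of the DynamoDB API:

```python
holidays = ["Christmas", "Thanksgiving"]

# dynamodbtable1: the array is written as a DynamoDB list (L) -- ordered,
# duplicates allowed, each element carries its own type descriptor
as_list = {"L": [{"S": v} for v in holidays]}

# dynamodbtable2: with "dynamodb.type.mapping" = "col3:SS", the array
# is written as a string set (SS) -- unordered, unique strings only
as_string_set = {"SS": holidays}

print(as_list)
print(as_string_set)
```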

If you want to write Hive `null` values as attributes of DynamoDB `null` type, you can do so with the `dynamodb.null.serialization` parameter. The following example shows the syntax for specifying `null` serialization.

```
CREATE EXTERNAL TABLE hive_tablename (hive_column1_name column1_datatype, hive_column2_name column2_datatype...)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES ("dynamodb.table.name" = "dynamodb_tablename",
"dynamodb.column.mapping" = "hive_column1_name:dynamodb_attribute1_name,hive_column2_name:dynamodb_attribute2_name...",
"dynamodb.null.serialization" = "true");
```

The null serialization parameter is optional, and is set to `false` if not specified. Note that DynamoDB `null` attributes are read as `null` values in Hive regardless of the parameter setting. Hive collections with `null` values can be written to DynamoDB only if the null serialization parameter is specified as `true`. Otherwise, a Hive error occurs.

The bigint type in Hive is the same as the Java long type, and the Hive double type is the same as the Java double type in terms of precision. This means that if you have numeric data stored in DynamoDB that has precision higher than is available in the Hive datatypes, using Hive to export, import, or reference the DynamoDB data could lead to a loss in precision or a failure of the Hive query. 
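The mismatch is easy to demonstrate: DynamoDB numbers carry up to 38 digits of precision, while a double has only 53 bits of mantissa. A quick illustration (Python floats are the same IEEE 754 doubles that Hive uses):

```python
# A DynamoDB number just past the range a double can represent exactly
original = 2**53 + 1                 # 9007199254740993
roundtripped = int(float(original))  # what survives a pass through a double

print(roundtripped)                  # 9007199254740992 -- precision lost
print(original == roundtripped)      # False
```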

 Exports of the binary type from DynamoDB to Amazon Simple Storage Service (Amazon S3) or HDFS are stored as a Base64-encoded string. If you are importing data from Amazon S3 or HDFS into the DynamoDB binary type, it should be encoded as a Base64 string. 
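A round trip of a binary value through that encoding can be sketched with the standard library (illustrative only, not connector code):

```python
import base64

raw = b"\x00\x01binary payload"

# Encode before writing the value into the Amazon S3 / HDFS import file
encoded = base64.b64encode(raw).decode("ascii")

# Exported binary attributes are stored the same way; decode to recover bytes
decoded = base64.b64decode(encoded)

assert decoded == raw
```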

## Hive options
<a name="EMR_Hive_Options"></a>

 You can set the following Hive options to manage the transfer of data out of Amazon DynamoDB. These options persist only for the current Hive session. If you close the Hive command prompt and reopen it later on the cluster, these settings return to the default values. 


| Hive options | Description | 
| --- | --- | 
| dynamodb.throughput.read.percent |   Set the rate of read operations to keep your DynamoDB provisioned throughput rate in the allocated range for your table. The value is between `0.1` and `1.5`, inclusive.   The value 0.5 is the default read rate, which means that Hive attempts to consume half of the provisioned read throughput of the table. Increasing this value above 0.5 increases the read request rate; decreasing it below 0.5 decreases the read request rate. This read rate is approximate. The actual read rate will depend on factors such as whether there is a uniform distribution of keys in DynamoDB.   If you find your provisioned throughput is frequently exceeded by the Hive operation, or if live read traffic is being throttled too much, then reduce this value below `0.5`. If you have enough capacity and want a faster Hive operation, set this value above `0.5`. You can also oversubscribe by setting it up to 1.5 if you believe there are unused input/output operations available.   | 
| dynamodb.throughput.write.percent |   Set the rate of write operations to keep your DynamoDB provisioned throughput rate in the allocated range for your table. The value is between `0.1` and `1.5`, inclusive.   The value 0.5 is the default write rate, which means that Hive attempts to consume half of the provisioned write throughput of the table. Increasing this value above 0.5 increases the write request rate; decreasing it below 0.5 decreases the write request rate. This write rate is approximate. The actual write rate will depend on factors such as whether there is a uniform distribution of keys in DynamoDB.   If you find your provisioned throughput is frequently exceeded by the Hive operation, or if live write traffic is being throttled too much, then reduce this value below `0.5`. If you have enough capacity and want a faster Hive operation, set this value above `0.5`. You can also oversubscribe by setting it up to 1.5 if you believe there are unused input/output operations available or this is the initial data upload to the table and there is no live traffic yet.   | 
| dynamodb.endpoint | Specify the endpoint for the DynamoDB service. For more information about the available DynamoDB endpoints, see [Regions and endpoints](https://docs.aws.amazon.com/general/latest/gr/rande.html#ddb_region).  | 
| dynamodb.max.map.tasks |   Specify the maximum number of map tasks when reading data from DynamoDB. This value must be equal to or greater than 1.   | 
| dynamodb.retry.duration |   Specify the number of minutes to use as the timeout duration for retrying Hive commands. This value must be an integer equal to or greater than 0. The default timeout duration is two minutes.   | 

 These options are set using the `SET` command as shown in the following example. 

```
SET dynamodb.throughput.read.percent=1.0; 

INSERT OVERWRITE TABLE s3_export SELECT * 
FROM hiveTableName;
```

# Hive command examples for exporting, importing, and querying data in DynamoDB
<a name="EMR_Hive_Commands"></a>

The following examples use Hive commands to perform operations such as exporting data to Amazon S3 or HDFS, importing data to DynamoDB, joining tables, querying tables, and more. 

Operations on a Hive table reference data stored in DynamoDB. Hive commands are subject to the DynamoDB table's provisioned throughput settings, and the data retrieved includes the data written to the DynamoDB table at the time the Hive operation request is processed by DynamoDB. If the data retrieval process takes a long time, some data returned by the Hive command may have been updated in DynamoDB since the Hive command began. 

Hive commands `DROP TABLE` and `CREATE TABLE` only act on the local tables in Hive and do not create or drop tables in DynamoDB. If your Hive query references a table in DynamoDB, that table must already exist before you run the query. For more information about creating and deleting tables in DynamoDB, see [Working with tables in DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithTables.html) in the *Amazon DynamoDB Developer Guide*. 

**Note**  
 When you map a Hive table to a location in Amazon S3, do not map it to the root path of the bucket, s3://amzn-s3-demo-bucket, as this may cause errors when Hive writes the data to Amazon S3. Instead map the table to a subpath of the bucket, s3://amzn-s3-demo-bucket/mypath. 

## Exporting data from DynamoDB
<a name="EMR_Hive_Commands_exporting"></a>

 You can use Hive to export data from DynamoDB. 

**To export a DynamoDB table to an Amazon S3 bucket**
+  Create a Hive table that references data stored in DynamoDB. Then you can call the INSERT OVERWRITE command to write the data to an external directory. In the following example, *s3://amzn-s3-demo-bucket/path/subpath/* is a valid path in Amazon S3. Adjust the columns and datatypes in the CREATE command to match the values in your DynamoDB table. You can use this to create an archive of your DynamoDB data in Amazon S3. 

  ```
  CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1",
  "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");

  INSERT OVERWRITE DIRECTORY 's3://amzn-s3-demo-bucket/path/subpath/' SELECT *
  FROM hiveTableName;
  ```

**To export a DynamoDB table to an Amazon S3 bucket using formatting**
+  Create an external table that references a location in Amazon S3. This is shown below as s3_export. During the CREATE call, specify row formatting for the table. Then, when you use INSERT OVERWRITE to export data from DynamoDB to s3_export, the data is written out in the specified format. In the following example, the data is written out as comma-separated values (CSV). 

  ```
  CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1",
  "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");

  CREATE EXTERNAL TABLE s3_export(a_col string, b_col bigint, c_col array<string>)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  LOCATION 's3://amzn-s3-demo-bucket/path/subpath/';

  INSERT OVERWRITE TABLE s3_export SELECT *
  FROM hiveTableName;
  ```

**To export a DynamoDB table to an Amazon S3 bucket without specifying a column mapping**
+  Create a Hive table that references data stored in DynamoDB. This is similar to the preceding example, except that you are not specifying a column mapping. The table must have exactly one column of type `map<string, string>`. If you then create an `EXTERNAL` table in Amazon S3 you can call the `INSERT OVERWRITE` command to write the data from DynamoDB to Amazon S3. You can use this to create an archive of your DynamoDB data in Amazon S3. Because there is no column mapping, you cannot query tables that are exported this way. Exporting data without specifying a column mapping is available in Hive 0.8.1.5 or later, which is supported on Amazon EMR AMI 2.2.*x* and later. 

  ```
  CREATE EXTERNAL TABLE hiveTableName (item map<string,string>)
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1");

  CREATE EXTERNAL TABLE s3TableName (item map<string, string>)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
  LOCATION 's3://amzn-s3-demo-bucket/path/subpath/';

  INSERT OVERWRITE TABLE s3TableName SELECT *
  FROM hiveTableName;
  ```

**To export a DynamoDB table to an Amazon S3 bucket using data compression**
+  Hive provides several compression codecs you can set during your Hive session. Doing so causes the exported data to be compressed in the specified format. The following example compresses the exported files using the Lempel-Ziv-Oberhumer (LZO) algorithm. 

  ```
  SET hive.exec.compress.output=true;
  SET io.seqfile.compression.type=BLOCK;
  SET mapred.output.compression.codec = com.hadoop.compression.lzo.LzopCodec;

  CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1",
  "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");

  CREATE EXTERNAL TABLE lzo_compression_table (line STRING)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
  LOCATION 's3://amzn-s3-demo-bucket/path/subpath/';

  INSERT OVERWRITE TABLE lzo_compression_table SELECT *
  FROM hiveTableName;
  ```

   The available compression codecs are: 
  +  org.apache.hadoop.io.compress.GzipCodec 
  +  org.apache.hadoop.io.compress.DefaultCodec 
  +  com.hadoop.compression.lzo.LzoCodec 
  +  com.hadoop.compression.lzo.LzopCodec 
  +  org.apache.hadoop.io.compress.BZip2Codec 
  +  org.apache.hadoop.io.compress.SnappyCodec 

**To export a DynamoDB table to HDFS**
+  Use the following Hive command, where *hdfs:///directoryName* is a valid HDFS path and *hiveTableName* is a table in Hive that references DynamoDB. This export operation is faster than exporting a DynamoDB table to Amazon S3 because Hive 0.7.1.1 uses HDFS as an intermediate step when exporting data to Amazon S3. The following example also shows how to set `dynamodb.throughput.read.percent` to 1.0 in order to increase the read request rate. 

  ```
  CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1",
  "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");

  SET dynamodb.throughput.read.percent=1.0;

  INSERT OVERWRITE DIRECTORY 'hdfs:///directoryName' SELECT * FROM hiveTableName;
  ```

   You can also export data to HDFS using formatting and compression as shown above for the export to Amazon S3. To do so, simply replace the Amazon S3 directory in the examples above with an HDFS directory.

<a name="EMR_Hive_non-printable-utf8"></a>

**To read non-printable UTF-8 character data in Hive**
+ You can read and write non-printable UTF-8 character data with Hive by using the `STORED AS SEQUENCEFILE` clause when you create the table. A SequenceFile is a Hadoop binary file format; you need to use Hadoop to read this file. The following example shows how to export data from DynamoDB into Amazon S3. You can use this functionality to handle non-printable UTF-8 encoded characters. 

  ```
  CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1",
  "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");

  CREATE EXTERNAL TABLE s3_export(a_col string, b_col bigint, c_col array<string>)
  STORED AS SEQUENCEFILE
  LOCATION 's3://amzn-s3-demo-bucket/path/subpath/';

  INSERT OVERWRITE TABLE s3_export SELECT *
  FROM hiveTableName;
  ```

## Importing data to DynamoDB
<a name="EMR_Hive_Commands_importing"></a>

 When you write data to DynamoDB using Hive you should ensure that the number of write capacity units is greater than the number of mappers in the cluster. For example, clusters that run on m1.xlarge EC2 instances produce 8 mappers per instance. In the case of a cluster that has 10 instances, that would mean a total of 80 mappers. If your write capacity units are not greater than the number of mappers in the cluster, the Hive write operation may consume all of the write throughput, or attempt to consume more throughput than is provisioned. For more information about the number of mappers produced by each EC2 instance type, see [Configure Hadoop](emr-hadoop-config.md).

 The number of mappers in Hadoop is controlled by the input splits. If there are too few splits, your write command might not be able to consume all the write throughput available. 
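A quick sanity check for this sizing guidance (the eight-mappers-per-m1.xlarge figure comes from the example above; both helpers below are hypothetical):

```python
def total_mappers(instance_count, mappers_per_instance):
    """Mappers a cluster can run at once, e.g. 8 per m1.xlarge instance."""
    return instance_count * mappers_per_instance

def has_enough_write_capacity(write_capacity_units, mappers):
    """Per the guidance above, write capacity units should exceed
    the number of mappers in the cluster."""
    return write_capacity_units > mappers

mappers = total_mappers(10, 8)                  # 10 m1.xlarge instances
print(mappers)                                  # 80
print(has_enough_write_capacity(100, mappers))  # True
print(has_enough_write_capacity(80, mappers))   # False
```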

 If an item with the same key exists in the target DynamoDB table, it is overwritten. If no item with the key exists in the target DynamoDB table, the item is inserted. 

**To import a table from Amazon S3 to DynamoDB**
+  You can use Amazon EMR and Hive to write data from Amazon S3 to DynamoDB. 

  ```
  CREATE EXTERNAL TABLE s3_import(a_col string, b_col bigint, c_col array<string>)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
  LOCATION 's3://amzn-s3-demo-bucket/path/subpath/';                    
                      
  CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
  TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
  "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");  
                      
  INSERT OVERWRITE TABLE hiveTableName SELECT * FROM s3_import;
  ```

**To import a table from an Amazon S3 bucket to DynamoDB without specifying a column mapping**
+  Create an `EXTERNAL` table that references data stored in Amazon S3 that was previously exported from DynamoDB. Before importing, ensure that the table exists in DynamoDB and that it has the same key schema as the previously exported DynamoDB table. In addition, the table must have exactly one column of type `map<string, string>`. If you then create a Hive table that is linked to DynamoDB, you can call the `INSERT OVERWRITE` command to write the data from Amazon S3 to DynamoDB. Because there is no column mapping, you cannot query tables that are imported this way. Importing data without specifying a column mapping is available in Hive 0.8.1.5 or later, which is supported on Amazon EMR AMI 2.2.3 and later. 

  ```
  CREATE EXTERNAL TABLE s3TableName (item map<string, string>)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
  LOCATION 's3://amzn-s3-demo-bucket/path/subpath/'; 
                          
  CREATE EXTERNAL TABLE hiveTableName (item map<string,string>)
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
  TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1");  
                   
  INSERT OVERWRITE TABLE hiveTableName SELECT * 
  FROM s3TableName;
  ```

**To import a table from HDFS to DynamoDB**
+  You can use Amazon EMR and Hive to write data from HDFS to DynamoDB. 

  ```
  CREATE EXTERNAL TABLE hdfs_import(a_col string, b_col bigint, c_col array<string>)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
  LOCATION 'hdfs:///directoryName';                    
                      
  CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
  TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
  "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");  
                      
  INSERT OVERWRITE TABLE hiveTableName SELECT * FROM hdfs_import;
  ```

## Querying data in DynamoDB
<a name="EMR_Hive_Commands_querying"></a>

 The following examples show the various ways you can use Amazon EMR to query data stored in DynamoDB. 

**To find the largest value for a mapped column (`max`)**
+  Use Hive commands like the following. In the first command, the CREATE statement creates a Hive table that references data stored in DynamoDB. The SELECT statement then uses that table to query data stored in DynamoDB. The following example finds the largest order placed by a given customer. 

  ```
  CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) 
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "Purchases",
  "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items");
  
  SELECT max(total_cost) from hive_purchases where customerId = 717;
  ```

**To aggregate data using the `GROUP BY` clause**
+  You can use the `GROUP BY` clause to collect data across multiple records. This is often used with an aggregate function such as sum, count, min, or max. The following example returns a list of the largest orders from customers who have placed more than three orders. 

  ```
  CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) 
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "Purchases",
  "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items");
  
  SELECT customerId, max(total_cost) from hive_purchases GROUP BY customerId HAVING count(*) > 3;
  ```

**To join two DynamoDB tables**
+  The following example maps two Hive tables to data stored in DynamoDB. It then calls a join across those two tables. The join is computed on the cluster and returned. The join does not take place in DynamoDB. This example returns a list of customers and their purchases for customers that have placed more than two orders. 

  ```
  CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) 
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "Purchases",
  "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items");
  
  CREATE EXTERNAL TABLE hive_customers(customerId bigint, customerName string, customerAddress array<String>) 
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "Customers",
  "dynamodb.column.mapping" = "customerId:CustomerId,customerName:Name,customerAddress:Address");
  
  SELECT c.customerId, c.customerName, count(*) as count FROM hive_customers c 
  JOIN hive_purchases p ON c.customerId = p.customerId 
  GROUP BY c.customerId, c.customerName HAVING count > 2;
  ```

**To join two tables from different sources**
+  In the following example, `Customer_S3` is a Hive table that loads a CSV file stored in Amazon S3 and `hive_purchases` is a table that references data in DynamoDB. The example joins customer data stored as a CSV file in Amazon S3 with order data stored in DynamoDB to return a set of data that represents orders placed by customers who have "Miller" in their name. 

  ```
  CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) 
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "Purchases",
  "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items");
  
  CREATE EXTERNAL TABLE Customer_S3(customerId bigint, customerName string, customerAddress array<String>)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
  LOCATION 's3://amzn-s3-demo-bucket/path/subpath/';
  
  SELECT c.customerId, c.customerName, c.customerAddress FROM 
  Customer_S3 c 
  JOIN hive_purchases p 
  ON c.customerId = p.customerId 
  WHERE c.customerName LIKE '%Miller%';
  ```

**Note**  
 In the preceding examples, the CREATE TABLE statements were included in each example for clarity and completeness. When running multiple queries or export operations against a given Hive table, you only need to create the table one time, at the beginning of the Hive session. 

# Optimizing performance for Amazon EMR operations in DynamoDB
<a name="EMR_Hive_Optimizing"></a>

 Amazon EMR operations on a DynamoDB table count as read operations, and are subject to the table's provisioned throughput settings. Amazon EMR implements its own logic to try to balance the load on your DynamoDB table to minimize the possibility of exceeding your provisioned throughput. At the end of each Hive query, Amazon EMR returns information about the cluster used to process the query, including how many times your provisioned throughput was exceeded. You can use this information, as well as CloudWatch metrics about your DynamoDB throughput, to better manage the load on your DynamoDB table in subsequent requests. 

 The following factors influence Hive query performance when working with DynamoDB tables. 

## Provisioned read capacity units
<a name="ProvisionedReadCapacityUnits"></a>

 When you run Hive queries against a DynamoDB table, you need to ensure that you have provisioned a sufficient amount of read capacity units. 

 For example, suppose that you have provisioned 100 read capacity units for your DynamoDB table. This lets you perform 100 reads, or 409,600 bytes, per second. If that table contains 20 GB of data (21,474,836,480 bytes) and your Hive query performs a full table scan, you can estimate how long the query will take to run: 

 *21,474,836,480 / 409,600 = 52,429 seconds = 14.56 hours* 
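The same estimate can be sketched as code (assuming 4 KB of scan throughput per read capacity unit per second, as in the figure above; this is a rough model, not an official formula):

```python
def full_scan_seconds(table_bytes, read_capacity_units, bytes_per_rcu=4096):
    """Estimated duration of a full table scan at the provisioned read rate."""
    return table_bytes / (read_capacity_units * bytes_per_rcu)

# A 20 GB table scanned with 100 provisioned read capacity units:
seconds = full_scan_seconds(21_474_836_480, 100)
print(round(seconds))            # 52429
print(round(seconds / 3600, 2))  # 14.56
```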

 The only way to decrease the time required would be to adjust the read capacity units on the source DynamoDB table. Adding more nodes to the Amazon EMR cluster will not help. 

 In the Hive output, the completion percentage is updated when one or more mapper processes are finished. For a large DynamoDB table with a low provisioned Read Capacity setting, the completion percentage output might not be updated for a long time; in the case above, the job will appear to be 0% complete for several hours. For more detailed status on your job's progress, go to the Amazon EMR console; you will be able to view the individual mapper task status, and statistics for data reads. 

 You can also log on to the Hadoop interface on the master node and see the Hadoop statistics. This shows you the individual map task status and some data read statistics. For more information, see [Web interfaces hosted on the master node](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-web-interfaces.html) in the *Amazon EMR Management Guide*.

## Read percent setting
<a name="ReadPercent"></a>

 By default, Amazon EMR manages the request load against your DynamoDB table according to your current provisioned throughput. However, when Amazon EMR returns information about your job that includes a high number of provisioned throughput exceeded responses, you can adjust the default read rate using the `dynamodb.throughput.read.percent` parameter when you set up the Hive table. For more information about setting the read percent parameter, see [Hive options](EMR_Interactive_Hive.md#EMR_Hive_Options). 

## Write percent setting
<a name="WritePercent"></a>

 By default, Amazon EMR manages the request load against your DynamoDB table according to your current provisioned throughput. However, when Amazon EMR returns information about your job that includes a high number of provisioned throughput exceeded responses, you can adjust the default write rate using the `dynamodb.throughput.write.percent` parameter when you set up the Hive table. For more information about setting the write percent parameter, see [Hive options](EMR_Interactive_Hive.md#EMR_Hive_Options). 

## Retry duration setting
<a name="emr-ddb-retry-duration"></a>

 By default, Amazon EMR re-runs a Hive query if it has not returned a result within two minutes, the default retry interval. You can adjust this interval by setting the `dynamodb.retry.duration` parameter when you run a Hive query. For more information about setting the retry duration parameter, see [Hive options](EMR_Interactive_Hive.md#EMR_Hive_Options). 

## Number of map tasks
<a name="NumberMapTasks"></a>

 The mapper daemons that Hadoop launches to process your requests to export and query data stored in DynamoDB are capped at a maximum read rate of 1 MiB per second to limit the read capacity used. If you have additional provisioned throughput available on DynamoDB, you can improve the performance of Hive export and query operations by increasing the number of mapper daemons. To do this, you can either increase the number of EC2 instances in your cluster *or* increase the number of mapper daemons running on each EC2 instance. 
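Because each mapper daemon is capped at roughly 1 MiB per second, the cluster's aggregate read ceiling scales with the mapper count; a back-of-the-envelope sketch (illustrative arithmetic only):

```python
def aggregate_read_ceiling_mib_s(instance_count, mappers_per_instance,
                                 per_mapper_mib_s=1.0):
    """Upper bound on the cluster's DynamoDB read rate, in MiB/s,
    given the per-mapper cap described above."""
    return instance_count * mappers_per_instance * per_mapper_mib_s

# A cluster of 10 instances with 8 mappers each can read at most ~80 MiB/s.
print(aggregate_read_ceiling_mib_s(10, 8))  # 80.0
```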

 You can increase the number of EC2 instances in a cluster by stopping the current cluster and re-launching it with a larger number of EC2 instances. You specify the number of EC2 instances in the **Configure EC2 Instances** dialog box if you're launching the cluster from the Amazon EMR console, or with the `--num-instances` option if you're launching the cluster from the CLI. 

 The number of map tasks run on an instance depends on the EC2 instance type. For more information about the supported EC2 instance types and the number of mappers each one provides, see [Task configuration](emr-hadoop-task-config.md). There, you will find a "Task Configuration" section for each of the supported configurations. 

 Another way to increase the number of mapper daemons is to change the `mapreduce.tasktracker.map.tasks.maximum` configuration parameter of Hadoop to a higher value. This has the advantage of giving you more mappers without increasing either the number or the size of EC2 instances, which saves you money. A disadvantage is that setting this value too high can cause the EC2 instances in your cluster to run out of memory. To set `mapreduce.tasktracker.map.tasks.maximum`, launch the cluster and specify a value for `mapreduce.tasktracker.map.tasks.maximum` as a property of the mapred-site configuration classification. This is shown in the following example. For more information, see [Configure applications](emr-configure-apps.md).

```
{
    "configurations": [
    {
        "classification": "mapred-site",
        "properties": {
            "mapreduce.tasktracker.map.tasks.maximum": "10"
        }
    }
    ]
}
```

## Parallel data requests
<a name="ParallelDataRequests"></a>

 Multiple data requests to a single table, whether from more than one user or more than one application, may drain provisioned read throughput and slow performance. 

## Process duration
<a name="ProcessDuration"></a>

 Data consistency in DynamoDB depends on the order of read and write operations on each node. While a Hive query is in progress, another application might load new data into the DynamoDB table or modify or delete existing data. In this case, the results of the Hive query might not reflect changes made to the data while the query was running. 

## Avoid exceeding throughput
<a name="AvoidExceedingThroughput"></a>

 When running Hive queries against DynamoDB, take care not to exceed your provisioned throughput, because this will deplete capacity needed for your application's calls to `DynamoDB::Get`. To ensure that this is not occurring, you should regularly monitor the read volume and throttling on application calls to `DynamoDB::Get` by checking logs and monitoring metrics in Amazon CloudWatch. 

## Request time
<a name="RequestTime"></a>

 Scheduling Hive queries that access a DynamoDB table when there is lower demand on the DynamoDB table improves performance. For example, if most of your application's users live in San Francisco, you might choose to export daily data at 4 a.m. PST, when the majority of users are asleep, and not updating records in your DynamoDB database. 

## Time-based tables
<a name="TimeBasedTables"></a>

 If the data is organized as a series of time-based DynamoDB tables, such as one table per day, you can export the data when the table is no longer active. You can use this technique to back up data to Amazon S3 on an ongoing basis. 

## Archived data
<a name="ArchivedData"></a>

 If you plan to run many Hive queries against the data stored in DynamoDB and your application can tolerate archived data, you may want to export the data to HDFS or Amazon S3 and run the Hive queries against a copy of the data instead of DynamoDB. This conserves your read operations and provisioned throughput. 

# Kinesis
<a name="emr-kinesis"></a>

Amazon EMR clusters can read and process Amazon Kinesis streams directly, using familiar tools in the Hadoop ecosystem such as Hive, Pig, MapReduce, the Hadoop Streaming API, and Cascading. You can also join real-time data from Amazon Kinesis with existing data on Amazon S3, Amazon DynamoDB, and HDFS in a running cluster. You can directly load the data from Amazon EMR to Amazon S3 or DynamoDB for post-processing activities. For information about Amazon Kinesis service highlights and pricing, see the [Amazon Kinesis](https://aws.amazon.com//kinesis) page.

## What can I do with Amazon EMR and Amazon Kinesis integration?
<a name="kinesis-use-cases"></a>

 Integration between Amazon EMR and Amazon Kinesis makes certain scenarios much easier; for example: 
+ **Streaming log analysis**–You can analyze streaming web logs to generate a list of top 10 error types every few minutes by region, browser, and access domain. 
+ **Customer engagement**–You can write queries that join clickstream data from Amazon Kinesis with advertising campaign information stored in a DynamoDB table to identify the most effective categories of ads that are displayed on particular websites. 
+ **Ad-hoc interactive queries**–You can periodically load data from Amazon Kinesis streams into HDFS and make it available as a local Impala table for fast, interactive, analytic queries.

## Checkpointed analysis of Amazon Kinesis streams
<a name="kinesis-checkpoint"></a>

Users can run periodic, batched analysis of Amazon Kinesis streams in what are called *iterations*. Because Amazon Kinesis stream data records are retrieved by using a sequence number, iteration boundaries are defined by starting and ending sequence numbers that Amazon EMR stores in a DynamoDB table. For example, when `iteration0` ends, it stores the ending sequence number in DynamoDB so that when the `iteration1` job begins, it can retrieve subsequent data from the stream. This mapping of iterations in stream data is called *checkpointing*. For more information, see [Kinesis connector](https://aws.amazon.com/elasticmapreduce/faqs/#kinesis-connector).

If an iteration was checkpointed and the job failed while processing that iteration, Amazon EMR attempts to reprocess the records in that iteration. 

Checkpointing is a feature that allows you to: 
+ Start data processing after a sequence number processed by a previous query that ran on the same stream and logical name
+ Re-process the same batch of data from Kinesis that was processed by an earlier query
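The bookkeeping behind checkpointing can be sketched in a few lines (a Python dictionary stands in for the DynamoDB metastore table; the names and sequence values are hypothetical):

```python
# Maps (logical_name, iteration) -> ending sequence number.
checkpoints = {}

def finish_iteration(logical_name, iteration, ending_sequence_number):
    """Record where this iteration stopped reading the stream."""
    checkpoints[(logical_name, iteration)] = ending_sequence_number

def resume_point(logical_name, iteration):
    """Sequence number after which this iteration should start reading,
    taken from the previous iteration's checkpoint (None for iteration 0)."""
    return checkpoints.get((logical_name, iteration - 1))

finish_iteration("daily-logs", 0, "seq-000451")
print(resume_point("daily-logs", 1))  # seq-000451
print(resume_point("daily-logs", 0))  # None
```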

 To enable checkpointing, set the `kinesis.checkpoint.enabled` parameter to `true` in your scripts. Also, configure the following parameters:


| Configuration setting | Description | 
| --- | --- | 
| kinesis.checkpoint.metastore.table.name | DynamoDB table name where checkpoint information will be stored | 
| kinesis.checkpoint.metastore.hash.key.name | Hash key name for the DynamoDB table | 
| kinesis.checkpoint.metastore.hash.range.name | Range key name for the DynamoDB table | 
| kinesis.checkpoint.logical.name | A logical name for current processing | 
| kinesis.checkpoint.iteration.no | Iteration number for processing associated with the logical name | 
| kinesis.rerun.iteration.without.wait | Boolean value that indicates if a failed iteration can be rerun without waiting for timeout; the default is false | 

### Provisioned IOPS recommendations for Amazon DynamoDB tables
<a name="kinesis-checkpoint-DDB"></a>

The Amazon EMR connector for Amazon Kinesis uses DynamoDB as its backing store for checkpointing metadata. You must create a table in DynamoDB before you consume data from an Amazon Kinesis stream with an Amazon EMR cluster in checkpointed intervals. The table must be in the same region as your Amazon EMR cluster. The following are general recommendations for the number of IOPS you should provision for your DynamoDB tables; let `j` be the maximum number of Hadoop jobs (with different logical name/iteration number combinations) that can run concurrently and `s` be the maximum number of shards that any job will process:

For **Read Capacity Units**: `j` × `s` / `5`

For **Write Capacity Units**: `j` × `s`
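Put as code, the two formulas above look like this (a convenience sketch; rounding up to whole capacity units is an assumption on our part):

```python
import math

def checkpoint_table_capacity(max_concurrent_jobs, max_shards_per_job):
    """Recommended (read, write) capacity units for the checkpoint table:
    reads = j * s / 5, writes = j * s."""
    j, s = max_concurrent_jobs, max_shards_per_job
    return math.ceil(j * s / 5), j * s

# For example, 2 concurrent jobs each reading up to 10 shards:
print(checkpoint_table_capacity(2, 10))  # (4, 20)
```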

## Performance considerations
<a name="performance"></a>

Amazon Kinesis shard throughput is directly proportional to the instance size of nodes in Amazon EMR clusters and record size in the stream. We recommend that you use m5.xlarge or larger instances on master and core nodes.

## Schedule Amazon Kinesis analysis with Amazon EMR
<a name="schedule"></a>

When you are analyzing data on an active Amazon Kinesis stream, limited by timeouts and a maximum duration for any iteration, it is important that you run the analysis frequently to gather periodic details from the stream. There are multiple ways to execute such scripts and queries at periodic intervals; we recommend using AWS Data Pipeline for recurrent tasks like these. For more information, see [AWS Data Pipeline PigActivity](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-pigactivity.html) and [AWS Data Pipeline HiveActivity](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-hiveactivity.html) in the *AWS Data Pipeline Developer Guide*.

# Migrating Spark Kinesis connector to SDK 2.x for Amazon EMR 7.0
<a name="migrating-spark-kinesis"></a>

The AWS SDK provides a rich set of APIs and libraries to interact with AWS services, such as managing credentials and connecting to Amazon S3 and Kinesis. The Spark Kinesis connector consumes data from Kinesis Data Streams, and the received data is transformed and processed in Spark's execution engine. Currently, this connector is built on top of version 1.x of the AWS SDK and the Kinesis Client Library (KCL). 

As part of the AWS SDK 2.x migration, the Spark Kinesis connector was also updated accordingly to run with SDK 2.x. In the Amazon EMR 7.0 release, Spark contains the SDK 2.x upgrade that is not yet available in the community version of Apache Spark. If you use the Spark Kinesis connector from a release earlier than 7.0, you must migrate your application code to run on SDK 2.x before you can migrate to Amazon EMR 7.0.

## Migration guides
<a name="migrating-spark-kinesis-migration-guides"></a>

This section describes the steps to migrate an application to the upgraded Spark Kinesis connector. It includes guides to migrate to the Kinesis Client Library (KCL) 2.x, AWS credential providers, and AWS service clients in AWS SDK 2.x. For reference, it also includes a sample [WordCount](https://github.com/apache/spark/blob/v3.5.0/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala) program that uses the Kinesis connector.

**Topics**
+ [Migrating KCL from 1.x to 2.x](#migrating-spark-kinesis-KCL-from-1.x-to-2.x)
+ [Migrating AWS credentials providers from AWS SDK 1.x to 2.x](#migrating-spark-kinesis-creds-from-1.x-to-2.x)
+ [Migrating AWS service clients from AWS SDK 1.x to 2.x](#migrating-spark-kinesis-service-from-1.x-to-2.x)
+ [Code examples for streaming applications](#migrating-spark-kinesis-streaming-examples)
+ [Considerations when using the upgraded Spark Kinesis connector](#migrating-spark-kinesis-considerations)

### Migrating KCL from 1.x to 2.x
<a name="migrating-spark-kinesis-KCL-from-1.x-to-2.x"></a>
+ **Metrics level and dimensions in `KinesisInputDStream`**

  When you instantiate a `KinesisInputDStream`, you can control the metrics level and dimensions for the stream. The following example demonstrates how you might customize these parameters with KCL 1.x:

  ```
  import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
  import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
   
  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
    .build()
  ```

  In KCL 2.x, these config settings have different package names. To migrate to 2.x:

  1. Change the import statements for `com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration` and `com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel` to `software.amazon.kinesis.metrics.MetricsLevel` and `software.amazon.kinesis.metrics.MetricsUtil` respectively.

     ```
     // import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
     import software.amazon.kinesis.metrics.MetricsLevel
      
     // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
     import software.amazon.kinesis.metrics.MetricsUtil
     ```

  1. Replace the line `metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)` with `metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))`.

  Following is an updated version of the `KinesisInputDStream` with customized metrics level and metrics dimensions:

  ```
  import software.amazon.kinesis.metrics.MetricsLevel
  import software.amazon.kinesis.metrics.MetricsUtil
   
  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
    .build()
  ```
+ **Message handler function in `KinesisInputDStream`**

  When instantiating a `KinesisInputDStream`, you can also provide a "message handler function" that takes a Kinesis `Record` and returns a generic object `T`, in case you would like to use other data included in a `Record`, such as the partition key.

  In KCL 1.x, the message handler function signature is: `Record => T`, where Record is `com.amazonaws.services.kinesis.model.Record`. In KCL 2.x, the handler’s signature is changed to: `KinesisClientRecord => T`, where KinesisClientRecord is `software.amazon.kinesis.retrieval.KinesisClientRecord`. 

  Following is an example of providing a message handler in KCL 1.x:

  ```
  import com.amazonaws.services.kinesis.model.Record
   
   
  def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_ONLY)
    .buildWithMessageHandler(addFive)
  ```

  To migrate the message handler:

  1. Change the import statement for `com.amazonaws.services.kinesis.model.Record` to `software.amazon.kinesis.retrieval.KinesisClientRecord`.

     ```
     // import com.amazonaws.services.kinesis.model.Record
     import software.amazon.kinesis.retrieval.KinesisClientRecord
     ```

  1. Update the method signature of the message handler.

     ```
     //def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
     def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
     ```

  Following is an updated example of providing the message handler in KCL 2.x:

  ```
  import software.amazon.kinesis.retrieval.KinesisClientRecord
   
   
  def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_ONLY)
    .buildWithMessageHandler(addFive)
  ```

  For more information about migrating from KCL 1.x to 2.x, see [Migrating Consumers from KCL 1.x to KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html).

### Migrating AWS credentials providers from AWS SDK 1.x to 2.x
<a name="migrating-spark-kinesis-creds-from-1.x-to-2.x"></a>

Credentials providers are used to obtain AWS credentials for interactions with AWS. There are several interface and class changes related to the credentials providers in SDK 2.x, which can be found [here](https://github.com/aws/aws-sdk-java-v2/blob/master/docs/LaunchChangelog.md#122-client-credentials). The Spark Kinesis connector defines an interface (`org.apache.spark.streaming.kinesis.SparkAWSCredentials`) and implementation classes that return the 1.x version of AWS credential providers. These credentials providers are needed when initializing Kinesis clients. For instance, if you use the method `SparkAWSCredentials.provider` in your applications, you need to update your code to consume the 2.x version of AWS credential providers.

Following is an example of using the credential providers in AWS SDK 1.x:

```
import org.apache.spark.streaming.kinesis.SparkAWSCredentials
import com.amazonaws.auth.AWSCredentialsProvider
 
val basicSparkCredentials = SparkAWSCredentials.builder
    .basicCredentials("accessKey", "secretKey")
    .build()
                                     
val credentialProvider = basicSparkCredentials.provider
assert(credentialProvider.isInstanceOf[AWSCredentialsProvider], "Type should be AWSCredentialsProvider")
```

**To migrate to SDK 2.x:**

1. Change the import statement for `com.amazonaws.auth.AWSCredentialsProvider` to `software.amazon.awssdk.auth.credentials.AwsCredentialsProvider`

   ```
   //import com.amazonaws.auth.AWSCredentialsProvider
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
   ```

1. Update the remaining codes that use this class. 

   ```
   import org.apache.spark.streaming.kinesis.SparkAWSCredentials
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
    
   val basicSparkCredentials = SparkAWSCredentials.builder
       .basicCredentials("accessKey", "secretKey")
       .build()
                                             
   val credentialProvider = basicSparkCredentials.provider
   assert (credentialProvider.isInstanceOf[AwsCredentialsProvider], "Type should be AwsCredentialsProvider")
   ```

### Migrating AWS service clients from AWS SDK 1.x to 2.x
<a name="migrating-spark-kinesis-service-from-1.x-to-2.x"></a>

AWS service clients have different package names in SDK 2.x (`software.amazon.awssdk`), whereas SDK 1.x uses `com.amazonaws`. For more information about the client changes, see [here](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-whats-different.html). If you use these service clients in your code, you need to migrate the clients accordingly.

Following is an example of creating a client in SDK 1.x:

```
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.document.DynamoDB
 
AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
```

**To migrate to 2.x:**

1. Change the import statements for service clients. Take DynamoDB clients as an example. You would need to change `com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient` or `com.amazonaws.services.dynamodbv2.document.DynamoDB` to `software.amazon.awssdk.services.dynamodb.DynamoDbClient`.

   ```
   // import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
   // import com.amazonaws.services.dynamodbv2.document.DynamoDB
   import software.amazon.awssdk.services.dynamodb.DynamoDbClient
   ```

1. Update the code that initializes the clients.

   ```
   // AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
   // AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
    
   DynamoDbClient ddbClient = DynamoDbClient.create();
   DynamoDbClient ddbClient = DynamoDbClient.builder().build();
   ```

   For more information about migrating the AWS SDK from 1.x to 2.x, see [What's different between the AWS SDK for Java 1.x and 2.x](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-whats-different.html).

### Code examples for streaming applications
<a name="migrating-spark-kinesis-streaming-examples"></a>

The following word count example shows a streaming application written against the upgraded connector, using the KCL 2.x metrics classes and an SDK 2.x Kinesis client.

```
import java.net.URI
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.http.apache.ApacheHttpClient
import software.amazon.awssdk.services.kinesis.KinesisClient
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest
import software.amazon.awssdk.regions.Region
import software.amazon.kinesis.metrics.{MetricsLevel, MetricsUtil}
 
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
import org.apache.spark.streaming.kinesis.KinesisInputDStream
 
 
object KinesisWordCountASLSDKV2 {
 
  def main(args: Array[String]): Unit = {
    val appName = "demo-app"
    val streamName = "demo-kinesis-test"
    val endpointUrl = "https://kinesis.us-west-2.amazonaws.com"
    val regionName = "us-west-2"
 
    // Determine the number of shards from the stream using the low-level Kinesis Client
    // from the AWS Java SDK.
    val credentialsProvider = DefaultCredentialsProvider.create
    require(credentialsProvider.resolveCredentials() != null,
      "No AWS credentials found. Please specify credentials using one of the methods specified " +
        "in https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html")
    val kinesisClient = KinesisClient.builder()
      .credentialsProvider(credentialsProvider)
      .region(Region.US_WEST_2)
      .endpointOverride(URI.create(endpointUrl))
      .httpClientBuilder(ApacheHttpClient.builder())
      .build()
    val describeStreamRequest = DescribeStreamRequest.builder()
      .streamName(streamName)
      .build()
    val numShards = kinesisClient.describeStream(describeStreamRequest)
      .streamDescription
      .shards
      .size
 
 
    // In this example, we are going to create 1 Kinesis Receiver/input DStream for each shard.
    // This is not a necessity; if there are less receivers/DStreams than the number of shards,
    // then the shards will be automatically distributed among the receivers and each receiver
    // will receive data from multiple shards.
    val numStreams = numShards
 
    // Spark Streaming batch interval
    val batchInterval = Milliseconds(2000)
 
    // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information
    // on sequence number of records that have been received. Same as batchInterval for this
    // example.
    val kinesisCheckpointInterval = batchInterval
 
    // Setup the SparkConfig and StreamingContext
    val sparkConfig = new SparkConf().setAppName("KinesisWordCountASLSDKV2")
    val ssc = new StreamingContext(sparkConfig, batchInterval)
 
    // Create the Kinesis DStreams
    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisInputDStream.builder
        .streamingContext(ssc)
        .streamName(streamName)
        .endpointUrl(endpointUrl)
        .regionName(regionName)
        .initialPosition(new Latest())
        .checkpointAppName(appName)
        .checkpointInterval(kinesisCheckpointInterval)
        .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
        .metricsLevel(MetricsLevel.DETAILED)
        .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
        .build()
    }
 
    // Union all the streams
    val unionStreams = ssc.union(kinesisStreams)
 
    // Convert each line of Array[Byte] to String, and split into words
    val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))
 
    // Map each word to a (word, 1) tuple so we can reduce by key to count the words
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
 
    // Print the first 10 wordCounts
    wordCounts.print()
 
    // Start the streaming context and await termination
    ssc.start()
    ssc.awaitTermination()
  }
}
```

### Considerations when using the upgraded Spark Kinesis connector
<a name="migrating-spark-kinesis-considerations"></a>
+ If your application uses the Kinesis Producer Library with a JDK version lower than 11, you may run into exceptions such as `java.lang.NoClassDefFoundError: javax/xml/bind/DatatypeConverter`. This happens because Amazon EMR 7.0 comes with JDK 17 by default, and the Java EE modules have been removed from the standard libraries since Java 11. You can fix this by adding the following dependency to your pom file. Replace the library version with one that fits your needs.

  ```
  <dependency>
      <groupId>javax.xml.bind</groupId>
      <artifactId>jaxb-api</artifactId>
      <version>${jaxb-api.version}</version>
  </dependency>
  ```
+ After an EMR cluster is created, you can find the Spark Kinesis connector jar at `/usr/lib/spark/connector/lib/`.

# S3DistCp (s3-dist-cp)
<a name="UsingEMR_s3distcp"></a>

Apache DistCp is an open-source tool you can use to copy large amounts of data. *S3DistCp* is similar to DistCp, but optimized to work with AWS, particularly Amazon S3. The command for S3DistCp in Amazon EMR version 4.0 and later is `s3-dist-cp`, which you add as a step in a cluster or at the command line. Using S3DistCp, you can efficiently copy large amounts of data from Amazon S3 into HDFS where it can be processed by subsequent steps in your Amazon EMR cluster. You can also use S3DistCp to copy data between Amazon S3 buckets or from HDFS to Amazon S3. S3DistCp is more scalable and efficient than DistCp for copying large numbers of objects in parallel across buckets and across AWS accounts.

For specific commands that demonstrate the flexibility of S3DistCp in real-world scenarios, see [Seven tips for using S3DistCp](https://aws.amazon.com/blogs/big-data/seven-tips-for-using-s3distcp-on-amazon-emr-to-move-data-efficiently-between-hdfs-and-amazon-s3/) on the AWS Big Data blog.

Like DistCp, S3DistCp uses MapReduce to copy in a distributed manner. It shares the copy, error handling, recovery, and reporting tasks across several servers. For more information about the Apache DistCp open source project, see the [DistCp guide](http://hadoop.apache.org/docs/stable/hadoop-distcp/DistCp.html) in the Apache Hadoop documentation.

If S3DistCp is unable to copy some or all of the specified files, the cluster step fails and returns a non-zero error code. If this occurs, S3DistCp does not clean up partially copied files. 

**Important**  
S3DistCp does not support Amazon S3 bucket names that contain the underscore character.  
S3DistCp does not support concatenation for Parquet files. Use PySpark instead. For more information, see [Concatenating parquet files in Amazon EMR](https://aws.amazon.com/premiumsupport/knowledge-center/emr-concatenate-parquet-files/).  
To avoid copy errors when using S3DistCp to copy a single file (instead of a directory) from S3 to HDFS, use Amazon EMR version 5.33.0 or later, or Amazon EMR version 6.3.0 or later.

## S3DistCp options
<a name="UsingEMR_s3distcp.options"></a>

Though similar to DistCp, S3DistCp supports a different set of options to change how it copies and compresses data.

When you call S3DistCp, you can specify the options described in the following table. The options are added to the step using the arguments list. Examples of the S3DistCp arguments are shown in the following table. 


| Option  | Description  | Required  | 
| --- | --- | --- | 
| ‑‑src=LOCATION  |  Location of the data to copy. This can be either an HDFS or Amazon S3 location.  Example: `‑‑src=s3://amzn-s3-demo-bucket/logs/j-3GYXXXXXX9IOJ/node`   S3DistCp does not support Amazon S3 bucket names that contain the underscore character.   | Yes  | 
| ‑‑dest=LOCATION  |  Destination for the data. This can be either an HDFS or Amazon S3 location.  Example: `‑‑dest=hdfs:///output`   S3DistCp does not support Amazon S3 bucket names that contain the underscore character.   | Yes  | 
| ‑‑srcPattern=PATTERN  |  A [regular expression](http://en.wikipedia.org/wiki/Regular_expression) that filters the copy operation to a subset of the data at `‑‑src`. If neither `‑‑srcPattern` nor `‑‑groupBy` is specified, all data at `‑‑src` is copied to `‑‑dest`.  If the regular expression argument contains special characters, such as an asterisk (\*), either the regular expression or the entire `‑‑args` string must be enclosed in single quotes (').  Example: `‑‑srcPattern=.*daemons.*-hadoop-.*`   | No  | 
| ‑‑groupBy=PATTERN  |  A [regular expression](http://en.wikipedia.org/wiki/Regular_expression) that causes S3DistCp to concatenate files that match the expression. For example, you could use this option to combine all of the log files written in one hour into a single file. The concatenated filename is the value matched by the regular expression for the grouping.  Parentheses indicate how files should be grouped, with all of the items that match the parenthetical statement being combined into a single output file. If the regular expression does not include a parenthetical statement, the cluster fails on the S3DistCp step and returns an error.  If the regular expression argument contains special characters, such as an asterisk (\*), either the regular expression or the entire `‑‑args` string must be enclosed in single quotes (').  When `‑‑groupBy` is specified, only files that match the specified pattern are copied. You do not need to specify `‑‑groupBy` and `‑‑srcPattern` at the same time.  Example: `‑‑groupBy=.*subnetid.*([0-9]+-[0-9]+-[0-9]+-[0-9]+).*`  | No  | 
| ‑‑targetSize=SIZE  |  The size, in mebibytes (MiB), of the files to create based on the `‑‑groupBy` option. This value must be an integer. When `‑‑targetSize` is set, S3DistCp attempts to match this size; the actual size of the copied files may be larger or smaller than this value. Jobs are aggregated based on the size of the data file, thus it is possible that the target file size will match the source data file size.  If the files concatenated by `‑‑groupBy` are larger than the value of `‑‑targetSize`, they are broken up into part files, and named sequentially with a numeric value appended to the end. For example, a file concatenated into `myfile.gz` would be broken into parts as: `myfile0.gz`, `myfile1.gz`, etc.  Example: `‑‑targetSize=2`   | No  | 
| ‑‑appendToLastFile |  Specifies the behavior of S3DistCp when copying files from Amazon S3 to HDFS that are already present in the destination. S3DistCp appends new file data to existing files. If you use `‑‑appendToLastFile` with `‑‑groupBy`, new data is appended to files that match the same groups. This option also respects the `‑‑targetSize` behavior when used with `‑‑groupBy`.  | No  | 
| ‑‑outputCodec=CODEC  |  Specifies the compression codec to use for the copied files. This can take the values: `gzip`, `gz`, `lzo`, `snappy`, or `none`. You can use this option, for example, to convert input files compressed with Gzip into output files with LZO compression, or to uncompress the files as part of the copy operation. If you choose an output codec, the filename will be appended with the appropriate extension (for example, for `gz` and `gzip`, the extension is `.gz`). If you do not specify a value for `‑‑outputCodec`, the files are copied over with no change in their compression.  Example: `‑‑outputCodec=lzo`   | No  | 
| ‑‑s3ServerSideEncryption  |  Ensures that the target data is transferred using SSL and automatically encrypted in Amazon S3 using an AWS server-side key. When retrieving data using S3DistCp, the objects are automatically decrypted. If you attempt to copy an unencrypted object to an encryption-required Amazon S3 bucket, the operation fails. For more information, see [Using data encryption](https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingEncryption.html).  Example: `‑‑s3ServerSideEncryption`   | No  | 
| ‑‑deleteOnSuccess  |  If the copy operation is successful, this option causes S3DistCp to delete the copied files from the source location. This is useful if you are copying output files, such as log files, from one location to another as a scheduled task, and you don't want to copy the same files twice.  Example: `‑‑deleteOnSuccess`   | No  | 
| ‑‑disableMultipartUpload  |  Disables the use of multipart upload.  Example: `‑‑disableMultipartUpload`   | No  | 
| ‑‑multipartUploadChunkSize=SIZE  |  The size, in MiB, of each part in an Amazon S3 multipart upload. S3DistCp uses multipart upload when it copies data larger than the `multipartUploadChunkSize`. To improve job performance, you can increase the size of each part. The default size is 128 MiB.  Example: `‑‑multipartUploadChunkSize=1000`   | No  | 
| ‑‑numberFiles  |  Prepends output files with sequential numbers. The count starts at 0 unless a different value is specified by `‑‑startingIndex`.  Example: `‑‑numberFiles`   | No  | 
| ‑‑startingIndex=INDEX  |  Used with `‑‑numberFiles` to specify the first number in the sequence.  Example: `‑‑startingIndex=1`   | No  | 
| ‑‑outputManifest=FILENAME  |  Creates a text file, compressed with Gzip, that contains a list of all the files copied by S3DistCp.  Example: `‑‑outputManifest=manifest-1.gz`   | No  | 
| ‑‑previousManifest=PATH  |  Reads a manifest file that was created during a previous call to S3DistCp using the `‑‑outputManifest` flag. When the `‑‑previousManifest` flag is set, S3DistCp excludes the files listed in the manifest from the copy operation. If `‑‑outputManifest` is specified along with `‑‑previousManifest`, files listed in the previous manifest also appear in the new manifest file, although the files are not copied.  Example: `‑‑previousManifest=/usr/bin/manifest-1.gz`   | No  | 
| ‑‑requirePreviousManifest |  Requires a previous manifest created during a previous call to S3DistCp. If this is set to false, no error is generated when a previous manifest is not specified. The default is true.  | No  | 
| ‑‑copyFromManifest  |  Reverses the behavior of `‑‑previousManifest` to cause S3DistCp to use the specified manifest file as a list of files to copy, instead of a list of files to exclude from copying.  Example: `‑‑copyFromManifest ‑‑previousManifest=/usr/bin/manifest-1.gz`   | No  | 
| ‑‑s3Endpoint=ENDPOINT |  Specifies the Amazon S3 endpoint to use when uploading a file. This option sets the endpoint for both the source and destination. If not set, the default endpoint is `s3.amazonaws.com`. For a list of the Amazon S3 endpoints, see [Regions and endpoints](https://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region).  Example: `‑‑s3Endpoint=s3.eu-west-1.amazonaws.com`   | No  | 
| ‑‑storageClass=CLASS |  The storage class to use when the destination is Amazon S3. Valid values are STANDARD and REDUCED_REDUNDANCY. If this option is not specified, S3DistCp tries to preserve the storage class. Example: `‑‑storageClass=STANDARD`  | No  | 
| ‑‑srcPrefixesFile=PATH |  A text file in Amazon S3 (s3://), HDFS (hdfs:///) or the local file system (file:/) that contains a list of `src` prefixes, one prefix per line.  If `srcPrefixesFile` is provided, S3DistCp does not list the src path. Instead, it generates a source list as the combined result of listing all prefixes specified in this file. Destination paths are generated from the path relative to the src path, not from these prefixes. If `srcPattern` is also specified, it is applied to the combined listing of the source prefixes to further filter the input. If `copyFromManifest` is used, objects in the manifest are copied and `srcPrefixesFile` is ignored. Example: `‑‑srcPrefixesFile=PATH`  | No  | 

In addition to the options above, S3DistCp implements the [Tool interface](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/util/Tool.html), which means that it supports the Hadoop generic command-line options.
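To make the `‑‑srcPattern` and `‑‑groupBy` semantics from the table concrete, the following sketch mimics the filtering and grouping behavior in Python. It illustrates the regular-expression semantics only; it is not the actual S3DistCp implementation, and the sample keys and the `plan_copy` helper are hypothetical.

```python
import re

def plan_copy(keys, src_pattern=None, group_by=None):
    """Illustrative only: mimic how --srcPattern filters source keys and
    how --groupBy concatenates matching files under the name captured by
    the parenthesized group. Not the real S3DistCp code."""
    if group_by:
        groups = {}
        for key in keys:
            m = re.match(group_by, key)
            if not m:
                continue  # with --groupBy, only matching files are copied
            groups.setdefault(m.group(1), []).append(key)
        return groups
    if src_pattern:
        keys = [k for k in keys if re.match(src_pattern, k)]
    return {k: [k] for k in keys}  # one output file per source file

keys = [
    "logs/j-ABC/node/i-1/daemons/instance-state/hadoop-hdfs.log",
    "logs/j-ABC/node/i-1/daemons/instance-state/spark.log",
]

# --srcPattern-style filtering: copy only the Hadoop daemon logs
filtered = plan_copy(keys, src_pattern=r".*daemons.*hadoop.*")

# --groupBy-style grouping: the captured group names the output file
grouped = plan_copy(keys, group_by=r".*daemons.*/([a-z-]+)\.log")
```

Here `filtered` keeps only the first key, while `grouped` maps the captured names `hadoop-hdfs` and `spark` to the lists of files that would be concatenated under each name.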

## Adding S3DistCp as a step in a cluster
<a name="UsingEMR_s3distcp.step"></a>

You can call S3DistCp by adding it as a step in your cluster. Steps can be added to a cluster at launch or to a running cluster using the console, CLI, or API. The following examples demonstrate adding an S3DistCp step to a running cluster. For more information on adding steps to a cluster, see [Submit work to a cluster](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-work-with-steps.html) in the *Amazon EMR Management Guide*.

**To add an S3DistCp step to a running cluster using the AWS CLI**

For more information on using Amazon EMR commands in the AWS CLI, see the [AWS CLI Command Reference](https://docs.aws.amazon.com/cli/latest/reference/emr).
+ To add a step to a cluster that calls S3DistCp, pass the parameters that specify how S3DistCp should perform the copy operation as arguments. 

  The following example copies daemon logs from Amazon S3 to `hdfs:///output`. In the following command:
  + `‑‑cluster-id` specifies the cluster
  + `Jar` is the location of the S3DistCp JAR file. For an example of how to run a command on a cluster using command-runner.jar, see [Submit a custom JAR step to run a script or command](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-commandrunner.html#emr-commandrunner-examples).
  + `Args` is a comma-separated list of the option name-value pairs to pass in to S3DistCp. For a complete list of the available options, see [S3DistCp options](#UsingEMR_s3distcp.options). 

  To add an S3DistCp copy step to a running cluster, put the following in a JSON file saved in Amazon S3 or your local file system as `myStep.json` for this example. Replace *j-3GYXXXXXX9IOK* with your cluster ID and replace *amzn-s3-demo-bucket* with your Amazon S3 bucket name.

  ```
  [
      {
          "Name":"S3DistCp step",
          "Args":["s3-dist-cp","‑‑s3Endpoint=s3.amazonaws.com","‑‑src=s3://amzn-s3-demo-bucket/logs/j-3GYXXXXXX9IOJ/node/","‑‑dest=hdfs:///output","‑‑srcPattern=.*[a-zA-Z,]+"],
          "ActionOnFailure":"CONTINUE",
          "Type":"CUSTOM_JAR",
          "Jar":"command-runner.jar"        
      }
  ]
  ```

  ```
  aws emr add-steps ‑‑cluster-id j-3GYXXXXXX9IOK ‑‑steps file://./myStep.json
  ```

**Example Copy log files from Amazon S3 to HDFS**  
This example also illustrates how to copy log files stored in an Amazon S3 bucket into HDFS by adding a step to a running cluster. In this example the `‑‑srcPattern` option is used to limit the data copied to the daemon logs.   
To copy log files from Amazon S3 to HDFS using the `‑‑srcPattern` option, put the following in a JSON file saved in Amazon S3 or your local file system as `myStep.json` for this example. Replace *j-3GYXXXXXX9IOK* with your cluster ID and replace *amzn-s3-demo-bucket* with your Amazon S3 bucket name.  

```
[
    {
        "Name":"S3DistCp step",
        "Args":["s3-dist-cp","‑‑s3Endpoint=s3.amazonaws.com","‑‑src=s3://amzn-s3-demo-bucket/logs/j-3GYXXXXXX9IOJ/node/","‑‑dest=hdfs:///output","‑‑srcPattern=.*daemons.*-hadoop-.*"],
        "ActionOnFailure":"CONTINUE",
        "Type":"CUSTOM_JAR",
        "Jar":"command-runner.jar"        
    }
]
```
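If you generate step definitions like the ones above programmatically, for example before passing them to `aws emr add-steps`, a small helper can keep the argument list consistent. This is a sketch under stated assumptions: the `s3distcp_step` function name is hypothetical, and it emits regular double hyphens for the options.

```python
import json

def s3distcp_step(name, src, dest, src_pattern=None):
    """Build one S3DistCp step definition (hypothetical helper)."""
    args = ["s3-dist-cp", f"--src={src}", f"--dest={dest}"]
    if src_pattern:
        args.append(f"--srcPattern={src_pattern}")
    return {
        "Name": name,
        "Args": args,
        "ActionOnFailure": "CONTINUE",
        "Type": "CUSTOM_JAR",
        "Jar": "command-runner.jar",
    }

step = s3distcp_step(
    "S3DistCp step",
    "s3://amzn-s3-demo-bucket/logs/j-3GYXXXXXX9IOJ/node/",
    "hdfs:///output",
    src_pattern=".*daemons.*-hadoop-.*",
)

# Write the list of steps as JSON, in the same shape as myStep.json above
print(json.dumps([step], indent=4))
```

You can then save the printed JSON to a file and pass it to `aws emr add-steps ‑‑steps file://./myStep.json` as shown earlier.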

## Cleaning up after failed S3DistCp jobs
<a name="s3distcp-cleanup"></a>

If S3DistCp cannot copy some or all of the specified files, the command or cluster step fails and returns a non-zero error code. If this occurs, S3DistCp does not clean up partially copied files. You must delete them manually.

Partially copied files are saved to the HDFS `tmp` directory in sub-directories with the unique identifier of the S3DistCp job. You can find this ID in the standard output of the job.
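If you script the cleanup, one way to locate the identifier is to scan the step's standard output for a UUID. This sketch assumes the job ID is printed as a standard lowercase UUID; the sample log line and the `find_job_ids` helper are hypothetical.

```python
import re

# Standard 8-4-4-4-12 lowercase hex UUID, as in the example ID below
UUID_RE = re.compile(
    r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"
)

def find_job_ids(stdout_text):
    """Return all UUID-shaped identifiers found in S3DistCp step output."""
    return UUID_RE.findall(stdout_text)

# Hypothetical stdout line; the real output format may differ
line = "S3DistCp job 4b1c37bb-91af-4391-aaf8-46a6067085a6 did not complete"
ids = find_job_ids(line)
```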

For example, for an S3DistCp job with the ID `4b1c37bb-91af-4391-aaf8-46a6067085a6`, you can connect to the master node of the cluster and run the following command to view output files associated with the job.

```
hdfs dfs -ls /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output
```

The command returns a list of files similar to the following:

```
Found 8 items
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:03 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/_SUCCESS
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:02 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00000
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:02 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00001
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:02 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00002
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:03 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00003
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:03 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00004
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:03 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00005
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:03 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00006
```

You can then run the following command to delete the directory and all contents.

```
hdfs dfs -rm -r -f /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6
```