

# Process data with streaming


Hadoop streaming is a utility that comes with Hadoop and enables you to develop MapReduce executables in languages other than Java. Streaming is implemented in the form of a JAR file, so you can run it from the Amazon EMR API or command line just like a standard JAR file. 

This section describes how to use streaming with Amazon EMR. 

**Note**  
Apache Hadoop streaming is an independent tool. As such, not all of its functions and parameters are described here. For more information about Hadoop streaming, go to [http://hadoop.apache.org/docs/stable/hadoop-streaming/HadoopStreaming.html](http://hadoop.apache.org/docs/stable/hadoop-streaming/HadoopStreaming.html).

## Using the Hadoop streaming utility


This section describes how to use Hadoop's streaming utility.


**Hadoop process**  

| Step | Action | 
| --- |--- |
| 1 |  Write your mapper and reducer executable in the programming language of your choice. Follow the directions in Hadoop's documentation to write your streaming executables. The programs should read their input from standard input and output data through standard output. By default, each line of input/output represents a record and the first tab on each line is used as a separator between the key and value.  | 
| 2 |  Test your executables locally and upload them to Amazon S3.  | 
| 3 |  Use the Amazon EMR command line interface or Amazon EMR console to run your application.  | 

Each mapper executable launches as a separate process in the cluster. The reducer executables then turn the output of the mapper executables into the final data output of the job flow.
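Before uploading anything to Amazon S3, you can approximate the streaming contract locally. The following sketch is an illustration only (it is not part of Hadoop or the Amazon EMR API): toy mapper and reducer functions emit and consume tab-separated key/value records, with a local sort standing in for Hadoop's shuffle and sort phase.

```python
# A minimal local emulation of the Hadoop streaming contract.
# Hadoop itself runs the mapper and reducer as separate processes
# and performs the sort/shuffle at scale; this sketch only mimics
# the record format for quick local testing.
from itertools import groupby


def mapper(line):
    """Emit one tab-separated "key\t1" record per word."""
    for word in line.split():
        yield f"{word}\t1"


def reducer(key, values):
    """Sum the counts for a single key."""
    return f"{key}\t{sum(int(v) for v in values)}"


def run_streaming(lines):
    """Map every input line, sort records by key, reduce each key group."""
    mapped = sorted(record for line in lines for record in mapper(line))
    pairs = (record.split("\t", 1) for record in mapped)
    return [reducer(key, (value for _, value in group))
            for key, group in groupby(pairs, key=lambda kv: kv[0])]


print(run_streaming(["the quick fox", "the lazy dog"]))
```

Because the records are plain tab-separated text on standard input and output, the same contract holds whichever language you choose for the real executables.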

The `input`, `output`, `mapper`, and `reducer` parameters are required by most streaming applications. The following table describes these and other, optional parameters.


| Parameter | Description | Required | 
| --- | --- | --- | 
| -input |  Location on Amazon S3 of the input data. Type: String Default: None Constraint: URI. If no protocol is specified, the cluster's default file system is used.   | Yes | 
| -output |  Location on Amazon S3 where Amazon EMR uploads the processed data. Type: String Default: None Constraint: URI Condition: If a location is not specified, Amazon EMR uploads the data to the location specified by `input`.  | Yes | 
| -mapper |  Name of the mapper executable. Type: String Default: None  | Yes | 
| -reducer |  Name of the reducer executable. Type: String Default: None  | Yes | 
| -cacheFile |  An Amazon S3 location containing files for Hadoop to copy into your local working directory (primarily to improve performance). Type: String Default: None Constraints: [URI]#[symlink name to create in working directory]   | No | 
| -cacheArchive |  JAR file to extract into the working directory. Type: String Default: None Constraints: [URI]#[symlink directory name to create in working directory]   | No | 
| -combiner |  Combines results Type: String Default: None Constraints: Java class name  | No | 

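To see how the required and optional parameters fit together, the following sketch assembles the argument list for a streaming step. The bucket and script names are placeholders for illustration, not real resources.

```python
def streaming_args(input_uri, output_uri, mapper, reducer,
                   cache_files=None, combiner=None):
    """Assemble the argument list for a Hadoop streaming step.

    Only input, output, mapper, and reducer are required; the
    optional parameters are appended when supplied.
    """
    args = ["-input", input_uri, "-output", output_uri,
            "-mapper", mapper, "-reducer", reducer]
    for cache_file in cache_files or []:
        args += ["-cacheFile", cache_file]
    if combiner:
        args += ["-combiner", combiner]
    return args


# Placeholder S3 locations and script names for illustration.
args = streaming_args(
    input_uri="s3://amzn-s3-demo-bucket/input",
    output_uri="s3://amzn-s3-demo-bucket/output",
    mapper="wordSplitter.py",
    reducer="aggregate",
)
```

The resulting list is what you would pass, in one form or another, whether you submit the step through the console, the AWS CLI, or an SDK.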
The following code sample is a mapper executable written in Python. This script is part of the WordCount sample application.

```
#!/usr/bin/env python3
import sys

def main():
    # Read records from standard input; each line is one record.
    for line in sys.stdin:
        # Emit a tab-separated key/value pair per word. The
        # LongValueSum: prefix tells Hadoop's aggregate reducer
        # to sum the values for each key.
        for word in line.split():
            print("LongValueSum:" + word + "\t" + "1")

if __name__ == "__main__":
    main()
```
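The WordCount sample pairs this mapper with Hadoop's built-in aggregate reducer, which interprets the `LongValueSum:` prefix. A hand-written reducer counterpart might look like the following sketch (an illustration, not part of the sample application). It treats the entire prefixed string as the key and relies on Hadoop streaming's guarantee that a reducer receives its input lines sorted, and therefore grouped, by key.

```python
#!/usr/bin/env python3
"""Illustrative reducer counterpart to the WordCount mapper: sums the
per-key counts, assuming input lines arrive sorted by key as Hadoop
streaming guarantees."""
import sys


def reduce_counts(lines):
    """Yield one "key\ttotal" record per run of identical keys."""
    current_key, total = None, 0
    for line in lines:
        key, _, value = line.rstrip("\n").partition("\t")
        if key != current_key:
            if current_key is not None:
                yield f"{current_key}\t{total}"
            current_key, total = key, 0
        total += int(value)
    if current_key is not None:
        yield f"{current_key}\t{total}"


if __name__ == "__main__":
    for record in reduce_counts(sys.stdin):
        print(record)
```

Because input is already sorted, the reducer only needs to track the current key and a running total rather than hold all counts in memory.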

# Submit a streaming step


This section covers the basics of submitting a streaming step to a cluster. A streaming application reads input from standard input and then runs a script or executable (called a mapper) against each input. The result from each of the inputs is saved locally, typically on a Hadoop Distributed File System (HDFS) partition. After all the input is processed by the mapper, a second script or executable (called a reducer) processes the mapper results. The results from the reducer are sent to standard output. You can chain together a series of streaming steps, where the output of one step becomes the input of another step. 

The mapper and the reducer can each be referenced as a file or you can supply a Java class. You can implement the mapper and reducer in any of the supported languages, including Ruby, Perl, Python, PHP, or Bash.

## Submit a streaming step using the console


This example describes how to use the Amazon EMR console to submit a streaming step to a running cluster.

**To submit a streaming step**

1. Open the Amazon EMR console at [https://console.aws.amazon.com/emr](https://console.aws.amazon.com/emr/).

1. In the **Cluster List**, select the name of your cluster.

1. Scroll to the **Steps** section and expand it, then choose **Add step**.

1. In the **Add Step** dialog box:
   + For **Step type**, choose **Streaming program**.
   + For **Name**, accept the default name (Streaming program) or type a new name.
   + For **Mapper**, type or browse to the location of your mapper class in Hadoop, or an S3 bucket where the mapper executable, such as a Python program, resides. The path value must be in the form *BucketName*/*path*/*MapperExecutable*.
   + For **Reducer**, type or browse to the location of your reducer class in Hadoop, or an S3 bucket where the reducer executable, such as a Python program, resides. The path value must be in the form *BucketName*/*path*/*ReducerExecutable*. Amazon EMR supports the special *aggregate* keyword. For more information, see the Aggregate library supplied by Hadoop.
   + For **Input S3 location**, type or browse to the location of your input data. 
   + For **Output S3 location**, type or browse to the name of your Amazon S3 output bucket.
   + For **Arguments**, leave the field blank.
   + For **Action on failure**, accept the default option (**Continue**).

1. Choose **Add**. The step appears in the console with a status of Pending. 

1. The status of the step changes from Pending to Running to Completed as the step runs. To update the status, choose the **Refresh** icon above the Actions column. 

## Submit a streaming step using the AWS CLI


These examples demonstrate how to use the AWS CLI to create a cluster and submit a streaming step. 

**To create a cluster and submit a streaming step using the AWS CLI**
+ To create a cluster and submit a streaming step using the AWS CLI, type the following command and replace *myKey* with the name of your EC2 key pair. Note that your argument for `--files` should be the Amazon S3 path to your script's location, and the arguments for `-mapper` and `-reducer` should be the names of the respective script files.

  ```
  aws emr create-cluster --name "Test cluster" --release-label emr-7.12.0 --applications Name=Hue Name=Hive Name=Pig --use-default-roles \
  --ec2-attributes KeyName=myKey --instance-type m5.xlarge --instance-count 3 \
  --steps Type=STREAMING,Name="Streaming Program",ActionOnFailure=CONTINUE,Args=[--files,pathtoscripts,-mapper,mapperscript,-reducer,reducerscript,-input,pathtoinputdata,-output,pathtooutputbucket]
  ```
**Note**  
Linux line continuation characters (\) are included for readability. They can be removed or used in Linux commands. For Windows, remove them or replace with a caret (^).

  When you specify the instance count without using the `--instance-groups` parameter, a single master node is launched, and the remaining instances are launched as core nodes. All nodes use the instance type specified in the command.
**Note**  
If you have not previously created the default Amazon EMR service role and EC2 instance profile, type `aws emr create-default-roles` to create them before typing the `create-cluster` subcommand.

  For more information on using Amazon EMR commands in the AWS CLI, see [https://docs.aws.amazon.com/cli/latest/reference/emr](https://docs.aws.amazon.com/cli/latest/reference/emr).