

# Getting started tutorials for Amazon Kinesis Data Streams

Amazon Kinesis Data Streams provides a number of different solutions for ingesting data into and consuming data from Kinesis data streams. The tutorials in this section are designed to help you understand Amazon Kinesis Data Streams concepts and functionality, and to identify the solution that meets your needs. 

**Topics**
+ [Tutorial: Process real-time stock data using KPL and KCL 2.x](tutorial-stock-data-kplkcl2.md)
+ [Tutorial: Process real-time stock data using KPL and KCL 1.x](tutorial-stock-data-kplkcl.md)
+ [Tutorial: Analyze real-time stock data using Amazon Managed Service for Apache Flink](tutorial-stock-data.md)
+ [Tutorial: Use AWS Lambda with Amazon Kinesis Data Streams](tutorial-stock-data-lambda.md)
+ [Use the AWS Streaming Data Solution for Amazon Kinesis](examples-streaming-solution.md)

# Tutorial: Process real-time stock data using KPL and KCL 2.x


The scenario for this tutorial involves ingesting stock trades into a data stream and writing a basic Amazon Kinesis Data Streams application that performs calculations on the stream. You will learn how to send a stream of records to Kinesis Data Streams and implement an application that consumes and processes the records in near real time.

**Important**  
After you create a stream, your account incurs nominal charges for Kinesis Data Streams usage because Kinesis Data Streams is not eligible for the AWS Free Tier. After the consumer application starts, it also incurs nominal charges for Amazon DynamoDB usage. The consumer application uses DynamoDB to track processing state. When you are finished with this application, delete your AWS resources to stop incurring charges. For more information, see [Clean up resources](tutorial-stock-data-kplkcl2-finish.md).

The code does not access actual stock market data, but instead simulates the stream of stock trades. It does so by using a random stock trade generator that has a starting point of real market data for the top 25 stocks by market capitalization as of February 2015. If you have access to a real-time stream of stock trades, you might be interested in deriving useful, timely statistics from that stream. For example, you might want to perform a sliding window analysis where you determine the most popular stock purchased in the last 5 minutes. Or you might want a notification whenever there is a sell order that is too large (that is, it has too many shares). You can extend the code in this series to provide such functionality.
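As a sketch of the sliding-window idea, the following self-contained example (a hypothetical `SlidingWindowPopularity` helper, not part of the tutorial code) keeps buy trades from the last five minutes and reports the most purchased ticker; timestamps are passed in explicitly so the logic is easy to test:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: tracks the most purchased ticker over a sliding time window.
public class SlidingWindowPopularity {
    private static final long WINDOW_MILLIS = 5 * 60 * 1000L; // 5 minutes

    private static final class Entry {
        final long timestampMillis;
        final String ticker;
        Entry(long timestampMillis, String ticker) {
            this.timestampMillis = timestampMillis;
            this.ticker = ticker;
        }
    }

    private final Deque<Entry> window = new ArrayDeque<>();
    private final Map<String, Integer> counts = new HashMap<>();

    // Record a buy trade; the caller supplies the current time.
    public void recordBuy(String ticker, long nowMillis) {
        window.addLast(new Entry(nowMillis, ticker));
        counts.merge(ticker, 1, Integer::sum);
        evictOlderThan(nowMillis - WINDOW_MILLIS);
    }

    // Drop trades that fell out of the window and decrement their counts.
    private void evictOlderThan(long cutoffMillis) {
        while (!window.isEmpty() && window.peekFirst().timestampMillis < cutoffMillis) {
            Entry e = window.removeFirst();
            Integer remaining = counts.merge(e.ticker, -1, Integer::sum);
            if (remaining != null && remaining <= 0) {
                counts.remove(e.ticker);
            }
        }
    }

    // Returns the ticker with the highest buy count in the current window, or null.
    public String mostPopular() {
        String best = null;
        int bestCount = 0;
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            if (e.getValue() > bestCount) {
                best = e.getKey();
                bestCount = e.getValue();
            }
        }
        return best;
    }
}
```

A production version would plug this into the consumer's record processor; the core idea is just a deque for eviction plus a running count per key.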

You can work through the steps in this tutorial on your desktop or laptop computer and run both the producer and consumer code on the same machine or any platform that supports the defined requirements.

The examples shown use the US West (Oregon) Region, but they work on any of the [AWS Regions](https://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region) that support Kinesis Data Streams.

**Topics**
+ [Complete prerequisites](tutorial-stock-data-kplkcl2-begin.md)
+ [Create a data stream](tutorial-stock-data-kplkcl2-create-stream.md)
+ [Create an IAM policy and user](tutorial-stock-data-kplkcl2-iam.md)
+ [Download and build the code](tutorial-stock-data-kplkcl2-download.md)
+ [Implement the producer](tutorial-stock-data-kplkcl2-producer.md)
+ [Implement the consumer](tutorial-stock-data-kplkcl2-consumer.md)
+ [(Optional) Extend the consumer](tutorial-stock-data-kplkcl2-consumer-extension.md)
+ [Clean up resources](tutorial-stock-data-kplkcl2-finish.md)

# Complete prerequisites


You must meet the following requirements to complete this tutorial:

## Create and use an Amazon Web Services Account


Before you begin, make sure that you are familiar with the concepts discussed in [Amazon Kinesis Data Streams Terminology and concepts](key-concepts.md), particularly with streams, shards, producers, and consumers. It is also helpful to have completed the steps in the following guide: [Tutorial: Install and configure the AWS CLI for Kinesis Data Streams](kinesis-tutorial-cli-installation.md).

You must have an AWS account and a web browser to access the AWS Management Console.

For console access, use your IAM user name and password to sign in to the [AWS Management Console](https://console.aws.amazon.com/console/home) from the IAM sign-in page. For information about AWS security credentials, including programmatic access and alternatives to long-term credentials, see [AWS security credentials](https://docs.aws.amazon.com/IAM/latest/UserGuide/security-creds.html) in the *IAM User Guide*. For details about signing in to your AWS account, see [How to sign in to AWS](https://docs.aws.amazon.com/signin/latest/userguide/how-to-sign-in.html) in the *AWS Sign-In User Guide*.

For more information about IAM and instructions for setting up an IAM user and security keys, see [Create an IAM User](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/get-set-up-for-amazon-ec2.html#create-an-iam-user).

## Fulfill system software requirements


The system that you are using to run the application must have Java 7 or higher installed. To download and install the latest Java Development Kit (JDK), go to [Oracle's Java SE installation site](http://www.oracle.com/technetwork/java/javase/downloads/index.html).

You need the latest [AWS SDK for Java](https://aws.amazon.com/sdk-for-java/) version. 

The consumer application requires the Kinesis Client Library (KCL) version 2.2.9 or higher, which you can obtain from GitHub at [https://github.com/awslabs/amazon-kinesis-client/tree/master](https://github.com/awslabs/amazon-kinesis-client/tree/master).

## Next steps


[Create a data stream](tutorial-stock-data-kplkcl2-create-stream.md)

# Create a data stream


First, you must create the data stream that you will use in subsequent steps of this tutorial.

**To create a stream**

1. Sign in to the AWS Management Console and open the Kinesis console at [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

1. Choose **Data Streams** in the navigation pane.

1. In the navigation bar, expand the Region selector and choose a Region.

1. Choose **Create Kinesis stream**.

1. Enter a name for your data stream (for example, **StockTradeStream**).

1. Enter **1** for the number of shards, but keep **Estimate the number of shards you'll need** collapsed.

1. Choose **Create Kinesis stream**.

On the **Kinesis streams** list page, the status of your stream appears as `CREATING` while the stream is being created. When the stream is ready to use, the status changes to `ACTIVE`. 

If you choose the name of your stream, the **Details** tab on the page that appears displays a summary of your data stream configuration. The **Monitoring** section displays monitoring information for the stream.

## Next steps


[Create an IAM policy and user](tutorial-stock-data-kplkcl2-iam.md)

# Create an IAM policy and user


Security best practices for AWS dictate the use of fine-grained permissions to control access to different resources. AWS Identity and Access Management (IAM) lets you manage users and user permissions in AWS. An [IAM policy](https://docs.aws.amazon.com/IAM/latest/UserGuide/PoliciesOverview.html) explicitly lists the actions that are allowed and the resources to which those actions apply.

The following are the minimum permissions generally required for Kinesis Data Streams producers and consumers.


**Producer**  

| Actions | Resource | Purpose | 
| --- | --- | --- | 
| DescribeStream, DescribeStreamSummary, DescribeStreamConsumer | Kinesis data stream | Before attempting to write records, the producer checks whether the data stream exists, whether it is active, and which shards it contains. | 
| SubscribeToShard, RegisterStreamConsumer | Kinesis data stream | Subscribes and registers consumers to a shard. | 
| PutRecord, PutRecords | Kinesis data stream | Writes records to Kinesis Data Streams. | 


**Consumer**  

| **Actions** | **Resource** | **Purpose** | 
| --- | --- | --- | 
| DescribeStream | Kinesis data stream | Before attempting to read records, the consumer checks whether the data stream exists, whether it is active, and which shards it contains. | 
| GetRecords, GetShardIterator  | Kinesis data stream | Reads records from a shard. | 
| CreateTable, DescribeTable, GetItem, PutItem, Scan, UpdateItem | Amazon DynamoDB table | If the consumer is developed using the Kinesis Client Library (KCL) (either version 1.x or 2.x), it needs permissions to a DynamoDB table to track the processing state of the application. | 
| DeleteItem | Amazon DynamoDB table | For when the consumer performs split/merge operations on Kinesis Data Streams shards. | 
| PutMetricData | Amazon CloudWatch | The KCL also uploads metrics to CloudWatch, which are useful for monitoring the application. | 

For this tutorial, you will create a single IAM policy that grants all of the preceding permissions. In production, you might want to create two policies, one for producers and one for consumers.

**To create an IAM policy**

1. Locate the Amazon Resource Name (ARN) for the new data stream that you created in the previous step. You can find this ARN listed as **Stream ARN** at the top of the **Details** tab. The ARN format is as follows:

   ```
   arn:aws:kinesis:region:account:stream/name
   ```  
*region*  
The AWS Region code; for example, `us-west-2`. For more information, see [Region and Availability Zone Concepts](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-regions-availability-zones).  
*account*  
The AWS account ID, as shown in [Account Settings](https://console.aws.amazon.com/billing/home?#/account).  
*name*  
The name of the data stream that you created in the preceding step, which is `StockTradeStream`.

1. Determine the ARN for the DynamoDB table to be used by the consumer (and to be created by the first consumer instance). It must be in the following format:

   ```
   arn:aws:dynamodb:region:account:table/name
   ```

   The Region and account ID are identical to the values in the ARN of the data stream that you're using for this tutorial, but the *name* is the name of the DynamoDB table created and used by the consumer application. KCL uses the application name as the table name. In this step, use `StockTradesProcessor` for the DynamoDB table name, because that is the application name used in later steps in this tutorial.

1. In the IAM console, in **Policies** ([https://console.aws.amazon.com/iam/home#/policies](https://console.aws.amazon.com/iam/home#/policies)), choose **Create policy**. If this is the first time that you have worked with IAM policies, choose **Get Started**, **Create Policy**.

1. Choose **Select** next to **Policy Generator**.

1. Choose **Amazon Kinesis** as the AWS service.

1. Select `DescribeStream`, `GetShardIterator`, `GetRecords`, `PutRecord`, and `PutRecords` as the allowed actions.

1. Enter the ARN of the data stream that you're using in this tutorial.

1. Use **Add Statement** to add a statement for each of the following:

   | AWS service | Actions | ARN |
   | --- | --- | --- |
   | Amazon DynamoDB | All actions | The ARN of the DynamoDB table that you determined in step 2 |
   | Amazon CloudWatch | `PutMetricData` | `*` |

   The asterisk (`*`) that is used when specifying an ARN is required. In this case, it's because there is no specific resource in CloudWatch on which the `PutMetricData` action is invoked.

1. Choose **Next Step**.

1. Change **Policy Name** to `StockTradeStreamPolicy`, review the code, and choose **Create Policy**.

The resulting policy document should look like this:


```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Stmt123",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:PutRecord",
                "kinesis:PutRecords",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:ListShards",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream"
            ]
        },
        {
            "Sid": "Stmt234",
            "Effect": "Allow",
            "Action": [
                "kinesis:SubscribeToShard",
                "kinesis:DescribeStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream/*"
            ]
        },
        {
            "Sid": "Stmt456",
            "Effect": "Allow",
            "Action": [
                "dynamodb:*"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-west-2:111122223333:table/StockTradesProcessor"
            ]
        },
        {
            "Sid": "Stmt789",
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}
```

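The stream and table ARNs used in this policy follow a fixed pattern, so they can be assembled from the Region code, account ID, and resource name. A minimal sketch (a hypothetical `ArnBuilder` helper, not part of the tutorial code):

```java
// Hypothetical sketch: assembles Kinesis stream and DynamoDB table ARNs
// from their Region, account ID, and resource name components.
public class ArnBuilder {
    public static String streamArn(String region, String accountId, String streamName) {
        return String.format("arn:aws:kinesis:%s:%s:stream/%s", region, accountId, streamName);
    }

    public static String tableArn(String region, String accountId, String tableName) {
        return String.format("arn:aws:dynamodb:%s:%s:table/%s", region, accountId, tableName);
    }
}
```

For example, `ArnBuilder.streamArn("us-west-2", "111122223333", "StockTradeStream")` produces the stream ARN shown in the policy above.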

**To create an IAM user**

1. Open the IAM console at [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. On the **Users** page, choose **Add user**.

1. For **User name**, type `StockTradeStreamUser`.

1. For **Access type**, choose **Programmatic access**, and then choose **Next: Permissions**.

1. Choose **Attach existing policies directly**.

1. Search by name for the policy that you created in the preceding procedure (`StockTradeStreamPolicy`). Select the box to the left of the policy name, and then choose **Next: Review**.

1. Review the details and summary, and then choose **Create user**.

1. Copy the **Access key ID**, and save it privately. Under **Secret access key**, choose **Show**, and save that key privately also.

1. Paste the access and secret keys into a local file in a safe place that only you can access. For this application, create a file named `~/.aws/credentials` (with strict permissions). The file should be in the following format:

   ```
   [default]
   aws_access_key_id=access key
   aws_secret_access_key=secret access key
   ```

**To attach an IAM policy to a user**

1. In the IAM console, open [Policies](https://console.aws.amazon.com/iam/home?#policies) and choose **Policy Actions**. 

1. Choose `StockTradeStreamPolicy` and **Attach**.

1. Choose `StockTradeStreamUser` and **Attach Policy**.

## Next steps


[Download and build the code](tutorial-stock-data-kplkcl2-download.md)

# Download and build the code


This topic provides sample implementation code for ingesting the simulated stock trades into the data stream (the *producer*) and for processing that data (the *consumer*).

**To download and build the code**

1. Download the source code from the [https://github.com/aws-samples/amazon-kinesis-learning](https://github.com/aws-samples/amazon-kinesis-learning) GitHub repo to your computer.

1. Create a project in your IDE with the source code, adhering to the provided directory structure.

1. Add the following libraries to the project:
   + Amazon Kinesis Client Library (KCL)
   + AWS SDK
   + Apache HttpCore
   + Apache HttpClient
   + Apache Commons Lang
   + Apache Commons Logging
   + Guava (Google Core Libraries For Java)
   + Jackson Annotations
   + Jackson Core
   + Jackson Databind
   + Jackson Dataformat: CBOR
   + Joda Time

1. Depending on your IDE, the project might be built automatically. If not, build the project using the appropriate steps for your IDE.

If you complete these steps successfully, you are now ready to move to the next section, [Implement the producer](tutorial-stock-data-kplkcl2-producer.md). 

## Next steps


[Implement the producer](tutorial-stock-data-kplkcl2-producer.md)

# Implement the producer


This tutorial uses the real-world scenario of stock market trade monitoring. The following principles briefly explain how this scenario maps to the producer and its supporting code structure.

Refer to the [source code](https://github.com/aws-samples/amazon-kinesis-learning) and review the following information.

**StockTrade class**  
An individual stock trade is represented by an instance of the StockTrade class. This instance contains attributes such as the ticker symbol, price, number of shares, the type of the trade (buy or sell), and an ID uniquely identifying the trade. This class is implemented for you. 

**Stream record**  
A stream is a sequence of records. A record is a serialization of a `StockTrade` instance in JSON format. For example:   

```
{
  "tickerSymbol": "AMZN", 
  "tradeType": "BUY", 
  "price": 395.87,
  "quantity": 16, 
  "id": 3567129045
}
```
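The tutorial code serializes trades with the Jackson library. As a dependency-free sketch of what a record's payload looks like on the wire, the following hypothetical `TradeJson` helper (not part of the tutorial code) produces the same JSON shape by hand:

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

// Hypothetical sketch: serializes a stock trade to the JSON shape shown above.
// The tutorial's StockTrade class does this with Jackson instead.
public class TradeJson {
    public static byte[] toJsonAsBytes(String ticker, String tradeType, double price,
                                       long quantity, long id) {
        // Locale.ROOT keeps the decimal separator a '.' regardless of system locale.
        String json = String.format(Locale.ROOT,
                "{\"tickerSymbol\": \"%s\", \"tradeType\": \"%s\", \"price\": %.2f, "
                        + "\"quantity\": %d, \"id\": %d}",
                ticker, tradeType, price, quantity, id);
        return json.getBytes(StandardCharsets.UTF_8);
    }
}
```

This is why the producer code later checks for a `null` byte array: any serialization step can fail, and a record with an unparseable payload is worthless downstream.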

**StockTradeGenerator class**  
StockTradeGenerator has a method called `getRandomTrade()` that returns a new randomly generated stock trade every time it is invoked. This class is implemented for you.

**StockTradesWriter class**  
The entry point of the producer. The `main` method of `StockTradesWriter` continuously retrieves a random trade and sends it to Kinesis Data Streams by performing the following tasks:  

1. Reads the data stream name and Region name as input.

1. Uses the `KinesisAsyncClientBuilder` to set the Region, credentials, and client configuration. 

1. Checks that the stream exists and is active (if not, it exits with an error). 

1. In a continuous loop, calls the `StockTradeGenerator.getRandomTrade()` method and then the `sendStockTrade` method to send the trade to the stream every 100 milliseconds. 
The `sendStockTrade` method of the `StockTradesWriter` class has the following code:   

```
private static void sendStockTrade(StockTrade trade, KinesisAsyncClient kinesisClient,
            String streamName) {
        byte[] bytes = trade.toJsonAsBytes();
        // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library.
        if (bytes == null) {
            LOG.warn("Could not get JSON bytes for stock trade");
            return;
        }

        LOG.info("Putting trade: " + trade.toString());
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(bytes))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            LOG.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }
```

Refer to the following code breakdown:
+ The `PutRecord` API expects a byte array, and you must convert trade to JSON format. This single line of code performs that operation: 

  ```
  byte[] bytes = trade.toJsonAsBytes();
  ```
+ Before you can send the trade, you create a new `PutRecordRequest` instance (called request in this case). Each `request` requires the stream name, partition key, and a data blob. 

  ```
  PutRecordRequest request = PutRecordRequest.builder()
      .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
      .streamName(streamName)
      .data(SdkBytes.fromByteArray(bytes))
      .build();
  ```

  The example uses a stock ticker as a partition key, which maps the record to a specific shard. In practice, you should have hundreds or thousands of partition keys per shard such that records are evenly dispersed across your stream. For more information about how to add data to a stream, see [Write data to Amazon Kinesis Data Streams](building-producers.md).

  Now `request` is ready to send to the client (the put operation): 

  ```
     kinesisClient.putRecord(request).get();
  ```
+ Error checking and logging are always useful additions. This code logs error conditions: 

  ```
  if (bytes == null) {
      LOG.warn("Could not get JSON bytes for stock trade");
      return;
  }
  ```

  Add the try/catch block around the `put` operation: 

  ```
  try {
      kinesisClient.putRecord(request).get();
  } catch (InterruptedException e) {
      LOG.info("Interrupted, assuming shutdown.");
  } catch (ExecutionException e) {
      LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
  }
  ```

  This is because a Kinesis Data Streams put operation can fail due to a network error, or because the data stream has reached its throughput limits and is being throttled. To avoid data loss, carefully consider your retry policy for `put` operations, such as retrying with exponential backoff. 
+ Status logging is helpful but optional:

  ```
  LOG.info("Putting trade: " + trade.toString());
  ```
The producer shown here uses the Kinesis Data Streams API single record functionality, `PutRecord`. In practice, if an individual producer generates many records, it is often more efficient to use the multiple records functionality of `PutRecords` and send batches of records at a time. For more information, see [Write data to Amazon Kinesis Data Streams](building-producers.md).
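As noted in the breakdown above, the partition key determines which shard receives a record: Kinesis Data Streams takes an MD5 hash of the partition key and maps the resulting 128-bit integer onto the shards' hash-key ranges. The following simplified, self-contained sketch illustrates the idea (a hypothetical `ShardMapper` helper; real shard assignment uses each shard's configured hash-key range rather than a simple modulo):

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Simplified sketch: maps a partition key to a shard index by hashing it with MD5,
// the same hash function Kinesis Data Streams applies to partition keys. Actual
// shard assignment compares the hash against each shard's hash-key range.
public class ShardMapper {
    public static int shardFor(String partitionKey, int shardCount) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            BigInteger hash = new BigInteger(1, digest); // unsigned 128-bit value
            return hash.mod(BigInteger.valueOf(shardCount)).intValue();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }
}
```

Because the hash is deterministic, every trade for a given ticker lands on the same shard, which is why a handful of hot tickers can skew load toward a few shards when ticker symbols are used as partition keys.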

**To run the producer**

1. Verify that the access key and secret key pair retrieved in [Create an IAM policy and user](tutorial-stock-data-kplkcl2-iam.md) are saved in the file `~/.aws/credentials`. 

1. Run the `StockTradeWriter` class with the following arguments:

   ```
   StockTradeStream us-west-2
   ```

   If you created your stream in a Region other than `us-west-2`, you must specify that Region here instead.

You should see output similar to the following:

```
Feb 16, 2015 3:53:00 PM  
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
```

Your stock trades are now being ingested by Kinesis Data Streams.

## Next steps


[Implement the consumer](tutorial-stock-data-kplkcl2-consumer.md)

# Implement the consumer


The consumer application in this tutorial continuously processes the stock trades in your data stream. It then outputs the most popular stocks being bought and sold every minute. The application is built on top of the Kinesis Client Library (KCL), which does much of the heavy lifting common to consumer apps. For more information, see [KCL 1.x and 2.x information](shared-throughput-kcl-consumers.md). 

Refer to the source code and review the following information.

**StockTradesProcessor class**  
The main class of the consumer, provided for you, which performs the following tasks:  
+ Reads the application, data stream, and Region names, passed in as arguments.
+ Creates a `KinesisAsyncClient` instance with the Region name.
+ Creates a `StockTradeRecordProcessorFactory` instance that serves instances of `ShardRecordProcessor`, implemented by a `StockTradeRecordProcessor` instance. 
+ Creates a `ConfigsBuilder` instance with the `KinesisAsyncClient`, `StreamName`, `ApplicationName`, and `StockTradeRecordProcessorFactory` instance. This is useful for creating all configurations with default values.
+ Creates a KCL scheduler (known as the KCL worker in KCL 1.x versions) with the `ConfigsBuilder` instance. 
+ The scheduler creates a new thread for each shard (assigned to this consumer instance), which continuously loops to read records from the data stream. It then invokes the `StockTradeRecordProcessor` instance to process each batch of records received. 

**StockTradeRecordProcessor class**  
The implementation of the `ShardRecordProcessor` interface, which requires five methods: `initialize`, `processRecords`, `leaseLost`, `shardEnded`, and `shutdownRequested`.   
The `initialize` and `shutdownRequested` methods are used by the KCL to let the record processor know when it should be ready to start receiving records and when it should expect to stop receiving records, respectively, so that it can perform any application-specific setup and termination tasks. `leaseLost` and `shardEnded` implement the logic for what to do when a lease is lost or processing has reached the end of a shard. In this example, we simply log messages indicating these events.   
The code for these methods is provided for you. The main processing happens in the `processRecords` method, which in turn uses `processRecord` for each record. The latter method is provided as mostly empty skeleton code for you to implement in the next step, where it is explained in greater detail.   
Also of note is the implementation of the support methods for `processRecord`: `reportStats` and `resetStats`, which are empty in the original source code.   
The `processRecords` method is implemented for you, and performs the following steps:  
+ For each record passed in, it calls `processRecord` on it. 
+ If at least 1 minute has elapsed since the last report, calls `reportStats()`, which prints out the latest stats, and then `resetStats()` which clears the stats so that the next interval includes only new records.
+ Sets the next reporting time.
+ If at least 1 minute has elapsed since the last checkpoint, calls `checkpoint()`. 
+ Sets the next checkpointing time.
This method uses 60-second intervals for the reporting and checkpointing rate. For more information about checkpointing, see [Using the Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html). 
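The 60-second cadence described above boils down to a simple next-deadline check. A minimal, self-contained illustration (a hypothetical `IntervalGate` class; the tutorial's `processRecords` applies the same pattern to both `reportStats` and `checkpoint`):

```java
// Hypothetical sketch of the interval pattern used by processRecords: an action
// runs at most once per interval, with the clock injected for testability.
public class IntervalGate {
    private final long intervalMillis;
    private long nextDeadlineMillis;

    public IntervalGate(long intervalMillis, long nowMillis) {
        this.intervalMillis = intervalMillis;
        this.nextDeadlineMillis = nowMillis + intervalMillis;
    }

    // Returns true (and schedules the next deadline) when the interval has elapsed.
    public boolean shouldRun(long nowMillis) {
        if (nowMillis < nextDeadlineMillis) {
            return false;
        }
        nextDeadlineMillis = nowMillis + intervalMillis;
        return true;
    }
}
```

Because `processRecords` is invoked once per batch rather than on a timer, this check is evaluated on every batch, and reporting or checkpointing simply happens on the first batch after each deadline passes.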

**StockStats class**  
This class provides data retention and statistics tracking for the most popular stocks over time. This code is provided for you and contains the following methods:  
+ `addStockTrade(StockTrade)`: injects the given `StockTrade` into the running statistics.
+ `toString()`: returns the statistics in a formatted string.
This class tracks the most popular stock by keeping a running count of the total number of trades for each stock and the maximum count. It updates these counts whenever a stock trade arrives.
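The running-count approach described above can be sketched as follows (a hypothetical minimal `PopularityTracker`; the tutorial's `StockStats` additionally tracks buys and sells separately):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical minimal sketch of the StockStats approach: keep a running trade
// count per ticker and update the current maximum as each trade arrives.
public class PopularityTracker {
    private final Map<String, Long> tradeCounts = new HashMap<>();
    private String mostPopular;
    private long mostPopularCount;

    public void addTrade(String ticker) {
        long count = tradeCounts.merge(ticker, 1L, Long::sum);
        // O(1) maximum maintenance works because counts only ever increase.
        if (count > mostPopularCount) {
            mostPopularCount = count;
            mostPopular = ticker;
        }
    }

    public String getMostPopular() {
        return mostPopular;
    }

    public long getMostPopularCount() {
        return mostPopularCount;
    }
}
```

Updating the maximum on each insertion avoids scanning the whole map when the stats are reported every minute.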

Add code to the methods of the `StockTradeRecordProcessor` class, as shown in the following steps. 

**To implement the consumer**

1. Implement the `processRecord` method by instantiating a correctly sized `StockTrade` object and adding the record data to it, logging a warning if there's a problem. 

   ```
   byte[] arr = new byte[record.data().remaining()];
   record.data().get(arr);
   StockTrade trade = StockTrade.fromJsonAsBytes(arr);
   if (trade == null) {
       log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey());
       return;
   }
   stockStats.addStockTrade(trade);
   ```

1. Implement a `reportStats` method. Modify the output format to suit your preferences. 

   ```
   System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" +
           stockStats + "\n" +
           "****************************************************************\n");
   ```

1. Implement the `resetStats` method, which creates a new `stockStats` instance. 

   ```
   stockStats = new StockStats();
   ```

1. Implement the following methods required by `ShardRecordProcessor` interface:

   ```
   @Override
   public void leaseLost(LeaseLostInput leaseLostInput) {
       log.info("Lost lease, so terminating.");
   }
   
   @Override
   public void shardEnded(ShardEndedInput shardEndedInput) {
       try {
           log.info("Reached shard end checkpointing.");
           shardEndedInput.checkpointer().checkpoint();
       } catch (ShutdownException | InvalidStateException e) {
           log.error("Exception while checkpointing at shard end. Giving up.", e);
       }
   }
   
   @Override
   public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
       log.info("Scheduler is shutting down, checkpointing.");
       checkpoint(shutdownRequestedInput.checkpointer());
   }
   
   private void checkpoint(RecordProcessorCheckpointer checkpointer) {
       log.info("Checkpointing shard " + kinesisShardId);
       try {
           checkpointer.checkpoint();
       } catch (ShutdownException se) {
           // Ignore checkpoint if the processor instance has been shutdown (fail over).
           log.info("Caught shutdown exception, skipping checkpoint.", se);
       } catch (ThrottlingException e) {
           // Skip checkpoint when throttled. In practice, consider a backoff and retry policy.
           log.error("Caught throttling exception, skipping checkpoint.", e);
       } catch (InvalidStateException e) {
           // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS).
           log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
       }
   }
   ```

**To run the consumer**

1. Run the producer that you wrote in [Implement the producer](tutorial-stock-data-kplkcl2-producer.md) to inject simulated stock trade records into your stream.

1. Verify that the access key and secret key pair retrieved earlier (when creating the IAM user) are saved in the file `~/.aws/credentials`. 

1. Run the `StockTradesProcessor` class with the following arguments:

   ```
   StockTradesProcessor StockTradeStream us-west-2
   ```

   Note that if you created your stream in a Region other than `us-west-2`, you have to specify that Region here instead.

After a minute, you should see output like the following, refreshed every minute thereafter:

```
  
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  ****************************************************************
```

## Next steps


[(Optional) Extend the consumer](tutorial-stock-data-kplkcl2-consumer-extension.md)

# (Optional) Extend the consumer


This optional section shows how you can extend the consumer code for a slightly more elaborate scenario.

If you want to know about the biggest sell orders each minute, you can modify the `StockStats` class in three places to accommodate this new priority.

**To extend the consumer**

1. Add new instance variables:

   ```
    // Ticker symbol of the stock that had the largest quantity of shares sold 
    private String largestSellOrderStock;
    // Quantity of shares for the largest sell order trade
    private long largestSellOrderQuantity;
   ```

1. Add the following code to `addStockTrade`:

   ```
   if (type == TradeType.SELL) {
        if (largestSellOrderStock == null || trade.getQuantity() > largestSellOrderQuantity) {
            largestSellOrderStock = trade.getTickerSymbol();
            largestSellOrderQuantity = trade.getQuantity();
        }
    }
   ```

1. Modify the `toString` method to print the additional information:

   ```
    
   public String toString() {
       return String.format(
           "Most popular stock being bought: %s, %d buys.%n" +
           "Most popular stock being sold: %s, %d sells.%n" +
           "Largest sell order: %d shares of %s.",
           getMostPopularStock(TradeType.BUY), getMostPopularStockCount(TradeType.BUY),
           getMostPopularStock(TradeType.SELL), getMostPopularStockCount(TradeType.SELL),
           largestSellOrderQuantity, largestSellOrderStock);
   }
   ```

If you run the consumer now (remember to run the producer also), you should see output similar to this:

```
 
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  Largest sell order: 996 shares of BUD.
  ****************************************************************
```

## Next steps


[Clean up resources](tutorial-stock-data-kplkcl2-finish.md)

# Clean up resources


Because you are paying to use the Kinesis data stream, make sure that you delete it and the corresponding Amazon DynamoDB table when you are done with it. Nominal charges accrue on an active stream even when you aren't sending or getting records, because an active stream uses resources by continuously "listening" for incoming records and requests to get records.

**To delete the stream and table**

1. Shut down any producers and consumers that you might still have running.

1. Open the Kinesis console at [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

1. Choose the stream that you created for this application (`StockTradeStream`).

1. Choose **Delete Stream**.

1. Open the DynamoDB console at [https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/).

1. Delete the `StockTradesProcessor` table.

## Summary


Processing a large amount of data in near real time doesn’t require writing complicated code or building a huge infrastructure. It is as simple as writing logic to process a small amount of data (like writing `processRecord(Record)`) and letting Kinesis Data Streams scale it to work for a large amount of streaming data. You don’t have to worry about how your processing would scale because Kinesis Data Streams handles it for you. All you have to do is send your streaming records to Kinesis Data Streams and write the logic to process each new record received. 

Here are some potential enhancements for this application.

**Aggregate across all shards**  
Currently, you get stats resulting from aggregation of the data records received by a single worker from a single shard. (A shard cannot be processed by more than one worker in a single application at the same time.) Of course, when you scale and have more than one shard, you might want to aggregate across all shards. You can do this by having a pipeline architecture where the output of each worker is fed into another stream with a single shard, which is processed by a worker that aggregates the outputs of the first stage. Because the data from the first stage is limited (one sample per minute per shard), it would easily be handled by one shard.

**Scale processing**  
When the stream scales up to have many shards (because many producers are sending data), the way to scale the processing is to add more workers. You can run the workers in Amazon EC2 instances and use Auto Scaling groups.

**Use connectors to Amazon S3/DynamoDB/Amazon Redshift/Storm**  
As a stream is continuously processed, its output can be sent to other destinations. AWS provides [connectors](https://github.com/awslabs/amazon-kinesis-connectors) to integrate Kinesis Data Streams with other AWS services and third-party tools.

# Tutorial: Process real-time stock data using KPL and KCL 1.x


The scenario for this tutorial involves ingesting stock trades into a data stream and writing a simple Amazon Kinesis Data Streams application that performs calculations on the stream. You will learn how to send a stream of records to Kinesis Data Streams and implement an application that consumes and processes the records in near real time.

**Important**  
After you create a stream, your account incurs nominal charges for Kinesis Data Streams usage because Kinesis Data Streams is not eligible for the AWS Free Tier. After the consumer application starts, it also incurs nominal charges for Amazon DynamoDB usage. The consumer application uses DynamoDB to track processing state. When you are finished with this application, delete your AWS resources to stop incurring charges. For more information, see [Clean up resources](tutorial-stock-data-kplkcl-finish.md).

The code does not access actual stock market data, but instead simulates the stream of stock trades. It does so by using a random stock trade generator that has a starting point of real market data for the top 25 stocks by market capitalization as of February 2015. If you have access to a real-time stream of stock trades, you might be interested in deriving useful, timely statistics from that stream. For example, you might want to perform a sliding window analysis where you determine the most popular stock purchased in the last 5 minutes. Or you might want a notification whenever there is a sell order that is too large (that is, it has too many shares). You can extend the code in this series to provide such functionality.

You can work through the steps in this tutorial on your desktop or laptop computer and run both the producer and consumer code on the same machine or any platform that supports the defined requirements, such as Amazon Elastic Compute Cloud (Amazon EC2).

The examples shown use the US West (Oregon) Region, but they work on any of the [AWS Regions that support Kinesis Data Streams](https://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region).

**Topics**
+ [

# Complete prerequisites
](tutorial-stock-data-kplkcl-begin.md)
+ [

# Create a data stream
](tutorial-stock-data-kplkcl-create-stream.md)
+ [

# Create an IAM policy and user
](tutorial-stock-data-kplkcl-iam.md)
+ [

# Download and build the implementation code
](tutorial-stock-data-kplkcl-download.md)
+ [

# Implement the producer
](tutorial-stock-data-kplkcl-producer.md)
+ [

# Implement the consumer
](tutorial-stock-data-kplkcl-consumer.md)
+ [

# (Optional) Extend the consumer
](tutorial-stock-data-kplkcl-consumer-extension.md)
+ [

# Clean up resources
](tutorial-stock-data-kplkcl-finish.md)

# Complete prerequisites


The following are requirements for completing the [Tutorial: Process real-time stock data using KPL and KCL 1.x](tutorial-stock-data-kplkcl.md).

## Create and use an Amazon Web Services Account


Before you begin, ensure that you are familiar with the concepts discussed in [Amazon Kinesis Data Streams Terminology and concepts](key-concepts.md), particularly streams, shards, producers, and consumers. It is also helpful to have completed [Tutorial: Install and configure the AWS CLI for Kinesis Data Streams](kinesis-tutorial-cli-installation.md).

You need an AWS account and a web browser to access the AWS Management Console.

For console access, use your IAM user name and password to sign in to the [AWS Management Console](https://console.aws.amazon.com/console/home) from the IAM sign-in page. For information about AWS security credentials, including programmatic access and alternatives to long-term credentials, see [AWS security credentials](https://docs.aws.amazon.com/IAM/latest/UserGuide/security-creds.html) in the *IAM User Guide*. For details about signing in to your AWS account, see [How to sign in to AWS](https://docs.aws.amazon.com/signin/latest/userguide/how-to-sign-in.html) in the *AWS Sign-In User Guide*.

For more information about IAM and security key setup instructions, see [Create an IAM User](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/get-set-up-for-amazon-ec2.html#create-an-iam-user).

## Fulfill system software requirements


The system used to run the application must have Java 7 or higher installed. To download and install the latest Java Development Kit (JDK), go to [Oracle's Java SE installation site](http://www.oracle.com/technetwork/java/javase/downloads/index.html).

If you have a Java IDE, such as [Eclipse](https://www.eclipse.org/downloads/), you can open the source code, edit, build, and run it.

You need the latest [AWS SDK for Java](https://aws.amazon.com/sdk-for-java/) version. If you are using Eclipse as your IDE, you can install the [AWS Toolkit for Eclipse](https://aws.amazon.com/eclipse/) instead. 

The consumer application requires the Kinesis Client Library (KCL) version 1.2.1 or higher, which you can obtain from GitHub at [Kinesis Client Library (Java)](https://github.com/awslabs/amazon-kinesis-client).

## Next steps


[Create a data stream](tutorial-stock-data-kplkcl-create-stream.md)

# Create a data stream


In the first step of the [Tutorial: Process real-time stock data using KPL and KCL 1.x](tutorial-stock-data-kplkcl.md), you create the stream that you will use in subsequent steps.

**To create a stream**

1. Sign in to the AWS Management Console and open the Kinesis console at [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

1. Choose **Data Streams** in the navigation pane.

1. In the navigation bar, expand the Region selector and choose a Region.

1. Choose **Create Kinesis stream**.

1. Enter a name for your stream (for example, **StockTradeStream**).

1. Enter **1** for the number of shards, but keep **Estimate the number of shards you'll need** collapsed.

1. Choose **Create Kinesis stream**.

On the **Kinesis streams** list page, the status of your stream is `CREATING` while the stream is being created. When the stream is ready to use, the status changes to `ACTIVE`. Choose the name of your stream. In the page that appears, the **Details** tab displays a summary of your stream configuration. The **Monitoring** section displays monitoring information for the stream.

## Additional information about shards


When you begin to use Kinesis Data Streams outside of this tutorial, you might need to plan the stream creation process more carefully. You should plan for expected maximum demand when you provision shards. Using this scenario as an example, US stock market trading traffic peaks during the day (Eastern time) and demand estimates should be sampled from that time of day. You then have a choice to provision for the maximum expected demand, or scale your stream up and down in response to demand fluctuations. 

A *shard* is a unit of throughput capacity. On the **Create Kinesis stream** page, expand **Estimate the number of shards you'll need**. Enter the average record size, the maximum records written per second, and the number of consuming applications, using the following guidelines:

**Average record size**  
An estimate of the average size of your records. If you don't know this value, use the estimated maximum record size instead.

**Max records written**  
Consider the number of entities providing data and the approximate number of records per second produced by each. For example, if you are getting stock trade data from 20 trading servers and each generates 250 trades per second, the total is 5,000 records per second. 

**Number of consuming applications**  
The number of applications that independently read from the stream, each processing it in a different way and producing different output. Each application can have multiple instances running on different machines (that is, run as a cluster) so that it can keep up with a high-volume stream.

If the estimated number of shards shown exceeds your current shard limit, you might need to submit a request to increase that limit before you can create a stream with that number of shards. To request an increase to your shard limit, use the [Kinesis Data Streams Limits form](https://console.aws.amazon.com/support/home#/case/create?issueType=service-limit-increase&limitType=service-code-kinesis). For more information about streams and shards, see [Create and manage Kinesis data streams](working-with-streams.md).
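To make the shard arithmetic concrete, here is a minimal sketch of the write-side estimate. The per-shard write limits (1,000 records per second and 1 MiB per second) are the published Kinesis Data Streams values; the class name and the record sizes used in the usage note are illustrative assumptions, not values from this tutorial:

```java
// Hedged sketch: estimate shards needed for write throughput only.
// Per-shard write limits (1,000 records/sec, 1 MiB/sec) are the
// published Kinesis Data Streams values; everything else is illustrative.
class ShardEstimator {
    static final long MAX_RECORDS_PER_SEC_PER_SHARD = 1_000L;
    static final long MAX_BYTES_PER_SEC_PER_SHARD = 1_048_576L; // 1 MiB

    static long estimateShards(long recordsPerSec, long avgRecordSizeBytes) {
        // Ceiling division against each limit; the stricter one wins.
        long byRecordCount = (recordsPerSec + MAX_RECORDS_PER_SEC_PER_SHARD - 1)
                / MAX_RECORDS_PER_SEC_PER_SHARD;
        long byByteVolume = (recordsPerSec * avgRecordSizeBytes + MAX_BYTES_PER_SEC_PER_SHARD - 1)
                / MAX_BYTES_PER_SEC_PER_SHARD;
        return Math.max(byRecordCount, byByteVolume);
    }
}
```

For the example above (5,000 records per second at an assumed 150 bytes each), the record-count limit dominates and about 5 shards are needed for writes alone; the console's estimator additionally factors in the number of consuming applications.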

## Next steps


[Create an IAM policy and user](tutorial-stock-data-kplkcl-iam.md)

# Create an IAM policy and user


Security best practices for AWS dictate the use of fine-grained permissions to control access to different resources. AWS Identity and Access Management (IAM) allows you to manage users and user permissions in AWS. An [IAM policy](https://docs.aws.amazon.com/IAM/latest/UserGuide/PoliciesOverview.html) explicitly lists actions that are allowed and the resources on which the actions are applicable.

The following are the minimum permissions generally required for a Kinesis Data Streams producer and consumer.


**Producer**  

| Actions | Resource | Purpose | 
| --- | --- | --- | 
| DescribeStream, DescribeStreamSummary, DescribeStreamConsumer | Kinesis data stream | Before attempting to write records, the producer checks that the stream exists and is active, which shards it contains, and whether the stream has a registered consumer. | 
| SubscribeToShard, RegisterStreamConsumer | Kinesis data stream | Subscribes to and registers a consumer with a Kinesis data stream shard. | 
| PutRecord, PutRecords | Kinesis data stream | Write records to Kinesis Data Streams. | 


**Consumer**  

| **Actions** | **Resource** | **Purpose** | 
| --- | --- | --- | 
| DescribeStream | Kinesis data stream | Before attempting to read records, the consumer checks that the stream exists and is active, and which shards it contains. | 
| GetRecords, GetShardIterator  | Kinesis data stream | Read records from a Kinesis Data Streams shard. | 
| CreateTable, DescribeTable, GetItem, PutItem, Scan, UpdateItem | Amazon DynamoDB table | If the consumer is developed using the Kinesis Client Library (KCL), it needs permissions to a DynamoDB table to track the processing state of the application. The first consumer started creates the table.  | 
| DeleteItem | Amazon DynamoDB table | For when the consumer performs split/merge operations on Kinesis Data Streams shards. | 
| PutMetricData | Amazon CloudWatch | The KCL also uploads metrics to CloudWatch, which are useful for monitoring the application. | 

For this application, you create a single IAM policy that grants all of the preceding permissions. In practice, you might want to consider creating two policies, one for producers and one for consumers.

**To create an IAM policy**

1. Locate the Amazon Resource Name (ARN) for the new stream. You can find this ARN listed as **Stream ARN** at the top of the **Details** tab. The ARN format is as follows:

   ```
   arn:aws:kinesis:region:account:stream/name
   ```  
*region*  
The Region code; for example, `us-west-2`. For more information, see [Region and Availability Zone Concepts](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-regions-availability-zones).  
*account*  
The AWS account ID, as shown in [Account Settings](https://console.aws.amazon.com/billing/home?#/account).  
*name*  
The name of the stream from [Create a data stream](tutorial-stock-data-kplkcl-create-stream.md), which is `StockTradeStream`.

1. Determine the ARN for the DynamoDB table to be used by the consumer (and created by the first consumer instance). It must be in the following format:

   ```
   arn:aws:dynamodb:region:account:table/name
   ```

   The Region and account values are the same as in the previous step, but this time *name* is the name of the table created and used by the consumer application. The KCL uses the application name as the table name. Use `StockTradesProcessor`, which is the application name used later.

1. In the IAM console, in **Policies** ([https://console.aws.amazon.com/iam/home#policies](https://console.aws.amazon.com/iam/home#policies)), choose **Create policy**. If this is the first time that you have worked with IAM policies, choose **Get Started**, **Create Policy**.

1. Choose **Select** next to **Policy Generator**.

1. Choose **Amazon Kinesis** as the AWS service.

1. Select `DescribeStream`, `GetShardIterator`, `GetRecords`, `PutRecord`, and `PutRecords` as the allowed actions.

1. Enter the ARN that you created in Step 1.

1. Use **Add Statement** for each of the following:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/streams/latest/dev/tutorial-stock-data-kplkcl-iam.html)

   The asterisk (`*`) is used in place of a specific ARN because the `PutMetricData` action is not invoked on any specific resource in CloudWatch.

1. Choose **Next Step**.

1. Change **Policy Name** to `StockTradeStreamPolicy`, review the policy document, and choose **Create Policy**.
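As a quick illustration of the two ARN formats from steps 1 and 2, the following hypothetical helper assembles both strings. The class name is made up for this sketch, and the account ID in the usage example is a documentation placeholder:

```java
// Illustrative helper for the ARN formats described in steps 1 and 2.
class ArnBuilder {
    static String streamArn(String region, String account, String streamName) {
        return String.format("arn:aws:kinesis:%s:%s:stream/%s", region, account, streamName);
    }

    static String tableArn(String region, String account, String tableName) {
        return String.format("arn:aws:dynamodb:%s:%s:table/%s", region, account, tableName);
    }
}
```

For example, `streamArn("us-west-2", "111122223333", "StockTradeStream")` produces the stream ARN that appears in the sample policy document.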

The resulting policy document should look something like the following:

------
#### [ JSON ]


```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "Stmt123",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:PutRecord",
                "kinesis:PutRecords",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:ListShards",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream"
            ]
        },
        {
            "Sid": "Stmt234",
            "Effect": "Allow",
            "Action": [
                "kinesis:SubscribeToShard",
                "kinesis:DescribeStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream/*"
            ]
        },
        {
            "Sid": "Stmt456",
            "Effect": "Allow",
            "Action": [
                "dynamodb:*"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-west-2:111122223333:table/StockTradesProcessor"
            ]
        },
        {
            "Sid": "Stmt789",
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}
```

------

**To create an IAM user**

1. Open the IAM console at [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. On the **Users** page, choose **Add user**.

1. For **User name**, type `StockTradeStreamUser`.

1. For **Access type**, choose **Programmatic access**, and then choose **Next: Permissions**.

1. Choose **Attach existing policies directly**.

1. Search by name for the policy that you created. Select the box to the left of the policy name, and then choose **Next: Review**.

1. Review the details and summary, and then choose **Create user**.

1. Copy the **Access key ID**, and save it privately. Under **Secret access key**, choose **Show**, and save that key privately also.

1. Paste the access and secret keys into a local file in a safe place that only you can access. For this application, create a file named `~/.aws/credentials` (with strict permissions). The file should be in the following format:

   ```
   [default]
   aws_access_key_id=access key
   aws_secret_access_key=secret access key
   ```

**To attach an IAM policy to a user**

1. In the IAM console, open [Policies](https://console.aws.amazon.com/iam/home?#policies) and choose **Policy Actions**. 

1. Choose `StockTradeStreamPolicy` and **Attach**.

1. Choose `StockTradeStreamUser` and **Attach Policy**.

## Next steps


[Download and build the implementation code](tutorial-stock-data-kplkcl-download.md)

# Download and build the implementation code


Skeleton code is provided for the [Tutorial: Process real-time stock data using KPL and KCL 1.x](tutorial-stock-data-kplkcl.md). It contains a stub implementation for both the stock trade stream ingestion (*producer*) and the processing of the data (*consumer*). The following procedure shows how to complete the implementation. 

**To download and build the implementation code**

1. Download the [source code](https://github.com/awslabs/amazon-kinesis-learning/tree/learning-module-1) to your computer.

1. Create a project in your favorite IDE with the source code, adhering to the provided directory structure.

1. Add the following libraries to the project:
   + Amazon Kinesis Client Library (KCL)
   + AWS SDK
   + Apache HttpCore
   + Apache HttpClient
   + Apache Commons Lang
   + Apache Commons Logging
   + Guava (Google Core Libraries For Java)
   + Jackson Annotations
   + Jackson Core
   + Jackson Databind
   + Jackson Dataformat: CBOR
   + Joda Time

1. Depending on your IDE, the project might be built automatically. If not, build the project using the appropriate steps for your IDE.

If you complete these steps successfully, you are now ready to move to the next section, [Implement the producer](tutorial-stock-data-kplkcl-producer.md). If your build generates errors at any stage, investigate and fix them before proceeding.

## Next steps


[Implement the producer](tutorial-stock-data-kplkcl-producer.md)

# Implement the producer




The application in the [Tutorial: Process real-time stock data using KPL and KCL 1.x](tutorial-stock-data-kplkcl.md) uses the real-world scenario of stock market trade monitoring. The following principles briefly explain how this scenario maps to the producer and supporting code structure.

Refer to the source code and review the following information.

**StockTrade class**  
An individual stock trade is represented by an instance of the `StockTrade` class. This instance contains attributes such as the ticker symbol, price, number of shares, the type of the trade (buy or sell), and an ID uniquely identifying the trade. This class is implemented for you.

**Stream record**  
A stream is a sequence of records. A record is a serialization of a `StockTrade` instance in JSON format. For example:   

```
{
  "tickerSymbol": "AMZN", 
  "tradeType": "BUY", 
  "price": 395.87,
  "quantity": 16, 
  "id": 3567129045
}
```

**StockTradeGenerator class**  
`StockTradeGenerator` has a method called `getRandomTrade()` that returns a new randomly generated stock trade every time it is invoked. This class is implemented for you.

**StockTradesWriter class**  
The `main` method of the producer, `StockTradesWriter`, continuously retrieves a random trade and then sends it to Kinesis Data Streams by performing the following tasks:  

1. Reads the stream name and Region name as input.

1. Creates an `AmazonKinesisClientBuilder`.

1. Uses the client builder to set the Region, credentials, and client configuration.

1. Builds an `AmazonKinesis` client using the client builder.

1. Checks that the stream exists and is active (if not, it exits with an error).

1. In a continuous loop, calls the `StockTradeGenerator.getRandomTrade()` method and then the `sendStockTrade` method to send the trade to the stream every 100 milliseconds.

The `sendStockTrade` method of the `StockTradesWriter` class has the following code:  

```
private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient, String streamName) {
    byte[] bytes = trade.toJsonAsBytes();
    // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library.
    if (bytes == null) {
        LOG.warn("Could not get JSON bytes for stock trade");
        return;
    }
    
    LOG.info("Putting trade: " + trade.toString());
    PutRecordRequest putRecord = new PutRecordRequest();
    putRecord.setStreamName(streamName);
    // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
    putRecord.setPartitionKey(trade.getTickerSymbol());
    putRecord.setData(ByteBuffer.wrap(bytes));

    try {
        kinesisClient.putRecord(putRecord);
    } catch (AmazonClientException ex) {
        LOG.warn("Error sending record to Amazon Kinesis.", ex);
    }
}
```

Refer to the following code breakdown:
+ The `PutRecord` API expects a byte array, and you must convert `trade` to JSON format. This single line of code performs that operation:

  ```
  byte[] bytes = trade.toJsonAsBytes();
  ```
+ Before you can send the trade, you create a new `PutRecordRequest` instance (called `putRecord` in this case):

  ```
  PutRecordRequest putRecord = new PutRecordRequest();
  ```

  Each `PutRecord` call requires the stream name, partition key, and data blob. The following code populates these fields in the `putRecord` object using its `setXxxx()` methods:

  ```
  putRecord.setStreamName(streamName);
  putRecord.setPartitionKey(trade.getTickerSymbol());
  putRecord.setData(ByteBuffer.wrap(bytes));
  ```

  The example uses the stock ticker symbol as a partition key, which maps the record to a specific shard. In practice, you should have hundreds or thousands of partition keys per shard so that records are evenly dispersed across your stream. For more information about how to add data to a stream, see [Add data to a stream](developing-producers-with-sdk.md#kinesis-using-sdk-java-add-data-to-stream).

  Now `putRecord` is ready to send to the client (the `put` operation):

  ```
  kinesisClient.putRecord(putRecord);
  ```
+ Error checking and logging are always useful additions. This code logs error conditions:

  ```
  if (bytes == null) {
      LOG.warn("Could not get JSON bytes for stock trade");
      return;
  }
  ```

  Add the try/catch block around the `put` operation:

  ```
  try {
      kinesisClient.putRecord(putRecord);
  } catch (AmazonClientException ex) {
      LOG.warn("Error sending record to Amazon Kinesis.", ex);
  }
  ```

  This is because a Kinesis Data Streams `put` operation can fail due to a network error, or because the stream reaches its throughput limits and is throttled. To avoid data loss, carefully consider your retry policy for `put` operations, such as retrying with backoff. 
+ Status logging is helpful but optional:

  ```
  LOG.info("Putting trade: " + trade.toString());
  ```
The producer shown here uses the Kinesis Data Streams API single record functionality, `PutRecord`. In practice, if an individual producer generates many records, it is often more efficient to use the multiple records functionality of `PutRecords` and send batches of records at a time. For more information, see [Add data to a stream](developing-producers-with-sdk.md#kinesis-using-sdk-java-add-data-to-stream).
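As background on the partition key discussion above: Kinesis Data Streams maps a partition key to a shard by taking the MD5 hash of the key and treating it as an unsigned 128-bit integer, which falls into exactly one shard's hash key range. The sketch below illustrates the idea; the class name and the two-shard range split are hypothetical, not part of the tutorial code:

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Illustrative sketch of partition-key-to-shard mapping via MD5 hashing.
class ShardMapper {
    // Hash the partition key to an unsigned 128-bit integer.
    static BigInteger hashKey(String partitionKey) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    // Pick the last shard whose starting hash key is <= the hash.
    static int shardFor(BigInteger hash, BigInteger[] startingHashKeys) {
        int shard = 0;
        for (int i = 0; i < startingHashKeys.length; i++) {
            if (hash.compareTo(startingHashKeys[i]) >= 0) {
                shard = i;
            }
        }
        return shard;
    }
}
```

With a single-shard stream like `StockTradeStream`, every key maps to the same shard; the mapping matters once the hash key space is divided among multiple shards.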
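The retry recommendation above can be sketched as a small generic helper. This is a hedged illustration only; `RetryableAction`, the attempt limit, and the backoff values are assumptions rather than part of the tutorial source:

```java
// Illustrative bounded retry with exponential backoff for a put operation.
class PutRetrier {
    interface RetryableAction {
        void run() throws Exception;
    }

    // Returns true if the action eventually succeeded within maxAttempts.
    static boolean runWithRetries(RetryableAction action, int maxAttempts, long initialBackoffMillis) {
        long backoff = initialBackoffMillis;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                action.run();
                return true;
            } catch (Exception ex) {
                if (attempt == maxAttempts) {
                    return false; // give up; the caller decides how to handle the lost record
                }
            }
            try {
                Thread.sleep(backoff);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                return false;
            }
            backoff *= 2; // exponential backoff between attempts
        }
        return false;
    }
}
```

You would wrap the `kinesisClient.putRecord(putRecord)` call in such a helper; production code typically also adds jitter to the backoff.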

**To run the producer**

1. Verify that the access key and secret key pair retrieved earlier (when creating the IAM user) are saved in the file `~/.aws/credentials`. 

1. Run the `StockTradesWriter` class with the following arguments:

   ```
   StockTradeStream us-west-2
   ```

   If you created your stream in a Region other than `us-west-2`, you have to specify that Region here instead.

You should see output similar to the following:

```
Feb 16, 2015 3:53:00 PM  
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
```

Your stock trade stream is now being ingested by Kinesis Data Streams.

## Next steps


[Implement the consumer](tutorial-stock-data-kplkcl-consumer.md)

# Implement the consumer


The consumer application in the [Tutorial: Process real-time stock data using KPL and KCL 1.x](tutorial-stock-data-kplkcl.md) continuously processes the stock trades stream that you created in [Implement the producer](tutorial-stock-data-kplkcl-producer.md). It then outputs the most popular stocks being bought and sold every minute. The application is built on top of the Kinesis Client Library (KCL), which does much of the heavy lifting common to consumer apps. For more information, see [Develop KCL 1.x consumers](developing-consumers-with-kcl.md). 

Refer to the source code and review the following information.

**StockTradesProcessor class**  
Main class of the consumer, provided for you, which performs the following tasks:  
+ Reads the application, stream, and Region names, passed in as arguments.
+ Reads credentials from `~/.aws/credentials`.
+ Creates a `RecordProcessorFactory` instance that serves instances of `RecordProcessor`, implemented by a `StockTradeRecordProcessor` instance.
+ Creates a KCL worker with the `RecordProcessorFactory` instance and a standard configuration including the stream name, credentials, and application name. 
+ The worker creates a new thread for each shard (assigned to this consumer instance), which continuously loops to read records from Kinesis Data Streams. It then invokes the `RecordProcessor` instance to process each batch of records received.

**StockTradeRecordProcessor class**  
Implementation of the `RecordProcessor` interface, which requires three methods: `initialize`, `processRecords`, and `shutdown`.  
As the names suggest, `initialize` and `shutdown` are used by the Kinesis Client Library to let the record processor know when it should be ready to start receiving records and when it should expect to stop receiving records, respectively, so it can do any application-specific setup and termination tasks. The code for these is provided for you. The main processing happens in the `processRecords` method, which in turn uses `processRecord` for each record. This latter method is provided as mostly empty skeleton code for you to implement in the next step, where it is explained further.  
Also of note is the implementation of support methods for `processRecord`: `reportStats`, and `resetStats`, which are empty in the original source code.  
The `processRecords` method is implemented for you, and performs the following steps:  
+ For each record passed in, calls `processRecord` on it.
+ If at least 1 minute has elapsed since the last report, calls `reportStats()`, which prints out the latest stats, and then `resetStats()`, which clears the stats so that the next interval includes only new records.
+ Sets the next reporting time.
+ If at least 1 minute has elapsed since the last checkpoint, calls `checkpoint()`.
+ Sets the next checkpointing time.
This method uses 60-second intervals for the reporting and checkpointing rate. For more information about checkpointing, see [Additional information about the consumer](#tutorial-stock-data-kplkcl-consumer-supplement).
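The timing logic described above reduces to a compare-and-reschedule pattern. Here is a minimal, hypothetical sketch of the reporting interval (the checkpointing interval works the same way); the names are illustrative, not the tutorial's actual fields:

```java
// Illustrative sketch of the 60-second reporting cadence.
class ReportingTimer {
    private static final long REPORTING_INTERVAL_MILLIS = 60_000L;
    private long nextReportingTimeInMillis = 0L;

    // Returns true when a report is due, and schedules the next one.
    boolean isReportDue(long nowMillis) {
        if (nowMillis >= nextReportingTimeInMillis) {
            nextReportingTimeInMillis = nowMillis + REPORTING_INTERVAL_MILLIS;
            return true;
        }
        return false;
    }
}
```

Because the check runs on every `processRecords` call, reports fire at most once per interval regardless of how many record batches arrive.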

**StockStats class**  
This class provides data retention and statistics tracking for the most popular stocks over time. This code is provided for you and contains the following methods:  
+ `addStockTrade(StockTrade)`: Injects the given `StockTrade` into the running statistics.
+ `toString()`: Returns the statistics in a formatted string.
This class tracks the most popular stock by keeping a running count of the total number of trades for each stock and the maximum count. It updates these counts whenever a stock trade arrives.
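The running-count approach can be sketched like this; the class and method names loosely mirror the tutorial's `StockStats` but are illustrative, not the original source:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of tracking the most popular stock with running counts.
// Names loosely mirror the tutorial's StockStats; this is not the
// original source.
class PopularitySketch {
    private final Map<String, Long> countsByTicker = new HashMap<>();
    private String mostPopular;
    private long maxCount;

    // Update the running count for a ticker, and the current maximum.
    void addTrade(String ticker) {
        long count = countsByTicker.merge(ticker, 1L, Long::sum);
        if (count > maxCount) {
            maxCount = count;
            mostPopular = ticker;
        }
    }

    String getMostPopularStock() { return mostPopular; }
    long getMostPopularStockCount() { return maxCount; }
}
```

Because the maximum is updated incrementally as each trade arrives, the class never has to rescan all counts, which keeps `addStockTrade`-style updates cheap.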

Add code to the methods of the `StockTradeRecordProcessor` class, as shown in the following steps.

**To implement the consumer**

1. Implement the `processRecord` method by instantiating a correctly sized `StockTrade` object and adding the record data to it, logging a warning if there's a problem.

   ```
   StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array());
   if (trade == null) {
       LOG.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.getPartitionKey());
       return;
   }
   stockStats.addStockTrade(trade);
   ```

1. Implement a simple `reportStats` method. Feel free to modify the output format to your preferences.

   ```
   System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" +
                      stockStats + "\n" +
                      "****************************************************************\n");
   ```

1. Finally, implement the `resetStats` method, which creates a new `stockStats` instance.

   ```
   stockStats = new StockStats();
   ```

**To run the consumer**

1. Run the producer that you wrote in [Implement the producer](tutorial-stock-data-kplkcl-producer.md) to inject simulated stock trade records into your stream.

1. Verify that the access key and secret key pair retrieved earlier (when creating the IAM user) are saved in the file `~/.aws/credentials`.

1. Run the `StockTradesProcessor` class with the following arguments:

   ```
   StockTradesProcessor StockTradeStream us-west-2
   ```

   Note that if you created your stream in a Region other than `us-west-2`, you have to specify that Region here instead.

After a minute, you should see output like the following, refreshed every minute thereafter:

```
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  ****************************************************************
```

## Additional information about the consumer


If you are familiar with the advantages of the Kinesis Client Library, discussed in [Develop KCL 1.x consumers](developing-consumers-with-kcl.md) and elsewhere, you might wonder why you should use it here. Although you use only a single-shard stream and a single consumer instance to process it, it is still easier to implement the consumer with the KCL. Compare the implementation steps in the producer section to those for the consumer, and you can see the comparative ease of implementing a consumer. This is largely due to the services that the KCL provides.

In this application, you focus on implementing a record processor class that can process individual records. You don’t have to worry about how the records are fetched from Kinesis Data Streams; the KCL fetches the records and invokes the record processor whenever new records are available. Also, you don’t have to worry about how many shards and consumer instances there are. If the stream is scaled up, you don’t have to rewrite your application to handle more than one shard or one consumer instance.

The term *checkpointing* means recording the point in the stream up to which data records have been consumed and processed. If the application crashes, the stream is read from that point rather than from the beginning. The subject of checkpointing and the various design patterns and best practices for it is outside the scope of this chapter, but it is something you are likely to encounter in production environments.
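As a minimal illustration of the idea, the following sketch keeps the last checkpointed position in memory (the real KCL persists it durably in DynamoDB) and shows how a restarted worker would skip already-processed records; all names here are hypothetical:

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch of checkpointing: persist the position of the last processed
// record so a restarted worker resumes there instead of re-reading the
// whole stream. The in-memory field stands in for the DynamoDB table
// that the KCL actually uses; all names are illustrative.
class CheckpointSketch {
    private long checkpointedSequence = -1;  // -1 means no checkpoint yet

    void checkpoint(long sequenceNumber) {
        checkpointedSequence = sequenceNumber;  // persisted durably in real KCL
    }

    // On (re)start, process only records after the checkpoint.
    List<Long> recordsToProcess(List<Long> streamSequenceNumbers) {
        return streamSequenceNumbers.stream()
                .filter(seq -> seq > checkpointedSequence)
                .collect(Collectors.toList());
    }
}
```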

As you learned in [Implement the producer](tutorial-stock-data-kplkcl-producer.md), the `put` operations in the Kinesis Data Streams API take a *partition key* as input. Kinesis Data Streams uses the partition key to distribute records across shards (when the stream has more than one shard). The same partition key always routes to the same shard. This allows the consumer that processes a particular shard to be designed with the assumption that records with the same partition key are sent only to that consumer, and that no records with that partition key end up at any other consumer. Therefore, a consumer's worker can aggregate all records with the same partition key without worrying that it might be missing needed data.
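The following sketch illustrates why routing is deterministic. Kinesis Data Streams maps the MD5 hash of the partition key into per-shard hash key ranges; the modulo below is a simplification of that mapping, but it preserves the key property that the same partition key always lands on the same shard:

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Sketch of deterministic partition-key routing. Kinesis hashes the
// partition key with MD5 and maps the result into per-shard hash key
// ranges; the modulo below is a simplification, not the exact mapping.
class RoutingSketch {
    static int shardFor(String partitionKey, int shardCount) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            // Interpret the 128-bit hash as a non-negative integer.
            return new BigInteger(1, digest)
                    .mod(BigInteger.valueOf(shardCount))
                    .intValue();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }
}
```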

In this application, the consumer's processing of records is not intensive, so you can use one shard and do the processing in the same thread as the KCL thread. In practice, however, consider first scaling up the number of shards. If your record processing is expected to be intensive, you may want to move processing to a different thread, or to a thread pool, so that the KCL can fetch new records more quickly while other threads process records in parallel. Multithreaded design is not trivial, so increasing your shard count is usually the most effective way to scale up.
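If you do move processing off the fetch thread, a sketch of the thread-pool approach might look like the following; `ProcessingPool` and its method names are illustrative, not KCL API:

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of offloading intensive record processing to a thread pool so
// that the KCL fetch thread returns quickly. ProcessingPool and its
// method names are illustrative, not KCL API.
class ProcessingPool {
    private final ExecutorService pool = Executors.newFixedThreadPool(4);
    private final AtomicInteger processed = new AtomicInteger();

    // Submit each record for parallel processing and return immediately.
    void submitBatch(List<String> records) {
        for (String record : records) {
            pool.submit(() -> {
                // Intensive per-record work would go here.
                processed.incrementAndGet();
            });
        }
    }

    // Drain the pool and report how many records were processed.
    int shutdownAndCount() {
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }
}
```

Note that handing records to other threads complicates checkpointing: you can only checkpoint up to a position whose records have all finished processing, which is part of why scaling by shard count is usually simpler.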

## Next steps


[(Optional) Extend the consumer](tutorial-stock-data-kplkcl-consumer-extension.md)

# (Optional) Extend the consumer


The application in [Tutorial: Process real-time stock data using KPL and KCL 1.x](tutorial-stock-data-kplkcl.md) might already be sufficient for your purposes. This optional section shows how you can extend the consumer code for a slightly more elaborate scenario.

If you want to know about the biggest sell orders each minute, you can modify the `StockStats` class in three places to accommodate this new priority.

**To extend the consumer**

1. Add new instance variables:

   ```
    // Ticker symbol of the stock that had the largest quantity of shares sold 
    private String largestSellOrderStock;
    // Quantity of shares for the largest sell order trade
    private long largestSellOrderQuantity;
   ```

1. Add the following code to `addStockTrade`:

   ```
    if (type == TradeType.SELL) {
        if (largestSellOrderStock == null || trade.getQuantity() > largestSellOrderQuantity) {
            largestSellOrderStock = trade.getTickerSymbol();
            largestSellOrderQuantity = trade.getQuantity();
        }
    }
   ```

1. Modify the `toString` method to print the additional information:

   ```
    public String toString() {
        return String.format(
                "Most popular stock being bought: %s, %d buys.%n" +
                "Most popular stock being sold: %s, %d sells.%n" +
                "Largest sell order: %d shares of %s.",
                getMostPopularStock(TradeType.BUY), getMostPopularStockCount(TradeType.BUY),
                getMostPopularStock(TradeType.SELL), getMostPopularStockCount(TradeType.SELL),
                largestSellOrderQuantity, largestSellOrderStock);
    }
   ```

If you run the consumer now (remember to run the producer also), you should see output similar to this:

```
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  Largest sell order: 996 shares of BUD.
  ****************************************************************
```

## Next steps


[Clean up resources](tutorial-stock-data-kplkcl-finish.md)

# Clean up resources


Because you are paying to use the Kinesis data stream, make sure that you delete it and the corresponding Amazon DynamoDB table when you are done with it. Nominal charges occur on an active stream even when you aren't sending and getting records. This is because an active stream is using resources by continuously "listening" for incoming records and requests to get records.

**To delete the stream and table**

1. Shut down any producers and consumers that you may still have running.

1. Open the Kinesis console at [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

1. Choose the stream that you created for this application (`StockTradeStream`).

1. Choose **Delete Stream**.

1. Open the DynamoDB console at [https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/).

1. Delete the `StockTradesProcessor` table.

## Summary


Processing a large amount of data in near real time doesn’t require writing complicated code or developing a huge infrastructure. It is as basic as writing logic to process a small amount of data (like writing `processRecord(Record)`) but using Kinesis Data Streams to scale so that it works for a large amount of streaming data. You don’t have to worry about how your processing would scale because Kinesis Data Streams handles it for you. All you have to do is send your streaming records to Kinesis Data Streams and write the logic to process each new record received. 

Here are some potential enhancements for this application.

**Aggregate across all shards**  
Currently, you get stats resulting from aggregation of the data records received by a single worker from a single shard. (A shard cannot be processed by more than one worker in a single application at the same time.) Of course, when you scale and have more than one shard, you might want to aggregate across all shards. You can do this by having a pipeline architecture where the output of each worker is fed into another stream with a single shard, which is processed by a worker that aggregates the outputs of the first stage. Because the data from the first stage is limited (one sample per minute per shard), it would easily be handled by one shard.

**Scale processing**  
When the stream scales up to have many shards (because many producers are sending data), the way to scale the processing is to add more workers. You can run the workers in Amazon EC2 instances and use Auto Scaling groups.

**Use connectors to Amazon S3/DynamoDB/Amazon Redshift/Storm**  
As a stream is continuously processed, its output can be sent to other destinations. AWS provides [connectors](https://github.com/awslabs/amazon-kinesis-connectors) to integrate Kinesis Data Streams with other AWS services and third-party tools.

## Next steps

+ For more information about using Kinesis Data Streams API operations, see [Develop producers using the Amazon Kinesis Data Streams API with the AWS SDK for Java](developing-producers-with-sdk.md), [Develop shared-throughput consumers with the AWS SDK for Java](developing-consumers-with-sdk.md), and [Create and manage Kinesis data streams](working-with-streams.md).
+ For more information about the Kinesis Client Library, see [Develop KCL 1.x consumers](developing-consumers-with-kcl.md). 
+ For more information about how to optimize your application, see [Optimize Kinesis Data Streams consumers](advanced-consumers.md).

# Tutorial: Analyze real-time stock data using Amazon Managed Service for Apache Flink


The scenario for this tutorial involves ingesting stock trades into a data stream and writing a simple [Amazon Managed Service for Apache Flink](https://docs.aws.amazon.com/kinesisanalytics/latest/java/what-is.html) application that performs calculations on the stream. You will learn how to send a stream of records to Kinesis Data Streams and implement an application that consumes and processes the records in near real time.

With Amazon Managed Service for Apache Flink, you can use Java or Scala to process and analyze streaming data. The service lets you author and run Java or Scala code against streaming sources to perform time-series analytics, feed real-time dashboards, and create real-time metrics.

You can build Flink applications in Managed Service for Apache Flink using open-source libraries based on [Apache Flink](https://flink.apache.org/). Apache Flink is a popular framework and engine for processing data streams. 

**Important**  
After you create two data streams and an application, your account incurs nominal charges for Kinesis Data Streams and Managed Service for Apache Flink usage because they are not eligible for the AWS Free Tier. When you are finished with this application, delete your AWS resources to stop incurring charges. 

The code does not access actual stock market data, but instead simulates the stream of stock trades. It does so by using a random stock trade generator. If you have access to a real-time stream of stock trades, you might be interested in deriving useful, timely statistics from that stream. For example, you might want to perform a sliding window analysis where you determine the most popular stock purchased in the last 5 minutes. Or you might want a notification whenever there is a sell order that is too large (that is, it has too many shares). You can extend the code in this series to provide such functionality.
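As a rough illustration of the sliding-window idea, the following sketch counts trades whose timestamps fall inside the window and returns the most frequent ticker. In a real application you would express this as a Flink windowed aggregation; the class and method names here are hypothetical:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of a sliding-window "most popular stock" query: count the
// trades whose timestamps fall inside the window and pick the maximum.
// A production job would use a Flink windowed aggregation instead;
// all names here are illustrative.
class WindowSketch {
    static class Trade {
        final long timestampMillis;
        final String ticker;
        Trade(long timestampMillis, String ticker) {
            this.timestampMillis = timestampMillis;
            this.ticker = ticker;
        }
    }

    // Most frequent ticker among trades in (now - windowMillis, now].
    static String mostPopular(List<Trade> trades, long now, long windowMillis) {
        Map<String, Integer> counts = new HashMap<>();
        for (Trade t : trades) {
            if (t.timestampMillis <= now && now - t.timestampMillis < windowMillis) {
                counts.merge(t.ticker, 1, Integer::sum);
            }
        }
        String best = null;
        int bestCount = 0;
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            if (e.getValue() > bestCount) {
                best = e.getKey();
                bestCount = e.getValue();
            }
        }
        return best;
    }
}
```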

The examples shown use the US West (Oregon) Region, but they work on any of the [AWS Regions that support Managed Service for Apache Flink](https://docs.aws.amazon.com/general/latest/gr/rande.html#ka_region).

**Topics**
+ [Prerequisites for completing the exercises](#setting-up-prerequisites)
+ [Set up an AWS account and create an administrator user](setting-up.md)
+ [Set up the AWS Command Line Interface (AWS CLI)](setup-awscli.md)
+ [Create and run a Managed Service for Apache Flink application](get-started-exercise.md)

## Prerequisites for completing the exercises

To complete the steps in this guide, you must have the following:
+ [Java Development Kit](http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html) (JDK) version 8. Set the `JAVA_HOME` environment variable to point to your JDK install location.
+ We recommend that you use a development environment (such as [Eclipse Java Neon](http://www.eclipse.org/downloads/packages/release/neon/3) or [IntelliJ Idea](https://www.jetbrains.com/idea/)) to develop and compile your application.
+ [Git client](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git). Install the Git client if you haven't already.
+ [Apache Maven Compiler Plugin](https://maven.apache.org/plugins/maven-compiler-plugin/). Maven must be in your working path. To test your Apache Maven installation, enter the following:

  ```
  $ mvn -version
  ```

To get started, go to [Set up an AWS account and create an administrator user](setting-up.md).

# Set up an AWS account and create an administrator user

Before you use Amazon Managed Service for Apache Flink for the first time, complete the following tasks:

1. [Sign up for AWS](#setting-up-signup)

1. [Create an IAM user](#setting-up-iam)

## Sign up for AWS


When you sign up for Amazon Web Services (AWS), your AWS account is automatically signed up for all services in AWS, including Amazon Managed Service for Apache Flink. You are charged only for the services that you use.

With Managed Service for Apache Flink, you pay only for the resources that you use. If you are a new AWS customer, you can get started with Managed Service for Apache Flink for free. For more information, see [AWS Free Tier](https://aws.amazon.com/free/).

If you already have an AWS account, skip to the next task. If you don't have an AWS account, follow these steps to create one.

**To create an AWS account**

1. Open [https://portal.aws.amazon.com/billing/signup](https://portal.aws.amazon.com/billing/signup).

1. Follow the online instructions.

   Part of the sign-up procedure involves receiving a phone call or text message and entering a verification code on the phone keypad.

   When you sign up for an AWS account, an *AWS account root user* is created. The root user has access to all AWS services and resources in the account. As a security best practice, assign administrative access to a user, and use only the root user to perform [tasks that require root user access](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_root-user.html#root-user-tasks).

Note your AWS account ID because you'll need it for the next task.

## Create an IAM user


Services in AWS, such as Amazon Managed Service for Apache Flink, require that you provide credentials when you access them. This is so that the service can determine whether you have permissions to access the resources that are owned by that service. The AWS Management Console requires that you enter your password. 

You can create access keys for your AWS account to access the AWS Command Line Interface (AWS CLI) or API. However, we don't recommend that you access AWS using the credentials for your AWS account. Instead, we recommend that you use AWS Identity and Access Management (IAM). Create an IAM user, add the user to an IAM group with administrative permissions, and then grant administrative permissions to the IAM user that you created. You can then access AWS using a special URL and that IAM user's credentials.

If you signed up for AWS, but you haven't created an IAM user for yourself, you can create one using the IAM console.

The getting started exercises in this guide assume that you have a user (`adminuser`) with administrator permissions. Follow the procedure to create `adminuser` in your account.

**To create a group for administrators**

1. Sign in to the AWS Management Console and open the IAM console at [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. In the navigation pane, choose **Groups**, and then choose **Create New Group**.

1. For **Group Name**, enter a name for your group, such as **Administrators**, and then choose **Next Step**.

1. In the list of policies, select the check box next to the **AdministratorAccess** policy. You can use the **Filter** menu and the **Search** box to filter the list of policies.

1. Choose **Next Step**, and then choose **Create Group**.

Your new group is listed under **Group Name**.

**To create an IAM user for yourself, add it to the Administrators group, and create a password**

1. In the navigation pane, choose **Users**, and then choose **Add user**.

1. In the **User name** box, enter a user name.

1. Choose both **Programmatic access** and **AWS Management Console access**.

1. Choose **Next: Permissions**.

1. Select the check box next to the **Administrators** group. Then choose **Next: Review**.

1. Choose **Create user**.

**To sign in as the new IAM user**

1. Sign out of the AWS Management Console.

1. Use the following URL format to sign in to the console:

   `https://aws_account_number.signin.aws.amazon.com/console/`

   The *aws_account_number* is your AWS account ID without any hyphens. For example, if your AWS account ID is 1234-5678-9012, replace *aws_account_number* with **123456789012**. For information about how to find your account number, see [Your AWS Account ID and Its Alias](https://docs.aws.amazon.com/IAM/latest/UserGuide/console_account-alias.html) in the *IAM User Guide*.

1. Enter the IAM user name and password that you just created. When you're signed in, the navigation bar displays *your_user_name* @ *your_aws_account_id*.

**Note**  
If you don't want the URL for your sign-in page to contain your AWS account ID, you can create an account alias.

**To create or remove an account alias**

1. Open the IAM console at [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. On the navigation pane, choose **Dashboard**.

1. Find the IAM users sign-in link.

1. To create the alias, choose **Customize**. Enter the name you want to use for your alias, and then choose **Yes, Create**.

1. To remove the alias, choose **Customize**, and then choose **Yes, Delete**. The sign-in URL reverts to using your AWS account ID.

To sign in after you create an account alias, use the following URL:

`https://your_account_alias.signin.aws.amazon.com/console/`

To verify the sign-in link for IAM users for your account, open the IAM console and check under **IAM users sign-in link** on the dashboard.

For more information about IAM, see the following:
+ [AWS Identity and Access Management (IAM)](https://aws.amazon.com/iam/)
+ [Getting started with IAM](https://docs.aws.amazon.com/IAM/latest/UserGuide/getting-started.html)
+ [IAM User Guide](https://docs.aws.amazon.com/IAM/latest/UserGuide/)

## Next step


[Set up the AWS Command Line Interface (AWS CLI)](setup-awscli.md)

# Set up the AWS Command Line Interface (AWS CLI)

In this step, you download and configure the AWS CLI to use with Amazon Managed Service for Apache Flink.

**Note**  
The getting started exercises in this guide assume that you are using administrator credentials (`adminuser`) in your account to perform the operations.

**Note**  
If you already have the AWS CLI installed, you might need to upgrade to get the latest functionality. For more information, see [ Installing the AWS Command Line Interface](https://docs.aws.amazon.com/cli/latest/userguide/installing.html) in the *AWS Command Line Interface User Guide*. To check the version of the AWS CLI, run the following command:  

```
aws --version
```
The exercises in this tutorial require the following AWS CLI version or later:  

```
aws-cli/1.16.63
```

**To set up the AWS CLI**

1. Download and configure the AWS CLI. For instructions, see the following topics in the *AWS Command Line Interface User Guide*: 
   + [Installing the AWS Command Line Interface](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-set-up.html)
   + [Configuring the AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html)

1. Add a named profile for the administrator user in the AWS CLI config file. You use this profile when executing the AWS CLI commands. For more information about named profiles, see [Named Profiles](https://docs.aws.amazon.com/cli/latest/userguide/cli-multiple-profiles.html) in the *AWS Command Line Interface User Guide*.

   ```
   [profile adminuser]
   aws_access_key_id = adminuser access key ID
   aws_secret_access_key = adminuser secret access key
   region = aws-region
   ```

   For a list of available AWS Regions, see [AWS Regions and Endpoints](https://docs.aws.amazon.com/general/latest/gr/rande.html) in the *Amazon Web Services General Reference*.

1. Verify the setup by entering the following help command at the command prompt: 

   ```
   aws help
   ```

After you set up an AWS account and the AWS CLI, you can try the next exercise, in which you configure a sample application and test the end-to-end setup.

## Next step


[Create and run a Managed Service for Apache Flink application](get-started-exercise.md)

# Create and run a Managed Service for Apache Flink application

In this exercise, you create a Managed Service for Apache Flink application with data streams as a source and a sink.

**Topics**
+ [Create two Amazon Kinesis data streams](#get-started-exercise-1)
+ [Write Sample Records to the Input Stream](#get-started-exercise-2)
+ [Download and examine the Apache Flink streaming Java code](#get-started-exercise-5)
+ [Compile the application code](#get-started-exercise-5.5)
+ [Upload the Apache Flink streaming Java code](#get-started-exercise-6)
+ [Create and run the Managed Service for Apache Flink application](#get-started-exercise-7)

## Create two Amazon Kinesis data streams


Before you create a Managed Service for Apache Flink application for this exercise, create two Kinesis data streams (`ExampleInputStream` and `ExampleOutputStream`). Your application uses these streams for the application source and destination streams.

You can create these streams using either the Amazon Kinesis console or the following AWS CLI command. For console instructions, see [Creating and Updating Data Streams](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html). 

**To create the data streams (AWS CLI)**

1. To create the first stream (`ExampleInputStream`), use the following Amazon Kinesis `create-stream` AWS CLI command.

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleInputStream \
   --shard-count 1 \
   --region us-west-2 \
   --profile adminuser
   ```

1. To create the second stream that the application uses to write output, run the same command, changing the stream name to `ExampleOutputStream`.

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleOutputStream \
   --shard-count 1 \
   --region us-west-2 \
   --profile adminuser
   ```

## Write Sample Records to the Input Stream


In this section, you use a Python script to write sample records to the stream for the application to process.

**Note**  
This section requires the [AWS SDK for Python (Boto)](https://aws.amazon.com/developers/getting-started/python/).

1. Create a file named `stock.py` with the following contents:

   ```
    
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "EVENT_TIME": datetime.datetime.now().isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
           "PRICE": round(random.random() * 100, 2),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

1. Later in the tutorial, you run the `stock.py` script to send data to the application. 

   ```
   $ python stock.py
   ```

## Download and examine the Apache Flink streaming Java code


The Java application code for this example is available from GitHub. To download the application code, do the following:

1. Clone the remote repository with the following command:

   ```
   git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples.git
   ```

1. Navigate to the `GettingStarted` directory.

The application code is located in the `CustomSinkStreamingJob.java` and `CloudWatchLogSink.java` files. Note the following about the application code:
+ The application uses a Kinesis source to read from the source stream. The following snippet creates the Kinesis source:

  ```
  return env.addSource(new FlinkKinesisConsumer<>(inputStreamName,
                  new SimpleStringSchema(), inputProperties));
  ```

## Compile the application code


In this section, you use the Apache Maven compiler to create the Java code for the application. For information about installing Apache Maven and the Java Development Kit (JDK), see [Prerequisites for completing the exercises](tutorial-stock-data.md#setting-up-prerequisites).

Your Java application requires the following components:
+ A [Project Object Model (pom.xml)](https://maven.apache.org/guides/introduction/introduction-to-the-pom.html) file. This file contains information about the application's configuration and dependencies, including the Amazon Managed Service for Apache Flink libraries.
+ A `main` method that contains the application's logic.

**Note**  
To use the Kinesis connector for the following application, you must download the source code for the connector and build it as described in the [Apache Flink documentation](https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/connectors/kinesis.html).

**To create and compile the application code**

1. Create a Java/Maven application in your development environment. For information about creating an application, see the documentation for your development environment:
   + [ Creating your first Java project (Eclipse Java Neon)](https://help.eclipse.org/neon/index.jsp?topic=%2Forg.eclipse.jdt.doc.user%2FgettingStarted%2Fqs-3.htm)
   + [ Creating, Running and Packaging Your First Java Application (IntelliJ Idea)](https://www.jetbrains.com/help/idea/creating-and-running-your-first-java-application.html)

1. Use the following code for a file named `StreamingJob.java`. 

   ```
    
   package com.amazonaws.services.kinesisanalytics;
   
   import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
   import org.apache.flink.api.common.serialization.SimpleStringSchema;
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
   import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
   import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
   
   import java.io.IOException;
   import java.util.Map;
   import java.util.Properties;
   
   public class StreamingJob {
   
       private static final String region = "us-east-1";
       private static final String inputStreamName = "ExampleInputStream";
       private static final String outputStreamName = "ExampleOutputStream";
   
       private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
           Properties inputProperties = new Properties();
           inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
           inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
   
           return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
       }
   
       private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
               throws IOException {
           Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
           return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
                   applicationProperties.get("ConsumerConfigProperties")));
       }
   
       private static FlinkKinesisProducer<String> createSinkFromStaticConfig() {
           Properties outputProperties = new Properties();
           outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
           outputProperties.setProperty("AggregationEnabled", "false");
   
           FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties);
           sink.setDefaultStream(outputStreamName);
           sink.setDefaultPartition("0");
           return sink;
       }
   
       private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException {
           Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
           FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(),
                   applicationProperties.get("ProducerConfigProperties"));
   
           sink.setDefaultStream(outputStreamName);
           sink.setDefaultPartition("0");
           return sink;
       }
   
       public static void main(String[] args) throws Exception {
           // set up the streaming execution environment
           final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   
           /*
            * if you would like to use runtime configuration properties, uncomment the
            * lines below
            * DataStream<String> input = createSourceFromApplicationProperties(env);
            */
   
           DataStream<String> input = createSourceFromStaticConfig(env);
   
           /*
            * if you would like to use runtime configuration properties, uncomment the
            * lines below
         * input.addSink(createSinkFromApplicationProperties());
            */
   
           input.addSink(createSinkFromStaticConfig());
   
           env.execute("Flink Streaming Java API Skeleton");
       }
   }
   ```

   Note the following about the preceding code example:
   + This file contains the `main` method that defines the application's functionality.
   + Your application creates source and sink connectors to access external resources using a `StreamExecutionEnvironment` object. 
   + The application creates source and sink connectors using static properties. To use dynamic application properties, use the `createSourceFromApplicationProperties` and `createSinkFromApplicationProperties` methods to create the connectors. These methods read the application's properties to configure the connectors.

1. To use your application code, you compile and package it into a JAR file. You can compile and package your code in one of two ways:
   + Use the command line Maven tool. Create your JAR file by running the following command in the directory that contains the `pom.xml` file:

     ```
     mvn package
     ```
   + Use your development environment. See your development environment documentation for details.

   You can either upload your package as a JAR file, or you can compress your package and upload it as a ZIP file. If you create your application using the AWS CLI, you specify your code content type (JAR or ZIP).
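   If you choose the ZIP option, note that a JAR file is itself a ZIP archive, so wrapping it is straightforward. The following sketch uses python3's standard-library `zipfile` module so it has no extra dependencies (the `zip` command works equally well); the `touch` line is a placeholder standing in for the Maven build output so the snippet is self-contained.

   ```
   # Placeholder standing in for the Maven build output (illustration only);
   # in practice, run "mvn package" first.
   mkdir -p target && touch target/java-getting-started-1.0.jar

   # Wrap the JAR in a ZIP archive, then list the archive contents.
   python3 -m zipfile -c java-getting-started-1.0.zip target/java-getting-started-1.0.jar
   python3 -m zipfile -l java-getting-started-1.0.zip
   ```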

1. If there are errors while compiling, verify that your `JAVA_HOME` environment variable is correctly set.

If the application compiles successfully, the following file is created:

`target/java-getting-started-1.0.jar`

## Upload the Apache Flink streaming Java code


In this section, you create an Amazon Simple Storage Service (Amazon S3) bucket and upload your application code.

**To upload the application code**

1. Open the Amazon S3 console at [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/).

1. Choose **Create bucket**.

1. Enter **ka-app-code-*<username>*** in the **Bucket name** field. Add a suffix to the bucket name, such as your user name, to make it globally unique. Choose **Next**.

1. In the **Configure options** step, keep the settings as they are, and choose **Next**.

1. In the **Set permissions** step, keep the settings as they are, and choose **Next**.

1. Choose **Create bucket**.

1. In the Amazon S3 console, choose the **ka-app-code-*<username>*** bucket, and choose **Upload**.

1. In the **Select files** step, choose **Add files**. Navigate to the `java-getting-started-1.0.jar` file that you created in the previous step. Choose **Next**.

1. In the **Set permissions** step, keep the settings as they are. Choose **Next**.

1. In the **Set properties** step, keep the settings as they are. Choose **Upload**.

Your application code is now stored in an Amazon S3 bucket where your application can access it.

## Create and run the Managed Service for Apache Flink application


You can create and run a Managed Service for Apache Flink application using either the console or the AWS CLI.

**Note**  
When you create the application using the console, your AWS Identity and Access Management (IAM) and Amazon CloudWatch Logs resources are created for you. When you create the application using the AWS CLI, you create these resources separately.

**Topics**
+ [

### Create and run the application (Console)
](#get-started-exercise-7-console)
+ [

### Create and run the application (AWS CLI)
](#get-started-exercise-7-cli)

### Create and run the application (Console)


Follow these steps to create, configure, update, and run the application using the console.

#### Create the application


1. Open the Kinesis console at [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

1. On the Amazon Kinesis dashboard, choose **Create analytics application**.

1. On the **Kinesis Analytics - Create application** page, provide the application details as follows:
   + For **Application name**, enter **MyApplication**.
   + For **Description**, enter **My java test app**.
   + For **Runtime**, choose **Apache Flink 1.6**.

1. For **Access permissions**, choose **Create / update IAM role `kinesis-analytics-MyApplication-us-west-2`**.

1. Choose **Create application**.

**Note**  
When you create an Amazon Managed Service for Apache Flink application using the console, you have the option of having an IAM role and policy created for your application. Your application uses this role and policy to access its dependent resources. These IAM resources are named using your application name and Region as follows:  
Policy: `kinesis-analytics-service-MyApplication-us-west-2`
Role: `kinesis-analytics-MyApplication-us-west-2`

#### Edit the IAM policy


Edit the IAM policy to add permissions to access the Kinesis data streams.

1. Open the IAM console at [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. Choose **Policies**. Choose the **`kinesis-analytics-service-MyApplication-us-west-2`** policy that the console created for you in the previous section. 

1. On the **Summary** page, choose **Edit policy**. Choose the **JSON** tab.

1. Add the `ReadInputStream` and `WriteOutputStream` statements (the last two statements in the following policy example) to the policy. Replace the sample account IDs (*012345678901*) with your account ID.

------
#### [ JSON ]


   ```
   {
        "Version": "2012-10-17",
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:s3:::ka-app-code-username/java-getting-started-1.0.jar"
               ]
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:*"
               ]
           },
           {
               "Sid": "ListCloudwatchLogStreams",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogStreams"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
               ]
           },
           {
               "Sid": "PutCloudwatchLogs",
               "Effect": "Allow",
               "Action": [
                   "logs:PutLogEvents"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
               ]
           },
           {
               "Sid": "ReadInputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream"
           },
           {
               "Sid": "WriteOutputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream"
           }
       ]
   }
   ```

------

#### Configure the application


1. On the **MyApplication** page, choose **Configure**.

1. On the **Configure application** page, provide the **Code location**:
   + For **Amazon S3 bucket**, enter **ka-app-code-*<username>***.
   + For **Path to Amazon S3 object**, enter **java-getting-started-1.0.jar**.

1. Under **Access to application resources**, for **Access permissions**, choose **Create / update IAM role `kinesis-analytics-MyApplication-us-west-2`**.

1. Under **Properties**, for **Group ID**, enter **ProducerConfigProperties**.

1. Enter the following application properties and values (they match the `ProducerConfigProperties` group used in the AWS CLI section later in this tutorial):

   | Key | Value |
   | --- | --- |
   | `flink.stream.initpos` | `LATEST` |
   | `aws.region` | `us-west-2` |
   | `AggregationEnabled` | `false` |

1. Under **Monitoring**, ensure that the **Monitoring metrics level** is set to **Application**.

1. For **CloudWatch logging**, select the **Enable** check box.

1. Choose **Update**.

**Note**  
When you choose to enable CloudWatch logging, Managed Service for Apache Flink creates a log group and log stream for you. The names of these resources are as follows:   
Log group: `/aws/kinesis-analytics/MyApplication`
Log stream: `kinesis-analytics-log-stream`

#### Run the application


1. On the **MyApplication** page, choose **Run**. Confirm the action.

1. When the application is running, refresh the page. The console shows the **Application graph**.

#### Stop the application


On the **MyApplication** page, choose **Stop**. Confirm the action.

#### Update the application


Using the console, you can update application settings such as application properties, monitoring settings, and the location or file name of the application JAR. You can also reload the application JAR from the Amazon S3 bucket if you need to update the application code.

On the **MyApplication** page, choose **Configure**. Update the application settings and choose **Update**.

### Create and run the application (AWS CLI)


In this section, you use the AWS CLI to create and run the Managed Service for Apache Flink application. You use the `kinesisanalyticsv2` AWS CLI command to create and interact with Managed Service for Apache Flink applications.

#### Create a Permissions Policy


First, you create a permissions policy with two statements: one that grants permissions for read actions on the source stream, and another that grants permissions for write actions on the sink stream. (For simplicity, the following example grants `kinesis:*` on each stream; in a production application, scope this down to only the actions you need.) You then attach the policy to an IAM role (which you create in the next section). Thus, when Managed Service for Apache Flink assumes the role, the service has the necessary permissions to read from the source stream and write to the sink stream.

Use the following code to create the `KAReadSourceStreamWriteSinkStream` permissions policy. Replace `username` with the user name that you used to create the Amazon S3 bucket to store the application code. Replace the account ID in the Amazon Resource Names (ARNs) (`012345678901`) with your account ID.

------
#### [ JSON ]


```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "S3",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:GetObjectVersion"
            ],
            "Resource": ["arn:aws:s3:::ka-app-code-username",
                "arn:aws:s3:::ka-app-code-username/*"
            ]
        },
        {
            "Sid": "ReadInputStream",
            "Effect": "Allow",
            "Action": "kinesis:*",
            "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream"
        },
        {
            "Sid": "WriteOutputStream",
            "Effect": "Allow",
            "Action": "kinesis:*",
            "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream"
        }
    ]
}
```

------

For step-by-step instructions to create a permissions policy, see [Tutorial: Create and Attach Your First Customer Managed Policy](https://docs.aws.amazon.com/IAM/latest/UserGuide/tutorial_managed-policies.html#part-two-create-policy) in the *IAM User Guide*.

**Note**  
To access other AWS services, you can use the AWS SDK for Java. Managed Service for Apache Flink automatically sets the credentials required by the SDK to those of the service execution IAM role that is associated with your application. No additional steps are needed.

#### Create an IAM Role


In this section, you create an IAM role that Managed Service for Apache Flink can assume to read a source stream and write to the sink stream.

Managed Service for Apache Flink cannot access your stream without permissions. You grant these permissions via an IAM role. Each IAM role has two policies attached. The trust policy grants Managed Service for Apache Flink permission to assume the role, and the permissions policy determines what Managed Service for Apache Flink can do after assuming the role.
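For reference, the trust policy that grants Managed Service for Apache Flink permission to assume the role looks like the following sketch. The console creates an equivalent trust policy for you when you choose the Kinesis Analytics use case in the steps below; the service principal shown here is an assumption you can verify on your role's **Trust relationships** tab.

```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "kinesisanalytics.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
```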

You attach the permissions policy that you created in the preceding section to this role.

**To create an IAM role**

1. Open the IAM console at [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. In the navigation pane, choose **Roles**, **Create Role**.

1. Under **Select type of trusted identity**, choose **AWS Service**. Under **Choose the service that will use this role**, choose **Kinesis**. Under **Select your use case**, choose **Kinesis Analytics**.

   Choose **Next: Permissions**.

1. On the **Attach permissions policies** page, choose **Next: Review**. You attach permissions policies after you create the role.

1. On the **Create role** page, enter **KA-stream-rw-role** for the **Role name**. Choose **Create role**.

   Now you have created a new IAM role called `KA-stream-rw-role`. Next, you update the trust and permissions policies for the role.

1. Attach the permissions policy to the role.
**Note**  
For this exercise, Managed Service for Apache Flink assumes this role for both reading data from a Kinesis data stream (source) and writing output to another Kinesis data stream. So you attach the policy that you created in the previous step, [Create a Permissions Policy](#get-started-exercise-7-cli-policy).

   1. On the **Summary** page, choose the **Permissions** tab.

   1. Choose **Attach Policies**.

   1. In the search box, enter **KAReadSourceStreamWriteSinkStream** (the policy that you created in the previous section).

   1. Choose the **KAReadSourceStreamWriteSinkStream** policy, and choose **Attach policy**.

You have now created the service execution role that your application uses to access resources. Make a note of the ARN of the new role.

For step-by-step instructions for creating a role, see [Creating an IAM Role (Console)](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user.html#roles-creatingrole-user-console) in the *IAM User Guide*.

#### Create the Managed Service for Apache Flink Application


1. Save the following JSON code to a file named `create_request.json`. Replace the sample role ARN with the ARN for the role that you created previously. Replace the bucket ARN suffix (`username`) with the suffix that you chose in the previous section. Replace the sample account ID (`012345678901`) in the service execution role with your account ID.

   ```
   {
       "ApplicationName": "test",
       "ApplicationDescription": "my java test app",
       "RuntimeEnvironment": "FLINK-1_6",
       "ServiceExecutionRole": "arn:aws:iam::012345678901:role/KA-stream-rw-role",
       "ApplicationConfiguration": {
           "ApplicationCodeConfiguration": {
               "CodeContent": {
                   "S3ContentLocation": {
                       "BucketARN": "arn:aws:s3:::ka-app-code-username",
                       "FileKey": "java-getting-started-1.0.jar"
                   }
               },
               "CodeContentType": "ZIPFILE"
           },
           "EnvironmentProperties":  { 
            "PropertyGroups": [ 
               { 
                  "PropertyGroupId": "ProducerConfigProperties",
                  "PropertyMap" : {
                       "flink.stream.initpos" : "LATEST",
                       "aws.region" : "us-west-2",
                       "AggregationEnabled" : "false"
                  }
               },
               { 
                  "PropertyGroupId": "ConsumerConfigProperties",
                  "PropertyMap" : {
                       "aws.region" : "us-west-2"
                  }
               }
            ]
         }
       }
   }
   ```

1. Execute the [CreateApplication](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_CreateApplication.html) action with the preceding request to create the application:

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json
   ```

The application is now created. You start the application in the next step.
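If `create-application` fails with a validation or parse error, a malformed request file is a common cause. You can check the file's JSON syntax locally before calling the API. The snippet below runs the check on an inline sample so that it is self-contained; in practice, you would run `python3 -m json.tool create_request.json` (any JSON-aware tool works).

```
# json.tool pretty-prints valid JSON and exits non-zero on malformed input.
# In practice: python3 -m json.tool create_request.json > /dev/null
echo '{"ApplicationName": "test"}' | python3 -m json.tool
```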

#### Start the Application


In this section, you use the [StartApplication](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html) action to start the application.

**To start the application**

1. Save the following JSON code to a file named `start_request.json`.

   ```
   {
       "ApplicationName": "test",
       "RunConfiguration": {
            "ApplicationRestoreConfiguration": {
                "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT"
            }
       }
   }
   ```

1. Execute the [StartApplication](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html) action with the preceding request to start the application:

   ```
   aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json
   ```

The application is now running. You can check the Managed Service for Apache Flink metrics on the Amazon CloudWatch console to verify that the application is working.

#### Stop the Application


In this section, you use the [StopApplication](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html) action to stop the application.

**To stop the application**

1. Save the following JSON code to a file named `stop_request.json`.

   ```
    {
        "ApplicationName": "test"
    }
   ```

1. Execute the [StopApplication](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html) action with the preceding request to stop the application:

   ```
   aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json
   ```

The application is now stopped.

# Tutorial: Use AWS Lambda with Amazon Kinesis Data Streams


In this tutorial, you create a Lambda function to consume events from a Kinesis data stream. In this example scenario, a custom application writes records to a Kinesis data stream. AWS Lambda polls the data stream and, when it detects new data records, invokes your Lambda function, assuming the execution role that you specified when you created the function.

For detailed step-by-step instructions, see [Tutorial: Using AWS Lambda with Amazon Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis-example.html).
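One detail worth knowing before you start: Lambda receives each Kinesis record's payload base64-encoded (in the event's `kinesis.data` field), so your function must decode it before processing. A quick command-line illustration of the round trip, using a hypothetical payload in the style of this guide's stock-trade examples:

```
# A producer's payload as Lambda receives it (base64-encoded), then decoded.
payload='TICKER:AMZN,PRICE:380.00'
encoded=$(printf '%s' "$payload" | base64)
echo "As delivered in the event: $encoded"
printf '%s' "$encoded" | base64 --decode
```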

**Note**  
This tutorial assumes that you have some knowledge of basic Lambda operations and the AWS Lambda console. If you haven't already, follow the instructions in [Getting Started with AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/getting-started.html) to create your first Lambda function.

# Use the AWS Streaming Data Solution for Amazon Kinesis

The AWS Streaming Data Solution for Amazon Kinesis automatically configures the AWS services necessary to capture, store, process, and deliver streaming data. The solution provides multiple options for solving streaming data use cases, using AWS services such as Kinesis Data Streams, AWS Lambda, Amazon API Gateway, and Amazon Managed Service for Apache Flink.

Each solution includes the following components:
+ An AWS CloudFormation template that deploys the complete example.
+ A CloudWatch dashboard for displaying application metrics.
+ CloudWatch alarms on the most relevant application metrics.
+ All necessary IAM roles and policies.

The solution is available here: [Streaming Data Solution for Amazon Kinesis](https://aws.amazon.com/solutions/implementations/aws-streaming-data-solution-for-amazon-kinesis/).