

# Tutorial: Process real-time stock data using KPL and KCL 2.x
<a name="tutorial-stock-data-kplkcl2"></a>

The scenario for this tutorial involves ingesting stock trades into a data stream and writing a basic Amazon Kinesis Data Streams application that performs calculations on the stream. You will learn how to send a stream of records to Kinesis Data Streams and implement an application that consumes and processes the records in near real time.

**Important**  
After you create a stream, your account incurs nominal charges for Kinesis Data Streams usage because Kinesis Data Streams is not eligible for the AWS Free Tier. After the consumer application starts, it also incurs nominal charges for Amazon DynamoDB usage. The consumer application uses DynamoDB to track processing state. When you are finished with this application, delete your AWS resources to stop incurring charges. For more information, see [Clean up resources](tutorial-stock-data-kplkcl2-finish.md).

The code does not access actual stock market data, but instead simulates the stream of stock trades. It does so by using a random stock trade generator that has a starting point of real market data for the top 25 stocks by market capitalization as of February 2015. If you have access to a real-time stream of stock trades, you might be interested in deriving useful, timely statistics from that stream. For example, you might want to perform a sliding window analysis where you determine the most popular stock purchased in the last 5 minutes. Or you might want a notification whenever there is a sell order that is too large (that is, it has too many shares). You can extend the code in this series to provide such functionality.

You can work through the steps in this tutorial on your desktop or laptop computer, running both the producer and consumer code on the same machine, or on any platform that supports the defined requirements.

The examples shown use the US West (Oregon) Region, but they work on any of the [AWS Regions](https://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region) that support Kinesis Data Streams.

**Topics**
+ [Complete prerequisites](tutorial-stock-data-kplkcl2-begin.md)
+ [Create a data stream](tutorial-stock-data-kplkcl2-create-stream.md)
+ [Create an IAM policy and user](tutorial-stock-data-kplkcl2-iam.md)
+ [Download and build the code](tutorial-stock-data-kplkcl2-download.md)
+ [Implement the producer](tutorial-stock-data-kplkcl2-producer.md)
+ [Implement the consumer](tutorial-stock-data-kplkcl2-consumer.md)
+ [(Optional) Extend the consumer](tutorial-stock-data-kplkcl2-consumer-extension.md)
+ [Clean up resources](tutorial-stock-data-kplkcl2-finish.md)

# Complete prerequisites
<a name="tutorial-stock-data-kplkcl2-begin"></a>

You must meet the following requirements to complete this tutorial:

## Create and use an Amazon Web Services Account
<a name="tutorial-stock-data-kplkcl2-begin-aws"></a>

Before you begin, make sure that you are familiar with the concepts discussed in [Amazon Kinesis Data Streams Terminology and concepts](key-concepts.md), particularly with streams, shards, producers, and consumers. It is also helpful to have completed the steps in the following guide: [Tutorial: Install and configure the AWS CLI for Kinesis Data Streams](kinesis-tutorial-cli-installation.md).

You must have an AWS account and a web browser to access the AWS Management Console.

For console access, use your IAM user name and password to sign in to the [AWS Management Console](https://console.aws.amazon.com/console/home) from the IAM sign-in page. For information about AWS security credentials, including programmatic access and alternatives to long-term credentials, see [AWS security credentials](https://docs.aws.amazon.com/IAM/latest/UserGuide/security-creds.html) in the *IAM User Guide*. For details about signing in to your AWS account, see [How to sign in to AWS](https://docs.aws.amazon.com/signin/latest/userguide/how-to-sign-in.html) in the *AWS Sign-In User Guide*.

For more information about IAM and security key setup instructions, see [Create an IAM User](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/get-set-up-for-amazon-ec2.html#create-an-iam-user).

## Fulfill system software requirements
<a name="tutorial-stock-data-kplkcl2-begin-sys"></a>

The system that you are using to run the application must have Java 8 or higher installed, because KCL 2.x requires at least Java 8. To download and install the latest Java Development Kit (JDK), go to [Oracle's Java SE installation site](http://www.oracle.com/technetwork/java/javase/downloads/index.html).

You need the latest [AWS SDK for Java](https://aws.amazon.com/sdk-for-java/) version. 

The consumer application requires the Kinesis Client Library (KCL) version 2.2.9 or higher, which you can obtain from GitHub at [https://github.com/awslabs/amazon-kinesis-client/tree/master](https://github.com/awslabs/amazon-kinesis-client/tree/master).

## Next steps
<a name="tutorial-stock-data-kplkcl2-begin-next"></a>

[Create a data stream](tutorial-stock-data-kplkcl2-create-stream.md)

# Create a data stream
<a name="tutorial-stock-data-kplkcl2-create-stream"></a>

First, you must create the data stream that you will use in subsequent steps of this tutorial.

**To create a stream**

1. Sign in to the AWS Management Console and open the Kinesis console at [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

1. Choose **Data Streams** in the navigation pane.

1. In the navigation bar, expand the Region selector and choose a Region.

1. Choose **Create Kinesis stream**.

1. Enter a name for your data stream (for example, **StockTradeStream**).

1. Enter **1** for the number of shards, and leave the **Estimate the number of shards you'll need** section collapsed.

1. Choose **Create Kinesis stream**.

On the **Kinesis streams** list page, the status of your stream appears as `CREATING` while the stream is being created. When the stream is ready to use, the status changes to `ACTIVE`. 

If you choose the name of your stream, the page that appears shows a **Details** tab with a summary of your data stream configuration and a **Monitoring** section with monitoring information for the stream.

## Next steps
<a name="tutorial-stock-data-kplkcl2-create-stream-next"></a>

[Create an IAM policy and user](tutorial-stock-data-kplkcl2-iam.md)

# Create an IAM policy and user
<a name="tutorial-stock-data-kplkcl2-iam"></a>

Security best practices for AWS dictate the use of fine-grained permissions to control access to different resources. AWS Identity and Access Management (IAM) lets you manage users and user permissions in AWS. An [IAM policy](https://docs.aws.amazon.com/IAM/latest/UserGuide/PoliciesOverview.html) explicitly lists the actions that are allowed and the resources to which those actions apply.

The following are the minimum permissions generally required for Kinesis Data Streams producers and consumers.


**Producer**  

| Actions | Resource | Purpose | 
| --- | --- | --- | 
| DescribeStream, DescribeStreamSummary, DescribeStreamConsumer | Kinesis data stream | Before attempting to write records, the producer checks that the data stream exists and is active, and enumerates its shards. | 
| SubscribeToShard, RegisterStreamConsumer | Kinesis data stream | Subscribes and registers consumers to a shard. | 
| PutRecord, PutRecords | Kinesis data stream | Writes records to Kinesis Data Streams. | 


**Consumer**  

| **Actions** | **Resource** | **Purpose** | 
| --- | --- | --- | 
| DescribeStream | Kinesis data stream | Before attempting to read records, the consumer checks that the data stream exists and is active, and enumerates its shards. | 
| GetRecords, GetShardIterator  | Kinesis data stream | Reads records from a shard. | 
| CreateTable, DescribeTable, GetItem, PutItem, Scan, UpdateItem | Amazon DynamoDB table | If the consumer is developed using the Kinesis Client Library (KCL) (either version 1.x or 2.x), it needs permissions to a DynamoDB table to track the processing state of the application. | 
| DeleteItem | Amazon DynamoDB table | Needed when shards are split or merged, so that the KCL can remove completed lease entries from the table. | 
| PutMetricData | Amazon CloudWatch | The KCL uploads metrics to CloudWatch, which is useful for monitoring the application. | 

For this tutorial, you will create a single IAM policy that grants all of the preceding permissions. In production, you might want to create two policies, one for producers and one for consumers.

**To create an IAM policy**

1. Locate the Amazon Resource Name (ARN) for the new data stream that you created in the previous step. You can find this ARN listed as **Stream ARN** at the top of the **Details** tab. The ARN format is as follows:

   ```
   arn:aws:kinesis:region:account:stream/name
   ```

   *region*  
   The AWS Region code; for example, `us-west-2`. For more information, see [Region and Availability Zone Concepts](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-regions-availability-zones).

   *account*  
   The AWS account ID, as shown in [Account Settings](https://console.aws.amazon.com/billing/home?#/account).

   *name*  
   The name of the data stream that you created in the preceding step, which is `StockTradeStream`.

1. Determine the ARN for the DynamoDB table to be used by the consumer (and to be created by the first consumer instance). It must be in the following format:

   ```
   arn:aws:dynamodb:region:account:table/name
   ```

   The Region and account ID are identical to the values in the ARN of the data stream that you're using for this tutorial, but *name* is the name of the DynamoDB table created and used by the consumer application. The KCL uses the application name as the table name. In this step, use `StockTradesProcessor` for the DynamoDB table name, because that is the application name used in later steps of this tutorial.

1. In the IAM console, in **Policies** ([https://console.aws.amazon.com/iam/home#policies](https://console.aws.amazon.com/iam/home#policies)), choose **Create policy**. If this is the first time that you have worked with IAM policies, choose **Get Started**, **Create Policy**.

1. Choose **Select** next to **Policy Generator**.

1. Choose **Amazon Kinesis** as the AWS service.

1. Select `DescribeStream`, `GetShardIterator`, `GetRecords`, `PutRecord`, and `PutRecords` as the allowed actions.

1. Enter the ARN of the data stream that you're using in this tutorial.

1. Use **Add Statement** for each of the following:

   | AWS service | Actions | ARN |
   | --- | --- | --- |
   | Amazon Kinesis | `SubscribeToShard`, `DescribeStreamConsumer` | The stream ARN followed by `/*`; for example, `arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream/*` |
   | Amazon DynamoDB | All actions | The ARN of the DynamoDB table that you determined in step 2 |
   | Amazon CloudWatch | `PutMetricData` | `*` |

   The asterisk (`*`) used when specifying the CloudWatch ARN is required because there is no specific resource in CloudWatch on which the `PutMetricData` action is invoked.

1. Choose **Next Step**.

1. Change **Policy Name** to `StockTradeStreamPolicy`, review the code, and choose **Create Policy**.

The resulting policy document should look like this:

------
#### [ JSON ]

****  

```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Stmt123",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:PutRecord",
                "kinesis:PutRecords",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:ListShards",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream"
            ]
        },
        {
            "Sid": "Stmt234",
            "Effect": "Allow",
            "Action": [
                "kinesis:SubscribeToShard",
                "kinesis:DescribeStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream/*"
            ]
        },
        {
            "Sid": "Stmt456",
            "Effect": "Allow",
            "Action": [
                "dynamodb:*"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-west-2:111122223333:table/StockTradesProcessor"
            ]
        },
        {
            "Sid": "Stmt789",
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}
```

------

**To create an IAM user**

1. Open the IAM console at [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. On the **Users** page, choose **Add user**.

1. For **User name**, type `StockTradeStreamUser`.

1. For **Access type**, choose **Programmatic access**, and then choose **Next: Permissions**.

1. Choose **Attach existing policies directly**.

1. Search by name for the policy that you created in the preceding procedure (`StockTradeStreamPolicy`). Select the box to the left of the policy name, and then choose **Next: Review**.

1. Review the details and summary, and then choose **Create user**.

1. Copy the **Access key ID**, and save it privately. Under **Secret access key**, choose **Show**, and save that key privately also.

1. Paste the access and secret keys into a local file in a safe place that only you can access. For this application, create a file named `~/.aws/credentials` (with strict permissions). The file should be in the following format:

   ```
   [default]
   aws_access_key_id=access key
   aws_secret_access_key=secret access key
   ```

**To attach an IAM policy to a user**

1. In the IAM console, open [Policies](https://console.aws.amazon.com/iam/home?#policies) and choose **Policy Actions**. 

1. Choose `StockTradeStreamPolicy` and **Attach**.

1. Choose `StockTradeStreamUser` and **Attach Policy**.

## Next steps
<a name="tutorial-stock-data-kplkcl2-iam-next"></a>

[Download and build the code](tutorial-stock-data-kplkcl2-download.md)

# Download and build the code
<a name="tutorial-stock-data-kplkcl2-download"></a>

This topic provides sample implementation code for ingesting the simulated stock trades into the data stream (the *producer*) and for processing that data (the *consumer*).

**To download and build the code**

1. Download the source code from the [https://github.com/aws-samples/amazon-kinesis-learning](https://github.com/aws-samples/amazon-kinesis-learning) GitHub repo to your computer.

1. Create a project in your IDE with the source code, adhering to the provided directory structure.

1. Add the following libraries to the project:
   + Amazon Kinesis Client Library (KCL)
   + AWS SDK
   + Apache HttpCore
   + Apache HttpClient
   + Apache Commons Lang
   + Apache Commons Logging
   + Guava (Google Core Libraries For Java)
   + Jackson Annotations
   + Jackson Core
   + Jackson Databind
   + Jackson Dataformat: CBOR
   + Joda Time

1. Depending on your IDE, the project might be built automatically. If not, build the project using the appropriate steps for your IDE.

If you complete these steps successfully, you are now ready to move to the next section, [Implement the producer](tutorial-stock-data-kplkcl2-producer.md). 

## Next steps
<a name="tutorial-stock-data-kplkcl2-download-next"></a>

[Implement the producer](tutorial-stock-data-kplkcl2-producer.md)

# Implement the producer
<a name="tutorial-stock-data-kplkcl2-producer"></a>

This tutorial uses the real-world scenario of stock market trade monitoring. The following principles briefly explain how this scenario maps to the producer and its supporting code structure.

Refer to the [source code](https://github.com/aws-samples/amazon-kinesis-learning) and review the following information.

**StockTrade class**  
An individual stock trade is represented by an instance of the StockTrade class. This instance contains attributes such as the ticker symbol, price, number of shares, the type of the trade (buy or sell), and an ID uniquely identifying the trade. This class is implemented for you. 

**Stream record**  
A stream is a sequence of records. A record is a serialization of a `StockTrade` instance in JSON format. For example:   

```
{
  "tickerSymbol": "AMZN", 
  "tradeType": "BUY", 
  "price": 395.87,
  "quantity": 16, 
  "id": 3567129045
}
```
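
The conversion from a `StockTrade` instance to these JSON bytes is what the producer later relies on. As a rough, self-contained sketch of that serialization (the real `StockTrade` class uses the Jackson library; the class below is a hypothetical stand-in that mirrors the record shown above):

```
import java.nio.charset.StandardCharsets;
import java.util.Locale;

// Hypothetical, minimal stand-in for the tutorial's StockTrade class.
// The real implementation serializes itself with Jackson; this version
// builds the same JSON layout by hand for illustration only.
public class StockTradeSketch {
    private final String tickerSymbol;
    private final String tradeType; // "BUY" or "SELL"
    private final double price;
    private final long quantity;
    private final long id;

    public StockTradeSketch(String tickerSymbol, String tradeType,
                            double price, long quantity, long id) {
        this.tickerSymbol = tickerSymbol;
        this.tradeType = tradeType;
        this.price = price;
        this.quantity = quantity;
        this.id = id;
    }

    // Serialize to the JSON layout shown above. Returns null on failure,
    // matching the null check that sendStockTrade performs.
    public byte[] toJsonAsBytes() {
        try {
            String json = String.format(Locale.US,
                    "{\"tickerSymbol\": \"%s\", \"tradeType\": \"%s\", "
                    + "\"price\": %.2f, \"quantity\": %d, \"id\": %d}",
                    tickerSymbol, tradeType, price, quantity, id);
            return json.getBytes(StandardCharsets.UTF_8);
        } catch (Exception e) {
            return null;
        }
    }

    public static void main(String[] args) {
        byte[] bytes = new StockTradeSketch("AMZN", "BUY", 395.87, 16, 3567129045L)
                .toJsonAsBytes();
        // Prints the record in the same shape as the example above
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
    }
}
```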

**StockTradeGenerator class**  
StockTradeGenerator has a method called `getRandomTrade()` that returns a new randomly generated stock trade every time it is invoked. This class is implemented for you.

**StockTradesWriter class**  
The entry point of the producer. The `main` method of `StockTradesWriter` continuously retrieves a random trade and then sends it to Kinesis Data Streams by performing the following tasks:  

1. Reads the data stream name and Region name as input.

1. Uses the `KinesisAsyncClientBuilder` to set the Region, credentials, and client configuration. 

1. Checks that the stream exists and is active (if not, it exits with an error). 

1. In a continuous loop, calls the `StockTradeGenerator.getRandomTrade()` method and then the `sendStockTrade` method to send the trade to the stream every 100 milliseconds.

The `sendStockTrade` method of the `StockTradesWriter` class has the following code:

```
private static void sendStockTrade(StockTrade trade, KinesisAsyncClient kinesisClient,
            String streamName) {
        byte[] bytes = trade.toJsonAsBytes();
        // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library.
        if (bytes == null) {
            LOG.warn("Could not get JSON bytes for stock trade");
            return;
        }

        LOG.info("Putting trade: " + trade.toString());
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(bytes))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            LOG.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }
```

Refer to the following code breakdown:
+ The `PutRecord` API expects a byte array, so you must convert the trade to JSON format. This single line of code performs that operation: 

  ```
  byte[] bytes = trade.toJsonAsBytes();
  ```
+ Before you can send the trade, you create a new `PutRecordRequest` instance (called `request` here). Each `request` requires the stream name, partition key, and a data blob. 

  ```
  PutRecordRequest request = PutRecordRequest.builder()
      .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
      .streamName(streamName)
      .data(SdkBytes.fromByteArray(bytes))
      .build();
  ```

  The example uses a stock ticker as a partition key, which maps the record to a specific shard. In practice, you should have hundreds or thousands of partition keys per shard such that records are evenly dispersed across your stream. For more information about how to add data to a stream, see [Write data to Amazon Kinesis Data Streams](building-producers.md).
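
  To make the shard mapping concrete: Kinesis Data Streams takes the MD5 hash of the partition key, reads it as an unsigned 128-bit integer, and routes the record to the shard whose hash key range contains that value. The following is a self-contained sketch of that scheme, assuming shards split the hash space evenly (as they do on a freshly created stream); the class and method names are illustrative:

  ```
  import java.math.BigInteger;
  import java.nio.charset.StandardCharsets;
  import java.security.MessageDigest;
  import java.security.NoSuchAlgorithmException;

  // Sketch of how Kinesis Data Streams assigns a record to a shard:
  // the MD5 hash of the partition key, read as an unsigned 128-bit
  // integer, is matched against each shard's hash key range.
  public class PartitionKeySketch {
      private static final BigInteger HASH_SPACE = BigInteger.valueOf(2).pow(128);

      // Unsigned 128-bit MD5 hash of the partition key.
      static BigInteger hashKey(String partitionKey) {
          try {
              byte[] digest = MessageDigest.getInstance("MD5")
                      .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
              return new BigInteger(1, digest); // signum 1 = treat as unsigned
          } catch (NoSuchAlgorithmException e) {
              throw new IllegalStateException(e); // MD5 is always available
          }
      }

      // With shards that evenly split the hash space, pick the shard index.
      static int shardFor(String partitionKey, int shardCount) {
          BigInteger shardSize = HASH_SPACE.divide(BigInteger.valueOf(shardCount));
          return hashKey(partitionKey).divide(shardSize).intValue();
      }

      public static void main(String[] args) {
          for (String ticker : new String[] {"AMZN", "WMT", "GE", "BUD", "PTR"}) {
              System.out.println(ticker + " -> shard " + shardFor(ticker, 4));
          }
      }
  }
  ```

  Because the hash of a given key is deterministic, all trades for one ticker always land on the same shard, which is what makes per-ticker ordering possible but also what makes a single hot key a throughput bottleneck.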

  Now `request` is ready to send to the client (the put operation): 

  ```
  kinesisClient.putRecord(request).get();
  ```
+ Error checking and logging are always useful additions. This code logs error conditions: 

  ```
  if (bytes == null) {
      LOG.warn("Could not get JSON bytes for stock trade");
      return;
  }
  ```

  Add the try/catch block around the `put` operation: 

  ```
  try {
      kinesisClient.putRecord(request).get();
  } catch (InterruptedException e) {
      LOG.info("Interrupted, assuming shutdown.");
  } catch (ExecutionException e) {
      LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
  }
  ```

  This is because a Kinesis Data Streams put operation can fail due to a network error, or because the data stream has reached its throughput limits and the request is throttled. To avoid data loss, carefully consider your retry policy for `put` operations; for example, retry failed puts with exponential backoff. 
+ Status logging is helpful but optional:

  ```
  LOG.info("Putting trade: " + trade.toString());
  ```
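
The retry advice above can be sketched as a bounded retry with exponential backoff. This is a hypothetical helper, not part of the tutorial's source; `PutOperation` stands in for a call such as `kinesisClient.putRecord(request).get()`:

```
// Hypothetical sketch of a bounded retry with exponential backoff
// around a put operation, as suggested above.
public class RetrySketch {
    @FunctionalInterface
    interface PutOperation {
        void run() throws Exception; // stands in for kinesisClient.putRecord(request).get()
    }

    // Returns true if the operation eventually succeeded.
    static boolean putWithRetry(PutOperation op, int maxAttempts, long baseDelayMillis)
            throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                op.run();
                return true;
            } catch (InterruptedException e) {
                throw e; // shutdown requested; do not retry
            } catch (Exception e) {
                if (attempt == maxAttempts) {
                    return false; // give up; the caller decides what to do with the record
                }
                // Exponential backoff: base, 2x base, 4x base, ...
                Thread.sleep(baseDelayMillis << (attempt - 1));
            }
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        int[] calls = {0};
        // Fails twice, then succeeds: simulates transient throttling.
        boolean ok = putWithRetry(() -> {
            if (++calls[0] < 3) throw new RuntimeException("throttled");
        }, 5, 10);
        System.out.println("succeeded=" + ok + " after " + calls[0] + " attempts");
    }
}
```
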
The producer shown here uses the Kinesis Data Streams API single record functionality, `PutRecord`. In practice, if an individual producer generates many records, it is often more efficient to use the multiple records functionality of `PutRecords` and send batches of records at a time. For more information, see [Write data to Amazon Kinesis Data Streams](building-producers.md).
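
If you switch to `PutRecords`, remember that a single call accepts at most 500 records, so pending records must be chunked on the client side before each call. A minimal sketch of that chunking (the generic element type here stands in for `PutRecordsRequestEntry`):

```
import java.util.ArrayList;
import java.util.List;

// Sketch of client-side batching for PutRecords. A single PutRecords
// call accepts at most 500 records, so pending records are chunked
// before each call. The element type is left generic; in a real
// producer each entry would be a PutRecordsRequestEntry.
public class BatchingSketch {
    static final int MAX_RECORDS_PER_CALL = 500;

    static <T> List<List<T>> toBatches(List<T> pending) {
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < pending.size(); start += MAX_RECORDS_PER_CALL) {
            int end = Math.min(start + MAX_RECORDS_PER_CALL, pending.size());
            batches.add(new ArrayList<>(pending.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> pending = new ArrayList<>();
        for (int i = 0; i < 1200; i++) pending.add(i);
        List<List<Integer>> batches = toBatches(pending);
        // 1200 records -> batches of 500, 500, 200
        System.out.println(batches.size() + " batches, last has "
                + batches.get(batches.size() - 1).size() + " records");
    }
}
```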

**To run the producer**

1. Verify that the access key and secret key pair retrieved in [Create an IAM policy and user](tutorial-stock-data-kplkcl2-iam.md) are saved in the file `~/.aws/credentials`. 

1. Run the `StockTradeWriter` class with the following arguments:

   ```
   StockTradeStream us-west-2
   ```

   If you created your stream in a Region other than `us-west-2`, you must specify that Region here instead.

You should see output similar to the following:

```
Feb 16, 2015 3:53:00 PM  
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
```

Your stock trades are now being ingested by Kinesis Data Streams.

## Next steps
<a name="tutorial-stock-data-kplkcl2-producer-next"></a>

[Implement the consumer](tutorial-stock-data-kplkcl2-consumer.md)

# Implement the consumer
<a name="tutorial-stock-data-kplkcl2-consumer"></a>

The consumer application in this tutorial continuously processes the stock trades in your data stream. It then outputs the most popular stocks being bought and sold every minute. The application is built on top of the Kinesis Client Library (KCL), which does much of the heavy lifting common to consumer apps. For more information, see [KCL 1.x and 2.x information](shared-throughput-kcl-consumers.md). 

Refer to the source code and review the following information.

**StockTradesProcessor class**  
The main class of the consumer, provided for you, which performs the following tasks:  
+ Reads the application, data stream, and Region names, passed in as arguments.
+ Creates a `KinesisAsyncClient` instance with the Region name.
+ Creates a `StockTradeRecordProcessorFactory` instance that serves instances of `ShardRecordProcessor`, implemented by a `StockTradeRecordProcessor` instance. 
+ Creates a `ConfigsBuilder` instance with the `KinesisAsyncClient`, `StreamName`, `ApplicationName`, and `StockTradeRecordProcessorFactory` instance. This is useful for creating all configurations with default values.
+ Creates a KCL scheduler (known as the KCL worker in KCL 1.x versions) with the `ConfigsBuilder` instance. 
+ The scheduler creates a new thread for each shard (assigned to this consumer instance), which continuously loops to read records from the data stream. It then invokes the `StockTradeRecordProcessor` instance to process each batch of records received. 

**StockTradeRecordProcessor class**  
The `StockTradeRecordProcessor` class implements the `ShardRecordProcessor` interface and its five required methods: `initialize`, `processRecords`, `leaseLost`, `shardEnded`, and `shutdownRequested`.  
The `initialize` and `shutdownRequested` methods are used by the KCL to let the record processor know when it should be ready to start receiving records and when it should expect to stop receiving records, respectively, so it can perform any application-specific setup and termination tasks. `leaseLost` and `shardEnded` implement the logic for what to do when a lease is lost or processing has reached the end of a shard. In this example, we simply log messages indicating these events.  
The code for these methods is provided for you. The main processing happens in the `processRecords` method, which in turn calls `processRecord` for each record. The latter method is provided as mostly empty skeleton code for you to implement in the next step, where it is explained in greater detail.  
Also of note are the support methods for `processRecord`: `reportStats` and `resetStats`, which are empty in the original source code.  
The `processRecords` method is implemented for you, and performs the following steps:  
+ For each record passed in, it calls `processRecord` on it. 
+ If at least 1 minute has elapsed since the last report, calls `reportStats()`, which prints out the latest stats, and then `resetStats()`, which clears the stats so that the next interval includes only new records.
+ Sets the next reporting time.
+ If at least 1 minute has elapsed since the last checkpoint, calls `checkpoint()`. 
+ Sets the next checkpointing time.
This method uses 60-second intervals for the reporting and checkpointing rate. For more information about checkpointing, see [Using the Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html). 
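
The once-per-interval logic that `processRecords` applies to reporting and checkpointing can be sketched in isolation. The field and method names below are illustrative, not the tutorial's exact ones:

```
// Sketch of the interval logic described above: report stats and
// checkpoint at most once per 60-second interval while processing
// batches of records. Names here are illustrative only.
public class IntervalSketch {
    static final long INTERVAL_MILLIS = 60_000;

    private long nextReportingTimeInMillis;
    private long nextCheckpointTimeInMillis;

    IntervalSketch(long now) {
        nextReportingTimeInMillis = now + INTERVAL_MILLIS;
        nextCheckpointTimeInMillis = now + INTERVAL_MILLIS;
    }

    // Returns true if a report was due for this batch (and schedules the next).
    boolean maybeReport(long now) {
        if (now <= nextReportingTimeInMillis) return false;
        // reportStats() and resetStats() would run here
        nextReportingTimeInMillis = now + INTERVAL_MILLIS;
        return true;
    }

    boolean maybeCheckpoint(long now) {
        if (now <= nextCheckpointTimeInMillis) return false;
        // checkpoint() would run here
        nextCheckpointTimeInMillis = now + INTERVAL_MILLIS;
        return true;
    }

    public static void main(String[] args) {
        IntervalSketch s = new IntervalSketch(0);
        System.out.println(s.maybeReport(30_000)); // too early: false
        System.out.println(s.maybeReport(61_000)); // past the interval: true
        System.out.println(s.maybeReport(90_000)); // next report not yet due: false
    }
}
```

Driving the timing from `processRecords` invocations (rather than a separate timer thread) keeps the record processor single-threaded, which is why the KCL recommends this pattern.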

**StockStats class**  
This class provides data retention and statistics tracking for the most popular stocks over time. This code is provided for you and contains the following methods:  
+ `addStockTrade(StockTrade)`: injects the given `StockTrade` into the running statistics.
+ `toString()`: returns the statistics in a formatted string.
This class tracks the most popular stock by keeping a running count of the total number of trades for each stock and the maximum count. It updates these counts whenever a stock trade arrives.
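
The running-count approach that `StockStats` takes can be sketched in a few lines. This is an illustrative simplification, not the tutorial's actual class (which tracks buys and sells separately by `TradeType`):

```
import java.util.HashMap;
import java.util.Map;

// Sketch of the running-count approach StockStats uses: count trades
// per ticker and track the current maximum as trades arrive, so the
// most popular stock is available in O(1) at reporting time.
public class MostPopularSketch {
    private final Map<String, Long> countsByTicker = new HashMap<>();
    private String mostPopular;
    private long mostPopularCount;

    void addTrade(String tickerSymbol) {
        long count = countsByTicker.merge(tickerSymbol, 1L, Long::sum);
        if (count > mostPopularCount) {
            mostPopularCount = count;
            mostPopular = tickerSymbol;
        }
    }

    String mostPopularStock() { return mostPopular; }
    long mostPopularStockCount() { return mostPopularCount; }

    public static void main(String[] args) {
        MostPopularSketch stats = new MostPopularSketch();
        for (String t : new String[] {"WMT", "AMZN", "WMT", "GE", "WMT"}) {
            stats.addTrade(t);
        }
        // Prints: Most popular: WMT, 3 trades
        System.out.println("Most popular: " + stats.mostPopularStock()
                + ", " + stats.mostPopularStockCount() + " trades");
    }
}
```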

Add code to the methods of the `StockTradeRecordProcessor` class, as shown in the following steps. 

**To implement the consumer**

1. Implement the `processRecord` method by copying the record data into a correctly sized byte array, deserializing it into a `StockTrade` object, and logging a warning if there's a problem. 

   ```
   byte[] arr = new byte[record.data().remaining()];
   record.data().get(arr);
   StockTrade trade = StockTrade.fromJsonAsBytes(arr);
   if (trade == null) {
       log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey());
       return;
   }
   stockStats.addStockTrade(trade);
   ```

1. Implement a `reportStats` method. Modify the output format to suit your preferences. 

   ```
   System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" +
           stockStats + "\n" +
           "****************************************************************\n");
   ```

1. Implement the `resetStats` method, which creates a new `stockStats` instance. 

   ```
   stockStats = new StockStats();
   ```

1. Implement the following methods required by the `ShardRecordProcessor` interface:

   ```
   @Override
   public void leaseLost(LeaseLostInput leaseLostInput) {
       log.info("Lost lease, so terminating.");
   }
   
   @Override
   public void shardEnded(ShardEndedInput shardEndedInput) {
       try {
           log.info("Reached shard end checkpointing.");
           shardEndedInput.checkpointer().checkpoint();
       } catch (ShutdownException | InvalidStateException e) {
           log.error("Exception while checkpointing at shard end. Giving up.", e);
       }
   }
   
   @Override
   public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
       log.info("Scheduler is shutting down, checkpointing.");
       checkpoint(shutdownRequestedInput.checkpointer());
   }
   
   private void checkpoint(RecordProcessorCheckpointer checkpointer) {
       log.info("Checkpointing shard " + kinesisShardId);
       try {
           checkpointer.checkpoint();
       } catch (ShutdownException se) {
           // Ignore checkpoint if the processor instance has been shutdown (fail over).
           log.info("Caught shutdown exception, skipping checkpoint.", se);
       } catch (ThrottlingException e) {
           // Skip checkpoint when throttled. In practice, consider a backoff and retry policy.
           log.error("Caught throttling exception, skipping checkpoint.", e);
       } catch (InvalidStateException e) {
           // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS).
           log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
       }
   }
   ```

**To run the consumer**

1. Run the producer that you wrote in [Implement the producer](tutorial-stock-data-kplkcl2-producer.md) to inject simulated stock trade records into your stream.

1. Verify that the access key and secret key pair retrieved earlier (when creating the IAM user) are saved in the file `~/.aws/credentials`. 

1. Run the `StockTradesProcessor` class with the following arguments:

   ```
   StockTradesProcessor StockTradeStream us-west-2
   ```

   Note that if you created your stream in a Region other than `us-west-2`, you have to specify that Region here instead.

After a minute, you should see output like the following, refreshed every minute thereafter:

```
  
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  ****************************************************************
```

## Next steps
<a name="tutorial-stock-data-kplkcl2-consumer-next"></a>

[(Optional) Extend the consumer](tutorial-stock-data-kplkcl2-consumer-extension.md)

# (Optional) Extend the consumer
<a name="tutorial-stock-data-kplkcl2-consumer-extension"></a>

This optional section shows how you can extend the consumer code for a slightly more elaborate scenario.

If you want to know about the biggest sell orders each minute, you can modify the `StockStats` class in three places to accommodate this new priority.

**To extend the consumer**

1. Add new instance variables:

   ```
    // Ticker symbol of the stock that had the largest quantity of shares sold 
    private String largestSellOrderStock;
    // Quantity of shares for the largest sell order trade
    private long largestSellOrderQuantity;
   ```

1. Add the following code to `addStockTrade`:

   ```
   if (type == TradeType.SELL) {
        if (largestSellOrderStock == null || trade.getQuantity() > largestSellOrderQuantity) {
            largestSellOrderStock = trade.getTickerSymbol();
            largestSellOrderQuantity = trade.getQuantity();
        }
    }
   ```

1. Modify the `toString` method to print the additional information:

   ```
    
   public String toString() {
       return String.format(
           "Most popular stock being bought: %s, %d buys.%n" +
           "Most popular stock being sold: %s, %d sells.%n" +
           "Largest sell order: %d shares of %s.",
           getMostPopularStock(TradeType.BUY), getMostPopularStockCount(TradeType.BUY),
           getMostPopularStock(TradeType.SELL), getMostPopularStockCount(TradeType.SELL),
           largestSellOrderQuantity, largestSellOrderStock);
   }
   ```

If you run the consumer now (remember to run the producer also), you should see output similar to this:

```
 
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  Largest sell order: 996 shares of BUD.
  ****************************************************************
```

## Next steps
<a name="tutorial-stock-data-kplkcl2-consumer-extension-next"></a>

[Clean up resources](tutorial-stock-data-kplkcl2-finish.md)

# Clean up resources
<a name="tutorial-stock-data-kplkcl2-finish"></a>

Because you are paying to use the Kinesis data stream, make sure that you delete it and the corresponding Amazon DynamoDB table when you are done with it. Nominal charges occur on an active stream even when you aren't sending and getting records. This is because an active stream is using resources by continuously "listening" for incoming records and requests to get records.

**To delete the stream and table**

1. Shut down any producers and consumers that you might still have running.

1. Open the Kinesis console at [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

1. Choose the stream that you created for this application (`StockTradeStream`).

1. Choose **Delete Stream**.

1. Open the DynamoDB console at [https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/).

1. Delete the `StockTradesProcessor` table.

## Summary
<a name="tutorial-stock-data-kplkcl2-summary"></a>

Processing a large amount of data in near real time doesn’t require writing complicated code or developing a huge infrastructure. It is as basic as writing logic to process a small amount of data (like writing `processRecord(Record)`) but using Kinesis Data Streams to scale so that it works for a large amount of streaming data. You don’t have to worry about how your processing would scale because Kinesis Data Streams handles it for you. All you have to do is send your streaming records to Kinesis Data Streams and write the logic to process each new record received. 

Here are some potential enhancements for this application.

**Aggregate across all shards**  
Currently, you get stats resulting from aggregation of the data records received by a single worker from a single shard. (A shard cannot be processed by more than one worker in a single application at the same time.) Of course, when you scale and have more than one shard, you might want to aggregate across all shards. You can do this by having a pipeline architecture where the output of each worker is fed into another stream with a single shard, which is processed by a worker that aggregates the outputs of the first stage. Because the data from the first stage is limited (one sample per minute per shard), it would easily be handled by one shard.

**Scale processing**  
When the stream scales up to have many shards (because many producers are sending data), the way to scale the processing is to add more workers. You can run the workers in Amazon EC2 instances and use Auto Scaling groups.

**Use connectors to Amazon S3/DynamoDB/Amazon Redshift/Storm**  
As a stream is continuously processed, its output can be sent to other destinations. AWS provides [connectors](https://github.com/awslabs/amazon-kinesis-connectors) to integrate Kinesis Data Streams with other AWS services and third-party tools.