

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 教程：使用 KPL 和 KCL 2.x 处理实时股票数据


本教程的场景涉及将股票交易引入数据流中并编写对流执行计算的基础 Amazon Kinesis Data Streams 应用程序。您将了解如何将记录流发送到 Kinesis Data Streams 并实现近乎实时地使用和处理记录的应用程序。

**重要**  
创建直播后，您的账户会因使用 Kinesis Data Streams 而产生象征性的费用，因为 Kinesis Data Streams 不符合 AWS 免费套餐的资格。在消费端应用程序启动后，也会象征性收取 Amazon DynamoDB 使用费用。消费端应用程序使用 DynamoDB 跟踪处理状态。在使用完此应用程序后，请删除 AWS 资源以停止产生费用。有关更多信息，请参阅 [清理 资源](tutorial-stock-data-kplkcl2-finish.md)。

代码不访问实际股票市场数据，而是模拟股票交易流。它通过使用随机股票交易生成器（将截至 2015 年 2 月市值排名前 25 位的股票的实际市场数据作为起始点）来执行此操作。如果您有权访问实时的股票交易流，则可能有兴趣从该流派生有用且及时的统计数据。例如，您可能希望执行滑动窗口分析，从而确定前 5 分钟内购买的最热门股票。或者，您可能希望在销售订单过大（即具有过多股份）时收到通知。可以扩展此系列代码以提供此类功能。

您可以在台式计算机或笔记本电脑上演练本教程中的步骤，然后在同一台计算机或支持已定义要求的任何平台上同时运行创建器和消费端代码。

显示的示例使用的是美国西部（俄勒冈州）区域，但它们适用于[支持 Kinesis Data Streams 的任何AWS 区域](https://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region)。

**Topics**
+ [

# 满足先决条件
](tutorial-stock-data-kplkcl2-begin.md)
+ [

# 创建数据流
](tutorial-stock-data-kplkcl2-create-stream.md)
+ [

# 创建 IAM 策略和用户
](tutorial-stock-data-kplkcl2-iam.md)
+ [

# 下载并构建代码
](tutorial-stock-data-kplkcl2-download.md)
+ [

# 实现产生器
](tutorial-stock-data-kplkcl2-producer.md)
+ [

# 实现消费端
](tutorial-stock-data-kplkcl2-consumer.md)
+ [

# （可选）扩展消费端
](tutorial-stock-data-kplkcl2-consumer-extension.md)
+ [

# 清理 资源
](tutorial-stock-data-kplkcl2-finish.md)

# 满足先决条件


您必须满足以下要求才能完成本教程：

## 创建和使用亚马逊云科技账户


在开始之前，请确保熟悉 [Amazon Kinesis Data Streams 术语和概念](key-concepts.md) 中讨论的概念，特别是流、分片、产生器和消费端。完成以下指南中的步骤也很有帮助：[教程：为 Kinesis Data AWS CLI Streams 安装和配置](kinesis-tutorial-cli-installation.md)。

您必须拥有一个 AWS 帐户和一个网络浏览器才能访问 AWS 管理控制台.

要访问控制台，请使用您的 IAM 用户名和密码从 IAM 登录页面登录 [AWS 管理控制台](https://console.aws.amazon.com/console/home)。有关 AWS 安全证书的信息，包括编程访问权限和长期证书的替代方案，请参阅 *IAM 用户指南*中的[AWS 安全证书](https://docs.aws.amazon.com/IAM/latest/UserGuide/security-creds.html)。有关登录您的的详细信息 AWS 账户，请参阅*《AWS 登录 用户指南》 AWS*[中的如何登录](https://docs.aws.amazon.com/signin/latest/userguide/how-to-sign-in.html)。

有关 IAM 和安全密钥设置说明的更多信息，请参阅[创建 IAM 用户](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/get-set-up-for-amazon-ec2.html#create-an-iam-user)。

## 满足系统软件要求


用于运行应用程序的系统必须已安装 Java 7 或更高版本。要下载和安装最新 Java 开发工具包 (JDK)，请转到 [Oracle 的 Java SE 安装站点](http://www.oracle.com/technetwork/java/javase/downloads/index.html)。

您需要最新的 [适用于 Java 的 AWS SDK](https://aws.amazon.com/sdk-for-java/) 版本。

[消费者应用程序需要 Kinesis 客户端库 (KCL) 2.2.9 或更高版本，您可以从 /tree/master 上获取该版本。 GitHub https://github.com/awslabs/ amazon-kinesis-client](https://github.com/awslabs/amazon-kinesis-client/tree/master)

## 后续步骤


[创建数据流](tutorial-stock-data-kplkcl2-create-stream.md)

# 创建数据流


首先，您必须创建将在本教程的后续步骤中使用的数据流。

**创建流**

1. [登录 AWS 管理控制台 并在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. 在导航窗格中，选择 **数据流**。

1. 在导航栏中，展开区域选择器并选择一个区域。

1. 选择 **Create Kinesis stream (创建 Kinesis 流)**。

1. 输入数据流的名称（例如，**StockTradeStream**）。

1. 在分片数量中输入 **1**，但保留**估计您需要的分片数量**为折叠状态。

1. 选择 **Create Kinesis stream (创建 Kinesis 流)**。

在 **Kinesis 流**列表页面上，流状态在创建流的过程中显示为 `CREATING`。当流可以使用时，状态会更改为 `ACTIVE`。

如果您选择流的名称，在显示的页面中，**Details (详细信息)** 选项卡会显示数据流的配置摘要。**Monitoring (监控)** 部分显示流的监控信息。

## 后续步骤


[创建 IAM 策略和用户](tutorial-stock-data-kplkcl2-iam.md)

# 创建 IAM 策略和用户


的安全最佳实践 AWS 要求使用细粒度的权限来控制对不同资源的访问权限。 AWS Identity and Access Management (IAM) 允许您在中管理用户和用户权限 AWS。[IAM policy](https://docs.aws.amazon.com/IAM/latest/UserGuide/PoliciesOverview.html) 明确列出了允许的操作以及这些操作适用于的资源。

下面是 Kinesis Data Streams 创建器和消费端通常需要的最低权限。


**Producer**  

| 操作 | 资源 | 用途 | 
| --- | --- | --- | 
| DescribeStream, DescribeStreamSummary, DescribeStreamConsumer | Kinesis 数据流 | 在尝试读取记录前，消费端会检查数据流是否存在，数据流是否处于活动状态，以及分片是否包含在数据流中。 | 
| SubscribeToShard, RegisterStreamConsumer | Kinesis 数据流 | 订阅消费端并将其注册到分片。 | 
| PutRecord, PutRecords | Kinesis 数据流 | 将记录写入 Kinesis Data Streams。 | 


**消费端**  

| **操作** | **资源** | **目的** | 
| --- | --- | --- | 
| DescribeStream | Kinesis 数据流 | 在尝试读取记录前，消费端会检查数据流是否存在，数据流是否处于活动状态，以及分片是否包含在数据流中。 | 
| GetRecords, GetShardIterator  | Kinesis 数据流 | 从分片读取记录。 | 
| CreateTable, DescribeTable, GetItem, PutItem, Scan, UpdateItem | Amazon DynamoDB 表 | 如果消费端是使用 Kinesis Client Library（KCL）（版本 1.x 或 2.x）开发的，则需要 DynamoDB 表的权限才能跟踪应用程序的处理状态。 | 
| DeleteItem | Amazon DynamoDB 表 | 当使用者对 Kinesis Data Streams 分片执行 split/merge 操作时。 | 
| PutMetricData | 亚马逊 CloudWatch 日志 | KCL 还会将指标上传到 CloudWatch，这对于监控应用程序非常有用。 | 

对于此教程，您将创建授予上述所有权限的单个 IAM 策略。在生产中，您可能需要创建两个策略，一个针对创建器，另一个针对消费端。

**创建 IAM policy**

1. 找到您在上一步中创建的新数据流的 Amazon 资源名称（ARN）。您可以在**详细信息**选项卡顶部找到作为**流 ARN** 列出的此 ARN。ARN 格式如下所示：

   ```
   arn:aws:kinesis:region:account:stream/name
   ```  
*region*  
 AWS 地区代码；例如，`us-west-2`。有关更多信息，请参阅[区域和可用区域概念](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-regions-availability-zones)。  
*账户*  
 AWS 账户 ID，如[账户设置](https://console.aws.amazon.com/billing/home?#/account)中所示。  
*name*  
您在上一步中创建的数据流的名称，即 `StockTradeStream`。

1. 确定要由消费端使用（并要由第一个消费端实例创建）的 DynamoDB 表的 ARN。它必须采用以下格式：

   ```
   arn:aws:dynamodb:region:account:table/name
   ```

   区域和账户 ID 与您在本教程中使用的数据流 ARN 中的值相同，但 *name* 是消费端应用程序创建并使用的 DynamoDB 表的名称。KCL 使用应用程序名称作为表名称。在此步骤中，将 `StockTradesProcessor` 用作 DynamoDB 表名称，因为这是本教程后续步骤中使用的应用程序名称。

1. 在 IAM 控制台的**策略** (h [https://console.aws.amazon.com/iam/ome \$1policies](https://console.aws.amazon.com/iam/home#policies)) 中，选择**创建策略**。如果这是您首次使用 IAM policy，请依次选择**开始使用**、**创建策略**。

1. 在 **Policy Generator** 旁，选择 **Select**。

1. 选择 **Amazon Kinesis** 作为服务。 AWS 

1. 选择 `DescribeStream`、`GetShardIterator`、`GetRecords`、`PutRecord` 和 `PutRecords` 作为允许的操作。

1. 输入您在本教程中使用的数据流的 ARN。

1. 对以下各项使用 **Add Statement (添加语句)**：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/streams/latest/dev/tutorial-stock-data-kplkcl2-iam.html)

   在不需要指定 ARN 时使用的星号 (`*`)。在这种情况下，这是因为没有用于调用`PutMetricData`操作 CloudWatch 的特定资源。

1. 选择**下一步**。

1. 将 **Policy Name (策略名称)** 更改为 `StockTradeStreamPolicy`，审阅代码，然后选择 **Create Policy (创建策略)**。

生成的策略文档应该如下所示：

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "Stmt123",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:PutRecord",
                "kinesis:PutRecords",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:ListShards",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream"
            ]
        },
        {
            "Sid": "Stmt234",
            "Effect": "Allow",
            "Action": [
                "kinesis:SubscribeToShard",
                "kinesis:DescribeStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream/*"
            ]
        },
        {
            "Sid": "Stmt456",
            "Effect": "Allow",
            "Action": [
                "dynamodb:*"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-west-2:111122223333:table/StockTradesProcessor"
            ]
        },
        {
            "Sid": "Stmt789",
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}
```

------

**若要创建 IAM 用户**

1. 使用 [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) 打开 IAM 控制台。

1. 在 **Users (用户)** 页面上，选择 **Add user (添加用户)**。

1. 对于 **User name**，键入 `StockTradeStreamUser`。

1. 对于 **Access type (访问类型)**，选择 **Programmatic access (编程访问)**，然后选择 **Next: Permissions (下一步: 权限)**。

1. 选择**直接附上现有策略**。

1. 按名称搜索您在前面的过程中创建的策略 (`StockTradeStreamPolicy`)。选中策略名称左侧的框，然后选择 **Next: Review (下一步: 审核)**。

1. 查看详细信息和摘要，然后选择 **Create user (创建用户)**。

1. 复制 **Access key ID (访问密钥 ID)**，并将其私下保存。在 **Secret access key (私有访问密钥)** 下面选择 **Show (显示)**，然后也将该密钥私下保存。

1. 将访问密钥和私有密钥粘贴到一个只有您可以访问的位于安全位置的本地文件中。对于此应用程序，请创建名为 ` ~/.aws/credentials`（具有严格权限）的文件。该文件应采用以下格式：

   ```
   [default]
   aws_access_key_id=access key
   aws_secret_access_key=secret access key
   ```

**将 IAM policy 附加到用户**

1. 在 IAM 控制台中打开[策略](https://console.aws.amazon.com/iam/home?#policies)，然后选择**策略操作**。

1. 选择 `StockTradeStreamPolicy` 和 **Attach (附加)**。

1. 选择 `StockTradeStreamUser` 和 **Attach Policy (附加策略)**。

## 后续步骤


[下载并构建代码](tutorial-stock-data-kplkcl2-download.md)

# 下载并构建代码


本主题提供了引入数据流（*创建器*）和处理此数据（*消费端*）的样本股票交易的示例实施代码。

**下载并构建代码**

1. 将源代码从 [https://github.com/aws-samples/amazon-kinesis-learning](https://github.com/aws-samples/amazon-kinesis-learning) GitHub repo 下载到您的计算机。

1. 按照提供的目录结构，使用源代码在您的 IDE 中创建一个项目。

1. 将以下库添加到该项目中：
   + Amazon Kinesis Client Library（KCL）
   + AWS SDK
   + Apache HttpCore
   + Apache HttpClient
   + Apache Commons Lang
   + Apache Commons Logging
   + Guava (适用于 Java 的 Google 核心库)
   + Jackson Annotations
   + Jackson Core
   + Jackson Databind
   + Jackson Dataformat：CBOR
   + Joda Time

1. 根据您的 IDE，项目可能会自动构建。如果未自动构建项目，请使用适合您的 IDE 的步骤构建项目。

如果已成功完成这些步骤，则可进入下一节 [实现产生器](tutorial-stock-data-kplkcl2-producer.md)。

## 后续步骤


[[实现产生器](tutorial-stock-data-kplkcl2-producer.md)实现产生器](tutorial-stock-data-kplkcl2-producer.md)

# 实现产生器


此教程使用股票市场交易监控的实际场景。以下准则简要说明了此场景如何映射到创建器及其支持的代码结构。

请参阅[源代码](https://github.com/aws-samples/amazon-kinesis-learning )并查看以下信息。

**StockTrade 班级**  
单次股票交易由一个 StockTrade 类实例表示。此实例包含一些属性，如股票代号、价格、股份数、交易类型（买入或卖出）以及唯一标识交易的 ID。将为您实现此类。

**流记录**  
流是一个记录序列。记录是 JSON 格式的 `StockTrade` 实例序列化。例如：  

```
{
  "tickerSymbol": "AMZN", 
  "tradeType": "BUY", 
  "price": 395.87,
  "quantity": 16, 
  "id": 3567129045
}
```

**StockTradeGenerator 班级**  
StockTradeGenerator 有一个名为的方法`getRandomTrade()`，该方法每次被调用时都会返回一个新的随机生成的股票交易。将为您实现此类。

**StockTradesWriter 班级**  
创建器的 `main` 方法 StockTradesWriter 将持续检索随机交易，然后通过执行以下任务将该交易发送到 Kinesis Data Streams：  

1. 将数据流名称和区域名称作为输入读取。

1. 使用 `KinesisAsyncClientBuilder` 来设置区域、凭证和客户端配置。

1. 检查流是否存在且处于活动状态 (如果不是这样，它将退出并显示错误)。

1. 在连续循环中，会依次调用 `StockTradeGenerator.getRandomTrade()` 方法和 `sendStockTrade` 方法以便每 100 毫秒将交易发送到流一次。
`sendStockTrade` 类的 `StockTradesWriter` 方法具有以下代码：  

```
private static void sendStockTrade(StockTrade trade, KinesisAsyncClient kinesisClient,
            String streamName) {
        byte[] bytes = trade.toJsonAsBytes();
        // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library.
        if (bytes == null) {
            LOG.warn("Could not get JSON bytes for stock trade");
            return;
        }

        LOG.info("Putting trade: " + trade.toString());
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(bytes))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            LOG.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }
```

请参阅以下代码细分：
+ `PutRecord` API 需要一个字节数组，并且您必须将交易转换为 JSON 格式。此行代码将执行该操作：

  ```
  byte[] bytes = trade.toJsonAsBytes();
  ```
+ 您需要先创建新的 `PutRecordRequest` 实例（此示例中称为请求），然后才能发送交易。每个 `request` 均需要流名称、分区键和数据 Blob。

  ```
  PutPutRecordRequest request = PutRecordRequest.builder()
      .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
      .streamName(streamName)
      .data(SdkBytes.fromByteArray(bytes))
      .build();
  ```

  该示例使用股票代码作为将记录映射到特定分片的分区键。实际上，每个分片应具有数百或数千个分区键，以便记录均匀地分布在流中。有关如何将数据添加到流的更多信息，请参阅 [将数据写入 Amazon Kinesis Data Streams](building-producers.md)。

  现在 `request` 已准备好发送到客户端（put 操作）：

  ```
     kinesisClient.putRecord(request).get();
  ```
+ 错误检查和日志记录始终是有用的附加功能。此代码将记录错误条件：

  ```
  if (bytes == null) {
      LOG.warn("Could not get JSON bytes for stock trade");
      return;
  }
  ```

  在`put`操作周围添加 try/catch 方块：

  ```
  try {
   	kinesisClient.putRecord(request).get();
  } catch (InterruptedException e) {
              LOG.info("Interrupted, assuming shutdown.");
  } catch (ExecutionException e) {
              LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
  }
  ```

  这是因为 Kinesis 数据流 put 操作可能因网络错误或数据流达到其吞吐量限额并受到限制而导致失败。建议您仔细考虑针对 `put` 操作的重试策略以避免数据丢失，例如使用重试。
+ 状态日志记录很有用，但它是可选的：

  ```
  LOG.info("Putting trade: " + trade.toString());
  ```
此处显示的创建器使用 Kinesis Data Streams API 单记录功能 `PutRecord`。实际上，如果单个创建者生成许多记录，则使用 `PutRecords` 的多记录功能并一次性发送批量记录通常会更有效。有关更多信息，请参阅 [将数据写入 Amazon Kinesis Data Streams](building-producers.md)。

**运行创建器**

1. 验证在[创建 IAM 策略和用户](tutorial-stock-data-kplkcl2-iam.md)中检索到的访问密钥和私有密钥对是否保存到文件 `~/.aws/credentials` 中。

1. 使用以下参数运行 `StockTradeWriter` 类：

   ```
   StockTradeStream us-west-2
   ```

   如果您在 `us-west-2` 之外的区域中创建流，则必须改为在此处指定该区域。

您应该可以看到类似于如下所示的输出内容：

```
Feb 16, 2015 3:53:00 PM  
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
```

您的股票交易现在正由 Kinesis Data Streams 摄取。

## 后续步骤


[实现消费端](tutorial-stock-data-kplkcl2-consumer.md)

# 实现消费端


本教程中的消费端应用程序持续处理您数据流中的股票交易。然后，它输出每分钟买入和卖出最多的股票。该应用程序基于 Kinesis Client Library（KCL）构建，后者需要完成对消费端应用程序常见的大量繁重工作。有关更多信息，请参阅 [KCL 1.x 和 2.x 信息](shared-throughput-kcl-consumers.md)。

请参阅源代码并查看以下信息。

**StockTradesProcessor 班级**  
为您提供的消费端的主类，它将执行以下任务：  
+ 读取作为参数传递的应用程序名称、数据流名称和区域名称。
+ 使用区域名称创建 `KinesisAsyncClient` 实例。
+ 创建一个 `StockTradeRecordProcessorFactory` 实例，该实例提供由 `ShardRecordProcessor` 实例实施的 `StockTradeRecordProcessor` 的实例。
+ 使用 `KinesisAsyncClient`、`StreamName`、`ApplicationName` 和 `StockTradeRecordProcessorFactory` 实例创建 `ConfigsBuilder` 实例。这对于创建具有默认值的所有配置非常有用。
+ 使用 `ConfigsBuilder` 实例创建一个 KCL 计划程序（以前在 KCL 版本 1.x 中称为 KCL 工作线程）。
+ 此计划程序为每个分片（已分配给此消费端实例）创建一个线程，以持续循环从数据量读取记录。之后，它调用 `StockTradeRecordProcessor` 实例以处理收到的每批记录。

**StockTradeRecordProcessor 班级**  
`StockTradeRecordProcessor` 实例的实施，该实例反过来将实施五个必需方法：`initialize`、`processRecords`、`leaseLost`、`shardEnded` 和 `shutdownRequested`。  
`initialize` 和 `shutdownRequested` 方法由 KCL 使用，旨在让记录处理器分别了解何时应准备好开始接收记录，以及何时应停止接收记录，因此该方法可以执行任何特定于应用程序的设置和终止任务。`leaseLost` 和 `shardEnded` 用于实施当租约丢失或处理达到分片末尾时需执行的操作的任何逻辑。在此示例中，我们只记录指示这些事件的消息。  
将为您提供这些方法的代码。`processRecords` 方法中进行的主要处理，该处理反过来对每条记录使用 `processRecord`。后一个方法作为大体为空的框架代码提供给您，以便您在下一步骤中实施，届时将更详细地对其进行说明。  
另外要注意的是对 `processRecord` 的支持方法 `reportStats` 和 `resetStats` 的实施，二者在初始源代码中为空。  
已为您实施 `processRecords` 方法，并执行了以下步骤：  
+ 对于传入的每条记录，它会对其调用 `processRecord`。
+ 如果自上一次报告以来已过去至少 1 分钟，请调用 `reportStats()`（它将打印出最新统计数据），然后调用 `resetStats()`（它将清除统计数据以便下一个间隔仅包含新记录）。
+ 设置下一次报告时间。
+ 如果自上一检查点以来已过去至少 1 分钟，请调用 `checkpoint()`。
+ 设置下一次检查点操作时间。
此方法使用 60 秒间隔作为报告和检查点操作比率。有关检查点操作的更多信息，请参阅 [Using the Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html)。

**StockStats 班级**  
此类提供一段时间内针对最热门股票的数据保留和统计数据跟踪。此代码已提供给您并包含以下方法：  
+ `addStockTrade(StockTrade)`：将给定的 `StockTrade` 注入正在使用的统计数据。
+ `toString()`：以格式化字符串形式返回统计数据。
此类跟踪最热门股票的方式是，保留每只股票的总交易数的连续计数和最大计数。每当股票交易达成时，它都会更新这些计数。

将代码添加到 `StockTradeRecordProcessor` 类的方法，如以下步骤中所示。

**实施消费端**

1. 通过实例化大小正确的 `processRecord` 对象并将记录数据添加到该对象来实施 `StockTrade` 方法，并在出现问题时记录警告。

   ```
   byte[] arr = new byte[record.data().remaining()];
   record.data().get(arr);
   StockTrade trade = StockTrade.fromJsonAsBytes(arr);
       if (trade == null) {
           log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey());
           return;
           }
   stockStats.addStockTrade(trade);
   ```

1. 实现 `reportStats` 方法。根据个人喜好修改输出格式。

   ```
   System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" +
   stockStats + "\n" +
   "****************************************************************\n");
   ```

1. 实施 `resetStats` 方法，这将创建新的 `stockStats` 实例。

   ```
   stockStats = new StockStats();
   ```

1. 实现 `ShardRecordProcessor` 接口所需的以下方法：

   ```
   @Override
   public void leaseLost(LeaseLostInput leaseLostInput) {
       log.info("Lost lease, so terminating.");
   }
   
   @Override
   public void shardEnded(ShardEndedInput shardEndedInput) {
       try {
           log.info("Reached shard end checkpointing.");
           shardEndedInput.checkpointer().checkpoint();
       } catch (ShutdownException | InvalidStateException e) {
           log.error("Exception while checkpointing at shard end. Giving up.", e);
       }
   }
   
   @Override
   public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
       log.info("Scheduler is shutting down, checkpointing.");
       checkpoint(shutdownRequestedInput.checkpointer());
   }
   
   private void checkpoint(RecordProcessorCheckpointer checkpointer) {
       log.info("Checkpointing shard " + kinesisShardId);
       try {
           checkpointer.checkpoint();
       } catch (ShutdownException se) {
           // Ignore checkpoint if the processor instance has been shutdown (fail over).
           log.info("Caught shutdown exception, skipping checkpoint.", se);
       } catch (ThrottlingException e) {
           // Skip checkpoint when throttled. In practice, consider a backoff and retry policy.
           log.error("Caught throttling exception, skipping checkpoint.", e);
       } catch (InvalidStateException e) {
           // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS).
           log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
       }
   }
   ```

**运行消费端**

1. 运行您在 [[实现产生器](tutorial-stock-data-kplkcl2-producer.md)实现产生器](tutorial-stock-data-kplkcl2-producer.md) 中编写的创建者以将模拟股票交易记录引入流中。

1. 验证之前（在创建 IAM 用户时）检索到的访问密钥和私有密钥对是否保存到文件 `~/.aws/credentials` 中。

1. 使用以下参数运行 `StockTradesProcessor` 类：

   ```
   StockTradesProcessor StockTradeStream us-west-2
   ```

   请注意，如果您在 `us-west-2` 之外的区域中创建流，则必须改为在此处指定该区域。

1 分钟后，您应看到类似以下内容的输出，并且输出在此后每分钟刷新一次：

```
  
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  ****************************************************************
```

## 后续步骤


[（可选）扩展消费端](tutorial-stock-data-kplkcl2-consumer-extension.md)

# （可选）扩展消费端


此可选部分演示如何针对更为复杂的场景扩展消费端代码。

如果要了解每分钟的最大销售订单数，可以修改三个位置的 `StockStats` 类以适应此新的优先级。

**扩展消费端**

1. 添加新实例变量：

   ```
    // Ticker symbol of the stock that had the largest quantity of shares sold 
    private String largestSellOrderStock;
    // Quantity of shares for the largest sell order trade
    private long largestSellOrderQuantity;
   ```

1. 将以下代码添加到 `addStockTrade`：

   ```
   if (type == TradeType.SELL) {
        if (largestSellOrderStock == null || trade.getQuantity() > largestSellOrderQuantity) {
            largestSellOrderStock = trade.getTickerSymbol();
            largestSellOrderQuantity = trade.getQuantity();
        }
    }
   ```

1. 修改 `toString` 方法以打印其他信息：

   ```
    
   public String toString() {
       return String.format(
           "Most popular stock being bought: %s, %d buys.%n" +
           "Most popular stock being sold: %s, %d sells.%n" +
           "Largest sell order: %d shares of %s.",
           getMostPopularStock(TradeType.BUY), getMostPopularStockCount(TradeType.BUY),
           getMostPopularStock(TradeType.SELL), getMostPopularStockCount(TradeType.SELL),
           largestSellOrderQuantity, largestSellOrderStock);
   }
   ```

如果您现在运行消费端（请记住同时运行创建器），则应看到类似于以下内容的输出：

```
 
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  Largest sell order: 996 shares of BUD.
  ****************************************************************
```

## 后续步骤


[清理 资源](tutorial-stock-data-kplkcl2-finish.md)

# 清理 资源


由于您需要付费使用 Kinesis 数据流，请确保在使用完后删除流和相应的 Amazon DynamoDB 表。即使您不发送和获取记录，活动流也会产生象征性的费用。这是因为活动流将持续“侦听”传入记录和获取记录的请求，这将耗用资源。

**删除流和表**

1. 关闭您可能仍在运行的任何产生器和消费端。

1. [在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. 选择为此应用程序创建的流 (`StockTradeStream`)。

1. 选择 **Delete Stream (删除流)**。

1. 打开 DynamoDB 控制台，网址为。[https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/)

1. 删除 `StockTradesProcessor` 表。

## Summary


近乎实时处理大量数据不需要编写任何复杂代码或开发大型基础设施。这就像编写逻辑来处理少量数据（如编写 `processRecord(Record)`）一样简单，但要使用 Kinesis Data Streams 进行扩展，才能让该逻辑处理大量流数据。您无需担心处理的扩展方式，因为 Kinesis Data Streams 将为您完成这一工作。您只需将流记录发送到 Kinesis Data Streams 并编写用于处理收到的每条新记录的逻辑。

以下是针对此应用程序的一些可能的改进。

**跨所有分片进行聚合**  
当前，通过聚合单个分片中单个工作线程收到的数据记录来获得统计数据。（一个分片不能同时由单个应用程序中的多个工作线程处理。） 当然，当您扩展并具有多个分片时，可能希望跨所有分片聚合。可通过部署管道架构完成此操作。在该架构中，每个工作线程的输出都注入具有单个分片的另一个流，分片由聚合第一个阶段输出的工作线程处理。由于来自第一个阶段的数据是有限的（每分片每分钟一个示例），因此一个分片即可轻松处理它。

**缩放处理**  
当流进行扩展以包含多个分片（因为多个创建器正在发送数据）时，扩展处理的方式是添加更多工作程序。可以在 Amazon EC2 实例中运行工作程序并使用自动扩缩组。

**使用连接器连接亚马逊 S3/ DynamoDB/Amazon Redshift/Storm**  
在连续处理流时，其输出可以发送到其他目的地。 AWS 提供了[用于将 Kinesis Data Streams AWS 与其他服务和第三方工具集成的连接器](https://github.com/awslabs/amazon-kinesis-connectors)。