

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 开发具有共享吞吐量的自定义消费端
<a name="shared-throughput-consumers"></a>

**重要**  
Amazon Kinesis Client Library（KCL）版本 1.x 和 2.x 已过时。KCL 1.x 将于 2026 年 1 月 30 日 end-of-support上市。我们**强烈建议**您在 2026 年 1 月 30 日之前，将使用版本 1.x 的 KCL 应用程序迁移到最新的 KCL 版本。要查找最新的 KCL 版本，请访问上的 [Amazon Kinesis 客户端库页面](https://github.com/awslabs/amazon-kinesis-client)。 GitHub有关最新 KCL 版本的信息，请参阅[使用 Kinesis Client Library](kcl.md)。有关从 KCL 1.x 迁移到 KCL 3.x 的信息，请参阅[从 KCL 1.x 迁移到 KCL 3.x](kcl-migration-1-3.md)。

如果您在从 Kinesis Data Streams 接收数据时不需要专用吞吐量，并且在 200 毫秒下不需要读取传播延迟，则可以构建消费端应用程序，如以下主题所述。您可以使用 Kinesis Client Library（KCL）或 适用于 Java 的 AWS SDK。

**Topics**
+ [使用 KCL 开发具有共享吞吐量的自定义消费端](custom-kcl-consumers.md)

有关使用专用吞吐量构建可从 Kinesis Data Streams 接收记录的消费端的信息，请参阅 [开发具有专用吞吐量的增强扇出型消费端](enhanced-consumers.md)。

# 使用 KCL 开发具有共享吞吐量的自定义消费端
<a name="custom-kcl-consumers"></a>

**重要**  
Amazon Kinesis Client Library（KCL）版本 1.x 和 2.x 已过时。KCL 1.x 将于 2026 年 1 月 30 日 end-of-support上市。我们**强烈建议**您在 2026 年 1 月 30 日之前，将使用版本 1.x 的 KCL 应用程序迁移到最新的 KCL 版本。要查找最新的 KCL 版本，请访问上的 [Amazon Kinesis 客户端库页面](https://github.com/awslabs/amazon-kinesis-client)。 GitHub有关最新 KCL 版本的信息，请参阅[使用 Kinesis Client Library](kcl.md)。有关从 KCL 1.x 迁移到 KCL 3.x 的信息，请参阅[从 KCL 1.x 迁移到 KCL 3.x](kcl-migration-1-3.md)。

开发具有共享吞吐量的定制消费端应用程序的方法之一是使用 Kinesis Client Library（KCL）。

根据您正在使用的 KCL 版本，从以下主题进行选择。

**Topics**
+ [开发 KCL 1.x 消费端](developing-consumers-with-kcl.md)
+ [开发 KCL 2.x 消费端](developing-consumers-with-kcl-v2.md)

# 开发 KCL 1.x 消费端
<a name="developing-consumers-with-kcl"></a>

**重要**  
Amazon Kinesis Client Library（KCL）版本 1.x 和 2.x 已过时。KCL 1.x 将于 2026 年 1 月 30 日 end-of-support上市。我们**强烈建议**您在 2026 年 1 月 30 日之前，将使用版本 1.x 的 KCL 应用程序迁移到最新的 KCL 版本。要查找最新的 KCL 版本，请访问上的 [Amazon Kinesis 客户端库页面](https://github.com/awslabs/amazon-kinesis-client)。 GitHub有关最新 KCL 版本的信息，请参阅[使用 Kinesis Client Library](kcl.md)。有关从 KCL 1.x 迁移到 KCL 3.x 的信息，请参阅[从 KCL 1.x 迁移到 KCL 3.x](kcl-migration-1-3.md)。

您可以使用 Kinesis Client Library（KCL）为 Amazon Kinesis Data Streams 开发消费端应用程序。

有关 KCL 的更多信息，请参阅[关于 KCL（先前版本）](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-overview)。

根据要使用的选项，从以下主题进行选择。

**Topics**
+ [在 Java 中开发 Kinesis Client Library 消费端](kinesis-record-processor-implementation-app-java.md)
+ [在 Node.js 中开发 Kinesis Client Library 消费端](kinesis-record-processor-implementation-app-nodejs.md)
+ [在 .NET 中开发 Kinesis Client Library 消费端](kinesis-record-processor-implementation-app-dotnet.md)
+ [在 Python 中开发 Kinesis Client Library 消费端](kinesis-record-processor-implementation-app-py.md)
+ [在 Ruby 中开发 Kinesis Client Library 消费端](kinesis-record-processor-implementation-app-ruby.md)

# 在 Java 中开发 Kinesis Client Library 消费端
<a name="kinesis-record-processor-implementation-app-java"></a>

**重要**  
Amazon Kinesis Client Library（KCL）版本 1.x 和 2.x 已过时。KCL 1.x 将于 2026 年 1 月 30 日 end-of-support上市。我们**强烈建议**您在 2026 年 1 月 30 日之前，将使用版本 1.x 的 KCL 应用程序迁移到最新的 KCL 版本。要查找最新的 KCL 版本，请访问上的 [Amazon Kinesis 客户端库页面](https://github.com/awslabs/amazon-kinesis-client)。 GitHub有关最新 KCL 版本的信息，请参阅[使用 Kinesis Client Library](kcl.md)。有关从 KCL 1.x 迁移到 KCL 3.x 的信息，请参阅[从 KCL 1.x 迁移到 KCL 3.x](kcl-migration-1-3.md)。

可以使用 Kinesis Client Library（KCL）构建处理 Kinesis 数据流中数据的应用程序。Kinesis Client Library 提供多种语言版本。本主题将讨论 Java。要查看 Javadoc 参考资料，请参阅类的 [AWS Javadoc 主题](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html)。 AmazonKinesisClient

要从中下载 Java KCL GitHub，请前往 K [inesis 客户端库 (](https://github.com/awslabs/amazon-kinesis-client)Java)。要查找 Apache Maven 上的 Java KCL，请转至 [KCL 搜索结果](https://search.maven.org/#search|ga|1|amazon-kinesis-client)页。要从中下载 Java KCL 使用者应用程序的示例代码 GitHub，请转到上的 [KCL for Java 示例项目](https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis)页面。 GitHub

该示例应用程序使用 [Apache Commons Logging](http://commons.apache.org/proper/commons-logging/guide.html)。可以使用 `configure` 文件中定义的静态 `AmazonKinesisApplicationSample.java` 方法更改日志记录配置。*有关如何在 Log4j 和 AWS Java 应用程序中使用 Apache 共享日志记录的更多信息，请参阅《开发人员指南》中的使用 [Log4j 进行日志记录](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/java-dg-logging.html)。适用于 Java 的 AWS SDK *

在 Java 中实现 KCL 消费端应用程序时，您必须完成下列任务：

**Topics**
+ [实现 IRecord处理器方法](#kinesis-record-processor-implementation-interface-java)
+ [为 IRecord处理器接口实现类工厂](#kinesis-record-processor-implementation-factory-java)
+ [创建工作线程](#kcl-java-worker)
+ [修改配置属性](#kinesis-record-processor-initialization-java)
+ [迁移到版本 2 的记录处理器接口](#kcl-java-v2-migration)

## 实现 IRecord处理器方法
<a name="kinesis-record-processor-implementation-interface-java"></a>

KCL 当前支持 `IRecordProcessor` 接口的两个版本：原始接口可与第一个版本的 KCL 一起可用；而版本 2 从 KCL 1.5.0 版才开始可用。这两个接口都完全受支持。您的选择取决于您的特定方案要求。要查看所有区别，请参阅您在本地构建的 Javadocs 或源代码。以下各节概述了开始使用的最低实施要求。

**Topics**
+ [原始接口（版本 1）](#kcl-java-interface-original)
+ [更新后的接口（版本 2）](#kcl-java-interface-v2)

### 原始接口（版本 1）
<a name="kcl-java-interface-original"></a>

原始 `IRecordProcessor` 接口 (`package com.amazonaws.services.kinesis.clientlibrary.interfaces`) 公开了下列记录处理器方法，您的消费端必须实施这些方法。该示例提供了可用作起点的实现（请参阅 `AmazonKinesisApplicationSampleRecordProcessor.java`）。

```
public void initialize(String shardId)
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

**初始化**  
KCL 在实例化记录处理器时调用 `initialize` 方法，并将特定分片 ID 作为参数传递。此记录处理器仅处理此分片，并且通常情况下反过来说也成立（此分片仅由此记录处理器处理）。但是，您的消费端应该考虑数据记录可能会经过多次处理的情况。Kinesis Data Streams 具有*至少一次*语义，即分片中的每个数据记录至少会由消费端中的工作程序处理一次。有关特定分片可能由多个工作程序进行处理的情况的更多信息，请参阅[使用重新分片、扩展和并行处理更改分片数量](kinesis-record-processor-scaling.md)。

```
public void initialize(String shardId)
```

**processRecords**  
KCL 调用 `processRecords` 方法，并传递来自由 `initialize(shardId)` 方法指定的分片的数据记录的列表。记录处理器根据消费端的语义处理这些记录中的数据。例如，工作程序可能对数据执行转换，然后将结果存储在 Amazon Simple Storage Service（Amazon S3）存储桶中。

```
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) 
```

除了数据本身之外，记录还包含一个序号和一个分区键。工作程序可在处理数据时使用这些值。例如，工作线程可选择 S3 存储桶，并在其中根据分区键的值存储数据。`Record` 类公开了以下方法，这些方法提供对记录的数据、序列号和分区键的访问。

```
record.getData()  
record.getSequenceNumber() 
record.getPartitionKey()
```

在该示例中，私有方法 `processRecordsWithRetries` 具有显示工作程序如何访问记录的数据、序号和分区键的代码。

Kinesis Data Streams 需要记录处理器来跟踪已在分片中处理的记录。KCL 通过将检查指针（`IRecordProcessorCheckpointer`）传递到 `processRecords` 来为您执行此跟踪。记录处理器将对此接口调用 `checkpoint` 方法，以向 KCL 告知记录处理器处理分片中的记录的进度。如果工作程序失败，KCL 将使用此信息在已知的上一个已处理记录处重新启动对分片的处理。

对于拆分或合并操作，在原始分片的处理器调用 `checkpoint` 以指示原始分片上的所有处理操作都已完成之前，KCL 不会开始处理新分片。

如果您未传递参数，KCL 将假定对 `checkpoint` 的调用表示所有记录都已处理，一直处理到传递到记录处理器的最后一个记录。因此，记录处理器只应在已处理传递到它的列表中的所有记录后才调用 `checkpoint`。记录处理器不需要在每次调用 `checkpoint` 时调用 `processRecords`。例如，处理器可以在每第三次调用 `checkpoint` 时调用 `processRecords`。您可以选择性地将某个记录的确切序号指定为 `checkpoint` 的参数。在本例中，KCL 将假定所有记录都已处理，直至处理到该记录。

在该示例中，私有方法 `checkpoint` 展示了如何使用适当的异常处理和重试逻辑调用 `IRecordProcessorCheckpointer.checkpoint`。

KCL 依靠 `processRecords` 来处理由处理数据记录引起的任何异常。如果 `processRecords` 引发了异常，则 KCL 将跳过在异常发生前已传递的数据记录。也就是说，这些记录不会重新发送到引发异常的记录处理器或消费端中的任何其他记录处理器。

**shutdown**  
KCL 在处理结束（关闭原因为 `TERMINATE`）或工作程序不再响应（关闭原因为 `ZOMBIE`）时调用 `shutdown` 方法。

```
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

处理操作在记录处理器不再从分片中接收任何记录时结束，因为分片已被拆分或合并，或者流已删除。

KCL 还会将 `IRecordProcessorCheckpointer` 接口传递到 `shutdown`。如果关闭原因为 `TERMINATE`，则记录处理器应完成处理任何数据记录，然后对此接口调用 `checkpoint` 方法。

### 更新后的接口（版本 2）
<a name="kcl-java-interface-v2"></a>

更新后的 `IRecordProcessor` 接口 (`package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`) 公开了下列记录处理器方法，您的消费端必须实施这些方法：

```
void initialize(InitializationInput initializationInput)
void processRecords(ProcessRecordsInput processRecordsInput)
void shutdown(ShutdownInput shutdownInput)
```

原始版本的接口中的所有参数可通过容器对象上的 get 方法进行访问。例如，要检索 `processRecords()` 中的记录的列表，可使用 `processRecordsInput.getRecords()`。

自此接口的版本 2（KCL 1.5.0 及更高版本）起，除了原始接口提供的输入之外，以下新输入也可用：

起始序列号  
在传递给 `InitializationInput` 运算的 `initialize()` 对象中，将提供给记录处理器实例的记录的起始序列号。这是由之前处理同一分片的记录处理器实例进行最近一次检查点操作的序列号。此序列号在您的应用程序需要此信息时提供。

待进行检查点操作的序列号  
在传递给 `initialize()` 运算的 `InitializationInput` 对象中，在上一个记录处理器实例停止前可能无法提交的待进行检查点操作的序列号（如果有）。

## 为 IRecord处理器接口实现类工厂
<a name="kinesis-record-processor-implementation-factory-java"></a>

您还需要为实现记录处理器方法的类实现一个工厂。当消费端实例化工作程序时，它将传递对此工厂的引用。

以下示例使用原始记录处理器接口在文件 `AmazonKinesisApplicationSampleRecordProcessorFactory.java` 中实现工厂类。如果您希望此工厂类创建版本 2 记录处理器，请使用程序包名称 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`。

```
  public class SampleRecordProcessorFactory implements IRecordProcessorFactory { 
      /**
      * Constructor.
      */
      public SampleRecordProcessorFactory() {
          super();
      }
      /**
      * {@inheritDoc}
      */
      @Override
      public IRecordProcessor createProcessor() {
          return new SampleRecordProcessor();
      }
  }
```

## 创建工作线程
<a name="kcl-java-worker"></a>

如 [实现 IRecord处理器方法](#kinesis-record-processor-implementation-interface-java) 中所述，有两个版本的 KCL 记录处理器接口可供选择，这将影响您创建工作程序的方式。原始记录处理器接口使用以下代码结构创建工作线程：

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker(recordProcessorFactory, config);
```

当使用版本 2 的记录处理器接口时，您可使用 `Worker.Builder` 创建工作线程而无需担心要使用的构造函数以及参数的顺序。更新后的记录处理器接口使用以下代码结构创建工作线程：

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

## 修改配置属性
<a name="kinesis-record-processor-initialization-java"></a>

该示例提供了配置属性的默认值。工作程序的此配置数据随后将整合到 `KinesisClientLibConfiguration` 对象中。此对象和对 `IRecordProcessor` 的类工厂的引用将传入用于实例化工作程序的调用。您可借助 Java 属性文件（请参阅 `AmazonKinesisApplicationSample.java`）用您自己的值覆盖任何这些属性。

### 应用程序名称
<a name="configuration-property-application-name"></a>

KCL 需要一个应用程序名称，该名称在您的应用程序中以及同一区域的 Amazon DynamoDB 表中处于唯一状态。KCL 通过以下方法使用应用程序名称配置值：
+ 假定所有与此应用程序名称关联的工作程序在同一数据流上合作。这些工作程序可被分配到多个实例上。如果运行同一应用程序代码的其他实例，但使用不同的应用程序名称，则 KCL 会将第二个实例视为在同一数据流运行的完全独立的应用程序。
+ KCL 利用应用程序名称创建 DynamoDB 表并使用该表保留应用程序的状态信息（如检查点和工作程序-分片映射）。每个应用程序都有自己的 DynamoDB 表。有关更多信息，请参阅 [使用租约表跟踪 KCL 消费端应用程序处理的分片](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable)。

### 设置凭证
<a name="kinesis-record-processor-cred-java"></a>

您必须将您的 AWS 证书提供给默认凭证提供者链中的一个凭证提供商。例如，如果您在 EC2 实例上运行消费端，则我们建议您使用 IAM 角色启动实例。反映与此 IAM 角色关联的权限的 AWS 凭证可通过实例元数据提供给实例上的应用程序。使用这种方式管理在 EC2 实例上运行的消费端的凭证最安全。

示例应用程序首先尝试从实例元数据中检索 IAM 凭证：

```
credentialsProvider = new InstanceProfileCredentialsProvider(); 
```

如果示例应用程序无法从实例元数据中获取凭证，它会尝试从属性文件中检索凭证：

```
credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();
```

有关实例元数据的更多信息，请参阅《Amazon EC2 用户指南》**中的[实例元数据](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html)。

### 将工作线程 ID 用于多个实例
<a name="kinesis-record-processor-workerid-java"></a>

示例初始化代码通过使用本地计算机的名称并附加一个全局唯一的标识符为工作程序创建 ID (`workerId`)，如以下代码段所示。此方法支持消费端应用程序的多个实例在单台计算机上运行的方案。

```
String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
```

## 迁移到版本 2 的记录处理器接口
<a name="kcl-java-v2-migration"></a>

如果要迁移使用原始接口的代码，则除了上述步骤之外，还需要执行以下步骤：

1. 更改您的记录处理器类以导入版本 2 记录处理器接口：

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   ```

1. 更改对输入的引用以在容器对象上使用 `get` 方法。例如，在 `shutdown()` 运算中，将“`checkpointer`”更改为“`shutdownInput.getCheckpointer()`”。

1. 更改您的记录处理器工厂类以导入版本 2 记录处理器工厂接口：

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   ```

1. 更改工作线程的结构以使用 `Worker.Builder`。例如：

   ```
   final Worker worker = new Worker.Builder()
       .recordProcessorFactory(recordProcessorFactory)
       .config(config)
       .build();
   ```

# 在 Node.js 中开发 Kinesis Client Library 消费端
<a name="kinesis-record-processor-implementation-app-nodejs"></a>

**重要**  
Amazon Kinesis Client Library（KCL）版本 1.x 和 2.x 已过时。KCL 1.x 将于 2026 年 1 月 30 日 end-of-support上市。我们**强烈建议**您在 2026 年 1 月 30 日之前，将使用版本 1.x 的 KCL 应用程序迁移到最新的 KCL 版本。要查找最新的 KCL 版本，请访问上的 [Amazon Kinesis 客户端库页面](https://github.com/awslabs/amazon-kinesis-client)。 GitHub有关最新 KCL 版本的信息，请参阅[使用 Kinesis Client Library](kcl.md)。有关从 KCL 1.x 迁移到 KCL 3.x 的信息，请参阅[从 KCL 1.x 迁移到 KCL 3.x](kcl-migration-1-3.md)。

可以使用 Kinesis Client Library（KCL）构建处理 Kinesis 数据流中数据的应用程序。Kinesis Client Library 提供多种语言版本。本主题将讨论 Node.js。

KCL 是一个 Java 库；对 Java 以外其他语言的支持是使用名为的多语言接口提供的。*MultiLangDaemon*此进程守护程序基于 Java，当您使用 Java 以外的 KCL 语言时，该程序会在后台运行。因此，如果你安装适用于 Node.js 的 KCL 并完全用 Node.js 编写消费者应用程序，那么你仍然需要在系统上安装 Java，因为. MultiLangDaemon 此外 MultiLangDaemon ，您可能需要根据自己的用例自定义一些默认设置，例如它所连接的 AWS 区域。有关 MultiLangDaemon on 的更多信息 GitHub，请访问 [KCL MultiLangDaemon 项目](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang)页面。

要从中下载 Node.js KCL GitHub，请前往 K [inesis 客户端库 (Node.js)](https://github.com/awslabs/amazon-kinesis-client-nodejs)。

**示例代码下载**

Node.js 中有两个代码示例可用于 KCL：
+ [basic-sample](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/basic_sample)

  在下列节中用于阐释在 Node.js 中构建 KCL 消费端应用程序的基础知识。
+ [click-stream-sample](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/click_stream_sample)

   稍微复杂一些，使用了现实世界的情景，适合在您熟悉基本示例代码之后采用。此示例在这里不做讨论，但它有一个包含更多信息的自述文件。

在 Node.js 中实现 KCL 消费端应用程序时，您必须完成下列任务：

**Topics**
+ [实现记录处理器](#kinesis-record-processor-implementation-interface-nodejs)
+ [修改配置属性](#kinesis-record-processor-initialization-nodejs)

## 实现记录处理器
<a name="kinesis-record-processor-implementation-interface-nodejs"></a>

使用适用于 Node.js 的 KCL 的最简易的潜在消费端必须实现 `recordProcessor` 函数，该函数反之包含函数 `initialize`、`processRecords` 和 `shutdown`。该示例提供了可用作起点的实现（请参阅 `sample_kcl_app.js`）。

```
function recordProcessor() {
  // return an object that implements initialize, processRecords and shutdown functions.}
```

**初始化**  
KCL 在记录处理器启动时调用 `initialize` 函数。此记录处理器只处理作为 `initializeInput.shardId` 传递的分片 ID，并且通常情况下反过来说也成立（此分片只能由此记录处理器处理）。但是，您的消费端应该考虑数据记录可能会经过多次处理的情况。这是因为 Kinesis Data Streams 具有*至少一次*语义，即分片中的每个数据记录在您的消费端中由工作程序至少处理一次。有关特定分片可能由多个工作线程处理的情况的更多信息，请参阅[使用重新分片、扩展和并行处理更改分片数量](kinesis-record-processor-scaling.md)。

```
initialize: function(initializeInput, completeCallback)
```

**processRecords**  
 KCL 使用包含一个数据记录的列表（这些记录来自在 `initialize` 函数中指定的分片）的输入来调用此函数。您实现的记录处理器根据您的消费端的语义处理这些记录中的数据。例如，工作程序可能对数据执行转换，然后将结果存储在 Amazon Simple Storage Service（Amazon S3）存储桶中。

```
processRecords: function(processRecordsInput, completeCallback)
```

除了数据本身之外，记录还包含工作程序在处理数据时可使用的序号和分区键。例如，工作线程可选择 S3 存储桶，并在其中根据分区键的值存储数据。`record` 词典公开了以下键-值对来访问记录的数据、序号和分区键：

```
record.data
record.sequenceNumber
record.partitionKey
```

请注意，数据是 Base64 编码的。

在该基本示例中，函数 `processRecords` 具有显示工作程序如何访问记录的数据、序号和分区键的代码。

Kinesis Data Streams 需要记录处理器来跟踪已在分片中处理的记录。KCL 利用作为 `processRecordsInput.checkpointer` 传递的 `checkpointer` 对象执行此跟踪。您的记录处理器将调用 `checkpointer.checkpoint` 函数以向 KCL 告知记录处理器处理分片中的记录的进度。如果工作程序失败，KCL 将在您重新启动分片的处理时使用此信息，以便在上一个已知的已处理记录处继续处理。

对于拆分或合并操作，在原始分片的处理器调用 `checkpoint` 以指示原始分片上的所有处理操作都已完成之前，KCL 不会开始处理新分片。

如果您未将序列号传递到 `checkpoint` 函数，KCL 将假定对 `checkpoint` 的调用表示所有记录都已处理，一直处理到传递到记录处理器的最后一个记录。因此，记录处理器**只**应在已处理传递到它的列表中的所有记录后才调用 `checkpoint`。记录处理器不需要在每次调用 `checkpoint` 时调用 `processRecords`。例如，处理器可以调`checkpoint`用每三次呼叫或记录处理器外部的某个事件（例如您实现的自定义 verification/validation 服务）。

您可以选择性地将某个记录的确切序号指定为 `checkpoint` 的参数。在本例中，KCL 将假定所有记录都已处理，直至处理到该记录。

基本示例应用程序显示了对 `checkpointer.checkpoint` 函数最简单的调用。您此时可以在该函数中为您的消费端添加您需要的其他检查点逻辑。

**shutdown**  
KCL 在处理结束（`shutdownInput.reason` 为 `TERMINATE`）或工作程序不再响应（`shutdownInput.reason` 为 `ZOMBIE`）时调用 `shutdown` 函数。

```
shutdown: function(shutdownInput, completeCallback)
```

处理操作在记录处理器不再从分片中接收任何记录时结束，因为分片已被拆分或合并，或者流已删除。

KCL 还会将 `shutdownInput.checkpointer` 对象传递到 `shutdown`。如果关闭原因为 `TERMINATE`，则应确保记录处理器已完成处理任何数据记录，然后对此接口调用 `checkpoint` 函数。

## 修改配置属性
<a name="kinesis-record-processor-initialization-nodejs"></a>

该示例提供了配置属性的默认值。您可使用自己的值覆盖任何这些属性（请参阅基本示例中的 `sample.properties`）。

### 应用程序名称
<a name="kinesis-record-processor-application-name-nodejs"></a>

KCL 需要一个应用程序，该应用程序在您的各个应用程序中以及同一区域的各个 Amazon DynamoDB 表中处于唯一状态。KCL 通过以下方法使用应用程序名称配置值：
+ 假定所有与此应用程序名称关联的工作程序在同一数据流上合作。这些工作程序可被分配到多个实例上。如果运行同一应用程序代码的其他实例，但使用不同的应用程序名称，则 KCL 会将第二个实例视为在同一数据流运行的完全独立的应用程序。
+ KCL 利用应用程序名称创建 DynamoDB 表并使用该表保留应用程序的状态信息（如检查点和工作程序-分片映射）。每个应用程序都有自己的 DynamoDB 表。有关更多信息，请参阅 [使用租约表跟踪 KCL 消费端应用程序处理的分片](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable)。

### 设置凭证
<a name="kinesis-record-processor-credentials-nodejs"></a>

您必须将您的 AWS 证书提供给默认凭证提供者链中的一个凭证提供商。可以使用 `AWSCredentialsProvider` 属性设置凭证提供程序。`sample.properties` 文件必须向[默认凭证提供程序链](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)中的凭证提供程序之一提供您的凭证。如果您在 Amazon EC2 实例上运行使用器，我们建议您使用 IAM 角色配置该实例。 AWS 反映与此 IAM 角色关联的权限的证书可通过实例元数据提供给实例上的应用程序。使用这种方式管理在 EC2 实例上运行的消费端应用程序的凭证最安全。

以下示例配置 KCL 以使用 `sample_kcl_app.js` 中提供的记录处理器，处理名为 `kclnodejssample` 的 Kinesis 数据流。

```
# The Node.js executable script
executableName = node sample_kcl_app.js
# The name of an Amazon Kinesis stream to process
streamName = kclnodejssample
# Unique KCL application name
applicationName = kclnodejssample
# Use default AWS credentials provider chain
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
# Read from the beginning of the stream
initialPositionInStream = TRIM_HORIZON
```

# 在 .NET 中开发 Kinesis Client Library 消费端
<a name="kinesis-record-processor-implementation-app-dotnet"></a>

**重要**  
Amazon Kinesis Client Library（KCL）版本 1.x 和 2.x 已过时。KCL 1.x 将于 2026 年 1 月 30 日 end-of-support上市。我们**强烈建议**您在 2026 年 1 月 30 日之前，将使用版本 1.x 的 KCL 应用程序迁移到最新的 KCL 版本。要查找最新的 KCL 版本，请访问上的 [Amazon Kinesis 客户端库页面](https://github.com/awslabs/amazon-kinesis-client)。 GitHub有关最新 KCL 版本的信息，请参阅[使用 Kinesis Client Library](kcl.md)。有关从 KCL 1.x 迁移到 KCL 3.x 的信息，请参阅[从 KCL 1.x 迁移到 KCL 3.x](kcl-migration-1-3.md)。

可以使用 Kinesis Client Library（KCL）构建处理 Kinesis 数据流中数据的应用程序。Kinesis Client Library 提供多种语言版本。本主题将讨论 .NET。

KCL 是一个 Java 库；对 Java 以外其他语言的支持是使用名为的多语言接口提供的。*MultiLangDaemon*此进程守护程序基于 Java，当您使用 Java 以外的 KCL 语言时，该程序会在后台运行。因此，如果您安装适用于.NET 的 KCL 并完全使用.NET 编写使用者应用程序，则仍然需要在系统上安装 Java，因为。 MultiLangDaemon此外 MultiLangDaemon ，您可能需要根据自己的用例自定义一些默认设置，例如它所连接的 AWS 区域。有关 MultiLangDaemon on 的更多信息 GitHub，请访问 [KCL MultiLangDaemon 项目](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang)页面。

要从中下载.NET KCL GitHub，请访问 K [inesis 客户端库 (](https://github.com/awslabs/amazon-kinesis-client-net).NET)。要下载.NET KCL 使用者应用程序的示例代码，请转到上的 [KCL for .NET 使用者项目示例](https://github.com/awslabs/amazon-kinesis-client-net/tree/master/SampleConsumer)页面。 GitHub

在 .NET 中实现 KCL 消费端应用程序时，您必须完成下列任务：

**Topics**
+ [实现 IRecord处理器类方法](#kinesis-record-processor-implementation-interface-dotnet)
+ [修改配置属性](#kinesis-record-processor-initialization-dotnet)

## 实现 IRecord处理器类方法
<a name="kinesis-record-processor-implementation-interface-dotnet"></a>

消费端必须实现适用于 `IRecordProcessor` 的以下方法。示例消费端提供了可用作起点的实现（请参阅 `SampleRecordProcessor` 中的 `SampleConsumer/AmazonKinesisSampleConsumer.cs` 类）。

```
public void Initialize(InitializationInput input)
public void ProcessRecords(ProcessRecordsInput input)
public void Shutdown(ShutdownInput input)
```

**初始化**  
KCL 在实例化记录处理程序时调用此方法，并将特定分片 ID 传入 `input` 参数（`input.ShardId`）。此记录处理器只处理此分片，并且通常情况下反过来说也成立（此分片只能由此记录处理器处理）。但是，您的消费端应该考虑数据记录可能会经过多次处理的情况。这是因为 Kinesis Data Streams 具有*至少一次*语义，即分片中的每个数据记录在您的消费端中由工作程序至少处理一次。有关特定分片可能由多个工作线程处理的情况的更多信息，请参阅[使用重新分片、扩展和并行处理更改分片数量](kinesis-record-processor-scaling.md)。

```
public void Initialize(InitializationInput input)
```

**ProcessRecords**  
KCL 调用此方法，并将由 `Initialize` 方法指定的分片中的数据记录的列表传入 `input` 参数（`input.Records`）。您实现的记录处理器根据您的消费端的语义处理这些记录中的数据。例如，工作程序可能对数据执行转换，然后将结果存储在 Amazon Simple Storage Service（Amazon S3）存储桶中。

```
public void ProcessRecords(ProcessRecordsInput input)
```

除了数据本身之外，记录还包含一个序号和一个分区键。工作程序可在处理数据时使用这些值。例如，工作线程可选择 S3 存储桶，并在其中根据分区键的值存储数据。`Record` 类公开了以下代理来访问记录的数据、序号和分区键：

```
byte[] Record.Data 
string Record.SequenceNumber
string Record.PartitionKey
```

在该示例中，方法 `ProcessRecordsWithRetries` 具有显示工作程序如何访问记录的数据、序号和分区键的代码。

Kinesis Data Streams 需要记录处理器来跟踪已在分片中处理的记录。KCL 通过将 `Checkpointer` 对象传递到 `ProcessRecords`（`input.Checkpointer`）来为您执行此跟踪。记录处理器将调用 `Checkpointer.Checkpoint` 方法以向 KCL 告知记录处理器处理分片中的记录的进度。如果工作程序失败，KCL 将使用此信息在已知的上一个已处理记录处重新启动对分片的处理。

对于拆分或合并操作，在原始分片的处理器调用 `Checkpointer.Checkpoint` 以指示原始分片上的所有处理操作都已完成之前，KCL 不会开始处理新分片。

如果您未传递参数，KCL 将假定对 `Checkpointer.Checkpoint` 的调用表示所有记录都已处理，一直处理到传递到记录处理器的最后一个记录。因此，记录处理器只应在已处理传递到它的列表中的所有记录后才调用 `Checkpointer.Checkpoint`。记录处理器不需要在每次调用 `Checkpointer.Checkpoint` 时调用 `ProcessRecords`。例如，处理器在每第三次或第四次调用时调用 `Checkpointer.Checkpoint`。您可以选择性地将某个记录的确切序号指定为 `Checkpointer.Checkpoint` 的参数。在本例中，KCL 将假定记录都已处理，直至处理到该记录。

在该示例中，私有方法 `Checkpoint(Checkpointer checkpointer)` 展示了如何使用适当的异常处理和重试逻辑调用 `Checkpointer.Checkpoint` 方法。

适用于 .NET 的 KCL 处理异常的方式不同于其他 KCL 语言库，前者不处理因处理数据记录而引起的任何异常。用户代码中未捕获的任何异常都将使程序崩溃。

**关闭**  
KCL 在处理结束（关闭原因为 `TERMINATE`）或工作程序不再响应（关闭 `input.Reason` 值为 `ZOMBIE`）时调用 `Shutdown` 方法。

```
public void Shutdown(ShutdownInput input)
```

处理操作在记录处理器不再从分片中接收任何记录时结束，因为分片已被拆分或合并，或者流已删除。

KCL 还会将 `Checkpointer` 对象传递到 `shutdown`。如果关闭原因为 `TERMINATE`，则记录处理器应完成处理任何数据记录，然后对此接口调用 `checkpoint` 方法。

## 修改配置属性
<a name="kinesis-record-processor-initialization-dotnet"></a>

示例消费端提供了配置属性的默认值。您可使用自己的值覆盖任何这些属性（请参阅 `SampleConsumer/kcl.properties`）。

### 应用程序名称
<a name="modify-kinesis-record-processor-application-name"></a>

KCL 需要一个应用程序，该应用程序在您的各个应用程序中以及同一区域的各个 Amazon DynamoDB 表中处于唯一状态。KCL 通过以下方法使用应用程序名称配置值：
+ 假定所有与此应用程序名称关联的工作程序在同一数据流上合作。这些工作程序可被分配到多个实例上。如果运行同一应用程序代码的其他实例，但使用不同的应用程序名称，则 KCL 会将第二个实例视为在同一数据流运行的完全独立的应用程序。
+ KCL 利用应用程序名称创建 DynamoDB 表并使用该表保留应用程序的状态信息（如检查点和工作程序-分片映射）。每个应用程序都有自己的 DynamoDB 表。有关更多信息，请参阅 [使用租约表跟踪 KCL 消费端应用程序处理的分片](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable)。

### 设置凭证
<a name="kinesis-record-processor-creds-dotnet"></a>

您必须将您的 AWS 证书提供给默认凭证提供者链中的一个凭证提供商。可以使用 `AWSCredentialsProvider` 属性设置凭证提供程序。[sample.properties](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties) 必须向[默认凭证提供程序链](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)中的凭证提供程序之一提供您的凭证。如果您在 EC2 实例上运行消费端应用程序，则建议您使用 IAM 角色进行配置。反映与此 IAM 角色关联的权限 AWS 凭证可通过实例元数据提供给实例上的应用程序。使用这种方式管理在 EC2 实例上运行的消费端的凭证最安全。

该示例的属性文件将配置 KCL 以使用 `AmazonKinesisSampleConsumer.cs` 中提供的记录处理器处理名为“words”的 Kinesis 数据流。

# 在 Python 中开发 Kinesis Client Library 消费端
<a name="kinesis-record-processor-implementation-app-py"></a>

**重要**  
Amazon Kinesis Client Library（KCL）版本 1.x 和 2.x 已过时。KCL 1.x 将于 2026 年 1 月 30 日 end-of-support上市。我们**强烈建议**您在 2026 年 1 月 30 日之前，将使用版本 1.x 的 KCL 应用程序迁移到最新的 KCL 版本。要查找最新的 KCL 版本，请访问上的 [Amazon Kinesis 客户端库页面](https://github.com/awslabs/amazon-kinesis-client)。 GitHub有关最新 KCL 版本的信息，请参阅[使用 Kinesis Client Library](kcl.md)。有关从 KCL 1.x 迁移到 KCL 3.x 的信息，请参阅[从 KCL 1.x 迁移到 KCL 3.x](kcl-migration-1-3.md)。

可以使用 Kinesis Client Library（KCL）构建处理 Kinesis 数据流中数据的应用程序。Kinesis Client Library 提供多种语言版本。本主题将讨论 Python。

KCL 是一个 Java 库；对 Java 以外其他语言的支持是使用名为的多语言接口提供的。*MultiLangDaemon*此进程守护程序基于 Java，当您使用 Java 以外的 KCL 语言时，该程序会在后台运行。因此，如果您安装适用于 Python 的 KCL 并完全使用 Python 编写使用者应用程序，则仍然需要在系统上安装 Java，因为。 MultiLangDaemon此外 MultiLangDaemon ，您可能需要根据自己的用例自定义一些默认设置，例如它所连接的 AWS 区域。有关 MultiLangDaemon on 的更多信息 GitHub，请访问 [KCL MultiLangDaemon 项目](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang)页面。

要从中下载 Python KCL GitHub，请前往 K [inesis 客户端库 (Python)](https://github.com/awslabs/amazon-kinesis-client-python)。要下载 Python KCL 使用者应用程序的示例代码，请转到上的 [KCL for Python 示例项目](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples)页面。 GitHub

在 Python 中实现 KCL 消费端应用程序时，您必须完成下列任务：

**Topics**
+ [实现 RecordProcessor 类方法](#kinesis-record-processor-implementation-interface-py)
+ [修改配置属性](#kinesis-record-processor-initialization-py)

## 实现 RecordProcessor 类方法
<a name="kinesis-record-processor-implementation-interface-py"></a>

`RecordProcess` 类必须扩展 `RecordProcessorBase` 以实现以下方法。该示例提供了可用作起点的实现（请参阅 `sample_kclpy_app.py`）。

```
def initialize(self, shard_id)
def process_records(self, records, checkpointer)
def shutdown(self, checkpointer, reason)
```

**初始化**  
KCL 在实例化记录处理器时调用 `initialize` 方法，并将特定分片 ID 作为参数传递。此记录处理器只处理此分片，并且通常情况下反过来说也成立（此分片只能由此记录处理器处理）。但是，您的消费端应该考虑数据记录可能会经过多次处理的情况。这是因为 Kinesis Data Streams 具有*至少一次*语义，即分片中的每个数据记录在您的消费端中由工作程序至少处理一次。有关特定分片可能由多个工作程序进行处理的情况的更多信息，请参阅[使用重新分片、扩展和并行处理更改分片数量](kinesis-record-processor-scaling.md)。

```
def initialize(self, shard_id)
```

**process\$1records**  
 KCL 调用此方法，并传递由 `initialize` 方法指定的分片中的数据记录的列表。您实现的记录处理器根据您的消费端的语义处理这些记录中的数据。例如，工作程序可能对数据执行转换，然后将结果存储在 Amazon Simple Storage Service（Amazon S3）存储桶中。

```
def process_records(self, records, checkpointer) 
```

除了数据本身之外，记录还包含一个序号和一个分区键。工作程序可在处理数据时使用这些值。例如，工作线程可选择 S3 存储桶，并在其中根据分区键的值存储数据。`record` 词典公开了以下键-值对来访问记录的数据、序号和分区键：

```
record.get('data')
record.get('sequenceNumber')
record.get('partitionKey')
```

请注意，数据是 Base64 编码的。

在该示例中，方法 `process_records` 具有显示工作程序如何访问记录的数据、序号和分区键的代码。

Kinesis Data Streams 需要记录处理器来跟踪已在分片中处理的记录。KCL 通过将 `Checkpointer` 对象传递到 `process_records` 来为您执行此跟踪。记录处理器将对此对象调用 `checkpoint` 方法，以向 KCL 告知记录处理器处理分片中的记录的进度。如果工作程序失败，KCL 将使用此信息在已知的上一个已处理记录处重新启动对分片的处理。

对于拆分或合并操作，在原始分片的处理器调用 `checkpoint` 以指示原始分片上的所有处理操作都已完成之前，KCL 不会开始处理新分片。

如果您未传递参数，KCL 将假定对 `checkpoint` 的调用表示所有记录都已处理，一直处理到传递到记录处理器的最后一个记录。因此，记录处理器只应在已处理传递到它的列表中的所有记录后才调用 `checkpoint`。记录处理器不需要在每次调用 `checkpoint` 时调用 `process_records`。例如，处理器可在每第三次调用时调用 `checkpoint`。您可以选择性地将某个记录的确切序号指定为 `checkpoint` 的参数。在本例中，KCL 将假定所有记录都已处理，直至处理到该记录。

在该示例中，私有方法 `checkpoint` 展示了如何使用适当的异常处理和重试逻辑调用 `Checkpointer.checkpoint` 方法。

KCL 依靠 `process_records` 来处理由处理数据记录引起的任何异常。如果 `process_records` 引发了异常，则 KCL 将跳过在异常发生前已传递到 `process_records` 的数据记录。也就是说，这些记录不会重新发送到引发异常的记录处理器或消费端中的任何其他记录处理器。

**shutdown**  
 KCL 在处理结束（关闭原因为 `TERMINATE`）或工作程序不再响应（关闭 `reason` 为 `ZOMBIE`）时调用 `shutdown` 方法。

```
def shutdown(self, checkpointer, reason)
```

处理操作在记录处理器不再从分片中接收任何记录时结束，因为分片已被拆分或合并，或者流已删除。

 KCL 还会将 `Checkpointer` 对象传递到 `shutdown`。如果关闭 `reason` 是 `TERMINATE`，则记录处理器应完成处理任何数据记录，然后对此接口调用 `checkpoint` 方法。

## 修改配置属性
<a name="kinesis-record-processor-initialization-py"></a>

该示例提供了配置属性的默认值。您可使用自己的值覆盖任何这些属性（请参阅 `sample.properties`）。

### 应用程序名称
<a name="kinesis-record-processor-application-name-py"></a>

KCL 需要一个应用程序名称，该名称在您的各个应用程序中以及同一区域的各个 Amazon DynamoDB 表中处于唯一状态。KCL 通过以下方法使用应用程序名称配置值：
+ 假定与此应用程序名称关联的所有工作线程在同一个流上一起运行。这些工作线程可分布在多个实例上。如果运行同一应用程序代码的其他实例，但使用不同的应用程序名称，则 KCL 会将第二个实例视为在同一数据流运行的完全独立的应用程序。
+ KCL 利用应用程序名称创建 DynamoDB 表并使用该表保留应用程序的状态信息（如检查点和工作程序-分片映射）。每个应用程序都有自己的 DynamoDB 表。有关更多信息，请参阅 [使用租约表跟踪 KCL 消费端应用程序处理的分片](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable)。

### 设置凭证
<a name="kinesis-record-processor-creds-py"></a>

您必须将您的 AWS 证书提供给默认凭证提供者链中的一个凭证提供商。可以使用 `AWSCredentialsProvider` 属性设置凭证提供程序。[sample.properties](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties) 必须向[默认凭证提供程序链](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)中的凭证提供程序之一提供您的凭证。如果您在 Amazon EC2 实例上运行消费端应用程序，则建议您使用 IAM 角色进行配置。反映与此 IAM 角色关联的权限 AWS 凭证可通过实例元数据提供给实例上的应用程序。使用这种方式管理在 EC2 实例上运行的消费端应用程序的凭证最安全。

该示例的属性文件将配置 KCL 以使用 `sample_kclpy_app.py` 中提供的记录处理器处理名为“words”的 Kinesis 数据流。

# 在 Ruby 中开发 Kinesis Client Library 消费端
<a name="kinesis-record-processor-implementation-app-ruby"></a>

**重要**  
Amazon Kinesis Client Library（KCL）版本 1.x 和 2.x 已过时。KCL 1.x 将于 2026 年 1 月 30 日 end-of-support上市。我们**强烈建议**您在 2026 年 1 月 30 日之前，将使用版本 1.x 的 KCL 应用程序迁移到最新的 KCL 版本。要查找最新的 KCL 版本，请访问上的 [Amazon Kinesis 客户端库页面](https://github.com/awslabs/amazon-kinesis-client)。 GitHub有关最新 KCL 版本的信息，请参阅[使用 Kinesis Client Library](kcl.md)。有关从 KCL 1.x 迁移到 KCL 3.x 的信息，请参阅[从 KCL 1.x 迁移到 KCL 3.x](kcl-migration-1-3.md)。

可以使用 Kinesis Client Library（KCL）构建处理 Kinesis 数据流中数据的应用程序。Kinesis Client Library 提供多种语言版本。本主题将讨论 Ruby。

KCL 是一个 Java 库；对 Java 以外其他语言的支持是使用名为的多语言接口提供的。*MultiLangDaemon*此进程守护程序基于 Java，当您使用 Java 以外的 KCL 语言时，该程序会在后台运行。因此，如果您安装适用于 Ruby 的 KCL 并完全使用 Ruby 编写消费者应用程序，则仍然需要在系统上安装 Java，因为. MultiLangDaemon 此外 MultiLangDaemon ，您可能需要根据自己的用例自定义一些默认设置，例如它所连接的 AWS 区域。有关 MultiLangDaemon on 的更多信息 GitHub，请访问 [KCL MultiLangDaemon 项目](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang)页面。

要从中下载 Ruby KCL GitHub，请前往 K [inesis 客户端库 (](https://github.com/awslabs/amazon-kinesis-client-ruby)Ruby)。要下载 Ruby KCL 使用者应用程序的示例代码，请转到上的 [KCL for Ruby 示例项目](https://github.com/awslabs/amazon-kinesis-client-ruby/tree/master/samples)页面。 GitHub

有关 KCL Ruby 支持库的更多信息，请参阅 [KCL Ruby Gems Documentation](http://www.rubydoc.info/gems/aws-kclrb)。

# 开发 KCL 2.x 消费端
<a name="developing-consumers-with-kcl-v2"></a>

**重要**  
Amazon Kinesis Client Library（KCL）版本 1.x 和 2.x 已过时。KCL 1.x 将于 2026 年 1 月 30 日 end-of-support上市。我们**强烈建议**您在 2026 年 1 月 30 日之前，将使用版本 1.x 的 KCL 应用程序迁移到最新的 KCL 版本。要查找最新的 KCL 版本，请访问上的 [Amazon Kinesis 客户端库页面](https://github.com/awslabs/amazon-kinesis-client)。 GitHub有关最新 KCL 版本的信息，请参阅[使用 Kinesis Client Library](kcl.md)。有关从 KCL 1.x 迁移到 KCL 3.x 的信息，请参阅[从 KCL 1.x 迁移到 KCL 3.x](kcl-migration-1-3.md)。

本主题将说明如何使用 2.0 版本的 Kinesis Client Library（KCL）。

有关 KCL 的更多信息，请参阅 [Developing Consumers Using the Kinesis Client Library 1.x](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html) 中提供的概述。

根据要使用的选项，从以下主题进行选择。

**Topics**
+ [在 Java 中开发 Kinesis Client Library 消费端](kcl2-standard-consumer-java-example.md)
+ [在 Python 中开发 Kinesis Client Library 消费端](kcl2-standard-consumer-python-example.md)
+ [使用 KCL 2.x 开发具有增强扇出功能的消费端](building-enhanced-consumers-kcl-retired.md)

# 在 Java 中开发 Kinesis Client Library 消费端
<a name="kcl2-standard-consumer-java-example"></a>

**重要**  
Amazon Kinesis Client Library（KCL）版本 1.x 和 2.x 已过时。KCL 1.x 将于 2026 年 1 月 30 日 end-of-support上市。我们**强烈建议**您在 2026 年 1 月 30 日之前，将使用版本 1.x 的 KCL 应用程序迁移到最新的 KCL 版本。要查找最新的 KCL 版本，请访问上的 [Amazon Kinesis 客户端库页面](https://github.com/awslabs/amazon-kinesis-client)。 GitHub有关最新 KCL 版本的信息，请参阅[使用 Kinesis Client Library](kcl.md)。有关从 KCL 1.x 迁移到 KCL 3.x 的信息，请参阅[从 KCL 1.x 迁移到 KCL 3.x](kcl-migration-1-3.md)。

以下代码显示 `ProcessorFactory` 和 `RecordProcessor` 在 Java 中的实施示例。如果要利用增强型扇出功能，请参阅[利用使用增强型扇出功能的用户](https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-kcl-java.html)。

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License.
 */


/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

/**
 * This class will run a simple app that uses the KCL to read data and uses the AWS SDK to publish data.
 * Before running this program you must first create a Kinesis stream through the AWS console or AWS SDK.
 */
public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    /**
     * Invoke the main method with 2 args: the stream name and (optionally) the region.
     * Verifies valid inputs and then starts running the app.
     */
    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    /**
     * Constructor sets streamName and region. It also creates a KinesisClient object to send data to Kinesis.
     * This KinesisClient is used to send dummy data so that the consumer has something to read; it is also used
     * indirectly by the KCL to handle the consumption of the data.
     */
    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {

        /**
         * Sends dummy data to Kinesis. Not relevant to consuming the data with the KCL
         */
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        /**
         * Sets up configuration for the KCL, including DynamoDB and CloudWatch dependencies. The final argument, a
         * ShardRecordProcessorFactory, is where the logic for record processing lives, and is located in a private
         * class below.
         */
        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        /**
         * The Scheduler (also called Worker in earlier versions of the KCL) is the entry point to the KCL. This
         * instance is configured with defaults provided by the ConfigsBuilder.
         */
        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
        );

        /**
         * Kickoff the Scheduler. Record processing of the stream of dummy data will continue indefinitely
         * until an exit is triggered.
         */
        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        /**
         * Allows termination of app by pressing Enter.
         */
        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        /**
         * Stops sending dummy data.
         */
        log.info("Cancelling producer and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        /**
         * Stops consuming data. Finishes processing the current batch of data already received from Kinesis
         * before shutting down.
         */
        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown.  Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    /**
     * Sends a single record of dummy data to Kinesis.
     */
    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }

    /**
     * The implementation of the ShardRecordProcessor interface is where the heart of the record processing logic lives.
     * In this example all we do to 'process' is log info about the records.
     */
    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        /**
         * Invoked by the KCL before data records are delivered to the ShardRecordProcessor instance (via
         * processRecords). In this example we do nothing except some logging.
         *
         * @param initializationInput Provides information related to initialization.
         */
        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Handles record processing logic. The Amazon Kinesis Client Library will invoke this method to deliver
         * data records to the application. In this example we simply log our records.
         *
         * @param processRecordsInput Provides the records to be processed as well as information and capabilities
         *                            related to them (e.g. checkpointing).
         */
        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.");
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /** Called when the lease tied to this record processor has been lost. Once the lease has been lost,
         * the record processor can no longer checkpoint.
         *
         * @param leaseLostInput Provides access to functions and data related to the loss of the lease.
         */
        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Called when all data on this shard has been processed. Checkpointing must occur in the method for record
         * processing to be considered complete; an exception will be thrown otherwise.
         *
         * @param shardEndedInput Provides access to a checkpointer method for completing processing of the shard.
         */
        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Invoked when Scheduler has been requested to shut down (i.e. we decide to stop running the app by pressing
         * Enter). Checkpoints and logs the data a final time.
         *
         * @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint
         *                               before the shutdown is completed.
         */
        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```

# 在 Python 中开发 Kinesis Client Library 消费端
<a name="kcl2-standard-consumer-python-example"></a>

**重要**  
Amazon Kinesis Client Library（KCL）版本 1.x 和 2.x 已过时。KCL 1.x 将于 2026 年 1 月 30 日 end-of-support上市。我们**强烈建议**您在 2026 年 1 月 30 日之前，将使用版本 1.x 的 KCL 应用程序迁移到最新的 KCL 版本。要查找最新的 KCL 版本，请访问上的 [Amazon Kinesis 客户端库页面](https://github.com/awslabs/amazon-kinesis-client)。 GitHub有关最新 KCL 版本的信息，请参阅[使用 Kinesis Client Library](kcl.md)。有关从 KCL 1.x 迁移到 KCL 3.x 的信息，请参阅[从 KCL 1.x 迁移到 KCL 3.x](kcl-migration-1-3.md)。

可以使用 Kinesis Client Library（KCL）构建处理 Kinesis 数据流中数据的应用程序。Kinesis Client Library 提供多种语言版本。本主题将讨论 Python。

KCL 是一个 Java 库；对 Java 以外其他语言的支持是使用名为的多语言接口提供的。*MultiLangDaemon*此进程守护程序基于 Java，当您使用 Java 以外的 KCL 语言时，该程序会在后台运行。因此，如果您安装适用于 Python 的 KCL 并完全使用 Python 编写使用者应用程序，则仍然需要在系统上安装 Java，因为。 MultiLangDaemon此外 MultiLangDaemon ，您可能需要根据自己的用例自定义一些默认设置，例如它所连接的 AWS 区域。有关 MultiLangDaemon on 的更多信息 GitHub，请访问 [KCL MultiLangDaemon 项目](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang)页面。

要从中下载 Python KCL GitHub，请前往 K [inesis 客户端库 (Python)](https://github.com/awslabs/amazon-kinesis-client-python)。要下载 Python KCL 使用者应用程序的示例代码，请转到上的 [KCL for Python 示例项目](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples)页面。 GitHub

在 Python 中实现 KCL 消费端应用程序时，您必须完成下列任务：

**Topics**
+ [实现 RecordProcessor 类方法](#kinesis-record-processor-implementation-interface-py)
+ [修改配置属性](#kinesis-record-processor-initialization-py)

## 实现 RecordProcessor 类方法
<a name="kinesis-record-processor-implementation-interface-py"></a>

`RecordProcess` 类必须扩展 `RecordProcessorBase` 类以实现以下方法：

```
initialize
process_records
shutdown_requested
```

此示例提供了可用作起点的实现。

```
#!/usr/bin/env python

# Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Amazon Software License (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://aws.amazon.com/asl/
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

from __future__ import print_function

import sys
import time

from amazon_kclpy import kcl
from amazon_kclpy.v3 import processor


class RecordProcessor(processor.RecordProcessorBase):
    """
    A RecordProcessor processes data from a shard in a stream. Its methods will be called with this pattern:

    * initialize will be called once
    * process_records will be called zero or more times
    * shutdown will be called if this MultiLangDaemon instance loses the lease to this shard, or the shard ends due
        a scaling change.
    """
    def __init__(self):
        self._SLEEP_SECONDS = 5
        self._CHECKPOINT_RETRIES = 5
        self._CHECKPOINT_FREQ_SECONDS = 60
        self._largest_seq = (None, None)
        self._largest_sub_seq = None
        self._last_checkpoint_time = None

    def log(self, message):
        sys.stderr.write(message)

    def initialize(self, initialize_input):
        """
        Called once by a KCLProcess before any calls to process_records

        :param amazon_kclpy.messages.InitializeInput initialize_input: Information about the lease that this record
            processor has been assigned.
        """
        self._largest_seq = (None, None)
        self._last_checkpoint_time = time.time()

    def checkpoint(self, checkpointer, sequence_number=None, sub_sequence_number=None):
        """
        Checkpoints with retries on retryable exceptions.

        :param amazon_kclpy.kcl.Checkpointer checkpointer: the checkpointer provided to either process_records
            or shutdown
        :param str or None sequence_number: the sequence number to checkpoint at.
        :param int or None sub_sequence_number: the sub sequence number to checkpoint at.
        """
        for n in range(0, self._CHECKPOINT_RETRIES):
            try:
                checkpointer.checkpoint(sequence_number, sub_sequence_number)
                return
            except kcl.CheckpointError as e:
                if 'ShutdownException' == e.value:
                    #
                    # A ShutdownException indicates that this record processor should be shutdown. This is due to
                    # some failover event, e.g. another MultiLangDaemon has taken the lease for this shard.
                    #
                    print('Encountered shutdown exception, skipping checkpoint')
                    return
                elif 'ThrottlingException' == e.value:
                    #
                    # A ThrottlingException indicates that one of our dependencies is is over burdened, e.g. too many
                    # dynamo writes. We will sleep temporarily to let it recover.
                    #
                    if self._CHECKPOINT_RETRIES - 1 == n:
                        sys.stderr.write('Failed to checkpoint after {n} attempts, giving up.\n'.format(n=n))
                        return
                    else:
                        print('Was throttled while checkpointing, will attempt again in {s} seconds'
                              .format(s=self._SLEEP_SECONDS))
                elif 'InvalidStateException' == e.value:
                    sys.stderr.write('MultiLangDaemon reported an invalid state while checkpointing.\n')
                else:  # Some other error
                    sys.stderr.write('Encountered an error while checkpointing, error was {e}.\n'.format(e=e))
            time.sleep(self._SLEEP_SECONDS)

    def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
        """
        Called for each record that is passed to process_records.

        :param str data: The blob of data that was contained in the record.
        :param str partition_key: The key associated with this recod.
        :param int sequence_number: The sequence number associated with this record.
        :param int sub_sequence_number: the sub sequence number associated with this record.
        """
        ####################################
        # Insert your processing logic here
        ####################################
        self.log("Record (Partition Key: {pk}, Sequence Number: {seq}, Subsequence Number: {sseq}, Data Size: {ds}"
                 .format(pk=partition_key, seq=sequence_number, sseq=sub_sequence_number, ds=len(data)))

    def should_update_sequence(self, sequence_number, sub_sequence_number):
        """
        Determines whether a new larger sequence number is available

        :param int sequence_number: the sequence number from the current record
        :param int sub_sequence_number: the sub sequence number from the current record
        :return boolean: true if the largest sequence should be updated, false otherwise
        """
        return self._largest_seq == (None, None) or sequence_number > self._largest_seq[0] or \
            (sequence_number == self._largest_seq[0] and sub_sequence_number > self._largest_seq[1])

    def process_records(self, process_records_input):
        """
        Called by a KCLProcess with a list of records to be processed and a checkpointer which accepts sequence numbers
        from the records to indicate where in the stream to checkpoint.

        :param amazon_kclpy.messages.ProcessRecordsInput process_records_input: the records, and metadata about the
            records.
        """
        try:
            for record in process_records_input.records:
                data = record.binary_data
                seq = int(record.sequence_number)
                sub_seq = record.sub_sequence_number
                key = record.partition_key
                self.process_record(data, key, seq, sub_seq)
                if self.should_update_sequence(seq, sub_seq):
                    self._largest_seq = (seq, sub_seq)

            #
            # Checkpoints every self._CHECKPOINT_FREQ_SECONDS seconds
            #
            if time.time() - self._last_checkpoint_time > self._CHECKPOINT_FREQ_SECONDS:
                self.checkpoint(process_records_input.checkpointer, str(self._largest_seq[0]), self._largest_seq[1])
                self._last_checkpoint_time = time.time()

        except Exception as e:
            self.log("Encountered an exception while processing records. Exception was {e}\n".format(e=e))

    def lease_lost(self, lease_lost_input):
        self.log("Lease has been lost")

    def shard_ended(self, shard_ended_input):
        self.log("Shard has ended checkpointing")
        shard_ended_input.checkpointer.checkpoint()

    def shutdown_requested(self, shutdown_requested_input):
        self.log("Shutdown has been requested, checkpointing.")
        shutdown_requested_input.checkpointer.checkpoint()


if __name__ == "__main__":
    kcl_process = kcl.KCLProcess(RecordProcessor())
    kcl_process.run()
```

## 修改配置属性
<a name="kinesis-record-processor-initialization-py"></a>

该示例提供了配置属性的默认值，如以下脚本所示。您可使用自己的值覆盖任何这些属性。

```
# The script that abides by the multi-language protocol. This script will
# be executed by the MultiLangDaemon, which will communicate with this script
# over STDIN and STDOUT according to the multi-language protocol.
executableName = sample_kclpy_app.py

# The name of an Amazon Kinesis stream to process.
streamName = words

# Used by the KCL as the name of this application. Will be used as the name
# of an Amazon DynamoDB table which will store the lease and checkpoint
# information for workers with this application name
applicationName = PythonKCLSample

# Users can change the credentials provider the KCL will use to retrieve credentials.
# The DefaultAWSCredentialsProviderChain checks several other providers, which is
# described here:
# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain

# Appended to the user agent of the KCL. Does not impact the functionality of the
# KCL in any other way.
processingLanguage = python/2.7

# Valid options at TRIM_HORIZON or LATEST.
# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
initialPositionInStream = TRIM_HORIZON

# The following properties are also available for configuring the KCL Worker that is created
# by the MultiLangDaemon.

# The KCL defaults to us-east-1
#regionName = us-east-1

# Fail over time in milliseconds. A worker which does not renew it's lease within this time interval
# will be regarded as having problems and it's shards will be assigned to other workers.
# For applications that have a large number of shards, this msy be set to a higher number to reduce
# the number of DynamoDB IOPS required for tracking leases
#failoverTimeMillis = 10000

# A worker id that uniquely identifies this worker among all workers using the same applicationName
# If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself.
#workerId = 

# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
#shardSyncIntervalMillis = 60000

# Max records to fetch from Kinesis in a single GetRecords call.
#maxRecords = 10000

# Idle time between record reads in milliseconds.
#idleTimeBetweenReadsInMillis = 1000

# Enables applications flush/checkpoint (if they have some data "in progress", but don't get new data for while)
#callProcessRecordsEvenForEmptyRecordList = false

# Interval in milliseconds between polling to check for parent shard completion.
# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
# completion of parent shards).
#parentShardPollIntervalMillis = 10000

# Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try
# to delete the ones we don't need any longer.
#cleanupLeasesUponShardCompletion = true

# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
#taskBackoffTimeMillis = 500

# Buffer metrics for at most this long before publishing to CloudWatch.
#metricsBufferTimeMillis = 10000

# Buffer at most this many metrics before publishing to CloudWatch.
#metricsMaxQueueSize = 10000

# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
# to RecordProcessorCheckpointer#checkpoint(String) by default.
#validateSequenceNumberBeforeCheckpointing = true

# The maximum number of active threads for the MultiLangDaemon to permit.
# If a value is provided then a FixedThreadPool is used with the maximum
# active threads set to the provided value. If a non-positive integer or no
# value is provided a CachedThreadPool is used.
#maxActiveThreads = 0
```

### 应用程序名称
<a name="kinesis-record-processor-application-name-py"></a>

KCL 需要一个应用程序名称，该名称在您的各个应用程序中以及同一区域的各个 Amazon DynamoDB 表中处于唯一状态。KCL 通过以下方法使用应用程序名称配置值：
+ 假定与此应用程序名称关联的所有工作线程在同一个流上一起运行。这些工作线程可分布在多个实例中。如果运行同一应用程序代码的其他实例，但使用不同的应用程序名称，则 KCL 会将第二个实例视为在同一数据流运行的完全独立的应用程序。
+ KCL 利用应用程序名称创建 DynamoDB 表并使用该表保留应用程序的状态信息（如检查点和工作程序-分片映射）。每个应用程序都有自己的 DynamoDB 表。有关更多信息，请参阅 [使用租约表跟踪 KCL 消费端应用程序处理的分片](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable)。

### 凭据
<a name="kinesis-record-processor-creds-py"></a>

您必须将您的 AWS 证书提供给[默认凭证提供者链中的一个凭证提供商](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)。可以使用 `AWSCredentialsProvider` 属性设置凭证提供程序。如果您在 Amazon EC2 实例上运行使用者应用程序，我们建议您使用 IAM 角色配置该实例。 AWS 反映与此 IAM 角色关联的权限的证书可通过实例元数据提供给实例上的应用程序。使用这种方式管理在 EC2 实例上运行的消费端应用程序的凭证最安全。

# 使用 KCL 2.x 开发具有增强扇出功能的消费端
<a name="building-enhanced-consumers-kcl-retired"></a>

**重要**  
Amazon Kinesis Client Library（KCL）版本 1.x 和 2.x 已过时。KCL 1.x 将于 2026 年 1 月 30 日 end-of-support上市。我们**强烈建议**您在 2026 年 1 月 30 日之前，将使用版本 1.x 的 KCL 应用程序迁移到最新的 KCL 版本。要查找最新的 KCL 版本，请访问上的 [Amazon Kinesis 客户端库页面](https://github.com/awslabs/amazon-kinesis-client)。 GitHub有关最新 KCL 版本的信息，请参阅[使用 Kinesis Client Library](kcl.md)。有关从 KCL 1.x 迁移到 KCL 3.x 的信息，请参阅[从 KCL 1.x 迁移到 KCL 3.x](kcl-migration-1-3.md)。

在 Amazon Kinesis Data Streams 中使用*增强型扇出*功能的消费端，可以接收数据流中的记录，其中每分片每秒专用吞吐量高达 2MB 数据。此类消费端不必与接收流中数据的其他消费端争夺。有关更多信息，请参阅 [开发具有专用吞吐量的增强扇出型消费端](enhanced-consumers.md)。

可以使用 2.0 版或更高版本的 Kinesis Client Library（KCL）开发使用增强型扇出功能接收流中数据的应用程序。KCL 会自动为您的应用程序订阅流的所有分片，并确保您的使用者应用程序可以读取每个分片的吞吐量值 2 MB/sec 。如果要在未开启增强型扇出功能的情况下使用 KCL，请参阅 [Developing Consumers Using the Kinesis Client Library 2.0](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl-v2.html)。

**Topics**
+ [使用 KCL 2.x 在 Java 中开发具有增强扇出功能的消费端](building-enhanced-consumers-kcl-java.md)

# 使用 KCL 2.x 在 Java 中开发具有增强扇出功能的消费端
<a name="building-enhanced-consumers-kcl-java"></a>

**重要**  
Amazon Kinesis Client Library（KCL）版本 1.x 和 2.x 已过时。KCL 1.x 将于 2026 年 1 月 30 日 end-of-support上市。我们**强烈建议**您在 2026 年 1 月 30 日之前，将使用版本 1.x 的 KCL 应用程序迁移到最新的 KCL 版本。要查找最新的 KCL 版本，请访问上的 [Amazon Kinesis 客户端库页面](https://github.com/awslabs/amazon-kinesis-client)。 GitHub有关最新 KCL 版本的信息，请参阅[使用 Kinesis Client Library](kcl.md)。有关从 KCL 1.x 迁移到 KCL 3.x 的信息，请参阅[从 KCL 1.x 迁移到 KCL 3.x](kcl-migration-1-3.md)。

可以使用 2.0 版或更高版本的 Kinesis Client Library（KCL），在 Amazon Kinesis Data Streams 中开发使用增强型扇出功能接收流中数据的应用程序。以下代码显示 `ProcessorFactory` 和 `RecordProcessor` 在 Java 中的实施示例。

建议您使用 `KinesisClientUtil` 创建 `KinesisAsyncClient`，并在 `KinesisAsyncClient` 中配置 `maxConcurrency`。

**重要**  
Amazon Kinesis 户端可能会看到延迟大幅增加，除非您将 `KinesisAsyncClient` 配置为具有足够高的 `maxConcurrency`，以允许所有租期以及额外使用 `KinesisAsyncClient`。

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License. 
 */

/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig()
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        log.info("Cancelling producer, and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown. Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }


    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.");
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```