

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 開發具有共用輸送量的自訂消費者
<a name="shared-throughput-consumers"></a>

**重要**  
Amazon Kinesis Client Library (KCL) 版本 1.x 和 2.x 已過期。KCL 1.x 將於 2026 年 1 月 30 日終止支援。我們**強烈建議**您使用 1.x 版將 KCL 應用程式遷移至 2026 年 1 月 30 日之前的最新 KCL 版本。若要尋找最新的 KCL 版本，請參閱 [ GitHub 上的 Amazon Kinesis 用戶端程式庫頁面](https://github.com/awslabs/amazon-kinesis-client)。如需最新 KCL 版本的資訊，請參閱 [使用 Kinesis 用戶端程式庫](kcl.md)。如需從 KCL 1.x 遷移至 KCL 3.x 的資訊，請參閱 [從 KCL 1.x 移轉到 KCL 3.x](kcl-migration-1-3.md)。

如果您不需要專用輸送量從 Kinesis Data Streams 接收資料，而且不需要低於 200 毫秒的讀取傳播延遲，則可依照以下主題所述建置取用者應用程式。您可以使用 Kinesis Client Library (KCL) 或 適用於 Java 的 AWS SDK。

**Topics**
+ [使用 KCL 開發具有共用輸送量的自訂消費者](custom-kcl-consumers.md)

如需如何建置可經由專用輸送量從 Kinesis 資料串流接收記錄的取用者相關資訊，請參閱 [開發具有專用輸送量的增強型廣發消費者](enhanced-consumers.md)。

# 使用 KCL 開發具有共用輸送量的自訂消費者
<a name="custom-kcl-consumers"></a>

**重要**  
Amazon Kinesis Client Library (KCL) 版本 1.x 和 2.x 已過期。KCL 1.x 將於 2026 年 1 月 30 日終止支援。我們**強烈建議**您使用 1.x 版將 KCL 應用程式遷移至 2026 年 1 月 30 日之前的最新 KCL 版本。若要尋找最新的 KCL 版本，請參閱 [ GitHub 上的 Amazon Kinesis 用戶端程式庫頁面](https://github.com/awslabs/amazon-kinesis-client)。如需最新 KCL 版本的資訊，請參閱 [使用 Kinesis 用戶端程式庫](kcl.md)。如需從 KCL 1.x 遷移至 KCL 3.x 的資訊，請參閱 [從 KCL 1.x 移轉到 KCL 3.x](kcl-migration-1-3.md)。

開發具有共用輸送量的自訂取用者應用程式的方法之一是使用 Kinesis Client Library (KCL)。

針對您正在使用的 KCL 版本選擇下列主題。

**Topics**
+ [開發 KCL 1.x 消費者](developing-consumers-with-kcl.md)
+ [開發 KCL 2.x 消費者](developing-consumers-with-kcl-v2.md)

# 開發 KCL 1.x 消費者
<a name="developing-consumers-with-kcl"></a>

**重要**  
Amazon Kinesis Client Library (KCL) 版本 1.x 和 2.x 已過期。KCL 1.x 將於 2026 年 1 月 30 日終止支援。我們**強烈建議**您使用 1.x 版將 KCL 應用程式遷移至 2026 年 1 月 30 日之前的最新 KCL 版本。若要尋找最新的 KCL 版本，請參閱 [ GitHub 上的 Amazon Kinesis Client Library 頁面](https://github.com/awslabs/amazon-kinesis-client)。如需最新 KCL 版本的資訊，請參閱 [使用 Kinesis 用戶端程式庫](kcl.md)。如需從 KCL 1.x 遷移至 KCL 3.x 的資訊，請參閱 [從 KCL 1.x 移轉到 KCL 3.x](kcl-migration-1-3.md)。

您可以使用 Kinesis Client Library (KCL) 為 Amazon Kinesis Data Streams 開發取用者應用程式。

如需詳細資訊，請參閱[關於 KCL （先前版本）](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-overview)。

根據您要使用的選項，從下列主題中進行選擇。

**Topics**
+ [在 Java 中開發 Kinesis Client Library 取用者](kinesis-record-processor-implementation-app-java.md)
+ [在 Node.js 中開發 Kinesis Client Library 取用者](kinesis-record-processor-implementation-app-nodejs.md)
+ [在 .NET 中開發 Kinesis Client Library 取用者](kinesis-record-processor-implementation-app-dotnet.md)
+ [在 Python 中開發 Kinesis Client Library 取用者](kinesis-record-processor-implementation-app-py.md)
+ [在 Ruby 中開發 Kinesis Client Library 消費者](kinesis-record-processor-implementation-app-ruby.md)

# 在 Java 中開發 Kinesis Client Library 取用者
<a name="kinesis-record-processor-implementation-app-java"></a>

**重要**  
Amazon Kinesis Client Library (KCL) 版本 1.x 和 2.x 已過期。KCL 1.x 將於 2026 年 1 月 30 日終止支援。我們**強烈建議**您使用 1.x 版將 KCL 應用程式遷移至 2026 年 1 月 30 日之前的最新 KCL 版本。若要尋找最新的 KCL 版本，請參閱 [ GitHub 上的 Amazon Kinesis 用戶端程式庫頁面](https://github.com/awslabs/amazon-kinesis-client)。如需最新 KCL 版本的資訊，請參閱 [使用 Kinesis 用戶端程式庫](kcl.md)。如需從 KCL 1.x 遷移至 KCL 3.x 的資訊，請參閱 [從 KCL 1.x 移轉到 KCL 3.x](kcl-migration-1-3.md)。

您可以使用 Kinesis Client Library (KCL) 建置應用程式，處理來自 Kinesis 資料串流的資料。Kinesis Client Library 支援多種語言。本主題將討論 Java。如要檢視 Javadoc 參考，請參閱 [AmazonKinesisClient 類別的AWS Javadoc 主題](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html)。

若要從 GitHub 下載 Java KCL，請前往 [Kinesis Client Library (Python)](https://github.com/awslabs/amazon-kinesis-client)。若要尋找 Apache Maven 上的 Java KCL，請前往 [KCL 搜尋結果](https://search.maven.org/#search|ga|1|amazon-kinesis-client)頁面。如需從 GitHub 下載 Java KCL 取用者應用程式的範本程式碼，請至 GitHub 前往[適用於 Java 的 KCL 範例專案](https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis)頁面。

範例應用程式使用 [Apache Commons Logging](http://commons.apache.org/proper/commons-logging/guide.html)。您可以從 `configure` 檔案中定義的靜態 `AmazonKinesisApplicationSample.java` 方法更改日誌記錄組態。如需如何使用 Apache Commons Logging 搭配 Log4j 和 AWS Java 應用程式的詳細資訊，請參閱《 *適用於 Java 的 AWS SDK 開發人員指南*》中的 [ Log4j 記錄](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/java-dg-logging.html)。

以 Java 實作 KCL 取用者應用程式時，您必須完成以下任務：

**Topics**
+ [實作 IRecordProcessor 方法](#kinesis-record-processor-implementation-interface-java)
+ [實作 IRecordProcessor 介面的類別工廠](#kinesis-record-processor-implementation-factory-java)
+ [建立工作者](#kcl-java-worker)
+ [修改組態屬性](#kinesis-record-processor-initialization-java)
+ [遷移至記錄處理器界面的第 2 版](#kcl-java-v2-migration)

## 實作 IRecordProcessor 方法
<a name="kinesis-record-processor-implementation-interface-java"></a>

KCL 目前支援兩種版本的 `IRecordProcessor` 界面：原始界面適用於第一版的 KCL，而第 2 版自 KCL 1.5.0 版起均可使用。兩種界面皆完全受支援。兩種界面皆可完整支援。您的選擇取決於具體的情境要求。如需查看兩者間的所有差異，請參閱您在本機建置的 Javadoc 或原始碼。以下各節概要說明最低限度的入門實作。

**Topics**
+ [原始界面 (第 1 版)](#kcl-java-interface-original)
+ [更新界面 （第 2 版）](#kcl-java-interface-v2)

### 原始界面 (第 1 版)
<a name="kcl-java-interface-original"></a>

原始 `IRecordProcessor` 界面 (`package com.amazonaws.services.kinesis.clientlibrary.interfaces`) 公開了您的消費者必須實作的下列記錄處理器方法。範例提供的實作可讓您用於做為起點 (請參閱 `AmazonKinesisApplicationSampleRecordProcessor.java`)。

```
public void initialize(String shardId)
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

**initialize**  
KCL 將於記錄處理器執行個體化時呼叫 `initialize` 方法，傳遞特定碎片 ID 作為參數。此記錄處理器只會處理該碎片，且通常反過來說同樣成立 (該碎片僅由此記錄處理器處理)。然而，您的消費者應該考慮到資料記錄可能經過多次處理的情況。Kinesis Data Streams 具有*至少一次*的語意，即碎片中的每一筆資料記錄至少會由取用者內的工作者處理一次。如需特定碎片可能由多個工作者處理之各種情況的詳細資訊，請參閱[使用重新分片、擴展和平行處理來變更碎片數量](kinesis-record-processor-scaling.md)。

```
public void initialize(String shardId)
```

**processRecords**  
KCL 會呼叫 `processRecords` 方法，傳遞由 `initialize(shardId)` 方法所指定碎片中之資料記錄的清單。記錄處理器根據消費者的語意處理這些記錄中的資料。例如，工作者可能會執行資料轉換，然後將結果存放至 Amazon Simple Storage Service (Amazon S3) 儲存貯體。

```
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) 
```

除了資料本身外，記錄還包含序號和分割區索引鍵。工作者在處理資料時可使用這些值。例如，工作者可根據分割區索引鍵的值，選擇要存放資料的 S3 儲存貯體。`Record` 類別公開了下列方法，可供存取記錄的資料、序號和分割區索引鍵。

```
record.getData()  
record.getSequenceNumber() 
record.getPartitionKey()
```

範例中，私有方法 `processRecordsWithRetries` 的程式碼示範了工作者如何能夠存取記錄的資料、序號和分割區索引鍵。

Kinesis Data Streams 需要由記錄處理器追蹤碎片中已經處理過的記錄。KCL 透過將檢查點指標 (`IRecordProcessorCheckpointer`) 傳遞給 `processRecords` 為您進行這項追蹤。記錄處理器將對此界面呼叫 `checkpoint` 方法，以通知 KCL 目前處理碎片中的記錄之進度。如果工作者發生失敗，KCL 將使用此資訊於上一筆已知處理過的記錄處重新啟動碎片處理。

對於分割或合併操作，在原始碎片的處理器呼叫 `checkpoint` 以表示對原始碎片進行所有處理都已完成之前，KCL 將不會開始處理新碎片。

如果您未傳遞參數，KCL 將假定對 `checkpoint` 的呼叫表示所有記錄皆已處理，一直處理到傳遞至記錄處理器的最後一筆記錄。因此，記錄處理器應僅在已處理過向其傳遞的清單中之所有記錄後才呼叫 `checkpoint`。記錄處理器不需要在每次呼叫 `checkpoint` 時呼叫 `processRecords`。例如，處理器可以每呼叫三次 `checkpoint` 才呼叫一次 `processRecords`。您可以選擇性指定某筆記錄的確切序號做為 `checkpoint` 的參數。在此情況下，KCL 將假定所有記錄皆已處理，僅止於處理到該記錄。

範例中，私有方法 `checkpoint` 示範了如何利用適當的例外狀況處理和重試邏輯來呼叫 `IRecordProcessorCheckpointer.checkpoint`。

KCL 倚賴 `processRecords` 以處理任何因處理資料記錄而引發的例外狀況。如果 `processRecords` 擲回例外狀況，KCL 將略過例外狀況發生前已傳遞的資料記錄。也就是說，這些記錄不會重新傳送到擲回例外狀況的記錄處理器或消費者內的任何其他記錄處理器。

**shutdown**  
KCL 會在處理結束 (關閉原因是 `TERMINATE`) 或工作者不再回應 (關閉原因為 `ZOMBIE`) 時呼叫 `shutdown` 方法。

```
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

當記錄處理器未能再從碎片接收任何記錄 (因為碎片已進行分割或合併或者串流已刪除) 時，處理即告結束。

KCL 還會將 `IRecordProcessorCheckpointer` 界面傳遞給 `shutdown`。如果關閉原因是 `TERMINATE`，表示記錄處理器應已完成處理任何資料記錄，然後對此界面呼叫 `checkpoint` 方法。

### 更新界面 （第 2 版）
<a name="kcl-java-interface-v2"></a>

更新後的 `IRecordProcessor` 界面 (`package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`) 公開了您的消費者必須實作的下列記錄處理器方法：

```
void initialize(InitializationInput initializationInput)
void processRecords(ProcessRecordsInput processRecordsInput)
void shutdown(ShutdownInput shutdownInput)
```

原始版本界面的所有引數皆可透過容器物件的 get 方法進行存取。例如，若要擷取 `processRecords()` 中的記錄清單，可使用 `processRecordsInput.getRecords()`。

自此界面的第 2 版 (KCL 1.5.0 及更新版本) 起，除了原始界面提供的輸入外，還可使用以下各項新的輸入：

起始序號  
在傳遞給 `InitializationInput` 操作的 `initialize()` 物件中，將向記錄處理器執行個體提供的各筆記錄其起始序號。這是由先前處理同一碎片的記錄處理器執行個體執行上一次檢查點作業的序號。當您的應用程式需要此序號時，請提供這項資訊。

待定檢查點序號  
在傳遞給 `initialize()` 操作的 `InitializationInput` 物件中，上一個記錄處理器執行個體於停止前未能遞交的待定檢查點序號 (若有)。

## 實作 IRecordProcessor 介面的類別工廠
<a name="kinesis-record-processor-implementation-factory-java"></a>

實作記錄處理器方法的類別還需要實作處理站。您的消費者在執行個體化工作者時將傳遞此處理站的參考。

範例是在 `AmazonKinesisApplicationSampleRecordProcessorFactory.java` 檔案中使用原始記錄處理器界面實作處理站類別。若您希望類別處理站建立第 2 版的記錄處理器，請使用套件名稱 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`。

```
  public class SampleRecordProcessorFactory implements IRecordProcessorFactory { 
      /**
      * Constructor.
      */
      public SampleRecordProcessorFactory() {
          super();
      }
      /**
      * {@inheritDoc}
      */
      @Override
      public IRecordProcessor createProcessor() {
          return new SampleRecordProcessor();
      }
  }
```

## 建立工作者
<a name="kcl-java-worker"></a>

如 [實作 IRecordProcessor 方法](#kinesis-record-processor-implementation-interface-java) 所述，KCL 記錄處理器界面有兩種版本可供選擇，而這將影響您建立工作者的方式。原始記錄處理器界面使用以下程式碼結構建立工作者：

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker(recordProcessorFactory, config);
```

若為第 2 版的記錄處理器界面，您則可使用 `Worker.Builder` 建立工作者，而不必擔心應該使用哪個建構函數以及引數的順序。更新後的記錄處理器界面使用以下程式碼結構建立工作者：

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

## 修改組態屬性
<a name="kinesis-record-processor-initialization-java"></a>

範例提供了組態屬性的預設值。工作者的這份組態資料隨後整併到 `KinesisClientLibConfiguration` 物件。此物件以及 `IRecordProcessor` 的類別處理站參考將傳遞至用於執行個體化工作者的呼叫。您可使用 Java 屬性檔案 (請參閱 `AmazonKinesisApplicationSample.java`) 以自訂值覆寫任何這些屬性。

### Application name (應用程式名稱)
<a name="configuration-property-application-name"></a>

KCL 要求所有應用程式和同一區域內的 Amazon DynamoDB 資料表必須具有獨一無二的應用程式名稱。其使用應用程式名稱組態值的方式如下：
+ 假定所有與此應用程式名稱相關聯的工作者合作處理同一串流。這些工作者可能分佈於多個執行個體。如果您以相同應用程式的程式碼執行另一執行個體但使用不同的應用程式名稱，KCL 便會將第二個執行個體視為亦對同一串流進行操作的完全獨立應用程式。
+ KCl 將使用應用程式名稱建立 DynamoDB 資料表並由該資料表維護應用程式的狀態資訊 (例如檢查點及工作者與碎片間對應)。每個應用程式都有其自身的 DynamoDB 資料表。如需詳細資訊，請參閱[使用租用資料表來追蹤 KCL 取用者應用程式處理的碎片](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable)。

### 設定登入資料
<a name="kinesis-record-processor-cred-java"></a>

您必須將 AWS 登入資料提供給預設登入資料提供者鏈結中的其中一個登入資料提供者。例如，如果您在 EC2 執行個體上執行取用者，建議您使用 IAM 角色來啟動執行個體。反映與此 IAM 角色相關聯許可的 AWS 憑證，可透過執行個體中繼資料提供給執行個體上的應用程式。以這種方式管理 EC2 執行個體上執行的消費者登入資料最為安全。

範例應用程式首先嘗試從執行個體中繼資料擷取 IAM 憑證：

```
credentialsProvider = new InstanceProfileCredentialsProvider(); 
```

如果範例應用程式無法從執行個體中繼資料取得登入資料，其將嘗試從屬性檔案擷取登入資料：

```
credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();
```

如需執行個體中繼資料的詳細資訊，請參閱《*Amazon EC2 使用者指南*》中的[執行個體中繼資料](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html)。

### 將工作者 ID 用於多個執行個體
<a name="kinesis-record-processor-workerid-java"></a>

範例初始化程式碼透過使用本機電腦的名稱並附加全域唯一識別符的方式建立工作者 ID (`workerId`)，如以下程式碼片段所示。如此可支援消費者應用程式的多個執行個體在單一電腦上執行的情況。

```
String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
```

## 遷移至記錄處理器界面的第 2 版
<a name="kcl-java-v2-migration"></a>

若您想要遷移使用原始界面的程式碼，則除了遵照前述步驟外，您還需執行以下步驟：

1. 將您的記錄處理器類別更改為匯入第 2 版的記錄處理器界面：

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   ```

1. 將各項輸入的參考更改為使用容器物件的 `get` 方法。例如，在 `shutdown()` 操作中，將 "`checkpointer`" 更改為 "`shutdownInput.getCheckpointer()`"。

1. 將您的記錄處理器處理站類別更改為匯入第 2 版的記錄處理器處理站界面：

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   ```

1. 將工作者的建構更改為使用 `Worker.Builder`。例如：

   ```
   final Worker worker = new Worker.Builder()
       .recordProcessorFactory(recordProcessorFactory)
       .config(config)
       .build();
   ```

# 在 Node.js 中開發 Kinesis Client Library 取用者
<a name="kinesis-record-processor-implementation-app-nodejs"></a>

**重要**  
Amazon Kinesis Client Library (KCL) 版本 1.x 和 2.x 已過期。KCL 1.x 將於 2026 年 1 月 30 日終止支援。我們**強烈建議**您使用 1.x 版將 KCL 應用程式遷移至 2026 年 1 月 30 日之前的最新 KCL 版本。若要尋找最新的 KCL 版本，請參閱 [ GitHub 上的 Amazon Kinesis Client Library 頁面](https://github.com/awslabs/amazon-kinesis-client)。如需最新 KCL 版本的資訊，請參閱 [使用 Kinesis 用戶端程式庫](kcl.md)。如需從 KCL 1.x 遷移至 KCL 3.x 的資訊，請參閱 [從 KCL 1.x 移轉到 KCL 3.x](kcl-migration-1-3.md)。

您可以使用 Kinesis Client Library (KCL) 建置應用程式，處理來自 Kinesis 資料串流的資料。Kinesis Client Library 支援多種語言。本主題將討論 Node.js。

KCL 是一種 Java 程式庫，使用稱為 *MultiLangDaemon* 的多語言介面提供對 Java 以外語言的支援。此常駐程式是以 Java 為基礎，並在您使用 Java 以外的 KCL 語言時在背景執行。因此，若您安裝了適用於 Node.js 的 KCL 並完全以 Node.js 撰寫取用者應用程式，則由於 MultiLangDaemon 的緣故，您的系統仍需要安裝 Java。此外，MultiLangDaemon 有一些預設設定，您可能需要針對您的使用案例進行自訂，例如其連線 AWS 的區域。如需 MultiLangDaemon 的詳細資訊，請前往 GitHub 上的 [KCL MultiLangDaemon 專案](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang)頁面。

若要從 GitHub 下載 Node.js KCL，請移至 [Kinesis Client Library (Node.js)](https://github.com/awslabs/amazon-kinesis-client-nodejs)。

**範本程式碼下載**

Node.js 提供了兩份適用於 KCL 的程式碼範例：
+ [basic-sample](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/basic_sample)

  以下各節將利用此範例說明以 Node.js 建置 KCL 取用者應用程式的原理。
+ [click-stream-sample](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/click_stream_sample)

   程度更為進階的範例，使用真實情境，適合您在熟悉基本範本程式碼之後研究。本文不會就此範例進行討論，但其本身附有 README 檔案提供更多詳細資訊。

以 Node.js 實作 KCL 取用者應用程式時，您必須完成以下任務：

**Topics**
+ [實作記錄處理器](#kinesis-record-processor-implementation-interface-nodejs)
+ [修改組態屬性](#kinesis-record-processor-initialization-nodejs)

## 實作記錄處理器
<a name="kinesis-record-processor-implementation-interface-nodejs"></a>

使用適用於 Node.js 的 KCL 所開發最簡單形式的取用者必須實作 `recordProcessor` 函數，後者則又包含 `initialize`、`processRecords` 和 `shutdown` 函數。範例提供的實作可讓您用於做為起點 (請參閱 `sample_kcl_app.js`)。

```
function recordProcessor() {
  // return an object that implements initialize, processRecords and shutdown functions.}
```

**initialize**  
KCL 將於記錄處理器啟動時呼叫 `initialize` 函數。此記錄處理器只會處理以 `initializeInput.shardId` 傳遞的碎片 ID，且通常反過來說同樣成立 (該碎片僅由此記錄處理器處理)。然而，您的消費者應該考慮到資料記錄可能經過多次處理的情況。這是因為 Kinesis Data Streams 具有*至少一次*的語意，即碎片中的每一筆資料記錄至少會由取用者內的工作者處理一次。如需特定碎片可能由多個工作者處理之各種情況的詳細資訊，請參閱[使用重新分片、擴展和平行處理來變更碎片數量](kinesis-record-processor-scaling.md)。

```
initialize: function(initializeInput, completeCallback)
```

**processRecords**  
 KCL 將依照 `initialize` 函數內指定的碎片，使用該碎片中各資料記錄的清單做為輸入以呼叫此函數。您所實作的記錄處理器根據消費者的語意處理這些記錄中的資料。例如，工作者可能會執行資料轉換，然後將結果存放至 Amazon Simple Storage Service (Amazon S3) 儲存貯體。

```
processRecords: function(processRecordsInput, completeCallback)
```

除了資料本身外，記錄還包含工作者在處理資料時可使用的序號和分割區索引鍵。例如，工作者可根據分割區索引鍵的值，選擇要存放資料的 S3 儲存貯體。`record` 字典公開了以下的索引鍵值組，可供存取記錄的資料、序號和分割區索引鍵：

```
record.data
record.sequenceNumber
record.partitionKey
```

請注意，資料為 Base64 編碼。

基本範例中，`processRecords` 函數的程式碼示範了工作者如何能夠存取記錄的資料、序號和分割區索引鍵。

Kinesis Data Streams 需要由記錄處理器追蹤碎片中已經處理過的記錄。KCL 透過以 `processRecordsInput.checkpointer` 傳遞的 `checkpointer` 物件進行這項追蹤。記錄處理器將呼叫 `checkpointer.checkpoint` 函數，以通知 KCL 目前處理碎片中的記錄之進度。如果工作者發生失敗，KCL 將在您重新啟動碎片處理時使用此資訊，以便從上一筆已知處理過的記錄處繼續處理。

對於分割或合併操作，在原始碎片的處理器呼叫 `checkpoint` 以表示對原始碎片進行所有處理都已完成之前，KCL 不會開始處理新碎片。

如果您未傳遞序號給 `checkpoint` 函數，KCL 將假定對 `checkpoint` 的呼叫表示所有記錄皆已處理，一直處理到傳遞至記錄處理器的最後一筆記錄。因此，記錄處理器應**僅**在已處理過向其傳遞的清單中之所有記錄後才呼叫 `checkpoint`。記錄處理器不需要在每次呼叫 `checkpoint` 時呼叫 `processRecords`。例如，處理器可以每呼叫三次該函數才呼叫一次 `checkpoint`，或於記錄處理器外部發生事件時呼叫 (比方您已實作的自訂確認/驗證服務)。

您可以選擇性指定某筆記錄的確切序號做為 `checkpoint` 的參數。在此情況下，KCL 將假定所有記錄皆已處理，僅止於處理到該記錄。

基本範例應用程式示範了最簡單可行的方式呼叫 `checkpointer.checkpoint` 函數。此時您可以在該函數中為您的消費者加入其他所需的檢查點邏輯。

**shutdown**  
KCL 會在處理結束 (`shutdownInput.reason` 為 `TERMINATE`) 或工作者不再回應 (`shutdownInput.reason` 為 `ZOMBIE`) 時呼叫 `shutdown` 函數。

```
shutdown: function(shutdownInput, completeCallback)
```

當記錄處理器未能再從碎片接收任何記錄 (因為碎片已進行分割或合併或者串流已刪除) 時，處理即告結束。

KCL 還會將 `shutdownInput.checkpointer` 物件傳遞給 `shutdown`。如果關閉原因是 `TERMINATE`，您即應確保記錄處理器已完成處理任何資料記錄，然後對此界面呼叫 `checkpoint` 函數。

## 修改組態屬性
<a name="kinesis-record-processor-initialization-nodejs"></a>

範例提供了組態屬性的預設值。您可使用自訂值覆寫任何這些屬性 (請參閱基本範例中的 `sample.properties`)。

### Application name (應用程式名稱)
<a name="kinesis-record-processor-application-name-nodejs"></a>

KCL 要求所有應用程式和同一區域內的 Amazon DynamoDB 資料表必須具有獨一無二的應用程式。其使用應用程式名稱組態值的方式如下：
+ 假定所有與此應用程式名稱相關聯的工作者合作處理同一串流。這些工作者可能分佈於多個執行個體。如果您以相同應用程式的程式碼執行另一執行個體但使用不同的應用程式名稱，KCL 便會將第二個執行個體視為亦對同一串流進行操作的完全獨立應用程式。
+ KCl 將使用應用程式名稱建立 DynamoDB 資料表並由該資料表維護應用程式的狀態資訊 (例如檢查點及工作者與碎片間對應)。每個應用程式都有其自身的 DynamoDB 資料表。如需詳細資訊，請參閱[使用租用資料表來追蹤 KCL 取用者應用程式處理的碎片](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable)。

### 設定登入資料
<a name="kinesis-record-processor-credentials-nodejs"></a>

您必須將 AWS 登入資料提供給預設登入資料提供者鏈結中的其中一個登入資料提供者。您可以使用 `AWSCredentialsProvider` 屬性，設定登入資料供應者。`sample.properties` 檔案必須向[預設登入資料供應者鏈結](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)中的某一登入資料供應者提供您的登入資料。如果您是在 Amazon EC2 執行個體上執行取用者，建議您使用 IAM 角色來設定執行個體。反映與此 IAM 角色相關聯許可 AWS 的憑證可透過執行個體中繼資料提供給執行個體上的應用程式。以這種方式管理 EC2 執行個體上執行的消費者應用程式的登入資料最為安全。

以下範例設定 KCL​ 使用 `sample_kcl_app.js` 中提供的記錄處理器來處理名為 `kclnodejssample` 的 Kinesis 資料串流。

```
# The Node.js executable script
executableName = node sample_kcl_app.js
# The name of an Amazon Kinesis stream to process
streamName = kclnodejssample
# Unique KCL application name
applicationName = kclnodejssample
# Use default AWS credentials provider chain
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
# Read from the beginning of the stream
initialPositionInStream = TRIM_HORIZON
```

# 在 .NET 中開發 Kinesis Client Library 取用者
<a name="kinesis-record-processor-implementation-app-dotnet"></a>

**重要**  
Amazon Kinesis Client Library (KCL) 版本 1.x 和 2.x 已過期。KCL 1.x 將於 2026 年 1 月 30 日終止支援。我們**強烈建議**您使用 1.x 版將 KCL 應用程式遷移至 2026 年 1 月 30 日之前的最新 KCL 版本。若要尋找最新的 KCL 版本，請參閱 [ GitHub 上的 Amazon Kinesis 用戶端程式庫頁面](https://github.com/awslabs/amazon-kinesis-client)。如需最新 KCL 版本的資訊，請參閱 [使用 Kinesis 用戶端程式庫](kcl.md)。如需從 KCL 1.x 遷移至 KCL 3.x 的資訊，請參閱 [從 KCL 1.x 移轉到 KCL 3.x](kcl-migration-1-3.md)。

您可以使用 Kinesis Client Library (KCL) 建置應用程式，處理來自 Kinesis 資料串流的資料。Kinesis Client Library 支援多種語言。本主題將討論 .NET。

KCL 是一種 Java 程式庫，使用稱為 *MultiLangDaemon* 的多語言介面提供對 Java 以外語言的支援。此常駐程式是以 Java 為基礎，並在您使用 Java 以外的 KCL 語言時在背景執行。因此，若您安裝了適用於 .NET 的 KCL 並完全以 .NET 撰寫取用者應用程式，則由於 MultiLangDaemon 的緣故，您的系統仍需要安裝 Java。此外，MultiLangDaemon 有一些預設設定，您可能需要針對您的使用案例進行自訂，例如其連線 AWS 的區域。如需 MultiLangDaemon 的詳細資訊，請前往 GitHub 上的 [KCL MultiLangDaemon 專案](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang)頁面。

若要從 GitHub 下載 .NET KCL，請前往 [Kinesis Client Library (.NET)](https://github.com/awslabs/amazon-kinesis-client-net)。如需下載 .NET KCL 取用者應用程式的範本程式碼，請至 GitHub 前往[適用於 .NET 的 KCL 範例取用者專案](https://github.com/awslabs/amazon-kinesis-client-net/tree/master/SampleConsumer)頁面。

以 .NET 實作 KCL 取用者應用程式時，您必須完成以下任務：

**Topics**
+ [實作 IRecordProcessor 類別方法](#kinesis-record-processor-implementation-interface-dotnet)
+ [修改組態屬性](#kinesis-record-processor-initialization-dotnet)

## 實作 IRecordProcessor 類別方法
<a name="kinesis-record-processor-implementation-interface-dotnet"></a>

消費者必須實作 `IRecordProcessor` 的下列方法。範例消費者提供的實作可讓您用於做為起點 (請參閱 `SampleRecordProcessor` 中的 `SampleConsumer/AmazonKinesisSampleConsumer.cs` 類別)。

```
public void Initialize(InitializationInput input)
public void ProcessRecords(ProcessRecordsInput input)
public void Shutdown(ShutdownInput input)
```

**初始化**  
KCL 將於記錄處理器執行個體化時呼叫此方法，透過 `input` 參數 (`input.ShardId`) 傳遞特定碎片 ID。此記錄處理器只會處理該碎片，且通常反過來說同樣成立 (該碎片僅由此記錄處理器處理)。然而，您的消費者應該考慮到資料記錄可能經過多次處理的情況。這是因為 Kinesis Data Streams 具有*至少一次*的語意，即碎片中的每一筆資料記錄至少會由取用者內的工作者處理一次。如需特定碎片可能由多個工作者處理之各種情況的詳細資訊，請參閱[使用重新分片、擴展和平行處理來變更碎片數量](kinesis-record-processor-scaling.md)。

```
public void Initialize(InitializationInput input)
```

**ProcessRecords**  
`input` 會呼叫此方法，透過 `input.Records` 參數 () 傳遞由 `Initialize` 方法所指定碎片中之資料記錄的清單。您所實作的記錄處理器根據消費者的語意處理這些記錄中的資料。例如，工作者可能會執行資料轉換，然後將結果存放至 Amazon Simple Storage Service (Amazon S3) 儲存貯體。

```
public void ProcessRecords(ProcessRecordsInput input)
```

除了資料本身外，記錄還包含序號和分割區索引鍵。工作者在處理資料時可使用這些值。例如，工作者可根據分割區索引鍵的值，選擇要存放資料的 S3 儲存貯體。`Record` 類別公開了以下項目，可供存取記錄的資料、序號和分割區索引鍵：

```
byte[] Record.Data 
string Record.SequenceNumber
string Record.PartitionKey
```

範例中，`ProcessRecordsWithRetries` 方法的程式碼示範了工作者如何能夠存取記錄的資料、序號和分割區索引鍵。

Kinesis Data Streams 需要由記錄處理器追蹤碎片中已經處理過的記錄。KCL 透過將 `Checkpointer` 物件傳遞給 `ProcessRecords` (`input.Checkpointer`) 為您進行這項追蹤。記錄處理器將呼叫 `Checkpointer.Checkpoint` 方法，以通知 KCL 目前處理碎片中的記錄之進度。如果工作者發生失敗，KCL 將使用此資訊於上一筆已知處理過的記錄處重新啟動碎片處理。

對於分割或合併操作，在原始碎片的處理器呼叫 `Checkpointer.Checkpoint` 以表示對原始碎片進行所有處理都已完成之前，KCL 不會開始處理新碎片。

如果您未傳遞參數，KCL 將假定對 `Checkpointer.Checkpoint` 的呼叫代表所有記錄皆已處理，一直處理到傳遞至記錄處理器的最後一筆記錄。因此，記錄處理器應僅在已處理過向其傳遞的清單中之所有記錄後才呼叫 `Checkpointer.Checkpoint`。記錄處理器不需要在每次呼叫 `Checkpointer.Checkpoint` 時呼叫 `ProcessRecords`。例如，處理器可以每呼叫三次或四次該方法才呼叫一次 `Checkpointer.Checkpoint`。您可以選擇性指定某筆記錄的確切序號做為 `Checkpointer.Checkpoint` 的參數。在此情況下，KCL 將假定各記錄皆已處理，僅止於處理到該記錄。

範例中，私有方法 `Checkpoint(Checkpointer checkpointer)` 示範了如何利用適當的例外狀況處理和重試邏輯來呼叫 `Checkpointer.Checkpoint` 方法。

適用於 .NET 的 KCL 處理例外狀況的方式有別於其他 KCL 語言程式庫，其並不會處理任何因處理資料記錄而引發的例外狀況。使用者程式碼未捕捉的任何例外狀況都將導致程式當機。

**Shutdown**  
KCL 會在處理結束 (關閉原因是 `TERMINATE`) 或工作者不再回應 (關閉 `input.Reason` 值為 `ZOMBIE`) 時呼叫 `Shutdown` 方法。

```
public void Shutdown(ShutdownInput input)
```

當記錄處理器未能再從碎片接收任何記錄 (因為碎片已進行分割或合併或者串流已刪除) 時，處理即告結束。

KCL 還會將 `Checkpointer` 物件傳遞給 `shutdown`。如果關閉原因是 `TERMINATE`，表示記錄處理器應已完成處理任何資料記錄，然後對此界面呼叫 `checkpoint` 方法。

## 修改組態屬性
<a name="kinesis-record-processor-initialization-dotnet"></a>

範例消費者提供了組態屬性的預設值。您可使用自訂值覆寫任何這些屬性 (請參閱 `SampleConsumer/kcl.properties`)。

### Application name (應用程式名稱)
<a name="modify-kinesis-record-processor-application-name"></a>

KCL 要求所有應用程式和同一區域內的 Amazon DynamoDB 資料表必須具有獨一無二的應用程式。其使用應用程式名稱組態值的方式如下：
+ 假定所有與此應用程式名稱相關聯的工作者合作處理同一串流。這些工作者可能分佈於多個執行個體。如果您以相同應用程式的程式碼執行另一執行個體但使用不同的應用程式名稱，KCL 便會將第二個執行個體視為亦對同一串流進行操作的完全獨立應用程式。
+ KCl 將使用應用程式名稱建立 DynamoDB 資料表並由該資料表維護應用程式的狀態資訊 (例如檢查點及工作者與碎片間對應)。每個應用程式都有其自身的 DynamoDB 資料表。如需詳細資訊，請參閱[使用租用資料表來追蹤 KCL 取用者應用程式處理的碎片](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable)。

### 設定登入資料
<a name="kinesis-record-processor-creds-dotnet"></a>

您必須將 AWS 登入資料提供給預設登入資料提供者鏈結中的其中一個登入資料提供者。您可以使用 `AWSCredentialsProvider` 屬性，設定登入資料供應者。[sample.properties](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties) 必須向[預設登入資料供應者鏈結](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)中的某一登入資料供應者提供您的登入資料。如果您在 EC2 執行個體上執行取用者應用程式，建議您使用 IAM 角色設定執行個體。反映與此 IAM 角色相關聯許可的 AWS 憑證，可透過執行個體中繼資料提供給執行個體上的應用程式。以這種方式管理 EC2 執行個體上執行的消費者登入資料最為安全。

範例的屬性檔案設定由 KCL 使用 `AmazonKinesisSampleConsumer.cs` 所提供的記錄處理器，處理名為 "words" 的 Kinesis 資料串流。

# 在 Python 中開發 Kinesis Client Library 取用者
<a name="kinesis-record-processor-implementation-app-py"></a>

**重要**  
Amazon Kinesis Client Library (KCL) 版本 1.x 和 2.x 已過期。KCL 1.x 將於 2026 年 1 月 30 日終止支援。我們**強烈建議**您使用 1.x 版將 KCL 應用程式遷移至 2026 年 1 月 30 日之前的最新 KCL 版本。若要尋找最新的 KCL 版本，請參閱 [ GitHub 上的 Amazon Kinesis Client Library 頁面](https://github.com/awslabs/amazon-kinesis-client)。如需最新 KCL 版本的資訊，請參閱 [使用 Kinesis 用戶端程式庫](kcl.md)。如需從 KCL 1.x 遷移至 KCL 3.x 的資訊，請參閱 [從 KCL 1.x 移轉到 KCL 3.x](kcl-migration-1-3.md)。

您可以使用 Kinesis Client Library (KCL) 建置應用程式，處理來自 Kinesis 資料串流的資料。Kinesis Client Library 支援多種語言。本主題將討論 Python。

KCL 是一種 Java 程式庫，使用稱為 *MultiLangDaemon* 的多語言介面提供對 Java 以外語言的支援。此常駐程式是以 Java 為基礎，並在您使用 Java 以外的 KCL 語言時在背景執行。因此，若您安裝了適用於 Python 的 KCL 並完全以 Python 撰寫取用者應用程式，則由於 MultiLangDaemon 的緣故，您的系統仍需要安裝 Java。此外，MultiLangDaemon 有一些預設設定，您可能需要針對您的使用案例進行自訂，例如其連線 AWS 的區域。如需 MultiLangDaemon 的詳細資訊，請前往 GitHub 上的 [KCL MultiLangDaemon 專案](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang)頁面。

若要從 GitHub 下載 Python KCL，請前往 [Kinesis Client Library (Python)](https://github.com/awslabs/amazon-kinesis-client-python)。如需下載 Python KCL 取用者應用程式的範本程式碼，請至 GitHub 前[往適用於 Python 的 KCL 範例專案](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples)頁面。

以 Python 實作 KCL 取用者應用程式時，您必須完成以下任務：

**Topics**
+ [實作 RecordProcessor 類別方法](#kinesis-record-processor-implementation-interface-py)
+ [修改組態屬性](#kinesis-record-processor-initialization-py)

## 實作 RecordProcessor 類別方法
<a name="kinesis-record-processor-implementation-interface-py"></a>

`RecordProcess` 類別必須擴充 `RecordProcessorBase` 以實作下列方法。範例提供的實作可讓您用於做為起點 (請參閱 `sample_kclpy_app.py`)。

```
def initialize(self, shard_id)
def process_records(self, records, checkpointer)
def shutdown(self, checkpointer, reason)
```

**initialize**  
KCL 將於記錄處理器執行個體化時呼叫 `initialize` 方法，傳遞特定碎片 ID 作為參數。此記錄處理器只會處理該碎片，且通常反過來說同樣成立 (該碎片僅由此記錄處理器處理)。然而，您的消費者應該考慮到資料記錄可能經過多次處理的情況。這是因為 Kinesis Data Streams 具有*至少一次*的語意，即碎片中的每一筆資料記錄至少會由取用者內的工作者處理一次。如需特定碎片可能由多個工作者處理之各種情況的詳細資訊，請參閱[使用重新分片、擴展和平行處理來變更碎片數量](kinesis-record-processor-scaling.md)。

```
def initialize(self, shard_id)
```

**process\$1records**  
 KCL 會呼叫此方法，傳遞由 `initialize` 方法所指定碎片中之資料記錄的清單。您所實作的記錄處理器根據消費者的語意處理這些記錄中的資料。例如，工作者可能會執行資料轉換，然後將結果存放至 Amazon Simple Storage Service (Amazon S3) 儲存貯體。

```
def process_records(self, records, checkpointer) 
```

除了資料本身外，記錄還包含序號和分割區索引鍵。工作者在處理資料時可使用這些值。例如，工作者可根據分割區索引鍵的值，選擇要存放資料的 S3 儲存貯體。`record` 字典公開了以下的索引鍵值組，可供存取記錄的資料、序號和分割區索引鍵：

```
record.get('data')
record.get('sequenceNumber')
record.get('partitionKey')
```

請注意，資料為 Base64 編碼。

範例中，`process_records` 方法的程式碼示範了工作者如何能夠存取記錄的資料、序號和分割區索引鍵。

Kinesis Data Streams 需要由記錄處理器追蹤碎片中已經處理過的記錄。KCL 透過將 `Checkpointer` 物件傳遞給 `process_records` 為您進行這項追蹤。記錄處理器將對此物件呼叫 `checkpoint` 方法，以通知 KCL 目前處理碎片中的記錄之進度。如果工作者發生失敗，KCL 將使用此資訊於上一筆已知處理過的記錄處重新啟動碎片處理。

對於分割或合併操作，在原始碎片的處理器呼叫 `checkpoint` 以表示對原始碎片進行所有處理都已完成之前，KCL 不會開始處理新碎片。

如果您未傳遞參數，KCL 將假定對 `checkpoint` 的呼叫表示所有記錄皆已處理，一直處理到傳遞至記錄處理器的最後一筆記錄。因此，記錄處理器應僅在已處理過向其傳遞的清單中之所有記錄後才呼叫 `checkpoint`。記錄處理器不需要在每次呼叫 `checkpoint` 時呼叫 `process_records`。例如，處理器可以每呼叫三次該方法才呼叫一次 `checkpoint`。您可以選擇性指定某筆記錄的確切序號做為 `checkpoint` 的參數。在此情況下，KCL 將假定所有記錄皆已處理，僅止於處理到該記錄。

範例中，私有方法 `checkpoint` 示範了如何利用適當的例外狀況處理和重試邏輯來呼叫 `Checkpointer.checkpoint` 方法。

KCL 倚賴 `process_records` 以處理任何因處理資料記錄而引發的例外狀況。如果 `process_records` 擲回例外狀況，KCL 將略過例外狀況發生前已傳遞至 `process_records` 的資料記錄。也就是說，這些記錄不會重新傳送到擲回例外狀況的記錄處理器或消費者內的任何其他記錄處理器。

**shutdown**  
 KCL 會在處理結束 (關閉原因是 `TERMINATE`) 或工作者不再回應 (關閉 `reason` 為 `ZOMBIE`) 時呼叫 `shutdown` 方法。

```
def shutdown(self, checkpointer, reason)
```

當記錄處理器未能再從碎片接收任何記錄 (因為碎片已進行分割或合併或者串流已刪除) 時，處理即告結束。

 KCL 還會將 `Checkpointer` 物件傳遞給 `shutdown`。如果關閉 `reason` 是 `TERMINATE`，表示記錄處理器應已完成處理任何資料記錄，然後對此界面呼叫 `checkpoint` 方法。

## 修改組態屬性
<a name="kinesis-record-processor-initialization-py"></a>

範例提供了組態屬性的預設值。您可使用自訂值覆寫任何這些屬性 (請參閱 `sample.properties`)。

### Application name (應用程式名稱)
<a name="kinesis-record-processor-application-name-py"></a>

KCL 要求所有應用程式和同一區域內的 Amazon DynamoDB 資料表必須具有獨一無二的應用程式名稱。其使用應用程式名稱組態值的方式如下：
+ 假定所有與此應用程式名稱相關聯的工作者合作處理同一串流。這些工作者可分佈於多個執行個體。如果您以相同應用程式的程式碼執行另一執行個體但使用不同的應用程式名稱，KCL 便會將第二個執行個體視為亦對同一串流進行操作的完全獨立應用程式。
+ KCl 將使用應用程式名稱建立 DynamoDB 資料表並由該資料表維護應用程式的狀態資訊 (例如檢查點及工作者與碎片間對應)。每個應用程式都有其自身的 DynamoDB 資料表。如需詳細資訊，請參閱[使用租用資料表來追蹤 KCL 取用者應用程式處理的碎片](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable)。

### 設定登入資料
<a name="kinesis-record-processor-creds-py"></a>

您必須將 AWS 登入資料提供給預設登入資料提供者鏈結中的其中一個登入資料提供者。您可以使用 `AWSCredentialsProvider` 屬性，設定登入資料供應者。[sample.properties](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties) 必須向[預設登入資料供應者鏈結](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)中的某一登入資料供應者提供您的登入資料。如果您在 Amazon EC2 執行個體上執行取用者應用程式，建議您使用 IAM 角色設定執行個體。反映與此 IAM 角色相關聯許可的 AWS 憑證，可透過執行個體中繼資料提供給執行個體上的應用程式。以這種方式管理 EC2 執行個體上執行的消費者應用程式的登入資料最為安全。

範例的屬性檔案設定由 KCL 使用 `sample_kclpy_app.py` 所提供的記錄處理器，處理名為 "words" 的 Kinesis 資料串流。

# 在 Ruby 中開發 Kinesis Client Library 消費者
<a name="kinesis-record-processor-implementation-app-ruby"></a>

**重要**  
Amazon Kinesis Client Library (KCL) 版本 1.x 和 2.x 已過期。KCL 1.x 將於 2026 年 1 月 30 日終止支援。我們**強烈建議**您使用 1.x 版將 KCL 應用程式遷移至 2026 年 1 月 30 日之前的最新 KCL 版本。若要尋找最新的 KCL 版本，請參閱 [ GitHub 上的 Amazon Kinesis 用戶端程式庫頁面](https://github.com/awslabs/amazon-kinesis-client)。如需最新 KCL 版本的資訊，請參閱 [使用 Kinesis 用戶端程式庫](kcl.md)。如需從 KCL 1.x 遷移至 KCL 3.x 的資訊，請參閱 [從 KCL 1.x 移轉到 KCL 3.x](kcl-migration-1-3.md)。

您可以使用 Kinesis Client Library (KCL) 建置應用程式，處理來自 Kinesis 資料串流的資料。Kinesis Client Library 支援多種語言。本主題將討論 Ruby。

KCL 是一種 Java 程式庫，使用稱為 *MultiLangDaemon* 的多語言介面提供對 Java 以外語言的支援。此常駐程式是以 Java 為基礎，並在您使用 Java 以外的 KCL 語言時在背景執行。因此，若您安裝了適用於 Ruby 的 KCL 並完全以 Ruby 撰寫取用者應用程式，則由於 MultiLangDaemon 的緣故，您的系統仍需要安裝 Java。此外，MultiLangDaemon 有一些預設設定，您可能需要針對您的使用案例進行自訂，例如其連線 AWS 的區域。如需 MultiLangDaemon 的詳細資訊，請前往 GitHub 上的 [KCL MultiLangDaemon 專案](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang)頁面。

若要從 GitHub 下載 Ruby KCL，請前往 [Kinesis Client Library (Ruby)](https://github.com/awslabs/amazon-kinesis-client-ruby)。如需下載 Ruby KCL 取用者應用程式的範本程式碼，請至 GitHub 前往[適用於 Ruby 的 KCL 範例專案](https://github.com/awslabs/amazon-kinesis-client-ruby/tree/master/samples)頁面。

如需 KCL Ruby 支援程式庫的詳細資訊，請參閱 [KCL Ruby Gems 文件](http://www.rubydoc.info/gems/aws-kclrb)。

# 開發 KCL 2.x 消費者
<a name="developing-consumers-with-kcl-v2"></a>

**重要**  
Amazon Kinesis Client Library (KCL) 版本 1.x 和 2.x 已過期。KCL 1.x 將於 2026 年 1 月 30 日終止支援。我們**強烈建議**您使用 1.x 版將 KCL 應用程式遷移至 2026 年 1 月 30 日之前的最新 KCL 版本。若要尋找最新的 KCL 版本，請參閱 [ GitHub 上的 Amazon Kinesis 用戶端程式庫頁面](https://github.com/awslabs/amazon-kinesis-client)。如需最新 KCL 版本的資訊，請參閱 [使用 Kinesis 用戶端程式庫](kcl.md)。如需從 KCL 1.x 遷移至 KCL 3.x 的資訊，請參閱 [從 KCL 1.x 移轉到 KCL 3.x](kcl-migration-1-3.md)。

此主題說明如何使用 2.0 版本的 Kinesis Client Library (KCL)。

如需關於 KCL 的更多資訊，請參閱[使用 Kinesis Client Library 1.x 開發取用者](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html)中的概觀。

根據您要使用的選項，從下列主題中進行選擇。

**Topics**
+ [在 Java 中開發 Kinesis Client Library 取用者](kcl2-standard-consumer-java-example.md)
+ [在 Python 中開發 Kinesis Client Library 取用者](kcl2-standard-consumer-python-example.md)
+ [使用 KCL 2.x 開發增強型廣發消費者](building-enhanced-consumers-kcl-retired.md)

# 在 Java 中開發 Kinesis Client Library 取用者
<a name="kcl2-standard-consumer-java-example"></a>

**重要**  
Amazon Kinesis Client Library (KCL) 版本 1.x 和 2.x 已過期。KCL 1.x 將於 2026 年 1 月 30 日終止支援。我們**強烈建議**您使用 1.x 版將 KCL 應用程式遷移至 2026 年 1 月 30 日之前的最新 KCL 版本。若要尋找最新的 KCL 版本，請參閱 [ GitHub 上的 Amazon Kinesis Client Library 頁面](https://github.com/awslabs/amazon-kinesis-client)。如需最新 KCL 版本的資訊，請參閱 [使用 Kinesis 用戶端程式庫](kcl.md)。如需從 KCL 1.x 遷移至 KCL 3.x 的資訊，請參閱 [從 KCL 1.x 移轉到 KCL 3.x](kcl-migration-1-3.md)。

以下程式碼顯示 `ProcessorFactory` 和 `RecordProcessor` 的 Java 範例實作。如果您想要利用增強型散發功能，請參閱[使用具有增強型散發功能的取用者](https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-kcl-java.html)​。

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License.
 */


/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

/**
 * This class will run a simple app that uses the KCL to read data and uses the AWS SDK to publish data.
 * Before running this program you must first create a Kinesis stream through the AWS console or AWS SDK.
 */
public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    /**
     * Invoke the main method with 2 args: the stream name and (optionally) the region.
     * Verifies valid inputs and then starts running the app.
     */
    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    /**
     * Constructor sets streamName and region. It also creates a KinesisClient object to send data to Kinesis.
     * This KinesisClient is used to send dummy data so that the consumer has something to read; it is also used
     * indirectly by the KCL to handle the consumption of the data.
     */
    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {

        /**
         * Sends dummy data to Kinesis. Not relevant to consuming the data with the KCL
         */
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        /**
         * Sets up configuration for the KCL, including DynamoDB and CloudWatch dependencies. The final argument, a
         * ShardRecordProcessorFactory, is where the logic for record processing lives, and is located in a private
         * class below.
         */
        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        /**
         * The Scheduler (also called Worker in earlier versions of the KCL) is the entry point to the KCL. This
         * instance is configured with defaults provided by the ConfigsBuilder.
         */
        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
        );

        /**
         * Kickoff the Scheduler. Record processing of the stream of dummy data will continue indefinitely
         * until an exit is triggered.
         */
        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        /**
         * Allows termination of app by pressing Enter.
         */
        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        /**
         * Stops sending dummy data.
         */
        log.info("Cancelling producer and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        /**
         * Stops consuming data. Finishes processing the current batch of data already received from Kinesis
         * before shutting down.
         */
        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown.  Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    /**
     * Sends a single record of dummy data to Kinesis.
     */
    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }

    /**
     * The implementation of the ShardRecordProcessor interface is where the heart of the record processing logic lives.
     * In this example all we do to 'process' is log info about the records.
     */
    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        /**
         * Invoked by the KCL before data records are delivered to the ShardRecordProcessor instance (via
         * processRecords). In this example we do nothing except some logging.
         *
         * @param initializationInput Provides information related to initialization.
         */
        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Handles record processing logic. The Amazon Kinesis Client Library will invoke this method to deliver
         * data records to the application. In this example we simply log our records.
         *
         * @param processRecordsInput Provides the records to be processed as well as information and capabilities
         *                            related to them (e.g. checkpointing).
         */
        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.");
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /** Called when the lease tied to this record processor has been lost. Once the lease has been lost,
         * the record processor can no longer checkpoint.
         *
         * @param leaseLostInput Provides access to functions and data related to the loss of the lease.
         */
        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Called when all data on this shard has been processed. Checkpointing must occur in the method for record
         * processing to be considered complete; an exception will be thrown otherwise.
         *
         * @param shardEndedInput Provides access to a checkpointer method for completing processing of the shard.
         */
        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Invoked when Scheduler has been requested to shut down (i.e. we decide to stop running the app by pressing
         * Enter). Checkpoints and logs the data a final time.
         *
         * @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint
         *                               before the shutdown is completed.
         */
        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```

# 在 Python 中開發 Kinesis Client Library 取用者
<a name="kcl2-standard-consumer-python-example"></a>

**重要**  
Amazon Kinesis Client Library (KCL) 版本 1.x 和 2.x 已過期。KCL 1.x 將於 2026 年 1 月 30 日終止支援。我們**強烈建議**您使用 1.x 版將 KCL 應用程式遷移至 2026 年 1 月 30 日之前的最新 KCL 版本。若要尋找最新的 KCL 版本，請參閱 [ GitHub 上的 Amazon Kinesis 用戶端程式庫頁面](https://github.com/awslabs/amazon-kinesis-client)。如需最新 KCL 版本的資訊，請參閱 [使用 Kinesis 用戶端程式庫](kcl.md)。如需從 KCL 1.x 遷移至 KCL 3.x 的資訊，請參閱 [從 KCL 1.x 移轉到 KCL 3.x](kcl-migration-1-3.md)。

您可以使用 Kinesis Client Library (KCL) 建置應用程式，處理來自 Kinesis 資料串流的資料。Kinesis Client Library 支援多種語言。本主題將討論 Python。

KCL 是一種 Java 程式庫，使用稱為 *MultiLangDaemon* 的多語言介面提供對 Java 以外語言的支援。此常駐程式是以 Java 為基礎，並在您使用 Java 以外的 KCL 語言時在背景執行。因此，若您安裝了適用於 Python 的 KCL 並完全以 Python 撰寫取用者應用程式，則由於 MultiLangDaemon 的緣故，您的系統仍需要安裝 Java。此外，MultiLangDaemon 有一些預設設定，您可能需要針對您的使用案例進行自訂，例如其連線 AWS 的區域。如需 MultiLangDaemon 的詳細資訊，請前往 GitHub 上的 [KCL MultiLangDaemon 專案](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang)頁面。

若要從 GitHub 下載 Python KCL，請前往 [Kinesis Client Library (Python)](https://github.com/awslabs/amazon-kinesis-client-python)。如需下載 Python KCL 取用者應用程式的範本程式碼，請至 GitHub 前[往適用於 Python 的 KCL 範例專案](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples)頁面。

以 Python 實作 KCL 取用者應用程式時，您必須完成以下任務：

**Topics**
+ [實作 RecordProcessor 類別方法](#kinesis-record-processor-implementation-interface-py)
+ [修改組態屬性](#kinesis-record-processor-initialization-py)

## 實作 RecordProcessor 類別方法
<a name="kinesis-record-processor-implementation-interface-py"></a>

`RecordProcess` 類別必須擴充 `RecordProcessorBase` 類別以實作下列方法。

```
initialize
process_records
shutdown_requested
```

範例提供的實作可讓您用於做為起點。

```
#!/usr/bin/env python

# Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Amazon Software License (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://aws.amazon.com/asl/
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

from __future__ import print_function

import sys
import time

from amazon_kclpy import kcl
from amazon_kclpy.v3 import processor


class RecordProcessor(processor.RecordProcessorBase):
    """
    A RecordProcessor processes data from a shard in a stream. Its methods will be called with this pattern:

    * initialize will be called once
    * process_records will be called zero or more times
    * shutdown will be called if this MultiLangDaemon instance loses the lease to this shard, or the shard ends due
        a scaling change.
    """
    def __init__(self):
        self._SLEEP_SECONDS = 5
        self._CHECKPOINT_RETRIES = 5
        self._CHECKPOINT_FREQ_SECONDS = 60
        self._largest_seq = (None, None)
        self._largest_sub_seq = None
        self._last_checkpoint_time = None

    def log(self, message):
        sys.stderr.write(message)

    def initialize(self, initialize_input):
        """
        Called once by a KCLProcess before any calls to process_records

        :param amazon_kclpy.messages.InitializeInput initialize_input: Information about the lease that this record
            processor has been assigned.
        """
        self._largest_seq = (None, None)
        self._last_checkpoint_time = time.time()

    def checkpoint(self, checkpointer, sequence_number=None, sub_sequence_number=None):
        """
        Checkpoints with retries on retryable exceptions.

        :param amazon_kclpy.kcl.Checkpointer checkpointer: the checkpointer provided to either process_records
            or shutdown
        :param str or None sequence_number: the sequence number to checkpoint at.
        :param int or None sub_sequence_number: the sub sequence number to checkpoint at.
        """
        for n in range(0, self._CHECKPOINT_RETRIES):
            try:
                checkpointer.checkpoint(sequence_number, sub_sequence_number)
                return
            except kcl.CheckpointError as e:
                if 'ShutdownException' == e.value:
                    #
                    # A ShutdownException indicates that this record processor should be shutdown. This is due to
                    # some failover event, e.g. another MultiLangDaemon has taken the lease for this shard.
                    #
                    print('Encountered shutdown exception, skipping checkpoint')
                    return
                elif 'ThrottlingException' == e.value:
                    #
                    # A ThrottlingException indicates that one of our dependencies is is over burdened, e.g. too many
                    # dynamo writes. We will sleep temporarily to let it recover.
                    #
                    if self._CHECKPOINT_RETRIES - 1 == n:
                        sys.stderr.write('Failed to checkpoint after {n} attempts, giving up.\n'.format(n=n))
                        return
                    else:
                        print('Was throttled while checkpointing, will attempt again in {s} seconds'
                              .format(s=self._SLEEP_SECONDS))
                elif 'InvalidStateException' == e.value:
                    sys.stderr.write('MultiLangDaemon reported an invalid state while checkpointing.\n')
                else:  # Some other error
                    sys.stderr.write('Encountered an error while checkpointing, error was {e}.\n'.format(e=e))
            time.sleep(self._SLEEP_SECONDS)

    def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
        """
        Called for each record that is passed to process_records.

        :param str data: The blob of data that was contained in the record.
        :param str partition_key: The key associated with this recod.
        :param int sequence_number: The sequence number associated with this record.
        :param int sub_sequence_number: the sub sequence number associated with this record.
        """
        ####################################
        # Insert your processing logic here
        ####################################
        self.log("Record (Partition Key: {pk}, Sequence Number: {seq}, Subsequence Number: {sseq}, Data Size: {ds}"
                 .format(pk=partition_key, seq=sequence_number, sseq=sub_sequence_number, ds=len(data)))

    def should_update_sequence(self, sequence_number, sub_sequence_number):
        """
        Determines whether a new larger sequence number is available

        :param int sequence_number: the sequence number from the current record
        :param int sub_sequence_number: the sub sequence number from the current record
        :return boolean: true if the largest sequence should be updated, false otherwise
        """
        return self._largest_seq == (None, None) or sequence_number > self._largest_seq[0] or \
            (sequence_number == self._largest_seq[0] and sub_sequence_number > self._largest_seq[1])

    def process_records(self, process_records_input):
        """
        Called by a KCLProcess with a list of records to be processed and a checkpointer which accepts sequence numbers
        from the records to indicate where in the stream to checkpoint.

        :param amazon_kclpy.messages.ProcessRecordsInput process_records_input: the records, and metadata about the
            records.
        """
        try:
            for record in process_records_input.records:
                data = record.binary_data
                seq = int(record.sequence_number)
                sub_seq = record.sub_sequence_number
                key = record.partition_key
                self.process_record(data, key, seq, sub_seq)
                if self.should_update_sequence(seq, sub_seq):
                    self._largest_seq = (seq, sub_seq)

            #
            # Checkpoints every self._CHECKPOINT_FREQ_SECONDS seconds
            #
            if time.time() - self._last_checkpoint_time > self._CHECKPOINT_FREQ_SECONDS:
                self.checkpoint(process_records_input.checkpointer, str(self._largest_seq[0]), self._largest_seq[1])
                self._last_checkpoint_time = time.time()

        except Exception as e:
            self.log("Encountered an exception while processing records. Exception was {e}\n".format(e=e))

    def lease_lost(self, lease_lost_input):
        self.log("Lease has been lost")

    def shard_ended(self, shard_ended_input):
        self.log("Shard has ended checkpointing")
        shard_ended_input.checkpointer.checkpoint()

    def shutdown_requested(self, shutdown_requested_input):
        self.log("Shutdown has been requested, checkpointing.")
        shutdown_requested_input.checkpointer.checkpoint()


if __name__ == "__main__":
    kcl_process = kcl.KCLProcess(RecordProcessor())
    kcl_process.run()
```

## 修改組態屬性
<a name="kinesis-record-processor-initialization-py"></a>

範例提供了組態屬性的預設值，如下列指令碼所示。您可使用自訂值覆寫任何這些屬性。

```
# The script that abides by the multi-language protocol. This script will
# be executed by the MultiLangDaemon, which will communicate with this script
# over STDIN and STDOUT according to the multi-language protocol.
executableName = sample_kclpy_app.py

# The name of an Amazon Kinesis stream to process.
streamName = words

# Used by the KCL as the name of this application. Will be used as the name
# of an Amazon DynamoDB table which will store the lease and checkpoint
# information for workers with this application name
applicationName = PythonKCLSample

# Users can change the credentials provider the KCL will use to retrieve credentials.
# The DefaultAWSCredentialsProviderChain checks several other providers, which is
# described here:
# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain

# Appended to the user agent of the KCL. Does not impact the functionality of the
# KCL in any other way.
processingLanguage = python/2.7

# Valid options at TRIM_HORIZON or LATEST.
# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
initialPositionInStream = TRIM_HORIZON

# The following properties are also available for configuring the KCL Worker that is created
# by the MultiLangDaemon.

# The KCL defaults to us-east-1
#regionName = us-east-1

# Fail over time in milliseconds. A worker which does not renew it's lease within this time interval
# will be regarded as having problems and it's shards will be assigned to other workers.
# For applications that have a large number of shards, this msy be set to a higher number to reduce
# the number of DynamoDB IOPS required for tracking leases
#failoverTimeMillis = 10000

# A worker id that uniquely identifies this worker among all workers using the same applicationName
# If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself.
#workerId = 

# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
#shardSyncIntervalMillis = 60000

# Max records to fetch from Kinesis in a single GetRecords call.
#maxRecords = 10000

# Idle time between record reads in milliseconds.
#idleTimeBetweenReadsInMillis = 1000

# Enables applications flush/checkpoint (if they have some data "in progress", but don't get new data for while)
#callProcessRecordsEvenForEmptyRecordList = false

# Interval in milliseconds between polling to check for parent shard completion.
# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
# completion of parent shards).
#parentShardPollIntervalMillis = 10000

# Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try
# to delete the ones we don't need any longer.
#cleanupLeasesUponShardCompletion = true

# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
#taskBackoffTimeMillis = 500

# Buffer metrics for at most this long before publishing to CloudWatch.
#metricsBufferTimeMillis = 10000

# Buffer at most this many metrics before publishing to CloudWatch.
#metricsMaxQueueSize = 10000

# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
# to RecordProcessorCheckpointer#checkpoint(String) by default.
#validateSequenceNumberBeforeCheckpointing = true

# The maximum number of active threads for the MultiLangDaemon to permit.
# If a value is provided then a FixedThreadPool is used with the maximum
# active threads set to the provided value. If a non-positive integer or no
# value is provided a CachedThreadPool is used.
#maxActiveThreads = 0
```

### Application name (應用程式名稱)
<a name="kinesis-record-processor-application-name-py"></a>

KCL 要求所有應用程式和同一區域內的 Amazon DynamoDB 資料表必須具有獨一無二的應用程式名稱。其使用應用程式名稱組態值的方式如下：
+ 假定所有與此應用程式名稱相關聯的工作者合作處理同一串流。這些工作者可分佈於多個執行個體。如果您以相同應用程式的程式碼執行另一執行個體但使用不同的應用程式名稱，KCL 便會將第二個執行個體視為亦對同一串流進行操作的完全獨立應用程式。
+ KCl 將使用應用程式名稱建立 DynamoDB 資料表並由該資料表維護應用程式的狀態資訊 (例如檢查點及工作者與碎片間對應)。每個應用程式都有其自身的 DynamoDB 資料表。如需詳細資訊，請參閱[使用租用資料表來追蹤 KCL 取用者應用程式處理的碎片](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable)。

### 憑證
<a name="kinesis-record-processor-creds-py"></a>

您必須將 AWS 登入資料提供給[預設登入資料提供者鏈結中的其中一個登入資料提供者](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)。您可以使用 `AWSCredentialsProvider` 屬性，設定登入資料供應者。如果您在 Amazon EC2 執行個體上執行取用者應用程式，建議您使用 IAM role. AWS credentials 設定執行個體，以反映與此 IAM 角色相關聯的許可，可透過執行個體中繼資料提供給執行個體上的應用程式。以這種方式管理 EC2 執行個體上執行的消費者應用程式的登入資料最為安全。

# 使用 KCL 2.x 開發增強型廣發消費者
<a name="building-enhanced-consumers-kcl-retired"></a>

**重要**  
Amazon Kinesis Client Library (KCL) 版本 1.x 和 2.x 已過期。KCL 1.x 將於 2026 年 1 月 30 日終止支援。我們**強烈建議**您使用 1.x 版將 KCL 應用程式遷移至 2026 年 1 月 30 日之前的最新 KCL 版本。若要尋找最新的 KCL 版本，請參閱 [ GitHub 上的 Amazon Kinesis 用戶端程式庫頁面](https://github.com/awslabs/amazon-kinesis-client)。如需最新 KCL 版本的資訊，請參閱 [使用 Kinesis 用戶端程式庫](kcl.md)。如需從 KCL 1.x 遷移至 KCL 3.x 的資訊，請參閱 [從 KCL 1.x 移轉到 KCL 3.x](kcl-migration-1-3.md)。

在 Amazon Kinesis Data Streams 中使用*增強型散發功*能的取用者從資料串流接收記錄時，專用輸送量可高達每個碎片每秒 2 MB 的資料。這類消費者不必與其他從串流接收資料的消費者競爭。如需詳細資訊，請參閱[開發具有專用輸送量的增強型廣發消費者](enhanced-consumers.md)。

您可以使用 2.0 版或更新版本的 Kinesis Client Library (KCL) 開發應用程式，利用增強型散發功能從串流接收資料。KCL 將自動為您的應用程式訂閱串流中的所有碎片，並確保您的取用者應用程式讀取的輸送量值達到每個碎片每秒 2 MB。如果您想要使用 KCL 但不想開啟增強型散發功能，請參閱[使用 Kinesis Client Library 2.0 開發取用者](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl-v2.html)​。

**Topics**
+ [在 Java 中使用 KCL 2.x 開發增強型廣發消費者](building-enhanced-consumers-kcl-java.md)

# 在 Java 中使用 KCL 2.x 開發增強型廣發消費者
<a name="building-enhanced-consumers-kcl-java"></a>

**重要**  
Amazon Kinesis Client Library (KCL) 版本 1.x 和 2.x 已過期。KCL 1.x 將於 2026 年 1 月 30 日終止支援。我們**強烈建議**您使用 1.x 版將 KCL 應用程式遷移至 2026 年 1 月 30 日之前的最新 KCL 版本。若要尋找最新的 KCL 版本，請參閱 [ GitHub 上的 Amazon Kinesis 用戶端程式庫頁面](https://github.com/awslabs/amazon-kinesis-client)。如需最新 KCL 版本的資訊，請參閱 [使用 Kinesis 用戶端程式庫](kcl.md)。如需從 KCL 1.x 遷移至 KCL 3.x 的資訊，請參閱 [從 KCL 1.x 移轉到 KCL 3.x](kcl-migration-1-3.md)。

您可以使用 2.0 版或更新版本的 Kinesis Client Library (KCL) 在 Amazon Kinesis Data Streams 中開發應用程式，利用增強型散發功能從串流接收資料。以下程式碼顯示 `ProcessorFactory` 和 `RecordProcessor` 的 Java 範例實作。

建議您在 `KinesisAsyncClient` 中，使用 `KinesisClientUtil` 來建立 `KinesisAsyncClient` 及設定 `maxConcurrency`。

**重要**  
Amazon Kinesis Client 可能會明顯發生延遲，除非您設定 `KinesisAsyncClient` 的 `maxConcurrency` 夠高，足以運作所有的租賃服務，並可額外使用 `KinesisAsyncClient`。

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License. 
 */

/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig()
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        log.info("Cancelling producer, and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown. Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }


    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.");
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```