

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 從 KCL 1.x 移轉到 KCL 3.x
<a name="streams-migrating-kcl"></a>

## 概觀
<a name="migrating-kcl-overview"></a>

本指南提供如何將取用者應用程式從 KCL 1.x 移轉到 KCL 3.x 的說明。由於 KCL 1.x 和 KCL 3.x 之間的架構差異，移轉需要更新多個元件以確保相容性。

KCL 1.x 使用的類別和介面與 KCL 3.x 不同。您必須先將記錄處理器、記錄處理器處理站及工作者類別移轉至 KCL 3.x 相容格式，並遵循 KCL 1.x 移轉至 KCL 3.x 的移轉步驟。

## 移轉步驟
<a name="migration-steps"></a>

**Topics**
+ [步驟 1：移轉記錄處理器](#step1-record-processor)
+ [步驟 2：移轉記錄處理器處理站](#step2-record-processor-factory)
+ [步驟 3：移轉工作者](#step3-worker-migration)
+ [步驟 4：KCL 3.x 組態概觀和建議](#step4-configuration-migration)
+ [步驟 5：從 KCL 2.x 移轉至 KCL 3.x](#step5-kcl2-to-kcl3)

### 步驟 1：移轉記錄處理器
<a name="step1-record-processor"></a>

以下範例顯示基於 KCL 1.x DynamoDB Streams Kinesis 轉接器所實作的記錄處理器：

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        for (Record record : processRecordsInput.getRecords()) {
            String data = new String(record.getData().array(), Charset.forName("UTF-8"));
            System.out.println(data);
            if (record instanceof RecordAdapter) {
                // record processing and checkpointing logic
            }
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**移轉 RecordProcessor 類別**

1. 將介面從 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` 和 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` 更改為 `com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor`，如下所示：

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   
   import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
   ```

1. 更新 `initialize` 和 `processRecords` 方法的匯入陳述式：

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
   ```

1. 將 `shutdownRequested` 方法取代為以下的新方法：`leaseLost`、`shardEnded` 和 `shutdownRequested`。

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

記錄處理器類別經更新後的版本如下：

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.dynamodb.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import software.amazon.awssdk.services.dynamodb.model.Record;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) {
        for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records())
            Record ddbRecord = record.getRecord();
            // processing and checkpointing logic for the ddbRecord
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

**注意**  
DynamoDB Streams Kinesis 轉接器現在使用 SDKv2 記錄模型。在 SDKv2 中，複雜 `AttributeValue` 物件 (`BS`、`NS`、`M`、`L`、`SS`) 永遠不會傳回空值。使用 `hasBs()`、`hasNs()`、`hasM()`、`hasL()`、`hasSs()` 方法驗證這些值是否存在。

### 步驟 2：移轉記錄處理器處理站
<a name="step2-record-processor-factory"></a>

記錄處理器處理站負責在取得租用時建立記錄處理器。以下是 KCL 1.x 處理站的範例：

```
package com.amazonaws.codesamples;

import software.amazon.dynamodb.AmazonDynamoDB;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {
    
    @Override
    public IRecordProcessor createProcessor() {
        return new StreamsRecordProcessor(dynamoDBClient, tableName);
    }
}
```

**移轉 `RecordProcessorFactory`**
+ 將實作的介面從 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` 更改為 `software.amazon.kinesis.processor.ShardRecordProcessorFactory`，如下所示：

  ```
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
  import software.amazon.kinesis.processor.ShardRecordProcessor;
  
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
  import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
  
  // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
  public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {
  
  Change the return signature for createProcessor.
  
  // public IRecordProcessor createProcessor() {
  public ShardRecordProcessor shardRecordProcessor() {
  ```

以下是 3.0 中記錄處理器工廠的範例：

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor();
    }
}
```

### 步驟 3：移轉工作者
<a name="step3-worker-migration"></a>

在 KCL 的版本 3.0，名為**排程器**​的新類別會取代**工作者**類別。以下是 KCL 1.x 工作者的範例：

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(
        recordProcessorFactory,
        workerConfig,
        adapterClient,
        amazonDynamoDB,
        amazonCloudWatchClient);
```

**移轉至工作者**

1. 將 `Worker` 類別的 `import` 陳述式變更為 `Scheduler` 和 `ConfigsBuilder` 類別的匯入陳述式。

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. 匯入 `StreamTracker` 並將匯入 `StreamsWorkerFactory` 變更至 `StreamsSchedulerFactory`。

   ```
   import software.amazon.kinesis.processor.StreamTracker;
   // import software.amazon.dynamodb.streamsadapter.StreamsWorkerFactory;
   import software.amazon.dynamodb.streamsadapter.StreamsSchedulerFactory;
   ```

1. 選擇要啟動應用程式的位置。其可能是 `TRIM_HORIZON` 或 `LATEST`。

   ```
   import software.amazon.kinesis.common.InitialPositionInStream;
   import software.amazon.kinesis.common.InitialPositionInStreamExtended;
   ```

1. 建立 `StreamTracker` 執行個體。

   ```
   StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
           streamArn,
           InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
   );
   ```

1. 建立 `AmazonDynamoDBStreamsAdapterClient` 物件。

   ```
   import software.amazon.dynamodb.streamsadapter.AmazonDynamoDBStreamsAdapterClient; 
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
   import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
   
   ...
   
   AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();
   
   AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(
           credentialsProvider, awsRegion);
   ```

1. 建立 `ConfigsBuilder` 物件。

   ```
   import software.amazon.kinesis.common.ConfigsBuilder;
   
   ...
   ConfigsBuilder configsBuilder = new ConfigsBuilder(
                   streamTracker,
                   applicationName,
                   adapterClient,
                   dynamoDbAsyncClient,
                   cloudWatchAsyncClient,
                   UUID.randomUUID().toString(),
                   new StreamsRecordProcessorFactory());
   ```

1. 使用 `ConfigsBuilder` 建立 `Scheduler`，如下列範例所示：

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
                   
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
                   
   DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient);
   pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis);
   
   // Use ConfigsBuilder to configure settings
   RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
   retrievalConfig.retrievalSpecificConfig(pollingConfig);
   
   CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig();
   coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);
                   
   Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                   configsBuilder.checkpointConfig(),
                   coordinatorConfig,
                   configsBuilder.leaseManagementConfig(),
                   configsBuilder.lifecycleConfig(),
                   configsBuilder.metricsConfig(),
                   configsBuilder.processorConfig(),
                   retrievalConfig,
                   adapterClient
           );
   ```

**重要**  
`CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` 設定可維持 KCL v3 與 KCL v1 之間的 DynamoDB Streams Kinesis Adapter 相容性，而非 KCL v2 與 v3 之間的相容性。

### 步驟 4：KCL 3.x 組態概觀和建議
<a name="step4-configuration-migration"></a>

如需詳細了解 KCL 3.x 中，在 KCL 1.x 之後推出的相關組態，請參閱 [KCL 組態](https://docs.aws.amazon.com//streams/latest/dev/kcl-configuration.html)和 [KCL 移轉用戶端組態](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration.html#client-configuration)。

**重要**  
我們建議使用 `ConfigsBuilder` 在 KCL 3.x 及更新版本中設定組態，而非直接建立 `checkpointConfig`、`coordinatorConfig`、`leaseManagementConfig`、`metricsConfig`、`processorConfig` 及 `retrievalConfig` 的物件，以避免排程器初始化問題。`ConfigsBuilder` 則提供更靈活且跟更容易維護的 KCL 應用程式設定方式。

#### KCL 3.x 的更新預設值組態
<a name="kcl3-configuration-overview"></a>

`billingMode`  
KCL 1.x 版中，`billingMode` 的預設值設定為 `PROVISIONED`。不過，KCL 3.x 版 `billingMode` 的預設值為 `PAY_PER_REQUEST` (隨需模式)。我們建議租用資料表使用隨需容量模式，根據用量自動調整容量。如需租用資料表使用佈建容量的指南，請參閱[具有佈建容量模式的租用資料表最佳實務](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-lease-table.html)。

`idleTimeBetweenReadsInMillis`  
在 KCL 1.x 版中，`idleTimeBetweenReadsInMillis` 的預設值設定為 1,000 (或 1 秒)。KCL 3.x 版會將 i`dleTimeBetweenReadsInMillis` 的預設值設定為 1,500 (或 1.5 秒)，但 Amazon DynamoDB Streams Kinesis 轉接器會將預設值覆寫為 1,000 (或 1 秒)。

#### KCL 3.x 中的新組態
<a name="kcl3-new-configs"></a>

`leaseAssignmentIntervalMillis`  
此組態會定義新發現碎片開始處理之前的時間間隔，計算方式為 1.5 x `leaseAssignmentIntervalMillis`。如果未明確配置此設定，則時間間隔預設為 1.5 x `failoverTimeMillis`。處理新碎片包含掃描租用資料表，並在租用資料表上查詢全域次要索引 (GSI)。降低 `leaseAssignmentIntervalMillis` 會增加掃描和查詢操作的頻率，進而產生更高的 DynamoDB 成本。我們建議將此值設定為 2000 (或 2 秒)，將處理新碎片的延遲降至最低。

`shardConsumerDispatchPollIntervalMillis`  
此組態定義了碎片取用者進行連續輪詢以觸發狀態轉換的間隔時間。在 KCL 1.x 版中，此行為由 `idleTimeInMillis` 參數控制，該參數並未公開為可配置設定。使用 KCL 3.x 版時，我們建議設定此組態，符合您在 KCL 1.x 版設定用於 ` idleTimeInMillis` 的值。

### 步驟 5：從 KCL 2.x 移轉至 KCL 3.x
<a name="step5-kcl2-to-kcl3"></a>

為了確保順利轉換至最新的 Kinesis Client Library (KCL) 版本及其相容性，請遵循移轉指南[從 KCL 2.x 升級到 KCL 3.x](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-worker-metrics) 中，步驟 5-8 的指示。

如需常見的 KCL 3.x 疑難排解問題，請參閱 [KCL 取用者應用程式疑難排解](https://docs.aws.amazon.com//streams/latest/dev/troubleshooting-consumers.html)。