

# KCL 1.x から KCL 3.x への移行
<a name="streams-migrating-kcl"></a>

## 概要
<a name="migrating-kcl-overview"></a>

このガイドでは、コンシューマーアプリケーションを KCL 1.x から KCL 3.x に移行する手順について説明します。KCL 1.x と KCL 3.x のアーキテクチャの違いにより、移行では互換性を確保するために複数のコンポーネントを更新する必要があります。

KCL 1.x は、KCL 3.x とは異なるクラスとインターフェイスを使用します。まずレコードプロセッサ、レコードプロセッサファクトリ、ワーカークラスを KCL 3.x 互換形式に移行し、KCL 1.x から KCL 3.x への移行手順に従う必要があります。

## 移行手順
<a name="migration-steps"></a>

**Topics**
+ [ステップ 1: レコードプロセッサを移行する](#step1-record-processor)
+ [ステップ 2: レコードプロセッサファクトリーを移行する](#step2-record-processor-factory)
+ [ステップ 3: ワーカーを移行する](#step3-worker-migration)
+ [ステップ 4: KCL 3.x 設定の概要と推奨事項](#step4-configuration-migration)
+ [ステップ 5: KCL 2.x から KCL 3.x に移行する](#step5-kcl2-to-kcl3)

### ステップ 1: レコードプロセッサを移行する
<a name="step1-record-processor"></a>

以下の例は、KCL 1.x DynamoDB Streams Kinesis Adapter に実装されたレコードプロセッサを示しています。

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        for (Record record : processRecordsInput.getRecords()) {
            String data = new String(record.getData().array(), Charset.forName("UTF-8"));
            System.out.println(data);
            if (record instanceof RecordAdapter) {
                // record processing and checkpointing logic
            }
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**RecordProcessor クラスを移行するには**

1. インターフェイスを `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` および `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` から `com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor` に変更します。以下に例を示します。

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   
   import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
   ```

1. `initialize` メソッド `processRecords` とメソッドの import ステートメントを更新します。

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
   ```

1. `shutdownRequested` メソッドを以下の新しいメソッドに置き換えます。`leaseLost`、`shardEnded`、および `shutdownRequested`。

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

以下に示しているのは、レコードプロセッサのクラスの更新されたバージョンです。

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.dynamodb.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import software.amazon.awssdk.services.dynamodb.model.Record;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) {
        for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records())
            Record ddbRecord = record.getRecord();
            // processing and checkpointing logic for the ddbRecord
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

**注記**  
DynamoDB Streams Kinesis Adapter は SDKv2 レコードモデルを使用するようになりました。SDKv2 では、複雑な `AttributeValue` オブジェクト (`BS`、`NS`、`M`、`L`、`SS`) が null を返すことはありません。`hasBs()`、`hasNs()`、`hasM()`、`hasL()`、`hasSs()` メソッドを使用して、これらの値が存在するかどうかを確認します。

### ステップ 2: レコードプロセッサファクトリーを移行する
<a name="step2-record-processor-factory"></a>

レコードプロセッサファクトリーは、リースが取得された際にレコードプロセッサの作成を担当します。以下に示しているのは、KCL 1.x ファクトリーの例です。

```
package com.amazonaws.codesamples;

import software.amazon.dynamodb.AmazonDynamoDB;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {
    
    @Override
    public IRecordProcessor createProcessor() {
        return new StreamsRecordProcessor(dynamoDBClient, tableName);
    }
}
```

**`RecordProcessorFactory` を移行するには**
+ 実装されているインターフェイスを `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` から `software.amazon.kinesis.processor.ShardRecordProcessorFactory` に変更します。以下に例を示します。

  ```
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
  import software.amazon.kinesis.processor.ShardRecordProcessor;
  
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
  import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
  
  // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
  public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {
  
  Change the return signature for createProcessor.
  
  // public IRecordProcessor createProcessor() {
  public ShardRecordProcessor shardRecordProcessor() {
  ```

以下は、3.0 のレコードプロセッサファクトリーの例です。

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor();
    }
}
```

### ステップ 3: ワーカーを移行する
<a name="step3-worker-migration"></a>

バージョン 3.0 の KCL では、新しいクラス **Scheduler** によって **Worker** クラスが置き換えられます。KCL 1.x のワーカーの例を次に示します。

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(
        recordProcessorFactory,
        workerConfig,
        adapterClient,
        amazonDynamoDB,
        amazonCloudWatchClient);
```

**ワーカーを移行するには**

1. `import` クラスの `Worker` ステートメントを `Scheduler` クラスと `ConfigsBuilder` クラスのインポートステートメントに変更します。

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. `StreamTracker` をインポートし、`StreamsWorkerFactory` のインポートを `StreamsSchedulerFactory` に変更します。

   ```
   import software.amazon.kinesis.processor.StreamTracker;
   // import software.amazon.dynamodb.streamsadapter.StreamsWorkerFactory;
   import software.amazon.dynamodb.streamsadapter.StreamsSchedulerFactory;
   ```

1. アプリケーションを起動するポジションを選択します。`TRIM_HORIZON`、`LATEST` のいずれかになります。

   ```
   import software.amazon.kinesis.common.InitialPositionInStream;
   import software.amazon.kinesis.common.InitialPositionInStreamExtended;
   ```

1. `StreamTracker` インスタンスを作成します。

   ```
   StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
           streamArn,
           InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
   );
   ```

1. `AmazonDynamoDBStreamsAdapterClient` オブジェクトを作成します。

   ```
   import software.amazon.dynamodb.streamsadapter.AmazonDynamoDBStreamsAdapterClient; 
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
   import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
   
   ...
   
   AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();
   
   AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(
           credentialsProvider, awsRegion);
   ```

1. `ConfigsBuilder` オブジェクトを作成します。

   ```
   import software.amazon.kinesis.common.ConfigsBuilder;
   
   ...
   ConfigsBuilder configsBuilder = new ConfigsBuilder(
                   streamTracker,
                   applicationName,
                   adapterClient,
                   dynamoDbAsyncClient,
                   cloudWatchAsyncClient,
                   UUID.randomUUID().toString(),
                   new StreamsRecordProcessorFactory());
   ```

1. 次の例に示すように、`ConfigsBuilder` を使用して `Scheduler` を作成します。

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
                   
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
                   
   DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient);
   pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis);
   
   // Use ConfigsBuilder to configure settings
   RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
   retrievalConfig.retrievalSpecificConfig(pollingConfig);
   
   CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig();
   coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);
                   
   Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                   configsBuilder.checkpointConfig(),
                   coordinatorConfig,
                   configsBuilder.leaseManagementConfig(),
                   configsBuilder.lifecycleConfig(),
                   configsBuilder.metricsConfig(),
                   configsBuilder.processorConfig(),
                   retrievalConfig,
                   adapterClient
           );
   ```

**重要**  
この `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` 設定は、KCL v2 と v3 ではなく、KCL v3 と KCL v1 の DynamoDB Streams Kinesis Adapter 間の互換性を維持します。

### ステップ 4: KCL 3.x 設定の概要と推奨事項
<a name="step4-configuration-migration"></a>

KCL 3.x に関連する KCL 1.x 後に導入された設定の詳細については、「[KCL 設定](https://docs.aws.amazon.com//streams/latest/dev/kcl-configuration.html)」と「[KCL 移行クライアント設定](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration.html#client-configuration)」を参照してください。

**重要**  
KCL 3.x 以降のバージョンでは、`checkpointConfig`、`coordinatorConfig`、`leaseManagementConfig`、`metricsConfig`、`processorConfig`、および `retrievalConfig` オブジェクトを直接作成する代わりに、スケジューラの初期化の問題を回避するために、`ConfigsBuilder` を使用して設定を行うことをお勧めします。`ConfigsBuilder` を使用すると、KCL アプリケーションをより柔軟で保守性の高い方法で構成できます。

#### KCL 3.x でデフォルト値を更新する設定
<a name="kcl3-configuration-overview"></a>

`billingMode`  
KCL バージョン 1.x では、`billingMode` のデフォルト値は `PROVISIONED` に設定されます。ただし、KCL バージョン 3.x では、デフォルトの `billingMode` は `PAY_PER_REQUEST` (オンデマンドモード) です。使用量に基づいて容量を自動的に調整するには、リーステーブルのオンデマンドキャパシティモードを使用することをお勧めします。リーステーブルにプロビジョンドキャパシティを使用するガイダンスについては、「[プロビジョンドキャパシティモードのリーステーブルのベストプラクティス](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-lease-table.html)」を参照してください。

`idleTimeBetweenReadsInMillis`  
KCL バージョン 1.x では、`idleTimeBetweenReadsInMillis` のデフォルト値は 1,000 (または 1 秒) に設定されています。KCL バージョン 3.x は i`dleTimeBetweenReadsInMillis` のデフォルト値を 1,500 (または 1.5 秒) に設定しますが、Amazon DynamoDB Streams Kinesis Adapter はデフォルト値を 1,000 (または 1 秒) に上書きします。

#### KCL 3.x の新しい設定
<a name="kcl3-new-configs"></a>

`leaseAssignmentIntervalMillis`  
この設定では、新しく検出されたシャードが処理を開始するまでの時間間隔を定義し、1.5 × `leaseAssignmentIntervalMillis` として計算されます。この設定が明示的に設定されていない場合、時間間隔はデフォルトで 1.5 × `failoverTimeMillis` に設定されます。新しいシャードの処理には、リーステーブルをスキャンし、リーステーブルのグローバルセカンダリインデックス (GSI) をクエリする必要があります。`leaseAssignmentIntervalMillis` を小さくすると、これらのスキャンおよびクエリオペレーションの頻度が増加し、DynamoDB のコストが高くなります。新しいシャードの処理の遅延を最小限に抑えるために、この値を 2,000 (または 2 秒) に設定することをお勧めします。

`shardConsumerDispatchPollIntervalMillis`  
この設定では、シャードコンシューマーによる連続するポーリング間の間隔を定義して、状態遷移をトリガーします。KCL バージョン 1.x では、この動作は `idleTimeInMillis` パラメータによって制御され、設定可能な設定として公開されませんでした。KCL バージョン 3.x では、KCL バージョン 1.x のセットアップで ` idleTimeInMillis` で使用される値と一致するようにこの設定を行うことをお勧めします。

### ステップ 5: KCL 2.x から KCL 3.x に移行する
<a name="step5-kcl2-to-kcl3"></a>

最新の Kinesis Client Library (KCL) バージョンとのスムーズな移行と互換性を確保するには、[KCL 2.x から KCL 3.x へのアップグレード](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-worker-metrics)に関する移行ガイドの手順のステップ 5～8 に従います。

一般的な KCL 3.x のトラブルシューティングの問題については、「[Troubleshooting KCL consumer applications](https://docs.aws.amazon.com//streams/latest/dev/troubleshooting-consumers.html)」を参照してください。