

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Migrasi dari KCL 1.x ke KCL 3.x
<a name="streams-migrating-kcl"></a>

## Ikhtisar
<a name="migrating-kcl-overview"></a>

Panduan ini memberikan petunjuk untuk memigrasikan aplikasi konsumen Anda dari KCL 1.x ke KCL 3.x. Karena perbedaan arsitektur antara KCL 1.x dan KCL 3.x, migrasi memerlukan pembaruan beberapa komponen untuk memastikan kompatibilitas.

KCL 1.x menggunakan kelas dan antarmuka yang berbeda dibandingkan dengan KCL 3.x. Anda harus memigrasikan prosesor rekaman, pabrik prosesor rekaman, dan kelas pekerja ke format yang kompatibel dengan KCL 3.x terlebih dahulu, dan ikuti langkah-langkah migrasi untuk migrasi KCL 1.x ke KCL 3.x.

## Langkah migrasi
<a name="migration-steps"></a>

**Topics**
+ [Langkah 1: Migrasikan prosesor rekaman](#step1-record-processor)
+ [Langkah 2: Migrasikan pabrik prosesor rekaman](#step2-record-processor-factory)
+ [Langkah 3: Migrasikan pekerja](#step3-worker-migration)
+ [Langkah 4: Ikhtisar dan rekomendasi konfigurasi KCL 3.x](#step4-configuration-migration)
+ [Langkah 5: Migrasi dari KCL 2.x ke KCL 3.x](#step5-kcl2-to-kcl3)

### Langkah 1: Migrasikan prosesor rekaman
<a name="step1-record-processor"></a>

Contoh berikut menunjukkan prosesor rekaman diimplementasikan untuk adaptor KCL 1.x DynamoDB Streams Kinesis:

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        for (Record record : processRecordsInput.getRecords()) {
            String data = new String(record.getData().array(), Charset.forName("UTF-8"));
            System.out.println(data);
            if (record instanceof RecordAdapter) {
                // record processing and checkpointing logic
            }
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**Untuk memigrasikan kelas RecordProcessor**

1. Ubah antarmuka dari `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` dan `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` menjadi `com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor` sebagai berikut:

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   
   import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
   ```

1. Perbarui pernyataan impor untuk `initialize` dan `processRecords` metode:

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
   ```

1. Ganti `shutdownRequested` metode dengan metode baru berikut:`leaseLost`,`shardEnded`, dan`shutdownRequested`.

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

Berikut ini adalah versi terbaru dari kelas prosesor rekaman:

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.dynamodb.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import software.amazon.awssdk.services.dynamodb.model.Record;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) {
        for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records())
            Record ddbRecord = record.getRecord();
            // processing and checkpointing logic for the ddbRecord
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

**catatan**  
DynamoDB Streams Kinesis Adapter sekarang menggunakan model Record. SDKv2 Dalam SDKv2, `AttributeValue` objek kompleks (`BS`,`NS`, `M``L`,`SS`) tidak pernah mengembalikan null. Gunakan`hasBs()`,`hasNs()`,`hasM()`,`hasL()`, `hasSs()` metode untuk memverifikasi apakah nilai-nilai ini ada.

### Langkah 2: Migrasikan pabrik prosesor rekaman
<a name="step2-record-processor-factory"></a>

Pabrik prosesor rekaman bertanggung jawab untuk membuat prosesor rekaman ketika sewa diperoleh. Berikut ini adalah contoh pabrik KCL 1.x:

```
package com.amazonaws.codesamples;

import software.amazon.dynamodb.AmazonDynamoDB;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {
    
    @Override
    public IRecordProcessor createProcessor() {
        return new StreamsRecordProcessor(dynamoDBClient, tableName);
    }
}
```

**Untuk memigrasikan `RecordProcessorFactory`**
+ Ubah antarmuka yang diimplementasikan dari `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` ke`software.amazon.kinesis.processor.ShardRecordProcessorFactory`, sebagai berikut:

  ```
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
  import software.amazon.kinesis.processor.ShardRecordProcessor;
  
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
  import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
  
  // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
  public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {
  
  Change the return signature for createProcessor.
  
  // public IRecordProcessor createProcessor() {
  public ShardRecordProcessor shardRecordProcessor() {
  ```

Berikut ini adalah contoh pabrik prosesor rekaman di 3.0:

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor();
    }
}
```

### Langkah 3: Migrasikan pekerja
<a name="step3-worker-migration"></a>

**Dalam versi 3.0 dari KCL, kelas baru, yang disebut **Scheduler**, menggantikan kelas Worker.** Berikut ini adalah contoh pekerja KCL 1.x:

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(
        recordProcessorFactory,
        workerConfig,
        adapterClient,
        amazonDynamoDB,
        amazonCloudWatchClient);
```

**Untuk memigrasikan pekerja**

1. Ubah `import` pernyataan untuk `Worker` kelas ke pernyataan impor untuk `Scheduler` dan `ConfigsBuilder` kelas.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. Impor `StreamTracker` dan ubah impor `StreamsWorkerFactory` ke`StreamsSchedulerFactory`.

   ```
   import software.amazon.kinesis.processor.StreamTracker;
   // import software.amazon.dynamodb.streamsadapter.StreamsWorkerFactory;
   import software.amazon.dynamodb.streamsadapter.StreamsSchedulerFactory;
   ```

1. Pilih posisi untuk memulai aplikasi. Bisa jadi `TRIM_HORIZON` atau`LATEST`.

   ```
   import software.amazon.kinesis.common.InitialPositionInStream;
   import software.amazon.kinesis.common.InitialPositionInStreamExtended;
   ```

1. Buat sebuah `StreamTracker` instance.

   ```
   StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
           streamArn,
           InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
   );
   ```

1. Buat `AmazonDynamoDBStreamsAdapterClient` objek.

   ```
   import software.amazon.dynamodb.streamsadapter.AmazonDynamoDBStreamsAdapterClient; 
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
   import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
   
   ...
   
   AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();
   
   AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(
           credentialsProvider, awsRegion);
   ```

1. Buat `ConfigsBuilder` objek.

   ```
   import software.amazon.kinesis.common.ConfigsBuilder;
   
   ...
   ConfigsBuilder configsBuilder = new ConfigsBuilder(
                   streamTracker,
                   applicationName,
                   adapterClient,
                   dynamoDbAsyncClient,
                   cloudWatchAsyncClient,
                   UUID.randomUUID().toString(),
                   new StreamsRecordProcessorFactory());
   ```

1. Buat `Scheduler` menggunakan `ConfigsBuilder` seperti yang ditunjukkan pada contoh berikut:

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
                   
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
                   
   DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient);
   pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis);
   
   // Use ConfigsBuilder to configure settings
   RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
   retrievalConfig.retrievalSpecificConfig(pollingConfig);
   
   CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig();
   coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);
                   
   Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                   configsBuilder.checkpointConfig(),
                   coordinatorConfig,
                   configsBuilder.leaseManagementConfig(),
                   configsBuilder.lifecycleConfig(),
                   configsBuilder.metricsConfig(),
                   configsBuilder.processorConfig(),
                   retrievalConfig,
                   adapterClient
           );
   ```

**penting**  
`CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X`Pengaturan mempertahankan kompatibilitas antara DynamoDB Streams Kinesis Adapter untuk KCL v3 dan KCL v1, bukan antara KCL v2 dan v3.

### Langkah 4: Ikhtisar dan rekomendasi konfigurasi KCL 3.x
<a name="step4-configuration-migration"></a>

[Untuk penjelasan rinci tentang konfigurasi yang diperkenalkan setelah KCL 1.x yang relevan di KCL 3.x lihat konfigurasi KCL dan [konfigurasi klien migrasi KCL](https://docs.aws.amazon.com//streams/latest/dev/kcl-configuration.html).](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration.html#client-configuration)

**penting**  
Alih-alih langsung membuat objek`checkpointConfig`,,,`coordinatorConfig`, `processorConfig` dan `leaseManagementConfig` `metricsConfig``retrievalConfig`, kami sarankan menggunakan `ConfigsBuilder` untuk mengatur konfigurasi di KCL 3.x dan versi yang lebih baru untuk menghindari masalah inisialisasi Scheduler. `ConfigsBuilder`menyediakan cara yang lebih fleksibel dan dapat dipelihara untuk mengkonfigurasi aplikasi KCL Anda.

#### Konfigurasi dengan nilai default pembaruan di KCL 3.x
<a name="kcl3-configuration-overview"></a>

`billingMode`  
Dalam KCL versi 1.x, nilai default untuk `billingMode` diatur ke. `PROVISIONED` Namun, dengan KCL versi 3.x, defaultnya `billingMode` adalah `PAY_PER_REQUEST` (mode on-demand). Kami menyarankan Anda menggunakan mode kapasitas sesuai permintaan untuk tabel sewa Anda untuk secara otomatis menyesuaikan kapasitas berdasarkan penggunaan Anda. Untuk panduan tentang penggunaan kapasitas yang disediakan untuk tabel sewa Anda, lihat [Praktik terbaik untuk tabel sewa dengan mode](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-lease-table.html) kapasitas yang disediakan.

`idleTimeBetweenReadsInMillis`  
Dalam KCL versi 1.x, nilai default untuk diatur ke `idleTimeBetweenReadsInMillis` adalah 1.000 (atau 1 detik). KCL versi 3.x menetapkan nilai default `dleTimeBetweenReadsInMillis` untuk i menjadi 1.500 (atau 1,5 detik), tetapi Amazon DynamoDB Streams Kinesis Adapter mengganti nilai default menjadi 1.000 (atau 1 detik).

#### Konfigurasi baru di KCL 3.x
<a name="kcl3-new-configs"></a>

`leaseAssignmentIntervalMillis`  
Konfigurasi ini mendefinisikan interval waktu sebelum pecahan yang baru ditemukan mulai diproses, dan dihitung sebagai 1,5 ×. `leaseAssignmentIntervalMillis` Jika pengaturan ini tidak dikonfigurasi secara eksplisit, interval waktu default menjadi 1,5 ×. `failoverTimeMillis` Memproses pecahan baru melibatkan pemindaian tabel sewa dan menanyakan indeks sekunder global (GSI) pada tabel sewa. Menurunkan `leaseAssignmentIntervalMillis` peningkatan frekuensi operasi pemindaian dan kueri ini, menghasilkan biaya DynamoDB yang lebih tinggi. Kami merekomendasikan pengaturan nilai ini ke 2000 (atau 2 detik) untuk meminimalkan keterlambatan dalam memproses pecahan baru.

`shardConsumerDispatchPollIntervalMillis`  
Konfigurasi ini mendefinisikan interval antara jajak pendapat berturut-turut oleh konsumen shard untuk memicu transisi status. Di KCL versi 1.x, perilaku ini dikendalikan oleh `idleTimeInMillis` parameter, yang tidak diekspos sebagai pengaturan yang dapat dikonfigurasi. Dengan KCL versi 3.x, kami sarankan untuk mengatur konfigurasi ini agar sesuai dengan nilai yang digunakan ` idleTimeInMillis` dalam pengaturan KCL versi 1.x Anda.

### Langkah 5: Migrasi dari KCL 2.x ke KCL 3.x
<a name="step5-kcl2-to-kcl3"></a>

Untuk memastikan kelancaran transisi dan kompatibilitas dengan versi Kinesis Client Library (KCL) terbaru, ikuti langkah 5-8 dalam petunjuk panduan migrasi untuk [meningkatkan dari KCL 2.x ke KCL 3.x](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-worker-metrics).

[Untuk masalah pemecahan masalah umum KCL 3.x, lihat Memecahkan masalah aplikasi konsumen KCL.](https://docs.aws.amazon.com//streams/latest/dev/troubleshooting-consumers.html)