

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Migrazione di KLC da 1.x a 3.x
<a name="streams-migrating-kcl"></a>

## Panoramica di
<a name="migrating-kcl-overview"></a>

Questa guida fornisce istruzioni per la migrazione dell’applicazione consumer da KCL 1.x a KCL 3.x. A causa delle differenze di architettura tra KCL 1.x e KCL 3.x, la migrazione richiede l’aggiornamento di diversi componenti per garantire la compatibilità.

KCL 1.x utilizza classi e interfacce diverse rispetto a KCL 3.x. È necessario prima migrare l’elaboratore di record, il generatore dell’elaboratore di record e le classi di lavoratori al formato compatibile con KCL 3.x e seguire la procedura di migrazione per la migrazione da KCL 1.x a KCL 3.x.

## Fasi della migrazione
<a name="migration-steps"></a>

**Topics**
+ [Fase 1: migrare l’elaboratore di record](#step1-record-processor)
+ [Fase 2: migrare il generatore dell’elaboratore di record](#step2-record-processor-factory)
+ [Fase 3: eseguire la migrazione del lavoratore](#step3-worker-migration)
+ [Fase 4: panoramica e consigli sulla configurazione di KCL 3.x](#step4-configuration-migration)
+ [Fase 5: migrare da KCL 2.x a KCL 3.x](#step5-kcl2-to-kcl3)

### Fase 1: migrare l’elaboratore di record
<a name="step1-record-processor"></a>

L’esempio seguente mostra un elaboratore di record implementato per l’Adattatore Kinesis per i flussi DynamoDB con KCL 1.x:

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        for (Record record : processRecordsInput.getRecords()) {
            String data = new String(record.getData().array(), Charset.forName("UTF-8"));
            System.out.println(data);
            if (record instanceof RecordAdapter) {
                // record processing and checkpointing logic
            }
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**Per migrare la classe RecordProcessor**

1. Modifica le interfacce da `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` e `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` verso `com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor` come segue:

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   
   import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
   ```

1. Aggiorna le istruzioni di importazione per i metodi `initialize` e `processRecords`:

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
   ```

1. Sostituisci il metodo `shutdownRequested` con i seguenti nuovi metodi: `leaseLost`, `shardEnded` e `shutdownRequested`.

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

Segue la versione aggiornata della classe dell’elaboratore di record:

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.dynamodb.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import software.amazon.awssdk.services.dynamodb.model.Record;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) {
        for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records())
            Record ddbRecord = record.getRecord();
            // processing and checkpointing logic for the ddbRecord
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

**Nota**  
L'adattatore Kinesis di DynamoDB Streams ora utilizza il modello Record. SDKv2 In SDKv2, `AttributeValue` gli oggetti complessi (,`BS`,, `NS` `M``L`,`SS`) non restituiscono mai null. Utilizza i metodi `hasBs()`, `hasNs()`, `hasM()`, `hasL()`, `hasSs()` per verificare se questi valori esistono.

### Fase 2: migrare il generatore dell’elaboratore di record
<a name="step2-record-processor-factory"></a>

La fabbrica dell'elaboratore di record è responsabile per la creazione di elaboratori di record quando un lease è acquisito. Di seguito è illustrato un esempio di un generatore di KCL 1.x:

```
package com.amazonaws.codesamples;

import software.amazon.dynamodb.AmazonDynamoDB;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {
    
    @Override
    public IRecordProcessor createProcessor() {
        return new StreamsRecordProcessor(dynamoDBClient, tableName);
    }
}
```

**Per migrare `RecordProcessorFactory`**
+ Modifica l’interfaccia implementata da `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` a `software.amazon.kinesis.processor.ShardRecordProcessorFactory`, come segue:

  ```
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
  import software.amazon.kinesis.processor.ShardRecordProcessor;
  
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
  import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
  
  // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
  public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {
  
  Change the return signature for createProcessor.
  
  // public IRecordProcessor createProcessor() {
  public ShardRecordProcessor shardRecordProcessor() {
  ```

Di seguito è riportato un esempio di generatore di elaboratore di record in 3.0:

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor();
    }
}
```

### Fase 3: eseguire la migrazione del lavoratore
<a name="step3-worker-migration"></a>

Nella versione 3.0 della KCL, una nuova classe, denominata **Pianificatore**, sostituisce la classe **Lavoratore**. Di seguito è illustrato un esempio di un lavoratore di KCL 1.x:

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(
        recordProcessorFactory,
        workerConfig,
        adapterClient,
        amazonDynamoDB,
        amazonCloudWatchClient);
```

**Per migrare il lavoratore**

1. Modifica la dichiarazione `import` per la classe `Worker` nelle dichiarazioni di importazione delle classi `Scheduler` e `ConfigsBuilder`.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. Importa `StreamTracker` e modifica l’importazione di `StreamsWorkerFactory` in `StreamsSchedulerFactory`.

   ```
   import software.amazon.kinesis.processor.StreamTracker;
   // import software.amazon.dynamodb.streamsadapter.StreamsWorkerFactory;
   import software.amazon.dynamodb.streamsadapter.StreamsSchedulerFactory;
   ```

1. Seleziona la posizione da cui avviare l’applicazione. Può essere `TRIM_HORIZON` o `LATEST`.

   ```
   import software.amazon.kinesis.common.InitialPositionInStream;
   import software.amazon.kinesis.common.InitialPositionInStreamExtended;
   ```

1. Creazione di un’istanza `StreamTracker`.

   ```
   StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
           streamArn,
           InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
   );
   ```

1. Crea l’oggetto `AmazonDynamoDBStreamsAdapterClient`.

   ```
   import software.amazon.dynamodb.streamsadapter.AmazonDynamoDBStreamsAdapterClient; 
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
   import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
   
   ...
   
   AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();
   
   AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(
           credentialsProvider, awsRegion);
   ```

1. Crea l’oggetto `ConfigsBuilder`.

   ```
   import software.amazon.kinesis.common.ConfigsBuilder;
   
   ...
   ConfigsBuilder configsBuilder = new ConfigsBuilder(
                   streamTracker,
                   applicationName,
                   adapterClient,
                   dynamoDbAsyncClient,
                   cloudWatchAsyncClient,
                   UUID.randomUUID().toString(),
                   new StreamsRecordProcessorFactory());
   ```

1. Crea `Scheduler` con `ConfigsBuilder` come mostrato nell’esempio seguente:

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
                   
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
                   
   DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient);
   pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis);
   
   // Use ConfigsBuilder to configure settings
   RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
   retrievalConfig.retrievalSpecificConfig(pollingConfig);
   
   CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig();
   coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);
                   
   Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                   configsBuilder.checkpointConfig(),
                   coordinatorConfig,
                   configsBuilder.leaseManagementConfig(),
                   configsBuilder.lifecycleConfig(),
                   configsBuilder.metricsConfig(),
                   configsBuilder.processorConfig(),
                   retrievalConfig,
                   adapterClient
           );
   ```

**Importante**  
L’impostazione `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` mantiene la compatibilità tra l’Adattatore Kinesis per i flussi DynamoDB per KCL v3 e KCL v1, non tra KCL v2 e v3.

### Fase 4: panoramica e consigli sulla configurazione di KCL 3.x
<a name="step4-configuration-migration"></a>

Per una descrizione dettagliata delle configurazioni introdotte dopo KCL 1.x e rilevanti in KCL 3.x, consulta [KCL configurations](https://docs.aws.amazon.com//streams/latest/dev/kcl-configuration.html) and [KCL migration client configuration](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration.html#client-configuration).

**Importante**  
Invece di creare direttamente oggetti di `checkpointConfig`, `coordinatorConfig`, `leaseManagementConfig`, `metricsConfig`, `processorConfig` e `retrievalConfig`, si consiglia di utilizzare `ConfigsBuilder` per impostare le configurazioni in KCL 3.x e versioni successive per evitare problemi di inizializzazione del Pianificatore. `ConfigsBuilder` offre un modo più flessibile e gestibile per configurare l’applicazione KCL.

#### Configurazioni con aggiornamento del valore predefinito in KCL 3.x
<a name="kcl3-configuration-overview"></a>

`billingMode`  
Nella versione 1.x di KCL, il valore predefinito per `billingMode` è impostato su `PROVISIONED`. Tuttavia, con la versione 3.x di KCL, l’impostazione predefinita `billingMode` è `PAY_PER_REQUEST` (modalità on demand). Si consiglia di utilizzare la modalità con capacità on demand per la tabella di lease per regolare automaticamente la capacità in base all’utilizzo. Per indicazioni sull’utilizzo della capacità allocata per le tabelle di lease, consulta [Best practices for the lease table with provisioned capacity mode](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-lease-table.html).

`idleTimeBetweenReadsInMillis`  
Nella versione 1.x di KCL, il valore predefinito per `idleTimeBetweenReadsInMillis` è impostato su 1.000 (o 1 secondo). La versione 3.x di KCL imposta il valore predefinito per i`dleTimeBetweenReadsInMillis` su 1.500 (o 1,5 secondi), ma l’Adattatore Kinesis per i flussi Amazon DynamoDB sostituisce il valore predefinito su 1.000 (o 1 secondo).

#### Nuove configurazioni in KCL 3.x
<a name="kcl3-new-configs"></a>

`leaseAssignmentIntervalMillis`  
Questa configurazione definisce l’intervallo di tempo prima che gli shard di nuovo riscontro inizino l’elaborazione e viene calcolata come 1,5 × `leaseAssignmentIntervalMillis`. Se questa impostazione non è configurata in modo esplicito, l’intervallo di tempo predefinito è 1,5 × `failoverTimeMillis`. L’elaborazione di nuovi shard prevede la scansione della tabella di lease e l’interrogazione di un indice secondario globale (GSI) sulla tabella di lease. La riduzione di `leaseAssignmentIntervalMillis` aumenta la frequenza delle operazioni Scan e Query, con conseguente aumento dei costi di DynamoDB. Si consiglia di impostare questo valore su 2000 (o 2 secondi) per ridurre al minimo il ritardo nell’elaborazione di nuovi shard.

`shardConsumerDispatchPollIntervalMillis`  
Questa configurazione definisce l’intervallo tra polling successivi da parte del consumer dello shard per attivare le transizioni di stato. Nella versione 1.x di KCL, questo comportamento era controllato dal parametro `idleTimeInMillis`, che non era esposto come impostazione configurabile. Con la versione 3.x di KCL, si consiglia di impostare questa configurazione in modo che corrisponda al valore utilizzato per ` idleTimeInMillis` nella configurazione della versione 1.x di KCL.

### Fase 5: migrare da KCL 2.x a KCL 3.x
<a name="step5-kcl2-to-kcl3"></a>

Per garantire una transizione e una compatibilità fluide con l’ultima versione di Kinesis Client Library (KCL), segui le fasi 5 – 8 nelle istruzioni della guida alla migrazione per l’[aggiornamento da KCL 2.x a KCL 3.x](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-worker-metrics).

Per la risoluzione dei problemi più comuni di KCL 3.x, consulta [Troubleshooting KCL consumer applications](https://docs.aws.amazon.com//streams/latest/dev/troubleshooting-consumers.html).