

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Sviluppa consumatori personalizzati con un throughput condiviso
<a name="shared-throughput-consumers"></a>

**Importante**  
Le versioni 1.x e 2.x di Amazon KCL sono obsolete. KCL 1.x arriverà il 30 end-of-support gennaio 2026. **Ti consigliamo vivamente di** migrare le tue applicazioni KCL utilizzando la versione 1.x all'ultima versione di KCL prima del 30 gennaio 2026. Per trovare la versione più recente di KCL, consulta la pagina [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) su. GitHub Per informazioni sulle ultime versioni di KCL, consulta. [Usa la libreria client Kinesis](kcl.md) Per informazioni sulla migrazione da KCL 1.x a KCL 3.x, consulta. [Migrazione di KLC da 1.x a 3.x](kcl-migration-1-3.md)

Se non è necessaria una velocità di trasmissione effettiva dedicata durante la ricezione dei dati dal flusso di dati Kinesis e se non sono necessari ritardi di propagazione della lettura inferiori a 200 ms, puoi creare applicazioni consumater come descritto nei seguenti argomenti. È possibile utilizzare la Kinesis Client Library (KCL) o il. AWS SDK per Java

**Topics**
+ [Sviluppa consumatori personalizzati con un throughput condiviso utilizzando KCL](custom-kcl-consumers.md)

Per ulteriori informazioni sulla creazione di consumer che possono ricevere record dal flusso di dati Kinesis con velocità di trasmissione effettiva dedicata, consulta [Sviluppa consumatori con fan-out migliorati con un throughput dedicato](enhanced-consumers.md).

# Sviluppa consumatori personalizzati con un throughput condiviso utilizzando KCL
<a name="custom-kcl-consumers"></a>

**Importante**  
Le versioni 1.x e 2.x di Amazon KCL sono obsolete. KCL 1.x arriverà il 30 end-of-support gennaio 2026. **Ti consigliamo vivamente di** migrare le tue applicazioni KCL utilizzando la versione 1.x all'ultima versione di KCL prima del 30 gennaio 2026. Per trovare la versione più recente di KCL, consulta la pagina [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) su. GitHub Per informazioni sulle ultime versioni di KCL, consulta. [Usa la libreria client Kinesis](kcl.md) Per informazioni sulla migrazione da KCL 1.x a KCL 3.x, consulta. [Migrazione di KLC da 1.x a 3.x](kcl-migration-1-3.md)

Uno dei metodi per sviluppare un'applicazione consumer personalizzata con velocità di trasmissione effettiva condivisa consiste nell'utilizzare la Kinesis Client Library (KCL). 

Scegli tra i seguenti argomenti per la versione di KCL che stai utilizzando.

**Topics**
+ [Sviluppa i consumatori di KCL 1.x](developing-consumers-with-kcl.md)
+ [Sviluppa KCL 2.x Consumers](developing-consumers-with-kcl-v2.md)

# Sviluppa i consumatori di KCL 1.x
<a name="developing-consumers-with-kcl"></a>

**Importante**  
Le versioni 1.x e 2.x di Amazon KCL sono obsolete. KCL 1.x arriverà il 30 end-of-support gennaio 2026. **Ti consigliamo vivamente di** migrare le tue applicazioni KCL utilizzando la versione 1.x all'ultima versione di KCL prima del 30 gennaio 2026. Per trovare la versione più recente di KCL, consulta la pagina [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) su. GitHub Per informazioni sulle ultime versioni di KCL, consulta. [Usa la libreria client Kinesis](kcl.md) Per informazioni sulla migrazione da KCL 1.x a KCL 3.x, consulta. [Migrazione di KLC da 1.x a 3.x](kcl-migration-1-3.md)

È possibile sviluppare un'applicazione consumer per flusso di dati Amazon Kinesis utilizzando la Kinesis Client Library (KCL). 

Per ulteriori informazioni su KCL, consulta [Informazioni su KCL (versioni precedenti)](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-overview).

Scegli tra i seguenti argomenti a seconda dell'opzione che desideri utilizzare.

**Topics**
+ [Sviluppa un utente di Kinesis Client Library in Java](kinesis-record-processor-implementation-app-java.md)
+ [Sviluppa un utente della Kinesis Client Library in Node.js](kinesis-record-processor-implementation-app-nodejs.md)
+ [Sviluppa un utente di Kinesis Client Library in .NET](kinesis-record-processor-implementation-app-dotnet.md)
+ [Sviluppa un utente della Kinesis Client Library in Python](kinesis-record-processor-implementation-app-py.md)
+ [Sviluppa un utente di Kinesis Client Library in Ruby](kinesis-record-processor-implementation-app-ruby.md)

# Sviluppa un utente di Kinesis Client Library in Java
<a name="kinesis-record-processor-implementation-app-java"></a>

**Importante**  
Le versioni 1.x e 2.x di Amazon KCL sono obsolete. KCL 1.x arriverà il 30 end-of-support gennaio 2026. **Ti consigliamo vivamente di** migrare le tue applicazioni KCL utilizzando la versione 1.x all'ultima versione di KCL prima del 30 gennaio 2026. Per trovare la versione più recente di KCL, consulta la pagina [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) su. GitHub Per informazioni sulle ultime versioni di KCL, consulta. [Usa la libreria client Kinesis](kcl.md) Per informazioni sulla migrazione da KCL 1.x a KCL 3.x, consulta. [Migrazione di KLC da 1.x a 3.x](kcl-migration-1-3.md)

È possibile utilizzare la Kinesis Client Library (KCL) per creare applicazioni che elaborano dati dai tuoi flussi di dati Kinesis. La Kinesis Client Library è disponibile in più linguaggi. In questo argomento viene discusso Java. [Per visualizzare il riferimento a Javadoc, consultate l'argomento Javadoc per Class.AWS AmazonKinesisClient](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html)

Per scaricare Java KCL da GitHub, vai a [Kinesis Client Library (](https://github.com/awslabs/amazon-kinesis-client)Java). Per individuare la KCL Java su Apache Maven, vai alla pagina [Risultati di ricerca di KCL](https://search.maven.org/#search|ga|1|amazon-kinesis-client). Per scaricare il codice di esempio per un'applicazione consumer Java KCL da GitHub, vai alla pagina del progetto di [esempio KCL for Java](https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis) su. GitHub 

L'applicazione di esempio utilizza [Apache Commons Logging](http://commons.apache.org/proper/commons-logging/guide.html). È possibile modificare la configurazione di registro nel metodo statico `configure` definito nel file `AmazonKinesisApplicationSample.java`. [https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/java-dg-logging.html](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/java-dg-logging.html)

È necessario completare le seguenti attività durante l'implementazione di un'applicazione consumer KCL in Java:

**Topics**
+ [Implementa i metodi del processore IRecord](#kinesis-record-processor-implementation-interface-java)
+ [Implementa una fabbrica di classi per l'interfaccia IRecord Processor](#kinesis-record-processor-implementation-factory-java)
+ [Crea un lavoratore](#kcl-java-worker)
+ [Modificare le proprietà di configurazione](#kinesis-record-processor-initialization-java)
+ [Esegui la migrazione alla versione 2 dell'interfaccia del processore di registrazione](#kcl-java-v2-migration)

## Implementa i metodi del processore IRecord
<a name="kinesis-record-processor-implementation-interface-java"></a>

La KCL supporta attualmente due versioni dell'interfaccia `IRecordProcessor`: l'interfaccia originale è disponibile con la prima versione della KCL e la versione 2 è disponibile a partire da KCL versione 1.5.0. Entrambe le interfacce sono completamente supportate. La scelta dipende dai tuoi requisiti specifici di scenario. Fai riferimento ai tuoi Javadocs locali o al codice sorgente per visualizzare tutte le differenze. Le seguenti sezioni delineano l'implementazione minima per iniziare.

**Topics**
+ [Interfaccia originale (Versione 1)](#kcl-java-interface-original)
+ [Interfaccia aggiornata (versione 2)](#kcl-java-interface-v2)

### Interfaccia originale (Versione 1)
<a name="kcl-java-interface-original"></a>

L'interfaccia `IRecordProcessor` originale (`package com.amazonaws.services.kinesis.clientlibrary.interfaces`) espone i seguenti metodi di processore del record che il tuo consumer deve implementare. L'esempio fornisce implementazioni che è possibile utilizzare come punto di partenza (consulta `AmazonKinesisApplicationSampleRecordProcessor.java`).

```
public void initialize(String shardId)
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

**initialize**  
La KCL chiama il metodo `initialize` quando viene creata un'istanza del processore di record, passando un ID della partizione specifico come parametro. Questo processore di record elabora esclusivamente questo shard e, in genere, è vero anche il contrario (questo shard è elaborato solo da questo processore di record). Tuttavia, il tuo consumer deve tenere conto della possibilità che un record di dati possa essere elaborato più di una volta. Il flusso di dati Kinesis ha una semantica *almeno una volta*, il che significa che ogni record di dati da una partizione viene elaborato almeno una volta da un worker nel tuo consumer. Per ulteriori informazioni sui casi in cui un determinato shard può essere elaborato da più di un lavoratore, consulta [Usa il resharding, lo scaling e l'elaborazione parallela per modificare il numero di shard](kinesis-record-processor-scaling.md).

```
public void initialize(String shardId)
```

**processRecords**  
La KCL chiama il metodo `processRecords` e passa un elenco di record di dati dalla partizione specificata dal metodo `initialize(shardId)`. Il processore di record elabora i dati in questi record in base alla semantica del consumer. Ad esempio, il worker potrebbe eseguire una trasformazione dei dati e, successivamente, archiviare il risultato in un bucket Amazon Simple Storage Service (Amazon S3).

```
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) 
```

Oltre ai dati stessi, il record contiene anche un numero di sequenza e una chiave di partizione. Il lavoratore può utilizzare questi valori quando elabora i dati. Ad esempio, il lavoratore può scegliere il bucket S3 in cui archiviare i dati in base al valore della chiave di partizione. La classe `Record` espone i seguenti metodi che forniscono l'accesso ai dati del record, al numero di sequenza e alla chiave di partizione. 

```
record.getData()  
record.getSequenceNumber() 
record.getPartitionKey()
```

Nell'esempio, il metodo privato `processRecordsWithRetries` ha un codice che mostra in che modo un lavoratore può accedere ai dati del record, al numero di sequenza e alla chiave di partizione.

Il flusso di dati Kinesis richiede che il processore di record tenga traccia dei record che sono già stati elaborati in una partizione. La KCL si occupa di questo monitoraggio per te, passando un checkpointer (`IRecordProcessorCheckpointer`) a `processRecords`. Il processore di record chiama il metodo `checkpoint` in questa interfaccia per comunicare alla KCL quanto si è progredito nell'elaborazione dei record nella partizione. In caso di errore del worker, la KCL utilizza queste informazioni per riavviare l'elaborazione della partizione nell'ultimo record elaborato conosciuto.

Per le operazioni di divisione o unione, la KCL non avvierà l'elaborazione delle nuove partizioni fino a quando i processori delle partizioni originali non avranno chiamato `checkpoint` per segnalare che l'intera elaborazione delle partizioni originali è completa.

Se non viene passato un parametro, la KCL suppone che la chiamata a `checkpoint` significa che tutti i record sono stati elaborati, fino all'ultimo record passato al processore di record. Pertanto, il processore di record deve chiamare `checkpoint` solo dopo aver elaborato tutti i record nell'elenco passato al processore. I processori di record non devono chiamare `checkpoint` in ciascuna chiamata a `processRecords`. Un processore potrebbe, per esempio, chiamare `checkpoint` in ogni terza chiamata a `processRecords`. Puoi specificare, in modo facoltativo, il numero di sequenza esatto di un record come parametro per `checkpoint`. In questo caso, la KCL presuppone che tutti i record siano stati elaborati esclusivamente fino a tale record.

Nell'esempio, il metodo privato `checkpoint` mostra come effettuare la chiamata a `IRecordProcessorCheckpointer.checkpoint` utilizzando la gestione delle eccezioni e la logica dei nuovi tentativi appropriate.

La KCL si basa su `processRecords` per gestire eventuali eccezioni generate dall'elaborazione dei record di dati. Se viene generata un'eccezione da `processRecords`, la KCL omette i record di dati passati prima dell'eccezione. Ciò significa che questi record non sono inviati nuovamente al processore di record che ha generato l'eccezione o a qualsiasi altro processore di record nel consumer.

**shutdown**  
La KCL chiama il metodo `shutdown` sia al termine dell'elaborazione (il motivo dell'arresto è `TERMINATE`) che quando il worker non risponde più (il motivo dell'arresto è `ZOMBIE`).

```
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

L'elaborazione termina quando il processore di record non riceve ulteriori record dallo shard, perché lo shard è stato frazionato o fuso o perché il flusso è stato eliminato.

La KCL trasferisce inoltre un'interfaccia `IRecordProcessorCheckpointer` a `shutdown`. Se il motivo dell'arresto è `TERMINATE`, il processore di record deve terminare l'elaborazione di qualsiasi record di dati e, di seguito, chiamare il metodo `checkpoint` in questa interfaccia.

### Interfaccia aggiornata (versione 2)
<a name="kcl-java-interface-v2"></a>

L'interfaccia `IRecordProcessor` aggiornata (`package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`) espone i seguenti metodi di processore del record che il tuo consumer deve implementare: 

```
void initialize(InitializationInput initializationInput)
void processRecords(ProcessRecordsInput processRecordsInput)
void shutdown(ShutdownInput shutdownInput)
```

Tutti gli argomenti dalla versione originale dell'interfaccia sono accessibili tramite metodi get negli oggetti del container. Ad esempio, per recuperare l'elenco dei record in `processRecords()`, è possibile utilizzare `processRecordsInput.getRecords()`.

A partire dalla versione 2 di questa interfaccia (KCL 1.5.0 e versioni successive), i seguenti nuovi input sono disponibili in aggiunta agli input forniti dall'interfaccia originale:

Numero di sequenza di partenza  
Nell'oggetto `InitializationInput` passato all'operazione `initialize()`, il numero di sequenza iniziale a partire da cui i record verrebbero forniti all'istanza del processore di record. Questo è l'ultimo numero di sequenza in cui è stato eseguito il checkpoint dall'istanza del processore di record che aveva precedentemente elaborato lo stesso shard. Questi dati sono forniti nel caso in cui la tua applicazione necessiti di queste informazioni. 

Numero di sequenza di checkpoint in sospeso  
Nell'oggetto `InitializationInput` passato all'operazione `initialize()`, il numero di sequenza di checkpoint in sospeso (se del caso) che non è stato possibile confermare prima dell'arresto dell'istanza precedente del processore di record.

## Implementa una fabbrica di classi per l'interfaccia IRecord Processor
<a name="kinesis-record-processor-implementation-factory-java"></a>

È inoltre necessario implementare un generatore per la classe che implementa i metodi del processore di record. Quando il tuo consumer avvia un'istanza del lavoratore, passa un riferimento a questo generatore.

Il campione implementa il generatore di classe nel file `AmazonKinesisApplicationSampleRecordProcessorFactory.java` utilizzando l'interfaccia del processore di record originale. Se si desidera che il generatore di classe crei processori di record della versione 2, utilizzi il nome del pacchetto `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`.

```
  public class SampleRecordProcessorFactory implements IRecordProcessorFactory { 
      /**
      * Constructor.
      */
      public SampleRecordProcessorFactory() {
          super();
      }
      /**
      * {@inheritDoc}
      */
      @Override
      public IRecordProcessor createProcessor() {
          return new SampleRecordProcessor();
      }
  }
```

## Crea un lavoratore
<a name="kcl-java-worker"></a>

Come discusso nella [Implementa i metodi del processore IRecord](#kinesis-record-processor-implementation-interface-java), ci sono due versioni dell'interfaccia del processore di record KCL da cui scegliere; ciò influenza il modo in cui è possibile creare un worker. L'interfaccia del processore di record originale utilizza la seguente struttura di codice per creare un lavoratore:

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker(recordProcessorFactory, config);
```

Con la versione 2 dell'interfaccia del processore di record, è possibile utilizzare `Worker.Builder` per creare un lavoratore senza dover preoccuparsi di quale costruttore utilizzare e dell'ordine degli argomenti. L'interfaccia del processore di record aggiornata utilizza la seguente struttura di codice per creare un lavoratore:

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

## Modificare le proprietà di configurazione
<a name="kinesis-record-processor-initialization-java"></a>

L'esempio fornisce valori di default per le proprietà di configurazione. Questi dati di configurazione per il lavoratore sono poi consolidati in un oggetto `KinesisClientLibConfiguration`. Questo oggetto e un riferimento al generatore di classe per `IRecordProcessor` sono passati nella chiamata che avvia un'istanza del lavoratore. È possibile sostituire una qualsiasi di queste proprietà con i tuoi valori utilizzando un file di proprietà Java (consulta `AmazonKinesisApplicationSample.java`).

### Application name (Nome applicazione)
<a name="configuration-property-application-name"></a>

La KCL richiede un nome dell'applicazione univoco per tutte le applicazioni e per tutte le tabelle Amazon DynamoDB nella stessa Regione. La biblioteca utilizza il valore di configurazione del nome dell'applicazione nei seguenti modi:
+ Si suppone che tutti i lavoratori associati con questo nome dell'applicazione stiano lavorando insieme nello stesso flusso. Questi lavoratori potrebbero essere distribuiti su più istanze. Se si esegue un'istanza aggiuntiva dello stesso codice dell'applicazione, ma con un nome dell'applicazione diverso, la KCL tratta la seconda istanza come un'applicazione completamente separata che opera anch'essa nello stesso flusso.
+ La KCL crea una tabella DynamoDB con il nome dell'applicazione e la utilizza per mantenere le informazioni sullo stato (ad esempio, checkpoint e mappatura worker-partizione) per l'applicazione. Ogni applicazione ha la propria tabella DynamoDB. Per ulteriori informazioni, consulta [Utilizza una tabella di leasing per tenere traccia degli shard elaborati dall'applicazione consumer KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Configurare le credenziali
<a name="kinesis-record-processor-cred-java"></a>

È necessario rendere disponibili le AWS credenziali a uno dei provider di credenziali della catena di provider di credenziali predefinita. Ad esempio, se l'applicazione consumer è in esecuzione su un'istanza Amazon EC2, consigliamo di avviare l'istanza con un ruolo IAM. Le credenziali AWS che riflettono le autorizzazioni associate a questo ruolo IAM vengono rese disponibili alle applicazioni sull'istanza tramite i relativi metadati dell'istanza. Questo è il modo più sicuro per gestire le credenziali per un consumer in esecuzione in un'istanza EC2.

L'applicazione di esempio prova prima a recuperare le credenziali IAM dai metadati dell'istanza: 

```
credentialsProvider = new InstanceProfileCredentialsProvider(); 
```

Se l'applicazione di esempio non è in grado di ottenere le credenziali dai metadati dell'istanza, tenta di recuperare le credenziali da un file proprietà:

```
credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();
```

Per ulteriori informazioni sui metadati delle istanze, consulta [Instance Metadata](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html) nella *Amazon EC2* User Guide.

### Usa l'ID del lavoratore per più istanze
<a name="kinesis-record-processor-workerid-java"></a>

L'esempio di codice di inizializzazione crea un ID per il lavoratore, `workerId`utilizzando il nome del computer locale e aggiungendo un identificatore univoco globale come illustrato nel seguente frammento di codice. Questo approccio supporta lo scenario di più istanze dell'applicazione di consumo in esecuzione in un singolo computer.

```
String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
```

## Esegui la migrazione alla versione 2 dell'interfaccia del processore di registrazione
<a name="kcl-java-v2-migration"></a>

Se si desidera migrare il codice che utilizza l'interfaccia originale, in aggiunta ai passaggi descritti in precedenza, sono necessari i seguenti passaggi:

1. Cambia la classe del tuo processore di record per importare la versione 2 dell'interfaccia del processore di record:

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   ```

1. Cambia i riferimenti per gli input per utilizzare i metodi `get` negli oggetti del container. Ad esempio, nell'operazione `shutdown()`, cambia "`checkpointer`" con "`shutdownInput.getCheckpointer()`".

1. Cambia la classe del generatore del processore di record per importare la versione 2 dell'interfaccia del generatore del processore di record:

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   ```

1. Cambia la costruzione del lavoratore per utilizzare `Worker.Builder`. Esempio:

   ```
   final Worker worker = new Worker.Builder()
       .recordProcessorFactory(recordProcessorFactory)
       .config(config)
       .build();
   ```

# Sviluppa un utente della Kinesis Client Library in Node.js
<a name="kinesis-record-processor-implementation-app-nodejs"></a>

**Importante**  
Le versioni 1.x e 2.x di Amazon KCL sono obsolete. KCL 1.x sarà disponibile il 30 end-of-support gennaio 2026. **Ti consigliamo vivamente di** migrare le tue applicazioni KCL utilizzando la versione 1.x all'ultima versione di KCL prima del 30 gennaio 2026. Per trovare la versione più recente di KCL, consulta la pagina [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) su. GitHub Per informazioni sulle ultime versioni di KCL, consulta. [Usa la libreria client Kinesis](kcl.md) Per informazioni sulla migrazione da KCL 1.x a KCL 3.x, consulta. [Migrazione di KLC da 1.x a 3.x](kcl-migration-1-3.md)

È possibile utilizzare la Kinesis Client Library (KCL) per creare applicazioni che elaborano dati dai tuoi flussi di dati Kinesis. La Kinesis Client Library è disponibile in più linguaggi. In questo argomento viene discusso Node.js.

KCL è una libreria Java; il supporto per linguaggi diversi da Java viene fornito utilizzando un'interfaccia multilingue chiamata. *MultiLangDaemon* Questo daemon è basato su Java e viene eseguito in background quando si utilizza un linguaggio KCL diverso da Java. Pertanto, se installi KCL per Node.js e scrivi la tua app consumer interamente in Node.js, avrai comunque bisogno che Java sia installato sul tuo sistema a causa di. MultiLangDaemon Inoltre, MultiLangDaemon presenta alcune impostazioni predefinite che potresti dover personalizzare in base al tuo caso d'uso, ad esempio la AWS regione a cui si connette. Per ulteriori informazioni su MultiLangDaemon on GitHub, vai alla pagina del [ MultiLangDaemon progetto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Per scaricare il file KCL Node.js da GitHub, vai alla [Kinesis Client Library (Node.js)](https://github.com/awslabs/amazon-kinesis-client-nodejs).

**Download di codice di esempio**

Ci sono due esempi di codice disponibili per KCL in Node.js:
+ [basic-sample](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/basic_sample)

  Viene utilizzato nelle seguenti sezioni per illustrare i concetti fondamentali della costruzione di un'applicazione consumer KCL in Node.js.
+ [click-stream-sample](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/click_stream_sample)

   Leggermente più avanzato e utilizza uno scenario reale. Da utilizzare dopo avere acquisito familiarità con il codice di esempio di base. Questo esempio non è discusso qui, ma dispone di un file README con ulteriori informazioni.

È necessario completare le seguenti attività durante l'implementazione di un'applicazione consumer KCL in Node.js:

**Topics**
+ [Implementa il processore di registrazione](#kinesis-record-processor-implementation-interface-nodejs)
+ [Modifica le proprietà di configurazione](#kinesis-record-processor-initialization-nodejs)

## Implementa il processore di registrazione
<a name="kinesis-record-processor-implementation-interface-nodejs"></a>

Il consumer più semplice possibile che utilizza la KCL per Node.js deve implementare una funzione `recordProcessor`, che a sua volta contiene le funzioni `initialize`, `processRecords` e `shutdown`. L'esempio fornisce un'implementazione che è possibile utilizzare come punto di partenza (consulta `sample_kcl_app.js`).

```
function recordProcessor() {
  // return an object that implements initialize, processRecords and shutdown functions.}
```

**initialize**  
La KCL chiama la funzione `initialize` quando il processore di record si avvia. Questo processore di record elabora esclusivamente l'ID dello shard passato come `initializeInput.shardId` e, in genere, è vero anche il contrario (questo shard è elaborato solo da questo processore di record). Tuttavia, il tuo consumer deve tenere conto della possibilità che un record di dati possa essere elaborato più di una volta. Ciò si verifica perché il flusso di dati Kinesis ha una semantica *almeno una volta*, il che significa che ogni record di dati da una partizione viene elaborato almeno una volta da un worker nel tuo consumer. Per ulteriori informazioni sui casi in cui un determinato shard può essere elaborato da più di un lavoratore, consulta [Usa il resharding, lo scaling e l'elaborazione parallela per modificare il numero di shard](kinesis-record-processor-scaling.md).

```
initialize: function(initializeInput, completeCallback)
```

**processRecords**  
 La KCL chiama questa funzione con input che contiene un elenco di record di dati dalla partizione specificata alla funzione `initialize`. Il processore di record che implementi elabora i dati in questi record in base alla semantica del tuo consumer. Ad esempio, il worker potrebbe eseguire una trasformazione dei dati e, successivamente, archiviare il risultato in un bucket Amazon Simple Storage Service (Amazon S3). 

```
processRecords: function(processRecordsInput, completeCallback)
```

Oltre ai dati stessi, il record contiene anche un numero di sequenza e una chiave di partizione, che il lavoratore può utilizzare durante l'elaborazione dei dati. Ad esempio, il lavoratore può scegliere il bucket S3 in cui archiviare i dati in base al valore della chiave di partizione. Il dizionario `record` espone le seguenti coppie chiave-valore per accedere ai dati del record, al numero di sequenza e alla chiave di partizione:

```
record.data
record.sequenceNumber
record.partitionKey
```

Tieni presente che i dati sono codificati in Base64.

Nell'esempio di base, la funzione `processRecords` ha un codice che mostra in che modo un lavoratore può accedere ai dati del record, al numero di sequenza e alla chiave di partizione.

Il flusso di dati Kinesis richiede che il processore di record tenga traccia dei record che sono già stati elaborati in una partizione. La KCL si occupa di questo monitoraggio per un oggetto `checkpointer` passato come `processRecordsInput.checkpointer`. Il tuo processore di record chiama la funzione `checkpointer.checkpoint` per comunicare alla KCL quanto si è progredito nell'elaborazione dei record nella partizione. In caso di errore del worker, la KCL utilizza queste informazioni quando si riavvia l'elaborazione della partizione in modo tale che l'elaborazione continua dall'ultimo record elaborato conosciuto.

Per le operazioni di divisione o unione, la KCL non avvia l'elaborazione delle nuove partizioni fino a quando i processori delle partizioni originali non avranno chiamato `checkpoint` per segnalare che l'intera elaborazione delle partizioni originali è completa.

Se non viene passato il numero di sequenza alla funzione `checkpoint`, la KCL suppone che la chiamata a `checkpoint` significa che tutti i record sono stati elaborati, fino all'ultimo record passato al processore di record. Pertanto, il processore di record deve chiamare `checkpoint` **solo** dopo aver elaborato tutti i record nell'elenco passato al processore. I processori di record non devono chiamare `checkpoint` in ciascuna chiamata a `processRecords`. Un processore potrebbe, ad esempio, richiamare `checkpoint` ogni tre chiamate o qualche evento esterno al processore di dischi, come un verification/validation servizio personalizzato che hai implementato. 

Puoi specificare, in modo facoltativo, il numero di sequenza esatto di un record come parametro per `checkpoint`. In questo caso, la KCL presuppone che tutti i record siano stati elaborati esclusivamente fino a tale record.

L'applicazione di esempio di base mostra la chiamata più semplice possibile alla funzione `checkpointer.checkpoint`. È possibile aggiungere le altre logiche di creazione di checkpoint di cui hai bisogno per il tuo consumer a questo punto della funzione.

**shutdown**  
La KCL chiama la funzione `shutdown` sia al termine dell'elaborazione (`shutdownInput.reason` è `TERMINATE`) che quando il worker non risponde più (`shutdownInput.reason` è `ZOMBIE`).

```
shutdown: function(shutdownInput, completeCallback)
```

L'elaborazione termina quando il processore di record non riceve ulteriori record dallo shard, perché lo shard è stato frazionato o fuso o perché il flusso è stato eliminato.

La KCL trasferisce inoltre un oggetto `shutdownInput.checkpointer` a `shutdown`. Se il motivo dell'arresto è `TERMINATE`, è necessario assicurarsi che il processore di record abbia terminato l'elaborazione di qualsiasi record di dati e, di seguito, chiamare la funzione `checkpoint` in questa interfaccia.

## Modifica le proprietà di configurazione
<a name="kinesis-record-processor-initialization-nodejs"></a>

L'esempio fornisce valori di default per le proprietà di configurazione. È possibile sostituire una qualsiasi di queste proprietà con i tuoi valori (consulta `sample.properties` nell'esempio di base).

### Application name (Nome applicazione)
<a name="kinesis-record-processor-application-name-nodejs"></a>

La KCL richiede un nome dell'applicazione univoco per tutte le applicazioni e per tutte le tabelle Amazon DynamoDB nella stessa Regione. La biblioteca utilizza il valore di configurazione del nome dell'applicazione nei seguenti modi:
+ Si suppone che tutti i lavoratori associati con questo nome dell'applicazione stiano lavorando insieme nello stesso flusso. Questi lavoratori potrebbero essere distribuiti su più istanze. Se si esegue un'istanza aggiuntiva dello stesso codice dell'applicazione, ma con un nome dell'applicazione diverso, la KCL tratta la seconda istanza come un'applicazione completamente separata che opera anch'essa nello stesso flusso.
+ La KCL crea una tabella DynamoDB con il nome dell'applicazione e la utilizza per mantenere le informazioni sullo stato (ad esempio, checkpoint e mappatura worker-partizione) per l'applicazione. Ogni applicazione ha la propria tabella DynamoDB. Per ulteriori informazioni, consulta [Utilizza una tabella di leasing per tenere traccia degli shard elaborati dall'applicazione consumer KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Configurare le credenziali
<a name="kinesis-record-processor-credentials-nodejs"></a>

È necessario rendere disponibili le AWS credenziali a uno dei provider di credenziali della catena di provider di credenziali predefinita. Puoi utilizzare la proprietà `AWSCredentialsProvider` per impostare un provider di credenziali. Il file `sample.properties` deve rendere le tue credenziali disponibili per uno dei provider di credenziali nella [catena di provider di credenziali di default](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). Se esegui il tuo consumer su un'istanza Amazon EC2, ti consigliamo di configurare l'istanza con un ruolo IAM. AWS le credenziali che riflettono le autorizzazioni associate a questo ruolo IAM vengono rese disponibili alle applicazioni sull'istanza tramite i relativi metadati dell'istanza. Questo è il modo più sicuro per gestire le credenziali per un'applicazione di consumo in esecuzione in un'istanza EC2.

Nell'esempio seguente si configura la KCL per elaborare un flusso di dati Kinesis denominato `kclnodejssample` utilizzando il processore di record fornito in `sample_kcl_app.js`:

```
# The Node.js executable script
executableName = node sample_kcl_app.js
# The name of an Amazon Kinesis stream to process
streamName = kclnodejssample
# Unique KCL application name
applicationName = kclnodejssample
# Use default AWS credentials provider chain
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
# Read from the beginning of the stream
initialPositionInStream = TRIM_HORIZON
```

# Sviluppa un utente di Kinesis Client Library in .NET
<a name="kinesis-record-processor-implementation-app-dotnet"></a>

**Importante**  
Le versioni 1.x e 2.x di Amazon KCL sono obsolete. KCL 1.x sarà disponibile il 30 end-of-support gennaio 2026. **Ti consigliamo vivamente di** migrare le tue applicazioni KCL utilizzando la versione 1.x all'ultima versione di KCL prima del 30 gennaio 2026. Per trovare la versione più recente di KCL, consulta la pagina [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) su. GitHub Per informazioni sulle ultime versioni di KCL, consulta. [Usa la libreria client Kinesis](kcl.md) Per informazioni sulla migrazione da KCL 1.x a KCL 3.x, consulta. [Migrazione di KLC da 1.x a 3.x](kcl-migration-1-3.md)

È possibile utilizzare la Kinesis Client Library (KCL) per creare applicazioni che elaborano dati dai tuoi flussi di dati Kinesis. La Kinesis Client Library è disponibile in più linguaggi. In questo argomento viene discusso .NET.

KCL è una libreria Java; il supporto per linguaggi diversi da Java viene fornito utilizzando un'interfaccia multilingue chiamata. *MultiLangDaemon* Questo daemon è basato su Java e viene eseguito in background quando si utilizza un linguaggio KCL diverso da Java. Pertanto, se installi KCL per .NET e scrivi la tua app consumer interamente in .NET, avrai comunque bisogno che Java sia installato sul tuo sistema a causa di. MultiLangDaemon Inoltre, MultiLangDaemon presenta alcune impostazioni predefinite che potresti dover personalizzare in base al tuo caso d'uso, ad esempio la AWS regione a cui si connette. Per ulteriori informazioni su MultiLangDaemon on GitHub, vai alla pagina del [ MultiLangDaemon progetto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Per scaricare il.NET KCL da GitHub, vai a [Kinesis Client Library (](https://github.com/awslabs/amazon-kinesis-client-net).NET). Per scaricare il codice di esempio per un'applicazione consumer di.NET KCL, vai alla pagina del progetto [KCL for .NET sample consumer](https://github.com/awslabs/amazon-kinesis-client-net/tree/master/SampleConsumer) su. GitHub

È necessario completare le seguenti attività durante l'implementazione di un'applicazione consumer KCL in .NET:

**Topics**
+ [Implementa i metodi della classe IRecord Processor](#kinesis-record-processor-implementation-interface-dotnet)
+ [Modificare le proprietà di configurazione](#kinesis-record-processor-initialization-dotnet)

## Implementa i metodi della classe IRecord Processor
<a name="kinesis-record-processor-implementation-interface-dotnet"></a>

Il consumer deve implementare i seguenti metodi per `IRecordProcessor`. Il consumer di esempio fornisce implementazioni che è possibile utilizzare come punto di partenza (consulta la classe `SampleRecordProcessor` in `SampleConsumer/AmazonKinesisSampleConsumer.cs`).

```
public void Initialize(InitializationInput input)
public void ProcessRecords(ProcessRecordsInput input)
public void Shutdown(ShutdownInput input)
```

**Inizializzazione**  
La KCL chiama questo metodo quando viene creata un'istanza del processore di record, passando un ID della partizione specifico nel parametro `input` (`input.ShardId`). Questo processore di record elabora esclusivamente questo shard e, in genere, è vero anche il contrario (questo shard è elaborato solo da questo processore di record). Tuttavia, il tuo consumer deve tenere conto della possibilità che un record di dati possa essere elaborato più di una volta. Ciò si verifica perché il flusso di dati Kinesis ha una semantica *almeno una volta*, il che significa che ogni record di dati da una partizione viene elaborato almeno una volta da un worker nel tuo consumer. Per ulteriori informazioni sui casi in cui un determinato shard può essere elaborato da più di un lavoratore, consulta [Usa il resharding, lo scaling e l'elaborazione parallela per modificare il numero di shard](kinesis-record-processor-scaling.md).

```
public void Initialize(InitializationInput input)
```

**ProcessRecords**  
La KCL chiama questo metodo e passa una lista di record di dati nel parametro `input` (`input.Records`) dalla partizione specificata dal metodo `Initialize`. Il processore di record che implementi elabora i dati in questi record in base alla semantica del tuo consumer. Ad esempio, il worker potrebbe eseguire una trasformazione dei dati e, successivamente, archiviare il risultato in un bucket Amazon Simple Storage Service (Amazon S3).

```
public void ProcessRecords(ProcessRecordsInput input)
```

Oltre ai dati stessi, il record contiene anche un numero di sequenza e una chiave di partizione. Il lavoratore può utilizzare questi valori quando elabora i dati. Ad esempio, il lavoratore può scegliere il bucket S3 in cui archiviare i dati in base al valore della chiave di partizione. La classe `Record` espone quanto segue per accedere ai dati del record, al numero di sequenza e alla chiave di partizione:

```
byte[] Record.Data 
string Record.SequenceNumber
string Record.PartitionKey
```

Nell'esempio, il metodo `ProcessRecordsWithRetries` ha un codice che mostra in che modo un lavoratore può accedere ai dati del record, al numero di sequenza e alla chiave di partizione.

Il flusso di dati Kinesis richiede che il processore di record tenga traccia dei record che sono già stati elaborati in una partizione. La KCL si occupa di questo monitoraggio per te, passando un oggetto `Checkpointer` a `ProcessRecords` (`input.Checkpointer`). Il processore di record chiama il metodo`Checkpointer.Checkpoint` per comunicare alla KCL quanto si è progredito nell'elaborazione dei record nella partizione. In caso di errore del worker, la KCL utilizza queste informazioni per riavviare l'elaborazione della partizione nell'ultimo record elaborato conosciuto.

Per le operazioni di divisione o unione, la KCL non avvia l'elaborazione delle nuove partizioni fino a quando i processori delle partizioni originali non avranno chiamato `Checkpointer.Checkpoint` per segnalare che l'intera elaborazione delle partizioni originali è completa.

Se non viene passato un parametro, la KCL suppone che la chiamata a `Checkpointer.Checkpoint` significa che tutti i record sono stati elaborati, fino all'ultimo record passato al processore di record. Pertanto, il processore di record deve chiamare `Checkpointer.Checkpoint` solo dopo aver elaborato tutti i record nell'elenco passato al processore. I processori di record non devono chiamare `Checkpointer.Checkpoint` in ciascuna chiamata a `ProcessRecords`. Un processore potrebbe, per esempio, chiamare `Checkpointer.Checkpoint` in ogni terza o quarta chiamata. Puoi specificare, in modo facoltativo, il numero di sequenza esatto di un record come parametro per `Checkpointer.Checkpoint`. In questo caso, la KCL presuppone che tutti i record siano stati elaborati esclusivamente fino a tale record.

Nell'esempio, il metodo privato `Checkpoint(Checkpointer checkpointer)` mostra come effettuare la chiamata al metodo `Checkpointer.Checkpoint` utilizzando la gestione delle eccezioni e la logica dei nuovi tentativi appropriate.

La KCL per .NET gestisce le eccezioni in modo diverso rispetto alle altre biblioteche di linguaggi KCL, in quanto non gestisce eventuali eccezioni generate dall'elaborazione dei record di dati. Le eccezioni non rilevate dal codice dell'utente determinano l'arresto del programma.

**Arresto**  
La KCL chiama il metodo `Shutdown` sia al termine dell'elaborazione (il motivo dell'arresto è `TERMINATE`) che quando il worker non risponde più (il valore di `input.Reason` dell'arresto è `ZOMBIE`).

```
public void Shutdown(ShutdownInput input)
```

L'elaborazione termina quando il processore di record non riceve ulteriori record dallo shard, perché lo shard è stato frazionato o fuso o perché il flusso è stato eliminato.

La KCL trasferisce inoltre un oggetto `Checkpointer` a `shutdown`. Se il motivo dell'arresto è `TERMINATE`, il processore di record deve terminare l'elaborazione di qualsiasi record di dati e, di seguito, chiamare il metodo `checkpoint` in questa interfaccia.

## Modificare le proprietà di configurazione
<a name="kinesis-record-processor-initialization-dotnet"></a>

Il consumer di esempio fornisce valori di default per le proprietà di configurazione. È possibile sostituire una qualsiasi di queste proprietà con i tuoi valori (consulta `SampleConsumer/kcl.properties`).

### Application name (Nome applicazione)
<a name="modify-kinesis-record-processor-application-name"></a>

La KCL richiede un nome dell'applicazione univoco per tutte le applicazioni e per tutte le tabelle Amazon DynamoDB nella stessa Regione. La biblioteca utilizza il valore di configurazione del nome dell'applicazione nei seguenti modi:
+ Si suppone che tutti i lavoratori associati con questo nome dell'applicazione stiano lavorando insieme nello stesso flusso. Questi lavoratori potrebbero essere distribuiti su più istanze. Se si esegue un'istanza aggiuntiva dello stesso codice dell'applicazione, ma con un nome dell'applicazione diverso, la KCL tratta la seconda istanza come un'applicazione completamente separata che opera anch'essa nello stesso flusso.
+ La KCL crea una tabella DynamoDB con il nome dell'applicazione e la utilizza per mantenere le informazioni sullo stato (ad esempio, checkpoint e mappatura worker-partizione) per l'applicazione. Ogni applicazione ha la propria tabella DynamoDB. Per ulteriori informazioni, consulta [Utilizza una tabella di leasing per tenere traccia degli shard elaborati dall'applicazione consumer KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Configurare le credenziali
<a name="kinesis-record-processor-creds-dotnet"></a>

È necessario rendere disponibili le AWS credenziali a uno dei provider di credenziali della catena di provider di credenziali predefinita. Puoi utilizzare la proprietà `AWSCredentialsProvider` per impostare un provider di credenziali. Le [proprietà di esempio](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties) devono rendere le tue credenziali disponibili per uno dei provider di credenziali nella [catena di provider di credenziali di default](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). Se l'applicazione consumer è in esecuzione su un'istanza EC2, consigliamo di configurare l'istanza con un ruolo IAM. Le credenziali AWS che riflettono le autorizzazioni associate a questo ruolo IAM vengono rese disponibili alle applicazioni sull'istanza tramite i relativi metadati dell'istanza. Questo è il modo più sicuro per gestire le credenziali per un consumer in esecuzione in un'istanza EC2.

Il file di proprietà di esempio configura la KCL per elaborare un flusso di dati Kinesis denominato "words" utilizzando il processore di record fornito in `AmazonKinesisSampleConsumer.cs`. 

# Sviluppa un utente della Kinesis Client Library in Python
<a name="kinesis-record-processor-implementation-app-py"></a>

**Importante**  
Le versioni 1.x e 2.x di Amazon KCL sono obsolete. KCL 1.x arriverà il 30 end-of-support gennaio 2026. **Ti consigliamo vivamente di** migrare le tue applicazioni KCL utilizzando la versione 1.x all'ultima versione di KCL prima del 30 gennaio 2026. Per trovare la versione più recente di KCL, consulta la pagina [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) su. GitHub Per informazioni sulle ultime versioni di KCL, consulta. [Usa la libreria client Kinesis](kcl.md) Per informazioni sulla migrazione da KCL 1.x a KCL 3.x, consulta. [Migrazione di KLC da 1.x a 3.x](kcl-migration-1-3.md)

È possibile utilizzare la Kinesis Client Library (KCL) per creare applicazioni che elaborano dati dai tuoi flussi di dati Kinesis. La Kinesis Client Library è disponibile in più linguaggi. In questo argomento viene discusso Python.

KCL è una libreria Java; il supporto per linguaggi diversi da Java viene fornito utilizzando un'interfaccia multilingue chiamata. *MultiLangDaemon* Questo daemon è basato su Java e viene eseguito in background quando si utilizza un linguaggio KCL diverso da Java. Pertanto, se installi KCL per Python e scrivi la tua app consumer interamente in Python, avrai comunque bisogno che Java sia installato sul tuo sistema a causa di. MultiLangDaemon Inoltre, MultiLangDaemon ha alcune impostazioni predefinite che potresti dover personalizzare in base al tuo caso d'uso, ad esempio la AWS regione a cui si connette. Per ulteriori informazioni su MultiLangDaemon on GitHub, vai alla pagina del [ MultiLangDaemon progetto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Per scaricare Python KCL da GitHub, vai alla [Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client-python) (Python). Per scaricare il codice di esempio per un'applicazione consumer Python KCL, vai alla pagina del progetto di esempio [KCL for Python](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples) su. GitHub

È necessario completare le seguenti attività durante l'implementazione di un'applicazione consumer KCL in Python:

**Topics**
+ [Implementa i metodi della classe RecordProcessor](#kinesis-record-processor-implementation-interface-py)
+ [Modificare le proprietà di configurazione](#kinesis-record-processor-initialization-py)

## Implementa i metodi della classe RecordProcessor
<a name="kinesis-record-processor-implementation-interface-py"></a>

La classe `RecordProcess` deve estendere la `RecordProcessorBase` per implementare i seguenti metodi. L'esempio fornisce implementazioni che è possibile utilizzare come punto di partenza (consulta `sample_kclpy_app.py`).

```
def initialize(self, shard_id)
def process_records(self, records, checkpointer)
def shutdown(self, checkpointer, reason)
```

**initialize**  
La KCL chiama il metodo `initialize` quando viene creata un'istanza del processore di record, passando un ID della partizione specifico come parametro. Questo processore di record elabora esclusivamente questo shard e, in genere, è vero anche il contrario (questo shard è elaborato solo da questo processore di record). Tuttavia, il tuo consumer deve tenere conto della possibilità che un record di dati possa essere elaborato più di una volta. Ciò si verifica perché il flusso di dati Kinesis ha una semantica *almeno una volta*, il che significa che ogni record di dati da una partizione viene elaborato almeno una volta da un worker nel tuo consumer. Per ulteriori informazioni sui casi in cui un determinato shard può essere elaborato da più di un lavoratore, consulta [Usa il resharding, lo scaling e l'elaborazione parallela per modificare il numero di shard](kinesis-record-processor-scaling.md).

```
def initialize(self, shard_id)
```

**process\$1records**  
 La KCL chiama questo metodo e passa un elenco di record di dati dalla partizione specificata dal metodo `initialize`. Il processore di record che implementi elabora i dati in questi record in base alla semantica del tuo consumer. Ad esempio, il worker potrebbe eseguire una trasformazione dei dati e, successivamente, archiviare il risultato in un bucket Amazon Simple Storage Service (Amazon S3).

```
def process_records(self, records, checkpointer) 
```

Oltre ai dati stessi, il record contiene anche un numero di sequenza e una chiave di partizione. Il lavoratore può utilizzare questi valori quando elabora i dati. Ad esempio, il lavoratore può scegliere il bucket S3 in cui archiviare i dati in base al valore della chiave di partizione. Il dizionario `record` espone le seguenti coppie chiave-valore per accedere ai dati del record, al numero di sequenza e alla chiave di partizione:

```
record.get('data')
record.get('sequenceNumber')
record.get('partitionKey')
```

Tieni presente che i dati sono codificati in Base64.

Nell'esempio, il metodo `process_records` ha un codice che mostra in che modo un lavoratore può accedere ai dati del record, al numero di sequenza e alla chiave di partizione.

Il flusso di dati Kinesis richiede che il processore di record tenga traccia dei record che sono già stati elaborati in una partizione. La KCL si occupa di questo monitoraggio per te, passando un oggetto `Checkpointer` a `process_records`. Il processore di record chiama il metodo `checkpoint` in questo oggetto per comunicare alla KCL quanto si è progredito nell'elaborazione dei record nella partizione. In caso di errore del worker, la KCL utilizza queste informazioni per riavviare l'elaborazione della partizione nell'ultimo record elaborato conosciuto.

Per le operazioni di divisione o unione, la KCL non avvia l'elaborazione delle nuove partizioni fino a quando i processori delle partizioni originali non avranno chiamato `checkpoint` per segnalare che l'intera elaborazione delle partizioni originali è completa.

Se non viene passato un parametro, la KCL suppone che la chiamata a `checkpoint` significa che tutti i record sono stati elaborati, fino all'ultimo record passato al processore di record. Pertanto, il processore di record deve chiamare `checkpoint` solo dopo aver elaborato tutti i record nell'elenco passato al processore. I processori di record non devono chiamare `checkpoint` in ciascuna chiamata a `process_records`. Un processore potrebbe, per esempio, chiamare `checkpoint` in ogni terza chiamata. Puoi specificare, in modo facoltativo, il numero di sequenza esatto di un record come parametro per `checkpoint`. In questo caso, la KCL presuppone che tutti i record siano stati elaborati esclusivamente fino a tale record.

Nell'esempio, il metodo privato `checkpoint` mostra come effettuare la chiamata al metodo `Checkpointer.checkpoint` utilizzando la gestione delle eccezioni e la logica dei nuovi tentativi appropriate.

La KCL si basa su `process_records` per gestire eventuali eccezioni generate dall'elaborazione dei record di dati. Se viene generata un'eccezione da `process_records`, la KCL omette i record di dati passati a `process_records` prima dell'eccezione. Ciò significa che questi record non sono inviati nuovamente al processore di record che ha generato l'eccezione o a qualsiasi altro processore di record nel consumer.

**shutdown**  
 La KCL chiama il metodo `shutdown` sia al termine dell'elaborazione (il motivo dell'arresto è `TERMINATE`) che quando il worker non risponde più (il `reason` l'arresto è `ZOMBIE`).

```
def shutdown(self, checkpointer, reason)
```

L'elaborazione termina quando il processore di record non riceve ulteriori record dallo shard, perché lo shard è stato frazionato o fuso o perché il flusso è stato eliminato.

 La KCL trasferisce inoltre un oggetto `Checkpointer` a `shutdown`. Se il `reason` dell'arresto è `TERMINATE`, il processore di record deve terminare l'elaborazione di qualsiasi record di dati e, di seguito, chiamare il metodo `checkpoint` in questa interfaccia.

## Modificare le proprietà di configurazione
<a name="kinesis-record-processor-initialization-py"></a>

L'esempio fornisce valori di default per le proprietà di configurazione. È possibile sostituire una qualsiasi di queste proprietà con i tuoi valori (consulta `sample.properties`).

### Application name (Nome applicazione)
<a name="kinesis-record-processor-application-name-py"></a>

La KCL richiede un nome dell'applicazione univoco per tutte le tue applicazioni e per tutte le tabelle Amazon DynamoDB nella stessa Regione. La biblioteca utilizza il valore di configurazione del nome dell'applicazione nei seguenti modi:
+ Si suppone che tutti i lavoratori associati con questo nome dell'applicazione stiano lavorando insieme nello stesso flusso. Questi lavoratori possono essere distribuiti su più istanze. Se si esegue un'istanza aggiuntiva dello stesso codice dell'applicazione, ma con un nome dell'applicazione diverso, la KCL tratta la seconda istanza come un'applicazione completamente separata che opera anch'essa nello stesso flusso.
+ La KCL crea una tabella DynamoDB con il nome dell'applicazione e la utilizza per mantenere le informazioni sullo stato (ad esempio, checkpoint e mappatura worker-partizione) per l'applicazione. Ogni applicazione ha la propria tabella DynamoDB. Per ulteriori informazioni, consulta [Utilizza una tabella di leasing per tenere traccia degli shard elaborati dall'applicazione consumer KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Configurare le credenziali
<a name="kinesis-record-processor-creds-py"></a>

È necessario rendere disponibili le AWS credenziali a uno dei provider di credenziali della catena di provider di credenziali predefinita. Puoi utilizzare la proprietà `AWSCredentialsProvider` per impostare un provider di credenziali. Le [proprietà di esempio](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties) devono rendere le tue credenziali disponibili per uno dei provider di credenziali nella [catena di provider di credenziali di default](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). Se l'applicazione consumer è in esecuzione su un'istanza Amazon EC2, è consigliabile configurare l'istanza con un ruolo IAM. Le credenziali AWS che riflettono le autorizzazioni associate a questo ruolo IAM vengono rese disponibili alle applicazioni sull'istanza tramite i relativi metadati dell'istanza. Questo è il modo più sicuro per gestire le credenziali per un'applicazione di consumo in esecuzione in un'istanza EC2.

Il file di proprietà di esempio configura la KCL per elaborare un flusso di dati Kinesis denominato "words" utilizzando il processore di record fornito in `sample_kclpy_app.py`. 

# Sviluppa un utente di Kinesis Client Library in Ruby
<a name="kinesis-record-processor-implementation-app-ruby"></a>

**Importante**  
Le versioni 1.x e 2.x di Amazon KCL sono obsolete. KCL 1.x arriverà il 30 end-of-support gennaio 2026. **Ti consigliamo vivamente di** migrare le tue applicazioni KCL utilizzando la versione 1.x all'ultima versione di KCL prima del 30 gennaio 2026. Per trovare la versione più recente di KCL, consulta la pagina [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) su. GitHub Per informazioni sulle ultime versioni di KCL, consulta. [Usa la libreria client Kinesis](kcl.md) Per informazioni sulla migrazione da KCL 1.x a KCL 3.x, consulta. [Migrazione di KLC da 1.x a 3.x](kcl-migration-1-3.md)

È possibile utilizzare la Kinesis Client Library (KCL) per creare applicazioni che elaborano dati dai tuoi flussi di dati Kinesis. La Kinesis Client Library è disponibile in più linguaggi. In questo argomento viene discusso Ruby.

KCL è una libreria Java; il supporto per linguaggi diversi da Java viene fornito utilizzando un'interfaccia multilingue chiamata. *MultiLangDaemon* Questo daemon è basato su Java e viene eseguito in background quando si utilizza un linguaggio KCL diverso da Java. Perciò, se installi KCL per Ruby e scrivi la tua app consumer interamente in Ruby, avrai comunque bisogno che Java sia installato sul tuo sistema a causa di. MultiLangDaemon Inoltre, MultiLangDaemon ha alcune impostazioni predefinite che potresti dover personalizzare in base al tuo caso d'uso, ad esempio la AWS regione a cui si connette. Per ulteriori informazioni su MultiLangDaemon on GitHub, vai alla pagina del [ MultiLangDaemon progetto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Per scaricare Ruby KCL da GitHub, vai alla [Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client-ruby) (Ruby). Per scaricare il codice di esempio per un'applicazione consumer Ruby KCL, vai alla pagina del progetto di esempio [KCL](https://github.com/awslabs/amazon-kinesis-client-ruby/tree/master/samples) for Ruby su. GitHub

Per ulteriori informazioni sulla biblioteca di supporto Ruby di KCL, consulta [Documentazione di Ruby Gems KCL](http://www.rubydoc.info/gems/aws-kclrb).

# Sviluppa KCL 2.x Consumers
<a name="developing-consumers-with-kcl-v2"></a>

**Importante**  
Le versioni 1.x e 2.x di Amazon KCL sono obsolete. KCL 1.x arriverà il 30 end-of-support gennaio 2026. **Ti consigliamo vivamente di** migrare le tue applicazioni KCL utilizzando la versione 1.x all'ultima versione di KCL prima del 30 gennaio 2026. Per trovare la versione più recente di KCL, consulta la pagina [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) su. GitHub Per informazioni sulle ultime versioni di KCL, consulta. [Usa la libreria client Kinesis](kcl.md) Per informazioni sulla migrazione da KCL 1.x a KCL 3.x, consulta. [Migrazione di KLC da 1.x a 3.x](kcl-migration-1-3.md)

Questo argomento illustra come utilizzare la versione 2.0 di Kinesis Client Library (KCL). 

Per ulteriori informazioni sulla KCL, consulta la panoramica in [Sviluppo di app consumer utilizzando Kinesis Client Library 1.x](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html).

Scegli tra i seguenti argomenti a seconda dell'opzione che desideri utilizzare.

**Topics**
+ [Sviluppa un utente di Kinesis Client Library in Java](kcl2-standard-consumer-java-example.md)
+ [Sviluppa un utente della Kinesis Client Library in Python](kcl2-standard-consumer-python-example.md)
+ [Sviluppa utenti avanzati con fan-out con KCL 2.x](building-enhanced-consumers-kcl-retired.md)

# Sviluppa un utente di Kinesis Client Library in Java
<a name="kcl2-standard-consumer-java-example"></a>

**Importante**  
Le versioni 1.x e 2.x di Amazon KCL sono obsolete. KCL 1.x arriverà il 30 end-of-support gennaio 2026. **Ti consigliamo vivamente di** migrare le tue applicazioni KCL utilizzando la versione 1.x all'ultima versione di KCL prima del 30 gennaio 2026. Per trovare la versione più recente di KCL, consulta la pagina [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) su. GitHub Per informazioni sulle ultime versioni di KCL, consulta. [Usa la libreria client Kinesis](kcl.md) Per informazioni sulla migrazione da KCL 1.x a KCL 3.x, consulta. [Migrazione di KLC da 1.x a 3.x](kcl-migration-1-3.md)

Il seguente codice mostra un esempio di implementazione in Java di `ProcessorFactory` e `RecordProcessor`. Se vuoi sfruttare la funzionalità fan-out avanzato, consulta [Utilizzo di app Consumer con il fan-out avanzato](https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-kcl-java.html).

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License.
 */


/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

/**
 * This class will run a simple app that uses the KCL to read data and uses the AWS SDK to publish data.
 * Before running this program you must first create a Kinesis stream through the AWS console or AWS SDK.
 */
public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    /**
     * Invoke the main method with 2 args: the stream name and (optionally) the region.
     * Verifies valid inputs and then starts running the app.
     */
    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    /**
     * Constructor sets streamName and region. It also creates a KinesisClient object to send data to Kinesis.
     * This KinesisClient is used to send dummy data so that the consumer has something to read; it is also used
     * indirectly by the KCL to handle the consumption of the data.
     */
    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {

        /**
         * Sends dummy data to Kinesis. Not relevant to consuming the data with the KCL
         */
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        /**
         * Sets up configuration for the KCL, including DynamoDB and CloudWatch dependencies. The final argument, a
         * ShardRecordProcessorFactory, is where the logic for record processing lives, and is located in a private
         * class below.
         */
        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        /**
         * The Scheduler (also called Worker in earlier versions of the KCL) is the entry point to the KCL. This
         * instance is configured with defaults provided by the ConfigsBuilder.
         */
        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
        );

        /**
         * Kickoff the Scheduler. Record processing of the stream of dummy data will continue indefinitely
         * until an exit is triggered.
         */
        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        /**
         * Allows termination of app by pressing Enter.
         */
        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        /**
         * Stops sending dummy data.
         */
        log.info("Cancelling producer and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        /**
         * Stops consuming data. Finishes processing the current batch of data already received from Kinesis
         * before shutting down.
         */
        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown.  Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    /**
     * Sends a single record of dummy data to Kinesis.
     */
    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }

    /**
     * The implementation of the ShardRecordProcessor interface is where the heart of the record processing logic lives.
     * In this example all we do to 'process' is log info about the records.
     */
    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        /**
         * Invoked by the KCL before data records are delivered to the ShardRecordProcessor instance (via
         * processRecords). In this example we do nothing except some logging.
         *
         * @param initializationInput Provides information related to initialization.
         */
        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Handles record processing logic. The Amazon Kinesis Client Library will invoke this method to deliver
         * data records to the application. In this example we simply log our records.
         *
         * @param processRecordsInput Provides the records to be processed as well as information and capabilities
         *                            related to them (e.g. checkpointing).
         */
        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.");
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /** Called when the lease tied to this record processor has been lost. Once the lease has been lost,
         * the record processor can no longer checkpoint.
         *
         * @param leaseLostInput Provides access to functions and data related to the loss of the lease.
         */
        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Called when all data on this shard has been processed. Checkpointing must occur in the method for record
         * processing to be considered complete; an exception will be thrown otherwise.
         *
         * @param shardEndedInput Provides access to a checkpointer method for completing processing of the shard.
         */
        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Invoked when Scheduler has been requested to shut down (i.e. we decide to stop running the app by pressing
         * Enter). Checkpoints and logs the data a final time.
         *
         * @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint
         *                               before the shutdown is completed.
         */
        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```

# Sviluppa un utente della Kinesis Client Library in Python
<a name="kcl2-standard-consumer-python-example"></a>

**Importante**  
Le versioni 1.x e 2.x di Amazon KCL sono obsolete. KCL 1.x arriverà il 30 end-of-support gennaio 2026. **Ti consigliamo vivamente di** migrare le tue applicazioni KCL utilizzando la versione 1.x all'ultima versione di KCL prima del 30 gennaio 2026. Per trovare la versione più recente di KCL, consulta la pagina [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) su. GitHub Per informazioni sulle ultime versioni di KCL, consulta. [Usa la libreria client Kinesis](kcl.md) Per informazioni sulla migrazione da KCL 1.x a KCL 3.x, consulta. [Migrazione di KLC da 1.x a 3.x](kcl-migration-1-3.md)

È possibile utilizzare la Kinesis Client Library (KCL) per creare applicazioni che elaborano dati dai tuoi flussi di dati Kinesis. La Kinesis Client Library è disponibile in più linguaggi. In questo argomento viene discusso Python.

KCL è una libreria Java; il supporto per linguaggi diversi da Java viene fornito utilizzando un'interfaccia multilingue chiamata. *MultiLangDaemon* Questo daemon è basato su Java e viene eseguito in background quando si utilizza un linguaggio KCL diverso da Java. Pertanto, se installi KCL per Python e scrivi la tua app consumer interamente in Python, avrai comunque bisogno che Java sia installato sul tuo sistema a causa di. MultiLangDaemon Inoltre, MultiLangDaemon ha alcune impostazioni predefinite che potresti dover personalizzare in base al tuo caso d'uso, ad esempio la AWS regione a cui si connette. Per ulteriori informazioni su MultiLangDaemon on GitHub, vai alla pagina del [ MultiLangDaemon progetto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Per scaricare Python KCL da GitHub, vai alla [Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client-python) (Python). Per scaricare il codice di esempio per un'applicazione consumer Python KCL, vai alla pagina del progetto di esempio [KCL for Python](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples) su. GitHub

È necessario completare le seguenti attività durante l'implementazione di un'applicazione consumer KCL in Python:

**Topics**
+ [Implementa i metodi della classe RecordProcessor](#kinesis-record-processor-implementation-interface-py)
+ [Modificare le proprietà di configurazione](#kinesis-record-processor-initialization-py)

## Implementa i metodi della classe RecordProcessor
<a name="kinesis-record-processor-implementation-interface-py"></a>

La classe `RecordProcess` deve estendere la classe `RecordProcessorBase` per implementare i seguenti metodi:

```
initialize
process_records
shutdown_requested
```

L'esempio fornisce implementazioni che è possibile utilizzare come punto di partenza.

```
#!/usr/bin/env python

# Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Amazon Software License (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://aws.amazon.com/asl/
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

from __future__ import print_function

import sys
import time

from amazon_kclpy import kcl
from amazon_kclpy.v3 import processor


class RecordProcessor(processor.RecordProcessorBase):
    """
    A RecordProcessor processes data from a shard in a stream. Its methods will be called with this pattern:

    * initialize will be called once
    * process_records will be called zero or more times
    * shutdown will be called if this MultiLangDaemon instance loses the lease to this shard, or the shard ends due
        a scaling change.
    """
    def __init__(self):
        self._SLEEP_SECONDS = 5
        self._CHECKPOINT_RETRIES = 5
        self._CHECKPOINT_FREQ_SECONDS = 60
        self._largest_seq = (None, None)
        self._largest_sub_seq = None
        self._last_checkpoint_time = None

    def log(self, message):
        sys.stderr.write(message)

    def initialize(self, initialize_input):
        """
        Called once by a KCLProcess before any calls to process_records

        :param amazon_kclpy.messages.InitializeInput initialize_input: Information about the lease that this record
            processor has been assigned.
        """
        self._largest_seq = (None, None)
        self._last_checkpoint_time = time.time()

    def checkpoint(self, checkpointer, sequence_number=None, sub_sequence_number=None):
        """
        Checkpoints with retries on retryable exceptions.

        :param amazon_kclpy.kcl.Checkpointer checkpointer: the checkpointer provided to either process_records
            or shutdown
        :param str or None sequence_number: the sequence number to checkpoint at.
        :param int or None sub_sequence_number: the sub sequence number to checkpoint at.
        """
        for n in range(0, self._CHECKPOINT_RETRIES):
            try:
                checkpointer.checkpoint(sequence_number, sub_sequence_number)
                return
            except kcl.CheckpointError as e:
                if 'ShutdownException' == e.value:
                    #
                    # A ShutdownException indicates that this record processor should be shutdown. This is due to
                    # some failover event, e.g. another MultiLangDaemon has taken the lease for this shard.
                    #
                    print('Encountered shutdown exception, skipping checkpoint')
                    return
                elif 'ThrottlingException' == e.value:
                    #
                    # A ThrottlingException indicates that one of our dependencies is is over burdened, e.g. too many
                    # dynamo writes. We will sleep temporarily to let it recover.
                    #
                    if self._CHECKPOINT_RETRIES - 1 == n:
                        sys.stderr.write('Failed to checkpoint after {n} attempts, giving up.\n'.format(n=n))
                        return
                    else:
                        print('Was throttled while checkpointing, will attempt again in {s} seconds'
                              .format(s=self._SLEEP_SECONDS))
                elif 'InvalidStateException' == e.value:
                    sys.stderr.write('MultiLangDaemon reported an invalid state while checkpointing.\n')
                else:  # Some other error
                    sys.stderr.write('Encountered an error while checkpointing, error was {e}.\n'.format(e=e))
            time.sleep(self._SLEEP_SECONDS)

    def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
        """
        Called for each record that is passed to process_records.

        :param str data: The blob of data that was contained in the record.
        :param str partition_key: The key associated with this recod.
        :param int sequence_number: The sequence number associated with this record.
        :param int sub_sequence_number: the sub sequence number associated with this record.
        """
        ####################################
        # Insert your processing logic here
        ####################################
        self.log("Record (Partition Key: {pk}, Sequence Number: {seq}, Subsequence Number: {sseq}, Data Size: {ds}"
                 .format(pk=partition_key, seq=sequence_number, sseq=sub_sequence_number, ds=len(data)))

    def should_update_sequence(self, sequence_number, sub_sequence_number):
        """
        Determines whether a new larger sequence number is available

        :param int sequence_number: the sequence number from the current record
        :param int sub_sequence_number: the sub sequence number from the current record
        :return boolean: true if the largest sequence should be updated, false otherwise
        """
        return self._largest_seq == (None, None) or sequence_number > self._largest_seq[0] or \
            (sequence_number == self._largest_seq[0] and sub_sequence_number > self._largest_seq[1])

    def process_records(self, process_records_input):
        """
        Called by a KCLProcess with a list of records to be processed and a checkpointer which accepts sequence numbers
        from the records to indicate where in the stream to checkpoint.

        :param amazon_kclpy.messages.ProcessRecordsInput process_records_input: the records, and metadata about the
            records.
        """
        try:
            for record in process_records_input.records:
                data = record.binary_data
                seq = int(record.sequence_number)
                sub_seq = record.sub_sequence_number
                key = record.partition_key
                self.process_record(data, key, seq, sub_seq)
                if self.should_update_sequence(seq, sub_seq):
                    self._largest_seq = (seq, sub_seq)

            #
            # Checkpoints every self._CHECKPOINT_FREQ_SECONDS seconds
            #
            if time.time() - self._last_checkpoint_time > self._CHECKPOINT_FREQ_SECONDS:
                self.checkpoint(process_records_input.checkpointer, str(self._largest_seq[0]), self._largest_seq[1])
                self._last_checkpoint_time = time.time()

        except Exception as e:
            self.log("Encountered an exception while processing records. Exception was {e}\n".format(e=e))

    def lease_lost(self, lease_lost_input):
        self.log("Lease has been lost")

    def shard_ended(self, shard_ended_input):
        self.log("Shard has ended checkpointing")
        shard_ended_input.checkpointer.checkpoint()

    def shutdown_requested(self, shutdown_requested_input):
        self.log("Shutdown has been requested, checkpointing.")
        shutdown_requested_input.checkpointer.checkpoint()


if __name__ == "__main__":
    kcl_process = kcl.KCLProcess(RecordProcessor())
    kcl_process.run()
```

## Modificare le proprietà di configurazione
<a name="kinesis-record-processor-initialization-py"></a>

L'esempio fornisce valori di default per le proprietà di configurazione, come mostra lo script seguente. È possibile sostituire una qualsiasi di queste proprietà con i propri valori.

```
# The script that abides by the multi-language protocol. This script will
# be executed by the MultiLangDaemon, which will communicate with this script
# over STDIN and STDOUT according to the multi-language protocol.
executableName = sample_kclpy_app.py

# The name of an Amazon Kinesis stream to process.
streamName = words

# Used by the KCL as the name of this application. Will be used as the name
# of an Amazon DynamoDB table which will store the lease and checkpoint
# information for workers with this application name
applicationName = PythonKCLSample

# Users can change the credentials provider the KCL will use to retrieve credentials.
# The DefaultAWSCredentialsProviderChain checks several other providers, which is
# described here:
# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain

# Appended to the user agent of the KCL. Does not impact the functionality of the
# KCL in any other way.
processingLanguage = python/2.7

# Valid options at TRIM_HORIZON or LATEST.
# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
initialPositionInStream = TRIM_HORIZON

# The following properties are also available for configuring the KCL Worker that is created
# by the MultiLangDaemon.

# The KCL defaults to us-east-1
#regionName = us-east-1

# Fail over time in milliseconds. A worker which does not renew it's lease within this time interval
# will be regarded as having problems and it's shards will be assigned to other workers.
# For applications that have a large number of shards, this msy be set to a higher number to reduce
# the number of DynamoDB IOPS required for tracking leases
#failoverTimeMillis = 10000

# A worker id that uniquely identifies this worker among all workers using the same applicationName
# If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself.
#workerId = 

# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
#shardSyncIntervalMillis = 60000

# Max records to fetch from Kinesis in a single GetRecords call.
#maxRecords = 10000

# Idle time between record reads in milliseconds.
#idleTimeBetweenReadsInMillis = 1000

# Enables applications flush/checkpoint (if they have some data "in progress", but don't get new data for while)
#callProcessRecordsEvenForEmptyRecordList = false

# Interval in milliseconds between polling to check for parent shard completion.
# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
# completion of parent shards).
#parentShardPollIntervalMillis = 10000

# Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try
# to delete the ones we don't need any longer.
#cleanupLeasesUponShardCompletion = true

# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
#taskBackoffTimeMillis = 500

# Buffer metrics for at most this long before publishing to CloudWatch.
#metricsBufferTimeMillis = 10000

# Buffer at most this many metrics before publishing to CloudWatch.
#metricsMaxQueueSize = 10000

# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
# to RecordProcessorCheckpointer#checkpoint(String) by default.
#validateSequenceNumberBeforeCheckpointing = true

# The maximum number of active threads for the MultiLangDaemon to permit.
# If a value is provided then a FixedThreadPool is used with the maximum
# active threads set to the provided value. If a non-positive integer or no
# value is provided a CachedThreadPool is used.
#maxActiveThreads = 0
```

### Application name (Nome applicazione)
<a name="kinesis-record-processor-application-name-py"></a>

La KCL richiede un nome dell'applicazione univoco per tutte le applicazioni e per tutte le tabelle Amazon DynamoDB nella stessa Regione. La biblioteca utilizza il valore di configurazione del nome dell'applicazione nei seguenti modi:
+ Si suppone che tutti i lavoratori associati con questo nome dell'applicazione stiano lavorando insieme nello stesso flusso. Questi lavoratori possono essere distribuiti su più istanze. Se si esegue un'istanza aggiuntiva dello stesso codice dell'applicazione, ma con un nome dell'applicazione diverso, la KCL tratta la seconda istanza come un'applicazione completamente separata che opera anch'essa nello stesso flusso.
+ La KCL crea una tabella DynamoDB con il nome dell'applicazione e la utilizza per mantenere le informazioni sullo stato (ad esempio, checkpoint e mappatura worker-partizione) per l'applicazione. Ogni applicazione ha la propria tabella DynamoDB. Per ulteriori informazioni, consulta [Utilizza una tabella di leasing per tenere traccia degli shard elaborati dall'applicazione consumer KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Credenziali
<a name="kinesis-record-processor-creds-py"></a>

È necessario rendere disponibili le AWS credenziali a uno dei provider di credenziali della catena di provider di [credenziali predefinita](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). Puoi utilizzare la proprietà `AWSCredentialsProvider` per impostare un provider di credenziali. Se esegui la tua applicazione consumer su un'istanza Amazon EC2, ti consigliamo di configurare l'istanza con un ruolo IAM. AWS le credenziali che riflettono le autorizzazioni associate a questo ruolo IAM vengono rese disponibili alle applicazioni sull'istanza tramite i relativi metadati dell'istanza. Questo è il modo più sicuro per gestire le credenziali per un'applicazione di consumo in esecuzione in un'istanza EC2.

# Sviluppa utenti avanzati con fan-out con KCL 2.x
<a name="building-enhanced-consumers-kcl-retired"></a>

**Importante**  
Le versioni 1.x e 2.x di Amazon KCL sono obsolete. KCL 1.x end-of-support arriverà il 30 gennaio 2026. **Ti consigliamo vivamente di** migrare le tue applicazioni KCL utilizzando la versione 1.x all'ultima versione di KCL prima del 30 gennaio 2026. Per trovare la versione più recente di KCL, consulta la pagina [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) su. GitHub Per informazioni sulle ultime versioni di KCL, consulta. [Usa la libreria client Kinesis](kcl.md) Per informazioni sulla migrazione da KCL 1.x a KCL 3.x, consulta. [Migrazione di KLC da 1.x a 3.x](kcl-migration-1-3.md)

Le applicazioni consumer che utilizzano il *fan-out avanzato* in Flusso di dati Amazon Kinesis possono ricevere record da un flusso di dati con velocità di trasmissione effettiva dedicata fino a 2 MiB di dati al secondo per partizione. Questo tipo di applicazioni consumer non sono in competizione con altre applicazioni che ricevono dati dal flusso. Per ulteriori informazioni, consulta [Sviluppa consumatori con fan-out migliorati con un throughput dedicato](enhanced-consumers.md).

È possibile utilizzare la versione 2.0 o una successiva della Kinesis Client Library (KCL) per sviluppare applicazioni che usano il fan-out avanzato per ricevere dati dai flussi. KCL sottoscrive automaticamente l'applicazione a tutti gli shard di uno stream e garantisce che l'applicazione consumer sia in grado di leggere con un valore di throughput di 2 per shard. MB/sec Se desideri utilizzare la KCL senza attivare la funzionalità fan-out avanzato, consulta [Sviluppo di applicazioni Consumer tramite la Kinesis Client Library 2.0](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl-v2.html).

**Topics**
+ [Sviluppa utenti fan-out avanzati utilizzando KCL 2.x in Java](building-enhanced-consumers-kcl-java.md)

# Sviluppa utenti fan-out avanzati utilizzando KCL 2.x in Java
<a name="building-enhanced-consumers-kcl-java"></a>

**Importante**  
Le versioni 1.x e 2.x di Amazon KCL sono obsolete. KCL 1.x end-of-support arriverà il 30 gennaio 2026. **Ti consigliamo vivamente di** migrare le tue applicazioni KCL utilizzando la versione 1.x all'ultima versione di KCL prima del 30 gennaio 2026. Per trovare la versione più recente di KCL, consulta la pagina [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) su. GitHub Per informazioni sulle ultime versioni di KCL, consulta. [Usa la libreria client Kinesis](kcl.md) Per informazioni sulla migrazione da KCL 1.x a KCL 3.x, consulta. [Migrazione di KLC da 1.x a 3.x](kcl-migration-1-3.md)

È possibile utilizzare la versione 2.0 o una successiva della Kinesis Client Library (KCL) per sviluppare applicazioni in Flusso di dati Amazon Kinesis per ricevere dati dai flussi tramite il fan-out avanzato. Il seguente codice mostra un esempio di implementazione in Java di `ProcessorFactory` e `RecordProcessor`.

Si consiglia di utilizzare `KinesisClientUtil` per creare `KinesisAsyncClient` e configurare `maxConcurrency` in `KinesisAsyncClient`.

**Importante**  
Il client Amazon Kinesis potrebbe presentare un aumento significativo della latenza, a meno che non venga configurato `KinesisAsyncClient` per avere una `maxConcurrency` sufficientemente alta per consentire tutti i canoni più ulteriori utilizzi di `KinesisAsyncClient`.

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License. 
 */

/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig()
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        log.info("Cancelling producer, and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown. Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }


    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.");
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```