

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Desarrollar consumidores personalizados con rendimiento compartido
<a name="shared-throughput-consumers"></a>

**importante**  
Las versiones 1.x y 2.x de la biblioteca de clientes de Amazon Kinesis (KCL) están obsoletas. La versión 1.x de KCL estará disponible el 30 de enero de end-of-support 2026. **Recomendamos** que migre las aplicaciones de KCL que utilizan la versión 1.x a la última versión de KCL antes del 30 de enero de 2026. Para encontrar la versión más reciente de KCL, consulte la página de la [biblioteca de clientes de Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) en. GitHub Para obtener información sobre las versiones más recientes de KCL, consulte [Uso de Kinesis Client Library](kcl.md). Para obtener información sobre cómo migrar de KCL 1.x a KCL 3.x, consulte [Migración de KCL 1.x a KCL 3.x](kcl-migration-1-3.md).

Si no necesita un rendimiento específico al recibir datos de Kinesis Data Streams y tampoco necesita demoras de propagación de lectura inferiores a 200 ms, puede crear aplicaciones consumidoras tal y como se describe en los temas siguientes. Puede utilizar Kinesis Client Library (KCL) o Kinesis Client Library (KCL) o AWS SDK para Java.

**Topics**
+ [

# Desarrollar consumidores personalizados con rendimiento compartido mediante KCL
](custom-kcl-consumers.md)

Para obtener más información sobre cómo crear consumidores que puedan recibir registros de flujos de datos de Kinesis con un rendimiento específico, consulte [Desarrollo de consumidores de distribución ramificada mejorada con rendimiento dedicado](enhanced-consumers.md).

# Desarrollar consumidores personalizados con rendimiento compartido mediante KCL
<a name="custom-kcl-consumers"></a>

**importante**  
Las versiones 1.x y 2.x de la biblioteca de clientes de Amazon Kinesis (KCL) están obsoletas. La versión 1.x de KCL estará disponible el 30 de end-of-support enero de 2026. **Recomendamos** que migre las aplicaciones de KCL que utilizan la versión 1.x a la última versión de KCL antes del 30 de enero de 2026. Para encontrar la versión más reciente de KCL, consulte la página de la [biblioteca de clientes de Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) en. GitHub Para obtener información sobre las versiones más recientes de KCL, consulte [Uso de Kinesis Client Library](kcl.md). Para obtener información sobre cómo migrar de KCL 1.x a KCL 3.x, consulte [Migración de KCL 1.x a KCL 3.x](kcl-migration-1-3.md).

Uno de los métodos para desarrollar una aplicación de consumo personalizada con un rendimiento compartido consiste en utilizar Kinesis Client Library (KCL). 

Elegir uno de los siguientes temas para la versión de KCL que utilice.

**Topics**
+ [

# Desarrollar consumidores de KCL 1.x
](developing-consumers-with-kcl.md)
+ [

# Desarrollar consumidores de KCL 2.x
](developing-consumers-with-kcl-v2.md)

# Desarrollar consumidores de KCL 1.x
<a name="developing-consumers-with-kcl"></a>

**importante**  
Las versiones 1.x y 2.x de la biblioteca de clientes de Amazon Kinesis (KCL) están obsoletas. La versión 1.x de KCL estará disponible el 30 de end-of-support enero de 2026. **Recomendamos** que migre las aplicaciones de KCL que utilizan la versión 1.x a la última versión de KCL antes del 30 de enero de 2026. Para encontrar la versión más reciente de KCL, consulte la página de la [biblioteca de clientes de Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) en. GitHub Para obtener información sobre las versiones más recientes de KCL, consulte [Uso de Kinesis Client Library](kcl.md). Para obtener información sobre cómo migrar de KCL 1.x a KCL 3.x, consulte [Migración de KCL 1.x a KCL 3.x](kcl-migration-1-3.md).

Puede desarrollar una aplicación de consumo para Amazon Kinesis Data Streams mediante Kinesis Client Library (KCL). 

Para obtener más información acerca de, consulte [Acerca de KCL (versiones anteriores)](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-overview).

Elija uno de los siguientes temas según la opción que desee utilizar.

**Topics**
+ [

# Desarrollar un consumidor de Kinesis Client Library en Java
](kinesis-record-processor-implementation-app-java.md)
+ [

# Desarrollar un consumidor de Kinesis Client Library en Node.js
](kinesis-record-processor-implementation-app-nodejs.md)
+ [

# Desarrollar un consumidor de Kinesis Client Library en .NET
](kinesis-record-processor-implementation-app-dotnet.md)
+ [

# Desarrollar un consumidor de Kinesis Client Library en Python
](kinesis-record-processor-implementation-app-py.md)
+ [

# Desarrollar un consumidor de Kinesis Client Library en Ruby
](kinesis-record-processor-implementation-app-ruby.md)

# Desarrollar un consumidor de Kinesis Client Library en Java
<a name="kinesis-record-processor-implementation-app-java"></a>

**importante**  
Las versiones 1.x y 2.x de la biblioteca de clientes de Amazon Kinesis (KCL) están obsoletas. La versión 1.x de KCL estará disponible el 30 de end-of-support enero de 2026. **Recomendamos** que migre las aplicaciones de KCL que utilizan la versión 1.x a la última versión de KCL antes del 30 de enero de 2026. Para encontrar la versión más reciente de KCL, consulte la página de la [biblioteca de clientes de Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) en. GitHub Para obtener información sobre las versiones más recientes de KCL, consulte [Uso de Kinesis Client Library](kcl.md). Para obtener información sobre cómo migrar de KCL 1.x a KCL 3.x, consulte [Migración de KCL 1.x a KCL 3.x](kcl-migration-1-3.md).

Puede utilizar Kinesis Client Library (KCL) para crear aplicaciones que procesen datos de los flujos de datos de Kinesis. Kinesis Client Library está disponible en varios idiomas. En este tema se habla de Java. Para ver la referencia sobre Javadoc, consulte el tema sobre [AWS Javadoc](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html) de Class. AmazonKinesisClient

Para descargar el KCL de Java GitHub, vaya a la [biblioteca de clientes de Kinesis (](https://github.com/awslabs/amazon-kinesis-client)Java). Para localizar KCL para Java en Apache Maven, vaya a la página de [resultados de la búsqueda de KCL](https://search.maven.org/#search|ga|1|amazon-kinesis-client). Para descargar un código de muestra para una aplicación de consumo de Java KCL desde GitHub, visite la página del proyecto de [ejemplo de KCL para Java](https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis) en. GitHub 

La aplicación de muestra utiliza [Apache Commons Logging](http://commons.apache.org/proper/commons-logging/guide.html). Puede cambiar la configuración de registro en el método estático `configure` definido en el archivo `AmazonKinesisApplicationSample.java`. *Para obtener más información sobre cómo utilizar el registro de Apache Commons con Log4j y aplicaciones AWS Java, consulte [Registrar con Log4j](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/java-dg-logging.html) en la guía para desarrolladores.AWS SDK para Java *

Debe completar las siguientes tareas a la hora de implementar una aplicación de consumo de KCL en Java:

**Topics**
+ [

## Implemente los métodos del procesador IRecord
](#kinesis-record-processor-implementation-interface-java)
+ [

## Implemente una fábrica de clases para la interfaz IRecord del procesador
](#kinesis-record-processor-implementation-factory-java)
+ [

## Crear un proceso de trabajo
](#kcl-java-worker)
+ [

## Modificar las propiedades de configuración
](#kinesis-record-processor-initialization-java)
+ [

## Migrar a la versión 2 de la interfaz del procesador de registros
](#kcl-java-v2-migration)

## Implemente los métodos del procesador IRecord
<a name="kinesis-record-processor-implementation-interface-java"></a>

KCL es compatible actualmente con dos versiones de la interfaz de `IRecordProcessor`: la interfaz original está disponible con la primera versión de KCL y la versión 2 está disponible a partir de la versión 1.5.0 de KCL. Ambas interfaces son totalmente compatibles. La elección dependerá de su situación específica. Consulte sus javadocs locales o el código fuente para ver todas las diferencias. En las siguientes secciones se describe la implementación mínima introductoria.

**Topics**
+ [

### Interfaz original (versión 1)
](#kcl-java-interface-original)
+ [

### Interfaz actualizada (versión 2)
](#kcl-java-interface-v2)

### Interfaz original (versión 1)
<a name="kcl-java-interface-original"></a>

La interfaz original `IRecordProcessor` (`package com.amazonaws.services.kinesis.clientlibrary.interfaces`) expone los siguientes métodos del procesador de registros que el consumidor debe implementar. En la muestra se presentan implementaciones que puede utilizar como punto de partida (consulte `AmazonKinesisApplicationSampleRecordProcessor.java`).

```
public void initialize(String shardId)
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

**inicializar**  
KCL llama al método `initialize` cuando se crea una instancia del procesador de registros y pasa un ID de partición específico como parámetro. Este procesador de registros procesa solo este fragmento, y normalmente también se produce la situación contraria (este fragmento solo es procesado por este procesador de registros). Sin embargo, el consumidor debe contar con la posibilidad de que un registro de datos pueda ser procesado más de una vez. Kinesis Data Streams tiene una semántica de *al menos una vez*, lo que significa que cada registro de datos de una partición se procesa al menos una vez por parte de un proceso de trabajo del consumidor. Para obtener más información sobre los casos en los que un fragmento en particular puede ser procesado por más de un proceso de trabajo, consulte [Utilizar la nueva partición, el escalado y el procesamiento paralelo para cambiar el número de particiones](kinesis-record-processor-scaling.md).

```
public void initialize(String shardId)
```

**processRecords**  
KCL llama al método `processRecords` y pasa una lista de registros de datos desde la partición especificada por el método `initialize(shardId)`. El procesador de registros procesa los datos en estos registros según la semántica del consumidor. Por ejemplo, el proceso de trabajo podría realizar una transformación de los datos y, a continuación, almacenar el resultado en un bucket de Amazon Simple Storage Service (Amazon S3).

```
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) 
```

Además de los datos en sí, el registro contiene un número secuencial y una clave de partición. El proceso de trabajo puede utilizar estos valores al procesar los datos. Por ejemplo, el proceso de trabajo podría elegir el bucket de S3 en el que almacenar los datos en función del valor de la clave de partición. La clase `Record` expone los siguientes métodos que proporcionan acceso a los datos, el número secuencial y la clave de partición del registro. 

```
record.getData()  
record.getSequenceNumber() 
record.getPartitionKey()
```

En el ejemplo, el método privado `processRecordsWithRetries` tiene un código que muestra cómo un proceso de trabajo puede obtener acceso a los datos, el número secuencial y la clave de partición del registro.

Kinesis Data Streams requiere que el procesador de registros realice un seguimiento de los registros que ya se han procesado en una partición. KCL se ocupa de este seguimiento pasando un generador de puntos de verificación (`IRecordProcessorCheckpointer`) a `processRecords`. El procesador de registros llama al método `checkpoint` en esta interfaz para informar a KCL de su avance en el procesamiento de los registros de la partición. Si se produce un error en el proceso de trabajo, KCL utiliza esta información para reiniciar el procesamiento de la partición en el último registro procesado conocido.

En el caso de una operación de división o fusión, KCL no comenzará a procesar las particiones nuevas hasta que los procesadores de las particiones originales hayan llamado a `checkpoint` para indicar que se ha completado el procesamiento en las particiones originales.

Si no se pasa un parámetro, KCL supone que la llamada a `checkpoint` significa que todos los registros se han procesado, hasta el último registro pasado al procesador de registros. Por tanto, el procesador de registros solo debe llamar a `checkpoint` después de haber procesado todos los registros de la lista que se le ha pasado. Los procesadores de registros no necesitan llamar a `checkpoint` en cada llamada a `processRecords`. Un procesador podría, por ejemplo, llamar a `checkpoint` cada tercera vez que llame a `processRecords`. Puede especificar opcionalmente el número secuencial exacto de un registro como un parámetro para `checkpoint`. En este caso, KCL supone que todos los registros se han procesado exclusivamente hasta ese registro.

En el ejemplo, el método privado `checkpoint` muestra cómo llamar a `IRecordProcessorCheckpointer.checkpoint` mediante la administración de excepciones y la lógica de reintentos apropiadas.

KCL depende de `processRecords` para administrar cualquier excepción que surja del procesamiento de los registros de datos. Si `processRecords` genera una excepción, KCL omite los registros de datos que se pasaron antes de la excepción. Es decir, estos registros no se reenviarán al procesador de registros que generó la excepción ni a ningún otro procesador de registros en el consumidor.

**shutdown**  
KCL llama al método `shutdown` cuando finaliza el procesamiento (el motivo del cierre es `TERMINATE`) o cuando el proceso de trabajo ya no responde (el motivo del cierre es `ZOMBIE`).

```
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

El procesamiento finaliza cuando el procesador de registros no recibe más registros desde el fragmento, ya sea porque el fragmento se ha dividido o fusionado o porque la secuencia se ha eliminado.

KCL también pasa una interfaz `IRecordProcessorCheckpointer` a `shutdown`. Si el motivo del shutdown es `TERMINATE`, el procesador de registros debería terminar de procesar los registros de datos y llamar al método `checkpoint` en esta interfaz.

### Interfaz actualizada (versión 2)
<a name="kcl-java-interface-v2"></a>

La interfaz actualizada `IRecordProcessor` (`package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`) expone los siguientes métodos del procesador de registros que el consumidor debe implementar: 

```
void initialize(InitializationInput initializationInput)
void processRecords(ProcessRecordsInput processRecordsInput)
void shutdown(ShutdownInput shutdownInput)
```

Se puede obtener acceso a todos los argumentos de la versión original de la interfaz mediante métodos "get" en los objetos del contenedor. Por ejemplo, para recuperar la lista de registros en `processRecords()`, puede utilizar `processRecordsInput.getRecords()`.

A partir de la versión 2 de esta interfaz (KCL 1.5.0 y posteriores), están disponibles las siguientes entradas nuevas, además de las entradas proporcionadas por la interfaz original:

starting sequence number  
En el objeto `InitializationInput` que se pasa a la operación `initialize()` el número secuencial inicial a partir del cual se facilitan los registros a la instancia del procesador de registros. Este es el último número secuencial objeto de un punto de comprobación por parte de la instancia del procesador de registros que procesara anteriormente el mismo fragmento. Estos datos se ofrecen por si su aplicación necesitara esta información. 

pending checkpoint sequence number  
En el objeto `InitializationInput` que se pasa a la operación `initialize()`, el número secuencial pendiente de punto de comprobación (si hay alguno) que no se ha podido confirmar antes de que se detuviera la instancia anterior del procesador de registros.

## Implemente una fábrica de clases para la interfaz IRecord del procesador
<a name="kinesis-record-processor-implementation-factory-java"></a>

También necesitará implementar un generador para la clase que implementa los métodos del procesador de registros. Cuando el consumidor crea instancias del proceso de trabajo, pasa una referencia a este generador.

La muestra implementa el generador de clases en el archivo `AmazonKinesisApplicationSampleRecordProcessorFactory.java` mediante la interfaz del procesador de registros original. Si desea que el generador de clases cree procesadores de registros de la versión 2, utilice el nombre de paquete `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`.

```
  public class SampleRecordProcessorFactory implements IRecordProcessorFactory { 
      /**
      * Constructor.
      */
      public SampleRecordProcessorFactory() {
          super();
      }
      /**
      * {@inheritDoc}
      */
      @Override
      public IRecordProcessor createProcessor() {
          return new SampleRecordProcessor();
      }
  }
```

## Crear un proceso de trabajo
<a name="kcl-java-worker"></a>

Tal y como se ha explicado en [Implemente los métodos del procesador IRecord](#kinesis-record-processor-implementation-interface-java), hay dos versiones de la interfaz de procesador de registros de KCL para elegir, lo que afecta a la creación de un proceso de trabajo. La interfaz de procesador de registros original utiliza la siguiente estructura de código para crear un proceso de trabajo:

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker(recordProcessorFactory, config);
```

Con la versión 2 del procesador de registros, puede utilizar la `Worker.Builder` para crear un proceso de trabajo sin preocuparse por qué constructor utilizar ni por el orden de los argumentos. La interfaz de procesador de registros actualizada utiliza la siguiente estructura de código para crear un proceso de trabajo:

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

## Modificar las propiedades de configuración
<a name="kinesis-record-processor-initialization-java"></a>

En la muestra se proporcionan valores predeterminados para las propiedades de configuración. Este datos de configuración del proceso de trabajo se consolidan posteriormente en un objeto `KinesisClientLibConfiguration`. Este objeto y una referencia al generador de clases de `IRecordProcessor` se pasan en la llamada que crea la instancia del proceso de trabajo. Puede sobrescribir cualquiera de estas propiedades con sus propios valores a través de un archivo de propiedades de Java (consulte `AmazonKinesisApplicationSample.java`).

### Nombre de la aplicación
<a name="configuration-property-application-name"></a>

KCL requiere un nombre de aplicación que sea único entre las aplicaciones y en las tablas de Amazon DynamoDB de la misma región. La biblioteca utiliza el valor del nombre de la aplicación de las siguientes formas:
+ Se entiende que los procesos de trabajo asociados a este nombre de aplicación operan de forma conjunta en la misma secuencia. Estos procesos de trabajo pueden distribuirse en varias instancias. Si ejecuta otra instancia del mismo código de aplicación, pero con otro nombre de aplicación, KCL considera que la segunda instancia es una aplicación completamente independiente de la otra que opera en el mismo flujo.
+ KCL crea una tabla de DynamoDB con el nombre de la aplicación y utiliza la tabla para actualizar la información de estado (como los puntos de verificación y el mapeo procesos de trabajo-particiones) para la aplicación. Cada aplicación tiene su propia tabla de DynamoDB. Para obtener más información, consulte [Usar una tabla de arrendamiento para realizar el seguimiento de las particiones procesadas por la aplicación de consumo de KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Configuración de credenciales
<a name="kinesis-record-processor-cred-java"></a>

Debe poner sus AWS credenciales a disposición de uno de los proveedores de credenciales de la cadena de proveedores de credenciales predeterminada. Por ejemplo, si ejecuta su consumidor en una instancia de EC2, se recomienda que lance la instancia con un rol de IAM. Las credenciales de AWS que reflejan los permisos asociados a este rol de IAM se ponen a disposición de las aplicaciones de la instancia a través de los metadatos de esta. Esta es la forma más segura de administrar las credenciales para un consumidor que se ejecute en una instancia EC2.

En primer lugar, la aplicación de muestra intenta recuperar las credenciales de IAM de los metadatos de la instancia: 

```
credentialsProvider = new InstanceProfileCredentialsProvider(); 
```

Si la aplicación de muestra no puede obtener credenciales de los metadatos de la instancia, intenta recuperar las credenciales desde un archivo de propiedades:

```
credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();
```

Para más información sobre los metadatos de instancia, consulte [Metadatos de instancia](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html) en la *Guía del usuario de Amazon EC2*.

### Uso de ID de proceso de trabajo para varias instancias
<a name="kinesis-record-processor-workerid-java"></a>

El código de inicialización de muestra crea un ID para el proceso de trabajo, `workerId`, con el nombre del equipo local y un identificador global único anexo, tal y como se muestra en el siguiente fragmento de código. Este enfoque es compatible con un escenario con varias instancias de la aplicación consumidora ejecutándose en un único equipo.

```
String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
```

## Migrar a la versión 2 de la interfaz del procesador de registros
<a name="kcl-java-v2-migration"></a>

Si desea migrar código que utilice la interfaz original, además de los pasos descritos anteriormente, tendrá que seguir estos pasos:

1. Cambie la clase de su procesador de registros para importar la versión 2 de la interfaz del procesador de registros:

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   ```

1. Cambiar las referencias a las entradas para usar métodos `get` en los objetos del contenedor. Por ejemplo, en la operación `shutdown()`, cambie "`checkpointer`" por "`shutdownInput.getCheckpointer()`".

1. Cambie la clase del generador de procesadores de registros para importar la interfaz del generador de procesadores de registros de la versión 2:

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   ```

1. Cambie la construcción del proceso de trabajo para usar `Worker.Builder`. Por ejemplo:

   ```
   final Worker worker = new Worker.Builder()
       .recordProcessorFactory(recordProcessorFactory)
       .config(config)
       .build();
   ```

# Desarrollar un consumidor de Kinesis Client Library en Node.js
<a name="kinesis-record-processor-implementation-app-nodejs"></a>

**importante**  
Las versiones 1.x y 2.x de la biblioteca de clientes de Amazon Kinesis (KCL) están obsoletas. KCL 1.x estará disponible el 30 de end-of-support enero de 2026. **Recomendamos** que migre las aplicaciones de KCL que utilizan la versión 1.x a la última versión de KCL antes del 30 de enero de 2026. Para encontrar la versión más reciente de KCL, consulte la página de la [biblioteca de clientes de Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) en. GitHub Para obtener información sobre las versiones más recientes de KCL, consulte [Uso de Kinesis Client Library](kcl.md). Para obtener información sobre cómo migrar de KCL 1.x a KCL 3.x, consulte [Migración de KCL 1.x a KCL 3.x](kcl-migration-1-3.md).

Puede utilizar Kinesis Client Library (KCL) para crear aplicaciones que procesen datos de los flujos de datos de Kinesis. Kinesis Client Library está disponible en varios idiomas. En este tema se habla de Node.js

La KCL es una biblioteca de Java; el soporte para otros lenguajes además de Java se proporciona mediante una interfaz multilingüe llamada. *MultiLangDaemon* Este daemon está basado en Java y se ejecuta en segundo plano cuando se utiliza un lenguaje de KCL distinto de Java. Por lo tanto, si instala el KCL para Node.js y escribe su aplicación para consumidores completamente en Node.js, seguirá necesitando instalar Java en su sistema debido a la. MultiLangDaemon Además, MultiLangDaemon tiene algunos ajustes predeterminados que puede que tengas que personalizar para tu caso de uso, por ejemplo, la AWS región a la que se conecta. Para obtener más información sobre MultiLangDaemon esto GitHub, visita la página del [ MultiLangDaemon proyecto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Para descargar el archivo KCL de Node.js GitHub, vaya a la [biblioteca de clientes de Kinesis (Node.js)](https://github.com/awslabs/amazon-kinesis-client-nodejs).

**Descargas de código de muestra**

Hay dos códigos de muestra disponibles para la KCL en Node.js:
+ [basic-sample](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/basic_sample)

  Se utiliza en las siguientes secciones para ilustrar los aspectos fundamentales de la creación de una aplicación de consumo de KCL en Node.js.
+ [click-stream-sample](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/click_stream_sample)

   Es ligeramente más avanzado y utiliza una situación real. Para cuando se haya familiarizado con el código de muestra básico. Esta muestra no se trata aquí, pero tiene un archivo README con más información.

Debe completar las siguientes tareas a la hora de implementar una aplicación de consumo de KCL en Node.js:

**Topics**
+ [

## Implementar el procesador de registros
](#kinesis-record-processor-implementation-interface-nodejs)
+ [

## Modificar las propiedades de configuración
](#kinesis-record-processor-initialization-nodejs)

## Implementar el procesador de registros
<a name="kinesis-record-processor-implementation-interface-nodejs"></a>

El consumidor más sencillo posible que utilice KCL para Node.js debe implementar una función `recordProcessor`, que a su vez contenga las funciones `initialize`, `processRecords` y `shutdown`. En la muestra se presenta una implementación que puede utilizar como punto de partida (consulte `sample_kcl_app.js`).

```
function recordProcessor() {
  // return an object that implements initialize, processRecords and shutdown functions.}
```

**inicializar**  
KCL llama a la función `initialize` cuando se inicia el procesador de registros. Este procesador de registros procesa solo la ID de fragmento que se haya pasado como `initializeInput.shardId`, y normalmente también se produce la situación contraria (este fragmento solo es procesado por este procesador de registros). Sin embargo, el consumidor debe contar con la posibilidad de que un registro de datos pueda ser procesado más de una vez. Esto se debe a que Kinesis Data Streams tiene una semántica de *al menos una vez*, lo que significa que cada registro de datos de una partición se procesa al menos una vez por parte de un proceso de trabajo del consumidor. Para obtener más información sobre los casos en los que un fragmento en particular puede ser procesado por varios procesos de trabajo, consulte [Utilizar la nueva partición, el escalado y el procesamiento paralelo para cambiar el número de particiones](kinesis-record-processor-scaling.md).

```
initialize: function(initializeInput, completeCallback)
```

**processRecords**  
 KCL llama a esta función con una entrada que contiene una lista de registros de datos de la partición especificados para la función `initialize`. El procesador de registros que implemente procesa los datos en estos registros según la semántica del consumidor. Por ejemplo, el proceso de trabajo podría realizar una transformación de los datos y, a continuación, almacenar el resultado en un bucket de Amazon Simple Storage Service (Amazon S3). 

```
processRecords: function(processRecordsInput, completeCallback)
```

Además de los datos en sí, el registro también contiene un número secuencial y una clave de partición, que el proceso de trabajo puede utilizar al procesar los datos. Por ejemplo, el proceso de trabajo podría elegir el bucket de S3 en el que almacenar los datos en función del valor de la clave de partición. El diccionario `record` expone los siguientes pares clave-valor para obtener acceso a los datos, el número secuencial y la clave de partición del registro:

```
record.data
record.sequenceNumber
record.partitionKey
```

Tenga en cuenta que los datos se codifican en Base64.

En el ejemplo básico, la función `processRecords` tiene un código que muestra cómo un proceso de trabajo puede obtener acceso a los datos, el número secuencial y la clave de partición del registro.

Kinesis Data Streams requiere que el procesador de registros realice un seguimiento de los registros que ya se han procesado en una partición. KCL se ocupa de este seguimiento con un objeto `checkpointer` que pasa como `processRecordsInput.checkpointer`. Su procesador de registros llama a la función `checkpointer.checkpoint` para informar a KCL de su avance en el procesamiento de los registros de la partición. En el caso de que se produzca un error en el proceso de trabajo, KCL utiliza esta información al reiniciar el procesamiento de la partición para continuar desde el último registro procesado conocido.

En el caso de una operación de división o fusión, KCL no comenzará a procesar las particiones nuevas hasta que los procesadores de las particiones originales hayan llamado a `checkpoint` para indicar que se ha completado el procesamiento en las particiones originales.

Si no pasa el número secuencial a la función `checkpoint`, KCL supone que la llamada a `checkpoint` significa que todos los registros se han procesado, hasta el último registro pasado al procesador de registros. Por tanto, el procesador de registros debería llamar a `checkpoint` **solo** después de haber procesado todos los registros de la lista que se le ha pasado. Los procesadores de registros no necesitan llamar a `checkpoint` en cada llamada a `processRecords`. Un procesador podría, por ejemplo, realizar una llamada `checkpoint` una de cada tres veces o algún evento externo a su procesador de registros, como un verification/validation servicio personalizado que haya implementado. 

Puede especificar opcionalmente el número secuencial exacto de un registro como un parámetro para `checkpoint`. En este caso, KCL supone que todos los registros se han procesado exclusivamente hasta ese registro.

La aplicación de muestra básica muestra la llamada más sencilla posible a la función `checkpointer.checkpoint`. Puede agregar otra lógica de creación de puntos de comprobación que necesite para su consumidor en este punto en la función.

**shutdown**  
KCL llama a la función `shutdown` cuando finaliza el procesamiento (`shutdownInput.reason` es `TERMINATE`) o cuando el proceso de trabajo ya no responde (`shutdownInput.reason` es `ZOMBIE`).

```
shutdown: function(shutdownInput, completeCallback)
```

El procesamiento finaliza cuando el procesador de registros no recibe más registros desde el fragmento, ya sea porque el fragmento se ha dividido o fusionado o porque la secuencia se ha eliminado.

KCL también pasa un objeto `shutdownInput.checkpointer` a `shutdown`. Si el motivo del cierre es `TERMINATE`, debe asegurarse de que el procesador de registros haya terminado de procesar los registros de datos y después llame a la función `checkpoint` en esta interfaz.

## Modificar las propiedades de configuración
<a name="kinesis-record-processor-initialization-nodejs"></a>

En la muestra se proporcionan valores predeterminados para las propiedades de configuración. Puede sobrescribir cualquiera de estas propiedades con sus propios valores (consulte `sample.properties` en el ejemplo básico).

### Nombre de la aplicación
<a name="kinesis-record-processor-application-name-nodejs"></a>

KCL requiere una aplicación que sea única entre las aplicaciones y en las tablas de Amazon DynamoDB de la misma región. La biblioteca utiliza el valor del nombre de la aplicación de las siguientes formas:
+ Se entiende que los procesos de trabajo asociados a este nombre de aplicación operan de forma conjunta en la misma secuencia. Estos procesos de trabajo pueden distribuirse en varias instancias. Si ejecuta otra instancia del mismo código de aplicación, pero con otro nombre de aplicación, KCL considera que la segunda instancia es una aplicación completamente independiente de la otra que opera en el mismo flujo.
+ KCL crea una tabla de DynamoDB con el nombre de la aplicación y utiliza la tabla para actualizar la información de estado (como los puntos de verificación y el mapeo procesos de trabajo-particiones) para la aplicación. Cada aplicación tiene su propia tabla de DynamoDB. Para obtener más información, consulte [Usar una tabla de arrendamiento para realizar el seguimiento de las particiones procesadas por la aplicación de consumo de KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Configuración de credenciales
<a name="kinesis-record-processor-credentials-nodejs"></a>

Debe poner sus AWS credenciales a disposición de uno de los proveedores de credenciales de la cadena de proveedores de credenciales predeterminada. Puede usar la propiedad `AWSCredentialsProvider` para configurar un proveedor de credenciales. El archivo `sample.properties` debe poner sus credenciales a disposición de uno de los proveedores de credenciales de la [cadena de proveedores de credenciales predeterminada](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). Si ejecuta su consumidor en una instancia de Amazon EC2, le recomendamos que configure la instancia con un rol de IAM. AWS Las credenciales que reflejan los permisos asociados a esta función de IAM se ponen a disposición de las aplicaciones de la instancia a través de los metadatos de la instancia. Esta es la forma más segura de administrar las credenciales para una aplicación consumidora que se ejecute en una instancia EC2.

En el siguiente ejemplo, se configura KCL para procesar un flujo de datos de Kinesis denominado `kclnodejssample` utilizando el procesador de registros facilitado en `sample_kcl_app.js`:

```
# The Node.js executable script
executableName = node sample_kcl_app.js
# The name of an Amazon Kinesis stream to process
streamName = kclnodejssample
# Unique KCL application name
applicationName = kclnodejssample
# Use default AWS credentials provider chain
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
# Read from the beginning of the stream
initialPositionInStream = TRIM_HORIZON
```

# Desarrollar un consumidor de Kinesis Client Library en .NET
<a name="kinesis-record-processor-implementation-app-dotnet"></a>

**importante**  
Las versiones 1.x y 2.x de la biblioteca de clientes de Amazon Kinesis (KCL) están obsoletas. KCL 1.x estará disponible el 30 de end-of-support enero de 2026. **Recomendamos** que migre las aplicaciones de KCL que utilizan la versión 1.x a la última versión de KCL antes del 30 de enero de 2026. Para encontrar la versión más reciente de KCL, consulte la página de la [biblioteca de clientes de Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) en. GitHub Para obtener información sobre las versiones más recientes de KCL, consulte [Uso de Kinesis Client Library](kcl.md). Para obtener información sobre cómo migrar de KCL 1.x a KCL 3.x, consulte [Migración de KCL 1.x a KCL 3.x](kcl-migration-1-3.md).

Puede utilizar Kinesis Client Library (KCL) para crear aplicaciones que procesen datos de los flujos de datos de Kinesis. Kinesis Client Library está disponible en varios idiomas. En este tema se habla de .NET.

La KCL es una biblioteca de Java; el soporte para otros lenguajes además de Java se proporciona mediante una interfaz multilingüe llamada. *MultiLangDaemon* Este daemon está basado en Java y se ejecuta en segundo plano cuando se utiliza un lenguaje de KCL distinto de Java. Por lo tanto, si instala la KCL para.NET y escribe la aplicación para el usuario en su totalidad en .NET, seguirá necesitando instalar Java en el sistema debido a la. MultiLangDaemon Además, MultiLangDaemon tiene algunos ajustes predeterminados que puede que tengas que personalizar para tu caso de uso, por ejemplo, la AWS región a la que se conecta. Para obtener más información sobre MultiLangDaemon esto GitHub, visita la página del [ MultiLangDaemon proyecto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Para descargar el KCL de.NET GitHub, vaya a la [biblioteca de clientes de Kinesis (](https://github.com/awslabs/amazon-kinesis-client-net).NET). Para descargar un código de muestra para una aplicación de consumo de KCL de.NET, visite la página de [ejemplos de proyectos de consumo de KCL para .NET](https://github.com/awslabs/amazon-kinesis-client-net/tree/master/SampleConsumer) en. GitHub

Debe completar las siguientes tareas a la hora de implementar una aplicación de consumo de KCL en .NET:

**Topics**
+ [

## Implemente los métodos de la clase IRecord Processor
](#kinesis-record-processor-implementation-interface-dotnet)
+ [

## Modificar las propiedades de configuración
](#kinesis-record-processor-initialization-dotnet)

## Implemente los métodos de la clase IRecord Processor
<a name="kinesis-record-processor-implementation-interface-dotnet"></a>

El consumidor debe implementar los siguientes métodos para `IRecordProcessor`. En el consumidor de muestra se presentan implementaciones que puede utilizar como punto de partida (consulte la clase `SampleRecordProcessor` en `SampleConsumer/AmazonKinesisSampleConsumer.cs`).

```
public void Initialize(InitializationInput input)
public void ProcessRecords(ProcessRecordsInput input)
public void Shutdown(ShutdownInput input)
```

**Initialize**  
KCL llama a este método cuando se crea una instancia del procesador de registros y pasa un ID de partición específico en el parámetro `input` (`input.ShardId`). Este procesador de registros procesa solo este fragmento, y normalmente también se produce la situación contraria (este fragmento solo es procesado por este procesador de registros). Sin embargo, el consumidor debe contar con la posibilidad de que un registro de datos pueda ser procesado más de una vez. Esto se debe a que Kinesis Data Streams tiene una semántica de *al menos una vez*, lo que significa que cada registro de datos de una partición se procesa al menos una vez por parte de un proceso de trabajo del consumidor. Para obtener más información sobre los casos en los que un fragmento en particular puede ser procesado por varios procesos de trabajo, consulte [Utilizar la nueva partición, el escalado y el procesamiento paralelo para cambiar el número de particiones](kinesis-record-processor-scaling.md).

```
public void Initialize(InitializationInput input)
```

**ProcessRecords**  
KCL llama a este método y pasa una lista de registros de datos en el parámetro `input` (`input.Records`) desde la partición especificada por el método `Initialize`. El procesador de registros que implemente procesa los datos en estos registros según la semántica del consumidor. Por ejemplo, el proceso de trabajo podría realizar una transformación de los datos y, a continuación, almacenar el resultado en un bucket de Amazon Simple Storage Service (Amazon S3).

```
public void ProcessRecords(ProcessRecordsInput input)
```

Además de los datos en sí, el registro contiene un número secuencial y una clave de partición. El proceso de trabajo puede utilizar estos valores al procesar los datos. Por ejemplo, el proceso de trabajo podría elegir el bucket de S3 en el que almacenar los datos en función del valor de la clave de partición. La clase `Record` expone lo siguiente para obtener acceso a los datos, el número secuencial y la clave de partición del registro:

```
byte[] Record.Data 
string Record.SequenceNumber
string Record.PartitionKey
```

En el ejemplo, el método `ProcessRecordsWithRetries` tiene un código que muestra cómo un proceso de trabajo puede obtener acceso a los datos, el número secuencial y la clave de partición del registro.

Kinesis Data Streams requiere que el procesador de registros realice un seguimiento de los registros que ya se han procesado en una partición. KCL se ocupa de este seguimiento, pasando un objeto `Checkpointer` a `ProcessRecords` (`input.Checkpointer`). El procesador de registros llama al método `Checkpointer.Checkpoint` para informar a KCL de su avance en el procesamiento de los registros de la partición. Si se produce un error en el proceso de trabajo, KCL utiliza esta información para reiniciar el procesamiento de la partición en el último registro procesado conocido.

En el caso de una operación de división o fusión, KCL no comenzará a procesar las particiones nuevas hasta que los procesadores de las particiones originales hayan llamado a `Checkpointer.Checkpoint` para indicar que se ha completado el procesamiento en las particiones originales.

Si no se pasa un parámetro, KCL supone que la llamada a `Checkpointer.Checkpoint` significa que todos los registros se han procesado, hasta el último registro pasado al procesador de registros. Por tanto, el procesador de registros solo debe llamar a `Checkpointer.Checkpoint` después de haber procesado todos los registros de la lista que se le ha pasado. Los procesadores de registros no necesitan llamar a `Checkpointer.Checkpoint` en cada llamada a `ProcessRecords`. Un procesador podría, por ejemplo, llamar a `Checkpointer.Checkpoint` en cada tercera o cuarta llamada. Puede especificar opcionalmente el número secuencial exacto de un registro como un parámetro para `Checkpointer.Checkpoint`. En este caso, KCL supone que los registros se han procesado exclusivamente hasta ese registro.

En el ejemplo, el método privado `Checkpoint(Checkpointer checkpointer)` muestra cómo llamar al método `Checkpointer.Checkpoint` mediante la administración de excepciones y la lógica de reintentos apropiadas.

KCL para .NET administra las excepciones de forma diferente a las bibliotecas de KCL para el resto de lenguajes, ya que no administra las excepciones que surgen del procesamiento de los registros de datos. Las excepciones no detectadas procedentes del código del usuario harán que el programa se bloquee.

**Apagado**  
KCL llama al método `Shutdown` cuando finaliza el procesamiento (el motivo del cierre es `TERMINATE`) o cuando el proceso de trabajo ya no responde (el valor de `input.Reason` del cierre es `ZOMBIE`).

```
public void Shutdown(ShutdownInput input)
```

El procesamiento finaliza cuando el procesador de registros no recibe más registros desde el fragmento, ya sea porque el fragmento se ha dividido o fusionado o porque la secuencia se ha eliminado.

KCL también pasa un objeto `Checkpointer` a `shutdown`. Si el motivo del shutdown es `TERMINATE`, el procesador de registros debería terminar de procesar los registros de datos y llamar al método `checkpoint` en esta interfaz.

## Modificar las propiedades de configuración
<a name="kinesis-record-processor-initialization-dotnet"></a>

En el consumidor muestra se proporcionan valores predeterminados para las propiedades de configuración. Puede sobrescribir cualquiera de estas propiedades con sus propios valores (consulte `SampleConsumer/kcl.properties`).

### Nombre de la aplicación
<a name="modify-kinesis-record-processor-application-name"></a>

KCL requiere una aplicación que sea única entre las aplicaciones y en las tablas de Amazon DynamoDB de la misma región. La biblioteca utiliza el valor del nombre de la aplicación de las siguientes formas:
+ Se entiende que los procesos de trabajo asociados a este nombre de aplicación operan de forma conjunta en la misma secuencia. Estos procesos de trabajo pueden distribuirse en varias instancias. Si ejecuta otra instancia del mismo código de aplicación, pero con otro nombre de aplicación, KCL considera que la segunda instancia es una aplicación completamente independiente de la otra que opera en el mismo flujo.
+ KCL crea una tabla de DynamoDB con el nombre de la aplicación y utiliza la tabla para actualizar la información de estado (como los puntos de verificación y el mapeo procesos de trabajo-particiones) para la aplicación. Cada aplicación tiene su propia tabla de DynamoDB. Para obtener más información, consulte [Usar una tabla de arrendamiento para realizar el seguimiento de las particiones procesadas por la aplicación de consumo de KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Configuración de credenciales
<a name="kinesis-record-processor-creds-dotnet"></a>

Debe poner sus AWS credenciales a disposición de uno de los proveedores de credenciales de la cadena de proveedores de credenciales predeterminada. Puede usar la propiedad `AWSCredentialsProvider` para configurar un proveedor de credenciales. Las [propiedades de muestra](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties) deben poner sus credenciales a disposición de uno de los proveedores de credenciales de la [cadena de proveedores de credenciales predeterminada](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). Si ejecuta su aplicación de consumo en una instancia de EC2, se recomienda que configure la instancia con un rol de IAM. Las credenciales de AWS que reflejan los permisos asociados a este rol de IAM se ponen a disposición de las aplicaciones de la instancia a través de los metadatos de esta. Esta es la forma más segura de administrar las credenciales para un consumidor que se ejecute en una instancia EC2.

El archivo de propiedades de ejemplo configura KCL para procesar un flujo de datos de Kinesis llamado “words” utilizando el procesador de registros facilitado en `AmazonKinesisSampleConsumer.cs`. 

# Desarrollar un consumidor de Kinesis Client Library en Python
<a name="kinesis-record-processor-implementation-app-py"></a>

**importante**  
Las versiones 1.x y 2.x de la biblioteca de clientes de Amazon Kinesis (KCL) están obsoletas. KCL 1.x estará disponible el 30 de end-of-support enero de 2026. **Recomendamos** que migre las aplicaciones de KCL que utilizan la versión 1.x a la última versión de KCL antes del 30 de enero de 2026. Para encontrar la versión más reciente de KCL, consulte la página de la [biblioteca de clientes de Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) en. GitHub Para obtener información sobre las versiones más recientes de KCL, consulte [Uso de Kinesis Client Library](kcl.md). Para obtener información sobre cómo migrar de KCL 1.x a KCL 3.x, consulte [Migración de KCL 1.x a KCL 3.x](kcl-migration-1-3.md).

Puede utilizar Kinesis Client Library (KCL) para crear aplicaciones que procesen datos de los flujos de datos de Kinesis. Kinesis Client Library está disponible en varios idiomas. En este tema se habla de Python.

La KCL es una biblioteca de Java; el soporte para otros lenguajes además de Java se proporciona mediante una interfaz multilingüe llamada. *MultiLangDaemon* Este daemon está basado en Java y se ejecuta en segundo plano cuando se utiliza un lenguaje de KCL distinto de Java. Por lo tanto, si instala el KCL para Python y escribe su aplicación de consumo completamente en Python, seguirá necesitando instalar Java en su sistema debido a la MultiLangDaemon. Además, MultiLangDaemon tiene algunos ajustes predeterminados que puede que tengas que personalizar para tu caso de uso, por ejemplo, la AWS región a la que se conecta. Para obtener más información sobre MultiLangDaemon esto GitHub, visita la página del [ MultiLangDaemon proyecto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Para descargar la KCL de Python GitHub, vaya a la biblioteca de [clientes de Kinesis (Python)](https://github.com/awslabs/amazon-kinesis-client-python). Para descargar un código de muestra para una aplicación de consumo de KCL para Python, vaya a la página del [proyecto de ejemplo de KCL para Python](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples) en. GitHub

Debe completar las siguientes tareas a la hora de implementar una aplicación de consumo de KCL en Python:

**Topics**
+ [

## Implementa los métodos de la clase RecordProcessor
](#kinesis-record-processor-implementation-interface-py)
+ [

## Modificar las propiedades de configuración
](#kinesis-record-processor-initialization-py)

## Implementa los métodos de la clase RecordProcessor
<a name="kinesis-record-processor-implementation-interface-py"></a>

La clase `RecordProcess` debe ampliar la `RecordProcessorBase` para implementar los siguientes métodos. En la muestra se presentan implementaciones que puede utilizar como punto de partida (consulte `sample_kclpy_app.py`).

```
def initialize(self, shard_id)
def process_records(self, records, checkpointer)
def shutdown(self, checkpointer, reason)
```

**inicializar**  
KCL llama al método `initialize` cuando se crea una instancia del procesador de registros y pasa un ID de partición específico como parámetro. Este procesador de registros procesa solo este fragmento, y normalmente también se produce la situación contraria (este fragmento solo es procesado por este procesador de registros). Sin embargo, el consumidor debe contar con la posibilidad de que un registro de datos pueda ser procesado más de una vez. Esto se debe a que Kinesis Data Streams tiene una semántica de *al menos una vez*, lo que significa que cada registro de datos de una partición se procesa al menos una vez por parte de un proceso de trabajo del consumidor. Para obtener más información sobre los casos en los que un fragmento en particular puede ser procesado por más de un proceso de trabajo, consulte [Utilizar la nueva partición, el escalado y el procesamiento paralelo para cambiar el número de particiones](kinesis-record-processor-scaling.md).

```
def initialize(self, shard_id)
```

**process\$1records**  
 KCL llama a este método y pasa una lista de registros de datos de la partición especificada por el método `initialize`. El procesador de registros que implemente procesa los datos en estos registros según la semántica del consumidor. Por ejemplo, el proceso de trabajo podría realizar una transformación de los datos y, a continuación, almacenar el resultado en un bucket de Amazon Simple Storage Service (Amazon S3).

```
def process_records(self, records, checkpointer) 
```

Además de los datos en sí, el registro contiene un número secuencial y una clave de partición. El proceso de trabajo puede utilizar estos valores al procesar los datos. Por ejemplo, el proceso de trabajo podría elegir el bucket de S3 en el que almacenar los datos en función del valor de la clave de partición. El diccionario `record` expone los siguientes pares clave-valor para obtener acceso a los datos, el número secuencial y la clave de partición del registro:

```
record.get('data')
record.get('sequenceNumber')
record.get('partitionKey')
```

Tenga en cuenta que los datos se codifican en Base64.

En el ejemplo, el método `process_records` tiene un código que muestra cómo un proceso de trabajo puede obtener acceso a los datos, el número secuencial y la clave de partición del registro.

Kinesis Data Streams requiere que el procesador de registros realice un seguimiento de los registros que ya se han procesado en una partición. KCL se ocupa de este seguimiento, pasando un objeto `Checkpointer` a `process_records`. El procesador de registros llama al método `checkpoint` en este objeto para informar a KCL de su avance en el procesamiento de los registros de la partición. Si se produce un error en el proceso de trabajo, KCL utiliza esta información para reiniciar el procesamiento de la partición en el último registro procesado conocido.

En el caso de una operación de división o fusión, KCL no comenzará a procesar las particiones nuevas hasta que los procesadores de las particiones originales hayan llamado a `checkpoint` para indicar que se ha completado el procesamiento en las particiones originales.

Si no se pasa un parámetro, KCL supone que la llamada a `checkpoint` significa que todos los registros se han procesado, hasta el último registro pasado al procesador de registros. Por tanto, el procesador de registros solo debe llamar a `checkpoint` después de haber procesado todos los registros de la lista que se le ha pasado. Los procesadores de registros no necesitan llamar a `checkpoint` en cada llamada a `process_records`. Un procesador podría, por ejemplo, llamar a `checkpoint` cada tercera vez que llame. Puede especificar opcionalmente el número secuencial exacto de un registro como un parámetro para `checkpoint`. En este caso, KCL supone que todos los registros se han procesado exclusivamente hasta ese registro.

En el ejemplo, el método privado `checkpoint` muestra cómo llamar al método `Checkpointer.checkpoint` mediante la administración de excepciones y la lógica de reintentos apropiadas.

KCL depende de `process_records` para administrar cualquier excepción que surja del procesamiento de los registros de datos. Si `process_records` genera una excepción, KCL omite los registros de datos que se pasaron a `process_records` antes de la excepción. Es decir, estos registros no se reenviarán al procesador de registros que generó la excepción ni a ningún otro procesador de registros en el consumidor.

**shutdown**  
 KCL llama al método `shutdown` cuando finaliza el procesamiento (el motivo del cierre es `TERMINATE`) o cuando el proceso de trabajo ya no responde (el `reason` del cierre es `ZOMBIE`).

```
def shutdown(self, checkpointer, reason)
```

El procesamiento finaliza cuando el procesador de registros no recibe más registros desde el fragmento, ya sea porque el fragmento se ha dividido o fusionado o porque la secuencia se ha eliminado.

 KCL también pasa un objeto `Checkpointer` a `shutdown`. Si el `reason` del shutdown es `TERMINATE`, el procesador de registros debería terminar de procesar los registros de datos y llamar al método `checkpoint` en esta interfaz.

## Modificar las propiedades de configuración
<a name="kinesis-record-processor-initialization-py"></a>

En la muestra se proporcionan valores predeterminados para las propiedades de configuración. Puede sobrescribir cualquiera de estas propiedades con sus propios valores (consulte `sample.properties`).

### Nombre de la aplicación
<a name="kinesis-record-processor-application-name-py"></a>

KCL requiere un nombre de aplicación que sea único entre las aplicaciones y en las tablas de Amazon DynamoDB de la misma región. La biblioteca utiliza el valor del nombre de la aplicación de las siguientes formas:
+ Se supone que los procesos de trabajo que están asociados a este nombre de aplicación operan de forma conjunta en la misma secuencia. Estos procesos de trabajo pueden distribuirse en varias instancias. Si ejecuta otra instancia del mismo código de aplicación, pero con otro nombre de aplicación, KCL considera que la segunda instancia es una aplicación completamente independiente de la otra que opera en el mismo flujo.
+ KCL crea una tabla de DynamoDB con el nombre de la aplicación y utiliza la tabla para actualizar la información de estado (como los puntos de verificación y el mapeo procesos de trabajo-particiones) para la aplicación. Cada aplicación tiene su propia tabla de DynamoDB. Para obtener más información, consulte [Usar una tabla de arrendamiento para realizar el seguimiento de las particiones procesadas por la aplicación de consumo de KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Configuración de credenciales
<a name="kinesis-record-processor-creds-py"></a>

Debe poner sus AWS credenciales a disposición de uno de los proveedores de credenciales de la cadena de proveedores de credenciales predeterminada. Puede usar la propiedad `AWSCredentialsProvider` para configurar un proveedor de credenciales. Las [propiedades de muestra](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties) deben poner sus credenciales a disposición de uno de los proveedores de credenciales de la [cadena de proveedores de credenciales predeterminada](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). Si ejecuta su aplicación de consumo en una instancia de Amazon EC2, se recomienda que configure la instancia con un rol de IAM. Las credenciales de AWS que reflejan los permisos asociados a este rol de IAM se ponen a disposición de las aplicaciones de la instancia a través de los metadatos de esta. Esta es la forma más segura de administrar las credenciales para una aplicación consumidora que se ejecute en una instancia EC2.

El archivo de propiedades de ejemplo configura KCL para procesar un flujo de datos de Kinesis llamado “words” utilizando el procesador de registros facilitado en `sample_kclpy_app.py`. 

# Desarrollar un consumidor de Kinesis Client Library en Ruby
<a name="kinesis-record-processor-implementation-app-ruby"></a>

**importante**  
Las versiones 1.x y 2.x de la biblioteca de clientes de Amazon Kinesis (KCL) están obsoletas. KCL 1.x estará disponible el 30 de end-of-support enero de 2026. **Recomendamos** que migre las aplicaciones de KCL que utilizan la versión 1.x a la última versión de KCL antes del 30 de enero de 2026. Para encontrar la versión más reciente de KCL, consulte la página de la [biblioteca de clientes de Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) en. GitHub Para obtener información sobre las versiones más recientes de KCL, consulte [Uso de Kinesis Client Library](kcl.md). Para obtener información sobre cómo migrar de KCL 1.x a KCL 3.x, consulte [Migración de KCL 1.x a KCL 3.x](kcl-migration-1-3.md).

Puede utilizar Kinesis Client Library (KCL) para crear aplicaciones que procesen datos de los flujos de datos de Kinesis. Kinesis Client Library está disponible en varios idiomas. En este tema se habla de Ruby.

La KCL es una biblioteca de Java; el soporte para otros lenguajes además de Java se proporciona mediante una interfaz multilingüe llamada. *MultiLangDaemon* Este daemon está basado en Java y se ejecuta en segundo plano cuando se utiliza un lenguaje de KCL distinto de Java. Por lo tanto, si instala el KCL para Ruby y escribe su aplicación para consumidores completamente en Ruby, seguirá necesitando instalar Java en su sistema debido a la. MultiLangDaemon Además, MultiLangDaemon tiene algunos ajustes predeterminados que puede que tengas que personalizar para tu caso de uso, por ejemplo, la AWS región a la que se conecta. Para obtener más información sobre MultiLangDaemon esto GitHub, visita la página del [ MultiLangDaemon proyecto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Para descargar el KCL de Ruby GitHub, vaya a la [biblioteca de clientes de Kinesis (](https://github.com/awslabs/amazon-kinesis-client-ruby)Ruby). Para descargar un código de muestra para una aplicación de usuario de Ruby KCL, visite la página del proyecto de [ejemplo de KCL para Ruby](https://github.com/awslabs/amazon-kinesis-client-ruby/tree/master/samples) en. GitHub

Para más información acerca de la biblioteca de soporte de Ruby en KCL, consulte la [documentación de Ruby Gems en KCL](http://www.rubydoc.info/gems/aws-kclrb).

# Desarrollar consumidores de KCL 2.x
<a name="developing-consumers-with-kcl-v2"></a>

**importante**  
Las versiones 1.x y 2.x de la biblioteca de clientes de Amazon Kinesis (KCL) están obsoletas. KCL 1.x estará disponible el 30 de end-of-support enero de 2026. **Recomendamos** que migre las aplicaciones de KCL que utilizan la versión 1.x a la última versión de KCL antes del 30 de enero de 2026. Para encontrar la versión más reciente de KCL, consulte la página de la [biblioteca de clientes de Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) en. GitHub Para obtener información sobre las versiones más recientes de KCL, consulte [Uso de Kinesis Client Library](kcl.md). Para obtener información sobre cómo migrar de KCL 1.x a KCL 3.x, consulte [Migración de KCL 1.x a KCL 3.x](kcl-migration-1-3.md).

En este tema se muestra cómo utilizar la versión 2.0 de Kinesis Client Library (KCL). 

Para más información acerca de KCL, consulte la información general que se proporciona en [Desarrollo de consumidores mediante Kinesis Client Library 1.x](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html).

Elija uno de los siguientes temas según la opción que desee utilizar.

**Topics**
+ [

# Desarrollar un consumidor de Kinesis Client Library en Java
](kcl2-standard-consumer-java-example.md)
+ [

# Desarrollar un consumidor de Kinesis Client Library en Python
](kcl2-standard-consumer-python-example.md)
+ [

# Desarrollar consumidores de distribución mejorados con KCL 2.x
](building-enhanced-consumers-kcl-retired.md)

# Desarrollar un consumidor de Kinesis Client Library en Java
<a name="kcl2-standard-consumer-java-example"></a>

**importante**  
Las versiones 1.x y 2.x de la biblioteca de clientes de Amazon Kinesis (KCL) están obsoletas. La versión 1.x de KCL estará disponible el 30 de end-of-support enero de 2026. **Recomendamos** que migre las aplicaciones de KCL que utilizan la versión 1.x a la última versión de KCL antes del 30 de enero de 2026. Para encontrar la versión más reciente de KCL, consulte la página de la [biblioteca de clientes de Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) en. GitHub Para obtener información sobre las versiones más recientes de KCL, consulte [Uso de Kinesis Client Library](kcl.md). Para obtener información sobre cómo migrar de KCL 1.x a KCL 3.x, consulte [Migración de KCL 1.x a KCL 3.x](kcl-migration-1-3.md).

En el siguiente código, se muestra un ejemplo de implementación en Java de `ProcessorFactory` y `RecordProcessor`. Si desea aprovechar la característica de distribución ramificada mejorada, consulte [Uso de consumidores con la distribución ramificada mejorada](https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-kcl-java.html).

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License.
 */


/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

/**
 * This class will run a simple app that uses the KCL to read data and uses the AWS SDK to publish data.
 * Before running this program you must first create a Kinesis stream through the AWS console or AWS SDK.
 */
public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    /**
     * Invoke the main method with 2 args: the stream name and (optionally) the region.
     * Verifies valid inputs and then starts running the app.
     */
    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    /**
     * Constructor sets streamName and region. It also creates a KinesisClient object to send data to Kinesis.
     * This KinesisClient is used to send dummy data so that the consumer has something to read; it is also used
     * indirectly by the KCL to handle the consumption of the data.
     */
    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {

        /**
         * Sends dummy data to Kinesis. Not relevant to consuming the data with the KCL
         */
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        /**
         * Sets up configuration for the KCL, including DynamoDB and CloudWatch dependencies. The final argument, a
         * ShardRecordProcessorFactory, is where the logic for record processing lives, and is located in a private
         * class below.
         */
        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        /**
         * The Scheduler (also called Worker in earlier versions of the KCL) is the entry point to the KCL. This
         * instance is configured with defaults provided by the ConfigsBuilder.
         */
        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
        );

        /**
         * Kickoff the Scheduler. Record processing of the stream of dummy data will continue indefinitely
         * until an exit is triggered.
         */
        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        /**
         * Allows termination of app by pressing Enter.
         */
        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        /**
         * Stops sending dummy data.
         */
        log.info("Cancelling producer and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        /**
         * Stops consuming data. Finishes processing the current batch of data already received from Kinesis
         * before shutting down.
         */
        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown.  Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    /**
     * Sends a single record of dummy data to Kinesis.
     */
    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }

    /**
     * The implementation of the ShardRecordProcessor interface is where the heart of the record processing logic lives.
     * In this example all we do to 'process' is log info about the records.
     */
    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        /**
         * Invoked by the KCL before data records are delivered to the ShardRecordProcessor instance (via
         * processRecords). In this example we do nothing except some logging.
         *
         * @param initializationInput Provides information related to initialization.
         */
        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Handles record processing logic. The Amazon Kinesis Client Library will invoke this method to deliver
         * data records to the application. In this example we simply log our records.
         *
         * @param processRecordsInput Provides the records to be processed as well as information and capabilities
         *                            related to them (e.g. checkpointing).
         */
        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.");
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /** Called when the lease tied to this record processor has been lost. Once the lease has been lost,
         * the record processor can no longer checkpoint.
         *
         * @param leaseLostInput Provides access to functions and data related to the loss of the lease.
         */
        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Called when all data on this shard has been processed. Checkpointing must occur in the method for record
         * processing to be considered complete; an exception will be thrown otherwise.
         *
         * @param shardEndedInput Provides access to a checkpointer method for completing processing of the shard.
         */
        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Invoked when Scheduler has been requested to shut down (i.e. we decide to stop running the app by pressing
         * Enter). Checkpoints and logs the data a final time.
         *
         * @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint
         *                               before the shutdown is completed.
         */
        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```

# Desarrollar un consumidor de Kinesis Client Library en Python
<a name="kcl2-standard-consumer-python-example"></a>

**importante**  
Las versiones 1.x y 2.x de la biblioteca de clientes de Amazon Kinesis (KCL) están obsoletas. La versión 1.x de KCL estará disponible el 30 de end-of-support enero de 2026. **Recomendamos** que migre las aplicaciones de KCL que utilizan la versión 1.x a la última versión de KCL antes del 30 de enero de 2026. Para encontrar la versión más reciente de KCL, consulte la página de la [biblioteca de clientes de Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) en. GitHub Para obtener información sobre las versiones más recientes de KCL, consulte [Uso de Kinesis Client Library](kcl.md). Para obtener información sobre cómo migrar de KCL 1.x a KCL 3.x, consulte [Migración de KCL 1.x a KCL 3.x](kcl-migration-1-3.md).

Puede utilizar Kinesis Client Library (KCL) para crear aplicaciones que procesen datos de los flujos de datos de Kinesis. Kinesis Client Library está disponible en varios idiomas. En este tema se habla de Python.

La KCL es una biblioteca de Java; el soporte para otros lenguajes además de Java se proporciona mediante una interfaz multilingüe llamada. *MultiLangDaemon* Este daemon está basado en Java y se ejecuta en segundo plano cuando se utiliza un lenguaje de KCL distinto de Java. Por lo tanto, si instala el KCL para Python y escribe su aplicación de consumo completamente en Python, seguirá necesitando instalar Java en su sistema debido a la MultiLangDaemon. Además, MultiLangDaemon tiene algunos ajustes predeterminados que puede que tengas que personalizar para tu caso de uso, por ejemplo, la AWS región a la que se conecta. Para obtener más información sobre MultiLangDaemon esto GitHub, visita la página del [ MultiLangDaemon proyecto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Para descargar la KCL de Python GitHub, vaya a la biblioteca de [clientes de Kinesis (Python)](https://github.com/awslabs/amazon-kinesis-client-python). Para descargar un código de muestra para una aplicación de consumo de KCL para Python, vaya a la página del [proyecto de ejemplo de KCL para Python](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples) en. GitHub

Debe completar las siguientes tareas a la hora de implementar una aplicación de consumo de KCL en Python:

**Topics**
+ [

## Implementa los métodos de la clase RecordProcessor
](#kinesis-record-processor-implementation-interface-py)
+ [

## Modificar las propiedades de configuración
](#kinesis-record-processor-initialization-py)

## Implementa los métodos de la clase RecordProcessor
<a name="kinesis-record-processor-implementation-interface-py"></a>

La clase `RecordProcess` debe ampliar la `RecordProcessorBase` para implementar los siguientes métodos:

```
initialize
process_records
shutdown_requested
```

Este ejemplo proporciona implementaciones que puede utilizar como punto de partida.

```
#!/usr/bin/env python

# Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Amazon Software License (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://aws.amazon.com/asl/
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

from __future__ import print_function

import sys
import time

from amazon_kclpy import kcl
from amazon_kclpy.v3 import processor


class RecordProcessor(processor.RecordProcessorBase):
    """
    A RecordProcessor processes data from a shard in a stream. Its methods will be called with this pattern:

    * initialize will be called once
    * process_records will be called zero or more times
    * shutdown will be called if this MultiLangDaemon instance loses the lease to this shard, or the shard ends due
        a scaling change.
    """
    def __init__(self):
        self._SLEEP_SECONDS = 5
        self._CHECKPOINT_RETRIES = 5
        self._CHECKPOINT_FREQ_SECONDS = 60
        self._largest_seq = (None, None)
        self._largest_sub_seq = None
        self._last_checkpoint_time = None

    def log(self, message):
        sys.stderr.write(message)

    def initialize(self, initialize_input):
        """
        Called once by a KCLProcess before any calls to process_records

        :param amazon_kclpy.messages.InitializeInput initialize_input: Information about the lease that this record
            processor has been assigned.
        """
        self._largest_seq = (None, None)
        self._last_checkpoint_time = time.time()

    def checkpoint(self, checkpointer, sequence_number=None, sub_sequence_number=None):
        """
        Checkpoints with retries on retryable exceptions.

        :param amazon_kclpy.kcl.Checkpointer checkpointer: the checkpointer provided to either process_records
            or shutdown
        :param str or None sequence_number: the sequence number to checkpoint at.
        :param int or None sub_sequence_number: the sub sequence number to checkpoint at.
        """
        for n in range(0, self._CHECKPOINT_RETRIES):
            try:
                checkpointer.checkpoint(sequence_number, sub_sequence_number)
                return
            except kcl.CheckpointError as e:
                if 'ShutdownException' == e.value:
                    #
                    # A ShutdownException indicates that this record processor should be shutdown. This is due to
                    # some failover event, e.g. another MultiLangDaemon has taken the lease for this shard.
                    #
                    print('Encountered shutdown exception, skipping checkpoint')
                    return
                elif 'ThrottlingException' == e.value:
                    #
                    # A ThrottlingException indicates that one of our dependencies is is over burdened, e.g. too many
                    # dynamo writes. We will sleep temporarily to let it recover.
                    #
                    if self._CHECKPOINT_RETRIES - 1 == n:
                        sys.stderr.write('Failed to checkpoint after {n} attempts, giving up.\n'.format(n=n))
                        return
                    else:
                        print('Was throttled while checkpointing, will attempt again in {s} seconds'
                              .format(s=self._SLEEP_SECONDS))
                elif 'InvalidStateException' == e.value:
                    sys.stderr.write('MultiLangDaemon reported an invalid state while checkpointing.\n')
                else:  # Some other error
                    sys.stderr.write('Encountered an error while checkpointing, error was {e}.\n'.format(e=e))
            time.sleep(self._SLEEP_SECONDS)

    def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
        """
        Called for each record that is passed to process_records.

        :param str data: The blob of data that was contained in the record.
        :param str partition_key: The key associated with this recod.
        :param int sequence_number: The sequence number associated with this record.
        :param int sub_sequence_number: the sub sequence number associated with this record.
        """
        ####################################
        # Insert your processing logic here
        ####################################
        self.log("Record (Partition Key: {pk}, Sequence Number: {seq}, Subsequence Number: {sseq}, Data Size: {ds}"
                 .format(pk=partition_key, seq=sequence_number, sseq=sub_sequence_number, ds=len(data)))

    def should_update_sequence(self, sequence_number, sub_sequence_number):
        """
        Determines whether a new larger sequence number is available

        :param int sequence_number: the sequence number from the current record
        :param int sub_sequence_number: the sub sequence number from the current record
        :return boolean: true if the largest sequence should be updated, false otherwise
        """
        return self._largest_seq == (None, None) or sequence_number > self._largest_seq[0] or \
            (sequence_number == self._largest_seq[0] and sub_sequence_number > self._largest_seq[1])

    def process_records(self, process_records_input):
        """
        Called by a KCLProcess with a list of records to be processed and a checkpointer which accepts sequence numbers
        from the records to indicate where in the stream to checkpoint.

        :param amazon_kclpy.messages.ProcessRecordsInput process_records_input: the records, and metadata about the
            records.
        """
        try:
            for record in process_records_input.records:
                data = record.binary_data
                seq = int(record.sequence_number)
                sub_seq = record.sub_sequence_number
                key = record.partition_key
                self.process_record(data, key, seq, sub_seq)
                if self.should_update_sequence(seq, sub_seq):
                    self._largest_seq = (seq, sub_seq)

            #
            # Checkpoints every self._CHECKPOINT_FREQ_SECONDS seconds
            #
            if time.time() - self._last_checkpoint_time > self._CHECKPOINT_FREQ_SECONDS:
                self.checkpoint(process_records_input.checkpointer, str(self._largest_seq[0]), self._largest_seq[1])
                self._last_checkpoint_time = time.time()

        except Exception as e:
            self.log("Encountered an exception while processing records. Exception was {e}\n".format(e=e))

    def lease_lost(self, lease_lost_input):
        self.log("Lease has been lost")

    def shard_ended(self, shard_ended_input):
        self.log("Shard has ended checkpointing")
        shard_ended_input.checkpointer.checkpoint()

    def shutdown_requested(self, shutdown_requested_input):
        self.log("Shutdown has been requested, checkpointing.")
        shutdown_requested_input.checkpointer.checkpoint()


if __name__ == "__main__":
    kcl_process = kcl.KCLProcess(RecordProcessor())
    kcl_process.run()
```

## Modificar las propiedades de configuración
<a name="kinesis-record-processor-initialization-py"></a>

En el ejemplo se proporcionan valores predeterminados para las propiedades de configuración, como los que se muestran en el siguiente script. Puede sobrescribir cualquiera de estas propiedades con sus propios valores.

```
# The script that abides by the multi-language protocol. This script will
# be executed by the MultiLangDaemon, which will communicate with this script
# over STDIN and STDOUT according to the multi-language protocol.
executableName = sample_kclpy_app.py

# The name of an Amazon Kinesis stream to process.
streamName = words

# Used by the KCL as the name of this application. Will be used as the name
# of an Amazon DynamoDB table which will store the lease and checkpoint
# information for workers with this application name
applicationName = PythonKCLSample

# Users can change the credentials provider the KCL will use to retrieve credentials.
# The DefaultAWSCredentialsProviderChain checks several other providers, which is
# described here:
# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain

# Appended to the user agent of the KCL. Does not impact the functionality of the
# KCL in any other way.
processingLanguage = python/2.7

# Valid options at TRIM_HORIZON or LATEST.
# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
initialPositionInStream = TRIM_HORIZON

# The following properties are also available for configuring the KCL Worker that is created
# by the MultiLangDaemon.

# The KCL defaults to us-east-1
#regionName = us-east-1

# Fail over time in milliseconds. A worker which does not renew it's lease within this time interval
# will be regarded as having problems and it's shards will be assigned to other workers.
# For applications that have a large number of shards, this msy be set to a higher number to reduce
# the number of DynamoDB IOPS required for tracking leases
#failoverTimeMillis = 10000

# A worker id that uniquely identifies this worker among all workers using the same applicationName
# If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself.
#workerId = 

# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
#shardSyncIntervalMillis = 60000

# Max records to fetch from Kinesis in a single GetRecords call.
#maxRecords = 10000

# Idle time between record reads in milliseconds.
#idleTimeBetweenReadsInMillis = 1000

# Enables applications flush/checkpoint (if they have some data "in progress", but don't get new data for while)
#callProcessRecordsEvenForEmptyRecordList = false

# Interval in milliseconds between polling to check for parent shard completion.
# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
# completion of parent shards).
#parentShardPollIntervalMillis = 10000

# Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try
# to delete the ones we don't need any longer.
#cleanupLeasesUponShardCompletion = true

# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
#taskBackoffTimeMillis = 500

# Buffer metrics for at most this long before publishing to CloudWatch.
#metricsBufferTimeMillis = 10000

# Buffer at most this many metrics before publishing to CloudWatch.
#metricsMaxQueueSize = 10000

# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
# to RecordProcessorCheckpointer#checkpoint(String) by default.
#validateSequenceNumberBeforeCheckpointing = true

# The maximum number of active threads for the MultiLangDaemon to permit.
# If a value is provided then a FixedThreadPool is used with the maximum
# active threads set to the provided value. If a non-positive integer or no
# value is provided a CachedThreadPool is used.
#maxActiveThreads = 0
```

### Nombre de la aplicación
<a name="kinesis-record-processor-application-name-py"></a>

KCL requiere un nombre de aplicación que sea único entre las aplicaciones y en las tablas de Amazon DynamoDB de la misma región. La biblioteca utiliza el valor del nombre de la aplicación de las siguientes formas:
+ Se supone que los procesos de trabajo que están asociados a este nombre de aplicación operan de forma conjunta en la misma secuencia. Estos procesos de trabajo pueden distribuirse entre varias instancias. Si ejecuta otra instancia del mismo código de aplicación, pero con otro nombre de aplicación, KCL considera que la segunda instancia es una aplicación completamente independiente de la otra que opera en el mismo flujo.
+ KCL crea una tabla de DynamoDB con el nombre de la aplicación y utiliza la tabla para actualizar la información de estado (como los puntos de verificación y el mapeo procesos de trabajo-particiones) para la aplicación. Cada aplicación tiene su propia tabla de DynamoDB. Para obtener más información, consulte [Usar una tabla de arrendamiento para realizar el seguimiento de las particiones procesadas por la aplicación de consumo de KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Credenciales
<a name="kinesis-record-processor-creds-py"></a>

Debe poner sus AWS credenciales a disposición de uno de los proveedores de credenciales de la cadena de proveedores de [credenciales predeterminada](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). Puede usar la propiedad `AWSCredentialsProvider` para configurar un proveedor de credenciales. Si ejecuta su aplicación de consumidor en una instancia de Amazon EC2, le recomendamos que configure la instancia con un rol de IAM. AWS Las credenciales que reflejan los permisos asociados a esta función de IAM se ponen a disposición de las aplicaciones de la instancia a través de los metadatos de la instancia. Esta es la forma más segura de administrar las credenciales para una aplicación consumidora que se ejecute en una instancia EC2.

# Desarrollar consumidores de distribución mejorados con KCL 2.x
<a name="building-enhanced-consumers-kcl-retired"></a>

**importante**  
Las versiones 1.x y 2.x de la biblioteca de clientes de Amazon Kinesis (KCL) están obsoletas. KCL 1.x estará disponible el 30 de end-of-support enero de 2026. **Recomendamos** que migre las aplicaciones de KCL que utilizan la versión 1.x a la última versión de KCL antes del 30 de enero de 2026. Para encontrar la versión más reciente de KCL, consulte la página de la [biblioteca de clientes de Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) en. GitHub Para obtener información sobre las versiones más recientes de KCL, consulte [Uso de Kinesis Client Library](kcl.md). Para obtener información sobre cómo migrar de KCL 1.x a KCL 3.x, consulte [Migración de KCL 1.x a KCL 3.x](kcl-migration-1-3.md).

Los consumidores que utilizan la *distribución ramificada mejorada* en Amazon Kinesis Data Streams pueden recibir los registros de un flujo de datos con un rendimiento dedicado de hasta 2 MB de datos por segundo por partición. Este tipo de consumidor no tiene que competir con otros consumidores que reciben datos de la secuencia. Para obtener más información, consulte [Desarrollo de consumidores de distribución ramificada mejorada con rendimiento dedicado](enhanced-consumers.md).

Puede utilizar la versión 2.0 o posterior de Kinesis Client Library (KCL) para desarrollar aplicaciones que utilicen la distribución ramificada mejorada para recibir datos de los flujos. El KCL suscribe automáticamente su aplicación a todos los fragmentos de una transmisión y garantiza que su aplicación de consumo pueda leer con un valor de rendimiento de 2 por fragmento. MB/sec Si quiere utilizar KCL sin activar la distribución ramificada mejorada, consulte la página sobre [desarrollo de consumidores mediante Kinesis Client Library 2.0](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl-v2.html).

**Topics**
+ [

# Desarrollar consumidores de distribución mejorados con KCL 2.x en Java
](building-enhanced-consumers-kcl-java.md)

# Desarrollar consumidores de distribución mejorados con KCL 2.x en Java
<a name="building-enhanced-consumers-kcl-java"></a>

**importante**  
Las versiones 1.x y 2.x de la biblioteca de clientes de Amazon Kinesis (KCL) están obsoletas. KCL 1.x llegará el 30 de enero de 2026. end-of-support **Recomendamos** que migre las aplicaciones de KCL que utilizan la versión 1.x a la última versión de KCL antes del 30 de enero de 2026. Para encontrar la versión más reciente de KCL, consulte la página de la [biblioteca de clientes de Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) en. GitHub Para obtener información sobre las versiones más recientes de KCL, consulte [Uso de Kinesis Client Library](kcl.md). Para obtener información sobre cómo migrar de KCL 1.x a KCL 3.x, consulte [Migración de KCL 1.x a KCL 3.x](kcl-migration-1-3.md).

Puede utilizar la versión 2.0 o posterior de Kinesis Client Library (KCL) para desarrollar aplicaciones en Amazon Kinesis Data Streams para recibir datos de los flujos mediante la distribución ramificada mejorada. En el siguiente código, se muestra un ejemplo de implementación en Java de `ProcessorFactory` y `RecordProcessor`.

Es recomendable que utilice `KinesisClientUtil` para crear `KinesisAsyncClient` y para establecer `maxConcurrency` en `KinesisAsyncClient`.

**importante**  
El cliente de Amazon Kinesis puede experimentar un importante aumento de la latencia, a menos que configure `KinesisAsyncClient` de forma que el valor de `maxConcurrency` sea lo suficientemente alto para permitir todas las asignaciones, además de los usos adicionales de `KinesisAsyncClient`.

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License. 
 */

/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig()
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        log.info("Cancelling producer, and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown. Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }


    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.");
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```