

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Développez des consommateurs personnalisés avec un débit partagé
<a name="shared-throughput-consumers"></a>

**Important**  
Les versions 1.x et 2.x de la bibliothèque client Amazon Kinesis (KCL) sont obsolètes. KCL 1.x arrivera end-of-support le 30 janvier 2026. Nous vous **recommandons vivement** de migrer vos applications KCL à l'aide de la version 1.x vers la dernière version de KCL avant le 30 janvier 2026. Pour trouver la dernière version de KCL, consultez la page [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) sur. GitHub Pour plus d'informations sur les dernières versions de KCL, consultez[Utiliser la bibliothèque cliente Kinesis](kcl.md). Pour plus d'informations sur la migration de KCL 1.x vers KCL 3.x, consultez. [Migration de la KCL 1.x vers la KCL 3.x](kcl-migration-1-3.md)

Si vous n'avez pas besoin d'un débit dédié lors de la réception des données à partir de Kinesis Data Streams, et si vous n'avez pas besoin de lire les retards de propagation sous 200 ms, vous pouvez créer des applications consommateur comme décrit dans les rubriques suivantes. Vous pouvez utiliser la bibliothèque cliente Kinesis (KCL) ou le. AWS SDK pour Java

**Topics**
+ [Développez des consommateurs personnalisés avec un débit partagé à l'aide de KCL](custom-kcl-consumers.md)

Pour plus d'informations sur la création d'applications consommateur pouvant recevoir des enregistrements provenant des flux de données Kinesis avec un débit dédié, consultez [Développez des clients fans améliorés grâce à un débit dédié](enhanced-consumers.md).

# Développez des consommateurs personnalisés avec un débit partagé à l'aide de KCL
<a name="custom-kcl-consumers"></a>

**Important**  
Les versions 1.x et 2.x de la bibliothèque client Amazon Kinesis (KCL) sont obsolètes. KCL 1.x arrivera end-of-support le 30 janvier 2026. Nous vous **recommandons vivement** de migrer vos applications KCL à l'aide de la version 1.x vers la dernière version de KCL avant le 30 janvier 2026. Pour trouver la dernière version de KCL, consultez la page [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) sur. GitHub Pour plus d'informations sur les dernières versions de KCL, consultez[Utiliser la bibliothèque cliente Kinesis](kcl.md). Pour plus d'informations sur la migration de KCL 1.x vers KCL 3.x, consultez. [Migration de la KCL 1.x vers la KCL 3.x](kcl-migration-1-3.md)

L'une des méthodes de développement d'une application client personnalisée avec un débit partagé consiste à utiliser la bibliothèque client Kinesis (KCL). 

Choisissez l'une des rubriques suivantes pour la version KCL que vous utilisez.

**Topics**
+ [Développez les consommateurs de KCL 1.x](developing-consumers-with-kcl.md)
+ [Développez des consommateurs de KCL 2.x](developing-consumers-with-kcl-v2.md)

# Développez les consommateurs de KCL 1.x
<a name="developing-consumers-with-kcl"></a>

**Important**  
Les versions 1.x et 2.x de la bibliothèque client Amazon Kinesis (KCL) sont obsolètes. KCL 1.x arrivera end-of-support le 30 janvier 2026. Nous vous **recommandons vivement** de migrer vos applications KCL à l'aide de la version 1.x vers la dernière version de KCL avant le 30 janvier 2026. Pour trouver la dernière version de KCL, consultez la page [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) sur. GitHub Pour plus d'informations sur les dernières versions de KCL, consultez[Utiliser la bibliothèque cliente Kinesis](kcl.md). Pour plus d'informations sur la migration de KCL 1.x vers KCL 3.x, consultez. [Migration de la KCL 1.x vers la KCL 3.x](kcl-migration-1-3.md)

Vous pouvez développer une application consommateur pour Amazon Kinesis Data Streams à l'aide de la bibliothèque client Kinesis (KCL). 

Pour plus d'informations sur KCL, consultez [À propos de KCL (versions précédentes)](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-overview).

Choisissez l'une des rubriques suivantes en fonction de l'option que vous souhaitez utiliser.

**Topics**
+ [Développez un client de bibliothèque cliente Kinesis en Java](kinesis-record-processor-implementation-app-java.md)
+ [Développez un client de bibliothèque cliente Kinesis dans Node.js](kinesis-record-processor-implementation-app-nodejs.md)
+ [Développez un client de bibliothèque client Kinesis dans .NET](kinesis-record-processor-implementation-app-dotnet.md)
+ [Développez un client de bibliothèque cliente Kinesis en Python](kinesis-record-processor-implementation-app-py.md)
+ [Développez un client de bibliothèque cliente Kinesis dans Ruby](kinesis-record-processor-implementation-app-ruby.md)

# Développez un client de bibliothèque cliente Kinesis en Java
<a name="kinesis-record-processor-implementation-app-java"></a>

**Important**  
Les versions 1.x et 2.x de la bibliothèque client Amazon Kinesis (KCL) sont obsolètes. KCL 1.x arrivera end-of-support le 30 janvier 2026. Nous vous **recommandons vivement** de migrer vos applications KCL à l'aide de la version 1.x vers la dernière version de KCL avant le 30 janvier 2026. Pour trouver la dernière version de KCL, consultez la page [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) sur. GitHub Pour plus d'informations sur les dernières versions de KCL, consultez[Utiliser la bibliothèque cliente Kinesis](kcl.md). Pour plus d'informations sur la migration de KCL 1.x vers KCL 3.x, consultez. [Migration de la KCL 1.x vers la KCL 3.x](kcl-migration-1-3.md)

Vous pouvez utiliser la bibliothèque client Kinesis (KCL) dans le développement d'applications capables de traiter les données de vos flux de données Kinesis. La KCL est disponible en plusieurs langues. Cette rubrique présente Java. Pour consulter la référence Javadoc, consultez la rubrique [AWS Javadoc](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html) relative à Class. AmazonKinesisClient

Pour télécharger la KCL Java depuis GitHub, accédez à la [bibliothèque cliente Kinesis (](https://github.com/awslabs/amazon-kinesis-client)Java). Pour rechercher la KCL Java sur Apache Maven, consultez la page [KCL search results](https://search.maven.org/#search|ga|1|amazon-kinesis-client). Pour télécharger un exemple de code pour une application client Java KCL à partir de GitHub, rendez-vous sur la page d'[exemple de projet KCL pour Java](https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis) sur. GitHub 

L'exemple d'application utilise [Apache Commons Logging](http://commons.apache.org/proper/commons-logging/guide.html). Vous pouvez modifier la configuration de la journalisation dans la méthode statique `configure` définie dans le fichier `AmazonKinesisApplicationSample.java`. *Pour plus d'informations sur l'utilisation d'Apache Commons Logging avec Log4j et les applications AWS Java, consultez la section [Logging with Log4j](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/java-dg-logging.html) dans le manuel du développeur.AWS SDK pour Java *

Vous devez effectuer les tâches suivantes lorsque vous implémentez une application consommateur KCL dans Java :

**Topics**
+ [Implémenter les méthodes IRecord du processeur](#kinesis-record-processor-implementation-interface-java)
+ [Implémenter une fabrique de classes pour l'interface IRecord du processeur](#kinesis-record-processor-implementation-factory-java)
+ [Créer un travailleur](#kcl-java-worker)
+ [Modifier les propriétés de configuration](#kinesis-record-processor-initialization-java)
+ [Migrer vers la version 2 de l'interface du processeur d'enregistrements](#kcl-java-v2-migration)

## Implémenter les méthodes IRecord du processeur
<a name="kinesis-record-processor-implementation-interface-java"></a>

La KCL prend en charge actuellement les deux versions de l'interface `IRecordProcessor` : interface d'origine disponible avec la première version de la KCL et la version 2 disponible à partir de la KCL version 1.5.0. Les deux interfaces sont entièrement prises en charge. Votre choix dépend des exigences spécifiques à votre scénario. Reportez-vous à vos Javadocs locales ou au code source pour voir toutes les différences. Les sections suivantes décrivent l'implémentation minimale pour la mise en route.

**Topics**
+ [Interface d'origine (Version 1)](#kcl-java-interface-original)
+ [Interface mise à jour (version 2)](#kcl-java-interface-v2)

### Interface d'origine (Version 1)
<a name="kcl-java-interface-original"></a>

L'interface d'origine `IRecordProcessor` (`package com.amazonaws.services.kinesis.clientlibrary.interfaces`) expose les méthodes de processeur d'enregistrements suivantes que votre application producteur doit implémenter. L'exemple fournit des implémentations que vous pouvez utiliser comme point de départ (voir `AmazonKinesisApplicationSampleRecordProcessor.java`).

```
public void initialize(String shardId)
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

**initialisation**  
La KCL appelle la méthode `initialize` lorsque le processeur d'enregistrements est instancié, en passant un ID de partition spécifique comme paramètre. Ce processeur d'enregistrements traite uniquement cette partition et, en règle générale, l'inverse est également vrai (cette partition est traitée uniquement par ce processeur d'enregistrements). Cependant, votre application consommateur doit prendre en compte la possibilité qu'un enregistrement de données peut être traité plusieurs fois. Kinesis Data Streams a la sémantique *au moins une fois*, ce qui signifie que chaque enregistrement de données issu d'une partition est traité au moins une fois par une application de travail dans votre application consommateur. Pour plus d'informations sur les cas dans lesquels une partition spécifique peut être traitée par plusieurs applications de travail, consultez la page [Utilisez le redécoupage, la mise à l'échelle et le traitement parallèle pour modifier le nombre de partitions](kinesis-record-processor-scaling.md).

```
public void initialize(String shardId)
```

**processRecords**  
La KCL appelle la méthode `processRecords`, en passant une liste d'enregistrement de données issue de la partition spécifiée par la méthode `initialize(shardId)`. Le processeur d'enregistrements traite les données contenues dans ces enregistrements selon la sémantique de l'application consommateur. Par exemple, l'application de travail peut exécuter une transformation sur les données et stocker ensuite le résultat dans un compartiment Amazon Simple Storage Service (Amazon S3).

```
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) 
```

En plus des données elles-même, l'enregistrement contient également un numéro de séquence et une clé de partition. L'application de travail utilise ces valeurs lors du traitement des données. Par exemple, l'application de travail peut choisir le compartiment S3 dans lequel stocker les données en fonction de la valeur de la clé de partition. La classe `Record` expose les méthodes suivantes qui permettent d'accéder aux données, numéro de séquence et clé de partition de l'enregistrement. 

```
record.getData()  
record.getSequenceNumber() 
record.getPartitionKey()
```

Dans l'exemple, la méthode privée `processRecordsWithRetries` contient du code qui indique comment une application de travail peut accéder aux données, numéro de séquence et clé de partition de l'enregistrement.

Kinesis Data Streams exige que le processeur d'enregistrements effectue le suivi des enregistrements qui ont déjà été traités dans une partition. La KCL assure ce suivi à votre place en passant un objet Checkpointer (`IRecordProcessorCheckpointer`) à `processRecords`. Le processeur d'enregistrements appelle la méthode `checkpoint` sur cette interface pour informer la KCL de son avancement dans le traitement des enregistrements de la partition. Si le travail échoue, la KCL utilise ces informations pour redémarrer le traitement de la partition au niveau du dernier enregistrement traité connu.

Dans le cas d'un fractionnement ou d'une fusion, la KCL ne commence pas à traiter les nouvelles partitions tant que les processeurs des partitions d'origine n'ont pas appelé `checkpoint` pour signaler que l'ensemble du traitement sur les partitions d'origine est terminé.

Si vous ne passez pas de paramètre, la KCL suppose que l'appel de `checkpoint` signifie que tous les enregistrements ont été traités jusqu'au dernier enregistrement qui a été passé au processeur d'enregistrements. Par conséquent, le processeur d'enregistrements doit appeler `checkpoint` seulement après avoir traité tous les enregistrements de la liste qui lui a été passée. Les processeurs d'enregistrements n'ont pas besoin d'appeler `checkpoint` à chaque appel de `processRecords`. Un processeur pourrait, par exemple, appeler `checkpoint` à chaque troisième appel de `processRecords`. Vous pouvez éventuellement spécifier le numéro de séquence précis d'un enregistrement comme paramètre à `checkpoint`. Dans ce cas, la KCL suppose que tous les enregistrements ont été traités jusqu'à cet enregistrement uniquement.

Dans l'exemple, la méthode privée `checkpoint` montre comment appeler `IRecordProcessorCheckpointer.checkpoint` en utilisant la logique appropriée de traitement des exceptions et de nouvelle tentative.

La KCL s'appuie sur `processRecords` pour gérer toutes les exceptions générées par le traitement des enregistrements de données. Si une exception est déclenchée depuis `processRecords`, la KCL ignore les enregistrements de données qui ont été transmis avant l'exception. En d'autres termes, ces enregistrements ne sont pas renvoyés au processeur d'enregistrements qui a lancé l'exception ou à tout autre processeur d'enregistrement dans l'application consommateur.

**shutdown**  
La KCL appelle la méthode `shutdown` soit à la fin du traitement (le motif de fermeture étant `TERMINATE`) ou lorsque l'application de travail ne répond plus (la raison de fermeture ayant la valeur `ZOMBIE`).

```
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

Le traitement se termine lorsque le processeur d'enregistrements ne reçoit plus d'enregistrements de la partition, car la partition a été fractionnée ou fusionnée, ou le flux a été supprimé.

La KCL passe également une interface `IRecordProcessorCheckpointer` à `shutdown`. Si le motif de fermeture est `TERMINATE`, le processeur d'enregistrements doit terminer le traitement des enregistrements de données et appeler ensuite la méthode `checkpoint` sur cette interface.

### Interface mise à jour (version 2)
<a name="kcl-java-interface-v2"></a>

L'interface `IRecordProcessor` mise à jour (`package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`) expose les méthodes de processeur d'enregistrements suivantes que votre application producteur doit implémenter : 

```
void initialize(InitializationInput initializationInput)
void processRecords(ProcessRecordsInput processRecordsInput)
void shutdown(ShutdownInput shutdownInput)
```

Tous les arguments provenant de la version initiale de l'interface sont accessibles via les méthodes get sur les objets de conteneur. Par exemple, pour extraire la liste des enregistrements dans `processRecords()`, vous pouvez utiliser `processRecordsInput.getRecords()`.

A partir de la version 2 de cette interface (KCL 1.5.0 et ultérieure), les nouvelles entrées suivantes sont disponibles en plus des entrées fournies par l'interface d'origine :

Numéro de séquence de début  
Dans l'objet `InitializationInput` passé à l'opération `initialize()`, le numéro de séquence de début à partir duquel les enregistrements sont fournis à l'instance de processeur d'enregistrements. C'est le numéro de séquence qui a été contrôlé en dernier par l'instance de processeur d'enregistrements qui a traité précédemment la même partition. Il est fourni si votre application a besoin de cette information. 

Numéro de séquence de point de contrôle en attente  
Dans l'objet `InitializationInput` passé à l'opération `initialize()`, le numéro de séquence de point de contrôle en attente (le cas échéant) qui n'a pas pu être validé avant l'arrêt de l'instance de processeur d'enregistrements précédente..

## Implémenter une fabrique de classes pour l'interface IRecord du processeur
<a name="kinesis-record-processor-implementation-factory-java"></a>

Vous avez aussi besoin d'implémenter une fabrique pour la classe qui implémente les méthodes de processeur d'enregistrements. Lorsque votre application consommateur instancie l'application de travail, elle passe une référence à cette fabrique.

L'exemple implémente la classe Factory dans le fichier `AmazonKinesisApplicationSampleRecordProcessorFactory.java` à l'aide de l'interface de processeur d'enregistrements d'origine. Si vous voulez que la fabrique de classes crée des processeurs d'enregistrements version 2, utilisez le nom de package `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`.

```
  public class SampleRecordProcessorFactory implements IRecordProcessorFactory { 
      /**
      * Constructor.
      */
      public SampleRecordProcessorFactory() {
          super();
      }
      /**
      * {@inheritDoc}
      */
      @Override
      public IRecordProcessor createProcessor() {
          return new SampleRecordProcessor();
      }
  }
```

## Créer un travailleur
<a name="kcl-java-worker"></a>

Comme présenté dans [Implémenter les méthodes IRecord du processeur](#kinesis-record-processor-implementation-interface-java), deux versions de l'interface de processeur d'enregistrements KCL sont disponibles, ce qui affecte la façon dont vous créez une application de travail. L'interface de processeur d'enregistrements d'origine utilise la structure de code suivante pour créer un travail :

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker(recordProcessorFactory, config);
```

Avec la version 2 de l'interface de processeur d'enregistrements, vous pouvez utiliser `Worker.Builder` pour créer un travail sans avoir à vous soucier du constructeur à utiliser et de l'ordre des arguments. L'interface de processeur d'enregistrements mise à jour utilise la structure de code suivante pour créer un travail :

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

## Modifier les propriétés de configuration
<a name="kinesis-record-processor-initialization-java"></a>

L'exemple fournit les valeurs par défaut des propriétés de configuration. Ces données de configuration du travail sont ensuite consolidées dans un objet `KinesisClientLibConfiguration`. Cet objet et une référence à la fabrique de classes pour `IRecordProcessor` sont passés dans l'appel qui instancie l'application de travail. Vous pouvez remplacer ces propriétés par vos propres valeurs en utilisant un fichier de propriétés Java (voir `AmazonKinesisApplicationSample.java`).

### Application name (Nom de l'application)
<a name="configuration-property-application-name"></a>

La KCL demande un nom d'application qui est unique dans l'ensemble de vos applications et dans les tableaux Amazon DynamoDB de la même région. Elle utilise la valeur de configuration du nom d'application des manières suivantes :
+ Tous les programmes d'exécution associés à ce nom d'application sont considérés comme rattachés au même flux. Ces programmes d'exécution peuvent être répartis sur plusieurs instances. Si vous exécutez une autre instance du même code d'application, mais sous un autre nom d'application, la KCL traite cette seconde instance comme une application totalement distincte, associée elle aussi au même flux.
+ La KCL crée un tableau DynamoDB portant ce nom d'application et utilise la table pour tenir à jour les informations d'état (par exemple, les points de contrôle et le mappage d'application de travail-partition) pour l'application. Chaque application a son propre tableau DynamoDB. Pour de plus amples informations, veuillez consulter [Utilisez une table de location pour suivre les partitions traitées par l'application client KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Configurer les informations d'identification
<a name="kinesis-record-processor-cred-java"></a>

Vous devez mettre vos AWS informations d'identification à la disposition de l'un des fournisseurs d'informations d'identification de la chaîne de fournisseurs d'informations d'identification par défaut. Par exemple, si vous exécutez votre client sur une instance EC2, nous vous recommandons de lancer l'instance avec un rôle IAM. Les informations d'identification AWS qui reflètent les autorisations associées à ce rôle IAM sont mises à la disposition des applications de l'instance via ses métadonnées d'instance. C'est le moyen le plus sûr de gérer les informations d'identification pour une application consommateur exécutée sur une instance EC2.

L'exemple d'application tente d'abord d'extraire les informations d'identification IAM à partir des métadonnées d'instance : 

```
credentialsProvider = new InstanceProfileCredentialsProvider(); 
```

Si l'exemple d'application ne peut pas obtenir les informations d'identification à partir des métadonnées d'instance, il tente d'extraire les informations d'identification d'un fichier de propriétés :

```
credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();
```

Pour plus d'informations sur les métadonnées d'instance, consultez la section [Métadonnées d'instance](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html) dans le *guide de l'utilisateur Amazon EC2*.

### Utiliser l'ID du travailleur pour plusieurs instances
<a name="kinesis-record-processor-workerid-java"></a>

L'exemple de code d'initialisation crée un ID `workerId` pour l'application de travail, en utilisant le nom de l'ordinateur local et en y ajoutant un identifiant unique dans le monde entier, comme illustré dans l'extrait de code ci-après. Cette approche prend en charge le scénario où plusieurs instances de l'application consommateur sont exécutées sur le même ordinateur.

```
String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
```

## Migrer vers la version 2 de l'interface du processeur d'enregistrements
<a name="kcl-java-v2-migration"></a>

Si vous souhaitez migrer le code qui utilise l'interface d'origine, les étapes suivantes sont nécessaires en plus de celles décrites précédemment :

1. Changez la classe de processeur d'enregistrements pour importer la version 2 de l'interface de processeur d'enregistrements :

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   ```

1. Modifiez les références aux entrées pour utiliser des méthodes `get` sur les objets de conteneur. Par exemple, dans l'opération `shutdown()`, remplacez `checkpointer` par `shutdownInput.getCheckpointer()`.

1. Changez la classe de la fabrique de processeurs d'enregistrements pour importer la version 2 de l'interface de fabrique de processeurs d'enregistrements :

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   ```

1. Modifiez la construction de l'application de travail pour utiliser `Worker.Builder`. Par exemple :

   ```
   final Worker worker = new Worker.Builder()
       .recordProcessorFactory(recordProcessorFactory)
       .config(config)
       .build();
   ```

# Développez un client de bibliothèque cliente Kinesis dans Node.js
<a name="kinesis-record-processor-implementation-app-nodejs"></a>

**Important**  
Les versions 1.x et 2.x de la bibliothèque client Amazon Kinesis (KCL) sont obsolètes. KCL 1.x arrivera end-of-support le 30 janvier 2026. Nous vous **recommandons vivement** de migrer vos applications KCL à l'aide de la version 1.x vers la dernière version de KCL avant le 30 janvier 2026. Pour trouver la dernière version de KCL, consultez la page [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) sur. GitHub Pour plus d'informations sur les dernières versions de KCL, consultez[Utiliser la bibliothèque cliente Kinesis](kcl.md). Pour plus d'informations sur la migration de KCL 1.x vers KCL 3.x, consultez. [Migration de la KCL 1.x vers la KCL 3.x](kcl-migration-1-3.md)

Vous pouvez utiliser la bibliothèque client Kinesis (KCL) dans le développement d'applications capables de traiter les données de vos flux de données Kinesis. La KCL est disponible en plusieurs langues. Cette rubrique présente Node.js.

La KCL est une bibliothèque Java ; la prise en charge de langages autres que Java est fournie à l'aide d'une interface multilingue appelée. *MultiLangDaemon* Ce démon est basé sur Java et s'exécute en arrière-plan lorsque vous utilisez un langage KCL autre que Java. Par conséquent, si vous installez la KCL pour Node.js et que vous écrivez votre application grand public entièrement dans Node.js, Java doit toujours être installé sur votre système en raison du MultiLangDaemon. En outre, MultiLangDaemon il comporte certains paramètres par défaut que vous devrez peut-être personnaliser en fonction de votre cas d'utilisation, par exemple, la AWS région à laquelle il se connecte. Pour plus d'informations MultiLangDaemon sur l'activation GitHub, rendez-vous sur la page du [ MultiLangDaemon projet KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Pour télécharger le fichier KCL Node.js depuis GitHub, accédez à la [bibliothèque cliente Kinesis (](https://github.com/awslabs/amazon-kinesis-client-nodejs)Node.js).

**Téléchargements des exemples de code**

Il y a deux exemples de code disponibles pour la KCL pour Node.js :
+ [basic-sample​​](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/basic_sample)

  Il est utilisé dans les sections suivantes pour illustrer les principes fondamentaux de construction d'une application consommateur KCL en Node.js.
+ [click-stream-sample](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/click_stream_sample)

   Il est un peu plus avancé et se sert d'un scénario réel. A utiliser après vous être familiarisé avec l'exemple de code de base. Cet exemple n'est pas présenté ici, mais est accompagné d'un fichier README qui contient plus d'informations.

Vous devez effectuer les tâches suivantes lorsque vous implémentez une application consommateur KCL en Node.js :

**Topics**
+ [Implémenter le processeur d'enregistrement](#kinesis-record-processor-implementation-interface-nodejs)
+ [Modifier les propriétés de configuration](#kinesis-record-processor-initialization-nodejs)

## Implémenter le processeur d'enregistrement
<a name="kinesis-record-processor-implementation-interface-nodejs"></a>

L'application consommateur la plus simple possible qui utilise la KCL pour Node.js doit implémenter une fonction `recordProcessor`, laquelle contient les fonctions `initialize`, `processRecords` et `shutdown`. L'exemple fournit une implémentation que vous pouvez utiliser comme point de départ (voir `sample_kcl_app.js`).

```
function recordProcessor() {
  // return an object that implements initialize, processRecords and shutdown functions.}
```

**initialisation**  
La KCL appelle la fonction `initialize` au démarrage du processeur d'enregistrements. Ce processeur d'enregistrements traite uniquement l'ID de partition passé à `initializeInput.shardId` et, en règle générale, l'inverse est également vrai (cette partition est traitée uniquement par ce processeur d'enregistrements). Cependant, votre application consommateur doit prendre en compte la possibilité qu'un enregistrement de données peut être traité plusieurs fois. Cela provient du fait que Kinesis Data Streams a la sémantique *au moins une fois*, qui signifie que chaque enregistrement de données issu d'une partition est traité au moins une fois par une application de travail dans votre application consommateur. Pour plus d'informations sur les cas dans lesquels une partition spécifique peut éventuellement être traitée par plusieurs applications de travail, consultez la page [Utilisez le redécoupage, la mise à l'échelle et le traitement parallèle pour modifier le nombre de partitions](kinesis-record-processor-scaling.md).

```
initialize: function(initializeInput, completeCallback)
```

**processRecords**  
 La KCL appelle cette fonction en indiquant une entrée qui contient une liste d'enregistrements de données de la partition spécifiée pour la fonction `initialize`. Le processeur d'enregistrements que vous implémentez traite les données figurant dans ces enregistrements suivant la sémantique de votre application consommateur. Par exemple, l'application de travail peut exécuter une transformation sur les données et stocker ensuite le résultat dans un compartiment Amazon Simple Storage Service (Amazon S3). 

```
processRecords: function(processRecordsInput, completeCallback)
```

En plus des données elles-même, l'enregistrement contient également un numéro de séquence et une clé de partition, que l'application de travail peut utiliser pour traiter les données. Par exemple, l'application de travail peut choisir le compartiment S3 dans lequel stocker les données en fonction de la valeur de la clé de partition. Le dictionnaire `record` expose les paires clé-valeur suivantes pour accéder aux données, numéro de séquence et clé de partition de l'enregistrement :

```
record.data
record.sequenceNumber
record.partitionKey
```

Notez que les données sont encodées en Base64.

Dans l'exemple de base, la fonction `processRecords` contient du code qui indique comment une application de travail peut accéder aux données, numéro de séquence et clé de partition de l'enregistrement.

Kinesis Data Streams exige que le processeur d'enregistrements effectue le suivi des enregistrements qui ont déjà été traités dans une partition. La KCL se charge d'assurer ce suivi avec un objet `checkpointer` passé comme `processRecordsInput.checkpointer`. Le processeur d'enregistrements appelle la fonction `checkpointer.checkpoint` pour informer la KCL de son avancement dans le traitement des enregistrements de la partition. Si l'application de travail échoue, la KCL utilise ces informations lorsque vous redémarrez le traitement de la partition pour continuer à partir du dernier enregistrement traité connu.

Dans le cas d'un fractionnement ou d'une fusion, la KCL ne commence pas à traiter les nouvelles partitions tant que les processeurs des partitions d'origine n'ont pas appelé `checkpoint` pour signaler que l'ensemble du traitement sur les partitions d'origine est terminé.

Si vous ne passez pas le numéro de séquence à la fonction `checkpoint`, la KCL suppose que l'appel vers `checkpoint` signifie que tous les enregistrements ont été traités jusqu'au dernier enregistrement qui a été passé au processeur d'enregistrements. Par conséquent, le processeur d'enregistrements doit appeler `checkpoint` **seulement** après avoir traité tous les enregistrements de la liste qui lui a été passée. Les processeurs d'enregistrements n'ont pas besoin d'appeler `checkpoint` à chaque appel de `processRecords`. Un processeur peut, par exemple, appeler un `checkpoint` appel tous les trois, ou un événement externe à votre processeur d'enregistrement, tel qu'un verification/validation service personnalisé que vous avez mis en œuvre. 

Vous pouvez éventuellement spécifier le numéro de séquence précis d'un enregistrement comme paramètre à `checkpoint`. Dans ce cas, la KCL suppose que tous les enregistrements ont été traités jusqu'à cet enregistrement uniquement.

L'exemple d'application de base montre l'appel le plus simple possible de la fonction `checkpointer.checkpoint`. Vous pouvez ajouter à la fonction une autre logique de points de contrôle nécessaire pour votre application consommateur à ce stade.

**shutdown**  
La KCL appelle la fonction `shutdown` soit à la fin du traitement (`shutdownInput.reason` est `TERMINATE`) ou si l'application de travail ne répond plus (`shutdownInput.reason` est `ZOMBIE`).

```
shutdown: function(shutdownInput, completeCallback)
```

Le traitement se termine lorsque le processeur d'enregistrements ne reçoit plus d'enregistrements de la partition, car la partition a été fractionnée ou fusionnée, ou le flux a été supprimé.

La KCL passe également un objet `shutdownInput.checkpointer` à `shutdown`. Si le motif de fermeture est `TERMINATE`, vous devez vous assurer que le processeur d'enregistrements a fini de traiter les enregistrements de données et appeler ensuite la fonction `checkpoint` sur cet objet.

## Modifier les propriétés de configuration
<a name="kinesis-record-processor-initialization-nodejs"></a>

L'exemple fournit les valeurs par défaut des propriétés de configuration. Vous pouvez remplacer ces propriétés par vos propres valeurs (voir `sample.properties` dans l'exemple de base).

### Application name (Nom de l'application)
<a name="kinesis-record-processor-application-name-nodejs"></a>

La KCL nécessite une d'application qui est unique parmi vos applications et parmi les tableaux Amazon DynamoDB dans la même région. Elle utilise la valeur de configuration du nom d'application des manières suivantes :
+ Tous les programmes d'exécution associés à ce nom d'application sont considérés comme rattachés au même flux. Ces programmes d'exécution peuvent être répartis sur plusieurs instances. Si vous exécutez une autre instance du même code d'application, mais sous un autre nom d'application, la KCL traite cette seconde instance comme une application totalement distincte, associée elle aussi au même flux.
+ La KCL crée un tableau DynamoDB portant ce nom d'application et utilise la table pour tenir à jour les informations d'état (par exemple, les points de contrôle et le mappage d'application de travail-partition) pour l'application. Chaque application a son propre tableau DynamoDB. Pour de plus amples informations, veuillez consulter [Utilisez une table de location pour suivre les partitions traitées par l'application client KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Configurer les informations d'identification
<a name="kinesis-record-processor-credentials-nodejs"></a>

Vous devez mettre vos AWS informations d'identification à la disposition de l'un des fournisseurs d'informations d'identification de la chaîne de fournisseurs d'informations d'identification par défaut. Vous pouvez utiliser la propriété `AWSCredentialsProvider` pour définir un fournisseur d'informations d'identification. Le fichier `sample.properties` doit mettre vos informations d'identification à disposition de l'un des fournisseurs d'informations d'identification appartenant à la [chaîne des fournisseurs d'informations d'identification par défaut](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). Si vous exécutez votre client sur une instance Amazon EC2, nous vous recommandons de configurer l'instance avec un rôle IAM. AWS les informations d'identification qui reflètent les autorisations associées à ce rôle IAM sont mises à la disposition des applications de l'instance via ses métadonnées d'instance. C'est le moyen le plus sûr de gérer les informations d'identification pour une application consommateur exécutée sur une instance EC2.

L'exemple suivant configure la KCL pour qu'elle traite un flux de données Kinesis appelé `kclnodejssample` à l'aide du processeur d'enregistrements fourni dans `sample_kcl_app.js` :

```
# The Node.js executable script
executableName = node sample_kcl_app.js
# The name of an Amazon Kinesis stream to process
streamName = kclnodejssample
# Unique KCL application name
applicationName = kclnodejssample
# Use default AWS credentials provider chain
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
# Read from the beginning of the stream
initialPositionInStream = TRIM_HORIZON
```

# Développez un client de bibliothèque client Kinesis dans .NET
<a name="kinesis-record-processor-implementation-app-dotnet"></a>

**Important**  
Les versions 1.x et 2.x de la bibliothèque client Amazon Kinesis (KCL) sont obsolètes. KCL 1.x arrivera end-of-support le 30 janvier 2026. Nous vous **recommandons vivement** de migrer vos applications KCL à l'aide de la version 1.x vers la dernière version de KCL avant le 30 janvier 2026. Pour trouver la dernière version de KCL, consultez la page [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) sur. GitHub Pour plus d'informations sur les dernières versions de KCL, consultez[Utiliser la bibliothèque cliente Kinesis](kcl.md). Pour plus d'informations sur la migration de KCL 1.x vers KCL 3.x, consultez. [Migration de la KCL 1.x vers la KCL 3.x](kcl-migration-1-3.md)

Vous pouvez utiliser la bibliothèque client Kinesis (KCL) dans le développement d'applications capables de traiter les données de vos flux de données Kinesis. La KCL est disponible en plusieurs langues. Cette rubrique présente .NET.

La KCL est une bibliothèque Java ; la prise en charge de langages autres que Java est fournie à l'aide d'une interface multilingue appelée. *MultiLangDaemon* Ce démon est basé sur Java et s'exécute en arrière-plan lorsque vous utilisez un langage KCL autre que Java. Par conséquent, si vous installez la KCL pour .NET et que vous écrivez votre application grand public entièrement en .NET, vous devez toujours installer Java sur votre système en raison du MultiLangDaemon. En outre, MultiLangDaemon il comporte certains paramètres par défaut que vous devrez peut-être personnaliser en fonction de votre cas d'utilisation, par exemple, la AWS région à laquelle il se connecte. Pour plus d'informations MultiLangDaemon sur l'activation GitHub, rendez-vous sur la page du [ MultiLangDaemon projet KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Pour télécharger le .NET KCL depuis GitHub, accédez à la [bibliothèque cliente Kinesis (](https://github.com/awslabs/amazon-kinesis-client-net).NET). Pour télécharger un exemple de code pour une application client .NET KCL, rendez-vous sur la page du [projet client d'exemple KCL pour .NET](https://github.com/awslabs/amazon-kinesis-client-net/tree/master/SampleConsumer) sur. GitHub

Vous devez effectuer les tâches suivantes lorsque vous implémentez une application consommateur KCL en .NET :

**Topics**
+ [Implémenter les méthodes de classe IRecord Processor](#kinesis-record-processor-implementation-interface-dotnet)
+ [Modifier les propriétés de configuration](#kinesis-record-processor-initialization-dotnet)

## Implémenter les méthodes de classe IRecord Processor
<a name="kinesis-record-processor-implementation-interface-dotnet"></a>

L'application consommateur doit implémenter les méthodes suivantes pour `IRecordProcessor`. L'exemple d'application consommateur fournit des implémentations que vous pouvez utiliser comme point de départ (voir la classe `SampleRecordProcessor` dans `SampleConsumer/AmazonKinesisSampleConsumer.cs`).

```
public void Initialize(InitializationInput input)
public void ProcessRecords(ProcessRecordsInput input)
public void Shutdown(ShutdownInput input)
```

**Initialiser**  
La KCL appelle cette méthode lorsque le processeur d'enregistrements est instancié, en passant un ID de partition spécifique dans le paramètre `input` (`input.ShardId`). Ce processeur d'enregistrements traite uniquement cette partition et, en règle générale, l'inverse est également vrai (cette partition est traitée uniquement par ce processeur d'enregistrements). Cependant, votre application consommateur doit prendre en compte la possibilité qu'un enregistrement de données peut être traité plusieurs fois. Cela provient du fait que Kinesis Data Streams a la sémantique *au moins une fois*, qui signifie que chaque enregistrement de données issu d'une partition est traité au moins une fois par une application de travail dans votre application consommateur. Pour plus d'informations sur les cas dans lesquels une partition spécifique peut éventuellement être traitée par plusieurs applications de travail, consultez la page [Utilisez le redécoupage, la mise à l'échelle et le traitement parallèle pour modifier le nombre de partitions](kinesis-record-processor-scaling.md).

```
public void Initialize(InitializationInput input)
```

**ProcessRecords**  
La KCL appelle cette méthode, en passant une liste d'enregistrements de données dans le paramètre `input` (`input.Records`), qui sont issues de la partition spécifiée par la méthode `Initialize`. Le processeur d'enregistrements que vous implémentez traite les données figurant dans ces enregistrements suivant la sémantique de votre application consommateur. Par exemple, l'application de travail peut exécuter une transformation sur les données et stocker ensuite le résultat dans un compartiment Amazon Simple Storage Service (Amazon S3).

```
public void ProcessRecords(ProcessRecordsInput input)
```

En plus des données elles-même, l'enregistrement contient également un numéro de séquence et une clé de partition. L'application de travail utilise ces valeurs lors du traitement des données. Par exemple, l'application de travail peut choisir le compartiment S3 dans lequel stocker les données en fonction de la valeur de la clé de partition. La classe `Record` expose le code suivant pour accéder aux données, numéro de séquence et clé de partition de l'enregistrement :

```
byte[] Record.Data 
string Record.SequenceNumber
string Record.PartitionKey
```

Dans l'exemple, la méthode `ProcessRecordsWithRetries` contient du code qui montre comment une application de travail peut accéder aux données, numéro de séquence et clé de partition de l'enregistrement.

Kinesis Data Streams exige que le processeur d'enregistrements effectue le suivi des enregistrements qui ont déjà été traités dans une partition. La KCL assure ce suivi à votre place en passant un objet `Checkpointer` à `ProcessRecords` (`input.Checkpointer`). Le processeur d'enregistrements appelle la méthode `Checkpointer.Checkpoint` pour informer la KCL de son avancement dans le traitement des enregistrements de la partition. Si le travail échoue, la KCL utilise ces informations pour redémarrer le traitement de la partition au niveau du dernier enregistrement traité connu.

Dans le cas d'un fractionnement ou d'une fusion, la KCL ne commence pas à traiter les nouvelles partitions tant que les processeurs des partitions d'origine n'ont pas appelé `Checkpointer.Checkpoint` pour signaler que l'ensemble du traitement sur les partitions d'origine est terminé.

Si vous ne passez pas de paramètre, la KCL suppose que l'appel de `Checkpointer.Checkpoint` signifie que tous les enregistrements ont été traités jusqu'au dernier enregistrement qui a été passé au processeur d'enregistrements. Par conséquent, le processeur d'enregistrements doit appeler `Checkpointer.Checkpoint` seulement après avoir traité tous les enregistrements de la liste qui lui a été passée. Les processeurs d'enregistrements n'ont pas besoin d'appeler `Checkpointer.Checkpoint` à chaque appel de `ProcessRecords`. Un processeur peut, par exemple, appeler `Checkpointer.Checkpoint` tous les trois ou quatre appels. Vous pouvez éventuellement spécifier le numéro de séquence précis d'un enregistrement comme paramètre à `Checkpointer.Checkpoint`. Dans ce cas, la KCL suppose que les enregistrements ont été traités seulement jusqu'à cet enregistrement.

Dans l'exemple, la méthode privée `Checkpoint(Checkpointer checkpointer)` montre comment appeler la méthode `Checkpointer.Checkpoint` en utilisant la logique appropriée de traitement des exceptions et de nouvelle tentative.

La KCL pour .NET gère les exceptions différemment des autres bibliothèques de langage KCL, car elle ne gère pas toutes les exceptions générées par le traitement des enregistrements de données. Toutes les exceptions non interceptées dans le code utilisateur bloquent le programme.

**Fermeture**  
La KCL appelle la méthode `Shutdown` soit à la fin du traitement (le motif de fermeture étant `TERMINATE`) ou lorsque l'application de travail ne répond plus (la raison de fermeture `input.Reason` ayant la valeur `ZOMBIE`).

```
public void Shutdown(ShutdownInput input)
```

Le traitement se termine lorsque le processeur d'enregistrements ne reçoit plus d'enregistrements de la partition, car la partition a été fractionnée ou fusionnée, ou le flux a été supprimé.

La KCL passe également un objet `Checkpointer` à `shutdown`. Si le motif de fermeture est `TERMINATE`, le processeur d'enregistrements doit terminer le traitement des enregistrements de données et appeler ensuite la méthode `checkpoint` sur cette interface.

## Modifier les propriétés de configuration
<a name="kinesis-record-processor-initialization-dotnet"></a>

L'exemple d'application consommateur fournit les valeurs par défaut des propriétés de configuration. Vous pouvez remplacer ces propriétés par vos propres valeurs (voir `SampleConsumer/kcl.properties`).

### Application name (Nom de l'application)
<a name="modify-kinesis-record-processor-application-name"></a>

La KCL nécessite une d'application qui est unique parmi vos applications et parmi les tableaux Amazon DynamoDB dans la même région. Elle utilise la valeur de configuration du nom d'application des manières suivantes :
+ Tous les programmes d'exécution associés à ce nom d'application sont considérés comme rattachés au même flux. Ces programmes d'exécution peuvent être répartis sur plusieurs instances. Si vous exécutez une autre instance du même code d'application, mais sous un autre nom d'application, la KCL traite cette seconde instance comme une application totalement distincte, associée elle aussi au même flux.
+ La KCL crée un tableau DynamoDB portant ce nom d'application et utilise la table pour tenir à jour les informations d'état (par exemple, les points de contrôle et le mappage d'application de travail-partition) pour l'application. Chaque application a son propre tableau DynamoDB. Pour de plus amples informations, veuillez consulter [Utilisez une table de location pour suivre les partitions traitées par l'application client KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Configurer les informations d'identification
<a name="kinesis-record-processor-creds-dotnet"></a>

Vous devez mettre vos AWS informations d'identification à la disposition de l'un des fournisseurs d'informations d'identification de la chaîne de fournisseurs d'informations d'identification par défaut. Vous pouvez utiliser la propriété `AWSCredentialsProvider` pour définir un fournisseur d'informations d'identification. Le fichier [sample.properties](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties) doit mettre vos informations d'identification à disposition de l'un des fournisseurs d'informations d'identification appartenant à la [chaîne des fournisseurs d'informations d'identification par défaut](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). Si vous exécutez votre application client sur une instance Amazon EC2, nous vous recommandons de configurer l'instance avec un rôle IAM. Les informations d'identification AWS qui reflètent les autorisations associées à ce rôle IAM sont mises à la disposition des applications de l'instance via ses métadonnées d'instance. C'est le moyen le plus sûr de gérer les informations d'identification pour une application consommateur exécutée sur une instance EC2.

Dans l'exemple, le fichier de propriétés configure la KCL pour traiter un flux de données Kinesis appelé « words » à l'aide du processeur d'enregistrements fourni dans `AmazonKinesisSampleConsumer.cs`. 

# Développez un client de bibliothèque cliente Kinesis en Python
<a name="kinesis-record-processor-implementation-app-py"></a>

**Important**  
Les versions 1.x et 2.x de la bibliothèque client Amazon Kinesis (KCL) sont obsolètes. KCL 1.x arrivera end-of-support le 30 janvier 2026. Nous vous **recommandons vivement** de migrer vos applications KCL à l'aide de la version 1.x vers la dernière version de KCL avant le 30 janvier 2026. Pour trouver la dernière version de KCL, consultez la page [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) sur. GitHub Pour plus d'informations sur les dernières versions de KCL, consultez[Utiliser la bibliothèque cliente Kinesis](kcl.md). Pour plus d'informations sur la migration de KCL 1.x vers KCL 3.x, consultez. [Migration de la KCL 1.x vers la KCL 3.x](kcl-migration-1-3.md)

Vous pouvez utiliser la bibliothèque client Kinesis (KCL) dans le développement d'applications capables de traiter les données de vos flux de données Kinesis. La KCL est disponible en plusieurs langues. Cette rubrique présente Python.

La KCL est une bibliothèque Java ; la prise en charge de langages autres que Java est fournie à l'aide d'une interface multilingue appelée. *MultiLangDaemon* Ce démon est basé sur Java et s'exécute en arrière-plan lorsque vous utilisez un langage KCL autre que Java. Par conséquent, si vous installez la KCL pour Python et que vous écrivez votre application grand public entièrement en Python, vous devez toujours installer Java sur votre système en raison du MultiLangDaemon. En outre, MultiLangDaemon il comporte certains paramètres par défaut que vous devrez peut-être personnaliser en fonction de votre cas d'utilisation, par exemple, la AWS région à laquelle il se connecte. Pour plus d'informations MultiLangDaemon sur l'activation GitHub, rendez-vous sur la page du [ MultiLangDaemon projet KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Pour télécharger la KCL Python depuis GitHub, accédez à la bibliothèque [cliente Kinesis (Python)](https://github.com/awslabs/amazon-kinesis-client-python). Pour télécharger un exemple de code pour une application client KCL Python, rendez-vous sur la page d'exemple de [projet KCL pour Python](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples) sur. GitHub

Vous devez effectuer les tâches suivantes lorsque vous implémentez une application consommateur KCL en Python :

**Topics**
+ [Implémenter les méthodes RecordProcessor de classe](#kinesis-record-processor-implementation-interface-py)
+ [Modifier les propriétés de configuration](#kinesis-record-processor-initialization-py)

## Implémenter les méthodes RecordProcessor de classe
<a name="kinesis-record-processor-implementation-interface-py"></a>

La classe `RecordProcess` doit étendre la classe `RecordProcessorBase` pour implémenter les méthodes ci-après. L'exemple fournit des implémentations que vous pouvez utiliser comme point de départ (voir `sample_kclpy_app.py`).

```
def initialize(self, shard_id)
def process_records(self, records, checkpointer)
def shutdown(self, checkpointer, reason)
```

**initialisation**  
La KCL appelle la méthode `initialize` lorsque le processeur d'enregistrements est instancié, en passant un ID de partition spécifique comme paramètre. Ce processeur d'enregistrements traite uniquement cette partition et, en règle générale, l'inverse est également vrai (cette partition est traitée uniquement par ce processeur d'enregistrements). Cependant, votre application consommateur doit prendre en compte la possibilité qu'un enregistrement de données peut être traité plusieurs fois. Cela provient du fait que Kinesis Data Streams a la sémantique *au moins une fois*, qui signifie que chaque enregistrement de données issu d'une partition est traité au moins une fois par une application de travail dans votre application consommateur. Pour plus d'informations sur les cas dans lesquels une partition spécifique peut être traitée par plusieurs applications de travail, consultez la page [Utilisez le redécoupage, la mise à l'échelle et le traitement parallèle pour modifier le nombre de partitions](kinesis-record-processor-scaling.md).

```
def initialize(self, shard_id)
```

**process\$1records**  
 La KCL appelle cette méthode en passant une liste d'enregistrements de données issus de la partition spécifiée par la méthode `initialize`. Le processeur d'enregistrements que vous implémentez traite les données figurant dans ces enregistrements suivant la sémantique de votre application consommateur. Par exemple, l'application de travail peut exécuter une transformation sur les données et stocker ensuite le résultat dans un compartiment Amazon Simple Storage Service (Amazon S3).

```
def process_records(self, records, checkpointer) 
```

En plus des données elles-même, l'enregistrement contient également un numéro de séquence et une clé de partition. L'application de travail utilise ces valeurs lors du traitement des données. Par exemple, l'application de travail peut choisir le compartiment S3 dans lequel stocker les données en fonction de la valeur de la clé de partition. Le dictionnaire `record` expose les paires clé-valeur suivantes pour accéder aux données, numéro de séquence et clé de partition de l'enregistrement :

```
record.get('data')
record.get('sequenceNumber')
record.get('partitionKey')
```

Notez que les données sont encodées en Base64.

Dans l'exemple, la méthode `process_records` contient du code qui montre comment une application de travail peut accéder aux données, numéro de séquence et clé de partition de l'enregistrement.

Kinesis Data Streams exige que le processeur d'enregistrements effectue le suivi des enregistrements qui ont déjà été traités dans une partition. La KCL assure ce suivi à votre place en passant un objet `Checkpointer` à `process_records`. Le processeur d'enregistrements appelle la méthode `checkpoint` sur cet objet pour informer la KCL de son avancement dans le traitement des enregistrements de la partition. Si le travail échoue, la KCL utilise ces informations pour redémarrer le traitement de la partition au niveau du dernier enregistrement traité connu.

Dans le cas d'un fractionnement ou d'une fusion, la KCL ne commence pas à traiter les nouvelles partitions tant que les processeurs des partitions d'origine n'ont pas appelé `checkpoint` pour signaler que l'ensemble du traitement sur les partitions d'origine est terminé.

Si vous ne passez pas de paramètre, la KCL suppose que l'appel de `checkpoint` signifie que tous les enregistrements ont été traités jusqu'au dernier enregistrement qui a été passé au processeur d'enregistrements. Par conséquent, le processeur d'enregistrements doit appeler `checkpoint` seulement après avoir traité tous les enregistrements de la liste qui lui a été passée. Les processeurs d'enregistrements n'ont pas besoin d'appeler `checkpoint` à chaque appel de `process_records`. Un processeur peut, par exemple, appeler `checkpoint` tous les trois appels. Vous pouvez éventuellement spécifier le numéro de séquence précis d'un enregistrement comme paramètre à `checkpoint`. Dans ce cas, la KCL suppose que tous les enregistrements ont été traités jusqu'à cet enregistrement uniquement.

Dans l'exemple, la méthode privée `checkpoint` montre comment appeler la méthode `Checkpointer.checkpoint` en utilisant la logique appropriée de traitement des exceptions et de nouvelle tentative.

La KCL s'appuie sur `process_records` pour gérer toutes les exceptions générées par le traitement des enregistrements de données. Si une exception est déclenchée depuis `process_records`, la KCL ignore les enregistrements de données qui ont été transmis à `process_records` avant l'exception. En d'autres termes, ces enregistrements ne sont pas renvoyés au processeur d'enregistrements qui a lancé l'exception ou à tout autre processeur d'enregistrement dans l'application consommateur.

**shutdown**  
 La KCL appelle la méthode `shutdown` soit à la fin du traitement (le motif de fermeture étant `TERMINATE`) ou lorsque l'application de travail ne répond plus (la raison de fermeture `reason` ayant la valeur `ZOMBIE`).

```
def shutdown(self, checkpointer, reason)
```

Le traitement se termine lorsque le processeur d'enregistrements ne reçoit plus d'enregistrements de la partition, car la partition a été fractionnée ou fusionnée, ou le flux a été supprimé.

 La KCL passe également un objet `Checkpointer` à `shutdown`. Si le motif de fermeture `reason` est `TERMINATE`, le processeur d'enregistrements doit terminer le traitement des enregistrements de données et appeler ensuite la méthode `checkpoint` sur cette interface.

## Modifier les propriétés de configuration
<a name="kinesis-record-processor-initialization-py"></a>

L'exemple fournit les valeurs par défaut des propriétés de configuration. Vous pouvez remplacer ces propriétés par vos propres valeurs (voir `sample.properties`).

### Application name (Nom de l'application)
<a name="kinesis-record-processor-application-name-py"></a>

La KCL nécessite un nom d'application qui est unique parmi vos applications et parmi les tableaux Amazon DynamoDB dans la même région. Elle utilise la valeur de configuration du nom d'application des manières suivantes :
+ Tous les programmes d'exécution qui sont associés à ce nom d'application sont considérés comme rattachés au même flux. Ces programmes d'exécution peuvent être répartis sur plusieurs instances. Si vous exécutez une autre instance du même code d'application, mais sous un autre nom d'application, la KCL traite cette seconde instance comme une application totalement distincte, associée elle aussi au même flux.
+ La KCL crée un tableau DynamoDB portant ce nom d'application et utilise la table pour tenir à jour les informations d'état (par exemple, les points de contrôle et le mappage d'application de travail-partition) pour l'application. Chaque application a son propre tableau DynamoDB. Pour de plus amples informations, veuillez consulter [Utilisez une table de location pour suivre les partitions traitées par l'application client KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Configurer les informations d'identification
<a name="kinesis-record-processor-creds-py"></a>

Vous devez mettre vos AWS informations d'identification à la disposition de l'un des fournisseurs d'informations d'identification de la chaîne de fournisseurs d'informations d'identification par défaut. Vous pouvez utiliser la propriété `AWSCredentialsProvider` pour définir un fournisseur d'informations d'identification. Le fichier [sample.properties](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties) doit mettre vos informations d'identification à disposition de l'un des fournisseurs d'informations d'identification appartenant à la [chaîne des fournisseurs d'informations d'identification par défaut](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). Si vous exécutez votre application client sur une instance Amazon EC2, nous vous recommandons de configurer l'instance avec un rôle IAM. Les informations d'identification AWS qui reflètent les autorisations associées à ce rôle IAM sont mises à la disposition des applications de l'instance via ses métadonnées d'instance. C'est le moyen le plus sûr de gérer les informations d'identification pour une application consommateur exécutée sur une instance EC2.

Dans l'exemple, le fichier de propriétés configure la KCL pour traiter un flux de données Kinesis appelé « words » à l'aide du processeur d'enregistrements fourni dans `sample_kclpy_app.py`. 

# Développez un client de bibliothèque cliente Kinesis dans Ruby
<a name="kinesis-record-processor-implementation-app-ruby"></a>

**Important**  
Les versions 1.x et 2.x de la bibliothèque client Amazon Kinesis (KCL) sont obsolètes. KCL 1.x arrivera end-of-support le 30 janvier 2026. Nous vous **recommandons vivement** de migrer vos applications KCL à l'aide de la version 1.x vers la dernière version de KCL avant le 30 janvier 2026. Pour trouver la dernière version de KCL, consultez la page [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) sur. GitHub Pour plus d'informations sur les dernières versions de KCL, consultez[Utiliser la bibliothèque cliente Kinesis](kcl.md). Pour plus d'informations sur la migration de KCL 1.x vers KCL 3.x, consultez. [Migration de la KCL 1.x vers la KCL 3.x](kcl-migration-1-3.md)

Vous pouvez utiliser la bibliothèque client Kinesis (KCL) dans le développement d'applications capables de traiter les données de vos flux de données Kinesis. La KCL est disponible en plusieurs langues. Cette rubrique présente Ruby.

La KCL est une bibliothèque Java ; la prise en charge de langages autres que Java est fournie à l'aide d'une interface multilingue appelée. *MultiLangDaemon* Ce démon est basé sur Java et s'exécute en arrière-plan lorsque vous utilisez un langage KCL autre que Java. Par conséquent, si vous installez la KCL pour Ruby et que vous écrivez votre application grand public entièrement en Ruby, vous devez toujours installer Java sur votre système en raison du MultiLangDaemon. En outre, MultiLangDaemon il comporte certains paramètres par défaut que vous devrez peut-être personnaliser en fonction de votre cas d'utilisation, par exemple, la AWS région à laquelle il se connecte. Pour plus d'informations MultiLangDaemon sur l'activation GitHub, rendez-vous sur la page du [ MultiLangDaemon projet KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Pour télécharger la KCL Ruby depuis GitHub, accédez à la [bibliothèque cliente Kinesis (](https://github.com/awslabs/amazon-kinesis-client-ruby)Ruby). Pour télécharger un exemple de code pour une application grand public Ruby KCL, rendez-vous sur la page d'[exemple de projet KCL for Ruby](https://github.com/awslabs/amazon-kinesis-client-ruby/tree/master/samples) sur. GitHub

Pour plus d'informations sur la bibliothèque d'assistance KCL Ruby, consultez la [documentation KCL Ruby Gems](http://www.rubydoc.info/gems/aws-kclrb).

# Développez des consommateurs de KCL 2.x
<a name="developing-consumers-with-kcl-v2"></a>

**Important**  
Les versions 1.x et 2.x de la bibliothèque client Amazon Kinesis (KCL) sont obsolètes. KCL 1.x arrivera end-of-support le 30 janvier 2026. Nous vous **recommandons vivement** de migrer vos applications KCL à l'aide de la version 1.x vers la dernière version de KCL avant le 30 janvier 2026. Pour trouver la dernière version de KCL, consultez la page [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) sur. GitHub Pour plus d'informations sur les dernières versions de KCL, consultez[Utiliser la bibliothèque cliente Kinesis](kcl.md). Pour plus d'informations sur la migration de KCL 1.x vers KCL 3.x, consultez. [Migration de la KCL 1.x vers la KCL 3.x](kcl-migration-1-3.md)

Cette rubrique montre comment utiliser la version 2.0 de la bibliothèque client Kinesis (KCL). 

Pour plus d'informations sur la KCL, consultez la présentation fournie dans [Développement d'applications consommateur à l'aide de la bibliothèque client Kinesis 1.x](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html) (français non garanti).

Choisissez l'une des rubriques suivantes en fonction de l'option que vous souhaitez utiliser.

**Topics**
+ [Développez un client de bibliothèque cliente Kinesis en Java](kcl2-standard-consumer-java-example.md)
+ [Développez un client de bibliothèque cliente Kinesis en Python](kcl2-standard-consumer-python-example.md)
+ [Développez de nouveaux fans avec KCL 2.x](building-enhanced-consumers-kcl-retired.md)

# Développez un client de bibliothèque cliente Kinesis en Java
<a name="kcl2-standard-consumer-java-example"></a>

**Important**  
Les versions 1.x et 2.x de la bibliothèque client Amazon Kinesis (KCL) sont obsolètes. KCL 1.x arrivera end-of-support le 30 janvier 2026. Nous vous **recommandons vivement** de migrer vos applications KCL à l'aide de la version 1.x vers la dernière version de KCL avant le 30 janvier 2026. Pour trouver la dernière version de KCL, consultez la page [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) sur. GitHub Pour plus d'informations sur les dernières versions de KCL, consultez[Utiliser la bibliothèque cliente Kinesis](kcl.md). Pour plus d'informations sur la migration de KCL 1.x vers KCL 3.x, consultez. [Migration de la KCL 1.x vers la KCL 3.x](kcl-migration-1-3.md)

Le code suivant présente un exemple d'implémentation dans Java de `ProcessorFactory` et `RecordProcessor`. Si vous souhaitez tirer parti de la fonctionnalité de déploiement amélioré, consultez [Utilisation d'applications consommateur avec le déploiement amélioré ](https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-kcl-java.html).

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License.
 */


/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

/**
 * This class will run a simple app that uses the KCL to read data and uses the AWS SDK to publish data.
 * Before running this program you must first create a Kinesis stream through the AWS console or AWS SDK.
 */
public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    /**
     * Invoke the main method with 2 args: the stream name and (optionally) the region.
     * Verifies valid inputs and then starts running the app.
     */
    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    /**
     * Constructor sets streamName and region. It also creates a KinesisClient object to send data to Kinesis.
     * This KinesisClient is used to send dummy data so that the consumer has something to read; it is also used
     * indirectly by the KCL to handle the consumption of the data.
     */
    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {

        /**
         * Sends dummy data to Kinesis. Not relevant to consuming the data with the KCL
         */
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        /**
         * Sets up configuration for the KCL, including DynamoDB and CloudWatch dependencies. The final argument, a
         * ShardRecordProcessorFactory, is where the logic for record processing lives, and is located in a private
         * class below.
         */
        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        /**
         * The Scheduler (also called Worker in earlier versions of the KCL) is the entry point to the KCL. This
         * instance is configured with defaults provided by the ConfigsBuilder.
         */
        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
        );

        /**
         * Kickoff the Scheduler. Record processing of the stream of dummy data will continue indefinitely
         * until an exit is triggered.
         */
        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        /**
         * Allows termination of app by pressing Enter.
         */
        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        /**
         * Stops sending dummy data.
         */
        log.info("Cancelling producer and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        /**
         * Stops consuming data. Finishes processing the current batch of data already received from Kinesis
         * before shutting down.
         */
        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown.  Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    /**
     * Sends a single record of dummy data to Kinesis.
     */
    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }

    /**
     * The implementation of the ShardRecordProcessor interface is where the heart of the record processing logic lives.
     * In this example all we do to 'process' is log info about the records.
     */
    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        /**
         * Invoked by the KCL before data records are delivered to the ShardRecordProcessor instance (via
         * processRecords). In this example we do nothing except some logging.
         *
         * @param initializationInput Provides information related to initialization.
         */
        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Handles record processing logic. The Amazon Kinesis Client Library will invoke this method to deliver
         * data records to the application. In this example we simply log our records.
         *
         * @param processRecordsInput Provides the records to be processed as well as information and capabilities
         *                            related to them (e.g. checkpointing).
         */
        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.");
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /** Called when the lease tied to this record processor has been lost. Once the lease has been lost,
         * the record processor can no longer checkpoint.
         *
         * @param leaseLostInput Provides access to functions and data related to the loss of the lease.
         */
        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Called when all data on this shard has been processed. Checkpointing must occur in the method for record
         * processing to be considered complete; an exception will be thrown otherwise.
         *
         * @param shardEndedInput Provides access to a checkpointer method for completing processing of the shard.
         */
        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Invoked when Scheduler has been requested to shut down (i.e. we decide to stop running the app by pressing
         * Enter). Checkpoints and logs the data a final time.
         *
         * @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint
         *                               before the shutdown is completed.
         */
        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```

# Développez un client de bibliothèque cliente Kinesis en Python
<a name="kcl2-standard-consumer-python-example"></a>

**Important**  
Les versions 1.x et 2.x de la bibliothèque client Amazon Kinesis (KCL) sont obsolètes. KCL 1.x arrivera end-of-support le 30 janvier 2026. Nous vous **recommandons vivement** de migrer vos applications KCL à l'aide de la version 1.x vers la dernière version de KCL avant le 30 janvier 2026. Pour trouver la dernière version de KCL, consultez la page [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) sur. GitHub Pour plus d'informations sur les dernières versions de KCL, consultez[Utiliser la bibliothèque cliente Kinesis](kcl.md). Pour plus d'informations sur la migration de KCL 1.x vers KCL 3.x, consultez. [Migration de la KCL 1.x vers la KCL 3.x](kcl-migration-1-3.md)

Vous pouvez utiliser la bibliothèque client Kinesis (KCL) dans le développement d'applications capables de traiter les données de vos flux de données Kinesis. La KCL est disponible en plusieurs langues. Cette rubrique présente Python.

La KCL est une bibliothèque Java ; la prise en charge de langages autres que Java est fournie à l'aide d'une interface multilingue appelée. *MultiLangDaemon* Ce démon est basé sur Java et s'exécute en arrière-plan lorsque vous utilisez un langage KCL autre que Java. Par conséquent, si vous installez la KCL pour Python et que vous écrivez votre application grand public entièrement en Python, vous devez toujours installer Java sur votre système en raison du MultiLangDaemon. En outre, MultiLangDaemon il comporte certains paramètres par défaut que vous devrez peut-être personnaliser en fonction de votre cas d'utilisation, par exemple, la AWS région à laquelle il se connecte. Pour plus d'informations MultiLangDaemon sur l'activation GitHub, rendez-vous sur la page du [ MultiLangDaemon projet KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Pour télécharger la KCL Python depuis GitHub, accédez à la bibliothèque [cliente Kinesis (Python)](https://github.com/awslabs/amazon-kinesis-client-python). Pour télécharger un exemple de code pour une application client KCL Python, rendez-vous sur la page d'exemple de [projet KCL pour Python](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples) sur. GitHub

Vous devez effectuer les tâches suivantes lorsque vous implémentez une application consommateur KCL en Python :

**Topics**
+ [Implémenter les méthodes RecordProcessor de classe](#kinesis-record-processor-implementation-interface-py)
+ [Modifier les propriétés de configuration](#kinesis-record-processor-initialization-py)

## Implémenter les méthodes RecordProcessor de classe
<a name="kinesis-record-processor-implementation-interface-py"></a>

La classe `RecordProcess` doit étendre la classe `RecordProcessorBase` pour implémenter les méthodes ci-après :

```
initialize
process_records
shutdown_requested
```

Cet exemple fournit des implémentations que vous pouvez utiliser comme point de départ.

```
#!/usr/bin/env python

# Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Amazon Software License (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://aws.amazon.com/asl/
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

from __future__ import print_function

import sys
import time

from amazon_kclpy import kcl
from amazon_kclpy.v3 import processor


class RecordProcessor(processor.RecordProcessorBase):
    """
    A RecordProcessor processes data from a shard in a stream. Its methods will be called with this pattern:

    * initialize will be called once
    * process_records will be called zero or more times
    * shutdown will be called if this MultiLangDaemon instance loses the lease to this shard, or the shard ends due
        a scaling change.
    """
    def __init__(self):
        self._SLEEP_SECONDS = 5
        self._CHECKPOINT_RETRIES = 5
        self._CHECKPOINT_FREQ_SECONDS = 60
        self._largest_seq = (None, None)
        self._largest_sub_seq = None
        self._last_checkpoint_time = None

    def log(self, message):
        sys.stderr.write(message)

    def initialize(self, initialize_input):
        """
        Called once by a KCLProcess before any calls to process_records

        :param amazon_kclpy.messages.InitializeInput initialize_input: Information about the lease that this record
            processor has been assigned.
        """
        self._largest_seq = (None, None)
        self._last_checkpoint_time = time.time()

    def checkpoint(self, checkpointer, sequence_number=None, sub_sequence_number=None):
        """
        Checkpoints with retries on retryable exceptions.

        :param amazon_kclpy.kcl.Checkpointer checkpointer: the checkpointer provided to either process_records
            or shutdown
        :param str or None sequence_number: the sequence number to checkpoint at.
        :param int or None sub_sequence_number: the sub sequence number to checkpoint at.
        """
        for n in range(0, self._CHECKPOINT_RETRIES):
            try:
                checkpointer.checkpoint(sequence_number, sub_sequence_number)
                return
            except kcl.CheckpointError as e:
                if 'ShutdownException' == e.value:
                    #
                    # A ShutdownException indicates that this record processor should be shutdown. This is due to
                    # some failover event, e.g. another MultiLangDaemon has taken the lease for this shard.
                    #
                    print('Encountered shutdown exception, skipping checkpoint')
                    return
                elif 'ThrottlingException' == e.value:
                    #
                    # A ThrottlingException indicates that one of our dependencies is is over burdened, e.g. too many
                    # dynamo writes. We will sleep temporarily to let it recover.
                    #
                    if self._CHECKPOINT_RETRIES - 1 == n:
                        sys.stderr.write('Failed to checkpoint after {n} attempts, giving up.\n'.format(n=n))
                        return
                    else:
                        print('Was throttled while checkpointing, will attempt again in {s} seconds'
                              .format(s=self._SLEEP_SECONDS))
                elif 'InvalidStateException' == e.value:
                    sys.stderr.write('MultiLangDaemon reported an invalid state while checkpointing.\n')
                else:  # Some other error
                    sys.stderr.write('Encountered an error while checkpointing, error was {e}.\n'.format(e=e))
            time.sleep(self._SLEEP_SECONDS)

    def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
        """
        Called for each record that is passed to process_records.

        :param str data: The blob of data that was contained in the record.
        :param str partition_key: The key associated with this recod.
        :param int sequence_number: The sequence number associated with this record.
        :param int sub_sequence_number: the sub sequence number associated with this record.
        """
        ####################################
        # Insert your processing logic here
        ####################################
        self.log("Record (Partition Key: {pk}, Sequence Number: {seq}, Subsequence Number: {sseq}, Data Size: {ds}"
                 .format(pk=partition_key, seq=sequence_number, sseq=sub_sequence_number, ds=len(data)))

    def should_update_sequence(self, sequence_number, sub_sequence_number):
        """
        Determines whether a new larger sequence number is available

        :param int sequence_number: the sequence number from the current record
        :param int sub_sequence_number: the sub sequence number from the current record
        :return boolean: true if the largest sequence should be updated, false otherwise
        """
        return self._largest_seq == (None, None) or sequence_number > self._largest_seq[0] or \
            (sequence_number == self._largest_seq[0] and sub_sequence_number > self._largest_seq[1])

    def process_records(self, process_records_input):
        """
        Called by a KCLProcess with a list of records to be processed and a checkpointer which accepts sequence numbers
        from the records to indicate where in the stream to checkpoint.

        :param amazon_kclpy.messages.ProcessRecordsInput process_records_input: the records, and metadata about the
            records.
        """
        try:
            for record in process_records_input.records:
                data = record.binary_data
                seq = int(record.sequence_number)
                sub_seq = record.sub_sequence_number
                key = record.partition_key
                self.process_record(data, key, seq, sub_seq)
                if self.should_update_sequence(seq, sub_seq):
                    self._largest_seq = (seq, sub_seq)

            #
            # Checkpoints every self._CHECKPOINT_FREQ_SECONDS seconds
            #
            if time.time() - self._last_checkpoint_time > self._CHECKPOINT_FREQ_SECONDS:
                self.checkpoint(process_records_input.checkpointer, str(self._largest_seq[0]), self._largest_seq[1])
                self._last_checkpoint_time = time.time()

        except Exception as e:
            self.log("Encountered an exception while processing records. Exception was {e}\n".format(e=e))

    def lease_lost(self, lease_lost_input):
        self.log("Lease has been lost")

    def shard_ended(self, shard_ended_input):
        self.log("Shard has ended checkpointing")
        shard_ended_input.checkpointer.checkpoint()

    def shutdown_requested(self, shutdown_requested_input):
        self.log("Shutdown has been requested, checkpointing.")
        shutdown_requested_input.checkpointer.checkpoint()


if __name__ == "__main__":
    kcl_process = kcl.KCLProcess(RecordProcessor())
    kcl_process.run()
```

## Modifier les propriétés de configuration
<a name="kinesis-record-processor-initialization-py"></a>

L'exemple fournit les valeurs par défaut des propriétés de configuration, comme illustré dans le script suivant. Vous pouvez remplacer ces propriétés par vos propres valeurs.

```
# The script that abides by the multi-language protocol. This script will
# be executed by the MultiLangDaemon, which will communicate with this script
# over STDIN and STDOUT according to the multi-language protocol.
executableName = sample_kclpy_app.py

# The name of an Amazon Kinesis stream to process.
streamName = words

# Used by the KCL as the name of this application. Will be used as the name
# of an Amazon DynamoDB table which will store the lease and checkpoint
# information for workers with this application name
applicationName = PythonKCLSample

# Users can change the credentials provider the KCL will use to retrieve credentials.
# The DefaultAWSCredentialsProviderChain checks several other providers, which is
# described here:
# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain

# Appended to the user agent of the KCL. Does not impact the functionality of the
# KCL in any other way.
processingLanguage = python/2.7

# Valid options at TRIM_HORIZON or LATEST.
# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
initialPositionInStream = TRIM_HORIZON

# The following properties are also available for configuring the KCL Worker that is created
# by the MultiLangDaemon.

# The KCL defaults to us-east-1
#regionName = us-east-1

# Fail over time in milliseconds. A worker which does not renew it's lease within this time interval
# will be regarded as having problems and it's shards will be assigned to other workers.
# For applications that have a large number of shards, this msy be set to a higher number to reduce
# the number of DynamoDB IOPS required for tracking leases
#failoverTimeMillis = 10000

# A worker id that uniquely identifies this worker among all workers using the same applicationName
# If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself.
#workerId = 

# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
#shardSyncIntervalMillis = 60000

# Max records to fetch from Kinesis in a single GetRecords call.
#maxRecords = 10000

# Idle time between record reads in milliseconds.
#idleTimeBetweenReadsInMillis = 1000

# Enables applications flush/checkpoint (if they have some data "in progress", but don't get new data for while)
#callProcessRecordsEvenForEmptyRecordList = false

# Interval in milliseconds between polling to check for parent shard completion.
# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
# completion of parent shards).
#parentShardPollIntervalMillis = 10000

# Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try
# to delete the ones we don't need any longer.
#cleanupLeasesUponShardCompletion = true

# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
#taskBackoffTimeMillis = 500

# Buffer metrics for at most this long before publishing to CloudWatch.
#metricsBufferTimeMillis = 10000

# Buffer at most this many metrics before publishing to CloudWatch.
#metricsMaxQueueSize = 10000

# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
# to RecordProcessorCheckpointer#checkpoint(String) by default.
#validateSequenceNumberBeforeCheckpointing = true

# The maximum number of active threads for the MultiLangDaemon to permit.
# If a value is provided then a FixedThreadPool is used with the maximum
# active threads set to the provided value. If a non-positive integer or no
# value is provided a CachedThreadPool is used.
#maxActiveThreads = 0
```

### Application name (Nom de l'application)
<a name="kinesis-record-processor-application-name-py"></a>

La KCL nécessite un nom d'application qui est unique parmi vos applications et parmi les tableaux Amazon DynamoDB dans la même région. Elle utilise la valeur de configuration du nom d'application des manières suivantes :
+ Tous les programmes d'exécution qui sont associés à ce nom d'application sont considérés comme rattachés au même flux. Ces programmes d'exécution peuvent être répartis sur plusieurs instances. Si vous exécutez une autre instance du même code d'application, mais sous un autre nom d'application, la KCL traite cette seconde instance comme une application totalement distincte, associée elle aussi au même flux.
+ La KCL crée un tableau DynamoDB portant ce nom d'application et utilise la table pour tenir à jour les informations d'état (par exemple, les points de contrôle et le mappage d'application de travail-partition) pour l'application. Chaque application a son propre tableau DynamoDB. Pour de plus amples informations, veuillez consulter [Utilisez une table de location pour suivre les partitions traitées par l'application client KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Informations d’identification
<a name="kinesis-record-processor-creds-py"></a>

Vous devez mettre vos AWS informations d'identification à la disposition de l'un des fournisseurs d'informations d'identification de la chaîne de fournisseurs [d'informations d'identification par défaut](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). Vous pouvez utiliser la propriété `AWSCredentialsProvider` pour définir un fournisseur d'informations d'identification. Si vous exécutez votre application client sur une instance Amazon EC2, nous vous recommandons de configurer l'instance avec un rôle IAM. AWS les informations d'identification qui reflètent les autorisations associées à ce rôle IAM sont mises à la disposition des applications de l'instance via ses métadonnées d'instance. C'est le moyen le plus sûr de gérer les informations d'identification pour une application consommateur exécutée sur une instance EC2.

# Développez de nouveaux fans avec KCL 2.x
<a name="building-enhanced-consumers-kcl-retired"></a>

**Important**  
Les versions 1.x et 2.x de la bibliothèque client Amazon Kinesis (KCL) sont obsolètes. KCL 1.x arrivera end-of-support le 30 janvier 2026. Nous vous **recommandons vivement** de migrer vos applications KCL à l'aide de la version 1.x vers la dernière version de KCL avant le 30 janvier 2026. Pour trouver la dernière version de KCL, consultez la page [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) sur. GitHub Pour plus d'informations sur les dernières versions de KCL, consultez[Utiliser la bibliothèque cliente Kinesis](kcl.md). Pour plus d'informations sur la migration de KCL 1.x vers KCL 3.x, consultez. [Migration de la KCL 1.x vers la KCL 3.x](kcl-migration-1-3.md)

Les applications consommateur utilisant la *diffusion améliorée* dans Amazon Kinesis Data Streams peuvent recevoir des enregistrements provenant d'un flux de données avec un débit dédié de jusqu'à 2 Mo de données par seconde par partition. Ce type d'application consommateur n'a pas besoin de se heurter à d'autres applications consommateur qui reçoivent des données à partir du flux. Pour de plus amples informations, veuillez consulter [Développez des clients fans améliorés grâce à un débit dédié](enhanced-consumers.md).

Vous pouvez utiliser la version 2.0 ou ultérieure de la bibliothèque client Kinesis (KCL) pour développer des applications qui utilisent la diffusion améliorée afin de recevoir des données provenant de flux. La KCL abonne automatiquement votre application à toutes les partitions d'un flux et garantit que votre application client peut lire avec une valeur de débit de 2 par partition. MB/sec Si vous souhaitez utiliser la KCL sans activer le déploiement amélioré, consultez [Développement d'applications consommateur à l'aide de la bibliothèque client Kinesis 2.0](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl-v2.html).

**Topics**
+ [Développez des consommateurs de fans améliorés à l'aide de KCL 2.x en Java](building-enhanced-consumers-kcl-java.md)

# Développez des consommateurs de fans améliorés à l'aide de KCL 2.x en Java
<a name="building-enhanced-consumers-kcl-java"></a>

**Important**  
Les versions 1.x et 2.x de la bibliothèque client Amazon Kinesis (KCL) sont obsolètes. KCL 1.x arrivera end-of-support le 30 janvier 2026. Nous vous **recommandons vivement** de migrer vos applications KCL à l'aide de la version 1.x vers la dernière version de KCL avant le 30 janvier 2026. Pour trouver la dernière version de KCL, consultez la page [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) sur. GitHub Pour plus d'informations sur les dernières versions de KCL, consultez[Utiliser la bibliothèque cliente Kinesis](kcl.md). Pour plus d'informations sur la migration de KCL 1.x vers KCL 3.x, consultez. [Migration de la KCL 1.x vers la KCL 3.x](kcl-migration-1-3.md)

Vous pouvez utiliser la version 2.0 ou ultérieure de la bibliothèque client Kinesis (KCL) pour développer des applications dans Amazon Kinesis Data Streams qui utilisent la diffusion améliorée afin de recevoir des données à partir de flux. Le code suivant présente un exemple d'implémentation dans Java de `ProcessorFactory` et `RecordProcessor`.

Il est recommandé d'utiliser `KinesisClientUtil` pour créer `KinesisAsyncClient` et configurer `maxConcurrency` dans `KinesisAsyncClient`.

**Important**  
Le client Amazon Kinesis peut voir une augmentation significative de la latence, sauf si vous configurez `KinesisAsyncClient` pour avoir une`maxConcurrency` suffisamment élevée pour autoriser tous les baux et les utilisations supplémentaires de `KinesisAsyncClient`.

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License. 
 */

/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig()
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        log.info("Cancelling producer, and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown. Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }


    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.");
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```