

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

# Desenvolver consumidores personalizados com throughput compartilhada
<a name="shared-throughput-consumers"></a>

**Importante**  
As versões 1.x e 2.x da Amazon Kinesis Client Library (KCL) estão desatualizadas. O KCL 1.x chegará end-of-support em 30 de janeiro de 2026. É **altamente recomendável** que você migre suas aplicações da KCL que usam a versão 1.x para a versão mais recente da KCL antes de 30 de janeiro de 2026. Para encontrar a versão mais recente da KCL, consulte a página da [biblioteca de cliente do Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) em. GitHub Para obter informações sobre as versões mais recentes da KCL, consulte [Usar a Kinesis Client Library](kcl.md). Para ter mais informações sobre como migrar da KCL 1.x para a KCL 3.x, consulte [Migrar da KCL 1.x para a KCL 3.x](kcl-migration-1-3.md).

Caso não seja necessário um throughput específico ao receber dados do Kinesis Data Streams, nem atrasos de propagação de leitura de até 200 ms, pode-se criar aplicações de consumo seguindo as etapas descritas nos tópicos a seguir. É possível usar a Kinesis Client Library (KCL) ou o AWS SDK para Java.

**Topics**
+ [Desenvolver aplicações de consumo personalizadas com throughput compartilhada usando a KCL](custom-kcl-consumers.md)

Para obter informações sobre a criação de consumidores que podem receber registros de fluxos de dados do Kinesis com throughput dedicada, consulte [Desenvolver consumidores de distribuição avançada com throughput dedicado](enhanced-consumers.md).

# Desenvolver aplicações de consumo personalizadas com throughput compartilhada usando a KCL
<a name="custom-kcl-consumers"></a>

**Importante**  
As versões 1.x e 2.x da Amazon Kinesis Client Library (KCL) estão desatualizadas. O KCL 1.x chegará end-of-support em 30 de janeiro de 2026. É **altamente recomendável** que você migre suas aplicações da KCL que usam a versão 1.x para a versão mais recente da KCL antes de 30 de janeiro de 2026. Para encontrar a versão mais recente da KCL, consulte a página da [biblioteca de cliente do Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) em. GitHub Para obter informações sobre as versões mais recentes da KCL, consulte [Usar a Kinesis Client Library](kcl.md). Para ter mais informações sobre como migrar da KCL 1.x para a KCL 3.x, consulte [Migrar da KCL 1.x para a KCL 3.x](kcl-migration-1-3.md).

Um dos métodos de desenvolvimento de aplicações de consumo personalizadas com throughput compartilhada envolve o uso da Kinesis Client Library (KCL). 

Escolha um dos tópicos a seguir para a versão KCL que esteja sendo usada.

**Topics**
+ [Desenvolver aplicações de consumo da KCL 1.x](developing-consumers-with-kcl.md)
+ [Desenvolver aplicações de consumo da KCL 2.x](developing-consumers-with-kcl-v2.md)

# Desenvolver aplicações de consumo da KCL 1.x
<a name="developing-consumers-with-kcl"></a>

**Importante**  
As versões 1.x e 2.x da Amazon Kinesis Client Library (KCL) estão desatualizadas. O KCL 1.x chegará end-of-support em 30 de janeiro de 2026. É **altamente recomendável** que você migre suas aplicações da KCL que usam a versão 1.x para a versão mais recente da KCL antes de 30 de janeiro de 2026. Para encontrar a versão mais recente da KCL, consulte a página da [biblioteca de cliente do Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) em. GitHub Para obter informações sobre as versões mais recentes da KCL, consulte [Usar a Kinesis Client Library](kcl.md). Para ter mais informações sobre como migrar da KCL 1.x para a KCL 3.x, consulte [Migrar da KCL 1.x para a KCL 3.x](kcl-migration-1-3.md).

É possível criar uma aplicação de consumo para o Amazon Kinesis Data Streams usando a Kinesis Client Library (KCL). 

Para obter mais informações sobre o KCL, consulte [Sobre a KCL (versões anteriores)](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-overview).

Escolha um dos seguintes tópicos, dependendo do que deseja usar.

**Topics**
+ [Desenvolver uma aplicação de consumo da Kinesis Client Library em Java](kinesis-record-processor-implementation-app-java.md)
+ [Desenvolver uma aplicação de consumo da Kinesis Client Library em Node.js](kinesis-record-processor-implementation-app-nodejs.md)
+ [Desenvolver uma aplicação de consumo da Kinesis Client Library em .NET](kinesis-record-processor-implementation-app-dotnet.md)
+ [Desenvolver uma aplicação de consumo da Kinesis Client Library em Python](kinesis-record-processor-implementation-app-py.md)
+ [Desenvolver uma aplicação de consumo da Kinesis Client Library em Ruby](kinesis-record-processor-implementation-app-ruby.md)

# Desenvolver uma aplicação de consumo da Kinesis Client Library em Java
<a name="kinesis-record-processor-implementation-app-java"></a>

**Importante**  
As versões 1.x e 2.x da Amazon Kinesis Client Library (KCL) estão desatualizadas. O KCL 1.x chegará end-of-support em 30 de janeiro de 2026. É **altamente recomendável** que você migre suas aplicações da KCL que usam a versão 1.x para a versão mais recente da KCL antes de 30 de janeiro de 2026. Para encontrar a versão mais recente da KCL, consulte a página da [biblioteca de cliente do Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) em. GitHub Para obter informações sobre as versões mais recentes da KCL, consulte [Usar a Kinesis Client Library](kcl.md). Para ter mais informações sobre como migrar da KCL 1.x para a KCL 3.x, consulte [Migrar da KCL 1.x para a KCL 3.x](kcl-migration-1-3.md).

É possível usar a Kinesis Client Library (KCL) para criar aplicações que processam dados dos fluxos de dados do Kinesis. A Kinesis Client Library está disponível em várias linguagens. Este tópico discute Java. Para ver a referência de Javadoc, consulte o tópico [AWS Javadoc](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html) para Classe. AmazonKinesisClient

Para baixar o Java KCL de GitHub, acesse a [Kinesis Client Library (](https://github.com/awslabs/amazon-kinesis-client)Java). Para localizar a KCL Java no Apache Maven, acesse a página de [resultados da pesquisa de KCL](https://search.maven.org/#search|ga|1|amazon-kinesis-client). Para baixar o código de amostra para um aplicativo consumidor Java KCL em GitHub, acesse a página do [projeto de amostra KCL for Java](https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis) em. GitHub 

O aplicativo de exemplo usa [Apache Commons Logging](http://commons.apache.org/proper/commons-logging/guide.html). É possível alterar a configuração do registro em log no método `configure` estático definido no arquivo `AmazonKinesisApplicationSample.java`. *Para obter mais informações sobre como usar o Apache Commons Logging com aplicativos Log4j e AWS Java, consulte [Logging with Log4j](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/java-dg-logging.html) no Guia do desenvolvedor.AWS SDK para Java *

É necessário concluir as seguintes tarefas ao implementar uma aplicação de consumo da KCL em Java:

**Topics**
+ [Implemente os métodos IRecord do processador](#kinesis-record-processor-implementation-interface-java)
+ [Implemente uma fábrica de classes para a interface IRecord do processador](#kinesis-record-processor-implementation-factory-java)
+ [Criar um operador](#kcl-java-worker)
+ [Modificar as propriedades de configuração](#kinesis-record-processor-initialization-java)
+ [Migrar para a versão 2 da interface do processador de registros](#kcl-java-v2-migration)

## Implemente os métodos IRecord do processador
<a name="kinesis-record-processor-implementation-interface-java"></a>

Atualmente, a KCL oferece suporte a duas versões da interface do `IRecordProcessor`: a interface original está disponível com a primeira versão da KCL e a versão 2 está disponível desde a versão 1.5.0. As duas interfaces são totalmente compatíveis. A escolha depende dos requisitos de cenário específicos. Consulte os Javadocs criados localmente ou o código-fonte para ver todas as diferenças. As seções a seguir descrevem a implementação mínima para os conceitos básicos.

**Topics**
+ [Interface original (versão 1)](#kcl-java-interface-original)
+ [Interface atualizada (versão 2)](#kcl-java-interface-v2)

### Interface original (versão 1)
<a name="kcl-java-interface-original"></a>

A interface `IRecordProcessor` original (`package com.amazonaws.services.kinesis.clientlibrary.interfaces`) expõe os seguintes métodos de processador de registros que o consumidor precisa implementar. O exemplo fornece implementações que podem ser usadas como ponto de partida (consulte `AmazonKinesisApplicationSampleRecordProcessor.java`).

```
public void initialize(String shardId)
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

**inicializar**  
A KCL chama o método `initialize` quando o processador de registros é instanciado, passando um ID de fragmento específico como um parâmetro. Esse processador de registros processa apenas esse fragmento e, normalmente, o inverso também é verdadeiro (esse fragmento é processado somente por esse processador de registro). No entanto, a aplicação de consumo deve considerar a possibilidade de que um registro de dados pode ser processado mais de uma vez. A semântica do Kinesis Data Streams é do tipo *pelo menos uma vez*, o que significa que cada registro de dados de um fragmento é processado pelo menos uma vez por um operador na aplicação de consumo. Para obter mais informações sobre casos em que um fragmento específico pode ser processado por mais de um operador, consulte [Use refragmentação, escalonamento e processamento paralelo para alterar o número de fragmentos](kinesis-record-processor-scaling.md).

```
public void initialize(String shardId)
```

**processRecords**  
A KCL chama o método `processRecords` passando uma lista de registros de dados do fragmento especificado pelo método `initialize(shardId)`. O processador de registros processa os dados nesses registros de acordo com a semântica da aplicação de consumo. Por exemplo, o operador pode executar uma transformação nos dados e, em seguida, armazenar o resultado em um bucket do Amazon Simple Storage Service (Amazon S3).

```
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) 
```

Além dos dados em si, o registro também contém um número de sequência e uma chave de partição. O operador pode usar esses valores ao processar os dados. Por exemplo, o operador pode escolher o bucket do S3 no qual armazenar os dados com base no valor da chave de partição. A classe `Record` expõe os seguintes métodos que oferecem acesso aos dados do registro, número de sequência e chave de partição. 

```
record.getData()  
record.getSequenceNumber() 
record.getPartitionKey()
```

No exemplo, o método privado `processRecordsWithRetries` tem código que mostra como um operador pode acessar os dados do registro, o número de sequência e a chave de partição.

O Kinesis Data Streams requer que o processador de registros rastreie os registros que já foram processados em um fragmento. A KCL faz esse rastreamento passando um checkpointer (`IRecordProcessorCheckpointer`) para o `processRecords`. O processador de registros chama o método `checkpoint` nesta interface para informar a KCL sobre o progresso do processamento dos registros no fragmento. Se o operador falhar, a KCL usará essas informações para reiniciar o processamento do fragmento no último registro processado conhecido.

Em uma operação de divisão ou mesclagem, a KCL só começará a processar os novos fragmentos quando os processadores dos fragmentos originais chamarem `checkpoint` para indicar que o processamento dos fragmentos originais foi concluído.

Se nenhum parâmetro for fornecido, a KCL presumirá que a chamada para `checkpoint` significa que todos os registros foram processados até o último registro passado para o processador de registros. Portanto, o processador de registros deve chamar `checkpoint` somente após ter processado todos os registros na lista que foi passada a ele. Os processadores de registros não precisam chamar `checkpoint` em cada chamada para `processRecords`. Um processador pode, por exemplo, chamar `checkpoint` a cada terceira chamada para `processRecords`. É possível, opcionalmente, especificar o número de sequência exato de um registro como um parâmetro para `checkpoint`. Nesse caso, a KCL presume que todos os registros foram processados somente até o registro especificado.

No exemplo, o método privado `checkpoint` mostra como chamar `IRecordProcessorCheckpointer.checkpoint` usando a lógica de novas tentativas e o tratamento de exceções apropriados.

A KCL depende do `processRecords` para lidar com qualquer exceção ocorrida no processamento dos registros de dados. Se ocorrer uma exceção em `processRecords`, a KCL ignorará os registros de dados passados antes da exceção. Ou seja, esses registros não serão reenviados para o processador de registros que lançou a exceção ou para qualquer outro processador de registros na aplicação de consumo.

**shutdown**  
A KCL chama o método `shutdown` quando o processamento termina (o motivo do desligamento é `TERMINATE`) ou quando o operador não está mais respondendo (o motivo do desligamento é `ZOMBIE`).

```
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

O processamento termina quando o processador de registros não recebe mais registros do fragmento porque ele foi dividido ou intercalado, ou o fluxo foi excluído.

A KCL também passa uma interface do `IRecordProcessorCheckpointer` para `shutdown`. Se o motivo do desligamento é `TERMINATE`, o processador de registros deve terminar o processamento de todos os registros de dados e, em seguida, chamar o método `checkpoint` nesta interface.

### Interface atualizada (versão 2)
<a name="kcl-java-interface-v2"></a>

A interface `IRecordProcessor` atualizada (`package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`) expõe os seguintes métodos de processador de registros que o consumidor precisa implementar: 

```
void initialize(InitializationInput initializationInput)
void processRecords(ProcessRecordsInput processRecordsInput)
void shutdown(ShutdownInput shutdownInput)
```

Todos os argumentos da versão original da interface podem ser acessados por meio de métodos get nos objetos de contêiner. Por exemplo, para recuperar a lista de registros em `processRecords()`, pode-se usar `processRecordsInput.getRecords()`.

Além das entradas fornecidas pela interface original, estas novas entradas estão disponíveis a partir da versão 2 da interface (KCL 1.5.0 e posterior):

número de sequência inicial  
No objeto `InitializationInput` passado para a operação `initialize()`, o número de sequência inicial a partir do qual os registros seriam fornecidos à instância do processador de registros. Esse é o número de sequência que foi verificado pela última vez pela instância do processador de registros que processou anteriormente o mesmo fragmento. Isso será fornecido no caso de o aplicativo precisar de informações. 

número de sequência do ponto de verificação pendente  
No objeto `InitializationInput` passado para a operação `initialize()`, o número de sequência de verificação pendente (se houver) que não pôde ser confirmado antes que a instância do processador de registros anterior parasse.

## Implemente uma fábrica de classes para a interface IRecord do processador
<a name="kinesis-record-processor-implementation-factory-java"></a>

Também será necessário implementar uma fábrica para a classe que implementa os métodos do processador de registros. Quando a aplicação de consumo instancia o operador, ela passa uma referência a essa fábrica.

O exemplo implementa a classe de fábrica no arquivo `AmazonKinesisApplicationSampleRecordProcessorFactory.java` usando a interface de processador de registros original. Para que a fábrica da classe crie a versão 2 dos processadores de registros, deve-se usar o nome do pacote `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`.

```
  public class SampleRecordProcessorFactory implements IRecordProcessorFactory { 
      /**
      * Constructor.
      */
      public SampleRecordProcessorFactory() {
          super();
      }
      /**
      * {@inheritDoc}
      */
      @Override
      public IRecordProcessor createProcessor() {
          return new SampleRecordProcessor();
      }
  }
```

## Criar um operador
<a name="kcl-java-worker"></a>

Como discutido em [Implemente os métodos IRecord do processador](#kinesis-record-processor-implementation-interface-java), há duas versões da interface do processador de registros da KCL para escolha, o que afeta a forma de criar um operador. A interface do processador de registros original usa a seguinte estrutura de código para criar um operador:

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker(recordProcessorFactory, config);
```

Com a versão 2 da interface do processador de registros, é possível usar `Worker.Builder` para criar um operador sem a necessidade de se preocupar com qual construtor usar e a ordem dos argumentos. A interface do processador de registros atualizada usa a seguinte estrutura de código para criar um operador:

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

## Modificar as propriedades de configuração
<a name="kinesis-record-processor-initialization-java"></a>

O exemplo fornece valores padrão para propriedades de configuração. Esses dados de configuração para o operador são então consolidados em um objeto `KinesisClientLibConfiguration`. Esse objeto e uma referência à fábrica de classe para `IRecordProcessor` são passados na chamada que instancia o operador. É possível substituir qualquer uma dessas propriedades por seus próprios valores usando um arquivo de propriedades do Java (consulte `AmazonKinesisApplicationSample.java`).

### Nome da aplicação
<a name="configuration-property-application-name"></a>

A KCL exige um nome de aplicação exclusivo entre as aplicações e as tabelas do Amazon DynamoDB na mesma região. Ela usa o valor de configuração de nome de aplicativo das seguintes formas:
+ Presume-se que todos os operadores associados com esse nome de aplicativo estejam trabalhando juntos no mesmo fluxo. Esses operadores podem ser distribuídos em várias instâncias. Ao executar uma instância adicional do mesmo código da aplicação, mas com um nome diferente, a KCL tratará a segunda instância como uma aplicação totalmente independente operando no mesmo fluxo.
+ A KCL cria uma tabela do DynamoDB com o nome da aplicação e usa essa tabela para manter informações de estado (como pontos de verificação e mapeamento de operador-fragmento) da aplicação. Cada aplicação tem sua própria tabela do DynamoDB. Para obter mais informações, consulte [Usar uma tabela de concessões para monitorar os fragmentos processados pela aplicação de consumo da KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Configurar credenciais
<a name="kinesis-record-processor-cred-java"></a>

Você deve disponibilizar suas AWS credenciais para um dos provedores de credenciais na cadeia de provedores de credenciais padrão. Por exemplo, ao executar a aplicação de consumo em uma instância do EC2, recomenda-se que a instância seja executada com um perfil do IAM. As credenciais da AWS que refletem as permissões associadas a esse perfil do IAM são disponibilizadas às aplicações na instância por meio dos metadados da instância. Essa é a maneira mais segura de gerenciar credenciais para uma aplicação de consumo em execução em uma instância do EC2.

A aplicação de exemplo primeiro tenta recuperar as credenciais do IAM nos metadados da instância: 

```
credentialsProvider = new InstanceProfileCredentialsProvider(); 
```

Se a aplicação de exemplo não consegue obter credenciais dos metadados da instância, ele tenta recuperar as credenciais de um arquivo de propriedades:

```
credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();
```

Para obter mais informações sobre os metadados da instância, consulte [Metadados da instância](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html) no *Guia do usuário do Amazon EC2*.

### Usar o ID do operador para várias instâncias
<a name="kinesis-record-processor-workerid-java"></a>

O código de inicialização de exemplo cria um ID para o operador, `workerId`, usando o nome do computador local e anexando um identificador exclusivo globalmente, conforme mostrado no seguinte trecho de código. Essa abordagem é compatível com o cenário de várias instâncias da aplicação de consumo em execução em um único computador.

```
String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
```

## Migrar para a versão 2 da interface do processador de registros
<a name="kcl-java-v2-migration"></a>

Para migrar o código que usa a interface original, além das etapas descritas anteriormente, as seguintes etapas serão necessárias:

1. Altere a classe do processador de registros para importar a versão 2 da interface do processador de registros:

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   ```

1. Altere as referências para as entradas para usar métodos `get` nos objetos de contêiner. Por exemplo, na operação `shutdown()`, altere "`checkpointer`" para "`shutdownInput.getCheckpointer()`".

1. Altere a classe da fábrica do processador de registros para importar a versão 2 da interface da fábrica do processador de registros:

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   ```

1. Altere a construção do operador para usar `Worker.Builder`. Por exemplo:

   ```
   final Worker worker = new Worker.Builder()
       .recordProcessorFactory(recordProcessorFactory)
       .config(config)
       .build();
   ```

# Desenvolver uma aplicação de consumo da Kinesis Client Library em Node.js
<a name="kinesis-record-processor-implementation-app-nodejs"></a>

**Importante**  
As versões 1.x e 2.x da Amazon Kinesis Client Library (KCL) estão desatualizadas. O KCL 1.x chegará end-of-support em 30 de janeiro de 2026. É **altamente recomendável** que você migre suas aplicações da KCL que usam a versão 1.x para a versão mais recente da KCL antes de 30 de janeiro de 2026. Para encontrar a versão mais recente da KCL, consulte a página da [biblioteca de cliente do Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) em. GitHub Para obter informações sobre as versões mais recentes da KCL, consulte [Usar a Kinesis Client Library](kcl.md). Para ter mais informações sobre como migrar da KCL 1.x para a KCL 3.x, consulte [Migrar da KCL 1.x para a KCL 3.x](kcl-migration-1-3.md).

É possível usar a Kinesis Client Library (KCL) para criar aplicações que processam dados dos fluxos de dados do Kinesis. A Kinesis Client Library está disponível em várias linguagens. Este tópico discute Node.js.

A KCL é uma biblioteca Java; o suporte para linguagens diferentes de Java é fornecido usando uma interface multilíngue chamada de. *MultiLangDaemon* Esse daemon baseado em Java é executado em segundo plano quando uma linguagem de KCL diferente de Java é utilizada. Portanto, se você instalar o KCL para Node.js e escrever seu aplicativo de consumidor inteiramente em Node.js, ainda precisará do Java instalado em seu sistema por causa do MultiLangDaemon. Além disso, MultiLangDaemon tem algumas configurações padrão que você pode precisar personalizar para seu caso de uso, por exemplo, a AWS região à qual ele se conecta. Para obter mais informações sobre o MultiLangDaemon on GitHub, acesse a página do [ MultiLangDaemon projeto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Para baixar a KCL do Node.js GitHub, acesse a [Biblioteca de Cliente Kinesis (](https://github.com/awslabs/amazon-kinesis-client-nodejs)Node.js).

**Downloads de códigos de exemplo**

Há dois exemplos de código disponíveis para KCL em Node.js:
+ [basic-sample](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/basic_sample)

  Usado nas seções a seguir para ilustrar os conceitos básicos de criação de uma aplicação de consumo da KCL em Node.js.
+ [click-stream-sample](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/click_stream_sample)

   Levemente mais avançado e usa um cenário real, para depois que houver familiaridade com o código de exemplo básico. Esse exemplo não é discutido aqui, mas há um arquivo README com mais informações.

É necessário concluir as seguintes tarefas ao implementar uma aplicação de consumo da KCL em Node.js:

**Topics**
+ [Implementar o processador de registros](#kinesis-record-processor-implementation-interface-nodejs)
+ [Modificar as propriedades de configuração](#kinesis-record-processor-initialization-nodejs)

## Implementar o processador de registros
<a name="kinesis-record-processor-implementation-interface-nodejs"></a>

A aplicação de consumo mais simples possível usando a KCL para Node.js deve implementar uma função `recordProcessor`, que, por sua vez, contém as funções `initialize`, `processRecords` e `shutdown`. O exemplo fornece uma implementação que pode ser usada como ponto de partida (consulte `sample_kcl_app.js`).

```
function recordProcessor() {
  // return an object that implements initialize, processRecords and shutdown functions.}
```

**inicializar**  
A KCL chama a função `initialize` quando o processador de registros é iniciado. Esse processador de registros processa apenas o ID do fragmento passado como `initializeInput.shardId` e, normalmente, o inverso também é verdadeiro (esse fragmento é processado somente por esse processador de registro). No entanto, a aplicação de consumo deve considerar a possibilidade de que um registro de dados pode ser processado mais de uma vez. Isso acontece porque a semântica do Kinesis Data Streams é do tipo *pelo menos uma vez*, o que significa que cada registro de dados de um fragmento é processado pelo menos uma vez por um operador na aplicação de consumo. Para obter mais informações sobre casos em que um fragmento específico pode ser processado por mais de um operador, consulte [Use refragmentação, escalonamento e processamento paralelo para alterar o número de fragmentos](kinesis-record-processor-scaling.md).

```
initialize: function(initializeInput, completeCallback)
```

**processRecords**  
 A KCL chama essa função com uma entrada contendo uma lista de registros de dados do fragmento especificado para a função `initialize`. O processador de registros implementado processará os dados nesses registros de acordo com a semântica da aplicação de consumo. Por exemplo, o operador pode executar uma transformação nos dados e, em seguida, armazenar o resultado em um bucket do Amazon Simple Storage Service (Amazon S3). 

```
processRecords: function(processRecordsInput, completeCallback)
```

Além dos dados em si, o registro também contém um número de sequência e uma chave de partição, que o operador pode usar ao processar os dados. Por exemplo, o operador pode escolher o bucket do S3 no qual armazenar os dados com base no valor da chave de partição. O dicionário de `record` expõe os seguintes pares de chave/valor para acessar os dados do registro, o número de sequência e a chave de partição:

```
record.data
record.sequenceNumber
record.partitionKey
```

Observe que os dados são codificados em Base64.

No exemplo básico, a função `processRecords` tem código que mostra como um operador pode acessar os dados do registro, o número de sequência e a chave de partição.

O Kinesis Data Streams requer que o processador de registros rastreie os registros que já foram processados em um fragmento. A KCL faz esse rastreamento com um objeto `checkpointer` passado como `processRecordsInput.checkpointer`. O processador de registros chama a função `checkpointer.checkpoint` para informar a KCL sobre o progresso do processamento dos registros no fragmento. Se o operador falhar, a KCL usará essas informações ao reiniciar o processamento do fragmento para continuar a partir do último registro processado conhecido.

Em uma operação de divisão ou mesclagem, a KCL só começará a processar os novos fragmentos quando os processadores dos fragmentos originais chamarem `checkpoint` para indicar que o processamento dos fragmentos originais foi concluído.

Se u mnúmero de sequência não for passado para a função `checkpoint`, a KCL presumirá que a chamada para `checkpoint` significa que todos os registros foram processados até o último registro passado para o processador de registros. Portanto, o processador de registros deve chamar `checkpoint` **somente** após ter processado todos os registros na lista que foi passada para ele. Os processadores de registros não precisam chamar `checkpoint` em cada chamada para `processRecords`. Um processador pode, por exemplo, chamar `checkpoint` a cada terceira chamada ou algum evento externo ao seu processador de gravação, como um verification/validation serviço personalizado que você implementou. 

É possível, opcionalmente, especificar o número de sequência exato de um registro como um parâmetro para `checkpoint`. Nesse caso, a KCL presume que todos os registros foram processados somente até o registro especificado.

O aplicativo de exemplo básico mostra a chamada mais simples possível para a função `checkpointer.checkpoint`. É possível adicionar outra lógica de verificação que precisar para o consumidor neste ponto da função.

**shutdown**  
A KCL chama a função `shutdown` quando o processamento termina (`shutdownInput.reason` é `TERMINATE`) ou quando o operador não está mais respondendo (`shutdownInput.reason` é `ZOMBIE`).

```
shutdown: function(shutdownInput, completeCallback)
```

O processamento termina quando o processador de registros não recebe mais registros do fragmento porque ele foi dividido ou intercalado, ou o fluxo foi excluído.

A KCL também passa um objeto `shutdownInput.checkpointer` para `shutdown`. Se o motivo do desligamento for `TERMINATE`, é necessário verificar se o processador de registros terminou o processamento de todos os registros de dados e, em seguida, chamar a função `checkpoint` nessa interface.

## Modificar as propriedades de configuração
<a name="kinesis-record-processor-initialization-nodejs"></a>

O exemplo fornece valores padrão para as propriedades de configuração. É possível substituir qualquer uma dessas propriedades por seus próprios valores (consulte `sample.properties` no exemplo básico).

### Nome da aplicação
<a name="kinesis-record-processor-application-name-nodejs"></a>

A KCL exige uma aplicação exclusiva entre as aplicações e as tabelas do Amazon DynamoDB na mesma região. Ela usa o valor de configuração de nome de aplicativo das seguintes formas:
+ Presume-se que todos os operadores associados com esse nome de aplicativo estejam trabalhando juntos no mesmo fluxo. Esses operadores podem ser distribuídos em várias instâncias. Ao executar uma instância adicional do mesmo código da aplicação, mas com um nome diferente, a KCL tratará a segunda instância como uma aplicação totalmente independente operando no mesmo fluxo.
+ A KCL cria uma tabela do DynamoDB com o nome da aplicação e usa essa tabela para manter informações de estado (como pontos de verificação e mapeamento de operador-fragmento) da aplicação. Cada aplicação tem sua própria tabela do DynamoDB. Para obter mais informações, consulte [Usar uma tabela de concessões para monitorar os fragmentos processados pela aplicação de consumo da KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Configurar credenciais
<a name="kinesis-record-processor-credentials-nodejs"></a>

Você deve disponibilizar suas AWS credenciais para um dos provedores de credenciais na cadeia de provedores de credenciais padrão. A propriedade `AWSCredentialsProvider` pode ser usada para definir um provedor de credenciais. O arquivo `sample.properties` precisa disponibilizar as credenciais para um dos provedores de credenciais na [cadeia de provedores de credenciais padrão](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). Se você estiver executando seu consumidor em uma instância do Amazon EC2, recomendamos que você configure a instância com uma função do IAM. AWS as credenciais que refletem as permissões associadas a essa função do IAM são disponibilizadas aos aplicativos na instância por meio dos metadados da instância. Essa é a maneira mais segura de gerenciar credenciais para uma aplicação de consumo em execução em uma instância do EC2.

O exemplo a seguir configura a KCL para processar um fluxo de dados do Kinesis chamado `kclnodejssample` usando o processador de registros fornecido em `sample_kcl_app.js`:

```
# The Node.js executable script
executableName = node sample_kcl_app.js
# The name of an Amazon Kinesis stream to process
streamName = kclnodejssample
# Unique KCL application name
applicationName = kclnodejssample
# Use default AWS credentials provider chain
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
# Read from the beginning of the stream
initialPositionInStream = TRIM_HORIZON
```

# Desenvolver uma aplicação de consumo da Kinesis Client Library em .NET
<a name="kinesis-record-processor-implementation-app-dotnet"></a>

**Importante**  
As versões 1.x e 2.x da Amazon Kinesis Client Library (KCL) estão desatualizadas. O KCL 1.x chegará end-of-support em 30 de janeiro de 2026. É **altamente recomendável** que você migre suas aplicações da KCL que usam a versão 1.x para a versão mais recente da KCL antes de 30 de janeiro de 2026. Para encontrar a versão mais recente da KCL, consulte a página da [biblioteca de cliente do Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) em. GitHub Para obter informações sobre as versões mais recentes da KCL, consulte [Usar a Kinesis Client Library](kcl.md). Para ter mais informações sobre como migrar da KCL 1.x para a KCL 3.x, consulte [Migrar da KCL 1.x para a KCL 3.x](kcl-migration-1-3.md).

É possível usar a Kinesis Client Library (KCL) para criar aplicações que processam dados dos fluxos de dados do Kinesis. A Kinesis Client Library está disponível em várias linguagens. Este tópico discute .NET.

A KCL é uma biblioteca Java; o suporte para linguagens diferentes de Java é fornecido usando uma interface multilíngue chamada de. *MultiLangDaemon* Esse daemon baseado em Java é executado em segundo plano quando uma linguagem de KCL diferente de Java é utilizada. Portanto, se você instalar o KCL para.NET e escrever seu aplicativo de consumidor inteiramente no.NET, ainda precisará do Java instalado em seu sistema por causa do MultiLangDaemon. Além disso, MultiLangDaemon tem algumas configurações padrão que você pode precisar personalizar para seu caso de uso, por exemplo, a AWS região à qual ele se conecta. Para obter mais informações sobre o MultiLangDaemon on GitHub, acesse a página do [ MultiLangDaemon projeto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Para baixar o.NET KCL de GitHub, acesse [Kinesis Client Library (](https://github.com/awslabs/amazon-kinesis-client-net).NET). Para baixar o código de amostra para um aplicativo de consumidor do.NET KCL, acesse a página do [projeto de amostra de consumidor KCL para.NET](https://github.com/awslabs/amazon-kinesis-client-net/tree/master/SampleConsumer) em. GitHub

É necessário concluir as seguintes tarefas ao implementar uma aplicação de consumo da KCL em .NET:

**Topics**
+ [Implemente os métodos da classe IRecord Processor](#kinesis-record-processor-implementation-interface-dotnet)
+ [Modificar as propriedades de configuração](#kinesis-record-processor-initialization-dotnet)

## Implemente os métodos da classe IRecord Processor
<a name="kinesis-record-processor-implementation-interface-dotnet"></a>

A aplicação de consumo precisa implementar os seguintes métodos para `IRecordProcessor`. A aplicação de consumo de exemplo fornece implementações que podem ser usadas como ponto de partida (consulte a classe `SampleRecordProcessor` em `SampleConsumer/AmazonKinesisSampleConsumer.cs`).

```
public void Initialize(InitializationInput input)
public void ProcessRecords(ProcessRecordsInput input)
public void Shutdown(ShutdownInput input)
```

**Inicializar**  
A KCL chama este método quando o processador de registros é instanciado, passando um ID de fragmento específico no parâmetro `input` (`input.ShardId`). Esse processador de registros processa apenas esse fragmento e, normalmente, o inverso também é verdadeiro (esse fragmento é processado somente por esse processador de registro). No entanto, a aplicação de consumo deve considerar a possibilidade de que um registro de dados pode ser processado mais de uma vez. Isso acontece porque a semântica do Kinesis Data Streams é do tipo *pelo menos uma vez*, o que significa que cada registro de dados de um fragmento é processado pelo menos uma vez por um operador na aplicação de consumo. Para obter mais informações sobre casos em que um fragmento específico pode ser processado por mais de um operador, consulte [Use refragmentação, escalonamento e processamento paralelo para alterar o número de fragmentos](kinesis-record-processor-scaling.md).

```
public void Initialize(InitializationInput input)
```

**ProcessRecords**  
A KCL chama este método passando uma lista de registros de dados no parâmetro `input` (`input.Records`) do fragmento especificado pelo método `Initialize`. O processador de registros implementado processará os dados nesses registros de acordo com a semântica da aplicação de consumo. Por exemplo, o operador pode executar uma transformação nos dados e, em seguida, armazenar o resultado em um bucket do Amazon Simple Storage Service (Amazon S3).

```
public void ProcessRecords(ProcessRecordsInput input)
```

Além dos dados em si, o registro também contém um número de sequência e uma chave de partição. O operador pode usar esses valores ao processar os dados. Por exemplo, o operador pode escolher o bucket do S3 no qual armazenar os dados com base no valor da chave de partição. A classe `Record` expõe os seguintes itens para acessar os dados do registro, o número de sequência e a chave de partição:

```
byte[] Record.Data 
string Record.SequenceNumber
string Record.PartitionKey
```

No exemplo, o método `ProcessRecordsWithRetries` tem código que mostra como um operador pode acessar os dados do registro, o número de sequência e a chave de partição.

O Kinesis Data Streams requer que o processador de registros rastreie os registros que já foram processados em um fragmento. A KCL faz esse rastreamento, passando um objeto `Checkpointer` para `ProcessRecords` (`input.Checkpointer`). O processador de registros chama o método `Checkpointer.Checkpoint` para informar a KCL sobre o progresso do processamento dos registros no fragmento. Se o operador falhar, a KCL usará essas informações para reiniciar o processamento do fragmento no último registro processado conhecido.

Em uma operação de divisão ou mesclagem, a KCL só começará a processar os novos fragmentos quando os processadores dos fragmentos originais chamarem `Checkpointer.Checkpoint` para indicar que o processamento dos fragmentos originais foi concluído.

Se nenhum parâmetro for fornecido, a KCL presumirá que a chamada para `Checkpointer.Checkpoint` significa que todos os registros foram processados até o último registro passado para o processador de registros. Portanto, o processador de registros deve chamar `Checkpointer.Checkpoint` somente após ter processado todos os registros na lista que foi passada a ele. Os processadores de registros não precisam chamar `Checkpointer.Checkpoint` em cada chamada para `ProcessRecords`. Um processador pode, por exemplo, chamar `Checkpointer.Checkpoint` a cada terceira ou quarta chamada. É possível, opcionalmente, especificar o número de sequência exato de um registro como um parâmetro para `Checkpointer.Checkpoint`. Nesse caso, a KCL presume que os registros foram processados somente até o registro especificado.

No exemplo, o método privado `Checkpoint(Checkpointer checkpointer)` mostra como chamar o método `Checkpointer.Checkpoint` usando a lógica de novas tentativas e o tratamento de exceções apropriados.

De maneira diferente das bibliotecas KCL em outras linguagens, a KCL para .NET não lida com nenhuma exceção ocorrida no processamento dos registros de dados. As exceções não detectadas do código do usuário causam uma falha no programa.

**Shutdown**  
A KCL chama o método `Shutdown` quando o processamento termina (o motivo do desligamento é `TERMINATE`) ou quando o operador não está mais respondendo (o valor `input.Reason` do desligamento é `ZOMBIE`).

```
public void Shutdown(ShutdownInput input)
```

O processamento termina quando o processador de registros não recebe mais registros do fragmento porque ele foi dividido ou intercalado, ou o fluxo foi excluído.

A KCL também passa um objeto `Checkpointer` para `shutdown`. Se o motivo do desligamento é `TERMINATE`, o processador de registros deve terminar o processamento de todos os registros de dados e, em seguida, chamar o método `checkpoint` nesta interface.

## Modificar as propriedades de configuração
<a name="kinesis-record-processor-initialization-dotnet"></a>

A aplicação de consumo de exemplo fornece valores padrão para as propriedades de configuração. É possível substituir qualquer uma dessas propriedades por seus próprios valores (consulte `SampleConsumer/kcl.properties`).

### Nome da aplicação
<a name="modify-kinesis-record-processor-application-name"></a>

A KCL exige uma aplicação exclusiva entre as aplicações e as tabelas do Amazon DynamoDB na mesma região. Ela usa o valor de configuração de nome de aplicativo das seguintes formas:
+ Presume-se que todos os operadores associados com esse nome de aplicativo estejam trabalhando juntos no mesmo fluxo. Esses operadores podem ser distribuídos em várias instâncias. Ao executar uma instância adicional do mesmo código da aplicação, mas com um nome diferente, a KCL tratará a segunda instância como uma aplicação totalmente independente operando no mesmo fluxo.
+ A KCL cria uma tabela do DynamoDB com o nome da aplicação e usa essa tabela para manter informações de estado (como pontos de verificação e mapeamento de operador-fragmento) da aplicação. Cada aplicação tem sua própria tabela do DynamoDB. Para obter mais informações, consulte [Usar uma tabela de concessões para monitorar os fragmentos processados pela aplicação de consumo da KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Configurar credenciais
<a name="kinesis-record-processor-creds-dotnet"></a>

Você deve disponibilizar suas AWS credenciais para um dos provedores de credenciais na cadeia de provedores de credenciais padrão. A propriedade `AWSCredentialsProvider` pode ser usada para definir um provedor de credenciais. As [sample.properties](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties) precisam disponibilizar as credenciais para um dos provedores de credenciais na [cadeia de provedores de credenciais padrão](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). Ao executar a aplicação de consumo em uma instância do EC2, recomenda-se configurar a instância com um perfil do IAM. As credenciais da AWS que refletem as permissões associadas a esse perfil do IAM são disponibilizadas às aplicações na instância por meio dos metadados da instância. Essa é a maneira mais segura de gerenciar credenciais para uma aplicação de consumo em execução em uma instância do EC2.

O arquivo de propriedades do exemplo configura a KCL para processar um fluxo de dados do Kinesis chamado “words” usando o processador de registros fornecido em `AmazonKinesisSampleConsumer.cs`. 

# Desenvolver uma aplicação de consumo da Kinesis Client Library em Python
<a name="kinesis-record-processor-implementation-app-py"></a>

**Importante**  
As versões 1.x e 2.x da Amazon Kinesis Client Library (KCL) estão desatualizadas. O KCL 1.x chegará end-of-support em 30 de janeiro de 2026. É **altamente recomendável** que você migre suas aplicações da KCL que usam a versão 1.x para a versão mais recente da KCL antes de 30 de janeiro de 2026. Para encontrar a versão mais recente da KCL, consulte a página da [biblioteca de cliente do Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) em. GitHub Para obter informações sobre as versões mais recentes da KCL, consulte [Usar a Kinesis Client Library](kcl.md). Para ter mais informações sobre como migrar da KCL 1.x para a KCL 3.x, consulte [Migrar da KCL 1.x para a KCL 3.x](kcl-migration-1-3.md).

É possível usar a Kinesis Client Library (KCL) para criar aplicações que processam dados dos fluxos de dados do Kinesis. A Kinesis Client Library está disponível em várias linguagens. Este tópico discute Python.

A KCL é uma biblioteca Java; o suporte para linguagens diferentes de Java é fornecido usando uma interface multilíngue chamada de. *MultiLangDaemon* Esse daemon baseado em Java é executado em segundo plano quando uma linguagem de KCL diferente de Java é utilizada. Portanto, se você instalar o KCL para Python e escrever seu aplicativo de consumidor inteiramente em Python, ainda precisará do Java instalado em seu sistema por causa do. MultiLangDaemon Além disso, MultiLangDaemon tem algumas configurações padrão que você pode precisar personalizar para seu caso de uso, por exemplo, a AWS região à qual ele se conecta. Para obter mais informações sobre o MultiLangDaemon on GitHub, acesse a página do [ MultiLangDaemon projeto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Para baixar o Python KCL em GitHub, acesse a [Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client-python) (Python). Para baixar o código de amostra para um aplicativo de consumidor do Python KCL, acesse a página do projeto de amostra do [KCL for Python](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples) em. GitHub

É necessário concluir as seguintes tarefas ao implementar uma aplicação de consumo da KCL em Python:

**Topics**
+ [Implemente os métodos RecordProcessor de classe](#kinesis-record-processor-implementation-interface-py)
+ [Modificar as propriedades de configuração](#kinesis-record-processor-initialization-py)

## Implemente os métodos RecordProcessor de classe
<a name="kinesis-record-processor-implementation-interface-py"></a>

A classe `RecordProcess` precisa estender o `RecordProcessorBase` para implementar os métodos a seguir. O exemplo fornece implementações que podem ser usadas como ponto de partida (consulte `sample_kclpy_app.py`).

```
def initialize(self, shard_id)
def process_records(self, records, checkpointer)
def shutdown(self, checkpointer, reason)
```

**inicializar**  
A KCL chama o método `initialize` quando o processador de registros é instanciado, passando um ID de fragmento específico como um parâmetro. Esse processador de registros processa apenas esse fragmento e, normalmente, o inverso também é verdadeiro (esse fragmento é processado somente por esse processador de registro). No entanto, a aplicação de consumo deve considerar a possibilidade de que um registro de dados pode ser processado mais de uma vez. Isso acontece porque a semântica do Kinesis Data Streams é do tipo *pelo menos uma vez*, o que significa que cada registro de dados de um fragmento é processado pelo menos uma vez por um operador no consumidor. Para obter mais informações sobre casos em que um fragmento específico pode ser processado por mais de um operador, consulte [Use refragmentação, escalonamento e processamento paralelo para alterar o número de fragmentos](kinesis-record-processor-scaling.md).

```
def initialize(self, shard_id)
```

**process\$1records**  
 A KCL chama este método passando uma lista de registros de dados do fragmento especificado pelo método `initialize`. O processador de registros implementado processará os dados nesses registros de acordo com a semântica da aplicação de consumo. Por exemplo, o operador pode executar uma transformação nos dados e, em seguida, armazenar o resultado em um bucket do Amazon Simple Storage Service (Amazon S3).

```
def process_records(self, records, checkpointer) 
```

Além dos dados em si, o registro também contém um número de sequência e uma chave de partição. O operador pode usar esses valores ao processar os dados. Por exemplo, o operador pode escolher o bucket do S3 no qual armazenar os dados com base no valor da chave de partição. O dicionário de `record` expõe os seguintes pares de chave/valor para acessar os dados do registro, o número de sequência e a chave de partição:

```
record.get('data')
record.get('sequenceNumber')
record.get('partitionKey')
```

Observe que os dados são codificados em Base64.

No exemplo, o método `process_records` tem código que mostra como um operador pode acessar os dados do registro, o número de sequência e a chave de partição.

O Kinesis Data Streams requer que o processador de registros rastreie os registros que já foram processados em um fragmento. A KCL faz esse rastreamento, passando um objeto `Checkpointer` para `process_records`. O processador de registros chama o método `checkpoint` neste objeto para informar a KCL sobre o progresso do processamento dos registros no fragmento. Se o operador falhar, a KCL usará essas informações para reiniciar o processamento do fragmento no último registro processado conhecido.

Em uma operação de divisão ou mesclagem, a KCL só começará a processar os novos fragmentos quando os processadores dos fragmentos originais chamarem `checkpoint` para indicar que o processamento dos fragmentos originais foi concluído.

Se você não passar um parâmetro, a KCL presumirá que a chamada para `checkpoint` significa que todos os registros foram processados até o último registro passado para o processador de registros. Portanto, o processador de registros deve chamar `checkpoint` somente após ter processado todos os registros na lista que foi passada a ele. Os processadores de registros não precisam chamar `checkpoint` em cada chamada para `process_records`. Um processador pode, por exemplo, chamar `checkpoint` a cada terceira chamada. É possível, opcionalmente, especificar o número de sequência exato de um registro como um parâmetro para `checkpoint`. Nesse caso, a KCL presume que todos os registros foram processados somente até o registro especificado.

No exemplo, o método privado `checkpoint` mostra como chamar o método `Checkpointer.checkpoint` usando a lógica de novas tentativas e o tratamento de exceções apropriados.

A KCL depende do `process_records` para lidar com qualquer exceção ocorrida no processamento dos registros de dados. Se ocorrer uma exceção em `process_records`, a KCL ignorará os registros de dados passados para `process_records` antes da exceção. Ou seja, esses registros não serão reenviados para o processador de registros que lançou a exceção ou para qualquer outro processador de registros na aplicação de consumo.

**shutdown**  
 A KCL chama o método `shutdown` quando o processamento termina (o motivo do desligamento é `TERMINATE`) ou quando o operador não está mais respondendo (o `reason` do desligamento é `ZOMBIE`).

```
def shutdown(self, checkpointer, reason)
```

O processamento termina quando o processador de registros não recebe mais registros do fragmento porque ele foi dividido ou intercalado, ou o fluxo foi excluído.

 A KCL também passa um objeto `Checkpointer` para `shutdown`. Se o `reason` do desligamento é `TERMINATE`, o processador de registros deve terminar o processamento de todos os registros de dados e, em seguida, chamar o método `checkpoint` nesta interface.

## Modificar as propriedades de configuração
<a name="kinesis-record-processor-initialization-py"></a>

O exemplo fornece valores padrão para as propriedades de configuração. É possível substituir qualquer uma dessas propriedades por seus próprios valores (consulte `sample.properties`).

### Nome da aplicação
<a name="kinesis-record-processor-application-name-py"></a>

A KCL exige um nome de aplicação exclusivo entre as aplicações e as tabelas do Amazon DynamoDB na mesma região. Ela usa o valor de configuração de nome de aplicativo das seguintes formas:
+ Presume-se que todos os operadores associados a esse nome de aplicativo estejam trabalhando juntos no mesmo fluxo. Esses operadores podem ser distribuídos em várias instâncias. Se você executar uma instância adicional do mesmo código da aplicação, mas com um nome diferente, a KCL tratará a segunda instância como uma aplicação totalmente independente operando no mesmo fluxo.
+ A KCL cria uma tabela do DynamoDB com o nome da aplicação e usa essa tabela para manter informações de estado (como pontos de verificação e mapeamento de operador-fragmento) da aplicação. Cada aplicação tem sua própria tabela do DynamoDB. Para obter mais informações, consulte [Usar uma tabela de concessões para monitorar os fragmentos processados pela aplicação de consumo da KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Configurar credenciais
<a name="kinesis-record-processor-creds-py"></a>

Você deve disponibilizar suas AWS credenciais para um dos provedores de credenciais na cadeia de provedores de credenciais padrão. A propriedade `AWSCredentialsProvider` pode ser usada para definir um provedor de credenciais. As [sample.properties](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties) precisam disponibilizar as credenciais para um dos provedores de credenciais na [cadeia de provedores de credenciais padrão](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). Ao executar a aplicação de consumo em uma instância do Amazon EC2, recomenda-se que a instância seja configurada com um perfil do IAM. As credenciais da AWS que refletem as permissões associadas a esse perfil do IAM são disponibilizadas às aplicações na instância por meio dos metadados da instância. Essa é a maneira mais segura de gerenciar credenciais para uma aplicação de consumo em execução em uma instância do EC2.

O arquivo de propriedades do exemplo configura a KCL para processar um fluxo de dados do Kinesis chamado “words” usando o processador de registros fornecido em `sample_kclpy_app.py`. 

# Desenvolver uma aplicação de consumo da Kinesis Client Library em Ruby
<a name="kinesis-record-processor-implementation-app-ruby"></a>

**Importante**  
As versões 1.x e 2.x da Amazon Kinesis Client Library (KCL) estão desatualizadas. O KCL 1.x chegará end-of-support em 30 de janeiro de 2026. É **altamente recomendável** que você migre suas aplicações da KCL que usam a versão 1.x para a versão mais recente da KCL antes de 30 de janeiro de 2026. Para encontrar a versão mais recente da KCL, consulte a página da [biblioteca de cliente do Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) em. GitHub Para obter informações sobre as versões mais recentes da KCL, consulte [Usar a Kinesis Client Library](kcl.md). Para ter mais informações sobre como migrar da KCL 1.x para a KCL 3.x, consulte [Migrar da KCL 1.x para a KCL 3.x](kcl-migration-1-3.md).

É possível usar a Kinesis Client Library (KCL) para criar aplicações que processam dados dos fluxos de dados do Kinesis. A Kinesis Client Library está disponível em várias linguagens. Este tópico discute Ruby.

A KCL é uma biblioteca Java; o suporte para linguagens diferentes de Java é fornecido usando uma interface multilíngue chamada de. *MultiLangDaemon* Esse daemon baseado em Java é executado em segundo plano quando uma linguagem de KCL diferente de Java é utilizada. Portanto, se você instalar o KCL para Ruby e escrever seu aplicativo de consumo inteiramente em Ruby, ainda precisará do Java instalado em seu sistema por causa do. MultiLangDaemon Além disso, MultiLangDaemon tem algumas configurações padrão que você pode precisar personalizar para seu caso de uso, por exemplo, a AWS região à qual ele se conecta. Para obter mais informações sobre o MultiLangDaemon on GitHub, acesse a página do [ MultiLangDaemon projeto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Para baixar a KCL do Ruby GitHub, acesse a [Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client-ruby) (Ruby). Para baixar o código de amostra para um aplicativo de consumidor Ruby KCL, acesse a página do projeto de [amostra KCL for Ruby](https://github.com/awslabs/amazon-kinesis-client-ruby/tree/master/samples) em. GitHub

Para obter mais informações sobre a biblioteca de suporte da KCL Ruby, consulte a [documentação da KCL para gems da Ruby](http://www.rubydoc.info/gems/aws-kclrb).

# Desenvolver aplicações de consumo da KCL 2.x
<a name="developing-consumers-with-kcl-v2"></a>

**Importante**  
As versões 1.x e 2.x da Amazon Kinesis Client Library (KCL) estão desatualizadas. O KCL 1.x chegará end-of-support em 30 de janeiro de 2026. É **altamente recomendável** que você migre suas aplicações da KCL que usam a versão 1.x para a versão mais recente da KCL antes de 30 de janeiro de 2026. Para encontrar a versão mais recente da KCL, consulte a página da [biblioteca de cliente do Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) em. GitHub Para obter informações sobre as versões mais recentes da KCL, consulte [Usar a Kinesis Client Library](kcl.md). Para ter mais informações sobre como migrar da KCL 1.x para a KCL 3.x, consulte [Migrar da KCL 1.x para a KCL 3.x](kcl-migration-1-3.md).

Este tópico mostra como usar a versão 2.0 da Kinesis Client Library (KCL). 

Para obter mais informações sobre a KCL, consulte a visão geral fornecida em [Developing Consumers Using the Kinesis Client Library 1.x](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html).

Escolha um dos seguintes tópicos, dependendo do que deseja usar.

**Topics**
+ [Desenvolver uma aplicação de consumo da Kinesis Client Library em Java](kcl2-standard-consumer-java-example.md)
+ [Desenvolver uma aplicação de consumo da Kinesis Client Library em Python](kcl2-standard-consumer-python-example.md)
+ [Desenvolver consumidores de distribuição avançada com o KCL 2.x](building-enhanced-consumers-kcl-retired.md)

# Desenvolver uma aplicação de consumo da Kinesis Client Library em Java
<a name="kcl2-standard-consumer-java-example"></a>

**Importante**  
As versões 1.x e 2.x da Amazon Kinesis Client Library (KCL) estão desatualizadas. O KCL 1.x chegará end-of-support em 30 de janeiro de 2026. É **altamente recomendável** que você migre suas aplicações da KCL que usam a versão 1.x para a versão mais recente da KCL antes de 30 de janeiro de 2026. Para encontrar a versão mais recente da KCL, consulte a página da [biblioteca de cliente do Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) em. GitHub Para obter informações sobre as versões mais recentes da KCL, consulte [Usar a Kinesis Client Library](kcl.md). Para ter mais informações sobre como migrar da KCL 1.x para a KCL 3.x, consulte [Migrar da KCL 1.x para a KCL 3.x](kcl-migration-1-3.md).

O código a seguir mostra uma implementação de exemplo em Java de `ProcessorFactory` e `RecordProcessor`. Para aproveitar o recurso de distribuição avançada, consulte [Usar consumidores com distribuição avançada](https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-kcl-java.html).

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License.
 */


/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

/**
 * This class will run a simple app that uses the KCL to read data and uses the AWS SDK to publish data.
 * Before running this program you must first create a Kinesis stream through the AWS console or AWS SDK.
 */
public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    /**
     * Invoke the main method with 2 args: the stream name and (optionally) the region.
     * Verifies valid inputs and then starts running the app.
     */
    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    /**
     * Constructor sets streamName and region. It also creates a KinesisClient object to send data to Kinesis.
     * This KinesisClient is used to send dummy data so that the consumer has something to read; it is also used
     * indirectly by the KCL to handle the consumption of the data.
     */
    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {

        /**
         * Sends dummy data to Kinesis. Not relevant to consuming the data with the KCL
         */
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        /**
         * Sets up configuration for the KCL, including DynamoDB and CloudWatch dependencies. The final argument, a
         * ShardRecordProcessorFactory, is where the logic for record processing lives, and is located in a private
         * class below.
         */
        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        /**
         * The Scheduler (also called Worker in earlier versions of the KCL) is the entry point to the KCL. This
         * instance is configured with defaults provided by the ConfigsBuilder.
         */
        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
        );

        /**
         * Kickoff the Scheduler. Record processing of the stream of dummy data will continue indefinitely
         * until an exit is triggered.
         */
        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        /**
         * Allows termination of app by pressing Enter.
         */
        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        /**
         * Stops sending dummy data.
         */
        log.info("Cancelling producer and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        /**
         * Stops consuming data. Finishes processing the current batch of data already received from Kinesis
         * before shutting down.
         */
        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown.  Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    /**
     * Sends a single record of dummy data to Kinesis.
     */
    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }

    /**
     * The implementation of the ShardRecordProcessor interface is where the heart of the record processing logic lives.
     * In this example all we do to 'process' is log info about the records.
     */
    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        /**
         * Invoked by the KCL before data records are delivered to the ShardRecordProcessor instance (via
         * processRecords). In this example we do nothing except some logging.
         *
         * @param initializationInput Provides information related to initialization.
         */
        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Handles record processing logic. The Amazon Kinesis Client Library will invoke this method to deliver
         * data records to the application. In this example we simply log our records.
         *
         * @param processRecordsInput Provides the records to be processed as well as information and capabilities
         *                            related to them (e.g. checkpointing).
         */
        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.");
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /** Called when the lease tied to this record processor has been lost. Once the lease has been lost,
         * the record processor can no longer checkpoint.
         *
         * @param leaseLostInput Provides access to functions and data related to the loss of the lease.
         */
        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Called when all data on this shard has been processed. Checkpointing must occur in the method for record
         * processing to be considered complete; an exception will be thrown otherwise.
         *
         * @param shardEndedInput Provides access to a checkpointer method for completing processing of the shard.
         */
        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Invoked when Scheduler has been requested to shut down (i.e. we decide to stop running the app by pressing
         * Enter). Checkpoints and logs the data a final time.
         *
         * @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint
         *                               before the shutdown is completed.
         */
        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```

# Desenvolver uma aplicação de consumo da Kinesis Client Library em Python
<a name="kcl2-standard-consumer-python-example"></a>

**Importante**  
As versões 1.x e 2.x da Amazon Kinesis Client Library (KCL) estão desatualizadas. O KCL 1.x chegará end-of-support em 30 de janeiro de 2026. É **altamente recomendável** que você migre suas aplicações da KCL que usam a versão 1.x para a versão mais recente da KCL antes de 30 de janeiro de 2026. Para encontrar a versão mais recente da KCL, consulte a página da [biblioteca de cliente do Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) em. GitHub Para obter informações sobre as versões mais recentes da KCL, consulte [Usar a Kinesis Client Library](kcl.md). Para ter mais informações sobre como migrar da KCL 1.x para a KCL 3.x, consulte [Migrar da KCL 1.x para a KCL 3.x](kcl-migration-1-3.md).

É possível usar a Kinesis Client Library (KCL) para criar aplicações que processam dados dos fluxos de dados do Kinesis. A Kinesis Client Library está disponível em várias linguagens. Este tópico discute Python.

A KCL é uma biblioteca Java; o suporte para linguagens diferentes de Java é fornecido usando uma interface multilíngue chamada de. *MultiLangDaemon* Esse daemon baseado em Java é executado em segundo plano quando uma linguagem de KCL diferente de Java é utilizada. Portanto, se você instalar o KCL para Python e escrever seu aplicativo de consumidor inteiramente em Python, ainda precisará do Java instalado em seu sistema por causa do. MultiLangDaemon Além disso, MultiLangDaemon tem algumas configurações padrão que você pode precisar personalizar para seu caso de uso, por exemplo, a AWS região à qual ele se conecta. Para obter mais informações sobre o MultiLangDaemon on GitHub, acesse a página do [ MultiLangDaemon projeto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Para baixar o Python KCL em GitHub, acesse a [Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client-python) (Python). Para baixar o código de amostra para um aplicativo de consumidor do Python KCL, acesse a página do projeto de amostra do [KCL for Python](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples) em. GitHub

É necessário concluir as seguintes tarefas ao implementar uma aplicação de consumo da KCL em Python:

**Topics**
+ [Implemente os métodos RecordProcessor de classe](#kinesis-record-processor-implementation-interface-py)
+ [Modificar as propriedades de configuração](#kinesis-record-processor-initialization-py)

## Implemente os métodos RecordProcessor de classe
<a name="kinesis-record-processor-implementation-interface-py"></a>

A classe `RecordProcess` precisa estender a classe `RecordProcessorBase` para implementar os seguintes métodos:

```
initialize
process_records
shutdown_requested
```

Este exemplo fornece implementações que podem ser usadas como ponto de partida.

```
#!/usr/bin/env python

# Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Amazon Software License (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://aws.amazon.com/asl/
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

from __future__ import print_function

import sys
import time

from amazon_kclpy import kcl
from amazon_kclpy.v3 import processor


class RecordProcessor(processor.RecordProcessorBase):
    """
    A RecordProcessor processes data from a shard in a stream. Its methods will be called with this pattern:

    * initialize will be called once
    * process_records will be called zero or more times
    * shutdown will be called if this MultiLangDaemon instance loses the lease to this shard, or the shard ends due
        a scaling change.
    """
    def __init__(self):
        self._SLEEP_SECONDS = 5
        self._CHECKPOINT_RETRIES = 5
        self._CHECKPOINT_FREQ_SECONDS = 60
        self._largest_seq = (None, None)
        self._largest_sub_seq = None
        self._last_checkpoint_time = None

    def log(self, message):
        sys.stderr.write(message)

    def initialize(self, initialize_input):
        """
        Called once by a KCLProcess before any calls to process_records

        :param amazon_kclpy.messages.InitializeInput initialize_input: Information about the lease that this record
            processor has been assigned.
        """
        self._largest_seq = (None, None)
        self._last_checkpoint_time = time.time()

    def checkpoint(self, checkpointer, sequence_number=None, sub_sequence_number=None):
        """
        Checkpoints with retries on retryable exceptions.

        :param amazon_kclpy.kcl.Checkpointer checkpointer: the checkpointer provided to either process_records
            or shutdown
        :param str or None sequence_number: the sequence number to checkpoint at.
        :param int or None sub_sequence_number: the sub sequence number to checkpoint at.
        """
        for n in range(0, self._CHECKPOINT_RETRIES):
            try:
                checkpointer.checkpoint(sequence_number, sub_sequence_number)
                return
            except kcl.CheckpointError as e:
                if 'ShutdownException' == e.value:
                    #
                    # A ShutdownException indicates that this record processor should be shutdown. This is due to
                    # some failover event, e.g. another MultiLangDaemon has taken the lease for this shard.
                    #
                    print('Encountered shutdown exception, skipping checkpoint')
                    return
                elif 'ThrottlingException' == e.value:
                    #
                    # A ThrottlingException indicates that one of our dependencies is is over burdened, e.g. too many
                    # dynamo writes. We will sleep temporarily to let it recover.
                    #
                    if self._CHECKPOINT_RETRIES - 1 == n:
                        sys.stderr.write('Failed to checkpoint after {n} attempts, giving up.\n'.format(n=n))
                        return
                    else:
                        print('Was throttled while checkpointing, will attempt again in {s} seconds'
                              .format(s=self._SLEEP_SECONDS))
                elif 'InvalidStateException' == e.value:
                    sys.stderr.write('MultiLangDaemon reported an invalid state while checkpointing.\n')
                else:  # Some other error
                    sys.stderr.write('Encountered an error while checkpointing, error was {e}.\n'.format(e=e))
            time.sleep(self._SLEEP_SECONDS)

    def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
        """
        Called for each record that is passed to process_records.

        :param str data: The blob of data that was contained in the record.
        :param str partition_key: The key associated with this recod.
        :param int sequence_number: The sequence number associated with this record.
        :param int sub_sequence_number: the sub sequence number associated with this record.
        """
        ####################################
        # Insert your processing logic here
        ####################################
        self.log("Record (Partition Key: {pk}, Sequence Number: {seq}, Subsequence Number: {sseq}, Data Size: {ds}"
                 .format(pk=partition_key, seq=sequence_number, sseq=sub_sequence_number, ds=len(data)))

    def should_update_sequence(self, sequence_number, sub_sequence_number):
        """
        Determines whether a new larger sequence number is available

        :param int sequence_number: the sequence number from the current record
        :param int sub_sequence_number: the sub sequence number from the current record
        :return boolean: true if the largest sequence should be updated, false otherwise
        """
        return self._largest_seq == (None, None) or sequence_number > self._largest_seq[0] or \
            (sequence_number == self._largest_seq[0] and sub_sequence_number > self._largest_seq[1])

    def process_records(self, process_records_input):
        """
        Called by a KCLProcess with a list of records to be processed and a checkpointer which accepts sequence numbers
        from the records to indicate where in the stream to checkpoint.

        :param amazon_kclpy.messages.ProcessRecordsInput process_records_input: the records, and metadata about the
            records.
        """
        try:
            for record in process_records_input.records:
                data = record.binary_data
                seq = int(record.sequence_number)
                sub_seq = record.sub_sequence_number
                key = record.partition_key
                self.process_record(data, key, seq, sub_seq)
                if self.should_update_sequence(seq, sub_seq):
                    self._largest_seq = (seq, sub_seq)

            #
            # Checkpoints every self._CHECKPOINT_FREQ_SECONDS seconds
            #
            if time.time() - self._last_checkpoint_time > self._CHECKPOINT_FREQ_SECONDS:
                self.checkpoint(process_records_input.checkpointer, str(self._largest_seq[0]), self._largest_seq[1])
                self._last_checkpoint_time = time.time()

        except Exception as e:
            self.log("Encountered an exception while processing records. Exception was {e}\n".format(e=e))

    def lease_lost(self, lease_lost_input):
        self.log("Lease has been lost")

    def shard_ended(self, shard_ended_input):
        self.log("Shard has ended checkpointing")
        shard_ended_input.checkpointer.checkpoint()

    def shutdown_requested(self, shutdown_requested_input):
        self.log("Shutdown has been requested, checkpointing.")
        shutdown_requested_input.checkpointer.checkpoint()


if __name__ == "__main__":
    kcl_process = kcl.KCLProcess(RecordProcessor())
    kcl_process.run()
```

## Modificar as propriedades de configuração
<a name="kinesis-record-processor-initialization-py"></a>

O exemplo fornece valores padrão para as propriedades de configuração, conforme mostrado no script a seguir. É possível substituir qualquer uma dessas propriedades por seus próprios valores.

```
# The script that abides by the multi-language protocol. This script will
# be executed by the MultiLangDaemon, which will communicate with this script
# over STDIN and STDOUT according to the multi-language protocol.
executableName = sample_kclpy_app.py

# The name of an Amazon Kinesis stream to process.
streamName = words

# Used by the KCL as the name of this application. Will be used as the name
# of an Amazon DynamoDB table which will store the lease and checkpoint
# information for workers with this application name
applicationName = PythonKCLSample

# Users can change the credentials provider the KCL will use to retrieve credentials.
# The DefaultAWSCredentialsProviderChain checks several other providers, which is
# described here:
# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain

# Appended to the user agent of the KCL. Does not impact the functionality of the
# KCL in any other way.
processingLanguage = python/2.7

# Valid options at TRIM_HORIZON or LATEST.
# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
initialPositionInStream = TRIM_HORIZON

# The following properties are also available for configuring the KCL Worker that is created
# by the MultiLangDaemon.

# The KCL defaults to us-east-1
#regionName = us-east-1

# Fail over time in milliseconds. A worker which does not renew it's lease within this time interval
# will be regarded as having problems and it's shards will be assigned to other workers.
# For applications that have a large number of shards, this msy be set to a higher number to reduce
# the number of DynamoDB IOPS required for tracking leases
#failoverTimeMillis = 10000

# A worker id that uniquely identifies this worker among all workers using the same applicationName
# If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself.
#workerId = 

# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
#shardSyncIntervalMillis = 60000

# Max records to fetch from Kinesis in a single GetRecords call.
#maxRecords = 10000

# Idle time between record reads in milliseconds.
#idleTimeBetweenReadsInMillis = 1000

# Enables applications flush/checkpoint (if they have some data "in progress", but don't get new data for while)
#callProcessRecordsEvenForEmptyRecordList = false

# Interval in milliseconds between polling to check for parent shard completion.
# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
# completion of parent shards).
#parentShardPollIntervalMillis = 10000

# Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try
# to delete the ones we don't need any longer.
#cleanupLeasesUponShardCompletion = true

# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
#taskBackoffTimeMillis = 500

# Buffer metrics for at most this long before publishing to CloudWatch.
#metricsBufferTimeMillis = 10000

# Buffer at most this many metrics before publishing to CloudWatch.
#metricsMaxQueueSize = 10000

# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
# to RecordProcessorCheckpointer#checkpoint(String) by default.
#validateSequenceNumberBeforeCheckpointing = true

# The maximum number of active threads for the MultiLangDaemon to permit.
# If a value is provided then a FixedThreadPool is used with the maximum
# active threads set to the provided value. If a non-positive integer or no
# value is provided a CachedThreadPool is used.
#maxActiveThreads = 0
```

### Nome da aplicação
<a name="kinesis-record-processor-application-name-py"></a>

A KCL exige um nome de aplicação exclusivo entre as aplicações e as tabelas do Amazon DynamoDB na mesma região. Ela usa o valor de configuração de nome de aplicativo das seguintes formas:
+ Presume-se que todos os operadores associados a esse nome de aplicativo estejam trabalhando juntos no mesmo fluxo. Esses operadores podem ser distribuídos entre várias instâncias. Se você executar uma instância adicional do mesmo código da aplicação, mas com um nome diferente, a KCL tratará a segunda instância como uma aplicação totalmente independente operando no mesmo fluxo.
+ A KCL cria uma tabela do DynamoDB com o nome da aplicação e usa essa tabela para manter informações de estado (como pontos de verificação e mapeamento de operador-fragmento) da aplicação. Cada aplicação tem sua própria tabela do DynamoDB. Para obter mais informações, consulte [Usar uma tabela de concessões para monitorar os fragmentos processados pela aplicação de consumo da KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Credenciais
<a name="kinesis-record-processor-creds-py"></a>

Você deve disponibilizar suas AWS credenciais para um dos provedores de credenciais na cadeia de provedores de [credenciais padrão](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). A propriedade `AWSCredentialsProvider` pode ser usada para definir um provedor de credenciais. Se você executar seu aplicativo de consumidor em uma instância do Amazon EC2, recomendamos que você configure a instância com uma função do IAM. AWS as credenciais que refletem as permissões associadas a essa função do IAM são disponibilizadas aos aplicativos na instância por meio dos metadados da instância. Essa é a maneira mais segura de gerenciar credenciais para uma aplicação de consumo em execução em uma instância do EC2.

# Desenvolver consumidores de distribuição avançada com o KCL 2.x
<a name="building-enhanced-consumers-kcl-retired"></a>

**Importante**  
As versões 1.x e 2.x da Amazon Kinesis Client Library (KCL) estão desatualizadas. O KCL 1.x chegará end-of-support em 30 de janeiro de 2026. É **altamente recomendável** que você migre suas aplicações da KCL que usam a versão 1.x para a versão mais recente da KCL antes de 30 de janeiro de 2026. Para encontrar a versão mais recente da KCL, consulte a página da [biblioteca de cliente do Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) em. GitHub Para obter informações sobre as versões mais recentes da KCL, consulte [Usar a Kinesis Client Library](kcl.md). Para ter mais informações sobre como migrar da KCL 1.x para a KCL 3.x, consulte [Migrar da KCL 1.x para a KCL 3.x](kcl-migration-1-3.md).

Os consumidores que usam a *distribuição avançada* no Amazon Kinesis Data Streams podem receber registros de um fluxo de dados com throughput dedicada de até 2 MB de dados por segundo por fragmento. Esse tipo de consumidor não precisa lidar com outros consumidores que estejam recebendo dados do fluxo. Para obter mais informações, consulte [Desenvolver consumidores de distribuição avançada com throughput dedicado](enhanced-consumers.md).

É possível usar a versão 2.0 ou posterior da Kinesis Client Library (KCL) para desenvolver aplicações que usam a distribuição avançada para receber dados de fluxos. A KCL inscreve automaticamente seu aplicativo em todos os fragmentos de um stream e garante que seu aplicativo consumidor possa ler com um valor de taxa de transferência de 2 por fragmento. MB/sec Para usar a KCL sem ativar a distribuição avançada, consulte [Desenvolvendo aplicações de consumo usando a Kinesis Client Library 2.0](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl-v2.html).

**Topics**
+ [Desenvolver consumidores de distribuição avançada usando o KCL 2.x em Java](building-enhanced-consumers-kcl-java.md)

# Desenvolver consumidores de distribuição avançada usando o KCL 2.x em Java
<a name="building-enhanced-consumers-kcl-java"></a>

**Importante**  
As versões 1.x e 2.x da Amazon Kinesis Client Library (KCL) estão desatualizadas. O KCL 1.x chegará end-of-support em 30 de janeiro de 2026. É **altamente recomendável** que você migre suas aplicações da KCL que usam a versão 1.x para a versão mais recente da KCL antes de 30 de janeiro de 2026. Para encontrar a versão mais recente da KCL, consulte a página da [biblioteca de cliente do Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) em. GitHub Para obter informações sobre as versões mais recentes da KCL, consulte [Usar a Kinesis Client Library](kcl.md). Para ter mais informações sobre como migrar da KCL 1.x para a KCL 3.x, consulte [Migrar da KCL 1.x para a KCL 3.x](kcl-migration-1-3.md).

É possível usar a versão 2.0 ou posterior da Kinesis Client Library (KCL) para desenvolver aplicações no Amazon Kinesis Data Streams que recebem dados de fluxos usando a distribuição avançada. O código a seguir mostra uma implementação de exemplo em Java de `ProcessorFactory` e `RecordProcessor`.

É recomendável o uso de `KinesisClientUtil` para criar `KinesisAsyncClient` e configurar `maxConcurrency` no `KinesisAsyncClient`.

**Importante**  
O Amazon Kinesis Client pode ter latência significativamente maior, a menos que se configure `KinesisAsyncClient` para ter um `maxConcurrency` alto o suficiente para permitir todas as concessões e usos adicionais do `KinesisAsyncClient`.

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License. 
 */

/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig()
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        log.info("Cancelling producer, and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown. Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }


    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.");
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```