

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

# Implementar o consumidor
<a name="tutorial-stock-data-kplkcl2-consumer"></a>

O aplicativo consumidor neste tutorial processa continuamente as transações de ações em seu fluxo de dados. Em seguida, ele produz as ações mais populares compradas e vendidas a cada minuto. A aplicação é compilada com base na Kinesis Client Library (KCL), que faz grande parte do trabalho pesado comum às aplicações de consumo. Para obter mais informações, consulte [Informações da KCL 1.x e 2.x](shared-throughput-kcl-consumers.md). 

Consulte o código-fonte e analise as informações a seguir.

**StockTradesProcessor classe**  
A principal classe do consumidor fornecida e que executa as seguintes tarefas:  
+ Lê o aplicativo, o fluxo de dados e os nomes de região passados como argumentos.
+ Cria uma instância de `KinesisAsyncClient` com o nome da região.
+ Cria uma instância de `StockTradeRecordProcessorFactory` que veicula instâncias de `ShardRecordProcessor`, implementadas por uma instância de `StockTradeRecordProcessor`. 
+ Cria uma instância de `ConfigsBuilder` com a instância de `KinesisAsyncClient`, `StreamName`, `ApplicationName` e `StockTradeRecordProcessorFactory`. Isso é útil para criar todas as configurações com valores padrão.
+ Cria um agendador da KCL (anteriormente, nas versões 1.x da KCL, era conhecido como o operador da KCL) com a instância de `ConfigsBuilder`. 
+ O agendador cria uma nova thread para cada fragmento (atribuído a essa instância de consumidor), que faz loop continuamente para ler registros do fluxo de dados. Em seguida, ele invoca a instância de `StockTradeRecordProcessor` para processar cada lote de registros recebidos. 

**StockTradeRecordProcessor classe**  
Implementação da instância de `StockTradeRecordProcessor`, que, por sua vez, implementa cinco métodos necessários: `initialize`, `processRecords`, `leaseLost`, `shardEnded` e `shutdownRequested`.   
Os métodos `initialize` e `shutdownRequested` são usados pela KCL para permitir que o processador de registros saiba quando ele deve estar pronto para começar a receber registros e quando ele deve esperar parar de receber registros, respectivamente, para que ele possa executar qualquer configuração específica do aplicativo e tarefas de encerramento. `leaseLost` e `shardEnded` são usados para implementar qualquer lógica para o que fazer quando um contrato de aluguel é perdido ou um processamento chegou ao fim de um fragmento. Neste exemplo, simplesmente registramos em log mensagens indicando esses eventos.   
O código para esses métodos é fornecido para você. O processamento principal ocorre no método `processRecords`, que, por sua vez, usa `processRecord` para cada registro. Esse último método é fornecido como o código esqueleto quase todo vazio, para que seja implementado na próxima etapa, onde é explicado em mais detalhes.   
Observe também a implementação dos métodos de suporte de `processRecord`: `reportStats` e `resetStats`, que estão vazios no código-fonte original.   
O método `processRecords`, implementado previamente, executa as seguintes etapas:  
+ Para cada registro passado, ele chama `processRecord`. 
+ Se pelo menos 1 minuto houver decorrido após o último relatório, chamará `reportStats()`, que imprime as estatísticas mais recentes e, em seguida, `resetStats()`, que limpa as estatísticas para que o próximo intervalo inclua apenas registros novos.
+ Define o próximo horário para geração de relatórios.
+ Se houver decorrido pelo menos 1 minuto após o último ponto de verificação, chamará `checkpoint()`. 
+ Define o próximo horário do ponto de verificação.
Este método usa intervalos de 60 segundos como taxa de geração de relatórios e definição de pontos de verificação. Para obter mais informações sobre pontos de verificação, consulte [Using the Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html). 

**StockStats classe**  
Essa classe fornece retenção de dados e rastreamento de estatísticas em relação às ações mais populares ao longo do tempo. Esse código é fornecido e contém os seguintes métodos:  
+ `addStockTrade(StockTrade)`: injeta o `StockTrade` conhecido nas estatísticas correntes.
+ `toString()`: retorna as estatísticas em uma string formatada.
Essa classe rastreia as ações mais populares mantendo uma contagem corrente do número total de negociações de cada ação e a contagem máxima. Ela atualiza essas contagens sempre que chega uma negociação de ação.

Adicione código aos métodos da classe `StockTradeRecordProcessor`, como mostrado nas etapas a seguir. 

**Como implementar o consumidor**

1. Implemente o método `processRecord` instanciando um objeto `StockTrade` de tamanho correto e adicionando a ele os dados do registro, registrando um aviso caso ocorra problema. 

   ```
   byte[] arr = new byte[record.data().remaining()];
   record.data().get(arr);
   StockTrade trade = StockTrade.fromJsonAsBytes(arr);
       if (trade == null) {
           log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey());
           return;
           }
   stockStats.addStockTrade(trade);
   ```

1. Implemente um método `reportStats`. Modifique o formato de saída para se adequar às suas preferências. 

   ```
   System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" +
   stockStats + "\n" +
   "****************************************************************\n");
   ```

1. Implemente o método `resetStats`, que cria uma nova instância de `stockStats`. 

   ```
   stockStats = new StockStats();
   ```

1. Implemente os seguintes métodos exigidos pela interface `ShardRecordProcessor`:

   ```
   @Override
   public void leaseLost(LeaseLostInput leaseLostInput) {
       log.info("Lost lease, so terminating.");
   }
   
   @Override
   public void shardEnded(ShardEndedInput shardEndedInput) {
       try {
           log.info("Reached shard end checkpointing.");
           shardEndedInput.checkpointer().checkpoint();
       } catch (ShutdownException | InvalidStateException e) {
           log.error("Exception while checkpointing at shard end. Giving up.", e);
       }
   }
   
   @Override
   public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
       log.info("Scheduler is shutting down, checkpointing.");
       checkpoint(shutdownRequestedInput.checkpointer());
   }
   
   private void checkpoint(RecordProcessorCheckpointer checkpointer) {
       log.info("Checkpointing shard " + kinesisShardId);
       try {
           checkpointer.checkpoint();
       } catch (ShutdownException se) {
           // Ignore checkpoint if the processor instance has been shutdown (fail over).
           log.info("Caught shutdown exception, skipping checkpoint.", se);
       } catch (ThrottlingException e) {
           // Skip checkpoint when throttled. In practice, consider a backoff and retry policy.
           log.error("Caught throttling exception, skipping checkpoint.", e);
       } catch (InvalidStateException e) {
           // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS).
           log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
       }
   }
   ```

**Como executar o consumidor**

1. Execute a aplicação de produção escrita em [[Implementar o produtor](tutorial-stock-data-kplkcl2-producer.md)Implementar o produtor](tutorial-stock-data-kplkcl2-producer.md) para injetar registros de negociações de ações no fluxo.

1. Verifique se o par de chave de acesso e chave secreta recuperado anteriormente (durante a criação do usuário do IAM) foi salvo no arquivo `~/.aws/credentials`. 

1. Execute a classe `StockTradesProcessor` com os seguintes argumentos:

   ```
   StockTradesProcessor StockTradeStream us-west-2
   ```

   Observe que, ao criar o fluxo em uma região diferente de `us-west-2`, é necessário especificar essa região aqui.

Depois de um minuto, deverá aparecer uma saída como a seguir, atualizada a cada minuto a partir de então:

```
  
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  ****************************************************************
```

## Próximas etapas
<a name="tutorial-stock-data-kplkcl2-consumer-next"></a>

[(Opcional) Estender o consumidor](tutorial-stock-data-kplkcl2-consumer-extension.md)