

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

# Tutoriais de conceitos básicos do Amazon Kinesis Data Streams
<a name="examples"></a>

O Amazon Kinesis Data Streams fornece várias soluções diferentes para ingerir e consumir dados dos fluxos de dados do Kinesis. Os tutoriais desta seção são projetados para ajudar na compreensão da funcionalidade e dos conceitos do Amazon Kinesis Data Streams e identificar a solução que atenda a cada necessidade. 

**Topics**
+ [

# Tutorial: Processar dados de ações em tempo real usando a KCL e a KCL 2.x
](tutorial-stock-data-kplkcl2.md)
+ [

# Tutorial: Processar dados de ações em tempo real usando a KPL e a KCL 1.x
](tutorial-stock-data-kplkcl.md)
+ [

# Tutorial: analise dados do mercado de ações em tempo real usando o Amazon Managed Service for Apache Flink
](tutorial-stock-data.md)
+ [

# Tutorial: Use AWS Lambda com o Amazon Kinesis Data Streams
](tutorial-stock-data-lambda.md)
+ [

# Use a solução AWS de streaming de dados para o Amazon Kinesis
](examples-streaming-solution.md)

# Tutorial: Processar dados de ações em tempo real usando a KCL e a KCL 2.x
<a name="tutorial-stock-data-kplkcl2"></a>

O cenário deste tutorial envolve consumir negociações do mercado de ações em um fluxo de dados e criar uma aplicação básica do Amazon Kinesis Data Streams para realizar cálculos no fluxo. Será explicado como enviar um fluxo de registros para o Kinesis Data Streams e implementar uma aplicação que consome e processa os registros em tempo quase real.

**Importante**  
Depois de criar um stream, sua conta incorre em cobranças nominais pelo uso do Kinesis Data Streams porque o Kinesis Data Streams não está qualificado para o nível gratuito. AWS Depois de iniciada, a aplicação de consumo também incorre em cobranças nominais pelo uso do Amazon DynamoDB. A aplicação de consumo usa o DynamoDB para monitorar o estado do processamento. Ao terminar de usar esta aplicação, exclua seus recursos da AWS para parar de gerar cobranças. Para obter mais informações, consulte [Limpar os recursos](tutorial-stock-data-kplkcl2-finish.md).

O código não acessa os dados reais da bolsa de valores, ele simula o fluxo de negociações de ações. Isso é feito com o uso de um gerador de negociações de ações aleatórias cujo ponto de partida são dados do mercado real referente às 25 principais ações por capitalização de mercado em fevereiro de 2015. Se houver acesso a um fluxo de negociações de ações em tempo real, pode ser interessante derivar estatísticas úteis e em tempo hábil desse fluxo. Por exemplo, talvez convenha executar uma análise de janela deslizante na qual se determine a ação mais popular que foi adquirida nos últimos 5 minutos. Ou talvez convenha uma notificação sempre que uma ordem de venda for muito grande (ou seja, tenha muitas quotas). É possível estender o código nesta série para oferecer essa funcionalidade.

É possível executar as etapas deste tutorial no desktop ou laptop e executar o código de produtor e de consumidor na mesma máquina ou em qualquer plataforma que ofereça suporte aos requisitos definidos.

Os exemplos mostrados usam a região Oeste dos EUA (Oregon), mas funcionam em qualquer [região da AWS que oferece suporte ao Kinesis Data Streams](https://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region).

**Topics**
+ [

# Concluir os pré-requisitos
](tutorial-stock-data-kplkcl2-begin.md)
+ [

# Criar um fluxo de dados
](tutorial-stock-data-kplkcl2-create-stream.md)
+ [

# Criar um usuário e uma política do IAM
](tutorial-stock-data-kplkcl2-iam.md)
+ [

# Fazer download e criar o código
](tutorial-stock-data-kplkcl2-download.md)
+ [

# Implementar o produtor
](tutorial-stock-data-kplkcl2-producer.md)
+ [

# Implementar o consumidor
](tutorial-stock-data-kplkcl2-consumer.md)
+ [

# (Opcional) Estender o consumidor
](tutorial-stock-data-kplkcl2-consumer-extension.md)
+ [

# Limpar os recursos
](tutorial-stock-data-kplkcl2-finish.md)

# Concluir os pré-requisitos
<a name="tutorial-stock-data-kplkcl2-begin"></a>

É necessário atender aos seguintes requisitos para concluir este tutorial:

## Criar e usar uma conta da Amazon Web Services
<a name="tutorial-stock-data-kplkcl2-begin-aws"></a>

Antes de começar, familiarize-se com os conceitos abordados em [Terminologia e conceitos do Amazon Kinesis Data Streams](key-concepts.md), especialmente com fluxos, fragmentos, produtores e consumidores. Também é útil concluir as etapas no seguinte guia: [Tutorial: Instalar e configurar o AWS CLI para Kinesis Data Streams](kinesis-tutorial-cli-installation.md).

Você deve ter uma AWS conta e um navegador da web para acessar Console de gerenciamento da AWS o.

Para acessar o console, use seu nome de usuário e senha do IAM para fazer login no [Console de gerenciamento da AWS](https://console.aws.amazon.com/console/home) da página de login do IAM. *Para obter informações sobre credenciais AWS de segurança, incluindo acesso programático e alternativas às credenciais de longo prazo, consulte as credenciais de [AWS segurança no Guia do usuário](https://docs.aws.amazon.com/IAM/latest/UserGuide/security-creds.html) do IAM.* Para obter detalhes sobre como fazer login no seu Conta da AWS, consulte [Como fazer login AWS no](https://docs.aws.amazon.com/signin/latest/userguide/how-to-sign-in.html) *Guia do Início de Sessão da AWS usuário*.

Para obter mais informações sobre o IAM e instruções de configuração da chave de segurança, consulte [Criar um usuário do IAM](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/get-set-up-for-amazon-ec2.html#create-an-iam-user).

## Preencher os requisitos de software do sistema
<a name="tutorial-stock-data-kplkcl2-begin-sys"></a>

O sistema usado para executar o aplicativo deve ter o Java 7 ou posterior instalado. Para fazer download e instalar o Java Development Kit (JDK) mais recente, acesse o [Site de instalação do Java SE da Oracle](http://www.oracle.com/technetwork/java/javase/downloads/index.html).

É necessário ter a versão mais recente do [AWS SDK para Java](https://aws.amazon.com/sdk-for-java/). 

[O aplicativo consumidor requer a Kinesis Client Library (KCL) versão 2.2.9 ou superior, que você pode obter em /tree/master. GitHub https://github.com/awslabs/ amazon-kinesis-client](https://github.com/awslabs/amazon-kinesis-client/tree/master)

## Próximas etapas
<a name="tutorial-stock-data-kplkcl2-begin-next"></a>

[Criar um fluxo de dados](tutorial-stock-data-kplkcl2-create-stream.md)

# Criar um fluxo de dados
<a name="tutorial-stock-data-kplkcl2-create-stream"></a>

Primeiro, é necessário criar o fluxo de dados que será usado nas etapas seguintes deste tutorial.

**Para criar um fluxo**

1. [Faça login no Console de gerenciamento da AWS e abra o console do Kinesis em https://console.aws.amazon.com /kinesis.](https://console.aws.amazon.com/kinesis)

1. Selecione **Fluxos de dados** no painel de navegação.

1. Na barra de navegação, expanda o seletor de região e escolha uma região.

1. Selecione **Criar fluxo do Kinesis**.

1. Insira um nome para seu fluxo de dados (por exemplo, **StockTradeStream**).

1. Digite **1** como o número de fragmentos, mas deixe **Estimar o número de fragmentos necessários** recolhido.

1. Selecione **Criar fluxo do Kinesis**.

Na página de lista **Fluxos do Kinesis**, o status do fluxo aparece como `CREATING` enquanto ele está sendo criado. Quando o fluxo fica pronto para uso, o status é alterado para `ACTIVE`. 

Se o nome do fluxo for escolhido, na página exibida, a guia **Detalhes** exibirá um resumo da configuração do fluxo de dados. A seção **Monitoramento** exibe informações de monitoramento do fluxo.

## Próximas etapas
<a name="tutorial-stock-data-kplkcl2-create-stream-next"></a>

[Criar um usuário e uma política do IAM](tutorial-stock-data-kplkcl2-iam.md)

# Criar um usuário e uma política do IAM
<a name="tutorial-stock-data-kplkcl2-iam"></a>

Práticas recomendadas de segurança para AWS ditar o uso de permissões refinadas para controlar o acesso a diferentes recursos. AWS Identity and Access Management (IAM) permite gerenciar usuários e permissões de usuários no AWS. Uma [Política do IAM](https://docs.aws.amazon.com/IAM/latest/UserGuide/PoliciesOverview.html) lista explicitamente as ações permitidas e os recursos aos quais as ações são aplicáveis.

Veja a seguir as permissões mínimas normalmente necessárias para produtores e consumidores do Kinesis Data Streams.


**Produtor**  

| Ações | Recurso | Finalidade | 
| --- | --- | --- | 
| DescribeStream, DescribeStreamSummary, DescribeStreamConsumer | Fluxo de dados do Kinesis | Antes de tentar ler registros, o consumidor verifica se o fluxo de dados existe, se está ativo e se os fragmentos estão contidos no fluxo de dados. | 
| SubscribeToShard, RegisterStreamConsumer | Fluxo de dados do Kinesis | Assina e registra os consumidores em um fragmento. | 
| PutRecord, PutRecords | Fluxo de dados do Kinesis | Grava registros no Kinesis Data Streams. | 


**Consumidor**  

| **Ações** | **Recurso** | **Finalidade** | 
| --- | --- | --- | 
| DescribeStream | Fluxo de dados do Kinesis | Antes de tentar ler registros, o consumidor verifica se o fluxo de dados existe, se está ativo e se os fragmentos estão contidos no fluxo de dados. | 
| GetRecords, GetShardIterator  | Fluxo de dados do Kinesis | Lê registros de um fragmento. | 
| CreateTable, DescribeTable, GetItem, PutItem, Scan, UpdateItem | Tabela do Amazon DynamoDB | Se for desenvolvido usando a Kinesis Client Library (KCL) versão 1.x ou 2.x, o consumidor precisará de permissões para uma tabela do DynamoDB a fim de monitorar o estado de processamento da aplicação. | 
| DeleteItem | Tabela do Amazon DynamoDB | Para quando o consumidor realiza split/merge operações nos fragmentos do Kinesis Data Streams. | 
| PutMetricData |  CloudWatch Registro da Amazon | O KCL também carrega métricas para CloudWatch, que são úteis para monitorar o aplicativo. | 

Neste tutorial, será criada uma única política do IAM que concede todas as permissões acima. Na produção, talvez convenha criar duas políticas, uma para produtores e outra para consumidores.

**Para criar uma política do IAM**

1. Localize o nome do recurso da Amazon (ARN) para o novo fluxo de dados criado na etapa anterior. Esse ARN pdoe ser encontrado listado como **ARN do fluxo** na parte superior da guia **Detalhes**. O formato do ARN é o seguinte:

   ```
   arn:aws:kinesis:region:account:stream/name
   ```  
*region*  
O código AWS da região; por exemplo,`us-west-2`. Para obter mais informações, consulte [Conceitos de região e zona de disponibilidade](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-regions-availability-zones).  
*account*  
O ID da AWS conta, conforme mostrado nas [Configurações da conta](https://console.aws.amazon.com/billing/home?#/account).  
*name*  
O nome do fluxo de dados criado na etapa anterior, que é `StockTradeStream`.

1. Determine o ARN da tabela do DynamoDB a ser usada pelo consumidor (e a ser criado pela primeira instância de consumidor). Ele deve estar no seguinte formato:

   ```
   arn:aws:dynamodb:region:account:table/name
   ```

   A região e o ID da conta são idênticos aos valores do ARN do fluxo de dados sendo usado neste tutorial, mas o *nome* é o nome da tabela do DynamoDB criada e usada pela aplicação de consumo. A KCL usa o nome do aplicativo como nome da tabela. Nesta etapa, use `StockTradesProcessor` como o nome da tabela do DynamoDB, pois esse é o nome da aplicação usada nas etapas posteriores do tutorial.

1. No console do IAM, em **Políticas** ([https://console.aws.amazon.com/iam/home \$1policies](https://console.aws.amazon.com/iam/home#policies)), escolha **Create policy**. Se este for o primeiro contato com políticas do IAM, escolha **Conceitos básicos** e **Criar política**.

1. Escolha **Selecionar** ao lado de **Gerador de políticas**.

1. Escolha o **Amazon Kinesis** como serviço. AWS 

1. Selecione `DescribeStream`, `GetShardIterator`, `GetRecords`, `PutRecord`e `PutRecords` como ações permitidas.

1. Insira o ARN do fluxo de dados sendo usado neste tutorial.

1. Use **Adicionar instrução** para cada um dos seguintes:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/pt_br/streams/latest/dev/tutorial-stock-data-kplkcl2-iam.html)

   O asterisco (`*`) é usado quando não é necessário especificar um ARN. Nesse caso, é porque não há nenhum recurso específico CloudWatch no qual a `PutMetricData` ação seja invocada.

1. Escolha **Próxima etapa**.

1. Altere **Nome da política** para `StockTradeStreamPolicy`, revise o código e selecione **Criar política**.

O documento de política resultante deve ser semelhante a:

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "Stmt123",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:PutRecord",
                "kinesis:PutRecords",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:ListShards",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream"
            ]
        },
        {
            "Sid": "Stmt234",
            "Effect": "Allow",
            "Action": [
                "kinesis:SubscribeToShard",
                "kinesis:DescribeStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream/*"
            ]
        },
        {
            "Sid": "Stmt456",
            "Effect": "Allow",
            "Action": [
                "dynamodb:*"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-west-2:111122223333:table/StockTradesProcessor"
            ]
        },
        {
            "Sid": "Stmt789",
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}
```

------

**Para criar um usuário do IAM**

1. Abra o console do IAM em [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. Na página **Usuários**, selecione **Adicionar usuário**.

1. Em **Nome de usuário**, digite `StockTradeStreamUser`.

1. Em **Tipo de acesso**, selecione **Aceso programático** e, em seguida, selecione **Próximo: permissões**.

1. Escolha **Anexar políticas existentes diretamente**.

1. Pesquise por nome a política criada no procedimento anterior (`StockTradeStreamPolicy`). Selecione a caixa à esquerda do nome da política e selecione **Próximo: revisão**.

1. Revise os detalhes e o resumo e, em seguida, selecione **Criar usuário**.

1. Copie o **ID da chave de acesso** e salve-o de forma privada. Em **Chave de acesso secreta**, selecione **Mostrar** e salve a chave de forma privada também.

1. Cole as chaves de acesso e chaves secretas em um arquivo local em lugar seguro de acesso restrito. Para esse aplicativo, crie um arquivo denominado ` ~/.aws/credentials` (com permissões restritas). O arquivo deverá estar no seguinte formato:

   ```
   [default]
   aws_access_key_id=access key
   aws_secret_access_key=secret access key
   ```

**Para anexar uma política do IAM a um usuário**

1. No console do IAM, abra [Políticas](https://console.aws.amazon.com/iam/home?#policies) e selecione **Ações da política**. 

1. Selecione `StockTradeStreamPolicy` e **Anexar**.

1. Selecione `StockTradeStreamUser` e **Anexar política**.

## Próximas etapas
<a name="tutorial-stock-data-kplkcl2-iam-next"></a>

[Fazer download e criar o código](tutorial-stock-data-kplkcl2-download.md)

# Fazer download e criar o código
<a name="tutorial-stock-data-kplkcl2-download"></a>

Este tópico fornece um exemplo de código de implementação para o exemplo de ingestão de transações de ações no fluxo de dados (*produtor*) e o processamento desses dados (*consumidor*).

**Como fazer download e criar o código**

1. Baixe o código-fonte do [https://github.com/aws-samples/amazon-kinesis-learning](https://github.com/aws-samples/amazon-kinesis-learning) GitHub repositório para o seu computador.

1. Crie um projeto no IDE com o código-fonte, respeitando a estrutura de diretório fornecida.

1. Adicione as seguintes bibliotecas ao projeto:
   + Amazon Kinesis Client Library (KCL)
   + AWS SDK
   + Apache HttpCore
   + Apache HttpClient
   + Apache Commons Lang
   + Apache Commons Logging
   + Guava (Google Core Libraries For Java)
   + Jackson Annotations
   + Jackson Core
   + Jackson Databind
   + Jackson Dataformat: CBOR
   + Joda Time

1. Dependendo do seu IDE, o projeto pode ser compilado automaticamente. Se não, compile o projeto usando as etapas apropriadas para o seu IDE.

Se essas etapas foram concluídas com êxito, é possível agora ir para a próxima seção, [Implementar o produtor](tutorial-stock-data-kplkcl2-producer.md). 

## Próximas etapas
<a name="tutorial-stock-data-kplkcl2-download-next"></a>

[[Implementar o produtor](tutorial-stock-data-kplkcl2-producer.md)Implementar o produtor](tutorial-stock-data-kplkcl2-producer.md)

# Implementar o produtor
<a name="tutorial-stock-data-kplkcl2-producer"></a>

Este tutorial usa o cenário do mundo real de monitoramento de transações da bolsa de valores. Os princípios a seguir explicam brevemente como este cenário é mapeado para o produtor e a estrutura de código de suporte.

Consulte o [código-fonte](https://github.com/aws-samples/amazon-kinesis-learning ) e analise as informações a seguir.

**StockTrade classe**  
Uma negociação de ação individual é representada por uma instância da classe StockTrade. Essa instância contém atributos como o símbolo ticker, o preço, o número de ações, o tipo da negociação (compra ou venda) e um ID que identifica a negociação com exclusividade. Essa classe é previamente implementada. 

**Registro de fluxo**  
Um fluxo é uma sequência de registros. Um registro é uma serialização de uma instância `StockTrade` no formato JSON. Por exemplo:   

```
{
  "tickerSymbol": "AMZN", 
  "tradeType": "BUY", 
  "price": 395.87,
  "quantity": 16, 
  "id": 3567129045
}
```

**StockTradeGenerator classe**  
StockTradeGenerator tem um método chamado `getRandomTrade()` que retorna uma nova negociação de ações gerada aleatoriamente toda vez que é invocada. Essa classe é previamente implementada.

**StockTradesWriter classe**  
O método `main` do produtor, StockTradesWriter, recupera continuamente uma negociação aleatória e a envia ao Kinesis Data Streams executando as seguintes tarefas:  

1. Lê o nome do fluxo de dados e o nome da região como entrada.

1. Usa o `KinesisAsyncClientBuilder` para definir região, credenciais e configuração do cliente. 

1. Verifica se o stream existe e está ativo (se não, ele será encerrado com um erro). 

1. Em um loop contínuo, chama o método `StockTradeGenerator.getRandomTrade()` e o método `sendStockTrade` para enviar a negociação ao stream a cada 100 milissegundos. 
O método `sendStockTrade` da classe `StockTradesWriter` tem o seguinte código:   

```
private static void sendStockTrade(StockTrade trade, KinesisAsyncClient kinesisClient,
            String streamName) {
        byte[] bytes = trade.toJsonAsBytes();
        // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library.
        if (bytes == null) {
            LOG.warn("Could not get JSON bytes for stock trade");
            return;
        }

        LOG.info("Putting trade: " + trade.toString());
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(bytes))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            LOG.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }
```

Consulte o desmembramento do código a seguir:
+ A API `PutRecord` espera uma matriz de bytes, e é necessário converter a transação para o formato JSON. Essa única linha de código executa a seguinte operação: 

  ```
  byte[] bytes = trade.toJsonAsBytes();
  ```
+ Antes de enviar a transação, crie uma nova instância de `PutRecordRequest` (chamada solicitação neste caso). Cada `request` exige o nome do fluxo, uma chave de partição e um blob de dados. 

  ```
  PutPutRecordRequest request = PutRecordRequest.builder()
      .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
      .streamName(streamName)
      .data(SdkBytes.fromByteArray(bytes))
      .build();
  ```

  O exemplo usa um tíquete de ações como uma chave de partição, que mapeia o registro para um determinado fragmento. Na prática, deve haver centenas ou milhares de chaves de partição por fragmento, de forma que os registros sejam uniformemente disseminados no fluxo. Para obter mais informações sobre como adicionar dados a um fluxo, consulte [Gravar dados no Amazon Kinesis Data Streams](building-producers.md).

  Agora, `request` está pronto para enviar para o cliente (operação put): 

  ```
     kinesisClient.putRecord(request).get();
  ```
+ A verificação e o registro de erros são sempre inclusões úteis. Este código registra condições de erro: 

  ```
  if (bytes == null) {
      LOG.warn("Could not get JSON bytes for stock trade");
      return;
  }
  ```

  Adicione o try/catch bloco ao redor da `put` operação: 

  ```
  try {
   	kinesisClient.putRecord(request).get();
  } catch (InterruptedException e) {
              LOG.info("Interrupted, assuming shutdown.");
  } catch (ExecutionException e) {
              LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
  }
  ```

  Isso ocorre porque uma operação put do Kinesis Data Streams pode falhar devido a erro de rede ou porque o fluxo de dados pode atingir o limite de throughput e ficar limitado. É recomendado considerar cuidadosamente sua política de tentativa para operações `put` a fim de evitar perda de dados, por exemplo, usando como uma nova tentativa. 
+ O registro de status é útil mas opcional:

  ```
  LOG.info("Putting trade: " + trade.toString());
  ```
O produtor mostrado aqui usa a funcionalidade de registro único da API do Kinesis Data Streams, `PutRecord`. Na prática, se um produtor individual gerar muitos registros, costuma ser mais eficiente usar a funcionalidade de vários registros de `PutRecords` e enviar lotes de registros por vez. Para obter mais informações, consulte [Gravar dados no Amazon Kinesis Data Streams](building-producers.md).

**Como executar o produtor**

1. Verifique se a chave de acesso e o par de chaves secretas recuperados em [Criar um usuário e uma política do IAM](tutorial-stock-data-kplkcl2-iam.md) estão salvos no arquivo `~/.aws/credentials`. 

1. Execute a classe `StockTradeWriter` com os seguintes argumentos:

   ```
   StockTradeStream us-west-2
   ```

   Se o fluxo foi criado em uma região diferente de `us-west-2`, será necessário especificar esta região.

A saída deve ser semelhante a:

```
Feb 16, 2015 3:53:00 PM  
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
```

Suas negociações de ações agora estão sendo ingeridas pelo Kinesis Data Streams.

## Próximas etapas
<a name="tutorial-stock-data-kplkcl2-producer-next"></a>

[Implementar o consumidor](tutorial-stock-data-kplkcl2-consumer.md)

# Implementar o consumidor
<a name="tutorial-stock-data-kplkcl2-consumer"></a>

O aplicativo consumidor neste tutorial processa continuamente as transações de ações em seu fluxo de dados. Em seguida, ele produz as ações mais populares compradas e vendidas a cada minuto. A aplicação é compilada com base na Kinesis Client Library (KCL), que faz grande parte do trabalho pesado comum às aplicações de consumo. Para obter mais informações, consulte [Informações da KCL 1.x e 2.x](shared-throughput-kcl-consumers.md). 

Consulte o código-fonte e analise as informações a seguir.

**StockTradesProcessor classe**  
A principal classe do consumidor fornecida e que executa as seguintes tarefas:  
+ Lê o aplicativo, o fluxo de dados e os nomes de região passados como argumentos.
+ Cria uma instância de `KinesisAsyncClient` com o nome da região.
+ Cria uma instância de `StockTradeRecordProcessorFactory` que veicula instâncias de `ShardRecordProcessor`, implementadas por uma instância de `StockTradeRecordProcessor`. 
+ Cria uma instância de `ConfigsBuilder` com a instância de `KinesisAsyncClient`, `StreamName`, `ApplicationName` e `StockTradeRecordProcessorFactory`. Isso é útil para criar todas as configurações com valores padrão.
+ Cria um agendador da KCL (anteriormente, nas versões 1.x da KCL, era conhecido como o operador da KCL) com a instância de `ConfigsBuilder`. 
+ O agendador cria uma nova thread para cada fragmento (atribuído a essa instância de consumidor), que faz loop continuamente para ler registros do fluxo de dados. Em seguida, ele invoca a instância de `StockTradeRecordProcessor` para processar cada lote de registros recebidos. 

**StockTradeRecordProcessor classe**  
Implementação da instância de `StockTradeRecordProcessor`, que, por sua vez, implementa cinco métodos necessários: `initialize`, `processRecords`, `leaseLost`, `shardEnded` e `shutdownRequested`.   
Os métodos `initialize` e `shutdownRequested` são usados pela KCL para permitir que o processador de registros saiba quando ele deve estar pronto para começar a receber registros e quando ele deve esperar parar de receber registros, respectivamente, para que ele possa executar qualquer configuração específica do aplicativo e tarefas de encerramento. `leaseLost` e `shardEnded` são usados para implementar qualquer lógica para o que fazer quando um contrato de aluguel é perdido ou um processamento chegou ao fim de um fragmento. Neste exemplo, simplesmente registramos em log mensagens indicando esses eventos.   
O código para esses métodos é fornecido para você. O processamento principal ocorre no método `processRecords`, que, por sua vez, usa `processRecord` para cada registro. Esse último método é fornecido como o código esqueleto quase todo vazio, para que seja implementado na próxima etapa, onde é explicado em mais detalhes.   
Observe também a implementação dos métodos de suporte de `processRecord`: `reportStats` e `resetStats`, que estão vazios no código-fonte original.   
O método `processRecords`, implementado previamente, executa as seguintes etapas:  
+ Para cada registro passado, ele chama `processRecord`. 
+ Se pelo menos 1 minuto houver decorrido após o último relatório, chamará `reportStats()`, que imprime as estatísticas mais recentes e, em seguida, `resetStats()`, que limpa as estatísticas para que o próximo intervalo inclua apenas registros novos.
+ Define o próximo horário para geração de relatórios.
+ Se houver decorrido pelo menos 1 minuto após o último ponto de verificação, chamará `checkpoint()`. 
+ Define o próximo horário do ponto de verificação.
Este método usa intervalos de 60 segundos como taxa de geração de relatórios e definição de pontos de verificação. Para obter mais informações sobre pontos de verificação, consulte [Using the Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html). 

**StockStats classe**  
Essa classe fornece retenção de dados e rastreamento de estatísticas em relação às ações mais populares ao longo do tempo. Esse código é fornecido e contém os seguintes métodos:  
+ `addStockTrade(StockTrade)`: injeta o `StockTrade` conhecido nas estatísticas correntes.
+ `toString()`: retorna as estatísticas em uma string formatada.
Essa classe rastreia as ações mais populares mantendo uma contagem corrente do número total de negociações de cada ação e a contagem máxima. Ela atualiza essas contagens sempre que chega uma negociação de ação.

Adicione código aos métodos da classe `StockTradeRecordProcessor`, como mostrado nas etapas a seguir. 

**Como implementar o consumidor**

1. Implemente o método `processRecord` instanciando um objeto `StockTrade` de tamanho correto e adicionando a ele os dados do registro, registrando um aviso caso ocorra problema. 

   ```
   byte[] arr = new byte[record.data().remaining()];
   record.data().get(arr);
   StockTrade trade = StockTrade.fromJsonAsBytes(arr);
       if (trade == null) {
           log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey());
           return;
           }
   stockStats.addStockTrade(trade);
   ```

1. Implemente um método `reportStats`. Modifique o formato de saída para se adequar às suas preferências. 

   ```
   System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" +
   stockStats + "\n" +
   "****************************************************************\n");
   ```

1. Implemente o método `resetStats`, que cria uma nova instância de `stockStats`. 

   ```
   stockStats = new StockStats();
   ```

1. Implemente os seguintes métodos exigidos pela interface `ShardRecordProcessor`:

   ```
   @Override
   public void leaseLost(LeaseLostInput leaseLostInput) {
       log.info("Lost lease, so terminating.");
   }
   
   @Override
   public void shardEnded(ShardEndedInput shardEndedInput) {
       try {
           log.info("Reached shard end checkpointing.");
           shardEndedInput.checkpointer().checkpoint();
       } catch (ShutdownException | InvalidStateException e) {
           log.error("Exception while checkpointing at shard end. Giving up.", e);
       }
   }
   
   @Override
   public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
       log.info("Scheduler is shutting down, checkpointing.");
       checkpoint(shutdownRequestedInput.checkpointer());
   }
   
   private void checkpoint(RecordProcessorCheckpointer checkpointer) {
       log.info("Checkpointing shard " + kinesisShardId);
       try {
           checkpointer.checkpoint();
       } catch (ShutdownException se) {
           // Ignore checkpoint if the processor instance has been shutdown (fail over).
           log.info("Caught shutdown exception, skipping checkpoint.", se);
       } catch (ThrottlingException e) {
           // Skip checkpoint when throttled. In practice, consider a backoff and retry policy.
           log.error("Caught throttling exception, skipping checkpoint.", e);
       } catch (InvalidStateException e) {
           // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS).
           log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
       }
   }
   ```

**Como executar o consumidor**

1. Execute a aplicação de produção escrita em [[Implementar o produtor](tutorial-stock-data-kplkcl2-producer.md)Implementar o produtor](tutorial-stock-data-kplkcl2-producer.md) para injetar registros de negociações de ações no fluxo.

1. Verifique se o par de chave de acesso e chave secreta recuperado anteriormente (durante a criação do usuário do IAM) foi salvo no arquivo `~/.aws/credentials`. 

1. Execute a classe `StockTradesProcessor` com os seguintes argumentos:

   ```
   StockTradesProcessor StockTradeStream us-west-2
   ```

   Observe que, ao criar o fluxo em uma região diferente de `us-west-2`, é necessário especificar essa região aqui.

Depois de um minuto, deverá aparecer uma saída como a seguir, atualizada a cada minuto a partir de então:

```
  
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  ****************************************************************
```

## Próximas etapas
<a name="tutorial-stock-data-kplkcl2-consumer-next"></a>

[(Opcional) Estender o consumidor](tutorial-stock-data-kplkcl2-consumer-extension.md)

# (Opcional) Estender o consumidor
<a name="tutorial-stock-data-kplkcl2-consumer-extension"></a>

Esta seção opcional mostra como estender o código de consumidor para um cenário um pouco mais elaborado.

Para saber mais sobre os maiores pedidos de venda a cada minuto, pode-se modificar a classe `StockStats` em três locais para acomodar essa nova prioridade.

**Para estender o consumidor**

1. Adicione novas variáveis de instância:

   ```
    // Ticker symbol of the stock that had the largest quantity of shares sold 
    private String largestSellOrderStock;
    // Quantity of shares for the largest sell order trade
    private long largestSellOrderQuantity;
   ```

1. Adicione o seguinte código a `addStockTrade`:

   ```
   if (type == TradeType.SELL) {
        if (largestSellOrderStock == null || trade.getQuantity() > largestSellOrderQuantity) {
            largestSellOrderStock = trade.getTickerSymbol();
            largestSellOrderQuantity = trade.getQuantity();
        }
    }
   ```

1. Modifique o método `toString` para imprimir as informações adicionais:

   ```
    
   public String toString() {
       return String.format(
           "Most popular stock being bought: %s, %d buys.%n" +
           "Most popular stock being sold: %s, %d sells.%n" +
           "Largest sell order: %d shares of %s.",
           getMostPopularStock(TradeType.BUY), getMostPopularStockCount(TradeType.BUY),
           getMostPopularStock(TradeType.SELL), getMostPopularStockCount(TradeType.SELL),
           largestSellOrderQuantity, largestSellOrderStock);
   }
   ```

Se o consumidor for executado agora (lembre-se de executar o produtor também), deverá aparecer uma saída semelhante a esta:

```
 
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  Largest sell order: 996 shares of BUD.
  ****************************************************************
```

## Próximas etapas
<a name="tutorial-stock-data-kplkcl2-consumer-extension-next"></a>

[Limpar os recursos](tutorial-stock-data-kplkcl2-finish.md)

# Limpar os recursos
<a name="tutorial-stock-data-kplkcl2-finish"></a>

Como a utilização do fluxo de dados do Kinesis é paga, certifique-se de excluí-la e de excluir a tabela do Amazon DynamoDB correspondente ao concluir. As cobranças nominais ocorrerão em um fluxo ativo mesmo quando não houver envio e recebimento de registros. Isso ocorre porque um fluxo ativo usa recursos por meio da "escuta" contínua de registros recebidos e solicitações para obter registros.

**Para excluir o fluxo e tabela**

1. Desligue os produtores e consumidores que possam estar em execução.

1. [Abra o console do Kinesis em https://console.aws.amazon.com /kinesis.](https://console.aws.amazon.com/kinesis)

1. Escolha o fluxo criado para esta aplicação (`StockTradeStream`).

1. Escolha **Excluir fluxo**.

1. Abra o console do DynamoDB em. [https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/)

1. Exclua a tabela `StockTradesProcessor`.

## Resumo
<a name="tutorial-stock-data-kplkcl2-summary"></a>

Para processar uma grande quantidade de dados quase em tempo real, não é preciso escrever nenhum código complicado nem desenvolver uma imensa infraestrutura. Isso é tão básico quanto escrever lógica para processar uma pequena quantidade de dados (como escrever `processRecord(Record)`), mas usando o Kinesis Data Streams para escalar e ter um processo que funciona para uma grande quantidade de dados de fluxo. Não há necessidade de se preocupar com a escalabilidade do processamento, porque o Kinesis Data Streams cuida de tudo. Basta enviar os registros de fluxo ao Kinesis Data Streams e escrever a lógica para processar cada novo registro recebido. 

Veja aqui alguns aprimoramentos potenciais para este aplicativo.

**Agregar em todos os fragmentos**  
Atualmente, obtém-se estatísticas resultantes da agregação de registros de dados recebidos por um único operador proveniente de um único fragmento. (Um fragmento não pode ser processado por mais de um operador em um aplicativo ao mesmo tempo). Naturalmente, ao escalar havendo mais de um fragmento, pode ser desejável agregar em todos os fragmentos. É possível fazer isso tendo uma arquitetura de pipeline em que a saída de cada operador é alimentada em outro fluxo com um único fragmento, o qual é processado por um operador que agrega as saídas do primeiro estágio. Como os dados do primeiro estágio são limitados (um exemplo por minuto por fragmento), eles podem ser facilmente tratados por um fragmento.

**Escalar o processamento**  
Quando o fluxo é expandido para ter muitos fragmentos (porque muitos produtores estão enviando dados), a maneira de escalar o processamento é adicionando mais operadores. É possível executar os operadores em instâncias do Amazon EC2 e usar grupos do Auto Scaling.

**Use conectores para o Amazon S3/ DynamoDB/Amazon Redshift/Storm**  
Como um fluxo é processado continuamente, sua saída pode ser enviada para outros destinos. AWS fornece [conectores](https://github.com/awslabs/amazon-kinesis-connectors) para integrar o Kinesis Data Streams com AWS outros serviços e ferramentas de terceiros.

# Tutorial: Processar dados de ações em tempo real usando a KPL e a KCL 1.x
<a name="tutorial-stock-data-kplkcl"></a>

O cenário deste tutorial envolve consumir negociações do mercado de ações em um fluxo de dados e criar uma aplicação simples do Amazon Kinesis Data Streams para realizar cálculos no fluxo. Será explicado como enviar um fluxo de registros para o Kinesis Data Streams e implementar uma aplicação que consome e processa os registros em tempo quase real.

**Importante**  
Depois de criar um stream, sua conta incorre em cobranças nominais pelo uso do Kinesis Data Streams porque o Kinesis Data Streams não está qualificado para o nível gratuito. AWS Depois de iniciada, a aplicação de consumo também incorre em cobranças nominais pelo uso do Amazon DynamoDB. A aplicação de consumo usa o DynamoDB para monitorar o estado do processamento. Ao terminar de usar esta aplicação, exclua seus recursos da AWS para parar de gerar cobranças. Para obter mais informações, consulte [Limpar os recursos](tutorial-stock-data-kplkcl-finish.md).

O código não acessa os dados reais da bolsa de valores, ele simula o fluxo de negociações de ações. Isso é feito com o uso de um gerador de negociações de ações aleatórias cujo ponto de partida são dados do mercado real referente às 25 principais ações por capitalização de mercado em fevereiro de 2015. Se houver acesso a um fluxo de negociações de ações em tempo real, pode ser interessante derivar estatísticas úteis e em tempo hábil desse fluxo. Por exemplo, talvez convenha executar uma análise de janela deslizante na qual se determine a ação mais popular que foi adquirida nos últimos 5 minutos. Ou talvez convenha uma notificação sempre que uma ordem de venda for muito grande (ou seja, tenha muitas quotas). É possível estender o código nesta série para oferecer essa funcionalidade.

É possível executar as etapas deste tutorial no desktop ou laptop. O código de produtor e o de consumidor podem ser executados na mesma máquina ou em qualquer plataforma que ofereça suporte aos requisitos definidos, como o Amazon Elastic Compute Cloud (Amazon EC2).

Os exemplos mostrados usam a região Oeste dos EUA (Oregon), mas funcionam em qualquer [região da AWS que oferece suporte ao Kinesis Data Streams](https://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region).

**Topics**
+ [

# Concluir os pré-requisitos
](tutorial-stock-data-kplkcl-begin.md)
+ [

# Criar um fluxo de dados
](tutorial-stock-data-kplkcl-create-stream.md)
+ [

# Criar um usuário e uma política do IAM
](tutorial-stock-data-kplkcl-iam.md)
+ [

# Fazer download e compilação do código de implementação
](tutorial-stock-data-kplkcl-download.md)
+ [

# Implementar o produtor
](tutorial-stock-data-kplkcl-producer.md)
+ [

# Implementar o consumidor
](tutorial-stock-data-kplkcl-consumer.md)
+ [

# (Opcional) Estender o consumidor
](tutorial-stock-data-kplkcl-consumer-extension.md)
+ [

# Limpar os recursos
](tutorial-stock-data-kplkcl-finish.md)

# Concluir os pré-requisitos
<a name="tutorial-stock-data-kplkcl-begin"></a>

Estes são os requisitos para concluir o [Tutorial: Processar dados de ações em tempo real usando a KPL e a KCL 1.x[Tutorial: Processar dados de ações em tempo real usando a KPL e a KCL 1.x](tutorial-stock-data-kplkcl.md)](tutorial-stock-data-kplkcl.md).

## Criar e usar uma conta da Amazon Web Services
<a name="tutorial-stock-data-kplkcl-begin-aws"></a>

Antes de começar, familiarize-se com os conceitos abordados em [Terminologia e conceitos do Amazon Kinesis Data Streams](key-concepts.md), especialmente fluxos, fragmentos, produtores e consumidores. Também é útil ter concluído [Tutorial: Instalar e configurar o AWS CLI para Kinesis Data Streams](kinesis-tutorial-cli-installation.md).

Você precisa de uma AWS conta e um navegador da web para acessar Console de gerenciamento da AWS o.

Para acessar o console, use seu nome de usuário e senha do IAM para fazer login no [Console de gerenciamento da AWS](https://console.aws.amazon.com/console/home) da página de login do IAM. *Para obter informações sobre credenciais AWS de segurança, incluindo acesso programático e alternativas às credenciais de longo prazo, consulte as credenciais de [AWS segurança no Guia do usuário](https://docs.aws.amazon.com/IAM/latest/UserGuide/security-creds.html) do IAM.* Para obter detalhes sobre como fazer login no seu Conta da AWS, consulte [Como fazer login AWS no](https://docs.aws.amazon.com/signin/latest/userguide/how-to-sign-in.html) *Guia do Início de Sessão da AWS usuário*.

Para obter mais informações sobre o IAM e instruções de configuração da chave de segurança, consulte [Criar um usuário do IAM](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/get-set-up-for-amazon-ec2.html#create-an-iam-user).

## Preencher os requisitos de software do sistema
<a name="tutorial-stock-data-kplkcl-begin-sys"></a>

O sistema usado para executar o aplicativo precisa ter o Java 7 ou superior instalado. Para fazer download e instalar o Java Development Kit (JDK) mais recente, acesse o [Site de instalação do Java SE da Oracle](http://www.oracle.com/technetwork/java/javase/downloads/index.html).

Com um Java IDE, como o [Eclipse](https://www.eclipse.org/downloads/),é possível abrir o código-fonte, editá-lo, compilá-lo e executá-lo.

É necessário ter a versão mais recente do [AWS SDK para Java](https://aws.amazon.com/sdk-for-java/). Usando o Eclipse como IDE, é possível instalar o [kit de ferramentas da AWS para Eclipse](https://aws.amazon.com/eclipse/) em vez desse SDK. 

O aplicativo consumidor requer a Kinesis Client Library (KCL) versão 1.2.1 ou superior, que você pode obter na GitHub [Kinesis](https://github.com/awslabs/amazon-kinesis-client) Client Library (Java).

## Próximas etapas
<a name="tutorial-stock-data-kplkcl-begin-next"></a>

[Criar um fluxo de dados](tutorial-stock-data-kplkcl-create-stream.md)

# Criar um fluxo de dados
<a name="tutorial-stock-data-kplkcl-create-stream"></a>

Na primeira etapa do [Tutorial: Processar dados de ações em tempo real usando a KPL e a KCL 1.x[Tutorial: Processar dados de ações em tempo real usando a KPL e a KCL 1.x](tutorial-stock-data-kplkcl.md)](tutorial-stock-data-kplkcl.md), crie o fluxo que será usado em etapas subsequentes.

**Para criar um fluxo**

1. [Faça login no Console de gerenciamento da AWS e abra o console do Kinesis em https://console.aws.amazon.com /kinesis.](https://console.aws.amazon.com/kinesis)

1. Selecione **Fluxos de dados** no painel de navegação.

1. Na barra de navegação, expanda o seletor de região e escolha uma região.

1. Selecione **Criar fluxo do Kinesis**.

1. Insira um nome para seu fluxo (por exemplo, **StockTradeStream**).

1. Digite **1** como o número de fragmentos, mas deixe **Estimar o número de fragmentos necessários** recolhido.

1. Selecione **Criar fluxo do Kinesis**.

Na página de lista **Fluxos do Kinesis**, o status do fluxo é `CREATING` enquanto ele está sendo criado. Quando o fluxo fica pronto para uso, o status é alterado para `ACTIVE`. Escolha o nome do fluxo. Na página exibida, a guia **Detalhes** exibe um resumo da configuração do fluxo. A seção **Monitoramento** exibe informações de monitoramento do fluxo.

## Informações adicionais sobre fragmentos
<a name="tutorial-stock-data-kplkcl-create-stream-info"></a>

Ao começar a usar o Kinesis Data Streams fora deste tutorial, pode ser necessário planejar o processo de criação de fluxos mais cuidadosamente. É necessário planejar para a demanda máxima esperada ao provisionar fragmentos. Usando este cenário como exemplo, o tráfego de negociações da bolsa de valores dos EUA atinge o pico durante o dia (fuso horário do leste dos EUA) e, a partir desse horário, é preciso tirar amostras das estimativas de demanda. Em seguida, pode-se opcionalmente provisionar para a máxima demanda esperada ou expandir e reduzir o fluxo em resposta às variações de demanda. 

Um *fragmento* é uma unidade de capacidade de throughput. Na página **Criar fluxo do Kinesis**, expanda **Estime o número de fragmentos necessários**. Digite o tamanho médio do registro, o máximo de registros gravados por segundo e o número de aplicativos de consumo usando as seguintes diretrizes:

**Tamanho médio do registro**  
Uma estimativa do tamanho médio calculado dos registros. Se esse valor não for conhecido, use o tamanho de registro máximo estimado.

**Máximo de registros gravados**  
Considere o número de entidades que fornecem dados e o número aproximado de registros por segundo produzidos por cada um. Por exemplo, ao receber dados de negociações de ações provenientes de 20 servidores mercantis, com cada um gerando 250 negociações por segundo, o número total de transações (registros) por segundo é 5.000 por segundo. 

**Número de aplicativos de consumo**  
Número de aplicativos que fazem leitura do fluxo de forma independente para processar o fluxo de outro modo e produzir saída diferente. Cada aplicativo pode ter várias instâncias em execução em máquinas diferentes (ou seja, execução em um cluster) para dar conta de um fluxo de alto volume.

Se o número estimado de fragmentos mostrado exceder o limite atual de fragmentos, poderá ser necessário enviar uma solicitação para aumentar esse limite para poder criar um fluxo com esse número de fragmentos. Para solicitar um aumento do limite de fragmentos, use o [formulário de limites do Kinesis Data Streams](https://console.aws.amazon.com/support/home#/case/create?issueType=service-limit-increase&limitType=service-code-kinesis). Para obter mais informações sobre fluxos e fragmentos, consulte [Criar e gerenciar fluxos de dados do Kinesis](working-with-streams.md).

## Próximas etapas
<a name="tutorial-stock-data-kplkcl-create-stream-next"></a>

[Criar um usuário e uma política do IAM](tutorial-stock-data-kplkcl-iam.md)

# Criar um usuário e uma política do IAM
<a name="tutorial-stock-data-kplkcl-iam"></a>

Práticas recomendadas de segurança para AWS ditar o uso de permissões refinadas para controlar o acesso a diferentes recursos. AWS Identity and Access Management (IAM) permite gerenciar usuários e permissões de usuário no AWS. Uma [Política do IAM](https://docs.aws.amazon.com/IAM/latest/UserGuide/PoliciesOverview.html) lista explicitamente as ações permitidas e os recursos aos quais as ações são aplicáveis.

Veja a seguir as permissões mínimas normalmente necessárias para um produtor e um consumidor do Kinesis Data Streams.


**Produtor**  

| Ações | Recurso | Finalidade | 
| --- | --- | --- | 
| DescribeStream, DescribeStreamSummary, DescribeStreamConsumer | Fluxo de dados do Kinesis | Antes de tentar gravar registros, a aplicação de produção verifica se o fluxo existe e está ativo, se os fragmentos estão contidos no fluxo e se o fluxo tem um consumidor. | 
| SubscribeToShard, RegisterStreamConsumer | Fluxo de dados do Kinesis | Faz a inscrição e registra um consumidor em um fragmento de fluxo de dados do Kinesis. | 
| PutRecord, PutRecords | Fluxo de dados do Kinesis | Gravar registros no Kinesis Data Streams. | 


**Consumidor**  

| **Ações** | **Recurso** | **Finalidade** | 
| --- | --- | --- | 
| DescribeStream | Fluxo de dados do Kinesis | Antes de tentar ler registros, a aplicação de consumo verifica se o fluxo existe e está ativo e se os fragmentos estão contidos no fluxo. | 
| GetRecords, GetShardIterator  | Fluxo de dados do Kinesis | Ler registros em um fragmento do Kinesis Data Streams. | 
| CreateTable, DescribeTable, GetItem, PutItem, Scan, UpdateItem | Tabela do Amazon DynamoDB | Se for desenvolvido usando a Kinesis Client Library (KCL), o consumidor precisará de permissões para uma tabela do DynamoDB a fim de monitorar o estado de processamento da aplicação. O primeiro consumidor iniciado cria a tabela.  | 
| DeleteItem | Tabela do Amazon DynamoDB | Para quando o consumidor realiza split/merge operações nos fragmentos do Kinesis Data Streams. | 
| PutMetricData |  CloudWatch Registro da Amazon | O KCL também carrega métricas para CloudWatch, que são úteis para monitorar o aplicativo. | 

Para essa aplicação, crie uma única política do IAM que conceda todas as permissões anteriores. Na prática, talvez convenha considerar a criação de duas políticas, uma para produtores e uma para consumidores.

**Para criar uma política do IAM**

1. Localize o Nome de recurso da Amazon (ARN) para o novo fluxo. Esse ARN pdoe ser encontrado listado como **ARN do fluxo** na parte superior da guia **Detalhes**. O formato do ARN é o seguinte:

   ```
   arn:aws:kinesis:region:account:stream/name
   ```  
*region*  
Código da região; por exemplo, `us-west-2`. Para obter mais informações, consulte [Conceitos de região e zona de disponibilidade](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-regions-availability-zones).  
*account*  
O ID da AWS conta, conforme mostrado nas [Configurações da conta](https://console.aws.amazon.com/billing/home?#/account).  
*name*  
Nome do fluxo de [Criar um fluxo de dados](tutorial-stock-data-kplkcl-create-stream.md), que é `StockTradeStream`.

1. Determine o ARN da tabela do DynamoDB a ser usada pelo consumidor (e criada pela primeira instância de consumidor). Ele deve estar no seguinte formato:

   ```
   arn:aws:dynamodb:region:account:table/name
   ```

   A região e a conta são do mesmo local que a etapa anterior, mas desta vez *name* é o nome da tabela criada e usada pelo aplicativo de consumidor. A KCL usada pelo consumidor usa o nome do aplicativo como o nome da tabela. Use o nome do aplicativo que será usado mais tarde, `StockTradesProcessor`.

1. No console do IAM, em **Políticas** ([https://console.aws.amazon.com/iam/home \$1policies](https://console.aws.amazon.com/iam/home#policies)), escolha **Create policy**. Se este for o primeiro contato com políticas do IAM, escolha **Conceitos básicos** e **Criar política**.

1. Escolha **Selecionar** ao lado de **Gerador de políticas**.

1. Escolha o **Amazon Kinesis** como serviço. AWS 

1. Selecione `DescribeStream`, `GetShardIterator`, `GetRecords`, `PutRecord`e `PutRecords` como ações permitidas.

1. Digite o ARN criado na Etapa 1.

1. Use **Adicionar instrução** para cada um dos seguintes:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/pt_br/streams/latest/dev/tutorial-stock-data-kplkcl-iam.html)

   O asterisco (`*`) é usado quando não é necessário especificar um ARN. Nesse caso, é porque não há nenhum recurso específico CloudWatch no qual a `PutMetricData` ação seja invocada.

1. Escolha **Próxima etapa**.

1. Altere **Nome da política** para `StockTradeStreamPolicy`, revise o código e selecione **Criar política**.

O documento de política resultante deve ser algo como o exemplo a seguir:

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "Stmt123",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:PutRecord",
                "kinesis:PutRecords",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:ListShards",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream"
            ]
        },
        {
            "Sid": "Stmt234",
            "Effect": "Allow",
            "Action": [
                "kinesis:SubscribeToShard",
                "kinesis:DescribeStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream/*"
            ]
        },
        {
            "Sid": "Stmt456",
            "Effect": "Allow",
            "Action": [
                "dynamodb:*"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-west-2:111122223333:table/StockTradesProcessor"
            ]
        },
        {
            "Sid": "Stmt789",
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}
```

------

**Para criar um usuário do IAM**

1. Abra o console do IAM em [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. Na página **Usuários**, selecione **Adicionar usuário**.

1. Em **Nome de usuário**, digite `StockTradeStreamUser`.

1. Em **Tipo de acesso**, selecione **Aceso programático** e, em seguida, selecione **Próximo: permissões**.

1. Escolha **Anexar políticas existentes diretamente**.

1. Pesquise a política criada por nome. Selecione a caixa à esquerda do nome da política e selecione **Próximo: revisão**.

1. Revise os detalhes e o resumo e, em seguida, selecione **Criar usuário**.

1. Copie o **ID da chave de acesso** e salve-o de forma privada. Em **Chave de acesso secreta**, selecione **Mostrar** e salve a chave de forma privada também.

1. Cole as chaves de acesso e chaves secretas em um arquivo local em lugar seguro de acesso restrito. Para esse aplicativo, crie um arquivo denominado ` ~/.aws/credentials` (com permissões restritas). O arquivo deverá estar no seguinte formato:

   ```
   [default]
   aws_access_key_id=access key
   aws_secret_access_key=secret access key
   ```

**Para anexar uma política do IAM a um usuário**

1. No console do IAM, abra [Políticas](https://console.aws.amazon.com/iam/home?#policies) e selecione **Ações da política**. 

1. Selecione `StockTradeStreamPolicy` e **Anexar**.

1. Selecione `StockTradeStreamUser` e **Anexar política**.

## Próximas etapas
<a name="tutorial-stock-data-kplkcl-iam-next"></a>

[Fazer download e compilação do código de implementação](tutorial-stock-data-kplkcl-download.md)

# Fazer download e compilação do código de implementação
<a name="tutorial-stock-data-kplkcl-download"></a>

O código esqueleto é fornecido para o [Tutorial: Processar dados de ações em tempo real usando a KPL e a KCL 1.x](tutorial-stock-data-kplkcl.md). Ele contém uma implementação de stub para o consumo do fluxo de negociações de ações (*produtor*) e para o processamento dos dados (*consumidor*). O procedimento a seguir mostra como concluir a implementação. 

**Para fazer download e compilação do código de implementação**

1. Faça download do [código-fonte](https://github.com/awslabs/amazon-kinesis-learning/tree/learning-module-1) no computador.

1. Crie um projeto no IDE favorito com o código-fonte, respeitando a estrutura de diretório fornecida.

1. Adicione as seguintes bibliotecas ao projeto:
   + Amazon Kinesis Client Library (KCL)
   + AWS SDK
   + Apache HttpCore
   + Apache HttpClient
   + Apache Commons Lang
   + Apache Commons Logging
   + Guava (Google Core Libraries For Java)
   + Jackson Annotations
   + Jackson Core
   + Jackson Databind
   + Jackson Dataformat: CBOR
   + Joda Time

1. Dependendo do seu IDE, o projeto pode ser compilado automaticamente. Se não, compile o projeto usando as etapas apropriadas para o seu IDE.

Se essas etapas foram concluídas com êxito, é possível agora ir para a próxima seção, [Implementar o produtor](tutorial-stock-data-kplkcl-producer.md). Se a compilação gerar erros em qualquer estágio, investigue e os corrija antes de continuar.

## Próximas etapas
<a name="tutorial-stock-data-kplkcl-download-next"></a>

[[Implementar o produtor](tutorial-stock-data-kplkcl-producer.md)Implementar o produtor](tutorial-stock-data-kplkcl-producer.md)

# Implementar o produtor
<a name="tutorial-stock-data-kplkcl-producer"></a>



O aplicativo no [Tutorial: Processar dados de ações em tempo real usando a KPL e a KCL 1.x[Tutorial: Processar dados de ações em tempo real usando a KPL e a KCL 1.x](tutorial-stock-data-kplkcl.md)](tutorial-stock-data-kplkcl.md) usa o cenário real de monitoramento de negociações em bolsa de valores. Os princípios a seguir explicam brevemente como este cenário é mapeado para o produtor e a estrutura de código de apoio.

Consulte o código-fonte e analise as informações a seguir.

**StockTrade classe**  
Uma negociação de ação individual é representada por uma instância da classe `StockTrade`. Essa instância contém atributos como o símbolo ticker, o preço, o número de ações, o tipo da negociação (compra ou venda) e um ID que identifica a negociação com exclusividade. Essa classe é previamente implementada.

**Registro de fluxo**  
Um fluxo é uma sequência de registros. Um registro é uma serialização de uma instância `StockTrade` no formato JSON. Por exemplo:   

```
{
  "tickerSymbol": "AMZN", 
  "tradeType": "BUY", 
  "price": 395.87,
  "quantity": 16, 
  "id": 3567129045
}
```

**StockTradeGenerator classe**  
`StockTradeGenerator` tem um método denominado `getRandomTrade()`, que retorna uma nova negociação de ações gerada aleatoriamente sempre que ela é invocada. Essa classe é previamente implementada.

**StockTradesWriter classe**  
O método `main` do produtor, `StockTradesWriter`, recupera continuamente uma negociação aleatória e a envia ao Kinesis Data Streams executando as seguintes tarefas:  

1. Lê o nome do fluxo e o nome da região como entrada.

1. Cria um `AmazonKinesisClientBuilder`.

1. Usa o criador do cliente para definir região, credenciais e configuração do cliente.

1. Cria um cliente `AmazonKinesis` usando o criador do cliente.

1. Verifica se o stream existe e está ativo (se não, ele será encerrado com um erro).

1. Em um loop contínuo, chama o método `StockTradeGenerator.getRandomTrade()` e o método `sendStockTrade` para enviar a negociação ao stream a cada 100 milissegundos.
O método `sendStockTrade` da classe `StockTradesWriter` tem o seguinte código:  

```
private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient, String streamName) {
    byte[] bytes = trade.toJsonAsBytes();
    // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library.
    if (bytes == null) {
        LOG.warn("Could not get JSON bytes for stock trade");
        return;
    }
    
    LOG.info("Putting trade: " + trade.toString());
    PutRecordRequest putRecord = new PutRecordRequest();
    putRecord.setStreamName(streamName);
    // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
    putRecord.setPartitionKey(trade.getTickerSymbol());
    putRecord.setData(ByteBuffer.wrap(bytes));

    try {
        kinesisClient.putRecord(putRecord);
    } catch (AmazonClientException ex) {
        LOG.warn("Error sending record to Amazon Kinesis.", ex);
    }
}
```

Consulte o desmembramento do código a seguir:
+ A API de `PutRecord` espera uma matriz de bytes, e é necessário converter `trade` para o formato JSON. Essa única linha de código executa a seguinte operação:

  ```
  byte[] bytes = trade.toJsonAsBytes();
  ```
+ Antes de enviar a negociação, crie uma nova instância de `PutRecordRequest` (denominada `putRecord` neste caso):

  ```
  PutRecordRequest putRecord = new PutRecordRequest();
  ```

  Cada chamada a `PutRecord` requer o nome do fluxo, uma chave de partição e um blob de dados. O código a seguir preenche esses campos no objeto `putRecord` usando seus métodos `setXxxx()`:

  ```
  putRecord.setStreamName(streamName);
  putRecord.setPartitionKey(trade.getTickerSymbol());
  putRecord.setData(ByteBuffer.wrap(bytes));
  ```

  O exemplo usa um tíquete de ações como uma chave de partição, que mapeia o registro para um determinado fragmento. Na prática, deve haver centenas ou milhares de chaves de partição por fragmento, de forma que os registros sejam uniformemente disseminados no fluxo. Para obter mais informações sobre como adicionar dados a um fluxo, consulte [Adicionar dados a um fluxo](developing-producers-with-sdk.md#kinesis-using-sdk-java-add-data-to-stream).

  Agora `putRecord` está pronto para enviar para o cliente (operação `put`):

  ```
  kinesisClient.putRecord(putRecord);
  ```
+ A verificação e o registro de erros são sempre inclusões úteis. Este código registra condições de erro:

  ```
  if (bytes == null) {
      LOG.warn("Could not get JSON bytes for stock trade");
      return;
  }
  ```

  Adicione o try/catch bloco ao redor da `put` operação:

  ```
  try {
         kinesisClient.putRecord(putRecord);
  } catch (AmazonClientException ex) {
         LOG.warn("Error sending record to Amazon Kinesis.", ex);
  }
  ```

  Isso ocorre porque uma operação `put` do Kinesis Data Streams pode falhar devido a um erro de rede ou porque o fluxo de dados atinge o limite de throughput e tem sua utilização controlada. Recomendamos considerar cuidadosamente sua política de tentativa para operações `put` a fim de evitar perda de dados, usando como uma nova tentativa. 
+ O registro de status é útil mas opcional:

  ```
  LOG.info("Putting trade: " + trade.toString());
  ```
O produtor mostrado aqui usa a funcionalidade de registro único da API do Kinesis Data Streams, `PutRecord`. Na prática, se um produtor individual gerar muitos registros, costuma ser mais eficiente usar a funcionalidade de vários registros de `PutRecords` e enviar lotes de registros por vez. Para obter mais informações, consulte [Adicionar dados a um fluxo](developing-producers-with-sdk.md#kinesis-using-sdk-java-add-data-to-stream).

**Como executar o produtor**

1. Verifique se o par de chave de acesso e chave secreta recuperado anteriormente (durante a criação do usuário do IAM) foi salvo no arquivo `~/.aws/credentials`. 

1. Execute a classe `StockTradeWriter` com os seguintes argumentos:

   ```
   StockTradeStream us-west-2
   ```

   Se o fluxo foi criado em uma região diferente de `us-west-2`, é necessário especificar essa região aqui.

A saída deve ser semelhante a:

```
Feb 16, 2015 3:53:00 PM  
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
```

Seu fluxo de negociações de ações agora está sendo ingerido pelo Kinesis Data Streams.

## Próximas etapas
<a name="tutorial-stock-data-kplkcl-producer-next"></a>

[Implementar o consumidor](tutorial-stock-data-kplkcl-consumer.md)

# Implementar o consumidor
<a name="tutorial-stock-data-kplkcl-consumer"></a>

A aplicação de consumo no [Tutorial: Processar dados de ações em tempo real usando a KPL e a KCL 1.x[Tutorial: Processar dados de ações em tempo real usando a KPL e a KCL 1.x](tutorial-stock-data-kplkcl.md)](tutorial-stock-data-kplkcl.md) processa continuamente o fluxo de negociações de ações criado em [[Implementar o produtor](tutorial-stock-data-kplkcl-producer.md)Implementar o produtor](tutorial-stock-data-kplkcl-producer.md). Em seguida, ele produz as ações mais populares compradas e vendidas a cada minuto. A aplicação é compilada com base na Kinesis Client Library (KCL), que faz grande parte do trabalho pesado comum às aplicações de consumo. Para obter mais informações, consulte [Desenvolver aplicações de consumo da KCL 1.x](developing-consumers-with-kcl.md). 

Consulte o código-fonte e analise as informações a seguir.

**StockTradesProcessor classe**  
Principal classe do consumidor fornecida e que executa as seguintes tarefas:  
+ Lê o aplicativo, o fluxo e os nomes de região passados como argumentos.
+ Lê credenciais de `~/.aws/credentials`.
+ Cria uma instância de `RecordProcessorFactory` que veicula instâncias de `RecordProcessor`, implementadas por uma instância de `StockTradeRecordProcessor`.
+ Cria um operador da KCL com a instância `RecordProcessorFactory` e uma configuração padrão que inclui o nome do fluxo, as credenciais e o nome da aplicação. 
+ O operador cria um novo thread para cada fragmento (atribuído a essa instância de consumidor), que opera em loops contínuos para ler registros do Kinesis Data Streams. Em seguida, ele invoca a instância de `RecordProcessor` para processar cada lote de registros recebidos.

**StockTradeRecordProcessor classe**  
Implementação da instância de `RecordProcessor`, que, por sua vez, implementa três métodos necessários: `initialize`, `processRecords` e `shutdown`.  
Como os nomes sugerem, `initialize` e `shutdown` são usados pela Kinesis Client Library para permitir que o processador de registros saiba quando deve estar pronto para começar a receber registros e quando deve esperar parar de receber registros, respectivamente, para poder realizar tarefas de configuração e encerramento específicas da aplicação. Este código é fornecido para você. O processamento principal ocorre no método `processRecords`, que, por sua vez, usa `processRecord` para cada registro. Esse último método é fornecido como um código esqueleto quase todo vazio, para implementação na próxima etapa, onde é melhor explicado.  
Observe também a implementação dos métodos de suporte de `processRecord`: `reportStats` e `resetStats`, que estão vazios no código-fonte original.  
O método `processRecords`, implementado previamente, executa as seguintes etapas:  
+  Para cada registro passado, chama `processRecord`.
+ Se pelo menos 1 minuto houver decorrido após o último relatório, chamará `reportStats()`, que imprime as estatísticas mais recentes e, em seguida, `resetStats()`, que limpa as estatísticas para que o próximo intervalo inclua apenas registros novos.
+ Define o próximo horário para geração de relatórios.
+ Se houver decorrido pelo menos 1 minuto após o último ponto de verificação, chamará `checkpoint()`. 
+ Define o próximo horário do ponto de verificação.
Este método usa intervalos de 60 segundos como taxa de geração de relatórios e definição de pontos de verificação. Para obter mais informações sobre definição de pontos de verificação, consulte [Informações adicionais sobre o consumidor](#tutorial-stock-data-kplkcl-consumer-supplement).

**StockStats classe**  
Essa classe fornece retenção de dados e rastreamento de estatísticas em relação às ações mais populares ao longo do tempo. Esse código é fornecido e contém os seguintes métodos:  
+ `addStockTrade(StockTrade)`: injeta o `StockTrade` conhecido nas estatísticas correntes.
+ `toString()`: retorna as estatísticas em uma string formatada.
Essa classe rastreia as ações mais populares mantendo uma contagem corrente do número total de negociações de cada ação e a contagem máxima. Ela atualiza essas contagens sempre que chega uma negociação de ação.

Adicione código aos métodos da classe `StockTradeRecordProcessor`, como mostrado nas etapas a seguir.

**Como implementar o consumidor**

1. Implemente o método `processRecord` instanciando um objeto `StockTrade` de tamanho correto e adicionando a ele os dados do registro, registrando um aviso caso ocorra problema.

   ```
   StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array());
   if (trade == null) {
       LOG.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.getPartitionKey());
       return;
   }
   stockStats.addStockTrade(trade);
   ```

1. Implemente um método `reportStats` simples. Sinta-se à vontade para modificar o formato de saída conforme suas preferências.

   ```
   System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" +
                      stockStats + "\n" +
                      "****************************************************************\n");
   ```

1. Finalmente, implemente o método `resetStats`, que cria uma nova instância de `stockStats`.

   ```
   stockStats = new StockStats();
   ```

**Como executar o consumidor**

1. Execute a aplicação de produção escrita em [[Implementar o produtor](tutorial-stock-data-kplkcl-producer.md)Implementar o produtor](tutorial-stock-data-kplkcl-producer.md) para injetar registros de negociações de ações no fluxo.

1. Verifique se o par de chave de acesso e chave secreta recuperado anteriormente (durante a criação do usuário do IAM) foi salvo no arquivo `~/.aws/credentials`. 

1. Execute a classe `StockTradesProcessor` com os seguintes argumentos:

   ```
   StockTradesProcessor StockTradeStream us-west-2
   ```

   Observe que, ao criar o fluxo em uma região diferente de `us-west-2`, é necessário especificar essa região aqui.

Depois de um minuto, deverá aparecer uma saída como a seguir, atualizada a cada minuto a partir de então:

```
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  ****************************************************************
```

## Informações adicionais sobre o consumidor
<a name="tutorial-stock-data-kplkcl-consumer-supplement"></a>

Se já houver familiaridade com as vantagens da Kinesis Client Library, abordada em [Desenvolver aplicações de consumo da KCL 1.x](developing-consumers-with-kcl.md) e em outros documentos, poderá haver algum quesitonamento sobre usá-la aqui. Mesmo usando apenas um fluxo de fragmento e uma instância de consumidor para processá-lo, é mais fácil implementar o consumidor usando a KCL. Compare as etapas de implementação do código na seção do produtor para o consumidor para ver a facilidade comparativa para implementar um consumidor. Isso se deve, em grande parte, aos serviços que a KCL fornece.

Nessa aplicação, cencentre-se na implementação de uma classe de processador de registros, capaz de processar registros individuais. Não é necessário se preocupar com a forma como os registros são obtidos do Kinesis Data Streams. A KCL obtém os registros e invoca o processador de registros sempre que há novos registros disponíveis. Além disso, não é necessário se preocupar com a quantidade de fragmentos e de instâncias de consumidor. Se o fluxo for escalonado, não é necessário reescrever o aplicativo para lidar com mais de um fragmento ou com uma instância de uma aplicação de consumo.

O termo *ponto de verificação* significa registrar o ponto no fluxo até os registros de dados que foram consumidos e processados até o momento. Se o aplicativo falhar, o fluxo será lido a partir desse ponto e não do início do fluxo. O assunto da definição de pontos de verificação e os vários padrões de design e melhores práticas relativos estão fora do escopo deste capítulo. No entanto, é algo que pode ser encontrado em ambientes de produção.

Conforme visto no [[Implementar o produtor](tutorial-stock-data-kplkcl-producer.md)Implementar o produtor](tutorial-stock-data-kplkcl-producer.md), as operações `put` na API do Kinesis Data Streams usam uma *chave de partição* como entrada. O Kinesis Data Streams usa uma chave de partição como um mecanismo para dividir registros em vários fragmentos (quando há mais de um fragmento no fluxo). A mesma chave de partição sempre roteia para o mesmo fragmento. Isso permite que o consumidor que processa um determinado fragmento seja projetado com a premissa de que os registros com a mesma chave de partição só sejam enviados a esse consumidor, e nenhum registro com a mesma chave de partição termine em qualquer outro consumidor. Portanto, o operador de um consumidor pode agregar todos os registros com a mesma chave de partição sem se preocupar com a ausência de dados necessários.

Nesta aplicação, como o processamento de registros do consumidor não é intensivo, é possível usar um fragmento e fazer o processamento no mesmo thread da KCL. No entanto, na prática, considere primeiro escalar o número de fragmentos. Em alguns casos, talvez convenha mudar o processamento para outro thread ou usar um grupo de threads se for esperado que o processamento de registros seja intensivo. Dessa forma, a KCL pode obter novos registros mais rapidamente, enquanto outros threads podem processar os registros em paralelo. O design multithread não é trivial e deve ser planejado com técnicas avançadas, portanto, aumentar a contagem de fragmentos costuma ser a maneira mais eficiente de escalar.

## Próximas etapas
<a name="tutorial-stock-data-kplkcl-consumer-next"></a>

[(Opcional) Estender o consumidor](tutorial-stock-data-kplkcl-consumer-extension.md)

# (Opcional) Estender o consumidor
<a name="tutorial-stock-data-kplkcl-consumer-extension"></a>

O aplicativo no [Tutorial: Processar dados de ações em tempo real usando a KPL e a KCL 1.x[Tutorial: Processar dados de ações em tempo real usando a KPL e a KCL 1.x](tutorial-stock-data-kplkcl.md)](tutorial-stock-data-kplkcl.md) já pode ser suficiente para os seus propósitos. Esta seção opcional mostra como estender o código de consumidor para um cenário um pouco mais elaborado.

Para saber mais sobre os maiores pedidos de venda a cada minuto, pode-se modificar a classe `StockStats` em três locais para acomodar essa nova prioridade.

**Para estender o consumidor**

1. Adicione novas variáveis de instância:

   ```
    // Ticker symbol of the stock that had the largest quantity of shares sold 
    private String largestSellOrderStock;
    // Quantity of shares for the largest sell order trade
    private long largestSellOrderQuantity;
   ```

1. Adicione o seguinte código a `addStockTrade`:

   ```
    if (type == TradeType.SELL) {
        if (largestSellOrderStock == null || trade.getQuantity() > largestSellOrderQuantity) {
            largestSellOrderStock = trade.getTickerSymbol();
            largestSellOrderQuantity = trade.getQuantity();
        }
    }
   ```

1. Modifique o método `toString` para imprimir as informações adicionais:

   ```
    public String toString() {
        return String.format(
                "Most popular stock being bought: %s, %d buys.%n" +
                "Most popular stock being sold: %s, %d sells.%n" +
                "Largest sell order: %d shares of %s.",
                getMostPopularStock(TradeType.BUY), getMostPopularStockCount(TradeType.BUY),
                getMostPopularStock(TradeType.SELL), getMostPopularStockCount(TradeType.SELL),
                largestSellOrderQuantity, largestSellOrderStock);
    }
   ```

Se o consumidor for executado agora (lembre-se de executar o produtor também), deverá aparecer uma saída semelhante a esta:

```
 ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  Largest sell order: 996 shares of BUD.
  ****************************************************************
```

## Próximas etapas
<a name="tutorial-stock-data-kplkcl-consumer-extension-next"></a>

[Limpar os recursos](tutorial-stock-data-kplkcl-finish.md)

# Limpar os recursos
<a name="tutorial-stock-data-kplkcl-finish"></a>

Como a utilização do fluxo de dados do Kinesis é paga, certifique-se de excluí-la e de excluir a tabela do Amazon DynamoDB correspondente ao concluir. As cobranças nominais ocorrerão em um fluxo ativo mesmo quando não houver envio e recebimento de registros. Isso ocorre porque um fluxo ativo usa recursos por meio da "escuta" contínua de registros recebidos e solicitações para obter registros.

**Para excluir o fluxo e tabela**

1. Desligue os produtores e consumidores que possam estar em execução.

1. [Abra o console do Kinesis em https://console.aws.amazon.com /kinesis.](https://console.aws.amazon.com/kinesis)

1. Escolha o fluxo criado para esta aplicação (`StockTradeStream`).

1. Escolha **Excluir fluxo**.

1. Abra o console do DynamoDB em. [https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/)

1. Exclua a tabela `StockTradesProcessor`.

## Resumo
<a name="tutorial-stock-data-kplkcl-summary"></a>

Para processar uma grande quantidade de dados quase em tempo real, não é preciso escrever nenhum código complicado nem desenvolver uma imensa infraestrutura. Isso é tão básico quanto escrever lógica para processar uma pequena quantidade de dados (como escrever `processRecord(Record)`), mas usando o Kinesis Data Streams para escalar e ter um processo que funciona para uma grande quantidade de dados de fluxo. Não há necessidade de se preocupar com a escalabilidade do processamento, porque o Kinesis Data Streams cuida de tudo. Basta enviar os registros de fluxo ao Kinesis Data Streams e escrever a lógica para processar cada novo registro recebido. 

Veja aqui alguns aprimoramentos potenciais para este aplicativo.

**Agregar em todos os fragmentos**  
Atualmente, obtém-se estatísticas resultantes da agregação de registros de dados recebidos por um único operador proveniente de um único fragmento. (Um fragmento não pode ser processado por mais de um operador em um aplicativo ao mesmo tempo). Naturalmente, ao escalar havendo mais de um fragmento, pode ser desejável agregar em todos os fragmentos. É possível fazer isso tendo uma arquitetura de pipeline em que a saída de cada operador é alimentada em outro fluxo com um único fragmento, o qual é processado por um operador que agrega as saídas do primeiro estágio. Como os dados do primeiro estágio são limitados (um exemplo por minuto por fragmento), eles podem ser facilmente tratados por um fragmento.

**Escalar o processamento**  
Quando o fluxo é expandido para ter muitos fragmentos (porque muitos produtores estão enviando dados), a maneira de escalar o processamento é adicionando mais operadores. É possível executar os operadores em instâncias do Amazon EC2 e usar grupos do Auto Scaling.

**Use conectores para o Amazon S3/ DynamoDB/Amazon Redshift/Storm**  
Como um fluxo é processado continuamente, sua saída pode ser enviada para outros destinos. AWS fornece [conectores](https://github.com/awslabs/amazon-kinesis-connectors) para integrar o Kinesis Data Streams com AWS outros serviços e ferramentas de terceiros.

## Próximas etapas
<a name="tutorial-stock-data-kplkcl-next-steps"></a>
+ Para obter mais informações sobre o uso das operações de API do Kinesis Data Streams, consulte [Desenvolva produtores usando a API Amazon Kinesis Data Streams com o AWS SDK para Java](developing-producers-with-sdk.md), [Desenvolva consumidores com produtividade compartilhada com o AWS SDK para Java](developing-consumers-with-sdk.md) e [Criar e gerenciar fluxos de dados do Kinesis](working-with-streams.md).
+ Para obter mais informações sobre a Kinesis Client Library, consulte [Desenvolver aplicações de consumo da KCL 1.x](developing-consumers-with-kcl.md). 
+ Para obter mais informações sobre como otimizar seu aplicativo, consulte [Otimizar consumidores do Amazon Kinesis Data StreamsOtimizar consumidores do Kinesis Data Streams](advanced-consumers.md). 

# Tutorial: analise dados do mercado de ações em tempo real usando o Amazon Managed Service for Apache Flink
<a name="tutorial-stock-data"></a>

O cenário deste tutorial envolve consumir negociações do mercado de ações em um fluxo de dados e criar uma aplicação simples do [Amazon Managed Service for Apache Flink](https://docs.aws.amazon.com/kinesisanalytics/latest/java/what-is.html) para realizar cálculos no fluxo. Será explicado como enviar um fluxo de registros para o Kinesis Data Streams e implementar uma aplicação que consome e processa os registros em tempo quase real.

Com o Amazon Managed Service for Apache Flink, você pode usar Java, Scala ou SQL para processar e analisar dados do fluxo. O serviço permite que você crie e execute código Java ou Scala em comparação com origens de fluxos para fazer analytics de séries temporais, alimentar painéis e criar métricas em tempo real.

É possível criar aplicações Flink no Managed Service for Apache Flink usando bibliotecas de código aberto baseadas no [Apache Flink](https://flink.apache.org/). O Apache Flink é uma estrutura popular e um mecanismo para o processamento de fluxos de dados. 

**Importante**  
Depois de criar dois fluxos de dados e um aplicativo, sua conta incorre em cobranças nominais pelo Kinesis Data Streams e pelo Managed Service for Apache Flink porque eles não estão qualificados para o nível gratuito. AWS Quando você terminar de usar esse aplicativo, exclua seus AWS recursos para parar de incorrer em cobranças. 

O código não acessa os dados reais da bolsa de valores, ele simula o fluxo de negociações de ações. Isso é feito com o uso de um gerador de negociações de ações aleatórias. Se houver acesso a um fluxo de negociações de ações em tempo real, pode ser interessante derivar estatísticas úteis e em tempo hábil desse fluxo. Por exemplo, talvez convenha executar uma análise de janela deslizante na qual se determine a ação mais popular que foi adquirida nos últimos 5 minutos. Ou talvez convenha uma notificação sempre que uma ordem de venda for muito grande (ou seja, tenha muitas quotas). É possível estender o código nesta série para oferecer essa funcionalidade.

Os exemplos mostrados usam a região Oeste dos EUA (Oregon), mas funcionam em qualquer [região da AWS que oferece suporte ao Managed Service for Apache Flink](https://docs.aws.amazon.com/general/latest/gr/rande.html#ka_region).

**Topics**
+ [

## Pré-requisitos para concluir os exercícios
](#setting-up-prerequisites)
+ [

# Configurar uma AWS conta e criar um usuário administrador
](setting-up.md)
+ [

# Configure o AWS Command Line Interface (AWS CLI)
](setup-awscli.md)
+ [

# Criar e executar uma aplicação do Managed Service for Apache Flink
](get-started-exercise.md)

## Pré-requisitos para concluir os exercícios
<a name="setting-up-prerequisites"></a>

Para concluir as etapas neste guia, é necessário ter o seguinte:
+ [Java Development Kit](http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html) (JDK) versão 8. Defina a variável do ambiente`JAVA_HOME` para apontar para o local de instalação do JDK.
+ Recomenda-se o uso de um ambiente de desenvolvimento (como [Eclipse Java Neon](http://www.eclipse.org/downloads/packages/release/neon/3) ou [IntelliJ Idea](https://www.jetbrains.com/idea/)) para desenvolver e compilar seu aplicativo.
+ [Cliente do Git.](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) Instale o cliente do Git, se isso ainda não foi feito.
+ [Apache Maven Compiler Plugin](https://maven.apache.org/plugins/maven-compiler-plugin/). Maven deve estar em seu caminho de trabalho. Para testar a instalação do Apache Maven, insira o seguinte:

  ```
  $ mvn -version
  ```

Para começar a usar, acesse [Configurar uma AWS conta e criar um usuário administrador](setting-up.md).

# Configurar uma AWS conta e criar um usuário administrador
<a name="setting-up"></a>

Antes de usar o Amazon Managed Service for Apache Flink pela primeira vez, conclua as seguintes tarefas:

1. [Inscreva-se para AWS](#setting-up-signup)

1. [Criar um usuário do IAM](#setting-up-iam)

## Inscreva-se para AWS
<a name="setting-up-signup"></a>

Quando você se inscreve no Amazon Web Services (AWS), sua AWS conta é automaticamente cadastrada em todos os serviços AWS, incluindo o Amazon Managed Service para Apache Flink. A cobrança incorrerá apenas pelos serviços utilizados.

Com o Managed Service for Apache Flink, pague apenas pelos recursos usados. Novos clientes da AWS podem começar a usar o Managed Service for Apache Flink gratuitamente. Para obter mais informações, consulte [Nível gratuito da AWS](https://aws.amazon.com/free/).

Se você já tiver uma AWS conta, vá para a próxima tarefa. Se não tiver uma conta da AWS , siga as etapas a seguir para criar uma.

**Para criar uma AWS conta**

1. Abra a [https://portal.aws.amazon.com/billing/inscrição.](https://portal.aws.amazon.com/billing/signup)

1. Siga as instruções online.

   Parte do procedimento de inscrição envolve receber uma chamada telefônica ou uma mensagem de texto e inserir um código de verificação pelo teclado do telefone.

   Quando você se inscreve em um Conta da AWS, um *Usuário raiz da conta da AWS*é criado. O usuário-raiz tem acesso a todos os Serviços da AWS e recursos na conta. Como prática recomendada de segurança, atribua o acesso administrativo a um usuário e use somente o usuário-raiz para executar [tarefas que exigem acesso de usuário-raiz](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_root-user.html#root-user-tasks).

Anote o ID da sua AWS conta porque você precisará dele para a próxima tarefa.

## Criar um usuário do IAM
<a name="setting-up-iam"></a>

Serviços em AWS, como o Amazon Managed Service para Apache Flink, exigem que você forneça credenciais ao acessá-los. Dessa maneira, o serviço pode determinar se há permissões para acessar os recursos próprios desse serviço. O Console de gerenciamento da AWS exige que você digite sua senha. 

Você pode criar chaves de acesso para sua AWS conta para acessar o AWS Command Line Interface (AWS CLI) ou a API. No entanto, não recomendamos que você acesse AWS usando as credenciais da sua AWS conta. Em vez disso, recomendamos que você use AWS Identity and Access Management (IAM). Crie um usuário do IAM, adicione o usuário a um grupo do IAM com permissões administrativas e, em seguida, conceda permissões administrativas ao usuário do IAM criado. Em seguida, pode-se acessar a AWS usando uma URL especial e as credenciais desse usuário do IAM.

Se você se inscreveu AWS, mas não criou um usuário do IAM para si mesmo, você pode criar um usando o console do IAM.

Os exercícios de conceitos básicos deste guia pressupõem a existência de um usuário (`adminuser`) com permissões de administrador. Siga o procedimento para criar `adminuser` na conta.

**Para criar um grupo de administradores**

1. Faça login no Console de gerenciamento da AWS e abra o console do IAM em [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. No painel de navegação, escolha **Grupos** e **Criar novo grupo**.

1. Em **Nome do grupo**, digite um nome para o grupo, como **Administrators** e escolha **Próxima etapa**.

1. Na lista de políticas, marque a caixa de seleção ao lado da **AdministratorAccess**política. É possível usar o menu **Filtro** e a caixa **Pesquisar** para filtrar a lista de políticas.

1. Selecione **Próximo passo** e, em seguida, **Criar grupo**.

O grupo novo é listado em **Nome do grupo**.

**Para criar um usuário do IAM para si mesmo, adicioná-lo ao grupo de administradores e criar uma senha**

1. No painel de navegação, escolha **Usuários** e depois **Adicionar usuário**.

1. Na caixa **Nome de usuário**, insira um nome de usuário.

1. Escolha tanto **Acesso programático** como **Acesso ao console de Gerenciamento da AWS **.

1. Escolha **Próximo: Permissões**.

1. Marque a caixa de seleção ao lado do grupo **Administradores**. Então, escolha **Próximo: Análise**.

1. Selecione **Criar usuário**.

**Como fazer login como o novo usuário do IAM**

1. Saia do Console de gerenciamento da AWS.

1. Use o seguinte formato de URL para fazer login no console:

   `https://aws_account_number.signin.aws.amazon.com/console/`

   *aws\$1account\$1number*É o ID da sua AWS conta sem hífens. Por exemplo, se o ID da sua AWS conta for 1234-5678-9012, substitua por. *aws\$1account\$1number* **123456789012** Para obter informações sobre como encontrar o número da sua conta, consulte [Seu ID de AWS conta e seu alias](https://docs.aws.amazon.com/IAM/latest/UserGuide/console_account-alias.html) no *Guia do usuário do IAM*.

1. Insira o nome e a senha de usuário do IAM que você acabou de criar. Quando você está conectado, a barra de navegação exibe *your\$1user\$1name* @*your\$1aws\$1account\$1id*.

**nota**  
Se você não quiser que o URL da sua página de login contenha o ID da sua AWS conta, você pode criar um alias de conta.

**Para criar ou remover um alias de conta**

1. Abra o console do IAM em [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. No painel de navegação, selecione **Painel**.

1. Encontre o link de login dos usuários do IAM.

1. Para criar o alias, escolha **Personalizar**. Insira o nome que deseja usar para o alias e escolha **Sim, criar**.

1. Para remover o alias, selecione **Personalizar** e, em seguida, selecione **Sim, excluir**. O URL de login é revertido para o uso do ID da sua AWS conta.

Para fazer o login depois de criar o alias de uma conta, use o seguinte URL:

`https://your_account_alias.signin.aws.amazon.com/console/`

Para verificar o link de cadastro para usuários do IAM para a conta, abra o console do IAM e marque **IAM users sign-in link** no painel.

Para obter mais informações sobre IAM, consulte o seguinte:
+ [AWS Identity and Access Management (IAM)](https://aws.amazon.com/iam/)
+ [Introdução ao IAM](https://docs.aws.amazon.com/IAM/latest/UserGuide/getting-started.html)
+ [Guia do usuário do IAM](https://docs.aws.amazon.com/IAM/latest/UserGuide/)

## Próxima etapa
<a name="setting-up-next-step-2"></a>

[Configure o AWS Command Line Interface (AWS CLI)](setup-awscli.md)

# Configure o AWS Command Line Interface (AWS CLI)
<a name="setup-awscli"></a>

Nesta etapa, você baixa e configura o AWS CLI para uso com o Amazon Managed Service para Apache Flink.

**nota**  
Os exercícios de conceitos básicos neste guia pressupõem o uso de credenciais de administrador (`adminuser`) em sua conta para executar as operações.

**nota**  
Se você já tem o AWS CLI instalado, talvez seja necessário fazer o upgrade para obter a funcionalidade mais recente. Para obter mais informações, consulte [Instalando a interface de linha de AWS comando](https://docs.aws.amazon.com/cli/latest/userguide/installing.html) no *Guia AWS Command Line Interface do usuário*. Para verificar a versão do AWS CLI, execute o seguinte comando:  

```
aws --version
```
Os exercícios deste tutorial exigem a seguinte AWS CLI versão ou posterior:  

```
aws-cli/1.16.63
```

**Para configurar o AWS CLI**

1. Faça download e configure a AWS CLI. Para obter instruções, consulte os seguintes tópicos no *Guia do usuário do AWS Command Line Interface *: 
   + [Instalar a AWS Command Line Interface](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-set-up.html)
   + [Configurando a AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html)

1. Adicione um perfil nomeado para o usuário administrador no arquivo de AWS CLI configuração. Você usa esse perfil ao executar os AWS CLI comandos. Para obter mais informações sobre perfis nomeados, consulte [Perfis nomeados](https://docs.aws.amazon.com/cli/latest/userguide/cli-multiple-profiles.html) no *Guia do usuário da AWS Command Line Interface *.

   ```
   [profile adminuser]
   aws_access_key_id = adminuser access key ID
   aws_secret_access_key = adminuser secret access key
   region = aws-region
   ```

   Para obter uma lista das AWS regiões disponíveis, consulte [AWS Regiões e endpoints](https://docs.aws.amazon.com/general/latest/gr/rande.html) no *Referência geral da Amazon Web Services*.

1. Verifique a configuração digitando o seguinte comando no prompt de comando: 

   ```
   aws help
   ```

Depois de configurar uma AWS conta e a AWS CLI, você pode tentar o próximo exercício, no qual você configura um aplicativo de amostra e testa a end-to-end configuração.

## Próxima etapa
<a name="setting-up-next-step-3"></a>

[Criar e executar uma aplicação do Managed Service for Apache Flink](get-started-exercise.md)

# Criar e executar uma aplicação do Managed Service for Apache Flink
<a name="get-started-exercise"></a>

Neste exercício, será criado um aplicativo Managed Service for Apache Flink com fluxos de dados como origem e coletor.

**Topics**
+ [

## Criar dois fluxos de dados do Amazon Kinesis
](#get-started-exercise-1)
+ [

## Gravação de registros de amostra no fluxo de entrada
](#get-started-exercise-2)
+ [

## Baixar e examinar o código Java de fluxo do Apache Flink
](#get-started-exercise-5)
+ [

## Compilar o código da aplicação
](#get-started-exercise-5.5)
+ [

## Upload do código Java de fluxo do Apache Flink
](#get-started-exercise-6)
+ [

## Criar e executar a aplicação do Managed Service for Apache Flink
](#get-started-exercise-7)

## Criar dois fluxos de dados do Amazon Kinesis
<a name="get-started-exercise-1"></a>

Antes de criar uma aplicação do Amazon Managed Service for Apache Flink para este exercício, crie dois fluxos de dados do Kinesis (`ExampleInputStream` e `ExampleOutputStream`). O aplicativo usa esses fluxos para os fluxos de origem e de destino do aplicativo.

É possível criar esses fluxos usando o console do Amazon Kinesis ou o comando da AWS CLI a seguir. Para instruções do console, consulte [Criar e atualizar fluxos de dados](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html). 

**Como criar os fluxos de dados (AWS CLI)**

1. Para criar o primeiro stream (`ExampleInputStream`), use o seguinte comando do Amazon Kinesis `create-stream` AWS CLI .

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleInputStream \
   --shard-count 1 \
   --region us-west-2 \
   --profile adminuser
   ```

1. Para criar o segundo fluxo que o aplicativo usa para gravar a saída, execute o mesmo comando, alterando o nome da transmissão para `ExampleOutputStream`.

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleOutputStream \
   --shard-count 1 \
   --region us-west-2 \
   --profile adminuser
   ```

## Gravação de registros de amostra no fluxo de entrada
<a name="get-started-exercise-2"></a>

Nesta seção, será usado um script Python para gravar registros de amostra no fluxo para o aplicativo processar.

**nota**  
Essa seção requer [AWS SDK para Python (Boto)](https://aws.amazon.com/developers/getting-started/python/).

1. Crie um arquivo denominado `stock.py` com o conteúdo a seguir:

   ```
    
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "EVENT_TIME": datetime.datetime.now().isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
           "PRICE": round(random.random() * 100, 2),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

1. Mais adiante neste tutorial, será executado o script `stock.py` para enviar dados para o aplicativo. 

   ```
   $ python stock.py
   ```

## Baixar e examinar o código Java de fluxo do Apache Flink
<a name="get-started-exercise-5"></a>

O código do aplicativo Java para esses exemplos está disponível em GitHub. Para fazer download do código do aplicativo, faça o seguinte:

1. Duplique o repositório remoto com o seguinte comando:

   ```
   git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples.git
   ```

1. Navegue até o diretório `GettingStarted`.

O código do aplicativo está localizado nos arquivos `CustomSinkStreamingJob.java` e `CloudWatchLogSink.java`. Observe o seguinte sobre o código do aplicativo:
+ A aplicação usa uma origem do Kinesis para ler o fluxo de origem. O trecho a seguir cria o coletor do Kinesis:

  ```
  return env.addSource(new FlinkKinesisConsumer<>(inputStreamName,
                  new SimpleStringSchema(), inputProperties));
  ```

## Compilar o código da aplicação
<a name="get-started-exercise-5.5"></a>

Nesta seção, será usado o compilador do Apache Maven para criar o código Java para o aplicativo. Para obter informações sobre como instalar o Apache Maven e o Java Development Kit (JDK), consulte [Pré-requisitos para concluir os exercícios](tutorial-stock-data.md#setting-up-prerequisites).

Seu aplicativo Java requer os seguintes componentes:
+ Um arquivo [Project Object Model (pom.xml)](https://maven.apache.org/guides/introduction/introduction-to-the-pom.html). este arquivo contém informações sobre a configuração e as dependências da aplicação, incluindo as bibliotecas do Amazon Managed Service for Apache Flink.
+ Um método `main` que contém a lógica do aplicativo.

**nota**  
**Para usar o conector do Kinesis para a aplicação a seguir, você precisa baixar o código-fonte do conector e compilá-lo como descrito na [documentação do Apache Flink](https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/connectors/kinesis.html).**

**Como criar e compilar o código do aplicativo**

1. Crie um Java/Maven aplicativo em seu ambiente de desenvolvimento. Para obter informações sobre como criar um aplicativo, consulte a documentação do seu ambiente de desenvolvimento:
   + [Como criar seu primeiro projeto Java (Eclipse Java Neon)](https://help.eclipse.org/neon/index.jsp?topic=%2Forg.eclipse.jdt.doc.user%2FgettingStarted%2Fqs-3.htm)
   + [Como criar, executar e empacotar seu primeiro aplicativo Java (IntelliJ Idea)](https://www.jetbrains.com/help/idea/creating-and-running-your-first-java-application.html)

1. Use o código a seguir para um arquivo chamado `StreamingJob.java`. 

   ```
    
   package com.amazonaws.services.kinesisanalytics;
   
   import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
   import org.apache.flink.api.common.serialization.SimpleStringSchema;
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
   import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
   import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
   
   import java.io.IOException;
   import java.util.Map;
   import java.util.Properties;
   
   public class StreamingJob {
   
       private static final String region = "us-east-1";
       private static final String inputStreamName = "ExampleInputStream";
       private static final String outputStreamName = "ExampleOutputStream";
   
       private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
           Properties inputProperties = new Properties();
           inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
           inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
   
           return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
       }
   
       private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
               throws IOException {
           Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
           return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
                   applicationProperties.get("ConsumerConfigProperties")));
       }
   
       private static FlinkKinesisProducer<String> createSinkFromStaticConfig() {
           Properties outputProperties = new Properties();
           outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
           outputProperties.setProperty("AggregationEnabled", "false");
   
           FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties);
           sink.setDefaultStream(outputStreamName);
           sink.setDefaultPartition("0");
           return sink;
       }
   
       private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException {
           Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
           FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(),
                   applicationProperties.get("ProducerConfigProperties"));
   
           sink.setDefaultStream(outputStreamName);
           sink.setDefaultPartition("0");
           return sink;
       }
   
       public static void main(String[] args) throws Exception {
           // set up the streaming execution environment
           final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   
           /*
            * if you would like to use runtime configuration properties, uncomment the
            * lines below
            * DataStream<String> input = createSourceFromApplicationProperties(env);
            */
   
           DataStream<String> input = createSourceFromStaticConfig(env);
   
           /*
            * if you would like to use runtime configuration properties, uncomment the
            * lines below
            * input.addSink(createSinkFromApplicationProperties())
            */
   
           input.addSink(createSinkFromStaticConfig());
   
           env.execute("Flink Streaming Java API Skeleton");
       }
   }
   ```

   Observe o seguinte sobre o exemplo de código anterior:
   + Este arquivo contém o método `main` que define a funcionalidade do aplicativo.
   + Seu aplicativo cria conectores de origem e de destino para acessar recursos externos usando um objeto `StreamExecutionEnvironment`. 
   + O aplicativo cria conectores de origem e de destino usando propriedades estáticas. Para usar as propriedades dinâmicas do aplicativo, use os métodos `createSourceFromApplicationProperties` e `createSinkFromApplicationProperties` para criar os conectores. Esses métodos leem as propriedades do aplicativo para configurar os conectores.

1. Para usar o seu código de aplicativo, compile-o e empacote-o em um arquivo JAR. Há duas formas de compilar e empacotar o código:
   + Use a ferramenta de linha de comando do Maven. Crie seu arquivo JAR executando o seguinte comando no diretório que contém o arquivo `pom.xml`:

     ```
     mvn package
     ```
   + Use o ambiente de desenvolvimento. Consulte a documentação de seu ambiente de desenvolvimento para obter mais detalhes.

   É possível carregar o pacote como um arquivo JAR, ou pode compactar o pacote e carregá-lo como um arquivo ZIP. Se você criar seu aplicativo usando o AWS CLI, especifique o tipo de conteúdo do código (JAR ou ZIP).

1. Se houver erros durante a compilação, verifique se sua variável de ambiente `JAVA_HOME` está definida corretamente.

Se o aplicativo for compilado com êxito, o arquivo a seguir é criado:

`target/java-getting-started-1.0.jar`

## Upload do código Java de fluxo do Apache Flink
<a name="get-started-exercise-6"></a>

Nesta seção, será criado um bucket do Amazon Simple Storage Service (Amazon S3) e realizado o upload do código do aplicativo.

**Para fazer upload do código do aplicativo**

1. Abra o console do Amazon S3 em [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/).

1. Selecione **Criar bucket**.

1. Insira **ka-app-code-*<username>*** no campo **Nome do bucket**. Adicione um sufixo para o nome do bucket, como o nome do usuário, para torná-lo globalmente exclusivo. Selecione **Next** (Próximo).

1. Na etapa **Configurar opções**, mantenha as configurações como estão e selecione **Próximo**.

1. Na etapa **Definir permissões**, mantenha as configurações como estão e selecione **Próximo**.

1. Selecione **Criar bucket**.

1. **No console do Amazon S3, escolha o **ka-app-code- *<username>*** bucket e escolha Upload.**

1. Na etapa **Selecionar arquivos**, selecione **Adicionar arquivos**. Navegue até o arquivo `java-getting-started-1.0.jar`, criado na etapa anterior. Escolha **Próximo**.

1. Na etapa **Definir permissões**, mantenha as configurações como estão. Escolha **Próximo**.

1. Na etapa **Definir propriedades**, mantenha as configurações como estão. Escolha **Carregar**.

O código passa a ser armazenado em um bucket do Amazon S3 que pode ser acessado pelo aplicativo.

## Criar e executar a aplicação do Managed Service for Apache Flink
<a name="get-started-exercise-7"></a>

É possível criar e executar um aplicativo Managed Service for Apache Flink usando o console ou a AWS CLI.

**nota**  
Quando você cria o aplicativo usando o console, seus recursos AWS Identity and Access Management (IAM) e do Amazon CloudWatch Logs são criados para você. Ao criar o aplicativo usando o AWS CLI, você cria esses recursos separadamente.

**Topics**
+ [

### Criar e executar a aplicação (console)
](#get-started-exercise-7-console)
+ [

### Criar e executar a aplicação (AWS CLI)
](#get-started-exercise-7-cli)

### Criar e executar a aplicação (console)
<a name="get-started-exercise-7-console"></a>

Siga estas etapas para criar, configurar, atualizar e executar o aplicativo usando o console.

#### Criar a aplicação
<a name="get-started-exercise-7-console-create"></a>

1. [Abra o console do Kinesis em https://console.aws.amazon.com /kinesis.](https://console.aws.amazon.com/kinesis)

1. No painel do Amazon Kinesis, escolha **Criar aplicativo de analytics**.

1. Na página **Kinesis Analytics – Criar aplicativo**, forneça os detalhes do aplicativo da seguinte forma:
   + Em **Nome do aplicativo**, insira **MyApplication**.
   + Em **Descrição**, insira **My java test app**.
   + Em **Runtime**, escolha **Apache Flink 1.6**.

1. Em **Permissões de acesso**, escolha **Criar/atualizar o perfil do IAM `kinesis-analytics-MyApplication-us-west-2`**.

1. Selecione **Criar aplicativo**.

**nota**  
Ao criar uma aplicação do Amazon Managed Service for Apache Flink usando o console, você tem a opção de ter um perfil do IAM e uma política criados para sua aplicação. O aplicativo usa essa função e política para acessar os recursos dependentes. Esses recursos do IAM são nomeados usando o nome do aplicativo e a região da seguinte forma:  
Política: `kinesis-analytics-service-MyApplication-us-west-2`
Função: `kinesis-analytics-MyApplication-us-west-2`

#### Editar a política do IAM
<a name="get-started-exercise-7-console-iam"></a>

Edite a política do IAM para adicionar permissões de acesso aos fluxos de dados do Kinesis.

1. Abra o console do IAM em [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. Selecione **Políticas**. Selecione a política **`kinesis-analytics-service-MyApplication-us-west-2`** que o console criou na seção anterior. 

1. Na página **Resumo**, selecione **Editar política**. Selecione a guia **JSON**.

1. Adicione a seção destacada do exemplo de política a seguir à política. Substitua a conta de amostra IDs (*012345678901*) pelo ID da sua conta.

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:s3:::ka-app-code-username/java-getting-started-1.0.jar"
               ]
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:*"
               ]
           },
           {
               "Sid": "ListCloudwatchLogStreams",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogStreams"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
               ]
           },
           {
               "Sid": "PutCloudwatchLogs",
               "Effect": "Allow",
               "Action": [
                   "logs:PutLogEvents"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
               ]
           },
           {
               "Sid": "ReadInputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream"
           },
           {
               "Sid": "WriteOutputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream"
           }
       ]
   }
   ```

------

#### Configure o aplicativo
<a name="get-started-exercise-7-console-configure"></a>

1. Na **MyApplication**página, escolha **Configurar**.

1. Na página **Configurar aplicativo**, forneça o **Local do código**:
   + Em **Bucket do Amazon S3**, insira **ka-app-code-*<username>***.
   + Em **Caminho do objeto do Amazon S3**, insira **java-getting-started-1.0.jar**.

1. Na seção **‭Acesso aos recursos do aplicativo**‭‬, em **‭‬Permissões de acesso**‭, selecione ‭**Criar/atualizar o perfil do IAM ‭`kinesis-analytics-MyApplication-us-west-2`**.

1. Em **Propriedades**, **ID do grupo**, insira **ProducerConfigProperties**.

1. Insira as seguintes propriedades e valores de aplicativo:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/pt_br/streams/latest/dev/get-started-exercise.html)

1. Em **Monitoramento**, confirme se **Nível de monitoramento de métricas** está definido como **Aplicativo**.

1. Para **CloudWatch registrar**, marque a caixa de seleção **Ativar**.

1. Selecione **Atualizar**.

**nota**  
Quando você opta por ativar o CloudWatch registro, o Managed Service for Apache Flink cria um grupo de registros e um fluxo de registros para você. Os nomes desses recursos são os seguintes:   
Grupo de logs: `/aws/kinesis-analytics/MyApplication`
Fluxo de logs: `kinesis-analytics-log-stream`

#### Execute o aplicativo
<a name="get-started-exercise-7-console-run"></a>

1. Na **MyApplication**página, escolha **Executar**. Confirme a ação.

1. Quando o aplicativo estiver em execução, atualize a página. O console mostra o **Gráfico do aplicativo**.

#### Interromper a aplicação
<a name="get-started-exercise-7-console-stop"></a>

Na **MyApplication**página, escolha **Parar**. Confirme a ação.

#### Atualizar o aplicativo
<a name="get-started-exercise-7-console-update"></a>

Usando o console, é possível atualizar configurações do aplicativo, como as propriedades do aplicativo, as configurações de monitoramento e a localização ou o nome do arquivo JAR do aplicativo. Também é possível recarregar o JAR do aplicativo do bucket do Amazon S3 se for necessário atualizar o código do aplicativo.

Na **MyApplication**página, escolha **Configurar**. Atualize as configurações do aplicativo e selecione **Atualizar**.

### Criar e executar a aplicação (AWS CLI)
<a name="get-started-exercise-7-cli"></a>

Nesta seção, você usa o AWS CLI para criar e executar o aplicativo Managed Service for Apache Flink. O Managed Service for Apache Flink usa o `kinesisanalyticsv2` AWS CLI comando para criar e interagir com o Managed Service for Apache Flink aplicativos.

#### Criar uma política de permissões
<a name="get-started-exercise-7-cli-policy"></a>

Primeiro, crie uma política de permissões com duas instruções: uma que concede permissões para a ação `read` no fluxo de origem, e outra que concede permissões para ações `write` no fluxo de destino. Em seguida, anexe a política a um perfil do IAM (que será criado na próxima seção). Assim, ao assumir o perfil, o serviço Managed Service for Apache Flink terá as permissões necessárias para ler o fluxo de origem e gravar no fluxo de coleta.

Use o código a seguir para criar a política de permissões `KAReadSourceStreamWriteSinkStream`. Substitua `username` pelo nome de usuário usado para criar o bucket do Amazon S3 e armazenar o código do aplicativo. Substitua o ID da conta nos nomes de recursos da Amazon (ARNs) (`012345678901`) pelo ID da sua conta.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "S3",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:GetObjectVersion"
            ],
            "Resource": ["arn:aws:s3:::ka-app-code-username",
                "arn:aws:s3:::ka-app-code-username/*"
            ]
        },
        {
            "Sid": "ReadInputStream",
            "Effect": "Allow",
            "Action": "kinesis:*",
            "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream"
        },
        {
            "Sid": "WriteOutputStream",
            "Effect": "Allow",
            "Action": "kinesis:*",
            "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream"
        }
    ]
}
```

------

Para step-by-step obter instruções sobre como criar uma política de permissões, consulte [Tutorial: Criar e anexar sua primeira política gerenciada pelo cliente](https://docs.aws.amazon.com/IAM/latest/UserGuide/tutorial_managed-policies.html#part-two-create-policy) no *Guia do usuário do IAM*.

**nota**  
Para acessar outros AWS serviços, você pode usar AWS SDK para Java o. O Managed Service for Apache Flink define automaticamente as credenciais exigidas pelo SDK como as credenciais do perfil do IAM associado a seu aplicativo. Não é necessária nenhuma etapa adicional.

#### Criar um perfil do IAM
<a name="get-started-exercise-7-cli-role"></a>

Nesta seção, você cria um perfil do IAM que o Managed Service for Apache Flink pode assumir para ler um fluxo de origem e gravar no fluxo de coleta.

O Managed Service for Apache Flink não pode acessar seu fluxo sem permissões. Essas permissões são concedidas usando um perfil do IAM. Cada perfil do IAM tem duas políticas anexadas. A política de confiança concede ao Managed Service for Apache Flink permissão para assumir o perfil, e a política de permissões determina o que o serviço pode fazer depois de assumir a função.

Anexe a política de permissões que criou na seção anterior a essa função.

**Para criar uma perfil do IAM**

1. Abra o console do IAM em [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. No painel de navegação, selecione **Funções** e **Criar função**.

1. Em **Selecionar tipo de identidade de confiança**, selecione **Serviço da AWS **. Em **Selecionar o serviço que usará esta função**, selecione **Kinesis**. Em **Selecionar seu caso de uso**, selecione **Kinesis Analytics**.

   Selecione **Next: Permissions** (Próximo: permissões).

1. Na página **Attach permissions policies**, selecione **Next: Review**. É possível anexar políticas de permissões depois de criar a função.

1. Na página **Criar função**, insira **KA-stream-rw-role** para o **Nome da função**. Selecione **Criar função**.

   Foi criado um perfil do IAM chamado `KA-stream-rw-role`. Em seguida, atualize as políticas de confiança e de permissões para a função.

1. Anexe a política de permissões à função.
**nota**  
Para este exercício, o Managed Service for Apache Flink assume esse perfil para ler dados de um fluxo de dados do Kinesis (origem) e gravar a saída em outro fluxo de dados do Kinesis. Depois, anexe a política criada na etapa anterior, [Criar uma política de permissões](#get-started-exercise-7-cli-policy).

   1. Na página **Resumo**, selecione a guia **Permissões**.

   1. Selecione **Attach Policies**.

   1. Na caixa de pesquisa, insira **KAReadSourceStreamWriteSinkStream** (a política criada na seção anterior).

   1. Selecione a política **KAReadInputStreamWriteOutputStream** e selecione **Anexar política**.

Agora você criou a função de execução de serviço que seu aplicativo usa para acessar os recursos. Anote o ARN da nova função.

Para step-by-step obter instruções sobre como criar uma função, consulte [Como criar uma função do IAM (console)](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user.html#roles-creatingrole-user-console) no *Guia do usuário do IAM*.

#### Criar o aplicativo do Managed Service for Apache Flink
<a name="get-started-exercise-7-cli-create"></a>

1. Salve o seguinte código JSON em um arquivo chamado `create_request.json`. Substitua o ARN da função de amostra pelo ARN da função criada anteriormente. Substitua o sufixo do ARN do bucket (`username`) pelo sufixo selecionado na seção anterior. Substitua o ID da conta de exemplo (`012345678901`) na função de execução do serviço pelo ID da conta.

   ```
   {
       "ApplicationName": "test",
       "ApplicationDescription": "my java test app",
       "RuntimeEnvironment": "FLINK-1_6",
       "ServiceExecutionRole": "arn:aws:iam::012345678901:role/KA-stream-rw-role",
       "ApplicationConfiguration": {
           "ApplicationCodeConfiguration": {
               "CodeContent": {
                   "S3ContentLocation": {
                       "BucketARN": "arn:aws:s3:::ka-app-code-username",
                       "FileKey": "java-getting-started-1.0.jar"
                   }
               },
               "CodeContentType": "ZIPFILE"
           },
           "EnvironmentProperties":  { 
            "PropertyGroups": [ 
               { 
                  "PropertyGroupId": "ProducerConfigProperties",
                  "PropertyMap" : {
                       "flink.stream.initpos" : "LATEST",
                       "aws.region" : "us-west-2",
                       "AggregationEnabled" : "false"
                  }
               },
               { 
                  "PropertyGroupId": "ConsumerConfigProperties",
                  "PropertyMap" : {
                       "aws.region" : "us-west-2"
                  }
               }
            ]
         }
       }
   }
   ```

1. Execute a ação [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_CreateApplication.html) com a solicitação anterior para criar o aplicativo: 

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json
   ```

O aplicativo agora é criado. Inicie o aplicativo na próxima etapa.

#### Iniciar o aplicativo
<a name="get-started-exercise-7-cli-start"></a>

Nesta seção, a ação [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html) será usada para iniciar o aplicativo.

**Para iniciar o aplicativo**

1. Salve o seguinte código JSON em um arquivo chamado `start_request.json`.

   ```
   {
       "ApplicationName": "test",
       "RunConfiguration": {
           "ApplicationRestoreConfiguration": { 
            "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT"
            }
       }
   }
   ```

1. Execute a ação [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html) com a solicitação anterior para iniciar o aplicativo:

   ```
   aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json
   ```

O aplicativo agora está em execução. Você pode verificar as métricas do Managed Service for Apache Flink no CloudWatch console da Amazon para verificar se o aplicativo está funcionando.

#### Interromper o aplicativo
<a name="get-started-exercise-7-cli-stop"></a>

Nesta seção, a ação [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html) será usada para interromper o aplicativo.

**Como interromper o aplicativo**

1. Salve o seguinte código JSON em um arquivo chamado `stop_request.json`.

   ```
   {"ApplicationName": "test"
   }
   ```

1. Execute a ação [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html) com a seguinte solicitação para interromper o aplicativo:

   ```
   aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json
   ```

O aplicativo agora está interrompido.

# Tutorial: Use AWS Lambda com o Amazon Kinesis Data Streams
<a name="tutorial-stock-data-lambda"></a>

Neste tutorial, uma função do Lambda será criada para consumir eventos de um fluxo de dados do Kinesis. Neste cenário de exemplo, um aplicativo personalizado grava registros em um stream de dados do Kinesis. AWS Lambda em seguida, pesquisa esse fluxo de dados e, quando detecta novos registros de dados, invoca sua função Lambda. AWS Lambda em seguida, executa a função Lambda assumindo a função de execução que você especificou ao criar a função Lambda.

Para obter instruções detalhadas passo a passo, consulte [Tutorial: Usando o AWS Lambda com o Amazon Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis-example.html). 

**nota**  
Este tutorial presume algum conhecimento de operações básicas do Lambda e do console do AWS Lambda . Se ainda não o fez, siga as instruções em [Introdução ao AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/getting-started.html) para criar sua primeira função do Lambda.

# Use a solução AWS de streaming de dados para o Amazon Kinesis
<a name="examples-streaming-solution"></a>

A solução AWS de dados de streaming para Amazon Kinesis configura automaticamente os AWS serviços necessários para capturar, armazenar, processar e entregar dados de streaming com facilidade. A solução oferece várias opções para resolver casos de uso de dados de streaming que usam vários AWS serviços, incluindo Kinesis Data AWS Lambda Streams, Amazon API Gateway e Amazon Managed Service para Apache Flink. 

Cada solução inclui os seguintes componentes:
+ Um CloudFormation pacote para implantar o exemplo completo.
+ Um CloudWatch painel para exibir as métricas do aplicativo.
+ CloudWatch alarmes sobre as métricas de aplicação mais relevantes.
+ Todos os perfis do IAM e políticas necessários.

A solução pode ser encontrada em [Solução de dados de transmissão para o Amazon Kinesis](https://aws.amazon.com/solutions/implementations/aws-streaming-data-solution-for-amazon-kinesis/)