

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Tutoriales de introducción a Amazon Kinesis Data Streams
<a name="examples"></a>

Amazon Kinesis Data Streams ofrece varias soluciones para la ingesta y el consumo de datos de los flujos de datos de Kinesis. Los tutoriales en esta sección están diseñados para ayudarle a comprender mejor los conceptos y la funcionalidad de Amazon Kinesis Data Streams, e identificar la solución que satisface sus necesidades. 

**Topics**
+ [

# Tutorial: Procesar datos de operaciones bursátiles en tiempo real con KPL y KCL 2.x
](tutorial-stock-data-kplkcl2.md)
+ [

# Tutorial: Procesar operaciones bursátiles en tiempo real con KPL y KCL 1.x
](tutorial-stock-data-kplkcl.md)
+ [

# Tutorial: Analizar datos bursátiles en tiempo real con Amazon Managed Service para Apache Flink
](tutorial-stock-data.md)
+ [

# Tutorial: Uso AWS Lambda con Amazon Kinesis Data Streams
](tutorial-stock-data-lambda.md)
+ [

# Utilice la solución AWS de transmisión de datos para Amazon Kinesis
](examples-streaming-solution.md)

# Tutorial: Procesar datos de operaciones bursátiles en tiempo real con KPL y KCL 2.x
<a name="tutorial-stock-data-kplkcl2"></a>

El escenario que planteamos en este tutorial comprende la adquisición de operaciones bursátiles en un flujo de datos y la escritura de una aplicación de Amazon Kinesis Data Streams básica que realiza cálculos con dicho flujo. Aprenderá a enviar un flujo de registros a Kinesis Data Streams y a implementar una aplicación que consume y procesa dichos registros casi en tiempo real.

**importante**  
Tras crear una transmisión, su cuenta incurre en cargos nominales por el uso de Kinesis Data Streams, ya que Kinesis Data Streams no reúne los requisitos para AWS la capa gratuita. Una vez que se inicia la aplicación consumidora, se aplicarán cargos nominales por el uso de Amazon DynamoDB. La aplicación consumidora utiliza DynamoDB para realizar un seguimiento del estado de procesamiento. Cuando termine con esta aplicación, elimine sus recursos de AWS para dejar de incurrir en gastos. Para obtener más información, consulte [Eliminar recursos](tutorial-stock-data-kplkcl2-finish.md).

El código no obtiene acceso a datos bursátiles reales, sino que, en su lugar, simula la secuencia de operaciones bursátiles. Lo hace a través de un generador de operaciones bursátiles aleatorias que parte de datos reales del mercado para los 25 principales valores por capitalización en febrero de 2015. Si tiene acceso a una secuencia de operaciones bursátiles en tiempo real, puede que le interese derivar estadísticas útiles y puntuales a partir de dicha secuencia. Por ejemplo, es posible que desee realizar un análisis de ventana deslizante en el que se determina el valor más popular adquirido durante los últimos 5 minutos. O también cabe la posibilidad de que quiera recibir una notificación cada vez que haya una orden de venta que sea demasiado grande (es decir, con demasiadas acciones). Puede ampliar el código de esta serie para proporcionar esta funcionalidad.

Puede seguir los pasos de este tutorial en su ordenador de escritorio o portátil y ejecutar tanto el código de productor como el de consumidor en la misma máquina o en cualquier plataforma que sea compatibles con los requisitos definidos.

Los ejemplos mostrados utilizan la región Oeste de EE. UU. (Oregón), pero funcionan en cualquiera de las [regiones de AWS compatibles con Kinesis Data Streams](https://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region).

**Topics**
+ [

# Cómo completar los requisitos previos de
](tutorial-stock-data-kplkcl2-begin.md)
+ [

# Crear un flujo de datos
](tutorial-stock-data-kplkcl2-create-stream.md)
+ [

# Crear un usuario y una política de IAM
](tutorial-stock-data-kplkcl2-iam.md)
+ [

# Descargar y compilar el código
](tutorial-stock-data-kplkcl2-download.md)
+ [

# Implementar el productor
](tutorial-stock-data-kplkcl2-producer.md)
+ [

# Implementar el consumidor
](tutorial-stock-data-kplkcl2-consumer.md)
+ [

# (Opcional) Ampliar el consumidor
](tutorial-stock-data-kplkcl2-consumer-extension.md)
+ [

# Eliminar recursos
](tutorial-stock-data-kplkcl2-finish.md)

# Cómo completar los requisitos previos de
<a name="tutorial-stock-data-kplkcl2-begin"></a>

Debe cumplir los siguientes requisitos para realizar este tutorial:

## Crear y usar una cuenta de Amazon Web Services
<a name="tutorial-stock-data-kplkcl2-begin-aws"></a>

Antes de comenzar, asegúrese de estar familiarizado con los conceptos que se abordan en [Terminología y conceptos de Amazon Kinesis Data Streams](key-concepts.md), especialmente los relacionados con flujos, particiones, productores y consumidores. También es útil haber completado los pasos de la siguiente guía: [Tutorial: Instalación y configuración AWS CLI para Kinesis Data Streams](kinesis-tutorial-cli-installation.md).

Debe tener una AWS cuenta y un navegador web para acceder a. Consola de administración de AWS

Para acceder a la consola, utilice su nombre de usuario y contraseña de IAM para iniciar sesión en la [Consola de administración de AWS](https://console.aws.amazon.com/console/home) desde la página de inicio de sesión de IAM. Para obtener información sobre las credenciales AWS de seguridad, incluido el acceso programático y las alternativas a las credenciales de larga duración, consulte las [credenciales de AWS seguridad](https://docs.aws.amazon.com/IAM/latest/UserGuide/security-creds.html) en la Guía del *usuario de IAM*. Para obtener más información sobre cómo iniciar sesión en su Cuenta de AWS cuenta, consulte [Cómo iniciar sesión en](https://docs.aws.amazon.com/signin/latest/userguide/how-to-sign-in.html) la AWS Guía del *AWS Sign-In usuario*.

Para obtener más información sobre IAM; y las instrucciones de configuración de la clave de seguridad, consulte [Creación de un usuario de IAM](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/get-set-up-for-amazon-ec2.html#create-an-iam-user).

## Cumplir los requisitos de software del sistema
<a name="tutorial-stock-data-kplkcl2-begin-sys"></a>

El sistema que está utilizando para ejecutar la aplicación debe tener instalada la versión de Java 7 o superior. Para descargar e instalar la versión más reciente del kit de desarrollo de Java (JDK), visite el [sitio web de instalación de Java SE de Oracle](http://www.oracle.com/technetwork/java/javase/downloads/index.html).

Necesitará la última versión de [AWS SDK para Java](https://aws.amazon.com/sdk-for-java/). 

[La aplicación para consumidores requiere la versión 2.2.9 o superior de la biblioteca de clientes de Kinesis (KCL), que puede obtener en /tree/master. GitHub https://github.com/awslabs/ amazon-kinesis-client](https://github.com/awslabs/amazon-kinesis-client/tree/master)

## Siguientes pasos
<a name="tutorial-stock-data-kplkcl2-begin-next"></a>

[Crear un flujo de datos](tutorial-stock-data-kplkcl2-create-stream.md)

# Crear un flujo de datos
<a name="tutorial-stock-data-kplkcl2-create-stream"></a>

En primer lugar, debe crear la secuencia de datos que utilizará en los pasos siguientes de este tutorial.

**Para crear un flujo**

1. [Inicie sesión en la consola de Kinesis Consola de administración de AWS y ábrala en https://console.aws.amazon.com /kinesis.](https://console.aws.amazon.com/kinesis)

1. Elija **Flujos de datos** en el panel de navegación.

1. En la barra de navegación, expanda el selector de regiones y seleccione una región.

1. Elija **Create Kinesis stream (Crear secuencia de Kinesis)**.

1. Escriba un nombre para la secuencia (por ejemplo **StockTradeStream**).

1. Introduzca **1** como número de particiones, pero deje **Calcular el número de particiones que va a necesitar** contraído.

1. Elija **Create Kinesis stream (Crear secuencia de Kinesis)**.

En la página de lista **Flujos de Kinesis**, el estado del flujo aparece como `CREATING` mientras se crean los flujos. Cuando la secuencia está lista para usarse, el estado cambia a `ACTIVE`. 

Si elige el nombre del flujo, en la página que aparece, la ficha **Detalles** muestra un resumen de la configuración del flujo de datos. La sección **Monitoreo** muestra información de supervisión del flujo.

## Siguientes pasos
<a name="tutorial-stock-data-kplkcl2-create-stream-next"></a>

[Crear un usuario y una política de IAM](tutorial-stock-data-kplkcl2-iam.md)

# Crear un usuario y una política de IAM
<a name="tutorial-stock-data-kplkcl2-iam"></a>

Las mejores prácticas de seguridad para AWS dictar el uso de permisos detallados para controlar el acceso a los diferentes recursos. AWS Identity and Access Management (IAM) le permite administrar los usuarios y los permisos de los usuarios. AWS Una [política de IAM](https://docs.aws.amazon.com/IAM/latest/UserGuide/PoliciesOverview.html) enumera de forma explícita las acciones que se pueden realizar y los recursos a los que se pueden aplicar dichas acciones.

A continuación se indican los permisos mínimos que suelen requerirse para los productores y consumidores de Kinesis Data Streams.


**Productor**  

| Acciones | Recurso | Finalidad | 
| --- | --- | --- | 
| DescribeStream, DescribeStreamSummary, DescribeStreamConsumer | Flujo de datos de Kinesis | Antes de intentar leer registros, el consumidor comprueba si la secuencia de datos existe y si está activa, y si los fragmentos se encuentran en la secuencia de datos. | 
| SubscribeToShard, RegisterStreamConsumer | Flujo de datos de Kinesis | Suscribe y registra a los consumidores en un fragmento. | 
| PutRecord, PutRecords | Flujo de datos de Kinesis | Escribe registros en Kinesis Data Streams. | 


**Consumidor**  

| **Acciones** | **Resource** | **Finalidad** | 
| --- | --- | --- | 
| DescribeStream | Flujo de datos de Kinesis | Antes de intentar leer registros, el consumidor comprueba si la secuencia de datos existe y si está activa, y si los fragmentos se encuentran en la secuencia de datos. | 
| GetRecords, GetShardIterator  | Flujo de datos de Kinesis | Lee registros de un fragmento. | 
| CreateTable, DescribeTable, GetItem, PutItem, Scan, UpdateItem | Tabla de Amazon DynamoDB | Si el consumidor se desarrolla con Kinesis Client Library (KCL) (versión 1.x o 2.x), necesita permisos en una tabla de DynamoDB para realizar un seguimiento del estado de procesamiento de la aplicación. | 
| DeleteItem | Tabla de Amazon DynamoDB | Para cuando el consumidor realiza split/merge operaciones en fragmentos de Kinesis Data Streams. | 
| PutMetricData |  CloudWatch Registro de Amazon | El KCL también carga métricas CloudWatch, que son útiles para monitorear la aplicación. | 

Para este tutorial, creará una única política de IAM que otorgue todos los permisos precedentes. En producción, es posible que desee crear dos políticas, una para los productores y otra para los consumidores.

**Para crear una política de IAM**

1. Busque el nombre de recurso de Amazon (ARN) del nuevo flujo de datos que creó en el paso anterior. Puede encontrar este ARN como **Stream ARN (ARN de la secuencia)** en la parte superior de la pestaña **Details (Detalles)**. El formato del ARN es el siguiente:

   ```
   arn:aws:kinesis:region:account:stream/name
   ```  
*region*  
El código de AWS región; por ejemplo,. `us-west-2` Para obtener más información, consulte [Conceptos de región y zona de disponibilidad](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-regions-availability-zones).  
*inscrita*  
El ID de la AWS cuenta, tal y como se muestra en la [configuración de la cuenta](https://console.aws.amazon.com/billing/home?#/account).  
*name*  
El nombre del flujo de datos que creó en el paso anterior, que es `StockTradeStream`.

1. Determine el ARN de la tabla de DynamoDB que utilizará el consumidor (y que creará la primera instancia de consumidor). Debe tener el siguiente formato:

   ```
   arn:aws:dynamodb:region:account:table/name
   ```

   La región y el ID de cuenta son idénticos a los valores del ARN del flujo de datos que se utiliza para este tutorial, pero el nombre es el *nombre* de la tabla de DynamoDB creada y utilizada por la aplicación del consumidor. KCL utiliza el nombre de la aplicación como nombre de tabla. En este paso, utilice `StockTradesProcessor` para el nombre de la tabla DynamoDB, ya que ese es el nombre de la aplicación utilizado en los pasos posteriores de este tutorial.

1. En la consola de IAM, en **Políticas** ([https://console.aws.amazon.com/iam/home \$1policies](https://console.aws.amazon.com/iam/home#policies)), selecciona **Crear política**. Si es la primera vez que trabaja con políticas de IAM, elija **Introducción**, **Crear política**.

1. Elija **Seleccionar** junto a **Generador de políticas**.

1. Elija **Amazon Kinesis** como servicio. AWS 

1. Seleccione `DescribeStream`, `GetShardIterator`, `GetRecords`, `PutRecord` y `PutRecords` como acciones permitidas.

1. Introduzca el ARN del flujo de datos que está utilizando en este tutorial.

1. Utilice **Add Statement (Añadir instrucción)** para cada uno de los siguientes:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/es_es/streams/latest/dev/tutorial-stock-data-kplkcl2-iam.html)

   El asterisco (`*`) que se utiliza cuando no es necesario especificar un ARN. En este caso, se debe a que no hay un recurso específico CloudWatch en el que se invoque la `PutMetricData` acción.

1. Elija **Paso siguiente**.

1. Cambie **Policy Name (Nombre de política)** a `StockTradeStreamPolicy`, revise el código y elija **Create Policy (Crear política)**.

El documento de política resultante debería tener el siguiente aspecto:

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "Stmt123",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:PutRecord",
                "kinesis:PutRecords",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:ListShards",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream"
            ]
        },
        {
            "Sid": "Stmt234",
            "Effect": "Allow",
            "Action": [
                "kinesis:SubscribeToShard",
                "kinesis:DescribeStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream/*"
            ]
        },
        {
            "Sid": "Stmt456",
            "Effect": "Allow",
            "Action": [
                "dynamodb:*"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-west-2:111122223333:table/StockTradesProcessor"
            ]
        },
        {
            "Sid": "Stmt789",
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}
```

------

**Para crear un usuario de IAM**

1. Abra la consola de IAM en [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. En la página **Users (Usuarios)**, elija **Add user (Añadir usuario)**.

1. En **Nombre de usuario**, escriba `StockTradeStreamUser`.

1. En **Access type (Tipo de acceso)**, elija **Programmatic access (Acceso por programa)** y, a continuación, elija **Next: Permissions (Siguiente: Permisos)**.

1. Elija **Asociar políticas existentes directamente**.

1. Busque la política que creó en el procedimiento precedente por su nombre (`StockTradeStreamPolicy`. Seleccione la casilla situada a la izquierda del nombre de la política y, a continuación, elija **Next: Review (Siguiente: Revisión)**.

1. Revise los detalles y el resumen y, a continuación, elija **Create user (Crear usuario)**.

1. Copie el valor de **Access key ID (ID de clave de acceso)** y guárdelo de modo confidencial. En **Secret access key (Clave de acceso secreta)**, elija **Show (Mostrar)** y guarde también esa clave de forma confidencial.

1. Pegue las claves de acceso y secreta en un archivo local en una ubicación segura a la que solo pueda obtener acceso usted. Para esta aplicación, cree un archivo denominado ` ~/.aws/credentials` (con permisos estrictos). El archivo debe tener el siguiente formato:

   ```
   [default]
   aws_access_key_id=access key
   aws_secret_access_key=secret access key
   ```

**Asociar una política de IAM a un usuario**

1. En la consola de IAM, abra [Políticas](https://console.aws.amazon.com/iam/home?#policies) y elija **Acciones de política**. 

1. Elija `StockTradeStreamPolicy` y **Asociar**.

1. Elija `StockTradeStreamUser` y **Asociar política**.

## Siguientes pasos
<a name="tutorial-stock-data-kplkcl2-iam-next"></a>

[Descargar y compilar el código](tutorial-stock-data-kplkcl2-download.md)

# Descargar y compilar el código
<a name="tutorial-stock-data-kplkcl2-download"></a>

Este tema proporciona código de implementación de ejemplo para la ingesta de operaciones bursátiles de muestra en la secuencia de datos (*productor*) y el procesamiento de estos datos (*consumidor*).

**Para descargar y compilar el código**

1. Descarga el código fuente del [https://github.com/aws-samples/amazon-kinesis-learning](https://github.com/aws-samples/amazon-kinesis-learning) GitHub repositorio a tu ordenador.

1. Cree un proyecto en su IDE con el código fuente, adhiriéndose a la estructura de directorios proporcionada.

1. Añada las siguientes bibliotecas al proyecto:
   + Amazon Kinesis Client Library (KCL)
   + AWS SDK
   + Apache HttpCore
   + Apache HttpClient
   + Apache Commons Lang
   + Apache Commons Logging
   + Guava (Google Core Libraries For Java)
   + Jackson Annotations
   + Jackson Core
   + Jackson Databind
   + Jackson Dataformat: CBOR
   + Joda Time

1. Según el IDE, el proyecto puede compilarse automáticamente. Si no, compile el proyecto con los pasos apropiados para su IDE.

Si completa estos pasos correctamente, estará preparado para pasar a la siguiente sección, [Implementar el productor](tutorial-stock-data-kplkcl2-producer.md). 

## Siguientes pasos
<a name="tutorial-stock-data-kplkcl2-download-next"></a>

[[Implementar el productor](tutorial-stock-data-kplkcl2-producer.md)Implementar el productor](tutorial-stock-data-kplkcl2-producer.md)

# Implementar el productor
<a name="tutorial-stock-data-kplkcl2-producer"></a>

Este tutorial utiliza el escenario del mundo real de la monitorización del mercado bursátil. Los siguientes principios explican brevemente cómo este escenario se asigna al productor y su estructura de código compatible.

Consulte el [código fuente](https://github.com/aws-samples/amazon-kinesis-learning ) y revise la siguiente información.

**StockTrade clase**  
Una operación bursátil está representada por una instancia de la clase StockTrade. Esta instancia contiene atributos como el símbolo de cotización, el precio, el número de acciones, el tipo de transacción (compra o venta) y un ID que identifica de forma exclusiva la transacción. Esta clase se implementa por usted. 

**Registro de la secuencia**  
Una secuencia es una serie de registros. Un registro es la sucesión en serie de una instancia de `StockTrade` en formato JSON. Por ejemplo:   

```
{
  "tickerSymbol": "AMZN", 
  "tradeType": "BUY", 
  "price": 395.87,
  "quantity": 16, 
  "id": 3567129045
}
```

**StockTradeGenerator clase**  
StockTradeGenerator tiene un método llamado `getRandomTrade()` que devuelve una nueva operación bursátil generada aleatoriamente cada vez que se invoca. Esta clase se implementa por usted.

**StockTradesWriter clase**  
El método `main` del productor, StockTradesWriter recupera continuamente una operación aleatoria y luego la envía a Kinesis Data Streams mediante la ejecución de las siguientes tareas:  

1. Lee el nombre del flujo de datos y el nombre de la región como entrada.

1. Utiliza el `KinesisAsyncClientBuilder` para establecer la región, las credenciales y la configuración de cliente. 

1. Comprueba que la secuencia existe y está activa (si no, sale con un error). 

1. En un bucle continuo, llama al método `StockTradeGenerator.getRandomTrade()` y después al método `sendStockTrade` para enviar la transacción a la secuencia cada 100 milisegundos. 
El método `sendStockTrade` de la clase `StockTradesWriter` tiene el siguiente código:   

```
private static void sendStockTrade(StockTrade trade, KinesisAsyncClient kinesisClient,
            String streamName) {
        byte[] bytes = trade.toJsonAsBytes();
        // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library.
        if (bytes == null) {
            LOG.warn("Could not get JSON bytes for stock trade");
            return;
        }

        LOG.info("Putting trade: " + trade.toString());
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(bytes))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            LOG.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }
```

Consulte el siguiente desglose del código:
+ La API de `PutRecord` espera una matriz de bytes y debe convertir la transacción al formato JSON. Esta única línea de código realiza esa operación: 

  ```
  byte[] bytes = trade.toJsonAsBytes();
  ```
+ Antes de poder enviar la transacción, debe crear una nueva instancia de `PutRecordRequest` (denominada solicitud en este caso). Cada llamada a `request` requiere el nombre de la secuencia, la clave de partición y el blob de datos. 

  ```
  PutPutRecordRequest request = PutRecordRequest.builder()
      .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
      .streamName(streamName)
      .data(SdkBytes.fromByteArray(bytes))
      .build();
  ```

  El ejemplo utiliza una graduación de existencias como clave de partición, que asigna el registro a una partición específica. En la práctica, debería tener cientos o miles de claves de partición por fragmento, de forma que los registros se dispersen de forma uniforme en toda la secuencia. Para obtener más información acerca de cómo agregar datos a una secuencia, consulte [Datos de escritura en Amazon Kinesis Data Streams](building-producers.md).

  Ahora `request` estará listo para el envío al cliente (la operación PUT): 

  ```
     kinesisClient.putRecord(request).get();
  ```
+ Siempre es útil agregar funciones de comprobación y registro de errores. Este código registra las condiciones de error: 

  ```
  if (bytes == null) {
      LOG.warn("Could not get JSON bytes for stock trade");
      return;
  }
  ```

  Añada el try/catch bloque alrededor de la `put` operación: 

  ```
  try {
   	kinesisClient.putRecord(request).get();
  } catch (InterruptedException e) {
              LOG.info("Interrupted, assuming shutdown.");
  } catch (ExecutionException e) {
              LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
  }
  ```

  Esto se debe a que una operación put de Kinesis Data Streams puede fallar debido a un error de red o debido a que la secuencia de datos alcanza sus límites de rendimiento y se estrangula. Se recomienda comprobar detalladamente su política de reintentos para las operaciones de `put` de modo que evite la pérdida de datos, por ejemplo, utilizando un reintento. 
+ El registro de estado resulta útil, pero es opcional:

  ```
  LOG.info("Putting trade: " + trade.toString());
  ```
El productor que se muestra aquí utiliza la funcionalidad de registro único de la API de Kinesis Data Streams, `PutRecord`. En la práctica, si un solo productor genera una gran cantidad de registros, suele ser más eficaz utilizar la funcionalidad de varios registros de `PutRecords` y enviar lotes de registros de una vez. Para obtener más información, consulte [Datos de escritura en Amazon Kinesis Data Streams](building-producers.md).

**Para ejecutar el productor**

1. Compruebe que el par de clave de acceso y el par de clave secreta recuperadas en [Crear un usuario y una política de IAM](tutorial-stock-data-kplkcl2-iam.md) se guardan en el archivo `~/.aws/credentials`. 

1. Ejecute la clase `StockTradeWriter` con los siguientes argumentos:

   ```
   StockTradeStream us-west-2
   ```

   Si ha creado su secuencia en una región diferente a `us-west-2` tendrá que especificar esa región aquí.

Debería ver una salida similar a esta:

```
Feb 16, 2015 3:53:00 PM  
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
```

Ahora, Kinesis Data Streams está ingiriendo sus operaciones bursátiles.

## Siguientes pasos
<a name="tutorial-stock-data-kplkcl2-producer-next"></a>

[Implementar el consumidor](tutorial-stock-data-kplkcl2-consumer.md)

# Implementar el consumidor
<a name="tutorial-stock-data-kplkcl2-consumer"></a>

La aplicación del consumidor en este tutorial procesa continuamente las operaciones bursátiles de su flujo de datos. A continuación, genera como resultado las acciones que más se compran y venden cada minuto. La aplicación se basa en la biblioteca Kinesis Client Library (KCL), que se ocupa de gran parte de las tareas que normalmente deben afrontar las aplicaciones consumidoras. Para obtener más información, consulte [Información sobre KCL 1.x y 2.x](shared-throughput-kcl-consumers.md). 

Consulte el código fuente y revise la siguiente información.

**StockTradesProcessor clase**  
La clase principal del consumidor, que se proporcionas por usted y realiza las siguientes tareas:  
+ Lee los nombres de aplicación, flujo de datos y región que se pasan como argumentos.
+ Crea una instancia de `KinesisAsyncClient` con el nombre de región.
+ Crea una instancia de `StockTradeRecordProcessorFactory` que sirve instancias de `ShardRecordProcessor`, implementadas por una instancia de `StockTradeRecordProcessor`. 
+ Crea una instancia de `ConfigsBuilder` con las instancias `KinesisAsyncClient`, `StreamName`, `ApplicationName` y `StockTradeRecordProcessorFactory`. Esto es útil para crear todas las configuraciones con valores predeterminados.
+ Crea un programador de KCL (anteriormente, en las versiones 1.x de KCL se conocía como trabajador de KCL) con la instancia `ConfigsBuilder`. 
+ El programador crea un nuevo hilo para cada fragmento (asignado a esta instancia de consumidor), que realiza un bucle continuo para leer registros de la secuencia de datos. A continuación, invoca a la instancia de `StockTradeRecordProcessor` para procesar cada lote de registros recibido. 

**StockTradeRecordProcessor clase**  
Implementación de la instancia `StockTradeRecordProcessor`, que a su vez implementa tres métodos necesarios: `initialize`, `processRecords`, `leaseLost`, `shardEnded` y `shutdownRequested`.   
El KCL utiliza los métodos `initialize` y `shutdownRequested` para que el procesador de registros sepa cuándo debe estar listo para comenzar a recibir registros y cuándo debe esperar dejar de recibir registros, respectivamente, de modo que pueda realizar cualquier tarea de configuración y terminación específica de la aplicación. `leaseLost` y `shardEnded` se utilizan para implementar cualquier lógica de qué hacer cuando se pierde un arrendamiento o un procesamiento ha llegado al final de un fragmento. En este ejemplo, simplemente registramos mensajes que indican estos eventos.   
Le proporcionamos el código para estos métodos. El procesamiento principal sucede en el método `processRecords`, que a su vez utiliza `processRecord` para cada registro. Este último método se proporciona como el código esqueleto mayormente vacío para que lo implemente en el siguiente paso, donde se explica con mayor detalle.   
También hay que resaltar la implementación de métodos de compatibilidad para `processRecord`: `reportStats` y `resetStats`, que están vacíos en el código fuente original.   
El método `processRecords` se implementa por usted, y realiza los pasos siguientes:  
+ En cada registro que se pase, llama a su `processRecord`. 
+ Si ha pasado al menos 1 minuto desde el último informe, llama a `reportStats()`, que imprime las últimas estadísticas y, a continuación, a `resetStats()`, que elimina las estadísticas para que el próximo intervalo incluya solo registros nuevos.
+ Establece el momento del siguiente informe.
+ Si ha transcurrido al menos un minuto desde el último punto de comprobación de la base de datos, llama a `checkpoint()`. 
+ Establece el momento del siguiente punto de comprobación.
Este método utiliza intervalos de 60 segundos para la velocidad de elaboración de informes y puntos de comprobación. Para más información sobre el punto de control, consulte [Uso de Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html). 

**StockStats clase**  
Esta clase proporciona retención de datos y seguimiento de estadísticas para las acciones más populares a lo largo del tiempo. Proporcionamos este código por usted y contiene los siguientes métodos:  
+ `addStockTrade(StockTrade)`: inserta el `StockTrade` dado en las estadísticas de ejecución.
+ `toString()`: devuelve las estadísticas en una cadena con formato.
Esta clase hace un seguimiento de los valores más populares, al realizar un recuento continuo del número total de transacciones para cada valor y del recuento máximo. Asimismo, actualiza estos recuentos cada vez que llega una operación nueva.

Agregar código para los métodos de la clase `StockTradeRecordProcessor`, tal y como se muestra en los pasos siguientes. 

**Para implementar el consumidor**

1. Implemente el método `processRecord` creando una instancia de un objeto `StockTrade` con el tamaño correcto y añadiéndole los datos de registro, registrando una advertencia si hay algún problema. 

   ```
   byte[] arr = new byte[record.data().remaining()];
   record.data().get(arr);
   StockTrade trade = StockTrade.fromJsonAsBytes(arr);
       if (trade == null) {
           log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey());
           return;
           }
   stockStats.addStockTrade(trade);
   ```

1. Implemente un método `reportStats`. Modifique el formato de salida según sus preferencias. 

   ```
   System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" +
   stockStats + "\n" +
   "****************************************************************\n");
   ```

1. Por último, implemente el método `resetStats`, lo que creará una nueva instancia de `stockStats`. 

   ```
   stockStats = new StockStats();
   ```

1. Implemente los siguientes métodos requeridos por la interfaz `ShardRecordProcessor`:

   ```
   @Override
   public void leaseLost(LeaseLostInput leaseLostInput) {
       log.info("Lost lease, so terminating.");
   }
   
   @Override
   public void shardEnded(ShardEndedInput shardEndedInput) {
       try {
           log.info("Reached shard end checkpointing.");
           shardEndedInput.checkpointer().checkpoint();
       } catch (ShutdownException | InvalidStateException e) {
           log.error("Exception while checkpointing at shard end. Giving up.", e);
       }
   }
   
   @Override
   public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
       log.info("Scheduler is shutting down, checkpointing.");
       checkpoint(shutdownRequestedInput.checkpointer());
   }
   
   private void checkpoint(RecordProcessorCheckpointer checkpointer) {
       log.info("Checkpointing shard " + kinesisShardId);
       try {
           checkpointer.checkpoint();
       } catch (ShutdownException se) {
           // Ignore checkpoint if the processor instance has been shutdown (fail over).
           log.info("Caught shutdown exception, skipping checkpoint.", se);
       } catch (ThrottlingException e) {
           // Skip checkpoint when throttled. In practice, consider a backoff and retry policy.
           log.error("Caught throttling exception, skipping checkpoint.", e);
       } catch (InvalidStateException e) {
           // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS).
           log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
       }
   }
   ```

**Para ejecutar el consumidor**

1. Ejecute el productor que escribió en [[Implementar el productor](tutorial-stock-data-kplkcl2-producer.md)Implementar el productor](tutorial-stock-data-kplkcl2-producer.md) para insertar registros de operaciones bursátiles simuladas en la secuencia.

1. Compruebe que la clave de acceso y el par de claves secretas recuperadas anteriormente (al crear el usuario de IAM) se guardaron en el archivo `~/.aws/credentials`. 

1. Ejecute la clase `StockTradesProcessor` con los siguientes argumentos:

   ```
   StockTradesProcessor StockTradeStream us-west-2
   ```

   Tenga en cuenta que si ha creado su secuencia en una región diferente a `us-west-2` tiene que especificar esa región aquí.

Después de un minuto, debería ver un resultado similar a este, actualizado cada minuto:

```
  
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  ****************************************************************
```

## Siguientes pasos
<a name="tutorial-stock-data-kplkcl2-consumer-next"></a>

[(Opcional) Ampliar el consumidor](tutorial-stock-data-kplkcl2-consumer-extension.md)

# (Opcional) Ampliar el consumidor
<a name="tutorial-stock-data-kplkcl2-consumer-extension"></a>

En esta sección opcional, mostramos cómo puede ampliar el código del consumidor para un escenario ligeramente más complicado.

Si desea conocer cuáles son las mayores operaciones de venta de cada minuto, puede modificar la clase `StockStats` en tres lugares para dar cabida a esta nueva prioridad.

**Para ampliar el consumidor**

1. Agregue nuevas variables de instancia:

   ```
    // Ticker symbol of the stock that had the largest quantity of shares sold 
    private String largestSellOrderStock;
    // Quantity of shares for the largest sell order trade
    private long largestSellOrderQuantity;
   ```

1. Añada el siguiente código a `addStockTrade`:

   ```
   if (type == TradeType.SELL) {
        if (largestSellOrderStock == null || trade.getQuantity() > largestSellOrderQuantity) {
            largestSellOrderStock = trade.getTickerSymbol();
            largestSellOrderQuantity = trade.getQuantity();
        }
    }
   ```

1. Modifique el método `toString` para imprimir la información adicional:

   ```
    
   public String toString() {
       return String.format(
           "Most popular stock being bought: %s, %d buys.%n" +
           "Most popular stock being sold: %s, %d sells.%n" +
           "Largest sell order: %d shares of %s.",
           getMostPopularStock(TradeType.BUY), getMostPopularStockCount(TradeType.BUY),
           getMostPopularStock(TradeType.SELL), getMostPopularStockCount(TradeType.SELL),
           largestSellOrderQuantity, largestSellOrderStock);
   }
   ```

Si ejecuta el consumidor ahora (recuerde ejecutar también el productor), debería ver un resultado similar a este:

```
 
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  Largest sell order: 996 shares of BUD.
  ****************************************************************
```

## Siguientes pasos
<a name="tutorial-stock-data-kplkcl2-consumer-extension-next"></a>

[Eliminar recursos](tutorial-stock-data-kplkcl2-finish.md)

# Eliminar recursos
<a name="tutorial-stock-data-kplkcl2-finish"></a>

Como paga por utilizar el flujo de datos de Kinesis, asegúrese de eliminarla, así como la tabla de Amazon DynamoDB correspondiente, cuando haya terminado con ella. Se aplican cargos nominales sobre una secuencia activa incluso aunque no esté enviando ni recibiendo registros. Esto se debe a que una secuencia activa está utilizando los recursos "escuchando" de forma continua los registros entrantes y las solicitudes para obtener registros.

**Para eliminar la secuencia y la tabla**

1. Cierre los productores y los consumidores que podrían estar ejecutándose aún.

1. [Abra la consola de Kinesis en https://console.aws.amazon.com /kinesis.](https://console.aws.amazon.com/kinesis)

1. Seleccione la secuencia que haya creado para esta aplicación (`StockTradeStream`).

1. Elija **Delete Stream (Eliminar secuencia)**.

1. Abra la consola de DynamoDB en. [https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/)

1. Elimine la tabla `StockTradesProcessor`.

## Resumen
<a name="tutorial-stock-data-kplkcl2-summary"></a>

El procesamiento de una gran cantidad de datos casi en tiempo real no requiere escribir ningún código complicado ni desarrollar una infraestructura enorme. Es tan básico como escribir lógica para procesar una pequeña cantidad de datos (como escribir `processRecord(Record)`), pero con Kinesis Data Streams para escalarla de forma que funcione con una gran cantidad de datos transmitidos. No tiene que preocuparse de cómo escalar su procesamiento, ya que Kinesis Data Streams lo administra por usted. Lo único que tiene que hacer es enviar sus registros de streaming a Kinesis Data Streams y escribir la lógica para procesar cada nuevo registro recibido. 

A continuación se muestran algunas posibles mejoras para esta aplicación.

**Agregación en todos los fragmentos**  
En la actualidad, obtiene estadísticas derivadas de agrupar los registros de datos que se reciben de un único proceso de trabajo desde un único fragmento. (Un fragmento no puede ser procesado por más de un proceso de trabajo en una sola aplicación al mismo tiempo). Lógicamente, si escala y tiene más de un fragmento, es posible que quiera realizar la agregación en todos los fragmentos. Podrá hacerlo con una arquitectura de canalización en la que el resultado de cada proceso de trabajo se envíe a otra secuencia con un único fragmento, que se procesará por parte de un proceso de trabajo que agregue los resultados de la primera etapa. Dado que los datos de la primera etapa están limitados (a una muestra por minuto y fragmento), pueden ser administrados fácilmente por un fragmento.

**Procesamiento de la escala**  
Cuando la secuencia se escala para tener muchos fragmentos (porque hay muchos productores enviando datos), el método para aumentar el procesamiento es agregar más procesos de trabajo. Puede ejecutar los procesos de trabajo en instancias de Amazon EC2 y utilizar grupos de escalado automático.

**Usa conectores para Amazon S3/ DynamoDB/Amazon Redshift/Storm**  
Como una transmisión se procesa de forma continua, su salida se puede enviar a otros destinos. AWS proporciona [conectores](https://github.com/awslabs/amazon-kinesis-connectors) para integrar Kinesis Data Streams con AWS otros servicios y herramientas de terceros.

# Tutorial: Procesar operaciones bursátiles en tiempo real con KPL y KCL 1.x
<a name="tutorial-stock-data-kplkcl"></a>

El escenario que planteamos en este tutorial comprende la adquisición de operaciones bursátiles en un flujo de datos y la escritura de una aplicación de Amazon Kinesis Data Streams sencilla que realiza cálculos con dicho flujo. Aprenderá a enviar un flujo de registros a Kinesis Data Streams y a implementar una aplicación que consume y procesa dichos registros casi en tiempo real.

**importante**  
Tras crear una transmisión, su cuenta incurre en cargos nominales por el uso de Kinesis Data Streams, ya que Kinesis Data Streams no reúne los requisitos para AWS la capa gratuita. Una vez que se inicia la aplicación consumidora, se aplicarán cargos nominales por el uso de Amazon DynamoDB. La aplicación consumidora utiliza DynamoDB para realizar un seguimiento del estado de procesamiento. Cuando termine con esta aplicación, elimine sus recursos de AWS para dejar de incurrir en gastos. Para obtener más información, consulte [Eliminar recursos](tutorial-stock-data-kplkcl-finish.md).

El código no obtiene acceso a datos bursátiles reales, sino que, en su lugar, simula la secuencia de operaciones bursátiles. Lo hace a través de un generador de operaciones bursátiles aleatorias que parte de datos reales del mercado para los 25 principales valores por capitalización en febrero de 2015. Si tiene acceso a una secuencia de operaciones bursátiles en tiempo real, puede que le interese derivar estadísticas útiles y puntuales a partir de dicha secuencia. Por ejemplo, es posible que desee realizar un análisis de ventana deslizante en el que se determina el valor más popular adquirido durante los últimos 5 minutos. O también cabe la posibilidad de que quiera recibir una notificación cada vez que haya una orden de venta que sea demasiado grande (es decir, con demasiadas acciones). Puede ampliar el código de esta serie para proporcionar esta funcionalidad.

Puede seguir los pasos de este tutorial en su equipo de escritorio o portátil y ejecutar el código tanto del consumidor como del productor en la misma máquina o en cualquier plataforma que cumpla los requisitos definidos, como Amazon Elastic Compute Cloud (Amazon EC2).

Los ejemplos mostrados utilizan la región Oeste de EE. UU. (Oregón), pero funcionan en cualquiera de las [regiones de AWS compatibles con Kinesis Data Streams](https://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region).

**Topics**
+ [

# Cómo completar los requisitos previos de
](tutorial-stock-data-kplkcl-begin.md)
+ [

# Crear un flujo de datos
](tutorial-stock-data-kplkcl-create-stream.md)
+ [

# Crear un usuario y una política de IAM
](tutorial-stock-data-kplkcl-iam.md)
+ [

# Descargar y compilar el código de implementación
](tutorial-stock-data-kplkcl-download.md)
+ [

# Implementar el productor
](tutorial-stock-data-kplkcl-producer.md)
+ [

# Implementar el consumidor
](tutorial-stock-data-kplkcl-consumer.md)
+ [

# (Opcional) Ampliar el consumidor
](tutorial-stock-data-kplkcl-consumer-extension.md)
+ [

# Eliminar recursos
](tutorial-stock-data-kplkcl-finish.md)

# Cómo completar los requisitos previos de
<a name="tutorial-stock-data-kplkcl-begin"></a>

A continuación, se indican los requisitos para completar el [Tutorial: Procesar operaciones bursátiles en tiempo real con KPL y KCL 1.x[Tutorial: Procesar operaciones bursátiles en tiempo real con KPL y KCL 1.x](tutorial-stock-data-kplkcl.md)](tutorial-stock-data-kplkcl.md).

## Crear y usar una cuenta de Amazon Web Services
<a name="tutorial-stock-data-kplkcl-begin-aws"></a>

Antes de comenzar, asegúrese de estar familiarizado con los conceptos que se abordan en [Terminología y conceptos de Amazon Kinesis Data Streams](key-concepts.md), especialmente los relacionados con secuencias, fragmentos, productores y consumidores. También le será de ayuda haber completado [Tutorial: Instalación y configuración AWS CLI para Kinesis Data Streams](kinesis-tutorial-cli-installation.md).

Necesita una AWS cuenta y un navegador web para acceder a. Consola de administración de AWS

Para acceder a la consola, utilice su nombre de usuario y contraseña de IAM para iniciar sesión en la [Consola de administración de AWS](https://console.aws.amazon.com/console/home) desde la página de inicio de sesión de IAM. Para obtener información sobre las credenciales AWS de seguridad, incluido el acceso programático y las alternativas a las credenciales de larga duración, consulte las [credenciales de AWS seguridad](https://docs.aws.amazon.com/IAM/latest/UserGuide/security-creds.html) en la Guía del *usuario de IAM*. Para obtener más información sobre cómo iniciar sesión en su Cuenta de AWS cuenta, consulte [Cómo iniciar sesión en](https://docs.aws.amazon.com/signin/latest/userguide/how-to-sign-in.html) la AWS Guía del *AWS Sign-In usuario*.

Para obtener más información sobre IAM; y las instrucciones de configuración de la clave de seguridad, consulte [Creación de un usuario de IAM](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/get-set-up-for-amazon-ec2.html#create-an-iam-user).

## Cumplir los requisitos de software del sistema
<a name="tutorial-stock-data-kplkcl-begin-sys"></a>

El sistema utilizado para ejecutar la aplicación debe tener instalado Java 7 o superior. Para descargar e instalar la versión más reciente del kit de desarrollo de Java (JDK), visite el [sitio web de instalación de Java SE de Oracle](http://www.oracle.com/technetwork/java/javase/downloads/index.html).

Si tiene una aplicación IDE de Java, como [Eclipse](https://www.eclipse.org/downloads/), puede abrir el código fuente, editarlo, compilarlo y ejecutarlo.

Necesitará la última versión de [AWS SDK para Java](https://aws.amazon.com/sdk-for-java/). Si utiliza Eclipse como IDE, puede instalar el [Kit de herramientas de AWS para Eclipse](https://aws.amazon.com/eclipse/) en su lugar. 

La aplicación para consumidores requiere la versión 1.2.1 o superior de la biblioteca de clientes de Kinesis (KCL), que puede obtener en la biblioteca de clientes de GitHub [Kinesis](https://github.com/awslabs/amazon-kinesis-client) (Java).

## Siguientes pasos
<a name="tutorial-stock-data-kplkcl-begin-next"></a>

[Crear un flujo de datos](tutorial-stock-data-kplkcl-create-stream.md)

# Crear un flujo de datos
<a name="tutorial-stock-data-kplkcl-create-stream"></a>

En el primer paso del [Tutorial: Procesar operaciones bursátiles en tiempo real con KPL y KCL 1.x[Tutorial: Procesar operaciones bursátiles en tiempo real con KPL y KCL 1.x](tutorial-stock-data-kplkcl.md)](tutorial-stock-data-kplkcl.md), creará la secuencia que utilizará en los pasos siguientes.

**Para crear un flujo**

1. [Inicie sesión en la consola de Kinesis Consola de administración de AWS y ábrala en https://console.aws.amazon.com /kinesis.](https://console.aws.amazon.com/kinesis)

1. Elija **Flujos de datos** en el panel de navegación.

1. En la barra de navegación, expanda el selector de regiones y seleccione una región.

1. Elija **Create Kinesis stream (Crear secuencia de Kinesis)**.

1. Escriba un nombre para la secuencia (por ejemplo **StockTradeStream**).

1. Introduzca **1** como número de particiones, pero deje **Calcular el número de particiones que va a necesitar** contraído.

1. Elija **Create Kinesis stream (Crear secuencia de Kinesis)**.

En la página de lista **Flujos de Kinesis**, el estado del flujo es `CREATING` mientras se está creando. Cuando la secuencia está lista para usarse, el estado cambia a `ACTIVE`. Elija el nombre del flujo. En la página que aparece, la pestaña **Detalles** muestra un resumen de la configuración del flujo. La sección **Monitoreo** muestra información de supervisión del flujo.

## Información adicional sobre particiones
<a name="tutorial-stock-data-kplkcl-create-stream-info"></a>

Cuando comience a utilizar Kinesis Data Streams fuera de este tutorial, es posible que tenga que planificar el proceso de creación de flujos más detenidamente. Debe planificarlo en función de la demanda máxima esperada cuando aprovisione los fragmentos. Con este escenario como ejemplo, el tráfico del mercado bursátil de los Estados Unidos alcanza su máximo durante el día (hora del este), por lo que las estimaciones de la demanda deben realizarse a para ese momento del día. A continuación, podrá elegir el aprovisionamiento adecuado para la demanda máxima prevista o escalar su secuencia en respuesta a las fluctuaciones de la demanda. 

Un *fragmento* es una unidad de capacidad de rendimiento. En la página **Crear flujo de Kinesis**, expanda **Estimar el número de particiones que necesitará**. Introduzca el tamaño medio de los registros, el número máximo de registros escritos por segundo, y el número de aplicaciones consumidoras utilizando las directrices siguientes:

**Tamaño medio de registro**  
Estimación del tamaño medio calculado de los registros. Si no conoce este valor, utilice la estimación del tamaño máximo del registro para este valor.

**Número máximo de registros escritos**  
Tenga en cuenta el número de entidades que proporcionan los datos y el número aproximado de registros por segundo producidos por cada una de ellas. Por ejemplo, si está obteniendo operaciones bursátiles de 20 servidores y cada uno genera 250 operaciones por segundo, el número total de operaciones (registros) por segundo es de 5000. 

**Número de aplicaciones consumidoras**  
Número de aplicaciones que leen independientemente desde la secuencia para procesarla de una forma diferente y producir resultados distintos. Cada aplicación puede tener varias instancias en ejecución en diferentes máquinas (es decir, que se ejecutan en un clúster) para poder procesar una secuencia de gran volumen.

Si el número estimado de fragmentos que se muestra supera su límite de fragmentos actual, puede que tenga que enviar una solicitud para aumentar ese límite antes de crear una secuencia con ese número de fragmentos. Para solicitar un aumento del límite de particiones, utilice el [formulario Límites de Kinesis Data Streams](https://console.aws.amazon.com/support/home#/case/create?issueType=service-limit-increase&limitType=service-code-kinesis). Para más información sobre los flujos y particiones, consulte [Crear y administrar Kinesis Data Streams](working-with-streams.md).

## Siguientes pasos
<a name="tutorial-stock-data-kplkcl-create-stream-next"></a>

[Crear un usuario y una política de IAM](tutorial-stock-data-kplkcl-iam.md)

# Crear un usuario y una política de IAM
<a name="tutorial-stock-data-kplkcl-iam"></a>

Las mejores prácticas de seguridad para AWS dictar el uso de permisos detallados para controlar el acceso a los diferentes recursos. AWS Identity and Access Management (IAM) le permite administrar los usuarios y los permisos de los usuarios en ellos. AWS Una [política de IAM](https://docs.aws.amazon.com/IAM/latest/UserGuide/PoliciesOverview.html) enumera de forma explícita las acciones que se pueden realizar y los recursos a los que se pueden aplicar dichas acciones.

Los siguientes permisos son los permisos mínimos normalmente necesarios para un productor y un consumidor de Kinesis Data Streams.


**Productor**  

| Acciones | Recurso | Finalidad | 
| --- | --- | --- | 
| DescribeStream, DescribeStreamSummary, DescribeStreamConsumer | Flujo de datos de Kinesis | Antes de intentar escribir registros, el productor comprueba si la secuencia existe y está activa, si los fragmentos se encuentran en la secuencia y si la secuencia tiene un consumidor. | 
| SubscribeToShard, RegisterStreamConsumer | Flujo de datos de Kinesis | Suscribe y registra los consumidores en un fragmento de Kinesis Data Stream. | 
| PutRecord, PutRecords | Flujo de datos de Kinesis | Escriba registros en Kinesis Data Streams. | 


**Consumidor**  

| **Acciones** | **Resource** | **Finalidad** | 
| --- | --- | --- | 
| DescribeStream | Flujo de datos de Kinesis | Antes de intentar leer registros, el consumidor comprueba si existe la secuencia y si está activa, y si los fragmentos se encuentran en la secuencia. | 
| GetRecords, GetShardIterator  | Flujo de datos de Kinesis | Lee registros de una partición de Kinesis Data Streams. | 
| CreateTable, DescribeTable, GetItem, PutItem, Scan, UpdateItem | Tabla de Amazon DynamoDB | Si el consumidor se desarrolla utilizando Kinesis Client Library (KCL), necesita permisos para que una tabla de DynamoDB realice un seguimiento del estado de procesamiento de la aplicación. El primer consumidor que se inicia crea la tabla.  | 
| DeleteItem | Tabla de Amazon DynamoDB | Para cuando el consumidor realiza split/merge operaciones en fragmentos de Kinesis Data Streams. | 
| PutMetricData |  CloudWatch Registro de Amazon | El KCL también carga métricas CloudWatch, que son útiles para monitorear la aplicación. | 

Para esta aplicación, debe crear una única política de IAM; que conceda todos los permisos anteriores. En la práctica, es posible que quiera crear dos políticas, uno para los productores y otra para los consumidores.

**Para crear una política de IAM**

1. Localice el nombre de recurso de Amazon (ARN) de la nueva secuencia. Puede encontrar este ARN como **Stream ARN (ARN de la secuencia)** en la parte superior de la pestaña **Details (Detalles)**. El formato del ARN es el siguiente:

   ```
   arn:aws:kinesis:region:account:stream/name
   ```  
*region*  
El código de la región; por ejemplo, `us-west-2`. Para obtener más información, consulte [Conceptos de región y zona de disponibilidad](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-regions-availability-zones).  
*inscrita*  
El ID de la AWS cuenta, tal y como se muestra en la configuración de la [cuenta](https://console.aws.amazon.com/billing/home?#/account).  
*name*  
El nombre de la secuencia de [Crear un flujo de datos](tutorial-stock-data-kplkcl-create-stream.md), que es `StockTradeStream`.

1. Determinar el ARN de la tabla de DynamoDB para que lo utilice el consumidor (creado por la primera instancia del consumidor). Debe tener el siguiente formato:

   ```
   arn:aws:dynamodb:region:account:table/name
   ```

   La región y la cuenta y provienen del mismo lugar que en el paso anterior, pero esta vez *name* es el nombre de la tabla creada y utilizada por la aplicación consumidora. La KCL utilizada por el consumidor utiliza el nombre de la aplicación como nombre de la tabla. Utilice `StockTradesProcessor`, que será el nombre de aplicación que se utilizará más tarde.

1. En la consola de IAM, en **Políticas** ([https://console.aws.amazon.com/iam/home \$1policies](https://console.aws.amazon.com/iam/home#policies)), selecciona **Crear política**. Si es la primera vez que trabaja con políticas de IAM, elija **Introducción**, **Crear política**.

1. Elija **Seleccionar** junto a **Generador de políticas**.

1. Elija **Amazon Kinesis** como servicio. AWS 

1. Seleccione `DescribeStream`, `GetShardIterator`, `GetRecords`, `PutRecord` y `PutRecords` como acciones permitidas.

1. Introduzca el ARN que ha creado en el paso 1.

1. Utilice **Add Statement (Añadir instrucción)** para cada uno de los siguientes:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/es_es/streams/latest/dev/tutorial-stock-data-kplkcl-iam.html)

   El asterisco (`*`) que se utiliza cuando no es necesario especificar un ARN. En este caso, se debe a que no hay un recurso específico CloudWatch en el que se invoque la `PutMetricData` acción.

1. Elija **Paso siguiente**.

1. Cambie **Policy Name (Nombre de política)** a `StockTradeStreamPolicy`, revise el código y elija **Create Policy (Crear política)**.

El documento de política resultante debería tener un aspecto similar a este:

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "Stmt123",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:PutRecord",
                "kinesis:PutRecords",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:ListShards",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream"
            ]
        },
        {
            "Sid": "Stmt234",
            "Effect": "Allow",
            "Action": [
                "kinesis:SubscribeToShard",
                "kinesis:DescribeStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream/*"
            ]
        },
        {
            "Sid": "Stmt456",
            "Effect": "Allow",
            "Action": [
                "dynamodb:*"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-west-2:111122223333:table/StockTradesProcessor"
            ]
        },
        {
            "Sid": "Stmt789",
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}
```

------

**Para crear un usuario de IAM**

1. Abra la consola de IAM en [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. En la página **Users (Usuarios)**, elija **Add user (Añadir usuario)**.

1. En **Nombre de usuario**, escriba `StockTradeStreamUser`.

1. En **Access type (Tipo de acceso)**, elija **Programmatic access (Acceso por programa)** y, a continuación, elija **Next: Permissions (Siguiente: Permisos)**.

1. Elija **Asociar políticas existentes directamente**.

1. Busque por nombre la política que ha creado. Seleccione la casilla situada a la izquierda del nombre de la política y, a continuación, elija **Next: Review (Siguiente: Revisión)**.

1. Revise los detalles y el resumen y, a continuación, elija **Create user (Crear usuario)**.

1. Copie el valor de **Access key ID (ID de clave de acceso)** y guárdelo de modo confidencial. En **Secret access key (Clave de acceso secreta)**, elija **Show (Mostrar)** y guarde también esa clave de forma confidencial.

1. Pegue las claves de acceso y secreta en un archivo local en una ubicación segura a la que solo pueda obtener acceso usted. Para esta aplicación, cree un archivo denominado ` ~/.aws/credentials` (con permisos estrictos). El archivo debe tener el siguiente formato:

   ```
   [default]
   aws_access_key_id=access key
   aws_secret_access_key=secret access key
   ```

**Asociar una política de IAM a un usuario**

1. En la consola de IAM, abra [Políticas](https://console.aws.amazon.com/iam/home?#policies) y elija **Acciones de política**. 

1. Elija `StockTradeStreamPolicy` y **Asociar**.

1. Elija `StockTradeStreamUser` y **Asociar política**.

## Siguientes pasos
<a name="tutorial-stock-data-kplkcl-iam-next"></a>

[Descargar y compilar el código de implementación](tutorial-stock-data-kplkcl-download.md)

# Descargar y compilar el código de implementación
<a name="tutorial-stock-data-kplkcl-download"></a>

Se facilita la estructura del código para el [Tutorial: Procesar operaciones bursátiles en tiempo real con KPL y KCL 1.x](tutorial-stock-data-kplkcl.md). Contiene una implementación de stub tanto para la adquisición de secuencias de operaciones bursátiles (*productor*) como para el procesamiento de los datos (*consumidor*). El siguiente procedimiento muestra cómo completar la implementación. 

**Para descargar y compilar el código de implementación**

1. Descargue el [código fuente](https://github.com/awslabs/amazon-kinesis-learning/tree/learning-module-1) en el equipo.

1. Cree un proyecto en su IDE favorito con el código fuente, utilizando la estructura de directorios proporcionada.

1. Añada las siguientes bibliotecas al proyecto:
   + Amazon Kinesis Client Library (KCL)
   + AWS SDK
   + Apache HttpCore
   + Apache HttpClient
   + Apache Commons Lang
   + Apache Commons Logging
   + Guava (Google Core Libraries For Java)
   + Jackson Annotations
   + Jackson Core
   + Jackson Databind
   + Jackson Dataformat: CBOR
   + Joda Time

1. Según el IDE, el proyecto puede compilarse automáticamente. Si no, compile el proyecto con los pasos apropiados para su IDE.

Si completa estos pasos correctamente, estará preparado para pasar a la siguiente sección, [Implementar el productor](tutorial-stock-data-kplkcl-producer.md). Si la compilación genera errores en cualquier etapa, deberá investigarlos y corregirlos antes de continuar.

## Siguientes pasos
<a name="tutorial-stock-data-kplkcl-download-next"></a>

[[Implementar el productor](tutorial-stock-data-kplkcl-producer.md)Implementar el productor](tutorial-stock-data-kplkcl-producer.md)

# Implementar el productor
<a name="tutorial-stock-data-kplkcl-producer"></a>



La aplicación del [Tutorial: Procesar operaciones bursátiles en tiempo real con KPL y KCL 1.x[Tutorial: Procesar operaciones bursátiles en tiempo real con KPL y KCL 1.x](tutorial-stock-data-kplkcl.md)](tutorial-stock-data-kplkcl.md) utiliza un escenario real de monitorización del mercado bursátil. Los siguientes principios explicar brevemente la forma en que este escenario se asocia con la estructura del productor y el código de apoyo.

Consulte el código fuente y revise la siguiente información.

**StockTrade clase**  
Una operación bursátil está representada por una instancia de la clase `StockTrade`. Esta instancia contiene atributos como el símbolo de cotización, el precio, el número de acciones, el tipo de transacción (compra o venta) y un ID que identifica de forma exclusiva la transacción. Esta clase se implementa por usted.

**Registro de la secuencia**  
Una secuencia es una serie de registros. Un registro es la sucesión en serie de una instancia de `StockTrade` en formato JSON. Por ejemplo:   

```
{
  "tickerSymbol": "AMZN", 
  "tradeType": "BUY", 
  "price": 395.87,
  "quantity": 16, 
  "id": 3567129045
}
```

**StockTradeGenerator clase**  
`StockTradeGenerator` tiene un método denominado `getRandomTrade()` que proporciona una nueva operación bursátil generada aleatoriamente cada vez que se invoca. Esta clase se implementa por usted.

**StockTradesWriter clase**  
El método `main` del productor, `StockTradesWriter` recupera continuamente una operación aleatoria y luego la envía a Kinesis Data Streams mediante la ejecución de las siguientes tareas:  

1. Lee los nombres de la secuencia y la región como entrada.

1. Crea un `AmazonKinesisClientBuilder`.

1. Utiliza el compilador de clientes para establecer la región, las credenciales y la configuración de cliente.

1. Crea un cliente de `AmazonKinesis` mediante el compilador de clientes.

1. Comprueba que la secuencia existe y está activa (si no, sale con un error).

1. En un bucle continuo, llama al método `StockTradeGenerator.getRandomTrade()` y después al método `sendStockTrade` para enviar la transacción a la secuencia cada 100 milisegundos.
El método `sendStockTrade` de la clase `StockTradesWriter` tiene el siguiente código:  

```
private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient, String streamName) {
    byte[] bytes = trade.toJsonAsBytes();
    // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library.
    if (bytes == null) {
        LOG.warn("Could not get JSON bytes for stock trade");
        return;
    }
    
    LOG.info("Putting trade: " + trade.toString());
    PutRecordRequest putRecord = new PutRecordRequest();
    putRecord.setStreamName(streamName);
    // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
    putRecord.setPartitionKey(trade.getTickerSymbol());
    putRecord.setData(ByteBuffer.wrap(bytes));

    try {
        kinesisClient.putRecord(putRecord);
    } catch (AmazonClientException ex) {
        LOG.warn("Error sending record to Amazon Kinesis.", ex);
    }
}
```

Consulte el siguiente desglose del código:
+ La API de `PutRecord` espera una matriz de bytes, y debe convertir el `trade` a formato JSON. Esta única línea de código realiza esa operación:

  ```
  byte[] bytes = trade.toJsonAsBytes();
  ```
+ Antes de poder enviar la transacción, debe crear una nueva instancia de `PutRecordRequest` (denominada `putRecord` en este caso):

  ```
  PutRecordRequest putRecord = new PutRecordRequest();
  ```

  Cada llamada a `PutRecord` requiere el nombre de la secuencia, la clave de partición y el blob de datos. El siguiente código rellenará estos campos en el objeto `putRecord` utilizando sus métodos `setXxxx()`:

  ```
  putRecord.setStreamName(streamName);
  putRecord.setPartitionKey(trade.getTickerSymbol());
  putRecord.setData(ByteBuffer.wrap(bytes));
  ```

  El ejemplo utiliza un ticker como clave de partición, que asigna el registro a una partición específica. En la práctica, debería tener cientos o miles de claves de partición por fragmento, de forma que los registros se dispersen de forma uniforme en toda la secuencia. Para obtener más información acerca de cómo agregar datos a una secuencia, consulte [Agregar datos a un flujo](developing-producers-with-sdk.md#kinesis-using-sdk-java-add-data-to-stream).

  Ahora `putRecord` estará listo para el envío al cliente (la operación `put`):

  ```
  kinesisClient.putRecord(putRecord);
  ```
+ Siempre es útil agregar funciones de comprobación y registro de errores. Este código registra las condiciones de error:

  ```
  if (bytes == null) {
      LOG.warn("Could not get JSON bytes for stock trade");
      return;
  }
  ```

  Añada el try/catch bloque alrededor de la `put` operación:

  ```
  try {
         kinesisClient.putRecord(putRecord);
  } catch (AmazonClientException ex) {
         LOG.warn("Error sending record to Amazon Kinesis.", ex);
  }
  ```

  Esto se debe a que una operación `put` de Kinesis Data Streams puede fallar debido a un error de red o debido a que el flujo alcanza sus límites de rendimiento y se ve limitado. Le recomendamos comprobar detalladamente su política de reintentos para las operaciones de `put` de modo que evite pérdida de datos, por ejemplo, al utilizar un reintento. 
+ El registro de estado resulta útil, pero es opcional:

  ```
  LOG.info("Putting trade: " + trade.toString());
  ```
El productor que se muestra aquí utiliza la funcionalidad de registro único de la API de Kinesis Data Streams, `PutRecord`. En la práctica, si un solo productor genera una gran cantidad de registros, suele ser más eficaz utilizar la funcionalidad de varios registros de `PutRecords` y enviar lotes de registros de una vez. Para obtener más información, consulte [Agregar datos a un flujo](developing-producers-with-sdk.md#kinesis-using-sdk-java-add-data-to-stream).

**Para ejecutar el productor**

1. Compruebe que la clave de acceso y el par de claves secretas recuperadas anteriormente (al crear el usuario de IAM) se guardaron en el archivo `~/.aws/credentials`. 

1. Ejecute la clase `StockTradeWriter` con los siguientes argumentos:

   ```
   StockTradeStream us-west-2
   ```

   Si ha creado su secuencia en una región diferente a `us-west-2` tendrá que especificar esa región aquí.

Debería ver una salida similar a esta:

```
Feb 16, 2015 3:53:00 PM  
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
```

El flujo de operaciones bursátiles está siendo adquirido por Kinesis Data Streams.

## Siguientes pasos
<a name="tutorial-stock-data-kplkcl-producer-next"></a>

[Implementar el consumidor](tutorial-stock-data-kplkcl-consumer.md)

# Implementar el consumidor
<a name="tutorial-stock-data-kplkcl-consumer"></a>

La aplicación consumidora del [Tutorial: Procesar operaciones bursátiles en tiempo real con KPL y KCL 1.x[Tutorial: Procesar operaciones bursátiles en tiempo real con KPL y KCL 1.x](tutorial-stock-data-kplkcl.md)](tutorial-stock-data-kplkcl.md) procesa de forma continua la secuencia de operaciones bursátiles que se ha creado en [[Implementar el productor](tutorial-stock-data-kplkcl-producer.md)Implementar el productor](tutorial-stock-data-kplkcl-producer.md). A continuación, genera como resultado las acciones que más se compran y venden cada minuto. La aplicación se basa en la biblioteca Kinesis Client Library (KCL), que se ocupa de gran parte de las tareas que normalmente deben afrontar las aplicaciones consumidoras. Para obtener más información, consulte [Desarrollar consumidores de KCL 1.x](developing-consumers-with-kcl.md). 

Consulte el código fuente y revise la siguiente información.

**StockTradesProcessor clase**  
Clase principal del consumidor, que proporcionamos por usted y realiza las siguientes tareas:  
+ Lee los nombres de aplicación, secuencia y región que se pasan como argumentos.
+ Lee las credenciales de `~/.aws/credentials`.
+ Crea una instancia de `RecordProcessorFactory` que sirve instancias de `RecordProcessor`, implementadas por una instancia de `StockTradeRecordProcessor`.
+ Crea un proceso de trabajo de KCL con la instancia `RecordProcessorFactory` y una configuración estándar que incluye el nombre del flujo, las credenciales y el nombre de la aplicación. 
+ El proceso de trabajo crea un subproceso nuevo para cada partición (asignado a esta instancia del consumidor), que se ejecuta en bucle continuamente para leer registros de Kinesis Data Streams. A continuación, invoca a la instancia de `RecordProcessor` para procesar cada lote de registros recibido.

**StockTradeRecordProcessor clase**  
Implementación de la instancia `RecordProcessor`, que a su vez implementa tres métodos necesarios: `initialize`, `processRecords` y `shutdown`.  
Como indican sus nombres, `initialize` y `shutdown` se utilizan en Kinesis Client Library para que el procesador de registros sepa cuándo debe estar listo para empezar a recibir registros y cuándo debe esperar dejar de recibirlos, respectivamente, de modo que pueda realizar cualquier tarea de configuración y finalización específica de la aplicación. Le proporcionamos el código de los mismos. El procesamiento principal sucede en el método `processRecords`, que a su vez utiliza `processRecord` para cada registro. Este último método se proporciona principalmente como código estructural prácticamente vacío, para que pueda implementarlo en el siguiente paso, donde se explica más a fondo.  
También hay que resaltar la implementación de métodos de apoyo para `processRecord`: `reportStats`y `resetStats`, que están vacíos en el código fuente original.  
El método `processRecords` se implementa por usted, y realiza los pasos siguientes:  
+  Para cada registro que se pase, llama a `processRecord`.
+ Si ha pasado al menos 1 minuto desde el último informe, llama a `reportStats()`, que imprime las últimas estadísticas y, a continuación, a `resetStats()`, que elimina las estadísticas para que el próximo intervalo incluya solo registros nuevos.
+ Establece el momento del siguiente informe.
+ Si ha transcurrido al menos un minuto desde el último punto de comprobación de la base de datos, llama a `checkpoint()`. 
+ Establece el momento del siguiente punto de comprobación.
Este método utiliza intervalos de 60 segundos para la velocidad de elaboración de informes y puntos de comprobación. Para obtener más información sobre los puntos de control, consulte [Información adicional sobre el consumidor](#tutorial-stock-data-kplkcl-consumer-supplement).

**StockStats clase**  
Esta clase proporciona retención de datos y seguimiento de estadísticas para las acciones más populares a lo largo del tiempo. Proporcionamos este código por usted y contiene los siguientes métodos:  
+ `addStockTrade(StockTrade)`: inserta el `StockTrade` dado en las estadísticas de ejecución.
+ `toString()`: devuelve las estadísticas en una cadena con formato.
Esta clase hace un seguimiento de los valores más populares, al realizar un recuento continuo del número total de transacciones para cada valor y del recuento máximo. Asimismo, actualiza estos recuentos cada vez que llega una operación nueva.

Agregar código para los métodos de la clase `StockTradeRecordProcessor`, tal y como se muestra en los pasos siguientes.

**Para implementar el consumidor**

1. Implemente el método `processRecord` creando una instancia de un objeto `StockTrade` con el tamaño correcto y añadiéndole los datos de registro, registrando una advertencia si hay algún problema.

   ```
   StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array());
   if (trade == null) {
       LOG.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.getPartitionKey());
       return;
   }
   stockStats.addStockTrade(trade);
   ```

1. Implemente un método `reportStats` sencillo. Si lo desea, puede modificar el formato de salida según sus preferencias.

   ```
   System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" +
                      stockStats + "\n" +
                      "****************************************************************\n");
   ```

1. Por último, implemente el método `resetStats`, lo que creará una nueva instancia de `stockStats`.

   ```
   stockStats = new StockStats();
   ```

**Para ejecutar el consumidor**

1. Ejecute el productor que escribió en [[Implementar el productor](tutorial-stock-data-kplkcl-producer.md)Implementar el productor](tutorial-stock-data-kplkcl-producer.md) para insertar registros de operaciones bursátiles simuladas en la secuencia.

1. Compruebe que la clave de acceso y el par de claves secretas recuperadas anteriormente (al crear el usuario de IAM) se guardaron en el archivo `~/.aws/credentials`. 

1. Ejecute la clase `StockTradesProcessor` con los siguientes argumentos:

   ```
   StockTradesProcessor StockTradeStream us-west-2
   ```

   Tenga en cuenta que si ha creado su secuencia en una región diferente a `us-west-2` tiene que especificar esa región aquí.

Después de un minuto, debería ver un resultado similar a este, actualizado cada minuto:

```
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  ****************************************************************
```

## Información adicional sobre el consumidor
<a name="tutorial-stock-data-kplkcl-consumer-supplement"></a>

Si está familiarizado con las ventajas de Kinesis Client Library, que se tratan en [Desarrollar consumidores de KCL 1.x](developing-consumers-with-kcl.md) y en otros artículos, es posible que se pregunte por qué debería utilizarla aquí. Aunque solo se utiliza un único flujo de particiones y una única instancia de consumidor para procesarlo, aún así es más fácil implementar el consumidor mediante KCL. Compare los pasos de implementación del código en la sección del productor con la del consumidor, y podrá ver en comparación la facilidad de implementar un consumidor. Esto se debe, en gran medida, a los servicios que proporciona la KCL.

En esta aplicación, se centrará en la implementación de una clase de procesador de registros que puede procesar registros individuales. No tiene que preocuparse de la forma en que se obtienen los registros de Kinesis Data Streams. KCL recibe los registros e invoca el procesador de registros cuando hay nuevos registros disponibles. Tampoco tiene que preocuparse del número de fragmentos o instancias del consumidor que hay. Si la secuencia se amplía, no será necesario volver a escribir su aplicación para administrar más de un fragmento o una instancia del consumidor.

El término *puntos de control* significa registrar el punto del flujo hasta los registros de datos que se han consumido y procesado hasta el momento. Si la aplicación se bloquea, el flujo se lee desde ese punto y no desde el principio. El tema de los puntos de comprobación, los diversos patrones de diseño y las prácticas recomendadas al respecto quedan fuera del ámbito de este capítulo. Sin embargo, es algo que puede encontrar en entornos de producción.

Como aprendió en [[Implementar el productor](tutorial-stock-data-kplkcl-producer.md)Implementar el productor](tutorial-stock-data-kplkcl-producer.md), las operaciones `put` de la API de Kinesis Data Streams toman una *clave de partición* como entrada. Kinesis Data Streams utiliza una clave de partición como mecanismo para dividir los registros en varias particiones (cuando hay más de una partición en el flujo). La misma clave de partición siempre se dirige al mismo fragmento. Esto permite que el consumidor que procesa un determinado fragmento se diseñe con el supuesto de que los registros con la misma clave de partición solo se envían a ese consumidor, y que ningún registro con la misma clave de partición acaba en ningún otro consumidor. Por lo tanto, un proceso de trabajo del consumidor puede agregar todos los registros con la misma clave de partición sin preocuparse de que podrían faltar datos necesarios.

En esta aplicación, el procesamiento de registros por parte del consumidor no es intensivo, por lo que puede utilizar una partición y realizar el procesamiento en el mismo subproceso que el subproceso de KCL. En la práctica, sin embargo, piense primero en aumentar el número de fragmentos. En algunos casos, es posible que desee cambiar el procesamiento a otro subproceso o utilizar un grupo de subprocesos si espera que el procesamiento de registros sea intenso. De esta forma, KCL puede obtener nuevos registros con mayor rapidez, mientras que los demás subprocesos pueden procesar los registros en paralelo. El diseño con múltiples subprocesos no es trivial y debe abordarse con técnicas avanzadas, por lo que aumentar el número de particiones suele ser la forma más eficaz y sencilla de escalar verticalmente.

## Siguientes pasos
<a name="tutorial-stock-data-kplkcl-consumer-next"></a>

[(Opcional) Ampliar el consumidor](tutorial-stock-data-kplkcl-consumer-extension.md)

# (Opcional) Ampliar el consumidor
<a name="tutorial-stock-data-kplkcl-consumer-extension"></a>

Es posible que la aplicación del [Tutorial: Procesar operaciones bursátiles en tiempo real con KPL y KCL 1.x[Tutorial: Procesar operaciones bursátiles en tiempo real con KPL y KCL 1.x](tutorial-stock-data-kplkcl.md)](tutorial-stock-data-kplkcl.md) ya sea suficiente para sus objetivos. En esta sección opcional, mostramos cómo puede ampliar el código del consumidor para un escenario ligeramente más complicado.

Si desea conocer cuáles son las mayores operaciones de venta de cada minuto, puede modificar la clase `StockStats` en tres lugares para dar cabida a esta nueva prioridad.

**Para ampliar el consumidor**

1. Agregue nuevas variables de instancia:

   ```
    // Ticker symbol of the stock that had the largest quantity of shares sold 
    private String largestSellOrderStock;
    // Quantity of shares for the largest sell order trade
    private long largestSellOrderQuantity;
   ```

1. Añada el siguiente código a `addStockTrade`:

   ```
    if (type == TradeType.SELL) {
        if (largestSellOrderStock == null || trade.getQuantity() > largestSellOrderQuantity) {
            largestSellOrderStock = trade.getTickerSymbol();
            largestSellOrderQuantity = trade.getQuantity();
        }
    }
   ```

1. Modifique el método `toString` para imprimir la información adicional:

   ```
    public String toString() {
        return String.format(
                "Most popular stock being bought: %s, %d buys.%n" +
                "Most popular stock being sold: %s, %d sells.%n" +
                "Largest sell order: %d shares of %s.",
                getMostPopularStock(TradeType.BUY), getMostPopularStockCount(TradeType.BUY),
                getMostPopularStock(TradeType.SELL), getMostPopularStockCount(TradeType.SELL),
                largestSellOrderQuantity, largestSellOrderStock);
    }
   ```

Si ejecuta el consumidor ahora (recuerde ejecutar también el productor), debería ver un resultado similar a este:

```
 ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  Largest sell order: 996 shares of BUD.
  ****************************************************************
```

## Siguientes pasos
<a name="tutorial-stock-data-kplkcl-consumer-extension-next"></a>

[Eliminar recursos](tutorial-stock-data-kplkcl-finish.md)

# Eliminar recursos
<a name="tutorial-stock-data-kplkcl-finish"></a>

Como paga por utilizar el flujo de datos de Kinesis, asegúrese de eliminarla, así como la tabla de Amazon DynamoDB correspondiente, cuando haya terminado con ella. Se aplican cargos nominales sobre una secuencia activa incluso aunque no esté enviando ni recibiendo registros. Esto se debe a que una secuencia activa está utilizando los recursos "escuchando" de forma continua los registros entrantes y las solicitudes para obtener registros.

**Para eliminar la secuencia y la tabla**

1. Cierre los productores y los consumidores que puedan estar ejecutándose aún.

1. [Abra la consola de Kinesis en https://console.aws.amazon.com /kinesis.](https://console.aws.amazon.com/kinesis)

1. Seleccione la secuencia que haya creado para esta aplicación (`StockTradeStream`).

1. Elija **Delete Stream (Eliminar secuencia)**.

1. Abra la consola de DynamoDB en. [https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/)

1. Elimine la tabla `StockTradesProcessor`.

## Resumen
<a name="tutorial-stock-data-kplkcl-summary"></a>

El procesamiento de una gran cantidad de datos casi en tiempo real no requiere escribir ningún código complicado ni desarrollar una infraestructura enorme. Es tan básico como escribir lógica para procesar una pequeña cantidad de datos (como escribir `processRecord(Record)`), pero con Kinesis Data Streams para escalarla de forma que funcione con una gran cantidad de datos transmitidos. No tiene que preocuparse de cómo escalar su procesamiento, ya que Kinesis Data Streams lo administra por usted. Lo único que tiene que hacer es enviar sus registros de streaming a Kinesis Data Streams y escribir la lógica para procesar cada nuevo registro recibido. 

A continuación se muestran algunas posibles mejoras para esta aplicación.

**Agregación en todos los fragmentos**  
En la actualidad, obtiene estadísticas derivadas de agrupar los registros de datos que se reciben de un único proceso de trabajo desde un único fragmento. (Un fragmento no puede ser procesado por más de un proceso de trabajo en una sola aplicación al mismo tiempo). Lógicamente, si escala y tiene más de un fragmento, es posible que quiera realizar la agregación en todos los fragmentos. Podrá hacerlo con una arquitectura de canalización en la que el resultado de cada proceso de trabajo se envíe a otra secuencia con un único fragmento, que se procesará por parte de un proceso de trabajo que agregue los resultados de la primera etapa. Dado que los datos de la primera etapa están limitados (a una muestra por minuto y fragmento), pueden ser administrados fácilmente por un fragmento.

**Procesamiento de la escala**  
Cuando la secuencia se escala para tener muchos fragmentos (porque hay muchos productores enviando datos), el método para aumentar el procesamiento es agregar más procesos de trabajo. Puede ejecutar los procesos de trabajo en instancias de Amazon EC2 y utilizar grupos de escalado automático.

**Usa conectores para Amazon S3/ DynamoDB/Amazon Redshift/Storm**  
Como una transmisión se procesa de forma continua, su salida se puede enviar a otros destinos. AWS proporciona [conectores](https://github.com/awslabs/amazon-kinesis-connectors) para integrar Kinesis Data Streams con AWS otros servicios y herramientas de terceros.

## Siguientes pasos
<a name="tutorial-stock-data-kplkcl-next-steps"></a>
+ Para más información acerca del uso de la API de Kinesis Data Streams, consulte [Desarrolle productores mediante la API Amazon Kinesis Data Streams con AWS SDK para Java](developing-producers-with-sdk.md), [Desarrolle consumidores con un rendimiento compartido con AWS SDK para Java](developing-consumers-with-sdk.md) y [Crear y administrar Kinesis Data Streams](working-with-streams.md).
+ Para más información sobre Kinesis Client Library, consulte [Desarrollar consumidores de KCL 1.x](developing-consumers-with-kcl.md). 
+ Para obtener más información sobre cómo optimizar su aplicación, consulte [Optimizar consumidores de Amazon Kinesis Data Streams.Optimizar consumidores de Kinesis Data Streams.](advanced-consumers.md). 

# Tutorial: Analizar datos bursátiles en tiempo real con Amazon Managed Service para Apache Flink
<a name="tutorial-stock-data"></a>

El escenario que planteamos en este tutorial comprende la adquisición de operaciones bursátiles en un flujo de datos y la escritura de una aplicación de [Amazon Managed Service para Apache Flink](https://docs.aws.amazon.com/kinesisanalytics/latest/java/what-is.html) sencilla que realiza cálculos con dicho flujo. Aprenderá a enviar un flujo de registros a Kinesis Data Streams y a implementar una aplicación que consume y procesa dichos registros casi en tiempo real.

Con Amazon Managed Service para Apache Flink, puede usar Java o Scala para procesar y analizar datos de streaming. El servicio le permite crear y ejecutar código Java o Scala en orígenes de flujo para realizar análisis de series temporales, alimentar paneles en tiempo real y crear métricas en tiempo real.

Puede crear aplicaciones Flink en Managed Service para Apache Flink mediante bibliotecas de código abierto basadas en [Apache Flink.](https://flink.apache.org/) Apache Flink es un marco y un motor usados habitualmente para procesar flujos de datos. 

**importante**  
Después de crear dos transmisiones de datos y una aplicación, su cuenta incurre en cargos nominales por el uso de Kinesis Data Streams y Managed Service for Apache Flink, ya que no son aptas para AWS la capa gratuita. Cuando termine con esta aplicación, elimine sus AWS recursos para dejar de incurrir en cargos. 

El código no obtiene acceso a datos bursátiles reales, sino que, en su lugar, simula la secuencia de operaciones bursátiles. Lo hace a través de un generador de operaciones bursátiles aleatorias. Si tiene acceso a una secuencia de operaciones bursátiles en tiempo real, puede que le interese derivar estadísticas útiles y puntuales a partir de dicha secuencia. Por ejemplo, es posible que desee realizar un análisis de ventana deslizante en el que se determina el valor más popular adquirido durante los últimos 5 minutos. O también cabe la posibilidad de que quiera recibir una notificación cada vez que haya una orden de venta que sea demasiado grande (es decir, con demasiadas acciones). Puede ampliar el código de esta serie para proporcionar esta funcionalidad.

Los ejemplos mostrados utilizan la región Oeste de EE. UU. (Oregón), pero funcionan en cualquiera de las [regiones de AWS compatibles con Managed Service para Apache Flink](https://docs.aws.amazon.com/general/latest/gr/rande.html#ka_region).

**Topics**
+ [

## Requisitos previos para realizar los ejercicios
](#setting-up-prerequisites)
+ [

# Configure una AWS cuenta y cree un usuario administrador
](setting-up.md)
+ [

# Configure el AWS Command Line Interface ()AWS CLI
](setup-awscli.md)
+ [

# Crear y ejecutar una aplicación de Managed Service para Apache Flink
](get-started-exercise.md)

## Requisitos previos para realizar los ejercicios
<a name="setting-up-prerequisites"></a>

Para completar los pasos de esta guía, debe disponer de lo siguiente:
+ [Java Development Kit](http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html) (JDK), versión 8. Establezca la variable de entorno `JAVA_HOME` para señalar la ubicación de la instalación del JDK.
+ Se recomienda utilizar un entorno de desarrollo (como [Eclipse Java Neon](http://www.eclipse.org/downloads/packages/release/neon/3) o [IntelliJ Idea](https://www.jetbrains.com/idea/)) para desarrollar y compilar su aplicación.
+ [Cliente Git.](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) Si aún no lo ha hecho, instale el cliente Git.
+ [Apache Maven Compiler Plugin](https://maven.apache.org/plugins/maven-compiler-plugin/). Maven debe estar en su ruta de trabajo. Para probar la instalación de Apache Maven, introduzca lo siguiente:

  ```
  $ mvn -version
  ```

Para empezar, vaya a [Configure una AWS cuenta y cree un usuario administrador](setting-up.md).

# Configure una AWS cuenta y cree un usuario administrador
<a name="setting-up"></a>

Antes de utilizar Amazon Managed Service para Apache Flink por primera vez, realice las siguientes tareas:

1. [Inscríbase en AWS](#setting-up-signup)

1. [Creación de un usuario de IAM](#setting-up-iam)

## Inscríbase en AWS
<a name="setting-up-signup"></a>

Cuando te registras en Amazon Web Services (AWS), tu AWS cuenta se registra automáticamente en todos los servicios de Amazon AWS, incluido Amazon Managed Service para Apache Flink. Solo se le cobrará por los servicios que utilice.

Con Managed Service para Apache Flink, solo paga por los recursos que utiliza. Si es un nuevo cliente de AWS , puede comenzar a utilizar Managed Service para Apache Flink de forma gratuita. Para obtener más información, consulte [Capa gratuita de AWS](https://aws.amazon.com/free/).

Si ya tienes una AWS cuenta, pasa a la siguiente tarea. Si no dispone de una cuenta de AWS , siga estos pasos para crear una.

**Para crear una AWS cuenta**

1. Abre el [https://portal.aws.amazon.com/billing/registro](https://portal.aws.amazon.com/billing/signup).

1. Siga las instrucciones que se le indiquen.

   Parte del procedimiento de registro consiste en recibir una llamada telefónica o mensaje de texto e indicar un código de verificación en el teclado del teléfono.

   Cuando te registras en un Cuenta de AWS, *Usuario raíz de la cuenta de AWS*se crea un. El usuario raíz tendrá acceso a todos los Servicios de AWS y recursos de esa cuenta. Como práctica recomendada de seguridad, asigne acceso administrativo a un usuario y utilice únicamente el usuario raíz para realizar [Tareas que requieren acceso de usuario raíz](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_root-user.html#root-user-tasks).

Anota el ID de tu AWS cuenta porque lo necesitarás para la siguiente tarea.

## Creación de un usuario de IAM
<a name="setting-up-iam"></a>

Los servicios de AWS, como Amazon Managed Service para Apache Flink, requieren que proporciones credenciales al acceder a ellos. Esto es así para que el servicio puede determinar si usted tiene permisos para obtener acceso a los recursos que son propiedad de tal servicio. Consola de administración de AWS Requiere que introduzcas tu contraseña. 

Puede crear claves de acceso para que su AWS cuenta acceda a AWS Command Line Interface (AWS CLI) o a la API. Sin embargo, no te recomendamos que accedas AWS con las credenciales de tu AWS cuenta. En su lugar, te recomendamos que utilices AWS Identity and Access Management (IAM). Cree un usuario de IAM, añada el usuario a un grupo de IAM con permisos administrativos y, a continuación, conceda permisos administrativos al usuario de IAM que ha creado. Luego, podrá acceder a AWS mediante una URL especial y esas credenciales de usuario de IAM.

Si te has registrado AWS, pero no has creado un usuario de IAM para ti, puedes crear uno mediante la consola de IAM.

En los ejercicios de introducción de esta guía se supone que tiene un usuario con permisos de administrador (`adminuser`). Siga el procedimiento para crear `adminuser` en su cuenta.

**Para crear un grupo de administradores**

1. Inicie sesión en la consola de IAM Consola de administración de AWS y ábrala en. [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/)

1. En el panel de navegación, elija **Grupos** y, a continuación, elija **Crear nuevo grupo**.

1. En **Nombre de grupo**, escriba el nombre del grupo, como **Administrators**, y seleccione **Paso siguiente**.

1. En la lista de políticas, seleccione la casilla de verificación situada junto a la **AdministratorAccess**política. Puede utilizar el menú de **filtros** y el cuadro de **búsqueda** para filtrar la lista de políticas.

1. Elija **Siguiente paso** y luego seleccione **Crear grupo**.

El grupo nuevo figura en la lista **Nombre del grupo**.

**Para crear un usuario de IAM para usted, agréguelo al grupo Administradores y cree una contraseña**

1. En el panel de navegación, elija **Users** y luego elija la opción **Add user**.

1. En el cuadro **Nombre de usuario**, escriba un nombre de usuario.

1. Seleccione **Acceso mediante programación** y **Acceso con la consola de administración de AWS **.

1. Elija **Siguiente: permisos**.

1. Seleccione la casilla de verificación situada junto al grupo de **Administradores**. A continuación, seleccione **Next: Review**.

1. Seleccione la opción **Crear un usuario**.

**Para iniciar sesión como el nuevo usuario de IAM**

1. Cierre sesión en Consola de administración de AWS.

1. Utilice el siguiente formato de URL para iniciar sesión en la consola:

   `https://aws_account_number.signin.aws.amazon.com/console/`

   *aws\$1account\$1number*Es su ID AWS de cuenta sin guiones. Por ejemplo, si tu ID de AWS cuenta es 1234-5678-9012, sustitúyelo por. *aws\$1account\$1number* **123456789012** *Para obtener información sobre cómo encontrar su número de cuenta, consulte El [identificador de su cuenta y su alias AWS en](https://docs.aws.amazon.com/IAM/latest/UserGuide/console_account-alias.html) la Guía del usuario de IAM.*

1. Escriba el nombre y la contraseña del usuario de IAM que acaba de crear. Cuando haya iniciado sesión, la barra de navegación mostrará *your\$1user\$1name* @*your\$1aws\$1account\$1id*.

**nota**  
Si no quieres que la URL de la página de inicio de sesión contenga tu ID de AWS cuenta, puedes crear un alias de cuenta.

**Para crear o eliminar un alias de cuenta**

1. Abra la consola de IAM en [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. En el panel de navegación, elija **Panel**.

1. Busque el enlace de inicio de sesión de los usuarios de IAM.

1. Para crear el alias, elija **Personalizar**. Escriba el nombre que desea utilizar para el alias y, a continuación, elija **Sí, crear**.

1. Para eliminar el alias, elija **Personalizar** y, a continuación, elija **Sí, eliminar**. La URL de inicio de sesión volverá a utilizar tu AWS ID de cuenta.

Para iniciar sesión después de crear un alias de cuenta, use la siguiente dirección URL:

`https://your_account_alias.signin.aws.amazon.com/console/`

Para verificar el enlace de inicio de sesión de los usuarios de IAM de su cuenta, abra la consola de IAM y compruebe el valor de **IAM users sign-in link (Enlace de inicio de sesión de los usuarios de IAM)** en el panel.

Para obtener más información sobre IAM, consulte lo siguiente:
+ [AWS Identity and Access Management (IAM)](https://aws.amazon.com/iam/)
+ [Cómo empezar con IAM](https://docs.aws.amazon.com/IAM/latest/UserGuide/getting-started.html)
+ [Guía del usuario de IAM](https://docs.aws.amazon.com/IAM/latest/UserGuide/)

## Siguiente paso
<a name="setting-up-next-step-2"></a>

[Configure el AWS Command Line Interface ()AWS CLI](setup-awscli.md)

# Configure el AWS Command Line Interface ()AWS CLI
<a name="setup-awscli"></a>

En este paso, debe descargar y configurar el AWS CLI para usarlo con Amazon Managed Service para Apache Flink.

**nota**  
En los ejercicios introductorios de esta guía se presupone que está utilizando las credenciales de administrador (`adminuser`) en su cuenta para realizar las operaciones.

**nota**  
Si ya lo tiene AWS CLI instalado, es posible que necesite actualizarlo para obtener la funcionalidad más reciente. Para obtener más información, consulte [Instalación de la interfaz de línea de AWS comandos](https://docs.aws.amazon.com/cli/latest/userguide/installing.html) en la *Guía del AWS Command Line Interface usuario*. Para comprobar la versión de AWS CLI, ejecute el siguiente comando:  

```
aws --version
```
Los ejercicios de este tutorial requieren la siguiente AWS CLI versión o una posterior:  

```
aws-cli/1.16.63
```

**Para configurar el AWS CLI**

1. Descargue y configure la AWS CLI. Para obtener instrucciones, consulte los siguientes temas en la *Guía del usuario de la AWS Command Line Interface *: 
   + [Instalación de la AWS Command Line Interface](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-set-up.html)
   + [Configuración de la AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html)

1. Agregue un perfil con nombre para el usuario administrador en el archivo de AWS CLI configuración. Este perfil se utiliza al ejecutar los AWS CLI comandos. Para obtener más información sobre los perfiles con nombre, consulte [Perfiles con nombre](https://docs.aws.amazon.com/cli/latest/userguide/cli-multiple-profiles.html) en la *Guía del usuario de la AWS Command Line Interface *.

   ```
   [profile adminuser]
   aws_access_key_id = adminuser access key ID
   aws_secret_access_key = adminuser secret access key
   region = aws-region
   ```

   Para obtener una lista de AWS las regiones disponibles, consulte [AWS Regiones y puntos finales](https://docs.aws.amazon.com/general/latest/gr/rande.html) en la *Referencia general de Amazon Web Services*.

1. Verifique la configuración introduciendo el siguiente comando de ayuda en el símbolo del sistema: 

   ```
   aws help
   ```

Después de configurar una AWS cuenta y el AWS CLI, puede probar el siguiente ejercicio, en el que configurará una aplicación de ejemplo y probará la end-to-end configuración.

## Siguiente paso
<a name="setting-up-next-step-3"></a>

[Crear y ejecutar una aplicación de Managed Service para Apache Flink](get-started-exercise.md)

# Crear y ejecutar una aplicación de Managed Service para Apache Flink
<a name="get-started-exercise"></a>

En este ejercicio, deberá crear una aplicación de Managed Service para Apache Flink con flujos de datos como origen y receptor.

**Topics**
+ [

## Crear dos Amazon Kinesis Data Streams
](#get-started-exercise-1)
+ [

## Escritura de registros de muestra en el flujo de entrada
](#get-started-exercise-2)
+ [

## Descargar y consultar el código de Java de streaming de Apache Flink
](#get-started-exercise-5)
+ [

## Compilar el código de la aplicación
](#get-started-exercise-5.5)
+ [

## Cargar el código de Java de streaming de Apache Flink
](#get-started-exercise-6)
+ [

## Crear y ejecutar la aplicación de Managed Service para Apache Flink
](#get-started-exercise-7)

## Crear dos Amazon Kinesis Data Streams
<a name="get-started-exercise-1"></a>

Antes de crear una Amazon Managed Service para Apache Flink para este ejercicio, cree dos flujos de datos de Kinesis (`ExampleInputStream` y `ExampleOutputStream`). Su aplicación utiliza estos flujos para los flujos de origen y destino de la aplicación.

Puede crear estos flujos mediante la consola de Amazon Kinesis o el siguiente comando de la AWS CLI . Para obtener instrucciones de la consola, consulte [Creación y actualización de secuencias de datos](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html). 

**Cómo crear flujos de datos (AWS CLI)**

1. Para crear la primera transmisión (`ExampleInputStream`), utilice el siguiente comando de Amazon Kinesis `create-stream` AWS CLI .

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleInputStream \
   --shard-count 1 \
   --region us-west-2 \
   --profile adminuser
   ```

1. Para crear el segundo flujo que la aplicación utilizará para escribir la salida, ejecute el mismo comando, cambiando el nombre a `ExampleOutputStream`.

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleOutputStream \
   --shard-count 1 \
   --region us-west-2 \
   --profile adminuser
   ```

## Escritura de registros de muestra en el flujo de entrada
<a name="get-started-exercise-2"></a>

En esta sección, se utiliza un script de Python para escribir registros de muestra en el flujo para que la aplicación los procese.

**nota**  
Esta sección requiere [AWS SDK para Python (Boto)](https://aws.amazon.com/developers/getting-started/python/).

1. Cree un archivo denominado `stock.py` con el siguiente contenido:

   ```
    
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "EVENT_TIME": datetime.datetime.now().isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
           "PRICE": round(random.random() * 100, 2),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

1. Más adelante en el tutorial, se ejecuta el script `stock.py` para enviar datos a la aplicación. 

   ```
   $ python stock.py
   ```

## Descargar y consultar el código de Java de streaming de Apache Flink
<a name="get-started-exercise-5"></a>

El código de la aplicación Java para este ejemplo está disponible en. GitHub Para descargar el código de la aplicación, haga lo siguiente:

1. Clone el repositorio remoto con el siguiente comando:

   ```
   git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples.git
   ```

1. Vaya al directorio `GettingStarted`.

El código de la aplicación se encuentra en los archivos `CustomSinkStreamingJob.java` y `CloudWatchLogSink.java`. Tenga en cuenta lo siguiente en relación con el código de la aplicación:
+ La aplicación utiliza un origen de Kinesis para leer del flujo de origen. El siguiente fragmento crea el receptor de Kinesis:

  ```
  return env.addSource(new FlinkKinesisConsumer<>(inputStreamName,
                  new SimpleStringSchema(), inputProperties));
  ```

## Compilar el código de la aplicación
<a name="get-started-exercise-5.5"></a>

En esta sección, se utiliza el compilador Apache Maven para crear el código de Java para la aplicación. Para obtener más información sobre la instalación de Apache Maven y el Java Development Kit (JDK), consulte [Requisitos previos para realizar los ejercicios](tutorial-stock-data.md#setting-up-prerequisites).

La aplicación de Java requiere los siguientes componentes:
+ Un archivo [Project Object Model (pom.xml)](https://maven.apache.org/guides/introduction/introduction-to-the-pom.html). Este archivo contiene información sobre la configuración y las dependencias de la aplicación, incluidas las bibliotecas de Amazon Managed Service para Apache Flink.
+ Un método `main` que contiene la lógica de la aplicación.

**nota**  
**Para utilizar el conector de Kinesis para la siguiente aplicación, es necesario descargar el código fuente del conector y compilarlo tal y como se describe en la [documentación de Apache Flink](https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/connectors/kinesis.html).**

**Para crear y compilar el código de la aplicación**

1. Cree una Java/Maven aplicación en su entorno de desarrollo. Para obtener más información acerca de cómo crear una aplicación, consulte la documentación de su entorno de desarrollo:
   + [ Creating your first Java project (Eclipse Java Neon)](https://help.eclipse.org/neon/index.jsp?topic=%2Forg.eclipse.jdt.doc.user%2FgettingStarted%2Fqs-3.htm)
   + [ Creating, Running and Packaging Your First Java Application (IntelliJ Idea)](https://www.jetbrains.com/help/idea/creating-and-running-your-first-java-application.html)

1. Utilice el siguiente código para un archivo llamado `StreamingJob.java`. 

   ```
    
   package com.amazonaws.services.kinesisanalytics;
   
   import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
   import org.apache.flink.api.common.serialization.SimpleStringSchema;
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
   import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
   import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
   
   import java.io.IOException;
   import java.util.Map;
   import java.util.Properties;
   
   public class StreamingJob {
   
       private static final String region = "us-east-1";
       private static final String inputStreamName = "ExampleInputStream";
       private static final String outputStreamName = "ExampleOutputStream";
   
       private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
           Properties inputProperties = new Properties();
           inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
           inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
   
           return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
       }
   
       private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
               throws IOException {
           Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
           return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
                   applicationProperties.get("ConsumerConfigProperties")));
       }
   
       private static FlinkKinesisProducer<String> createSinkFromStaticConfig() {
           Properties outputProperties = new Properties();
           outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
           outputProperties.setProperty("AggregationEnabled", "false");
   
           FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties);
           sink.setDefaultStream(outputStreamName);
           sink.setDefaultPartition("0");
           return sink;
       }
   
       private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException {
           Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
           FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(),
                   applicationProperties.get("ProducerConfigProperties"));
   
           sink.setDefaultStream(outputStreamName);
           sink.setDefaultPartition("0");
           return sink;
       }
   
       public static void main(String[] args) throws Exception {
           // set up the streaming execution environment
           final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   
           /*
            * if you would like to use runtime configuration properties, uncomment the
            * lines below
            * DataStream<String> input = createSourceFromApplicationProperties(env);
            */
   
           DataStream<String> input = createSourceFromStaticConfig(env);
   
           /*
            * if you would like to use runtime configuration properties, uncomment the
            * lines below
            * input.addSink(createSinkFromApplicationProperties())
            */
   
           input.addSink(createSinkFromStaticConfig());
   
           env.execute("Flink Streaming Java API Skeleton");
       }
   }
   ```

   Tenga en cuenta lo siguiente en relación con el ejemplo de código anterior:
   + Este archivo contiene el método `main` que define la funcionalidad de la aplicación.
   + La aplicación crea conectores de origen y recepción para obtener acceso a recursos externos usando un objeto `StreamExecutionEnvironment`. 
   + La aplicación crea conectores de origen y recepción mediante propiedades estáticas. Para utilizar propiedades dinámicas de la aplicación, utilice los métodos `createSourceFromApplicationProperties` y `createSinkFromApplicationProperties` para crear los conectores. Estos métodos leen las propiedades de la aplicación para configurar los conectores.

1. Para utilizar el código de la aplicación, compile y empaquete el código en un archivo JAR. Se puede compilar y empaquetar el código de una de las dos formas siguientes:
   + Utilice la herramienta de línea de comandos de Maven. Cree su archivo JAR ejecutando el siguiente comando en el directorio que contiene el archivo `pom.xml`:

     ```
     mvn package
     ```
   + Use el entorno de desarrollo. Consulte la documentación de su entorno de desarrollo para obtener más información.

   Se puede cargar el paquete como un archivo JAR o puede comprimir el paquete y cargarlo como un archivo ZIP. Si crea su aplicación con AWS CLI, especifique el tipo de contenido del código (JAR o ZIP).

1. Si hay errores al compilar, verifique que la variable de entorno `JAVA_HOME` se ha configurado correctamente.

Si la aplicación se compila correctamente, se crea el siguiente archivo:

`target/java-getting-started-1.0.jar`

## Cargar el código de Java de streaming de Apache Flink
<a name="get-started-exercise-6"></a>

En esta sección, creará un bucket de Amazon Simple Storage Service (Amazon S3) y cargará el código de la aplicación.

**Cómo cargar el código de la aplicación**

1. Abra la consola de Amazon S3 en [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/).

1. Elija **Crear bucket**.

1. Escriba **ka-app-code-*<username>*** en el campo **Nombre del bucket**. Añada un sufijo al nombre del bucket, como su nombre de usuario, para que sea único a nivel global. Elija **Siguiente**.

1. En el paso **Configurar opciones**, deje los ajustes tal y como están y elija **Siguiente**.

1. En el paso **Establecer permisos**, deje los ajustes tal y como están y elija **Siguiente**.

1. Elija **Crear bucket**.

1. En la consola de Amazon S3, elija el *<username>* bucket **ka-app-code-** y, a continuación, seleccione **Upload**.

1. En el paso **Seleccionar archivos**, elija **Añadir archivos**. Vaya al archivo `java-getting-started-1.0.jar` que creó en el paso anterior. Elija **Siguiente**.

1. En el paso **Establecer permisos**, deje los ajustes tal y como están. Elija **Siguiente**.

1. En el paso **Establecer propiedades**, deje los ajustes tal y como están. Seleccione **Cargar**.

El código de la aplicación ya está almacenado en un bucket de Amazon S3 al que la aplicación puede acceder.

## Crear y ejecutar la aplicación de Managed Service para Apache Flink
<a name="get-started-exercise-7"></a>

Se puede crear y ejecutar una aplicación de Managed Service para Apache Flink mediante la consola o la AWS CLI.

**nota**  
Cuando crea la aplicación mediante la consola, sus recursos AWS Identity and Access Management (de IAM) y de Amazon CloudWatch Logs se crean automáticamente. Cuando crea la aplicación con AWS CLI, crea estos recursos por separado.

**Topics**
+ [

### Crear y ejecutar la aplicación (consola)
](#get-started-exercise-7-console)
+ [

### Crear y ejecutar la aplicación (AWS CLI)
](#get-started-exercise-7-cli)

### Crear y ejecutar la aplicación (consola)
<a name="get-started-exercise-7-console"></a>

Siga estos pasos para crear, configurar, actualizar y ejecutar la aplicación mediante la consola.

#### Creación de la aplicación
<a name="get-started-exercise-7-console-create"></a>

1. [Abra la consola de Kinesis en https://console.aws.amazon.com /kinesis.](https://console.aws.amazon.com/kinesis)

1. En el panel de Amazon Kinesis, elija **Crear aplicación de análisis**.

1. En la página **Kinesis Analytics - Crear aplicación**, proporcione la siguiente información:
   + En **Nombre de la aplicación**, escriba **MyApplication**.
   + En **Descripción**, escriba **My java test app**.
   + En **Tiempo de ejecución**, escriba **Apache Flink 1.6**.

1. Para **Permisos de acceso**, seleccione **Crear o actualizar rol de IAM `kinesis-analytics-MyApplication-us-west-2`**.

1. Elija **Crear aplicación**.

**nota**  
Al crear una aplicación de Amazon Managed Service para Apache Flink mediante la consola, tiene la opción de tener un rol de IAM y una política creada para su aplicación. La aplicación utiliza este rol y la política para acceder a los recursos dependientes. Estos recursos de IAM reciben un nombre usando el nombre de la aplicación y la región tal y como se indica a continuación:  
Política: `kinesis-analytics-service-MyApplication-us-west-2`
Rol: `kinesis-analytics-MyApplication-us-west-2`

#### Modificar la política de IAM
<a name="get-started-exercise-7-console-iam"></a>

Edite la política de IAM para agregar permisos de acceso a los flujos de datos de Kinesis.

1. Abra la consola de IAM en [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. Elija **Políticas**. Elija la política **`kinesis-analytics-service-MyApplication-us-west-2`** que la consola creó en su nombre en la sección anterior. 

1. En la página **Resumen**, elija **Editar política**. Seleccione la pestaña **JSON**.

1. Añada la sección subrayada de la siguiente política de ejemplo a la política. Sustituya la cuenta de muestra IDs (*012345678901*) por su ID de cuenta.

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:s3:::ka-app-code-username/java-getting-started-1.0.jar"
               ]
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:*"
               ]
           },
           {
               "Sid": "ListCloudwatchLogStreams",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogStreams"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
               ]
           },
           {
               "Sid": "PutCloudwatchLogs",
               "Effect": "Allow",
               "Action": [
                   "logs:PutLogEvents"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
               ]
           },
           {
               "Sid": "ReadInputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream"
           },
           {
               "Sid": "WriteOutputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream"
           }
       ]
   }
   ```

------

#### Configurar la aplicación
<a name="get-started-exercise-7-console-configure"></a>

1. En la **MyApplication**página, selecciona **Configurar**.

1. En la página **Configurar aplicación**, proporcione la **Ubicación del código**:
   + Para el **bucket de Amazon S3**, introduzca **ka-app-code-*<username>***.
   + En **Ruta al objeto de Amazon S3**, introduzca **java-getting-started-1.0.jar**.

1. En **Acceso a los recursos de la aplicación**, en **Permisos de acceso**, seleccione **Crear o actualizar rol de IAM `kinesis-analytics-MyApplication-us-west-2`**.

1. En **Propiedades**, en **ID de grupo**, escriba **ProducerConfigProperties**.

1. Escriba las siguientes propiedades y valores de la aplicación:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/es_es/streams/latest/dev/get-started-exercise.html)

1. En **Monitorización**, asegúrese de que el **Nivel de métricas de monitorización** se ha establecido en **Aplicación**.

1. Para el **CloudWatch registro**, active la casilla **Activar**.

1. Elija **Actualizar**.

**nota**  
Si decide habilitar el CloudWatch registro, Managed Service for Apache Flink crea un grupo de registros y un flujo de registros automáticamente. Los nombres de estos recursos son los siguientes:   
Grupo de registro: `/aws/kinesis-analytics/MyApplication`
Flujo de registro: `kinesis-analytics-log-stream`

#### Ejecución de la aplicación
<a name="get-started-exercise-7-console-run"></a>

1. En la **MyApplication**página, seleccione **Ejecutar**. Confirme la acción.

1. Cuando la aplicación se está ejecutando, actualice la página. La consola muestra el **Gráfico de la aplicación**.

#### Detener la aplicación
<a name="get-started-exercise-7-console-stop"></a>

En la **MyApplication**página, selecciona **Detener**. Confirme la acción.

#### Actualizar la aplicación
<a name="get-started-exercise-7-console-update"></a>

Mediante la consola, puede actualizar la configuración de la aplicación, tal como sus propiedades, ajustes de monitorización y la ubicación o el nombre de archivo JAR de la aplicación. También puede volver a cargar el JAR de la aplicación del bucket de Amazon S3 si necesita actualizar el código de la aplicación.

En la **MyApplication**página, elija **Configurar**. Actualice la configuración de la aplicación y elija **Actualizar**.

### Crear y ejecutar la aplicación (AWS CLI)
<a name="get-started-exercise-7-cli"></a>

En esta sección, se utiliza AWS CLI para crear y ejecutar la aplicación Managed Service for Apache Flink. Managed Service for Apache Flink usa el `kinesisanalyticsv2` AWS CLI comando para crear aplicaciones Managed Service for Apache Flink e interactuar con ellas.

#### Crear una política de permisos
<a name="get-started-exercise-7-cli-policy"></a>

En primer lugar, debe crear una política de permisos con dos instrucciones: una que concede permisos para la acción `read` en el flujo de origen y otra que concede permisos para las acciones `write` en el flujo de recepción. A continuación, asocie la política a un rol de IAM (que se crea en la siguiente sección). Por lo tanto, cuando Managed Service para Apache Flink asume el rol, el servicio tiene los permisos necesarios para leer desde el flujo de origen y escribir en el flujo de recepción.

Utilice el siguiente código para crear la política de permisos `KAReadSourceStreamWriteSinkStream`. Reemplace `username` por el nombre de usuario que se utilizó para crear el bucket de Amazon S3 para almacenar el código de la aplicación. Sustituya el ID de cuenta en Amazon Resource Names (ARNs) (`012345678901`) por su ID de cuenta.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "S3",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:GetObjectVersion"
            ],
            "Resource": ["arn:aws:s3:::ka-app-code-username",
                "arn:aws:s3:::ka-app-code-username/*"
            ]
        },
        {
            "Sid": "ReadInputStream",
            "Effect": "Allow",
            "Action": "kinesis:*",
            "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream"
        },
        {
            "Sid": "WriteOutputStream",
            "Effect": "Allow",
            "Action": "kinesis:*",
            "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream"
        }
    ]
}
```

------

Para step-by-step obtener instrucciones sobre cómo crear una política de permisos, consulte el [tutorial: Cómo crear y adjuntar su primera política gestionada por el cliente](https://docs.aws.amazon.com/IAM/latest/UserGuide/tutorial_managed-policies.html#part-two-create-policy) en la *Guía del usuario de IAM*.

**nota**  
Para acceder a otros AWS servicios, puede utilizar el AWS SDK para Java. Managed Service para Apache Flink establece automáticamente las credenciales requeridas por el SDK con las del rol de IAM de ejecución del servicio asociada a su aplicación. No hace falta realizar ningún otro paso.

#### Creación de un rol de IAM
<a name="get-started-exercise-7-cli-role"></a>

En esta sección, creará un rol de IAM que Managed Service para Apache Flink pueda asumir para leer un flujo de origen y escribir en el flujo de recepción.

Managed Service para Apache Flink no puede acceder a su flujo sin permisos. Estos permisos se conceden a través del rol de IAM. Cada rol de IAM tiene dos políticas asociadas. La política de confianza concede a Managed Service para Apache Flink permiso para asumir el rol, y la política de permisos determina lo que Managed Service para Apache Flink puede hacer después de asumir el rol.

Usted deberá asociar la política de permisos que ha creado en la sección anterior a este rol.

**Cómo crear un rol de IAM**

1. Abra la consola de IAM en [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. En el panel de navegación, elija **Roles**, **Crear rol**.

1. En **Seleccionar tipo de entidad de confianza**, elija **Servicio de AWS **. En **Elegir el servicio que usará este rol**, elija **Kinesis**. En **Seleccionar su caso de uso**, elija **Kinesis Analytics**.

   Elija **Siguiente: permisos**.

1. En la página **Asociar políticas de permisos**, elija **Siguiente: Revisión**. Asociará políticas de permisos después de crear el rol.

1. En la página **Crear rol**, escriba **KA-stream-rw-role** como **Nombre de rol**. Elija **Crear rol**.

   Ahora ha creado un nuevo rol de IAM llamado `KA-stream-rw-role`. A continuación, actualice las políticas de confianza y permisos del rol.

1. Asocie la política de permisos al rol.
**nota**  
Para este ejercicio, Managed Service para Apache Flink asume este rol tanto para leer datos de un flujo de datos de Kinesis (origen) como para escribir la salida en otro flujo de datos de Kinesis. Asocie la política que ha creado en el paso anterior, [Crear una política de permisos](#get-started-exercise-7-cli-policy).

   1. En la página **Resumen**, elija la pestaña **Permisos**.

   1. Seleccione **Asociar políticas**.

   1. En el campo de búsqueda, escriba **KAReadSourceStreamWriteSinkStream** (la política que ha creado en la sección anterior).

   1. Elija la política **KAReadInputStreamWriteOutputStream** y, a continuación, elija **Asociar política**.

Ahora ha creado el rol de ejecución de servicio que utiliza la aplicación para obtener acceso a los recursos. Anote el ARN del nuevo rol.

Para step-by-step obtener instrucciones sobre cómo crear un rol, consulte [Creación de un rol de IAM (consola)](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user.html#roles-creatingrole-user-console) en la Guía del *usuario de IAM*.

#### Crear la aplicación de Managed Service para Apache Flink
<a name="get-started-exercise-7-cli-create"></a>

1. Guarde el siguiente código JSON en un archivo denominado `create_request.json`. Cambie el ARN del rol de ejemplo por el ARN del rol que ha creado antes. Reemplace el sufijo del ARN del bucket (`username`) por el sufijo que eligió en la sección anterior. Reemplace el ID de la cuenta de ejemplo (`012345678901`) del rol de ejecución del servicio por el ID de su cuenta.

   ```
   {
       "ApplicationName": "test",
       "ApplicationDescription": "my java test app",
       "RuntimeEnvironment": "FLINK-1_6",
       "ServiceExecutionRole": "arn:aws:iam::012345678901:role/KA-stream-rw-role",
       "ApplicationConfiguration": {
           "ApplicationCodeConfiguration": {
               "CodeContent": {
                   "S3ContentLocation": {
                       "BucketARN": "arn:aws:s3:::ka-app-code-username",
                       "FileKey": "java-getting-started-1.0.jar"
                   }
               },
               "CodeContentType": "ZIPFILE"
           },
           "EnvironmentProperties":  { 
            "PropertyGroups": [ 
               { 
                  "PropertyGroupId": "ProducerConfigProperties",
                  "PropertyMap" : {
                       "flink.stream.initpos" : "LATEST",
                       "aws.region" : "us-west-2",
                       "AggregationEnabled" : "false"
                  }
               },
               { 
                  "PropertyGroupId": "ConsumerConfigProperties",
                  "PropertyMap" : {
                       "aws.region" : "us-west-2"
                  }
               }
            ]
         }
       }
   }
   ```

1. Ejecute la acción [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_CreateApplication.html) con la solicitud anterior para crear la aplicación: 

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json
   ```

Se ha creado la aplicación. Se puede iniciar la aplicación en el siguiente paso.

#### Inicio de la aplicación
<a name="get-started-exercise-7-cli-start"></a>

En esta sección, se utiliza la acción [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html) para iniciar la aplicación.

**Cómo iniciar la aplicación**

1. Guarde el siguiente código JSON en un archivo denominado `start_request.json`.

   ```
   {
       "ApplicationName": "test",
       "RunConfiguration": {
           "ApplicationRestoreConfiguration": { 
            "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT"
            }
       }
   }
   ```

1. Ejecute la acción [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html) con la solicitud anterior para iniciar la aplicación:

   ```
   aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json
   ```

Ya se debe estar ejecutando la aplicación. Puedes comprobar las métricas de Managed Service for Apache Flink en la CloudWatch consola de Amazon para comprobar que la aplicación funciona.

#### Detención de la aplicación
<a name="get-started-exercise-7-cli-stop"></a>

En esta sección, se utiliza la acción [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html) para detener la aplicación.

**Cómo detener la aplicación**

1. Guarde el siguiente código JSON en un archivo denominado `stop_request.json`.

   ```
   {"ApplicationName": "test"
   }
   ```

1. Ejecute la acción [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html) con la siguiente solicitud para detener la aplicación:

   ```
   aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json
   ```

La aplicación se habrá detenido.

# Tutorial: Uso AWS Lambda con Amazon Kinesis Data Streams
<a name="tutorial-stock-data-lambda"></a>

En este tutorial, creará una función de Lambda para consumir eventos de un flujo de datos de Kinesis. En este escenario de ejemplo, una aplicación personalizada escribe registros en una transmisión de datos de Kinesis. AWS Lambda a continuación, sondea este flujo de datos y, cuando detecta nuevos registros de datos, invoca la función Lambda. AWS Lambda luego ejecuta la función Lambda asumiendo la función de ejecución que especificó al crear la función Lambda.

Para obtener instrucciones detalladas paso a paso, consulte el [tutorial: Uso de AWS Lambda con Amazon Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis-example.html). 

**nota**  
En este tutorial, se presupone que tiene algunos conocimientos sobre las operaciones básicas de Lambda y la consola de AWS Lambda . Si aún no lo ha hecho, siga las instrucciones de [Introducción a AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/getting-started.html) para crear su primera función de Lambda.

# Utilice la solución AWS de transmisión de datos para Amazon Kinesis
<a name="examples-streaming-solution"></a>

La solución de datos de AWS streaming para Amazon Kinesis configura automáticamente los AWS servicios necesarios para capturar, almacenar, procesar y entregar fácilmente los datos de streaming. La solución ofrece varias opciones para resolver casos de uso de datos de streaming que utilizan varios AWS servicios, incluidos Kinesis Data Streams AWS Lambda, Amazon API Gateway y Amazon Managed Service for Apache Flink. 

Cada solución incluye los siguientes componentes:
+ Un CloudFormation paquete para implementar el ejemplo completo.
+ Un CloudWatch panel para mostrar las métricas de la aplicación.
+ CloudWatch alarmas sobre las métricas de aplicación más relevantes.
+ Todas las políticas y roles de IAM necesarios.

La solución se encuentra aquí: [Solución de datos de flujos para Amazon Kinesis](https://aws.amazon.com/solutions/implementations/aws-streaming-data-solution-for-amazon-kinesis/)