

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Implementar el productor
<a name="tutorial-stock-data-kplkcl-producer"></a>



La aplicación del [Tutorial: Procesar operaciones bursátiles en tiempo real con KPL y KCL 1.x[Tutorial: Procesar operaciones bursátiles en tiempo real con KPL y KCL 1.x](tutorial-stock-data-kplkcl.md)](tutorial-stock-data-kplkcl.md) utiliza un escenario real de monitorización del mercado bursátil. Los siguientes principios explicar brevemente la forma en que este escenario se asocia con la estructura del productor y el código de apoyo.

Consulte el código fuente y revise la siguiente información.

**StockTrade clase**  
Una operación bursátil está representada por una instancia de la clase `StockTrade`. Esta instancia contiene atributos como el símbolo de cotización, el precio, el número de acciones, el tipo de transacción (compra o venta) y un ID que identifica de forma exclusiva la transacción. Esta clase se implementa por usted.

**Registro de la secuencia**  
Una secuencia es una serie de registros. Un registro es la sucesión en serie de una instancia de `StockTrade` en formato JSON. Por ejemplo:   

```
{
  "tickerSymbol": "AMZN", 
  "tradeType": "BUY", 
  "price": 395.87,
  "quantity": 16, 
  "id": 3567129045
}
```

**StockTradeGenerator clase**  
`StockTradeGenerator` tiene un método denominado `getRandomTrade()` que proporciona una nueva operación bursátil generada aleatoriamente cada vez que se invoca. Esta clase se implementa por usted.

**StockTradesWriter clase**  
El método `main` del productor, `StockTradesWriter` recupera continuamente una operación aleatoria y luego la envía a Kinesis Data Streams mediante la ejecución de las siguientes tareas:  

1. Lee los nombres de la secuencia y la región como entrada.

1. Crea un `AmazonKinesisClientBuilder`.

1. Utiliza el compilador de clientes para establecer la región, las credenciales y la configuración de cliente.

1. Crea un cliente de `AmazonKinesis` mediante el compilador de clientes.

1. Comprueba que la secuencia existe y está activa (si no, sale con un error).

1. En un bucle continuo, llama al método `StockTradeGenerator.getRandomTrade()` y después al método `sendStockTrade` para enviar la transacción a la secuencia cada 100 milisegundos.
El método `sendStockTrade` de la clase `StockTradesWriter` tiene el siguiente código:  

```
private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient, String streamName) {
    byte[] bytes = trade.toJsonAsBytes();
    // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library.
    if (bytes == null) {
        LOG.warn("Could not get JSON bytes for stock trade");
        return;
    }
    
    LOG.info("Putting trade: " + trade.toString());
    PutRecordRequest putRecord = new PutRecordRequest();
    putRecord.setStreamName(streamName);
    // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
    putRecord.setPartitionKey(trade.getTickerSymbol());
    putRecord.setData(ByteBuffer.wrap(bytes));

    try {
        kinesisClient.putRecord(putRecord);
    } catch (AmazonClientException ex) {
        LOG.warn("Error sending record to Amazon Kinesis.", ex);
    }
}
```

Consulte el siguiente desglose del código:
+ La API de `PutRecord` espera una matriz de bytes, y debe convertir el `trade` a formato JSON. Esta única línea de código realiza esa operación:

  ```
  byte[] bytes = trade.toJsonAsBytes();
  ```
+ Antes de poder enviar la transacción, debe crear una nueva instancia de `PutRecordRequest` (denominada `putRecord` en este caso):

  ```
  PutRecordRequest putRecord = new PutRecordRequest();
  ```

  Cada llamada a `PutRecord` requiere el nombre de la secuencia, la clave de partición y el blob de datos. El siguiente código rellenará estos campos en el objeto `putRecord` utilizando sus métodos `setXxxx()`:

  ```
  putRecord.setStreamName(streamName);
  putRecord.setPartitionKey(trade.getTickerSymbol());
  putRecord.setData(ByteBuffer.wrap(bytes));
  ```

  El ejemplo utiliza un ticker como clave de partición, que asigna el registro a una partición específica. En la práctica, debería tener cientos o miles de claves de partición por fragmento, de forma que los registros se dispersen de forma uniforme en toda la secuencia. Para obtener más información acerca de cómo agregar datos a una secuencia, consulte [Agregar datos a un flujo](developing-producers-with-sdk.md#kinesis-using-sdk-java-add-data-to-stream).

  Ahora `putRecord` estará listo para el envío al cliente (la operación `put`):

  ```
  kinesisClient.putRecord(putRecord);
  ```
+ Siempre es útil agregar funciones de comprobación y registro de errores. Este código registra las condiciones de error:

  ```
  if (bytes == null) {
      LOG.warn("Could not get JSON bytes for stock trade");
      return;
  }
  ```

  Añada el try/catch bloque alrededor de la `put` operación:

  ```
  try {
         kinesisClient.putRecord(putRecord);
  } catch (AmazonClientException ex) {
         LOG.warn("Error sending record to Amazon Kinesis.", ex);
  }
  ```

  Esto se debe a que una operación `put` de Kinesis Data Streams puede fallar debido a un error de red o debido a que el flujo alcanza sus límites de rendimiento y se ve limitado. Le recomendamos comprobar detalladamente su política de reintentos para las operaciones de `put` de modo que evite pérdida de datos, por ejemplo, al utilizar un reintento. 
+ El registro de estado resulta útil, pero es opcional:

  ```
  LOG.info("Putting trade: " + trade.toString());
  ```
El productor que se muestra aquí utiliza la funcionalidad de registro único de la API de Kinesis Data Streams, `PutRecord`. En la práctica, si un solo productor genera una gran cantidad de registros, suele ser más eficaz utilizar la funcionalidad de varios registros de `PutRecords` y enviar lotes de registros de una vez. Para obtener más información, consulte [Agregar datos a un flujo](developing-producers-with-sdk.md#kinesis-using-sdk-java-add-data-to-stream).

**Para ejecutar el productor**

1. Compruebe que la clave de acceso y el par de claves secretas recuperadas anteriormente (al crear el usuario de IAM) se guardaron en el archivo `~/.aws/credentials`. 

1. Ejecute la clase `StockTradeWriter` con los siguientes argumentos:

   ```
   StockTradeStream us-west-2
   ```

   Si ha creado su secuencia en una región diferente a `us-west-2` tendrá que especificar esa región aquí.

Debería ver una salida similar a esta:

```
Feb 16, 2015 3:53:00 PM  
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
```

El flujo de operaciones bursátiles está siendo adquirido por Kinesis Data Streams.

## Siguientes pasos
<a name="tutorial-stock-data-kplkcl-producer-next"></a>

[Implementar el consumidor](tutorial-stock-data-kplkcl-consumer.md)