

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Implementa il produttore
<a name="tutorial-stock-data-kplkcl-producer"></a>



L'applicazione nel [Tutorial: elabora i dati di borsa in tempo reale utilizzando KPL e KCL 1.x[Tutorial: elabora i dati di borsa in tempo reale utilizzando KPL e KCL 1.x](tutorial-stock-data-kplkcl.md)](tutorial-stock-data-kplkcl.md) utilizza come scenario quello del monitoraggio del mercato azionario reale. I seguenti principi illustrano brevemente in che modo questo scenario è mappato alla struttura del codice producer e di supporto.

Consulta il codice sorgente e rivedi le informazioni riportate di seguito.

**StockTrade classe**  
Una singola negoziazione è rappresentata da un'istanza della classe `StockTrade`. Questa istanza include attributi come il simbolo dei titoli, il prezzo, il numero di azioni, il tipo di operazione (acquisto o vendita) e un ID univoco che identifica l'operazione. Questa classe è implementata per te.

**Record di flusso**  
Un flusso è una sequenza di record. Un record è una serializzazione di un'istanza `StockTrade` in formato JSON. Esempio:   

```
{
  "tickerSymbol": "AMZN", 
  "tradeType": "BUY", 
  "price": 395.87,
  "quantity": 16, 
  "id": 3567129045
}
```

**StockTradeGenerator classe**  
`StockTradeGenerator` include un metodo denominato `getRandomTrade()` che restituisce una negoziazione generata casualmente ogni volta che viene invocata. Questa classe è implementata per te.

**StockTradesWriter classe**  
Il metodo `main` del producer, `StockTradesWriter` recupera continuamente uno scambio casuale e lo invia al flusso di dati Kinesis eseguendo queste operazioni:  

1. Legge il nome del flusso e il nome della regione come input.

1. Crea un `AmazonKinesisClientBuilder`.

1. Utilizza il generatore client per impostare la regione, le credenziali e la configurazione del client.

1. Crea un client `AmazonKinesis` utilizzando il generatore di client.

1. Verifica che il flusso esista e sia attivo (in caso contrario, si chiude con un errore).

1. In un ciclo continuo, chiama il metodo `StockTradeGenerator.getRandomTrade()` e quindi il metodo `sendStockTrade` per inviare lo scambio al flusso ogni 100 millisecondi.
Il metodo `sendStockTrade` della classe `StockTradesWriter` include il codice seguente:  

```
private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient, String streamName) {
    byte[] bytes = trade.toJsonAsBytes();
    // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library.
    if (bytes == null) {
        LOG.warn("Could not get JSON bytes for stock trade");
        return;
    }
    
    LOG.info("Putting trade: " + trade.toString());
    PutRecordRequest putRecord = new PutRecordRequest();
    putRecord.setStreamName(streamName);
    // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
    putRecord.setPartitionKey(trade.getTickerSymbol());
    putRecord.setData(ByteBuffer.wrap(bytes));

    try {
        kinesisClient.putRecord(putRecord);
    } catch (AmazonClientException ex) {
        LOG.warn("Error sending record to Amazon Kinesis.", ex);
    }
}
```

Fai riferimento alla seguente suddivisione del codice:
+ L'`PutRecord`API prevede un array di byte e devi convertirlo in formato `trade` JSON. Questa singola riga di codice esegue tale operazione:

  ```
  byte[] bytes = trade.toJsonAsBytes();
  ```
+ Prima di poter inviare lo scambio, devi creare una nuova istanza `PutRecordRequest` (denominata `putRecord` in questo caso):

  ```
  PutRecordRequest putRecord = new PutRecordRequest();
  ```

  Ogni chiamata `PutRecord` richiede il nome del flusso, la chiave di partizione e il blob di dati. Il codice seguente compila questi campi nell'oggetto `putRecord` utilizzando i relativi metodi `setXxxx()`:

  ```
  putRecord.setStreamName(streamName);
  putRecord.setPartitionKey(trade.getTickerSymbol());
  putRecord.setData(ByteBuffer.wrap(bytes));
  ```

  Questo esempio utilizza un ticket per titoli come chiave di partizione, che mappa il record a un determinato shard. In pratica, dovresti avere centinaia o migliaia di chiavi di partizione per shard, in modo che i record vengano distribuiti in modo uniforme in tutto il flusso. Per ulteriori informazioni su come aggiungere dati a un flusso, consulta [Aggiungi dati a uno stream](developing-producers-with-sdk.md#kinesis-using-sdk-java-add-data-to-stream).

  Ora `putRecord` è pronto per l'invio al client (operazione `put`):

  ```
  kinesisClient.putRecord(putRecord);
  ```
+ La verifica e la registrazione degli errori sono sempre aggiunte utili. Questo codice registra le condizioni di errore:

  ```
  if (bytes == null) {
      LOG.warn("Could not get JSON bytes for stock trade");
      return;
  }
  ```

  Aggiungi il try/catch blocco attorno all'operazione: `put`

  ```
  try {
         kinesisClient.putRecord(putRecord);
  } catch (AmazonClientException ex) {
         LOG.warn("Error sending record to Amazon Kinesis.", ex);
  }
  ```

  Questo è perché un'operazione `put` del flusso di dati Kinesis potrebbe non riuscire a causa di un errore di rete o perché il flusso di dati raggiunge il limite di velocità di trasmissione effettiva e viene sottoposto a limitazione. Ti consigliamo di considerare attentamente la politica di riprova per `put` le operazioni volte a evitare la perdita di dati, ad esempio utilizzando un nuovo tentativo. 
+ La registrazione dello stato è utile, ma opzionale:

  ```
  LOG.info("Putting trade: " + trade.toString());
  ```
Il producer mostrato qui utilizza la funzionalità record singolo dell'API del flusso di dati Kinesis, `PutRecord`. In pratica, se un producer genera numerosi record, spesso è più efficiente utilizzare la funzionalità record multipli di `PutRecords` e inviare batch di record ogni volta. Per ulteriori informazioni, consulta [Aggiungi dati a uno stream](developing-producers-with-sdk.md#kinesis-using-sdk-java-add-data-to-stream).

**Per eseguire il producer**

1. Verifica che la coppia chiave di accesso e chiave segreta recuperata durante la creazione dell'utente IAM sia stata salvata nel file `~/.aws/credentials`. 

1. Eseguire la classe `StockTradeWriter` con i seguenti argomenti:

   ```
   StockTradeStream us-west-2
   ```

   Se è stato creato un flusso in una regione diversa da `us-west-2`, è necessario specificare quella regione qui.

Verrà visualizzato un output simile al seguente:

```
Feb 16, 2015 3:53:00 PM  
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
```

Il flusso di negoziazioni viene ora importato dal flusso di dati Kinesis.

## Fasi successive
<a name="tutorial-stock-data-kplkcl-producer-next"></a>

[Implementa il consumatore](tutorial-stock-data-kplkcl-consumer.md)