

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Implementa il produttore
<a name="tutorial-stock-data-kplkcl2-producer"></a>

Questo tutorial utilizza lo scenario reale del monitoraggio del commercio azionario. I seguenti principi illustrano brevemente in che modo questo scenario è mappato alla struttura del codice producer e di supporto.

Consulta il [codice sorgente](https://github.com/aws-samples/amazon-kinesis-learning ) e rivedi le informazioni riportate di seguito.

**StockTrade classe**  
Una singola negoziazione è rappresentata da un'istanza della classe StockTrade. Questa istanza include attributi come il simbolo dei titoli, il prezzo, il numero di azioni, il tipo di operazione (acquisto o vendita) e un ID univoco che identifica l'operazione. Questa classe è implementata per te. 

**Record di flusso**  
Un flusso è una sequenza di record. Un record è una serializzazione di un'istanza `StockTrade` in formato JSON. Esempio:   

```
{
  "tickerSymbol": "AMZN", 
  "tradeType": "BUY", 
  "price": 395.87,
  "quantity": 16, 
  "id": 3567129045
}
```

**StockTradeGenerator classe**  
StockTradeGenerator ha un metodo chiamato `getRandomTrade()` che restituisce una nuova compravendita di azioni generata casualmente ogni volta che viene richiamata. Questa classe è implementata per te.

**StockTradesWriter classe**  
Il metodo `main` del producer, StockTradesWriter recupera continuamente uno scambio casuale e lo invia al flusso di dati Kinesis eseguendo queste operazioni:  

1. Legge il nome del flusso di dati e il nome della regione come input.

1. Utilizza il `KinesisAsyncClientBuilder` per impostare la regione, le credenziali e la configurazione del client. 

1. Verifica che il flusso esista e sia attivo (in caso contrario, si chiude con un errore). 

1. In un ciclo continuo, chiama il metodo `StockTradeGenerator.getRandomTrade()` e quindi il metodo `sendStockTrade` per inviare lo scambio al flusso ogni 100 millisecondi. 
Il metodo `sendStockTrade` della classe `StockTradesWriter` include il codice seguente:   

```
private static void sendStockTrade(StockTrade trade, KinesisAsyncClient kinesisClient,
            String streamName) {
        byte[] bytes = trade.toJsonAsBytes();
        // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library.
        if (bytes == null) {
            LOG.warn("Could not get JSON bytes for stock trade");
            return;
        }

        LOG.info("Putting trade: " + trade.toString());
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(bytes))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            LOG.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }
```

Fai riferimento alla seguente suddivisione del codice:
+ L'`PutRecord`API prevede un array di byte ed è necessario convertire gli scambi in formato JSON. Questa singola riga di codice esegue tale operazione: 

  ```
  byte[] bytes = trade.toJsonAsBytes();
  ```
+ Prima di poter inviare lo scambio, devi creare una nuova istanza `PutRecordRequest` (denominata richiesta in questo caso): Ogni `request` richiede il nome del flusso, la chiave di partizione e un blob di dati. 

  ```
  PutPutRecordRequest request = PutRecordRequest.builder()
      .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
      .streamName(streamName)
      .data(SdkBytes.fromByteArray(bytes))
      .build();
  ```

  L'esempio utilizza uno stock ticker come chiave di partizione, che mappa il record su uno shard specifico. In pratica, dovresti avere centinaia o migliaia di chiavi di partizione per shard, in modo che i record vengano distribuiti in modo uniforme in tutto il flusso. Per ulteriori informazioni su come aggiungere dati a un flusso, consulta [Scrittura di dati su Amazon Kinesis Data Streams](building-producers.md).

  Ora `request` è pronto per l'invio al client (operazione put): 

  ```
     kinesisClient.putRecord(request).get();
  ```
+ La verifica e la registrazione degli errori sono sempre aggiunte utili. Questo codice registra le condizioni di errore: 

  ```
  if (bytes == null) {
      LOG.warn("Could not get JSON bytes for stock trade");
      return;
  }
  ```

  Aggiungi il try/catch blocco attorno all'operazione: `put` 

  ```
  try {
   	kinesisClient.putRecord(request).get();
  } catch (InterruptedException e) {
              LOG.info("Interrupted, assuming shutdown.");
  } catch (ExecutionException e) {
              LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
  }
  ```

  Questo è perché un'operazione put del flusso di dati Kinesis può non riuscire a causa di un errore di rete o perché il flusso di dati raggiunge il limite di velocità di trasmissione effettiva e viene sottoposto a limitazione. Si consiglia di valutare attentamente la politica in materia di nuovi tentativi per `put` le operazioni volte a evitare la perdita di dati, ad esempio utilizzando un nuovo tentativo. 
+ La registrazione dello stato è utile, ma opzionale:

  ```
  LOG.info("Putting trade: " + trade.toString());
  ```
Il producer mostrato qui utilizza la funzionalità record singolo dell'API del flusso di dati Kinesis, `PutRecord`. In pratica, se un producer genera numerosi record, spesso è più efficiente utilizzare la funzionalità record multipli di `PutRecords` e inviare batch di record ogni volta. Per ulteriori informazioni, consulta [Scrittura di dati su Amazon Kinesis Data Streams](building-producers.md).

**Per eseguire il producer**

1. Verificare che la chiave di accesso e la coppia di chiavi segrete recuperate in [Crea una policy e un utente IAM](tutorial-stock-data-kplkcl2-iam.md) siano salvate nel file `~/.aws/credentials`. 

1. Eseguire la classe `StockTradeWriter` con i seguenti argomenti:

   ```
   StockTradeStream us-west-2
   ```

   Se è stato creato un flusso in una regione diversa da `us-west-2`, è necessario specificare quella regione qui.

Verrà visualizzato un output simile al seguente:

```
Feb 16, 2015 3:53:00 PM  
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
```

Il flusso di negoziazioni viene ora importato dal flusso di dati Kinesis.

## Fasi successive
<a name="tutorial-stock-data-kplkcl2-producer-next"></a>

[Implementa il consumatore](tutorial-stock-data-kplkcl2-consumer.md)