

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Tutorial: elabora i dati di borsa in tempo reale utilizzando KPL e KCL 2.x
<a name="tutorial-stock-data-kplkcl2"></a>

Lo scenario di questo tutorial prevede l'inserimento delle negoziazioni azionarie in un flusso di dati e la scrittura di un'applicazione Amazon Kinesis Data Streams di base che esegue calcoli sul flusso. Imparerai come inviare un flusso di record a Kinesis Data Streams e implementare un'applicazione che consuma ed elabora i record quasi in tempo reale.

**Importante**  
Dopo aver creato uno stream, sul tuo account vengono addebitati costi nominali per l'utilizzo di Kinesis Data Streams perché Kinesis Data Streams non è idoneo per il piano gratuito. AWS Dopo l'avvio dell'applicazione consumer, inoltre, sul tuo account vengono addebitati costi nominali per l'utilizzo di Amazon DynamoDB. L'applicazione consumer utilizza DynamoDB per monitorare lo stato dell'elaborazione. Quando hai finito di utilizzare questa applicazione, elimina le risorse AWS per evitare di incorrere in costi aggiuntivi. Per ulteriori informazioni, consulta [Eseguire la pulizia delle risorse](tutorial-stock-data-kplkcl2-finish.md).

Il codice non accede ai dati del mercato azionario, ma simula il flusso delle negoziazioni. A tale scopo, utilizza un generatore di negoziazioni casuale che utilizza come punto di partenza i dati effettivi del mercato per i primi 25 titoli azionari in base al valore di mercato di febbraio 2015. Se disponi dell'accesso a un flusso di negoziazioni in tempo reale, potresti essere interessato a derivare statistiche utili e tempestive da quel flusso. Ad esempio, potresti eseguire un'analisi basata su finestra scorrevole per determinare i titoli più acquistati negli ultimi 5 minuti. Oppure potresti ricevere una notifica ogni volta che viene effettuato un ordine di vendita troppo grande (ossia, che include un numero eccessivo di titoli). Puoi estendere il codice in questa serie per fornire tale funzionalità.

Puoi seguire la procedura descritta in questo tutorial sul tuo computer desktop o portatile ed eseguire sia il codice producer che consumer sullo stesso computer o su qualsiasi piattaforma che supporta i requisiti predefiniti.

Gli esempi mostrati utilizzano la regione Stati Uniti occidentali (Oregon), ma si applicano a qualunque [Regione AWS che supporta i flussi di dati](https://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region).

**Topics**
+ [

# Completamento dei prerequisiti
](tutorial-stock-data-kplkcl2-begin.md)
+ [

# Crea un flusso di dati
](tutorial-stock-data-kplkcl2-create-stream.md)
+ [

# Crea una policy e un utente IAM
](tutorial-stock-data-kplkcl2-iam.md)
+ [

# Scarica e crea il codice
](tutorial-stock-data-kplkcl2-download.md)
+ [

# Implementa il produttore
](tutorial-stock-data-kplkcl2-producer.md)
+ [

# Implementa il consumatore
](tutorial-stock-data-kplkcl2-consumer.md)
+ [

# (Facoltativo) Estendi il consumatore
](tutorial-stock-data-kplkcl2-consumer-extension.md)
+ [

# Eseguire la pulizia delle risorse
](tutorial-stock-data-kplkcl2-finish.md)

# Completamento dei prerequisiti
<a name="tutorial-stock-data-kplkcl2-begin"></a>

È necessario soddisfare i seguenti requisiti per completare questo tutorial:

## Creare e utilizzare un account Amazon Web Services
<a name="tutorial-stock-data-kplkcl2-begin-aws"></a>

Prima di iniziare, assicurati di conoscere i concetti discussi in[Amazon Kinesis Data Streams Terminologia e concetti](key-concepts.md), in particolare con gli stream, gli shard, i produttori e i consumatori. È anche utile aver completato le fasi riportate nella seguente guida: [Tutorial: installazione e configurazione AWS CLI di Kinesis Data Streams](kinesis-tutorial-cli-installation.md).

È necessario disporre di un AWS account e di un browser Web per accedere a. Console di gestione AWS

Per l'accesso alla console, utilizza il nome utente IAM e la relativa password per accedere alla pagina [Console di gestione AWS](https://console.aws.amazon.com/console/home) di accesso IAM. *Per informazioni sulle credenziali AWS di sicurezza, incluso l'accesso programmatico e le alternative alle credenziali a lungo termine, consulta le [credenziali AWS di sicurezza nella IAM](https://docs.aws.amazon.com/IAM/latest/UserGuide/security-creds.html) User Guide.* *Per i dettagli sull'accesso al tuo Account AWS, consulta [Come accedere a nella Guida per l' AWS](https://docs.aws.amazon.com/signin/latest/userguide/how-to-sign-in.html)utente.Accedi ad AWS *

Per ulteriori informazioni su IAM e sulle istruzioni per la configurazione della chiave di sicurezza, consulta [Creazione di un utente IAM](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/get-set-up-for-amazon-ec2.html#create-an-iam-user).

## Soddisfa i requisiti software di sistema
<a name="tutorial-stock-data-kplkcl2-begin-sys"></a>

Il sistema che stati utilizzando per eseguire l'applicazione deve avere installato Java 7 o versioni successive. Per scaricare e installare la versione più recente di Java Development Kit (JDK), accedi al [sito di installazione di Java SE di Oracle](http://www.oracle.com/technetwork/java/javase/downloads/index.html).

È necessaria la versione più recente di [AWS SDK per Java](https://aws.amazon.com/sdk-for-java/). 

[L'applicazione consumer richiede la versione 2.2.9 o successiva della Kinesis Client Library (KCL), che è possibile ottenere da /tree/master. GitHub https://github.com/awslabs/ amazon-kinesis-client](https://github.com/awslabs/amazon-kinesis-client/tree/master)

## Fasi successive
<a name="tutorial-stock-data-kplkcl2-begin-next"></a>

[Crea un flusso di dati](tutorial-stock-data-kplkcl2-create-stream.md)

# Crea un flusso di dati
<a name="tutorial-stock-data-kplkcl2-create-stream"></a>

Innanzitutto, è necessario creare il flusso di dati che verrà utilizzato nelle fasi successive di questo tutorial.

**Per creare un flusso**

1. [Accedi Console di gestione AWS e apri la console Kinesis all'indirizzo https://console.aws.amazon.com /kinesis.](https://console.aws.amazon.com/kinesis)

1. Nel riquadro di navigazione, selezionare **Data Streams (Flussi di dati)**.

1. Nella barra di navigazione, espandere il selettore delle regioni e selezionare una regione.

1. Selezionare **Create Kinesis stream (Crea flusso Kinesis)**.

1. Inserire un nome per il flusso di dati (ad esempio, **StockTradeStream**).

1. Inserisci **1** il numero di frammenti, ma continua a **Stimare il numero di frammenti che ti serviranno compressi**.

1. Selezionare **Create Kinesis stream (Crea flusso Kinesis)**.

Nella pagina dell'elenco **Flussi Kinesis**, durante la creazione del flusso, il relativo stato è `CREATING`. Quando il flusso è pronto per essere utilizzato, lo stato diventa `ACTIVE`. 

Se si sceglie il nome del flusso, nella pagina che viene visualizzata, la scheda **Details (Dettagli)** mostra un riepilogo della configurazione del flusso di dati. La sezione **Monitoring (Monitoraggio)** mostra le informazioni di monitoraggio per il flusso.

## Fasi successive
<a name="tutorial-stock-data-kplkcl2-create-stream-next"></a>

[Crea una policy e un utente IAM](tutorial-stock-data-kplkcl2-iam.md)

# Crea una policy e un utente IAM
<a name="tutorial-stock-data-kplkcl2-iam"></a>

Le migliori pratiche di sicurezza per AWS imporre l'uso di autorizzazioni granulari per controllare l'accesso a diverse risorse. AWS Identity and Access Management (IAM) consente di gestire gli utenti e le autorizzazioni degli utenti in. AWS Una [policy IAM](https://docs.aws.amazon.com/IAM/latest/UserGuide/PoliciesOverview.html) elenca in modo esplicito le operazioni consentite e le risorse per le quali sono applicabili le operazioni.

Di seguito sono elencate le autorizzazioni minime generali richieste per i producer e i consumer del flusso di dati Kinesis.


**Producer**  

| Azioni | Risorsa | Scopo | 
| --- | --- | --- | 
| DescribeStream, DescribeStreamSummary, DescribeStreamConsumer | Flusso di dati Kinesis | Prima di leggere i record, il consumer verifica se il flusso di dati esiste, è attivo e contiene il flusso di dati. | 
| SubscribeToShard, RegisterStreamConsumer | Flusso di dati Kinesis | Sottoscrive e registra i consumer in un frammento. | 
| PutRecord, PutRecords | Flusso di dati Kinesis | Scrivi i record in un flusso di dati Kinesis. | 


**Consumer**  

| **Azioni** | **Risorsa** | **Scopo** | 
| --- | --- | --- | 
| DescribeStream | Flusso di dati Kinesis | Prima di leggere i record, il consumer verifica se il flusso di dati esiste, è attivo e contiene il flusso di dati. | 
| GetRecords, GetShardIterator  | Flusso di dati Kinesis | Legge record da uno shard. | 
| CreateTable, DescribeTable, GetItem, PutItem, Scan, UpdateItem | Tabella Amazon DynamoDB | Se il consumer viene sviluppato utilizzando la Kinesis Client Library (KCL) (versione 1.x o 2.x), deve disporre delle autorizzazioni a una tabella DynamoDB per monitorare lo stato di elaborazione dell'applicazione. | 
| DeleteItem | Tabella Amazon DynamoDB | Per quando il consumatore esegue split/merge operazioni sugli shard Kinesis Data Streams. | 
| PutMetricData |  CloudWatch Registro Amazon | KCL carica anche le metriche su CloudWatch, utili per monitorare l'applicazione. | 

Per questo tutorial, creerai un'unica policy IAM che concede tutte le autorizzazioni precedenti. In produzione, si consiglia di creare due policy, una per i producer e una per i consumer.

**Per creare una policy IAM**

1. Individua l'Amazon Resource Name (ARN) per il nuovo flusso di dati creato nel passaggio precedente. Questo ARN è elencato come **Stream ARN (ARN flusso)** nella parte superiore della scheda **Details (Dettagli)**. Di seguito è riportato il formato ARN:

   ```
   arn:aws:kinesis:region:account:stream/name
   ```  
*region*  
Il codice AWS regionale, `us-west-2` ad esempio. Per ulteriori informazioni, consulta [Concetti di regione e zona di disponibilità](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-regions-availability-zones).  
*account*  
L'ID AWS dell'account, come mostrato nelle [Impostazioni dell'account](https://console.aws.amazon.com/billing/home?#/account).  
*nome*  
Il nome del flusso di dati creato nel passaggio precedente, ovvero`StockTradeStream`.

1. Determinare l'ARN per la tabella DynamoDB che verrà utilizzata dal consumer (e che verrà creata dalla prima istanza consumer). Deve essere nel seguente formato:

   ```
   arn:aws:dynamodb:region:account:table/name
   ```

   La regione e l'ID account sono identici ai valori nell'ARN del flusso di dati utilizzato per questo tutorial, ma il *nome è il nome* della tabella DynamoDB creata e utilizzata dall'applicazione consumer. KCL utilizza il nome dell'applicazione come nome della tabella. In questo passaggio, utilizzare `StockTradesProcessor` per il nome della tabella DynamoDB, poiché è il nome dell'applicazione utilizzato nelle fasi successive di questo tutorial.

1. **Nella console IAM, in **Policies ([https://console.aws.amazon.com/iam/home \$1policies](https://console.aws.amazon.com/iam/home#policies)), scegli Crea policy**.** Se è la prima volta che si utilizzano le policy IAM, scegli **Inizia**, **Crea policy**.

1. Scegliere **Select (Seleziona)** accanto a **Policy Generator (Generatore di policy)**.

1. Scegli **Amazon Kinesis** come servizio. AWS 

1. Selezionare `DescribeStream`, `GetShardIterator`, `GetRecords`, `PutRecord` e `PutRecords` come operazioni consentite.

1. Inserire l'ARN del flusso di dati che si sta utilizzando in questo tutorial.

1. Utilizzare **Add Statement (Aggiungi istruzione)** per ciascuno dei seguenti elementi:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/streams/latest/dev/tutorial-stock-data-kplkcl2-iam.html)

   L'asterisco (`*`) che viene utilizzato quando si specifica un ARN non è richiesto. In questo caso, è perché non esiste una risorsa specifica CloudWatch su cui viene richiamata l'`PutMetricData`azione.

1. Selezionare **Next Step (Fase successiva)**.

1. Modificare **Policy Name (Nome policy)** in `StockTradeStreamPolicy`, rivedere il codice e scegliere **Create Policy (Crea policy)**.

Il documento della policy risultante dovrebbe essere simile a questo:

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "Stmt123",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:PutRecord",
                "kinesis:PutRecords",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:ListShards",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream"
            ]
        },
        {
            "Sid": "Stmt234",
            "Effect": "Allow",
            "Action": [
                "kinesis:SubscribeToShard",
                "kinesis:DescribeStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream/*"
            ]
        },
        {
            "Sid": "Stmt456",
            "Effect": "Allow",
            "Action": [
                "dynamodb:*"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-west-2:111122223333:table/StockTradesProcessor"
            ]
        },
        {
            "Sid": "Stmt789",
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}
```

------

**Per creare un utente IAM**

1. Aprire la console IAM all'indirizzo [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. Nella pagina **Users (Utenti)**, scegliere **Add user (Aggiungi utente)**.

1. Per **Nome utente**, digita `StockTradeStreamUser`.

1. Per **Access type (Tipo di accesso)**, scegliere **Programmatic access (Accesso programmatico)**, quindi selezionare **Next: Permissions (Successivo: autorizzazioni)**.

1. Scegli **Attach existing policies directly (Collega direttamente le policy esistenti)**.

1. Cerca per nome la politica che hai creato nella procedura precedente (. `StockTradeStreamPolicy` Selezionare la casella a sinistra del nome della policy, quindi scegliere **Next: Review (Successivo: revisione)**.

1. Rivedere i dettagli e il riepilogo, quindi scegliere **Create user (Crea utente)**.

1. Copiare l'**ID chiave di accesso** e salvarlo privatamente. In **Secret access key (Chiave di accesso segreta)**, scegliere **Show (Mostra)** e salvare anche la chiave privatamente.

1. Incollare la chiave segreta e quella di accesso su un file locale in un luogo sicuro non accessibile ad altri. Per questa applicazione, è necessario creare un file denominato ` ~/.aws/credentials` (con autorizzazioni rigorose). Il file deve avere il formato seguente:

   ```
   [default]
   aws_access_key_id=access key
   aws_secret_access_key=secret access key
   ```

**Collegamento di una policy IAM a un utente**

1. Nella console IAM, apri [Policy](https://console.aws.amazon.com/iam/home?#policies) e scegli **Operazioni di policy**. 

1. Scegliere `StockTradeStreamPolicy` e **Attach (Collega)**.

1. Scegliere `StockTradeStreamUser` e **Attach Policy (Collega policy)**.

## Fasi successive
<a name="tutorial-stock-data-kplkcl2-iam-next"></a>

[Scarica e crea il codice](tutorial-stock-data-kplkcl2-download.md)

# Scarica e crea il codice
<a name="tutorial-stock-data-kplkcl2-download"></a>

In questo argomento viene fornito il codice di implementazione di esempio per l'inserimento di operazioni di trading di esempio nel flusso di dati (*producer*) e l'elaborazione di questi dati (*consumer*).

**Per scaricare e creare il codice**

1. Scarica il codice sorgente dal [https://github.com/aws-samples/amazon-kinesis-learning](https://github.com/aws-samples/amazon-kinesis-learning) GitHub repository sul tuo computer.

1. Creare un progetto nell'IDE con il codice sorgente, rispettando la struttura di directory fornita.

1. Aggiungere le seguenti librerie al progetto:
   + Amazon Kinesis Client Library (KCL)
   + AWS SDK
   + Apache HttpCore
   + Apache HttpClient
   + Apache Commons Lang
   + Apache Commons Logging
   + Guava (Google Core Libraries For Java)
   + Jackson Annotations
   + Jackson Core
   + Jackson Databind
   + Jackson Dataformat: CBOR
   + Joda Time

1. A seconda dell'IDE, il progetto potrebbe essere creato automaticamente. In caso contrario, creare il progetto seguendo la procedura appropriata per l'IDE utilizzato.

Se la procedura viene completata correttamente, puoi passare alla sezione successiva, [Implementa il produttore](tutorial-stock-data-kplkcl2-producer.md). 

## Fasi successive
<a name="tutorial-stock-data-kplkcl2-download-next"></a>

[[Implementa il produttore](tutorial-stock-data-kplkcl2-producer.md)Implementa il produttore](tutorial-stock-data-kplkcl2-producer.md)

# Implementa il produttore
<a name="tutorial-stock-data-kplkcl2-producer"></a>

Questo tutorial utilizza lo scenario reale del monitoraggio del commercio azionario. I seguenti principi illustrano brevemente in che modo questo scenario è mappato alla struttura del codice producer e di supporto.

Consulta il [codice sorgente](https://github.com/aws-samples/amazon-kinesis-learning ) e rivedi le informazioni riportate di seguito.

**StockTrade classe**  
Una singola negoziazione è rappresentata da un'istanza della classe StockTrade. Questa istanza include attributi come il simbolo dei titoli, il prezzo, il numero di azioni, il tipo di operazione (acquisto o vendita) e un ID univoco che identifica l'operazione. Questa classe è implementata per te. 

**Record di flusso**  
Un flusso è una sequenza di record. Un record è una serializzazione di un'istanza `StockTrade` in formato JSON. Esempio:   

```
{
  "tickerSymbol": "AMZN", 
  "tradeType": "BUY", 
  "price": 395.87,
  "quantity": 16, 
  "id": 3567129045
}
```

**StockTradeGenerator classe**  
StockTradeGenerator ha un metodo chiamato `getRandomTrade()` che restituisce una nuova compravendita di azioni generata casualmente ogni volta che viene richiamata. Questa classe è implementata per te.

**StockTradesWriter classe**  
Il metodo `main` del producer, StockTradesWriter recupera continuamente uno scambio casuale e lo invia al flusso di dati Kinesis eseguendo queste operazioni:  

1. Legge il nome del flusso di dati e il nome della regione come input.

1. Utilizza il `KinesisAsyncClientBuilder` per impostare la regione, le credenziali e la configurazione del client. 

1. Verifica che il flusso esista e sia attivo (in caso contrario, si chiude con un errore). 

1. In un ciclo continuo, chiama il metodo `StockTradeGenerator.getRandomTrade()` e quindi il metodo `sendStockTrade` per inviare lo scambio al flusso ogni 100 millisecondi. 
Il metodo `sendStockTrade` della classe `StockTradesWriter` include il codice seguente:   

```
private static void sendStockTrade(StockTrade trade, KinesisAsyncClient kinesisClient,
            String streamName) {
        byte[] bytes = trade.toJsonAsBytes();
        // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library.
        if (bytes == null) {
            LOG.warn("Could not get JSON bytes for stock trade");
            return;
        }

        LOG.info("Putting trade: " + trade.toString());
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(bytes))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            LOG.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }
```

Fai riferimento alla seguente suddivisione del codice:
+ L'`PutRecord`API prevede un array di byte ed è necessario convertire gli scambi in formato JSON. Questa singola riga di codice esegue tale operazione: 

  ```
  byte[] bytes = trade.toJsonAsBytes();
  ```
+ Prima di poter inviare lo scambio, devi creare una nuova istanza `PutRecordRequest` (denominata richiesta in questo caso): Ogni `request` richiede il nome del flusso, la chiave di partizione e un blob di dati. 

  ```
  PutPutRecordRequest request = PutRecordRequest.builder()
      .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
      .streamName(streamName)
      .data(SdkBytes.fromByteArray(bytes))
      .build();
  ```

  L'esempio utilizza uno stock ticker come chiave di partizione, che mappa il record su uno shard specifico. In pratica, dovresti avere centinaia o migliaia di chiavi di partizione per shard, in modo che i record vengano distribuiti in modo uniforme in tutto il flusso. Per ulteriori informazioni su come aggiungere dati a un flusso, consulta [Scrittura di dati su Amazon Kinesis Data Streams](building-producers.md).

  Ora `request` è pronto per l'invio al client (operazione put): 

  ```
     kinesisClient.putRecord(request).get();
  ```
+ La verifica e la registrazione degli errori sono sempre aggiunte utili. Questo codice registra le condizioni di errore: 

  ```
  if (bytes == null) {
      LOG.warn("Could not get JSON bytes for stock trade");
      return;
  }
  ```

  Aggiungi il try/catch blocco attorno all'operazione: `put` 

  ```
  try {
   	kinesisClient.putRecord(request).get();
  } catch (InterruptedException e) {
              LOG.info("Interrupted, assuming shutdown.");
  } catch (ExecutionException e) {
              LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
  }
  ```

  Questo è perché un'operazione put del flusso di dati Kinesis può non riuscire a causa di un errore di rete o perché il flusso di dati raggiunge il limite di velocità di trasmissione effettiva e viene sottoposto a limitazione. Si consiglia di valutare attentamente la politica in materia di nuovi tentativi per `put` le operazioni volte a evitare la perdita di dati, ad esempio utilizzando un nuovo tentativo. 
+ La registrazione dello stato è utile, ma opzionale:

  ```
  LOG.info("Putting trade: " + trade.toString());
  ```
Il producer mostrato qui utilizza la funzionalità record singolo dell'API del flusso di dati Kinesis, `PutRecord`. In pratica, se un producer genera numerosi record, spesso è più efficiente utilizzare la funzionalità record multipli di `PutRecords` e inviare batch di record ogni volta. Per ulteriori informazioni, consulta [Scrittura di dati su Amazon Kinesis Data Streams](building-producers.md).

**Per eseguire il producer**

1. Verificare che la chiave di accesso e la coppia di chiavi segrete recuperate in [Crea una policy e un utente IAM](tutorial-stock-data-kplkcl2-iam.md) siano salvate nel file `~/.aws/credentials`. 

1. Eseguire la classe `StockTradeWriter` con i seguenti argomenti:

   ```
   StockTradeStream us-west-2
   ```

   Se è stato creato un flusso in una regione diversa da `us-west-2`, è necessario specificare quella regione qui.

Verrà visualizzato un output simile al seguente:

```
Feb 16, 2015 3:53:00 PM  
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
```

Il flusso di negoziazioni viene ora importato dal flusso di dati Kinesis.

## Fasi successive
<a name="tutorial-stock-data-kplkcl2-producer-next"></a>

[Implementa il consumatore](tutorial-stock-data-kplkcl2-consumer.md)

# Implementa il consumatore
<a name="tutorial-stock-data-kplkcl2-consumer"></a>

L'applicazione consumer in questo tutorial elabora continuamente le operazioni azionarie nel flusso di dati. Quindi, genera i titoli più acquistati e venduti ogni minuto. L'applicazione si basa sulla Kinesis Client Library (KCL), che esegue numerose delle attività impegnative comuni alle applicazioni consumer. Per ulteriori informazioni, consulta [Informazioni su KCL 1.x e 2.x](shared-throughput-kcl-consumers.md). 

Consulta il codice sorgente e rivedi le informazioni riportate di seguito.

**StockTradesProcessor classe**  
La classe principale del consumatore, fornita per te, che svolge le seguenti attività:  
+ Legge l'applicazione, il flusso di dati e i nomi delle regioni, passati come argomenti.
+ Crea un'`KinesisAsyncClient`istanza con il nome della regione.
+ Crea un'istanza `StockTradeRecordProcessorFactory` che serve istanze di `ShardRecordProcessor`, implementate da un'istanza `StockTradeRecordProcessor`. 
+ Crea un'`ConfigsBuilder`istanza con l'`StockTradeRecordProcessorFactory`istanza `KinesisAsyncClient` `StreamName``ApplicationName`,, e. Questo è utile per creare tutte le configurazioni con valori predefiniti.
+ Crea uno scheduler KCL (in precedenza, nelle versioni di KCL 1.x era noto come worker KCL) con l'istanza `ConfigsBuilder`. 
+ Lo scheduler crea un nuovo thread per ciascun shard (assegnato a questa istanza consumer), che in un ciclo continuo legge i record dai flussi di dati. Quindi invoca l'istanza `StockTradeRecordProcessor` per elaborare ogni batch di record ricevuto. 

**StockTradeRecordProcessor classe**  
Implementazione dell'istanza `StockTradeRecordProcessor`, che a sua volta implementa cinque metodi richiesti: `initialize`, `processRecords`, `leaseLost`, `shardEnded` e `shutdownRequested`.   
I metodi `initialize` e `shutdownRequested` vengono utilizzati da KCL per consentire all'elaboratore di record di sapere quando dovrebbe essere pronto a iniziare a ricevere record e quando dovrebbe aspettarsi di non ricevere più record, in modo da poter effettuare qualsiasi attività di configurazione e cessazione specifica per l'app. `leaseLost` e `shardEnded` sono utilizzati per implementare qualsiasi logica su cosa fare quando un lease viene perso o quando una elaborazione ha raggiunto la fine del frammento. In questo esempio, registriamo semplicemente i messaggi che indicano questi eventi.   
Ti forniamo il codice per questi metodi. L'elaborazione principale si verifica nel metodo `processRecords`, che a sua volta utilizza `processRecord` per ogni record. Quest'ultimo metodo viene fornito come codice di base per lo più vuoto da implementare nella fase successiva, dove è spiegato in modo dettagliato.   
Da segnalare è anche l'implementazione dei metodi di supporto per `processRecord`, ovvero `reportStats` e `resetStats`, che sono vuoti nel codice sorgente originale.   
Il metodo `processRecords` viene implementato per te ed esegue questa procedura:  
+ Per ogni record passato, chiama `processRecord` su di esso. 
+ Se è trascorso almeno 1 minuto dall'ultimo report, chiama `reportStats()`, che consente di stampare le statistiche più recenti, seguito da `resetStats()`, che cancella le statistiche in modo che l'intervallo successivo includa solo i nuovi record.
+ Imposta l'orario della creazione di report successiva.
+ Se è trascorso almeno 1 minuto dall'ultimo checkpoint, chiama `checkpoint()`. 
+ Imposta l'orario della creazione di checkpoint successiva.
Questo metodo utilizza intervalli di 60 secondi per la frequenza di creazione di report e checkpoint. Per ulteriori informazioni sul checkpointing, consulta [Utilizzo della Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html). 

**StockStats classe**  
Questa classe fornisce la conservazione dei dati e il monitoraggio delle statistiche per i titoli più comuni nel tempo. Questo codice viene fornito per te e include i seguenti metodi:  
+ `addStockTrade(StockTrade)`: inserisce un dato `StockTrade` nelle statistiche in esecuzione.
+ `toString()`: restituisce le statistiche in una stringa formattata.
Questa classe tiene traccia delle azioni più popolari tenendo un conteggio progressivo del numero totale di negoziazioni per ogni azione e del conteggio massimo. Aggiorna questi conteggi ogni volta che si verifica uno scambio.

Aggiungi codice ai metodi della classe `StockTradeRecordProcessor`, come mostrato nella procedura seguente. 

**Per implementare il consumer**

1. Implementare il metodo `processRecord` creando un'istanza di un oggetto `StockTrade` delle dimensioni corrette e aggiungendo a essa i dati del record, registrando un avviso se si verifica un problema. 

   ```
   byte[] arr = new byte[record.data().remaining()];
   record.data().get(arr);
   StockTrade trade = StockTrade.fromJsonAsBytes(arr);
       if (trade == null) {
           log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey());
           return;
           }
   stockStats.addStockTrade(trade);
   ```

1. Implementa un `reportStats` metodo. Modifica il formato di output in base alle tue preferenze. 

   ```
   System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" +
   stockStats + "\n" +
   "****************************************************************\n");
   ```

1. Implementare il metodo `resetStats`, che crea una nuova istanza `stockStats`. 

   ```
   stockStats = new StockStats();
   ```

1. Implementa i seguenti metodi richiesti dall'`ShardRecordProcessor`interfaccia:

   ```
   @Override
   public void leaseLost(LeaseLostInput leaseLostInput) {
       log.info("Lost lease, so terminating.");
   }
   
   @Override
   public void shardEnded(ShardEndedInput shardEndedInput) {
       try {
           log.info("Reached shard end checkpointing.");
           shardEndedInput.checkpointer().checkpoint();
       } catch (ShutdownException | InvalidStateException e) {
           log.error("Exception while checkpointing at shard end. Giving up.", e);
       }
   }
   
   @Override
   public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
       log.info("Scheduler is shutting down, checkpointing.");
       checkpoint(shutdownRequestedInput.checkpointer());
   }
   
   private void checkpoint(RecordProcessorCheckpointer checkpointer) {
       log.info("Checkpointing shard " + kinesisShardId);
       try {
           checkpointer.checkpoint();
       } catch (ShutdownException se) {
           // Ignore checkpoint if the processor instance has been shutdown (fail over).
           log.info("Caught shutdown exception, skipping checkpoint.", se);
       } catch (ThrottlingException e) {
           // Skip checkpoint when throttled. In practice, consider a backoff and retry policy.
           log.error("Caught throttling exception, skipping checkpoint.", e);
       } catch (InvalidStateException e) {
           // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS).
           log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
       }
   }
   ```

**Per eseguire il consumer**

1. Eseguire il producer scritto in [[Implementa il produttore](tutorial-stock-data-kplkcl2-producer.md)Implementa il produttore](tutorial-stock-data-kplkcl2-producer.md) per inserire record di scambi simulati nel flusso.

1. Verifica che la coppia chiave di accesso e chiave segreta recuperata durante la creazione dell'utente IAM sia stata salvata nel file `~/.aws/credentials`. 

1. Eseguire la classe `StockTradesProcessor` con i seguenti argomenti:

   ```
   StockTradesProcessor StockTradeStream us-west-2
   ```

   Nota: se è stato creato un flusso in una regione diversa da `us-west-2`, è necessario specificare quella regione qui.

Dopo un minuto, si dovrebbe visualizzare un output come il seguente, aggiornato ogni minuto:

```
  
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  ****************************************************************
```

## Fasi successive
<a name="tutorial-stock-data-kplkcl2-consumer-next"></a>

[(Facoltativo) Estendi il consumatore](tutorial-stock-data-kplkcl2-consumer-extension.md)

# (Facoltativo) Estendi il consumatore
<a name="tutorial-stock-data-kplkcl2-consumer-extension"></a>

Questa sezione opzionale mostra in che modo puoi ampliare la funzionalità del codice consumer per uno scenario leggermente più elaborato.

Se desideri conoscere i maggiori ordini di vendita ogni minuto, puoi modificare la classe `StockStats` in tre punti per supportare questa nuova priorità.

**Per ampliare la funzionalità del consumer**

1. Aggiungere nuove variabili di istanza:

   ```
    // Ticker symbol of the stock that had the largest quantity of shares sold 
    private String largestSellOrderStock;
    // Quantity of shares for the largest sell order trade
    private long largestSellOrderQuantity;
   ```

1. Aggiungere il seguente codice a `addStockTrade`:

   ```
   if (type == TradeType.SELL) {
        if (largestSellOrderStock == null || trade.getQuantity() > largestSellOrderQuantity) {
            largestSellOrderStock = trade.getTickerSymbol();
            largestSellOrderQuantity = trade.getQuantity();
        }
    }
   ```

1. Modificare il metodo `toString` per stampare le informazioni aggiuntive:

   ```
    
   public String toString() {
       return String.format(
           "Most popular stock being bought: %s, %d buys.%n" +
           "Most popular stock being sold: %s, %d sells.%n" +
           "Largest sell order: %d shares of %s.",
           getMostPopularStock(TradeType.BUY), getMostPopularStockCount(TradeType.BUY),
           getMostPopularStock(TradeType.SELL), getMostPopularStockCount(TradeType.SELL),
           largestSellOrderQuantity, largestSellOrderStock);
   }
   ```

Se si esegue il consumer ora (ricordare di eseguire anche il producer), dovrebbe essere visualizzato un output simile a questo:

```
 
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  Largest sell order: 996 shares of BUD.
  ****************************************************************
```

## Fasi successive
<a name="tutorial-stock-data-kplkcl2-consumer-extension-next"></a>

[Eseguire la pulizia delle risorse](tutorial-stock-data-kplkcl2-finish.md)

# Eseguire la pulizia delle risorse
<a name="tutorial-stock-data-kplkcl2-finish"></a>

Poiché l'utilizzo del flusso di dati Kinesis è a pagamento, una volta terminato di usarlo assicurati di eliminarlo insieme alle tabelle Amazon DynamoDB corrispondenti. Vengono infatti applicati costi nominali a un flusso attivo, anche quando non invii o ricevi record. Ciò si verifica perché un flusso attivo utilizza risorse monitorando in modo continuo i record in entrata e le richieste per ottenere record.

**Per eliminare il flusso e la tabella**

1. Chiudete tutti i produttori e i consumatori che potreste avere ancora in gestione.

1. [Apri la console Kinesis in /kinesis. https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. Scegliere il flusso creato per questa applicazione (`StockTradeStream`).

1. Scegliere **Delete Stream (Elimina flusso)**.

1. Apri la console DynamoDB all'indirizzo. [https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/)

1. Eliminare la tabella `StockTradesProcessor`.

## Riepilogo
<a name="tutorial-stock-data-kplkcl2-summary"></a>

L'elaborazione di una grande quantità di dati quasi in tempo reale non richiede la scrittura di codice complicato o lo sviluppo di un'infrastruttura enorme. Elaborare una piccola quantità di dati (come la scrittura`processRecord(Record)`) è tanto semplice quanto scrivere la logica, ma usare Kinesis Data Streams per scalare in modo che funzioni per una grande quantità di dati in streaming. Non devi preoccuparti del dimensionamento dell'elaborazione, perché il flusso di dati Kinesis lo gestisce per conto tuo. Devi soltanto inviare i tuoi record di streaming al flusso di dati Kinesis e scrivere la logica per elaborare ogni nuovo record ricevuto. 

Di seguito sono elencati alcuni miglioramenti potenziali per questa applicazione.

**Aggregazione tra tutti gli shard**  
Al momento, puoi ottenere statistiche derivate dall'aggregazione dei record di dati ricevuti da un singolo ruolo di lavoro e provenienti da un solo shard (uno shard non può essere elaborato da più di un ruolo di lavoro in un'unica applicazione allo stesso tempo). Quando esegui il dimensionamento e disponi di più di uno shard, potresti voler effettuare l'aggregazione tra tutti gli shard. Per farlo, ti occorre un'architettura pipeline in cui l'output di ogni ruolo di lavoro viene utilizzato in un altro flusso con un singolo shard, che viene elaborato da un ruolo di lavoro che aggrega gli output della prima fase. Poiché i dati della prima fase sono limitati (un campione al minuto per shard), possono essere gestiti facilmente da uno shard.

**Dimensionamento dell'elaborazione**  
Quando il flusso viene incrementato per disporre di numerosi shard (dal momento che molti producer inviano dati), per dimensionare l'elaborazione è possibile aggiungere ulteriori ruoli di lavoro. Puoi eseguire i worker nelle istanze Amazon EC2 e utilizzare gruppi con dimensionamento automatico.

**Usa i connettori per Amazon S3/ DynamoDB/Amazon Redshift/Storm**  
Poiché uno stream viene elaborato continuamente, il relativo output può essere inviato ad altre destinazioni. AWS fornisce [connettori](https://github.com/awslabs/amazon-kinesis-connectors) per integrare Kinesis Data Streams AWS con altri servizi e strumenti di terze parti.