

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Tutorial introduttivi per Amazon Kinesis Data Streams
<a name="examples"></a>

Amazon Kinesis Data Streams offre diverse soluzioni per l'acquisizione e il consumo di dati dai flussi di dati Kinesis. I tutorial in questa sezione sono progettati per aiutarti ulteriormente a comprendere i concetti e le funzionalità di Amazon Kinesis Data Streams e a identificare la soluzione che soddisfa le tue esigenze. 

**Topics**
+ [

# Tutorial: elabora i dati di borsa in tempo reale utilizzando KPL e KCL 2.x
](tutorial-stock-data-kplkcl2.md)
+ [

# Tutorial: elabora i dati di borsa in tempo reale utilizzando KPL e KCL 1.x
](tutorial-stock-data-kplkcl.md)
+ [

# Tutorial: analizza i dati di borsa in tempo reale utilizzando Amazon Managed Service per Apache Flink
](tutorial-stock-data.md)
+ [

# Tutorial: utilizzo AWS Lambda con Amazon Kinesis Data Streams
](tutorial-stock-data-lambda.md)
+ [

# Usa la soluzione di AWS streaming di dati per Amazon Kinesis
](examples-streaming-solution.md)

# Tutorial: elabora i dati di borsa in tempo reale utilizzando KPL e KCL 2.x
<a name="tutorial-stock-data-kplkcl2"></a>

Lo scenario di questo tutorial prevede l'inserimento delle negoziazioni azionarie in un flusso di dati e la scrittura di un'applicazione Amazon Kinesis Data Streams di base che esegue calcoli sul flusso. Imparerai come inviare un flusso di record a Kinesis Data Streams e implementare un'applicazione che consuma ed elabora i record quasi in tempo reale.

**Importante**  
Dopo aver creato uno stream, sul tuo account vengono addebitati costi nominali per l'utilizzo di Kinesis Data Streams perché Kinesis Data Streams non è idoneo per il piano gratuito. AWS Dopo l'avvio dell'applicazione consumer, inoltre, sul tuo account vengono addebitati costi nominali per l'utilizzo di Amazon DynamoDB. L'applicazione consumer utilizza DynamoDB per monitorare lo stato dell'elaborazione. Quando hai finito di utilizzare questa applicazione, elimina le risorse AWS per evitare di incorrere in costi aggiuntivi. Per ulteriori informazioni, consulta [Eseguire la pulizia delle risorse](tutorial-stock-data-kplkcl2-finish.md).

Il codice non accede ai dati del mercato azionario, ma simula il flusso delle negoziazioni. A tale scopo, utilizza un generatore di negoziazioni casuale che utilizza come punto di partenza i dati effettivi del mercato per i primi 25 titoli azionari in base al valore di mercato di febbraio 2015. Se disponi dell'accesso a un flusso di negoziazioni in tempo reale, potresti essere interessato a derivare statistiche utili e tempestive da quel flusso. Ad esempio, potresti eseguire un'analisi basata su finestra scorrevole per determinare i titoli più acquistati negli ultimi 5 minuti. Oppure potresti ricevere una notifica ogni volta che viene effettuato un ordine di vendita troppo grande (ossia, che include un numero eccessivo di titoli). Puoi estendere il codice in questa serie per fornire tale funzionalità.

Puoi seguire la procedura descritta in questo tutorial sul tuo computer desktop o portatile ed eseguire sia il codice producer che consumer sullo stesso computer o su qualsiasi piattaforma che supporta i requisiti predefiniti.

Gli esempi mostrati utilizzano la regione Stati Uniti occidentali (Oregon), ma si applicano a qualunque [Regione AWS che supporta i flussi di dati](https://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region).

**Topics**
+ [

# Completamento dei prerequisiti
](tutorial-stock-data-kplkcl2-begin.md)
+ [

# Crea un flusso di dati
](tutorial-stock-data-kplkcl2-create-stream.md)
+ [

# Crea una policy e un utente IAM
](tutorial-stock-data-kplkcl2-iam.md)
+ [

# Scarica e crea il codice
](tutorial-stock-data-kplkcl2-download.md)
+ [

# Implementa il produttore
](tutorial-stock-data-kplkcl2-producer.md)
+ [

# Implementa il consumatore
](tutorial-stock-data-kplkcl2-consumer.md)
+ [

# (Facoltativo) Estendi il consumatore
](tutorial-stock-data-kplkcl2-consumer-extension.md)
+ [

# Eseguire la pulizia delle risorse
](tutorial-stock-data-kplkcl2-finish.md)

# Completamento dei prerequisiti
<a name="tutorial-stock-data-kplkcl2-begin"></a>

È necessario soddisfare i seguenti requisiti per completare questo tutorial:

## Creare e utilizzare un account Amazon Web Services
<a name="tutorial-stock-data-kplkcl2-begin-aws"></a>

Prima di iniziare, assicurati di conoscere i concetti discussi in[Amazon Kinesis Data Streams Terminologia e concetti](key-concepts.md), in particolare con gli stream, gli shard, i produttori e i consumatori. È anche utile aver completato le fasi riportate nella seguente guida: [Tutorial: installazione e configurazione AWS CLI di Kinesis Data Streams](kinesis-tutorial-cli-installation.md).

È necessario disporre di un AWS account e di un browser Web per accedere a. Console di gestione AWS

Per l'accesso alla console, utilizza il nome utente IAM e la relativa password per accedere alla pagina [Console di gestione AWS](https://console.aws.amazon.com/console/home) di accesso IAM. *Per informazioni sulle credenziali AWS di sicurezza, incluso l'accesso programmatico e le alternative alle credenziali a lungo termine, consulta le [credenziali AWS di sicurezza nella IAM](https://docs.aws.amazon.com/IAM/latest/UserGuide/security-creds.html) User Guide.* *Per i dettagli sull'accesso al tuo Account AWS, consulta [Come accedere a nella Guida per l' AWS](https://docs.aws.amazon.com/signin/latest/userguide/how-to-sign-in.html)utente.Accedi ad AWS *

Per ulteriori informazioni su IAM e sulle istruzioni per la configurazione della chiave di sicurezza, consulta [Creazione di un utente IAM](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/get-set-up-for-amazon-ec2.html#create-an-iam-user).

## Soddisfa i requisiti software di sistema
<a name="tutorial-stock-data-kplkcl2-begin-sys"></a>

Il sistema che stati utilizzando per eseguire l'applicazione deve avere installato Java 7 o versioni successive. Per scaricare e installare la versione più recente di Java Development Kit (JDK), accedi al [sito di installazione di Java SE di Oracle](http://www.oracle.com/technetwork/java/javase/downloads/index.html).

È necessaria la versione più recente di [AWS SDK per Java](https://aws.amazon.com/sdk-for-java/). 

[L'applicazione consumer richiede la versione 2.2.9 o successiva della Kinesis Client Library (KCL), che è possibile ottenere da /tree/master. GitHub https://github.com/awslabs/ amazon-kinesis-client](https://github.com/awslabs/amazon-kinesis-client/tree/master)

## Fasi successive
<a name="tutorial-stock-data-kplkcl2-begin-next"></a>

[Crea un flusso di dati](tutorial-stock-data-kplkcl2-create-stream.md)

# Crea un flusso di dati
<a name="tutorial-stock-data-kplkcl2-create-stream"></a>

Innanzitutto, è necessario creare il flusso di dati che verrà utilizzato nelle fasi successive di questo tutorial.

**Per creare un flusso**

1. [Accedi Console di gestione AWS e apri la console Kinesis all'indirizzo https://console.aws.amazon.com /kinesis.](https://console.aws.amazon.com/kinesis)

1. Nel riquadro di navigazione, selezionare **Data Streams (Flussi di dati)**.

1. Nella barra di navigazione, espandere il selettore delle regioni e selezionare una regione.

1. Selezionare **Create Kinesis stream (Crea flusso Kinesis)**.

1. Inserire un nome per il flusso di dati (ad esempio, **StockTradeStream**).

1. Inserisci **1** il numero di frammenti, ma continua a **Stimare il numero di frammenti che ti serviranno compressi**.

1. Selezionare **Create Kinesis stream (Crea flusso Kinesis)**.

Nella pagina dell'elenco **Flussi Kinesis**, durante la creazione del flusso, il relativo stato è `CREATING`. Quando il flusso è pronto per essere utilizzato, lo stato diventa `ACTIVE`. 

Se si sceglie il nome del flusso, nella pagina che viene visualizzata, la scheda **Details (Dettagli)** mostra un riepilogo della configurazione del flusso di dati. La sezione **Monitoring (Monitoraggio)** mostra le informazioni di monitoraggio per il flusso.

## Fasi successive
<a name="tutorial-stock-data-kplkcl2-create-stream-next"></a>

[Crea una policy e un utente IAM](tutorial-stock-data-kplkcl2-iam.md)

# Crea una policy e un utente IAM
<a name="tutorial-stock-data-kplkcl2-iam"></a>

Le migliori pratiche di sicurezza per AWS imporre l'uso di autorizzazioni granulari per controllare l'accesso a diverse risorse. AWS Identity and Access Management (IAM) consente di gestire gli utenti e le autorizzazioni degli utenti in. AWS Una [policy IAM](https://docs.aws.amazon.com/IAM/latest/UserGuide/PoliciesOverview.html) elenca in modo esplicito le operazioni consentite e le risorse per le quali sono applicabili le operazioni.

Di seguito sono elencate le autorizzazioni minime generali richieste per i producer e i consumer del flusso di dati Kinesis.


**Producer**  

| Azioni | Risorsa | Scopo | 
| --- | --- | --- | 
| DescribeStream, DescribeStreamSummary, DescribeStreamConsumer | Flusso di dati Kinesis | Prima di leggere i record, il consumer verifica se il flusso di dati esiste, è attivo e contiene il flusso di dati. | 
| SubscribeToShard, RegisterStreamConsumer | Flusso di dati Kinesis | Sottoscrive e registra i consumer in un frammento. | 
| PutRecord, PutRecords | Flusso di dati Kinesis | Scrivi i record in un flusso di dati Kinesis. | 


**Consumer**  

| **Azioni** | **Risorsa** | **Scopo** | 
| --- | --- | --- | 
| DescribeStream | Flusso di dati Kinesis | Prima di leggere i record, il consumer verifica se il flusso di dati esiste, è attivo e contiene il flusso di dati. | 
| GetRecords, GetShardIterator  | Flusso di dati Kinesis | Legge record da uno shard. | 
| CreateTable, DescribeTable, GetItem, PutItem, Scan, UpdateItem | Tabella Amazon DynamoDB | Se il consumer viene sviluppato utilizzando la Kinesis Client Library (KCL) (versione 1.x o 2.x), deve disporre delle autorizzazioni a una tabella DynamoDB per monitorare lo stato di elaborazione dell'applicazione. | 
| DeleteItem | Tabella Amazon DynamoDB | Per quando il consumatore esegue split/merge operazioni sugli shard Kinesis Data Streams. | 
| PutMetricData |  CloudWatch Registro Amazon | KCL carica anche le metriche su CloudWatch, utili per monitorare l'applicazione. | 

Per questo tutorial, creerai un'unica policy IAM che concede tutte le autorizzazioni precedenti. In produzione, si consiglia di creare due policy, una per i producer e una per i consumer.

**Per creare una policy IAM**

1. Individua l'Amazon Resource Name (ARN) per il nuovo flusso di dati creato nel passaggio precedente. Questo ARN è elencato come **Stream ARN (ARN flusso)** nella parte superiore della scheda **Details (Dettagli)**. Di seguito è riportato il formato ARN:

   ```
   arn:aws:kinesis:region:account:stream/name
   ```  
*region*  
Il codice AWS regionale, `us-west-2` ad esempio. Per ulteriori informazioni, consulta [Concetti di regione e zona di disponibilità](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-regions-availability-zones).  
*account*  
L'ID AWS dell'account, come mostrato nelle [Impostazioni dell'account](https://console.aws.amazon.com/billing/home?#/account).  
*nome*  
Il nome del flusso di dati creato nel passaggio precedente, ovvero`StockTradeStream`.

1. Determinare l'ARN per la tabella DynamoDB che verrà utilizzata dal consumer (e che verrà creata dalla prima istanza consumer). Deve essere nel seguente formato:

   ```
   arn:aws:dynamodb:region:account:table/name
   ```

   La regione e l'ID account sono identici ai valori nell'ARN del flusso di dati utilizzato per questo tutorial, ma il *nome è il nome* della tabella DynamoDB creata e utilizzata dall'applicazione consumer. KCL utilizza il nome dell'applicazione come nome della tabella. In questo passaggio, utilizzare `StockTradesProcessor` per il nome della tabella DynamoDB, poiché è il nome dell'applicazione utilizzato nelle fasi successive di questo tutorial.

1. **Nella console IAM, in **Policies ([https://console.aws.amazon.com/iam/home \$1policies](https://console.aws.amazon.com/iam/home#policies)), scegli Crea policy**.** Se è la prima volta che si utilizzano le policy IAM, scegli **Inizia**, **Crea policy**.

1. Scegliere **Select (Seleziona)** accanto a **Policy Generator (Generatore di policy)**.

1. Scegli **Amazon Kinesis** come servizio. AWS 

1. Selezionare `DescribeStream`, `GetShardIterator`, `GetRecords`, `PutRecord` e `PutRecords` come operazioni consentite.

1. Inserire l'ARN del flusso di dati che si sta utilizzando in questo tutorial.

1. Utilizzare **Add Statement (Aggiungi istruzione)** per ciascuno dei seguenti elementi:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/streams/latest/dev/tutorial-stock-data-kplkcl2-iam.html)

   L'asterisco (`*`) che viene utilizzato quando si specifica un ARN non è richiesto. In questo caso, è perché non esiste una risorsa specifica CloudWatch su cui viene richiamata l'`PutMetricData`azione.

1. Selezionare **Next Step (Fase successiva)**.

1. Modificare **Policy Name (Nome policy)** in `StockTradeStreamPolicy`, rivedere il codice e scegliere **Create Policy (Crea policy)**.

Il documento della policy risultante dovrebbe essere simile a questo:

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "Stmt123",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:PutRecord",
                "kinesis:PutRecords",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:ListShards",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream"
            ]
        },
        {
            "Sid": "Stmt234",
            "Effect": "Allow",
            "Action": [
                "kinesis:SubscribeToShard",
                "kinesis:DescribeStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream/*"
            ]
        },
        {
            "Sid": "Stmt456",
            "Effect": "Allow",
            "Action": [
                "dynamodb:*"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-west-2:111122223333:table/StockTradesProcessor"
            ]
        },
        {
            "Sid": "Stmt789",
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}
```

------

**Per creare un utente IAM**

1. Aprire la console IAM all'indirizzo [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. Nella pagina **Users (Utenti)**, scegliere **Add user (Aggiungi utente)**.

1. Per **Nome utente**, digita `StockTradeStreamUser`.

1. Per **Access type (Tipo di accesso)**, scegliere **Programmatic access (Accesso programmatico)**, quindi selezionare **Next: Permissions (Successivo: autorizzazioni)**.

1. Scegli **Attach existing policies directly (Collega direttamente le policy esistenti)**.

1. Cerca per nome la politica che hai creato nella procedura precedente (. `StockTradeStreamPolicy` Selezionare la casella a sinistra del nome della policy, quindi scegliere **Next: Review (Successivo: revisione)**.

1. Rivedere i dettagli e il riepilogo, quindi scegliere **Create user (Crea utente)**.

1. Copiare l'**ID chiave di accesso** e salvarlo privatamente. In **Secret access key (Chiave di accesso segreta)**, scegliere **Show (Mostra)** e salvare anche la chiave privatamente.

1. Incollare la chiave segreta e quella di accesso su un file locale in un luogo sicuro non accessibile ad altri. Per questa applicazione, è necessario creare un file denominato ` ~/.aws/credentials` (con autorizzazioni rigorose). Il file deve avere il formato seguente:

   ```
   [default]
   aws_access_key_id=access key
   aws_secret_access_key=secret access key
   ```

**Collegamento di una policy IAM a un utente**

1. Nella console IAM, apri [Policy](https://console.aws.amazon.com/iam/home?#policies) e scegli **Operazioni di policy**. 

1. Scegliere `StockTradeStreamPolicy` e **Attach (Collega)**.

1. Scegliere `StockTradeStreamUser` e **Attach Policy (Collega policy)**.

## Fasi successive
<a name="tutorial-stock-data-kplkcl2-iam-next"></a>

[Scarica e crea il codice](tutorial-stock-data-kplkcl2-download.md)

# Scarica e crea il codice
<a name="tutorial-stock-data-kplkcl2-download"></a>

In questo argomento viene fornito il codice di implementazione di esempio per l'inserimento di operazioni di trading di esempio nel flusso di dati (*producer*) e l'elaborazione di questi dati (*consumer*).

**Per scaricare e creare il codice**

1. Scarica il codice sorgente dal [https://github.com/aws-samples/amazon-kinesis-learning](https://github.com/aws-samples/amazon-kinesis-learning) GitHub repository sul tuo computer.

1. Creare un progetto nell'IDE con il codice sorgente, rispettando la struttura di directory fornita.

1. Aggiungere le seguenti librerie al progetto:
   + Amazon Kinesis Client Library (KCL)
   + AWS SDK
   + Apache HttpCore
   + Apache HttpClient
   + Apache Commons Lang
   + Apache Commons Logging
   + Guava (Google Core Libraries For Java)
   + Jackson Annotations
   + Jackson Core
   + Jackson Databind
   + Jackson Dataformat: CBOR
   + Joda Time

1. A seconda dell'IDE, il progetto potrebbe essere creato automaticamente. In caso contrario, creare il progetto seguendo la procedura appropriata per l'IDE utilizzato.

Se la procedura viene completata correttamente, puoi passare alla sezione successiva, [Implementa il produttore](tutorial-stock-data-kplkcl2-producer.md). 

## Fasi successive
<a name="tutorial-stock-data-kplkcl2-download-next"></a>

[[Implementa il produttore](tutorial-stock-data-kplkcl2-producer.md)Implementa il produttore](tutorial-stock-data-kplkcl2-producer.md)

# Implementa il produttore
<a name="tutorial-stock-data-kplkcl2-producer"></a>

Questo tutorial utilizza lo scenario reale del monitoraggio del commercio azionario. I seguenti principi illustrano brevemente in che modo questo scenario è mappato alla struttura del codice producer e di supporto.

Consulta il [codice sorgente](https://github.com/aws-samples/amazon-kinesis-learning ) e rivedi le informazioni riportate di seguito.

**StockTrade classe**  
Una singola negoziazione è rappresentata da un'istanza della classe StockTrade. Questa istanza include attributi come il simbolo dei titoli, il prezzo, il numero di azioni, il tipo di operazione (acquisto o vendita) e un ID univoco che identifica l'operazione. Questa classe è implementata per te. 

**Record di flusso**  
Un flusso è una sequenza di record. Un record è una serializzazione di un'istanza `StockTrade` in formato JSON. Esempio:   

```
{
  "tickerSymbol": "AMZN", 
  "tradeType": "BUY", 
  "price": 395.87,
  "quantity": 16, 
  "id": 3567129045
}
```

**StockTradeGenerator classe**  
StockTradeGenerator ha un metodo chiamato `getRandomTrade()` che restituisce una nuova compravendita di azioni generata casualmente ogni volta che viene richiamata. Questa classe è implementata per te.

**StockTradesWriter classe**  
Il metodo `main` del producer, StockTradesWriter recupera continuamente uno scambio casuale e lo invia al flusso di dati Kinesis eseguendo queste operazioni:  

1. Legge il nome del flusso di dati e il nome della regione come input.

1. Utilizza il `KinesisAsyncClientBuilder` per impostare la regione, le credenziali e la configurazione del client. 

1. Verifica che il flusso esista e sia attivo (in caso contrario, si chiude con un errore). 

1. In un ciclo continuo, chiama il metodo `StockTradeGenerator.getRandomTrade()` e quindi il metodo `sendStockTrade` per inviare lo scambio al flusso ogni 100 millisecondi. 
Il metodo `sendStockTrade` della classe `StockTradesWriter` include il codice seguente:   

```
private static void sendStockTrade(StockTrade trade, KinesisAsyncClient kinesisClient,
            String streamName) {
        byte[] bytes = trade.toJsonAsBytes();
        // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library.
        if (bytes == null) {
            LOG.warn("Could not get JSON bytes for stock trade");
            return;
        }

        LOG.info("Putting trade: " + trade.toString());
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(bytes))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            LOG.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }
```

Fai riferimento alla seguente suddivisione del codice:
+ L'`PutRecord`API prevede un array di byte ed è necessario convertire gli scambi in formato JSON. Questa singola riga di codice esegue tale operazione: 

  ```
  byte[] bytes = trade.toJsonAsBytes();
  ```
+ Prima di poter inviare lo scambio, devi creare una nuova istanza `PutRecordRequest` (denominata richiesta in questo caso): Ogni `request` richiede il nome del flusso, la chiave di partizione e un blob di dati. 

  ```
  PutPutRecordRequest request = PutRecordRequest.builder()
      .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
      .streamName(streamName)
      .data(SdkBytes.fromByteArray(bytes))
      .build();
  ```

  L'esempio utilizza uno stock ticker come chiave di partizione, che mappa il record su uno shard specifico. In pratica, dovresti avere centinaia o migliaia di chiavi di partizione per shard, in modo che i record vengano distribuiti in modo uniforme in tutto il flusso. Per ulteriori informazioni su come aggiungere dati a un flusso, consulta [Scrittura di dati su Amazon Kinesis Data Streams](building-producers.md).

  Ora `request` è pronto per l'invio al client (operazione put): 

  ```
     kinesisClient.putRecord(request).get();
  ```
+ La verifica e la registrazione degli errori sono sempre aggiunte utili. Questo codice registra le condizioni di errore: 

  ```
  if (bytes == null) {
      LOG.warn("Could not get JSON bytes for stock trade");
      return;
  }
  ```

  Aggiungi il try/catch blocco attorno all'operazione: `put` 

  ```
  try {
   	kinesisClient.putRecord(request).get();
  } catch (InterruptedException e) {
              LOG.info("Interrupted, assuming shutdown.");
  } catch (ExecutionException e) {
              LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
  }
  ```

  Questo è perché un'operazione put del flusso di dati Kinesis può non riuscire a causa di un errore di rete o perché il flusso di dati raggiunge il limite di velocità di trasmissione effettiva e viene sottoposto a limitazione. Si consiglia di valutare attentamente la politica in materia di nuovi tentativi per `put` le operazioni volte a evitare la perdita di dati, ad esempio utilizzando un nuovo tentativo. 
+ La registrazione dello stato è utile, ma opzionale:

  ```
  LOG.info("Putting trade: " + trade.toString());
  ```
Il producer mostrato qui utilizza la funzionalità record singolo dell'API del flusso di dati Kinesis, `PutRecord`. In pratica, se un producer genera numerosi record, spesso è più efficiente utilizzare la funzionalità record multipli di `PutRecords` e inviare batch di record ogni volta. Per ulteriori informazioni, consulta [Scrittura di dati su Amazon Kinesis Data Streams](building-producers.md).

**Per eseguire il producer**

1. Verificare che la chiave di accesso e la coppia di chiavi segrete recuperate in [Crea una policy e un utente IAM](tutorial-stock-data-kplkcl2-iam.md) siano salvate nel file `~/.aws/credentials`. 

1. Eseguire la classe `StockTradeWriter` con i seguenti argomenti:

   ```
   StockTradeStream us-west-2
   ```

   Se è stato creato un flusso in una regione diversa da `us-west-2`, è necessario specificare quella regione qui.

Verrà visualizzato un output simile al seguente:

```
Feb 16, 2015 3:53:00 PM  
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
```

Il flusso di negoziazioni viene ora importato dal flusso di dati Kinesis.

## Fasi successive
<a name="tutorial-stock-data-kplkcl2-producer-next"></a>

[Implementa il consumatore](tutorial-stock-data-kplkcl2-consumer.md)

# Implementa il consumatore
<a name="tutorial-stock-data-kplkcl2-consumer"></a>

L'applicazione consumer in questo tutorial elabora continuamente le operazioni azionarie nel flusso di dati. Quindi, genera i titoli più acquistati e venduti ogni minuto. L'applicazione si basa sulla Kinesis Client Library (KCL), che esegue numerose delle attività impegnative comuni alle applicazioni consumer. Per ulteriori informazioni, consulta [Informazioni su KCL 1.x e 2.x](shared-throughput-kcl-consumers.md). 

Consulta il codice sorgente e rivedi le informazioni riportate di seguito.

**StockTradesProcessor classe**  
La classe principale del consumatore, fornita per te, che svolge le seguenti attività:  
+ Legge l'applicazione, il flusso di dati e i nomi delle regioni, passati come argomenti.
+ Crea un'`KinesisAsyncClient`istanza con il nome della regione.
+ Crea un'istanza `StockTradeRecordProcessorFactory` che serve istanze di `ShardRecordProcessor`, implementate da un'istanza `StockTradeRecordProcessor`. 
+ Crea un'`ConfigsBuilder`istanza con l'`StockTradeRecordProcessorFactory`istanza `KinesisAsyncClient` `StreamName``ApplicationName`,, e. Questo è utile per creare tutte le configurazioni con valori predefiniti.
+ Crea uno scheduler KCL (in precedenza, nelle versioni di KCL 1.x era noto come worker KCL) con l'istanza `ConfigsBuilder`. 
+ Lo scheduler crea un nuovo thread per ciascun shard (assegnato a questa istanza consumer), che in un ciclo continuo legge i record dai flussi di dati. Quindi invoca l'istanza `StockTradeRecordProcessor` per elaborare ogni batch di record ricevuto. 

**StockTradeRecordProcessor classe**  
Implementazione dell'istanza `StockTradeRecordProcessor`, che a sua volta implementa cinque metodi richiesti: `initialize`, `processRecords`, `leaseLost`, `shardEnded` e `shutdownRequested`.   
I metodi `initialize` e `shutdownRequested` vengono utilizzati da KCL per consentire all'elaboratore di record di sapere quando dovrebbe essere pronto a iniziare a ricevere record e quando dovrebbe aspettarsi di non ricevere più record, in modo da poter effettuare qualsiasi attività di configurazione e cessazione specifica per l'app. `leaseLost` e `shardEnded` sono utilizzati per implementare qualsiasi logica su cosa fare quando un lease viene perso o quando una elaborazione ha raggiunto la fine del frammento. In questo esempio, registriamo semplicemente i messaggi che indicano questi eventi.   
Ti forniamo il codice per questi metodi. L'elaborazione principale si verifica nel metodo `processRecords`, che a sua volta utilizza `processRecord` per ogni record. Quest'ultimo metodo viene fornito come codice di base per lo più vuoto da implementare nella fase successiva, dove è spiegato in modo dettagliato.   
Da segnalare è anche l'implementazione dei metodi di supporto per `processRecord`, ovvero `reportStats` e `resetStats`, che sono vuoti nel codice sorgente originale.   
Il metodo `processRecords` viene implementato per te ed esegue questa procedura:  
+ Per ogni record passato, chiama `processRecord` su di esso. 
+ Se è trascorso almeno 1 minuto dall'ultimo report, chiama `reportStats()`, che consente di stampare le statistiche più recenti, seguito da `resetStats()`, che cancella le statistiche in modo che l'intervallo successivo includa solo i nuovi record.
+ Imposta l'orario della creazione di report successiva.
+ Se è trascorso almeno 1 minuto dall'ultimo checkpoint, chiama `checkpoint()`. 
+ Imposta l'orario della creazione di checkpoint successiva.
Questo metodo utilizza intervalli di 60 secondi per la frequenza di creazione di report e checkpoint. Per ulteriori informazioni sul checkpointing, consulta [Utilizzo della Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html). 

**StockStats classe**  
Questa classe fornisce la conservazione dei dati e il monitoraggio delle statistiche per i titoli più comuni nel tempo. Questo codice viene fornito per te e include i seguenti metodi:  
+ `addStockTrade(StockTrade)`: inserisce un dato `StockTrade` nelle statistiche in esecuzione.
+ `toString()`: restituisce le statistiche in una stringa formattata.
Questa classe tiene traccia delle azioni più popolari tenendo un conteggio progressivo del numero totale di negoziazioni per ogni azione e del conteggio massimo. Aggiorna questi conteggi ogni volta che si verifica uno scambio.

Aggiungi codice ai metodi della classe `StockTradeRecordProcessor`, come mostrato nella procedura seguente. 

**Per implementare il consumer**

1. Implementare il metodo `processRecord` creando un'istanza di un oggetto `StockTrade` delle dimensioni corrette e aggiungendo a essa i dati del record, registrando un avviso se si verifica un problema. 

   ```
   byte[] arr = new byte[record.data().remaining()];
   record.data().get(arr);
   StockTrade trade = StockTrade.fromJsonAsBytes(arr);
       if (trade == null) {
           log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey());
           return;
           }
   stockStats.addStockTrade(trade);
   ```

1. Implementa un `reportStats` metodo. Modifica il formato di output in base alle tue preferenze. 

   ```
   System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" +
   stockStats + "\n" +
   "****************************************************************\n");
   ```

1. Implementare il metodo `resetStats`, che crea una nuova istanza `stockStats`. 

   ```
   stockStats = new StockStats();
   ```

1. Implementa i seguenti metodi richiesti dall'`ShardRecordProcessor`interfaccia:

   ```
   @Override
   public void leaseLost(LeaseLostInput leaseLostInput) {
       log.info("Lost lease, so terminating.");
   }
   
   @Override
   public void shardEnded(ShardEndedInput shardEndedInput) {
       try {
           log.info("Reached shard end checkpointing.");
           shardEndedInput.checkpointer().checkpoint();
       } catch (ShutdownException | InvalidStateException e) {
           log.error("Exception while checkpointing at shard end. Giving up.", e);
       }
   }
   
   @Override
   public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
       log.info("Scheduler is shutting down, checkpointing.");
       checkpoint(shutdownRequestedInput.checkpointer());
   }
   
   private void checkpoint(RecordProcessorCheckpointer checkpointer) {
       log.info("Checkpointing shard " + kinesisShardId);
       try {
           checkpointer.checkpoint();
       } catch (ShutdownException se) {
           // Ignore checkpoint if the processor instance has been shutdown (fail over).
           log.info("Caught shutdown exception, skipping checkpoint.", se);
       } catch (ThrottlingException e) {
           // Skip checkpoint when throttled. In practice, consider a backoff and retry policy.
           log.error("Caught throttling exception, skipping checkpoint.", e);
       } catch (InvalidStateException e) {
           // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS).
           log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
       }
   }
   ```

**Per eseguire il consumer**

1. Eseguire il producer scritto in [[Implementa il produttore](tutorial-stock-data-kplkcl2-producer.md)Implementa il produttore](tutorial-stock-data-kplkcl2-producer.md) per inserire record di scambi simulati nel flusso.

1. Verifica che la coppia chiave di accesso e chiave segreta recuperata durante la creazione dell'utente IAM sia stata salvata nel file `~/.aws/credentials`. 

1. Eseguire la classe `StockTradesProcessor` con i seguenti argomenti:

   ```
   StockTradesProcessor StockTradeStream us-west-2
   ```

   Nota: se è stato creato un flusso in una regione diversa da `us-west-2`, è necessario specificare quella regione qui.

Dopo un minuto, si dovrebbe visualizzare un output come il seguente, aggiornato ogni minuto:

```
  
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  ****************************************************************
```

## Fasi successive
<a name="tutorial-stock-data-kplkcl2-consumer-next"></a>

[(Facoltativo) Estendi il consumatore](tutorial-stock-data-kplkcl2-consumer-extension.md)

# (Facoltativo) Estendi il consumatore
<a name="tutorial-stock-data-kplkcl2-consumer-extension"></a>

Questa sezione opzionale mostra in che modo puoi ampliare la funzionalità del codice consumer per uno scenario leggermente più elaborato.

Se desideri conoscere i maggiori ordini di vendita ogni minuto, puoi modificare la classe `StockStats` in tre punti per supportare questa nuova priorità.

**Per ampliare la funzionalità del consumer**

1. Aggiungere nuove variabili di istanza:

   ```
    // Ticker symbol of the stock that had the largest quantity of shares sold 
    private String largestSellOrderStock;
    // Quantity of shares for the largest sell order trade
    private long largestSellOrderQuantity;
   ```

1. Aggiungere il seguente codice a `addStockTrade`:

   ```
   if (type == TradeType.SELL) {
        if (largestSellOrderStock == null || trade.getQuantity() > largestSellOrderQuantity) {
            largestSellOrderStock = trade.getTickerSymbol();
            largestSellOrderQuantity = trade.getQuantity();
        }
    }
   ```

1. Modificare il metodo `toString` per stampare le informazioni aggiuntive:

   ```
    
   public String toString() {
       return String.format(
           "Most popular stock being bought: %s, %d buys.%n" +
           "Most popular stock being sold: %s, %d sells.%n" +
           "Largest sell order: %d shares of %s.",
           getMostPopularStock(TradeType.BUY), getMostPopularStockCount(TradeType.BUY),
           getMostPopularStock(TradeType.SELL), getMostPopularStockCount(TradeType.SELL),
           largestSellOrderQuantity, largestSellOrderStock);
   }
   ```

Se si esegue il consumer ora (ricordare di eseguire anche il producer), dovrebbe essere visualizzato un output simile a questo:

```
 
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  Largest sell order: 996 shares of BUD.
  ****************************************************************
```

## Fasi successive
<a name="tutorial-stock-data-kplkcl2-consumer-extension-next"></a>

[Eseguire la pulizia delle risorse](tutorial-stock-data-kplkcl2-finish.md)

# Eseguire la pulizia delle risorse
<a name="tutorial-stock-data-kplkcl2-finish"></a>

Poiché l'utilizzo del flusso di dati Kinesis è a pagamento, una volta terminato di usarlo assicurati di eliminarlo insieme alle tabelle Amazon DynamoDB corrispondenti. Vengono infatti applicati costi nominali a un flusso attivo, anche quando non invii o ricevi record. Ciò si verifica perché un flusso attivo utilizza risorse monitorando in modo continuo i record in entrata e le richieste per ottenere record.

**Per eliminare il flusso e la tabella**

1. Chiudete tutti i produttori e i consumatori che potreste avere ancora in gestione.

1. [Apri la console Kinesis in /kinesis. https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. Scegliere il flusso creato per questa applicazione (`StockTradeStream`).

1. Scegliere **Delete Stream (Elimina flusso)**.

1. Apri la console DynamoDB all'indirizzo. [https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/)

1. Eliminare la tabella `StockTradesProcessor`.

## Riepilogo
<a name="tutorial-stock-data-kplkcl2-summary"></a>

L'elaborazione di una grande quantità di dati quasi in tempo reale non richiede la scrittura di codice complicato o lo sviluppo di un'infrastruttura enorme. Elaborare una piccola quantità di dati (come la scrittura`processRecord(Record)`) è tanto semplice quanto scrivere la logica, ma usare Kinesis Data Streams per scalare in modo che funzioni per una grande quantità di dati in streaming. Non devi preoccuparti del dimensionamento dell'elaborazione, perché il flusso di dati Kinesis lo gestisce per conto tuo. Devi soltanto inviare i tuoi record di streaming al flusso di dati Kinesis e scrivere la logica per elaborare ogni nuovo record ricevuto. 

Di seguito sono elencati alcuni miglioramenti potenziali per questa applicazione.

**Aggregazione tra tutti gli shard**  
Al momento, puoi ottenere statistiche derivate dall'aggregazione dei record di dati ricevuti da un singolo ruolo di lavoro e provenienti da un solo shard (uno shard non può essere elaborato da più di un ruolo di lavoro in un'unica applicazione allo stesso tempo). Quando esegui il dimensionamento e disponi di più di uno shard, potresti voler effettuare l'aggregazione tra tutti gli shard. Per farlo, ti occorre un'architettura pipeline in cui l'output di ogni ruolo di lavoro viene utilizzato in un altro flusso con un singolo shard, che viene elaborato da un ruolo di lavoro che aggrega gli output della prima fase. Poiché i dati della prima fase sono limitati (un campione al minuto per shard), possono essere gestiti facilmente da uno shard.

**Dimensionamento dell'elaborazione**  
Quando il flusso viene incrementato per disporre di numerosi shard (dal momento che molti producer inviano dati), per dimensionare l'elaborazione è possibile aggiungere ulteriori ruoli di lavoro. Puoi eseguire i worker nelle istanze Amazon EC2 e utilizzare gruppi con dimensionamento automatico.

**Usa i connettori per Amazon S3/ DynamoDB/Amazon Redshift/Storm**  
Poiché uno stream viene elaborato continuamente, il relativo output può essere inviato ad altre destinazioni. AWS fornisce [connettori](https://github.com/awslabs/amazon-kinesis-connectors) per integrare Kinesis Data Streams AWS con altri servizi e strumenti di terze parti.

# Tutorial: elabora i dati di borsa in tempo reale utilizzando KPL e KCL 1.x
<a name="tutorial-stock-data-kplkcl"></a>

Lo scenario per questo tutorial richiede l'importazione del flusso di negoziazioni in un flusso di dati e la scrittura di una semplice applicazione Flusso di dati Amazon Kinesis che esegua calcoli sul flusso. Imparerai come inviare un flusso di record a Kinesis Data Streams e implementare un'applicazione che consuma ed elabora i record quasi in tempo reale.

**Importante**  
Dopo aver creato uno stream, sul tuo account vengono addebitati costi nominali per l'utilizzo di Kinesis Data Streams perché Kinesis Data Streams non è idoneo per il piano gratuito. AWS Dopo l'avvio dell'applicazione consumer, inoltre, sul tuo account vengono addebitati costi nominali per l'utilizzo di Amazon DynamoDB. L'applicazione consumer utilizza DynamoDB per monitorare lo stato dell'elaborazione. Quando hai finito di utilizzare questa applicazione, elimina le risorse AWS per evitare di incorrere in costi aggiuntivi. Per ulteriori informazioni, consulta [Eseguire la pulizia delle risorse](tutorial-stock-data-kplkcl-finish.md).

Il codice non accede ai dati del mercato azionario, ma simula il flusso delle negoziazioni. A tale scopo, utilizza un generatore di negoziazioni casuale che utilizza come punto di partenza i dati effettivi del mercato per i primi 25 titoli azionari in base al valore di mercato di febbraio 2015. Se disponi dell'accesso a un flusso di negoziazioni in tempo reale, potresti essere interessato a derivare statistiche utili e tempestive da quel flusso. Ad esempio, potresti eseguire un'analisi basata su finestra scorrevole per determinare i titoli più acquistati negli ultimi 5 minuti. Oppure potresti ricevere una notifica ogni volta che viene effettuato un ordine di vendita troppo grande (ossia, che include un numero eccessivo di titoli). Puoi estendere il codice in questa serie per fornire tale funzionalità.

Puoi seguire la procedura descritta in questo tutorial sul tuo computer desktop o portatile ed eseguire sia il codice producer che consumer sullo stesso computer o su qualsiasi piattaforma che supporta i requisiti predefiniti, ad esempio Amazon Elastic Compute Cloud (Amazon EC2).

Gli esempi mostrati utilizzano la regione Stati Uniti occidentali (Oregon), ma si applicano a qualunque [Regione AWS che supporta i flussi di dati](https://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region).

**Topics**
+ [

# Completamento dei prerequisiti
](tutorial-stock-data-kplkcl-begin.md)
+ [

# Crea un flusso di dati
](tutorial-stock-data-kplkcl-create-stream.md)
+ [

# Crea una policy e un utente IAM
](tutorial-stock-data-kplkcl-iam.md)
+ [

# Scarica e crea il codice di implementazione
](tutorial-stock-data-kplkcl-download.md)
+ [

# Implementa il produttore
](tutorial-stock-data-kplkcl-producer.md)
+ [

# Implementa il consumatore
](tutorial-stock-data-kplkcl-consumer.md)
+ [

# (Facoltativo) Estendi il numero di utenti
](tutorial-stock-data-kplkcl-consumer-extension.md)
+ [

# Eseguire la pulizia delle risorse
](tutorial-stock-data-kplkcl-finish.md)

# Completamento dei prerequisiti
<a name="tutorial-stock-data-kplkcl-begin"></a>

Di seguito sono riportati i requisiti per il completamento del [Tutorial: elabora i dati di borsa in tempo reale utilizzando KPL e KCL 1.x[Tutorial: elabora i dati di borsa in tempo reale utilizzando KPL e KCL 1.x](tutorial-stock-data-kplkcl.md)](tutorial-stock-data-kplkcl.md).

## Creare e utilizzare un account Amazon Web Services
<a name="tutorial-stock-data-kplkcl-begin-aws"></a>

Prima di iniziare, accertati di avere familiarità con i concetti discussi in [Amazon Kinesis Data Streams Terminologia e concetti](key-concepts.md), con particolare riguardo a flussi, shard, producer e consumer. È inoltre utile aver completato [Tutorial: installazione e configurazione AWS CLI di Kinesis Data Streams](kinesis-tutorial-cli-installation.md).

Sono necessari un AWS account e un browser Web per accedere a Console di gestione AWS.

Per l'accesso alla console, utilizza il nome utente IAM e la relativa password per accedere alla pagina [Console di gestione AWS](https://console.aws.amazon.com/console/home) di accesso IAM. *Per informazioni sulle credenziali AWS di sicurezza, incluso l'accesso programmatico e le alternative alle credenziali a lungo termine, consulta le [credenziali AWS di sicurezza nella IAM](https://docs.aws.amazon.com/IAM/latest/UserGuide/security-creds.html) User Guide.* *Per i dettagli sull'accesso al tuo Account AWS, consulta [Come accedere a nella Guida per l' AWS](https://docs.aws.amazon.com/signin/latest/userguide/how-to-sign-in.html)utente.Accedi ad AWS *

Per ulteriori informazioni su IAM e sulle istruzioni per la configurazione della chiave di sicurezza, consulta [Creazione di un utente IAM](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/get-set-up-for-amazon-ec2.html#create-an-iam-user).

## Soddisfa i requisiti software di sistema
<a name="tutorial-stock-data-kplkcl-begin-sys"></a>

Sul sistema utilizzato per eseguire l'applicazione deve essere installato Java 7 o versioni successive. Per scaricare e installare la versione più recente di Java Development Kit (JDK), accedi al [sito di installazione di Java SE di Oracle](http://www.oracle.com/technetwork/java/javase/downloads/index.html).

Se disponi di un IDE Java, ad esempio [Eclipse](https://www.eclipse.org/downloads/), puoi aprire, modificare, creare ed eseguire il codice sorgente.

È necessaria la versione più recente di [AWS SDK per Java](https://aws.amazon.com/sdk-for-java/). Se utilizzi Eclipse come IDE, puoi installare invece [AWS Toolkit for Eclipse](https://aws.amazon.com/eclipse/). 

L'applicazione consumer richiede la versione 1.2.1 o successiva di Kinesis Client Library (KCL), disponibile presso GitHub [Kinesis](https://github.com/awslabs/amazon-kinesis-client) Client Library (Java).

## Fasi successive
<a name="tutorial-stock-data-kplkcl-begin-next"></a>

[Crea un flusso di dati](tutorial-stock-data-kplkcl-create-stream.md)

# Crea un flusso di dati
<a name="tutorial-stock-data-kplkcl-create-stream"></a>

Nella prima fase del [Tutorial: elabora i dati di borsa in tempo reale utilizzando KPL e KCL 1.x[Tutorial: elabora i dati di borsa in tempo reale utilizzando KPL e KCL 1.x](tutorial-stock-data-kplkcl.md)](tutorial-stock-data-kplkcl.md), è necessario creare il flusso da utilizzare nelle fasi successive.

**Per creare un flusso**

1. [Accedi Console di gestione AWS e apri la console Kinesis all'indirizzo https://console.aws.amazon.com /kinesis.](https://console.aws.amazon.com/kinesis)

1. Nel riquadro di navigazione, selezionare **Data Streams (Flussi di dati)**.

1. Nella barra di navigazione, espandere il selettore delle regioni e selezionare una regione.

1. Selezionare **Create Kinesis stream (Crea flusso Kinesis)**.

1. Inserire un nome per il flusso (ad esempio, **StockTradeStream**).

1. Inserisci **1** il numero di frammenti, ma continua a **Stimare il numero di frammenti che ti serviranno compressi**.

1. Selezionare **Create Kinesis stream (Crea flusso Kinesis)**.

Nella pagina dell'elenco **Flussi Kinesis**, lo stato del flusso mentre viene creato è `CREATING`. Quando il flusso è pronto per essere utilizzato, lo stato diventa `ACTIVE`. Seleziona il nome del flusso. Nella pagina che viene visualizzata, la scheda **Details (Dettagli)** mostra un riepilogo della configurazione del flusso. La sezione **Monitoring (Monitoraggio)** mostra le informazioni di monitoraggio per il flusso.

## Informazioni aggiuntive sui frammenti
<a name="tutorial-stock-data-kplkcl-create-stream-info"></a>

Quando inizi a utilizzare il flusso di dati Kinesis al di fuori di questo tutorial, potresti dover pianificare il processo di creazione di un flusso più attentamente. Dovresti effettuare una stima della domanda massima quando esegui il provisioning degli shard. Utilizzando questo scenario come esempio, il numero di scambi nel mercato azionario statunitense raggiunge il picco durante il giorno (fuso orario EST) e le stime della domanda dovrebbero essere effettuate per quell'ora del giorno. A questo punto, puoi scegliere di effettuare il provisioning per la domanda massima attesa o incrementare o ridurre il flusso in risposta alle fluttuazioni della domanda. 

Uno *shard* è un'unità di capacità di throughput. Nella pagina **Crea flusso Kinesis**, espandi **Calcola il numero di partizioni necessarie**. Inserisci le dimensioni medie dei record, il numero massimo di record scritti al secondo e il numero di applicazioni in uso attenendoti alle seguenti indicazioni:

**Dimensione media record**  
Una stima delle dimensioni medie calcolate dei record. Se non conosci questo valore, utilizza le dimensioni massime stimate dei record.

**Numero massimo di record scritti**  
Considerate il numero di entità che forniscono dati e il numero approssimativo di record al secondo prodotti da ciascuna di esse. Ad esempio, se ottieni dati sulle negoziazioni da 20 server di trading e ognuno di essi genera 250 scambi al secondo, il numero totale di scambi (record) è di 5.000 al secondo. 

**Numero di applicazioni in uso**  
Il numero di applicazioni che in modo indipendente effettuano la lettura dal flusso per elaborarlo in modo diverso e produrre un output diverso. Ciascuna applicazione può avere più istanze in esecuzione su macchine diverse (esecuzione in cluster) in modo da poter tenere il passo con un flusso di volume elevato.

Se il numero stimato di shard mostrato supera il limite attuale di shard, potresti dover inviare una richiesta per aumentare tale limite prima di poter creare un flusso con quel numero di shard. Per richiedere un aumento del limite di partizioni, utilizza il [modulo dei limiti del flusso di dati Kinesis](https://console.aws.amazon.com/support/home#/case/create?issueType=service-limit-increase&limitType=service-code-kinesis). Per ulteriori informazioni sui flussi e sulle partizioni, consulta [Crea e gestisci flussi di dati Kinesis](working-with-streams.md).

## Fasi successive
<a name="tutorial-stock-data-kplkcl-create-stream-next"></a>

[Crea una policy e un utente IAM](tutorial-stock-data-kplkcl-iam.md)

# Crea una policy e un utente IAM
<a name="tutorial-stock-data-kplkcl-iam"></a>

Le migliori pratiche di sicurezza per AWS imporre l'uso di autorizzazioni granulari per controllare l'accesso a diverse risorse. AWS Identity and Access Management (IAM) consente di gestire gli utenti e le autorizzazioni degli utenti in. AWS Una [policy IAM](https://docs.aws.amazon.com/IAM/latest/UserGuide/PoliciesOverview.html) elenca in modo esplicito le operazioni consentite e le risorse per le quali sono applicabili le operazioni.

Di seguito sono elencate le autorizzazioni minime generali richieste per un producer e un consumer del flusso di dati Kinesis.


**Producer**  

| Azioni | Risorsa | Scopo | 
| --- | --- | --- | 
| DescribeStream, DescribeStreamSummary, DescribeStreamConsumer | Flusso di dati Kinesis | Prima di tentare di scrivere record, il producer controlla se il flusso esiste ed è attivo, se i frammenti sono contenuti nel flusso e se il flusso ha un consumer. | 
| SubscribeToShard, RegisterStreamConsumer | Flusso di dati Kinesis | Sottoscrive e registra un consumer a uno shard flusso di dati Kinesis. | 
| PutRecord, PutRecords | Flusso di dati Kinesis | Scrivi i dati in un flusso di dati Kinesis. | 


**Consumer**  

| **Azioni** | **Risorsa** | **Scopo** | 
| --- | --- | --- | 
| DescribeStream | Flusso di dati Kinesis | Prima di leggere i record, il consumer verifica se il flusso esiste, è attivo e contiene shard. | 
| GetRecords, GetShardIterator  | Flusso di dati Kinesis | Leggi i record da una partizione del flusso di dati Kinesis. | 
| CreateTable, DescribeTable, GetItem, PutItem, Scan, UpdateItem | Tabella Amazon DynamoDB | Se il consumer viene sviluppato utilizzando la Kinesis Client Library (KCL), avrà bisogno delle autorizzazioni a una tabella DynamoDB per monitorare lo stato di elaborazione dell'applicazione. Il primo consumer avviato crea la tabella.  | 
| DeleteItem | Tabella Amazon DynamoDB | Per quando il consumatore esegue split/merge operazioni sugli shard Kinesis Data Streams. | 
| PutMetricData |  CloudWatch Registro Amazon | KCL carica anche le metriche su CloudWatch, utili per monitorare l'applicazione. | 

Per questa applicazione, devi creare una singola policy IAM che conceda tutte le autorizzazioni precedenti. In pratica, potresti voler creare due policy, una per i producer e una per i consumer.

**Per creare una policy IAM**

1. Individuare l'Amazon Resource Name (ARN) per il nuovo flusso. Questo ARN è elencato come **Stream ARN (ARN flusso)** nella parte superiore della scheda **Details (Dettagli)**. Di seguito è riportato il formato ARN:

   ```
   arn:aws:kinesis:region:account:stream/name
   ```  
*region*  
Il codice regione, ad esempio, `us-west-2`. Per ulteriori informazioni, consulta [Concetti di regione e zona di disponibilità](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-regions-availability-zones).  
*account*  
[L'ID AWS dell'account, come mostrato in Impostazioni account.](https://console.aws.amazon.com/billing/home?#/account)  
*nome*  
Il nome del flusso dal [Crea un flusso di dati](tutorial-stock-data-kplkcl-create-stream.md), ovvero `StockTradeStream`.

1. Determinare l'ARN per la tabella DynamoDB che verrà utilizzata dal consumer (e creata dalla prima istanza consumer). Deve essere nel seguente formato:

   ```
   arn:aws:dynamodb:region:account:table/name
   ```

   La regione e l'account sono gli stessi della fase precedente, ma questa volta il *nome* è quello della tabella creata e utilizzata dall'applicazione consumer. La KCL utilizzata dal consumer usa il nome dell'applicazione come nome della tabella. Utilizzare `StockTradesProcessor`, che è il nome dell'applicazione utilizzato successivamente.

1. Nella console IAM, in **Policies** ([https://console.aws.amazon.com/iam/home \$1policies](https://console.aws.amazon.com/iam/home#policies)), scegli **Crea policy**. Se è la prima volta che si utilizzano le policy IAM, scegli **Inizia**, **Crea policy**.

1. Scegliere **Select (Seleziona)** accanto a **Policy Generator (Generatore di policy)**.

1. Scegli **Amazon Kinesis** come servizio. AWS 

1. Selezionare `DescribeStream`, `GetShardIterator`, `GetRecords`, `PutRecord` e `PutRecords` come operazioni consentite.

1. Inserire l'ARN creato alla fase 1.

1. Utilizzare **Add Statement (Aggiungi istruzione)** per ciascuno dei seguenti elementi:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/streams/latest/dev/tutorial-stock-data-kplkcl-iam.html)

   L'asterisco (`*`) che viene utilizzato quando si specifica un ARN non è richiesto. In questo caso, è perché non esiste una risorsa specifica CloudWatch su cui viene richiamata l'`PutMetricData`azione.

1. Selezionare **Next Step (Fase successiva)**.

1. Modificare **Policy Name (Nome policy)** in `StockTradeStreamPolicy`, rivedere il codice e scegliere **Create Policy (Crea policy)**.

Il documento della policy risultante dovrebbe essere simile a quello seguente:

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "Stmt123",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:PutRecord",
                "kinesis:PutRecords",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:ListShards",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream"
            ]
        },
        {
            "Sid": "Stmt234",
            "Effect": "Allow",
            "Action": [
                "kinesis:SubscribeToShard",
                "kinesis:DescribeStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream/*"
            ]
        },
        {
            "Sid": "Stmt456",
            "Effect": "Allow",
            "Action": [
                "dynamodb:*"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-west-2:111122223333:table/StockTradesProcessor"
            ]
        },
        {
            "Sid": "Stmt789",
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}
```

------

**Per creare un utente IAM**

1. Aprire la console IAM all'indirizzo [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. Nella pagina **Users (Utenti)**, scegliere **Add user (Aggiungi utente)**.

1. Per **Nome utente**, digita `StockTradeStreamUser`.

1. Per **Access type (Tipo di accesso)**, scegliere **Programmatic access (Accesso programmatico)**, quindi selezionare **Next: Permissions (Successivo: autorizzazioni)**.

1. Scegli **Attach existing policies directly (Collega direttamente le policy esistenti)**.

1. Cercare la policy creata per nome. Selezionare la casella a sinistra del nome della policy, quindi scegliere **Next: Review (Successivo: revisione)**.

1. Rivedere i dettagli e il riepilogo, quindi scegliere **Create user (Crea utente)**.

1. Copiare l'**ID chiave di accesso** e salvarlo privatamente. In **Secret access key (Chiave di accesso segreta)**, scegliere **Show (Mostra)** e salvare anche la chiave privatamente.

1. Incollare la chiave segreta e quella di accesso su un file locale in un luogo sicuro non accessibile ad altri. Per questa applicazione, è necessario creare un file denominato ` ~/.aws/credentials` (con autorizzazioni rigorose). Il file deve avere il formato seguente:

   ```
   [default]
   aws_access_key_id=access key
   aws_secret_access_key=secret access key
   ```

**Collegamento di una policy IAM a un utente**

1. Nella console IAM, apri [Policy](https://console.aws.amazon.com/iam/home?#policies) e scegli **Operazioni di policy**. 

1. Scegliere `StockTradeStreamPolicy` e **Attach (Collega)**.

1. Scegliere `StockTradeStreamUser` e **Attach Policy (Collega policy)**.

## Fasi successive
<a name="tutorial-stock-data-kplkcl-iam-next"></a>

[Scarica e crea il codice di implementazione](tutorial-stock-data-kplkcl-download.md)

# Scarica e crea il codice di implementazione
<a name="tutorial-stock-data-kplkcl-download"></a>

Viene fornito codice di base per il [Tutorial: elabora i dati di borsa in tempo reale utilizzando KPL e KCL 1.x](tutorial-stock-data-kplkcl.md). Include l'implementazione di uno stub sia per l'acquisizione dei flussi delle negoziazioni (*producer*) che per l'elaborazione dei dati (*consumer*). La procedura seguente mostra come completare l'implementazione. 

**Per scaricare e compilare il codice di implementazione**

1. Scaricare il [codice sorgente](https://github.com/awslabs/amazon-kinesis-learning/tree/learning-module-1) sul computer.

1. Creare un progetto nell'IDE di preferenza con il codice sorgente, rispettando la struttura di directory fornita.

1. Aggiungere le seguenti librerie al progetto:
   + Amazon Kinesis Client Library (KCL)
   + AWS SDK
   + Apache HttpCore
   + Apache HttpClient
   + Apache Commons Lang
   + Apache Commons Logging
   + Guava (Google Core Libraries For Java)
   + Jackson Annotations
   + Jackson Core
   + Jackson Databind
   + Jackson Dataformat: CBOR
   + Joda Time

1. A seconda dell'IDE, il progetto potrebbe essere creato automaticamente. In caso contrario, creare il progetto seguendo la procedura appropriata per l'IDE utilizzato.

Se la procedura viene completata correttamente, puoi passare alla sezione successiva, [Implementa il produttore](tutorial-stock-data-kplkcl-producer.md). Se la build genera errori in una fase, analizzali e correggili prima di continuare.

## Fasi successive
<a name="tutorial-stock-data-kplkcl-download-next"></a>

[[Implementa il produttore](tutorial-stock-data-kplkcl-producer.md)Implementa il produttore](tutorial-stock-data-kplkcl-producer.md)

# Implementa il produttore
<a name="tutorial-stock-data-kplkcl-producer"></a>



L'applicazione nel [Tutorial: elabora i dati di borsa in tempo reale utilizzando KPL e KCL 1.x[Tutorial: elabora i dati di borsa in tempo reale utilizzando KPL e KCL 1.x](tutorial-stock-data-kplkcl.md)](tutorial-stock-data-kplkcl.md) utilizza come scenario quello del monitoraggio del mercato azionario reale. I seguenti principi illustrano brevemente in che modo questo scenario è mappato alla struttura del codice producer e di supporto.

Consulta il codice sorgente e rivedi le informazioni riportate di seguito.

**StockTrade classe**  
Una singola negoziazione è rappresentata da un'istanza della classe `StockTrade`. Questa istanza include attributi come il simbolo dei titoli, il prezzo, il numero di azioni, il tipo di operazione (acquisto o vendita) e un ID univoco che identifica l'operazione. Questa classe è implementata per te.

**Record di flusso**  
Un flusso è una sequenza di record. Un record è una serializzazione di un'istanza `StockTrade` in formato JSON. Esempio:   

```
{
  "tickerSymbol": "AMZN", 
  "tradeType": "BUY", 
  "price": 395.87,
  "quantity": 16, 
  "id": 3567129045
}
```

**StockTradeGenerator classe**  
`StockTradeGenerator` include un metodo denominato `getRandomTrade()` che restituisce una negoziazione generata casualmente ogni volta che viene invocata. Questa classe è implementata per te.

**StockTradesWriter classe**  
Il metodo `main` del producer, `StockTradesWriter` recupera continuamente uno scambio casuale e lo invia al flusso di dati Kinesis eseguendo queste operazioni:  

1. Legge il nome del flusso e il nome della regione come input.

1. Crea un `AmazonKinesisClientBuilder`.

1. Utilizza il generatore client per impostare la regione, le credenziali e la configurazione del client.

1. Crea un client `AmazonKinesis` utilizzando il generatore di client.

1. Verifica che il flusso esista e sia attivo (in caso contrario, si chiude con un errore).

1. In un ciclo continuo, chiama il metodo `StockTradeGenerator.getRandomTrade()` e quindi il metodo `sendStockTrade` per inviare lo scambio al flusso ogni 100 millisecondi.
Il metodo `sendStockTrade` della classe `StockTradesWriter` include il codice seguente:  

```
private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient, String streamName) {
    byte[] bytes = trade.toJsonAsBytes();
    // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library.
    if (bytes == null) {
        LOG.warn("Could not get JSON bytes for stock trade");
        return;
    }
    
    LOG.info("Putting trade: " + trade.toString());
    PutRecordRequest putRecord = new PutRecordRequest();
    putRecord.setStreamName(streamName);
    // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
    putRecord.setPartitionKey(trade.getTickerSymbol());
    putRecord.setData(ByteBuffer.wrap(bytes));

    try {
        kinesisClient.putRecord(putRecord);
    } catch (AmazonClientException ex) {
        LOG.warn("Error sending record to Amazon Kinesis.", ex);
    }
}
```

Fai riferimento alla seguente suddivisione del codice:
+ L'`PutRecord`API prevede un array di byte e devi convertirlo in formato `trade` JSON. Questa singola riga di codice esegue tale operazione:

  ```
  byte[] bytes = trade.toJsonAsBytes();
  ```
+ Prima di poter inviare lo scambio, devi creare una nuova istanza `PutRecordRequest` (denominata `putRecord` in questo caso):

  ```
  PutRecordRequest putRecord = new PutRecordRequest();
  ```

  Ogni chiamata `PutRecord` richiede il nome del flusso, la chiave di partizione e il blob di dati. Il codice seguente compila questi campi nell'oggetto `putRecord` utilizzando i relativi metodi `setXxxx()`:

  ```
  putRecord.setStreamName(streamName);
  putRecord.setPartitionKey(trade.getTickerSymbol());
  putRecord.setData(ByteBuffer.wrap(bytes));
  ```

  Questo esempio utilizza un ticket per titoli come chiave di partizione, che mappa il record a un determinato shard. In pratica, dovresti avere centinaia o migliaia di chiavi di partizione per shard, in modo che i record vengano distribuiti in modo uniforme in tutto il flusso. Per ulteriori informazioni su come aggiungere dati a un flusso, consulta [Aggiungi dati a uno stream](developing-producers-with-sdk.md#kinesis-using-sdk-java-add-data-to-stream).

  Ora `putRecord` è pronto per l'invio al client (operazione `put`):

  ```
  kinesisClient.putRecord(putRecord);
  ```
+ La verifica e la registrazione degli errori sono sempre aggiunte utili. Questo codice registra le condizioni di errore:

  ```
  if (bytes == null) {
      LOG.warn("Could not get JSON bytes for stock trade");
      return;
  }
  ```

  Aggiungi il try/catch blocco attorno all'operazione: `put`

  ```
  try {
         kinesisClient.putRecord(putRecord);
  } catch (AmazonClientException ex) {
         LOG.warn("Error sending record to Amazon Kinesis.", ex);
  }
  ```

  Questo è perché un'operazione `put` del flusso di dati Kinesis potrebbe non riuscire a causa di un errore di rete o perché il flusso di dati raggiunge il limite di velocità di trasmissione effettiva e viene sottoposto a limitazione. Ti consigliamo di considerare attentamente la politica di riprova per `put` le operazioni volte a evitare la perdita di dati, ad esempio utilizzando un nuovo tentativo. 
+ La registrazione dello stato è utile, ma opzionale:

  ```
  LOG.info("Putting trade: " + trade.toString());
  ```
Il producer mostrato qui utilizza la funzionalità record singolo dell'API del flusso di dati Kinesis, `PutRecord`. In pratica, se un producer genera numerosi record, spesso è più efficiente utilizzare la funzionalità record multipli di `PutRecords` e inviare batch di record ogni volta. Per ulteriori informazioni, consulta [Aggiungi dati a uno stream](developing-producers-with-sdk.md#kinesis-using-sdk-java-add-data-to-stream).

**Per eseguire il producer**

1. Verifica che la coppia chiave di accesso e chiave segreta recuperata durante la creazione dell'utente IAM sia stata salvata nel file `~/.aws/credentials`. 

1. Eseguire la classe `StockTradeWriter` con i seguenti argomenti:

   ```
   StockTradeStream us-west-2
   ```

   Se è stato creato un flusso in una regione diversa da `us-west-2`, è necessario specificare quella regione qui.

Verrà visualizzato un output simile al seguente:

```
Feb 16, 2015 3:53:00 PM  
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
```

Il flusso di negoziazioni viene ora importato dal flusso di dati Kinesis.

## Fasi successive
<a name="tutorial-stock-data-kplkcl-producer-next"></a>

[Implementa il consumatore](tutorial-stock-data-kplkcl-consumer.md)

# Implementa il consumatore
<a name="tutorial-stock-data-kplkcl-consumer"></a>

L'applicazione consumer nel [Tutorial: elabora i dati di borsa in tempo reale utilizzando KPL e KCL 1.x[Tutorial: elabora i dati di borsa in tempo reale utilizzando KPL e KCL 1.x](tutorial-stock-data-kplkcl.md)](tutorial-stock-data-kplkcl.md) elabora in modo continuo il flusso di negoziazioni creato in [[Implementa il produttore](tutorial-stock-data-kplkcl-producer.md)Implementa il produttore](tutorial-stock-data-kplkcl-producer.md). Quindi, genera i titoli più acquistati e venduti ogni minuto. L'applicazione si basa sulla Kinesis Client Library (KCL), che esegue numerose delle attività impegnative comuni alle applicazioni consumer. Per ulteriori informazioni, consulta [Sviluppa i consumatori di KCL 1.x](developing-consumers-with-kcl.md). 

Consulta il codice sorgente e rivedi le informazioni riportate di seguito.

**StockTradesProcessor classe**  
Classe principale del consumer, fornita per te, che esegue queste attività:  
+ Legge il nome dell'applicazione, del flusso e della regione, passati come argomenti.
+ Legge le credenziali da `~/.aws/credentials`.
+ Crea un'istanza `RecordProcessorFactory` che serve istanze di `RecordProcessor`, implementate da un'istanza `StockTradeRecordProcessor`.
+ Crea un worker KCL con l'istanza `RecordProcessorFactory` e una configurazione standard, inclusi il nome del flusso, le credenziali e il nome dell'applicazione. 
+ Il worker crea un nuovo thread per ciascuna partizione (assegnata a questa istanza consumer), che in un ciclo continuo legge i record dal flusso di dati Kinesis. Quindi invoca l'istanza `RecordProcessor` per elaborare ogni batch di record ricevuto.

**StockTradeRecordProcessor classe**  
Implementazione dell'istanza `RecordProcessor`, che a sua volta implementa tre metodi richiesti: `initialize`, `processRecords` e `shutdown`.  
Come suggeriscono i nomi, `initialize` e `shutdown` vengono utilizzati dalla Kinesis Client Library per consentire all'elaboratore di record di sapere quando dovrebbe essere pronto a iniziare a ricevere record e quando dovrebbe aspettarsi di non ricevere più record, in modo da poter effettuare qualsiasi attività di configurazione e cessazione specifica per l'applicazione. Il codice relativo viene fornito per te. L'elaborazione principale si verifica nel metodo `processRecords`, che a sua volta utilizza `processRecord` per ogni record. Quest'ultimo metodo viene fornito come codice di base per lo più vuoto da implementare nella fase successiva, dove è spiegato ulteriormente.  
Da segnalare è anche l'implementazione dei metodi di supporto per `processRecord`, ovvero `reportStats` e `resetStats`, che sono vuoti nel codice sorgente originale.  
Il metodo `processRecords` viene implementato per te ed esegue questa procedura:  
+  Per ogni record passato, chiama `processRecord` su di esso.
+ Se è trascorso almeno 1 minuto dall'ultimo report, chiama `reportStats()`, che consente di stampare le statistiche più recenti, seguito da `resetStats()`, che cancella le statistiche in modo che l'intervallo successivo includa solo i nuovi record.
+ Imposta l'orario della creazione di report successiva.
+ Se è trascorso almeno 1 minuto dall'ultimo checkpoint, chiama `checkpoint()`. 
+ Imposta l'orario della creazione di checkpoint successiva.
Questo metodo utilizza intervalli di 60 secondi per la frequenza di creazione di report e checkpoint. Per ulteriori informazioni sulla creazione di checkpoint, consulta [Informazioni aggiuntive sul consumatore](#tutorial-stock-data-kplkcl-consumer-supplement).

**StockStats classe**  
Questa classe fornisce la conservazione dei dati e il monitoraggio delle statistiche per i titoli più comuni nel tempo. Questo codice viene fornito per te e include i seguenti metodi:  
+ `addStockTrade(StockTrade)`: inserisce un dato `StockTrade` nelle statistiche in esecuzione.
+ `toString()`: restituisce le statistiche in una stringa formattata.
Questa classe tiene traccia delle azioni più popolari tenendo un conteggio progressivo del numero totale di scambi per ogni azione e del conteggio massimo. Aggiorna questi conteggi ogni volta che si verifica uno scambio.

Aggiungi codice ai metodi della classe `StockTradeRecordProcessor`, come mostrato nella procedura seguente.

**Per implementare il consumer**

1. Implementare il metodo `processRecord` creando un'istanza di un oggetto `StockTrade` delle dimensioni corrette e aggiungendo a essa i dati del record, registrando un avviso se si verifica un problema.

   ```
   StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array());
   if (trade == null) {
       LOG.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.getPartitionKey());
       return;
   }
   stockStats.addStockTrade(trade);
   ```

1. Implementare un metodo `reportStats` semplice. Modificare liberamente il formato di output in base alle proprie preferenze.

   ```
   System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" +
                      stockStats + "\n" +
                      "****************************************************************\n");
   ```

1. Infine, implementare il metodo `resetStats`, che crea una nuova istanza `stockStats`.

   ```
   stockStats = new StockStats();
   ```

**Per eseguire il consumer**

1. Eseguire il producer scritto in [[Implementa il produttore](tutorial-stock-data-kplkcl-producer.md)Implementa il produttore](tutorial-stock-data-kplkcl-producer.md) per inserire record di scambi simulati nel flusso.

1. Verifica che la coppia chiave di accesso e chiave segreta recuperata durante la creazione dell'utente IAM sia stata salvata nel file `~/.aws/credentials`. 

1. Eseguire la classe `StockTradesProcessor` con i seguenti argomenti:

   ```
   StockTradesProcessor StockTradeStream us-west-2
   ```

   Nota: se è stato creato un flusso in una regione diversa da `us-west-2`, è necessario specificare quella regione qui.

Dopo un minuto, si dovrebbe visualizzare un output come il seguente, aggiornato ogni minuto:

```
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  ****************************************************************
```

## Informazioni aggiuntive sul consumatore
<a name="tutorial-stock-data-kplkcl-consumer-supplement"></a>

Se hai familiarità con i vantaggi della Kinesis Client Library discussi in [Sviluppa i consumatori di KCL 1.x](developing-consumers-with-kcl.md) e altrove, potresti chiederti perché utilizzarla qui. Sebbene sia possibile utilizzare solo un singolo flusso di partizioni e una singola istanza consumer per elaborarlo, è comunque più semplice implementare il consumer utilizzando la KCL. Confronta la procedura di implementazione di codice nella sezione del producer con quella del consumer e potrai avere un'idea della maggiore semplicità di implementazione di un consumer. Ciò è principalmente dovuto ai servizi forniti dalla KCL.

In questa applicazione, devi concentrarti sull'implementazione di una classe di processore di record in grado di elaborare singoli record. Non devi preoccuparti di come i record vengono recuperati dal flusso di dati Kinesis; la KCL recupera i record e invoca il processore di record ogni volta che non sono presenti nuovi record disponibili. Inoltre, non devi preoccuparti nemmeno di quante istanze shard e consumer sono presenti. Se il flusso è stato incrementato, non è necessario riscrivere l'applicazione per gestire più di un'istanza shard o consumer.

Il termine *checkpoint* significa registrare il punto del flusso fino ai record di dati che sono stati utilizzati ed elaborati finora. Se l'applicazione si blocca, lo stream viene letto da quel punto e non dall'inizio dello stream. Il soggetto della creazione di checkpoint e i relativi modelli di progettazione e best practice non verranno trattati in questo capitolo. Tuttavia, è qualcosa che potresti incontrare negli ambienti di produzione.

Come illustrato in [[Implementa il produttore](tutorial-stock-data-kplkcl-producer.md)Implementa il produttore](tutorial-stock-data-kplkcl-producer.md), le operazioni `put` nell'API del flusso di dati Kinesis richiedono una *chiave di partizione* come input. Il flusso di dati Kinesis utilizza una chiave di partizione come meccanismo per dividere i record tra più partizioni (quando è presente più di una partizione nel flusso). La stessa chiave di partizione esegue l'instradamento sempre allo stesso shard. In questo modo, il consumer che elabora un determinato shard può essere progettato presupponendo che i record con la stessa chiave di partizione vengano inviati solo a quel consumer e che nessun record con la stessa chiave di partizione venga inviato a un altro consumer. Pertanto, il ruolo di lavoro di un consumer può aggregare tutti i record con la stessa chiave di partizione senza trascurare dati necessari.

In questa applicazione, l'elaborazione di record da parte del consumer non è un processo intensivo, per cui puoi utilizzare una partizione ed eseguire l'elaborazione nello stesso thread della KCL. Tuttavia, prima valuta la possibilità di incrementare il numero di shard. In alcuni casi, potresti voler trasferire l'elaborazione a un altro thread oppure utilizzare un pool di thread se prevedi che il processo di elaborazione dei record sia intensivo. In questo modo, la KCL è in grado di recuperare i nuovi record più rapidamente, mentre gli altri thread possono elaborare i record in parallelo. La progettazione multithread non è banale e dovrebbe essere affrontata con tecniche avanzate, quindi aumentare il numero di shard è di solito il modo più efficace per scalare.

## Fasi successive
<a name="tutorial-stock-data-kplkcl-consumer-next"></a>

[(Facoltativo) Estendi il numero di utenti](tutorial-stock-data-kplkcl-consumer-extension.md)

# (Facoltativo) Estendi il numero di utenti
<a name="tutorial-stock-data-kplkcl-consumer-extension"></a>

L'applicazione in [Tutorial: elabora i dati di borsa in tempo reale utilizzando KPL e KCL 1.x[Tutorial: elabora i dati di borsa in tempo reale utilizzando KPL e KCL 1.x](tutorial-stock-data-kplkcl.md)](tutorial-stock-data-kplkcl.md) potrebbe già essere sufficiente per i tuoi scopi. Questa sezione opzionale mostra in che modo puoi ampliare la funzionalità del codice consumer per uno scenario leggermente più elaborato.

Se desideri conoscere i maggiori ordini di vendita ogni minuto, puoi modificare la classe `StockStats` in tre punti per supportare questa nuova priorità.

**Per ampliare la funzionalità del consumer**

1. Aggiungere nuove variabili di istanza:

   ```
    // Ticker symbol of the stock that had the largest quantity of shares sold 
    private String largestSellOrderStock;
    // Quantity of shares for the largest sell order trade
    private long largestSellOrderQuantity;
   ```

1. Aggiungere il seguente codice a `addStockTrade`:

   ```
    if (type == TradeType.SELL) {
        if (largestSellOrderStock == null || trade.getQuantity() > largestSellOrderQuantity) {
            largestSellOrderStock = trade.getTickerSymbol();
            largestSellOrderQuantity = trade.getQuantity();
        }
    }
   ```

1. Modificare il metodo `toString` per stampare le informazioni aggiuntive:

   ```
    public String toString() {
        return String.format(
                "Most popular stock being bought: %s, %d buys.%n" +
                "Most popular stock being sold: %s, %d sells.%n" +
                "Largest sell order: %d shares of %s.",
                getMostPopularStock(TradeType.BUY), getMostPopularStockCount(TradeType.BUY),
                getMostPopularStock(TradeType.SELL), getMostPopularStockCount(TradeType.SELL),
                largestSellOrderQuantity, largestSellOrderStock);
    }
   ```

Se si esegue il consumer ora (ricordare di eseguire anche il producer), dovrebbe essere visualizzato un output simile a questo:

```
 ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  Largest sell order: 996 shares of BUD.
  ****************************************************************
```

## Fasi successive
<a name="tutorial-stock-data-kplkcl-consumer-extension-next"></a>

[Eseguire la pulizia delle risorse](tutorial-stock-data-kplkcl-finish.md)

# Eseguire la pulizia delle risorse
<a name="tutorial-stock-data-kplkcl-finish"></a>

Poiché l'utilizzo del flusso di dati Kinesis è a pagamento, una volta terminato di usarlo assicurati di eliminarlo insieme alle tabelle Amazon DynamoDB corrispondenti. Vengono infatti applicati costi nominali a un flusso attivo, anche quando non invii o ricevi record. Ciò si verifica perché un flusso attivo utilizza risorse monitorando in modo continuo i record in entrata e le richieste per ottenere record.

**Per eliminare il flusso e la tabella**

1. Chiudere i producer e i consumer ancora in esecuzione.

1. [Apri la console Kinesis in /kinesis. https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. Scegliere il flusso creato per questa applicazione (`StockTradeStream`).

1. Scegliere **Delete Stream (Elimina flusso)**.

1. Apri la console DynamoDB all'indirizzo. [https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/)

1. Eliminare la tabella `StockTradesProcessor`.

## Riepilogo
<a name="tutorial-stock-data-kplkcl-summary"></a>

L'elaborazione di una grande quantità di dati quasi in tempo reale non richiede la scrittura di codice complicato o lo sviluppo di un'infrastruttura enorme. Elaborare una piccola quantità di dati (come la scrittura`processRecord(Record)`) è tanto semplice quanto scrivere la logica, ma usare Kinesis Data Streams per scalare in modo da funzionare per una grande quantità di dati in streaming. Non devi preoccuparti del dimensionamento dell'elaborazione, perché il flusso di dati Kinesis lo gestisce per conto tuo. Devi soltanto inviare i tuoi record di streaming al flusso di dati Kinesis e scrivere la logica per elaborare ogni nuovo record ricevuto. 

Di seguito sono elencati alcuni miglioramenti potenziali per questa applicazione.

**Aggregazione tra tutti gli shard**  
Al momento, puoi ottenere statistiche derivate dall'aggregazione dei record di dati ricevuti da un singolo ruolo di lavoro e provenienti da un solo shard (uno shard non può essere elaborato da più di un ruolo di lavoro in un'unica applicazione allo stesso tempo). Quando esegui il dimensionamento e disponi di più di uno shard, potresti voler effettuare l'aggregazione tra tutti gli shard. Per farlo, ti occorre un'architettura pipeline in cui l'output di ogni ruolo di lavoro viene utilizzato in un altro flusso con un singolo shard, che viene elaborato da un ruolo di lavoro che aggrega gli output della prima fase. Poiché i dati della prima fase sono limitati (un campione al minuto per shard), possono essere gestiti facilmente da uno shard.

**Dimensionamento dell'elaborazione**  
Quando il flusso viene incrementato per disporre di numerosi shard (dal momento che molti producer inviano dati), per dimensionare l'elaborazione è possibile aggiungere ulteriori ruoli di lavoro. Puoi eseguire i worker nelle istanze Amazon EC2 e utilizzare gruppi con dimensionamento automatico.

**Usa i connettori per Amazon S3/ DynamoDB/Amazon Redshift/Storm**  
Poiché uno stream viene elaborato continuamente, il relativo output può essere inviato ad altre destinazioni. AWS fornisce [connettori](https://github.com/awslabs/amazon-kinesis-connectors) per integrare Kinesis Data Streams AWS con altri servizi e strumenti di terze parti.

## Fasi successive
<a name="tutorial-stock-data-kplkcl-next-steps"></a>
+ Per ulteriori informazioni sull'utilizzo delle operazioni API del flusso di dati Kinesis, consulta [Sviluppa i produttori utilizzando l'API Amazon Kinesis Data Streams con AWS SDK per Java](developing-producers-with-sdk.md), [Sviluppa consumatori con produttività condivisa con AWS SDK per Java](developing-consumers-with-sdk.md) e [Crea e gestisci flussi di dati Kinesis](working-with-streams.md).
+ Per ulteriori informazioni sulla Kinesis Client Library, consulta [Sviluppa i consumatori di KCL 1.x](developing-consumers-with-kcl.md). 
+ Per ulteriori informazioni su come ottimizzare l'applicazione, consulta [Ottimizza i consumatori di Amazon Kinesis Data StreamsOttimizzazione dei consumatori di Kinesis Data Streams](advanced-consumers.md). 

# Tutorial: analizza i dati di borsa in tempo reale utilizzando Amazon Managed Service per Apache Flink
<a name="tutorial-stock-data"></a>

Lo scenario per questo tutorial richiede l'importazione del flusso di negoziazioni in un flusso di dati e la scrittura di una semplice applicazione del [Servizio gestito da Amazon per Apache Flink](https://docs.aws.amazon.com/kinesisanalytics/latest/java/what-is.html) che esegua calcoli sul flusso. Imparerai come inviare un flusso di record a Kinesis Data Streams e implementare un'applicazione che consuma ed elabora i record quasi in tempo reale.

Con Amazon Managed Service for Apache Flink, puoi usare Java o Scala per elaborare e analizzare i dati di streaming. Il servizio consente di creare ed eseguire codice Java o Scala su sorgenti di streaming per eseguire analisi di serie temporali, alimentare dashboard in tempo reale e creare metriche in tempo reale.

È possibile creare applicazioni Flink nel servizio gestito per Apache Flink utilizzando le librerie open source basate su [Apache Flink](https://flink.apache.org/). Apache Flink è un celebre framework e motore per l’elaborazione di flussi di dati. 

**Importante**  
Dopo aver creato due flussi di dati e un'applicazione, all'account vengono addebitati costi nominali per l'utilizzo di Kinesis Data Streams e Managed Service for Apache Flink perché non sono idonei per il piano gratuito. AWS Quando hai finito con questa applicazione, elimina le tue risorse per evitare di incorrere in addebiti. AWS 

Il codice non accede ai dati del mercato azionario, ma simula il flusso delle negoziazioni. A tale scopo, utilizza un generatore di negoziazioni casuale. Se disponi dell'accesso a un flusso di negoziazioni in tempo reale, potresti essere interessato a derivare statistiche utili e tempestive da quel flusso. Ad esempio, potresti eseguire un'analisi basata su finestra scorrevole per determinare i titoli più acquistati negli ultimi 5 minuti. Oppure potresti ricevere una notifica ogni volta che viene effettuato un ordine di vendita troppo grande (ossia, che include un numero eccessivo di titoli). Puoi estendere il codice in questa serie per fornire tale funzionalità.

Gli esempi mostrati utilizzano la regione Stati Uniti occidentali (Oregon), ma si applicano a qualunque [Regione AWS che supporta il servizio gestito per Apache Flink](https://docs.aws.amazon.com/general/latest/gr/rande.html#ka_region).

**Topics**
+ [

## Prerequisiti per il completamento degli esercizi
](#setting-up-prerequisites)
+ [

# Configura un AWS account e crea un utente amministratore
](setting-up.md)
+ [

# Configura il AWS Command Line Interface (AWS CLI)
](setup-awscli.md)
+ [

# Creare ed eseguire un servizio gestito per l'applicazione Apache Flink
](get-started-exercise.md)

## Prerequisiti per il completamento degli esercizi
<a name="setting-up-prerequisites"></a>

Per completare le fasi in questa guida, è richiesto quanto segue:
+ [Java Development Kit (JDK) versione 8](http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html). Imposta la variabile di ambiente `JAVA_HOME` in modo che punti alla posizione di installazione di JDK.
+ Ti consigliamo di utilizzare un ambiente di sviluppo (ad esempio [Eclipse Java Neon](http://www.eclipse.org/downloads/packages/release/neon/3) o [IntelliJ IDEA](https://www.jetbrains.com/idea/)) per sviluppare e compilare l'applicazione.
+ [Client Git](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git). Installa il client Git se non lo hai già fatto.
+ [Apache Maven Compiler Plugin](https://maven.apache.org/plugins/maven-compiler-plugin/). Maven deve trovarsi nel percorso di lavoro. Per testare l'installazione Apache Maven, immetti quanto segue:

  ```
  $ mvn -version
  ```

Per iniziare, vai alla pagina [Configura un AWS account e crea un utente amministratore](setting-up.md).

# Configura un AWS account e crea un utente amministratore
<a name="setting-up"></a>

Prima di utilizzare Amazon Managed Service for Apache Flink per la prima volta, completa le seguenti attività:

1. [Registrati per AWS](#setting-up-signup)

1. [Creare un utente IAM](#setting-up-iam)

## Registrati per AWS
<a name="setting-up-signup"></a>

Quando ti registri ad Amazon Web Services (AWS), il tuo AWS account viene automaticamente registrato per tutti i servizi in AWS, incluso Amazon Managed Service per Apache Flink. Ti vengono addebitati solo i servizi che utilizzi.

Con il servizio gestito per Apache Flink vengono addebitati esclusivamente i costi per le risorse utilizzate. Se sei un nuovo cliente AWS , puoi iniziare a utilizzare il servizio gestito per Apache Flink gratuitamente. Per ulteriori informazioni, consulta [Piano gratuito AWS](https://aws.amazon.com/free/).

Se hai già un AWS account, passa all'attività successiva. Se non disponi di un account AWS , procedi nel modo seguente per crearne uno.

**Per creare un account AWS**

1. Apri la [https://portal.aws.amazon.com/billing/registrazione.](https://portal.aws.amazon.com/billing/signup)

1. Segui le istruzioni online.

   Nel corso della procedura di registrazione riceverai una telefonata o un messaggio di testo e ti verrà chiesto di inserire un codice di verifica attraverso la tastiera del telefono.

   Quando ti iscrivi a un Account AWS, *Utente root dell'account AWS*viene creato un. L’utente root dispone dell’accesso a tutte le risorse e tutti i Servizi AWS nell’account. Come best practice di sicurezza, assegna l’accesso amministrativo a un utente e utilizza solo l’utente root per eseguire [attività che richiedono l’accesso di un utente root](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_root-user.html#root-user-tasks).

Annota l'ID del tuo AWS account perché ti servirà per l'attività successiva.

## Creare un utente IAM
<a name="setting-up-iam"></a>

I servizi in AWS, come Amazon Managed Service for Apache Flink, richiedono l'immissione di credenziali al momento dell'accesso. In questo modo, il servizio può stabilire se si dispone delle autorizzazioni per accedere alle risorse di proprietà del servizio stesso. Console di gestione AWS Richiede l'immissione della password. 

Puoi creare chiavi di accesso per il tuo AWS account per accedere a AWS Command Line Interface (AWS CLI) o all'API. Tuttavia, non ti consigliamo di accedere AWS utilizzando le credenziali del tuo AWS account. Ti consigliamo invece di utilizzare AWS Identity and Access Management (IAM). Crea un utente IAM, aggiungilo a un gruppo IAM con autorizzazioni amministrative, quindi concedi le autorizzazioni amministrative all'utente IAM creato. Potrai quindi accedere ad AWS utilizzando un URL speciale e le credenziali dell'utente IAM.

Se ti sei registrato AWS ma non hai ancora creato un utente IAM, puoi crearne uno utilizzando la console IAM.

Per eseguire gli esercizi delle Nozioni di base di questa guida è necessario disporre di un utente (`adminuser`) con autorizzazioni di amministratore. Per creare un `adminuser` nel tuo account, utilizza la procedura indicata di seguito.

**Per creare un gruppo di amministratori**

1. Accedi Console di gestione AWS e apri la console IAM all'indirizzo [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. Nel riquadro di navigazione, scegliere **Groups (Gruppi)**, quindi **Create New Group (Crea nuovo gruppo)**.

1. In **Group Name (Nome gruppo)**, digitare un nome per il gruppo, come **Administrators** , quindi scegliere **Next Step (Fase successiva)**.

1. Nell'elenco delle politiche, seleziona la casella di controllo accanto alla **AdministratorAccess**politica. È possibile utilizzare il menu **Filter (Filtro)** e la casella **Search (Cerca)** per filtrare l'elenco di policy.

1. Scegliere **Next Step (Fase successiva)**, quindi **Create Group (Crea gruppo)**.

Il nuovo gruppo viene elencato nell'area **Group Name (Nome gruppo)**.

**Creazione di un utente IAM per se stessi, aggiunta al gruppo Amministratori e creazione di una password**

1. Nel riquadro di navigazione, scegli **Users (Utenti)**, quindi scegli **Add user (Aggiungi utente)**.

1. Nella casella **User name (Nome utente) **, immettere un nome utente.

1. Scegli **Accesso programmatico** e **Accesso alla Console di gestione AWS **.

1. Scegli **Successivo: autorizzazioni**.

1. Selezionare la casella di controllo accanto al gruppo **Administrators (Amministratori)**. Scegli quindi **Next: Review** (Avanti: Verifica).

1. Selezionare **Create user (Crea utente)**.

**Per accedere come nuovo utente IAM**

1. Esci da Console di gestione AWS.

1. Per accedere alla console, utilizzare il seguente formato URL:

   `https://aws_account_number.signin.aws.amazon.com/console/`

   *aws\$1account\$1number*È l'ID AWS del tuo account senza trattini. Ad esempio, se l'ID del tuo AWS account è 1234-5678-9012, sostituiscilo con. *aws\$1account\$1number* **123456789012** *Per informazioni su come trovare il numero del tuo account, consulta l'[ID del tuo account e il suo alias nella Guida per l' AWS utente](https://docs.aws.amazon.com/IAM/latest/UserGuide/console_account-alias.html) IAM.*

1. Immettere il nome utente e la password di IAM appena creati. Una volta effettuato l'accesso, la barra di navigazione mostra *your\$1user\$1name* @*your\$1aws\$1account\$1id*.

**Nota**  
Se non desideri che l'URL della pagina di accesso contenga l'ID AWS dell'account, puoi creare un alias per l'account.

**Per creare o rimuovere un alias dell'account**

1. Aprire la console IAM all'indirizzo [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. Nel riquadro di navigazione, selezionare **Dashboard (Pannello di controllo)**.

1. Individuare il link di accesso agli utenti IAM.

1. Per creare l'alias, scegliere **Customize (Personalizza)**. Immettere il nome che si desidera usare per l'alias, quindi selezionare **Yes, Create (Sì, crea)**.

1. Per rimuovere l'alias, scegliere **Customize (Personalizza)** e quindi **Yes, Delete (Sì, elimina)**. L'URL di accesso torna a utilizzare l'ID del tuo account. AWS 

Per effettuare l'accesso dopo aver creato un alias dell'account, utilizzare il seguente URL:

`https://your_account_alias.signin.aws.amazon.com/console/`

Per verificare il link di accesso degli utenti IAM al tuo account, apri la console IAM e controlla in **IAM users sign-in link (Link di accesso utenti IAM)** nel pannello di controllo.

Per ulteriori informazioni su IAM, consulta:
+ [AWS Identity and Access Management (IAM)](https://aws.amazon.com/iam/)
+ [Guida introduttiva a IAM](https://docs.aws.amazon.com/IAM/latest/UserGuide/getting-started.html)
+ [Guida per l'utente di IAM](https://docs.aws.amazon.com/IAM/latest/UserGuide/)

## Approfondimenti
<a name="setting-up-next-step-2"></a>

[Configura il AWS Command Line Interface (AWS CLI)](setup-awscli.md)

# Configura il AWS Command Line Interface (AWS CLI)
<a name="setup-awscli"></a>

In questa fase, scarichi e configuri il file AWS CLI per utilizzarlo con Amazon Managed Service for Apache Flink.

**Nota**  
Gli esercizi sulle Nozioni di base di questa guida presuppongono che tu disponga di credenziali di amministratore (`adminuser`) nell'account per eseguire le operazioni.

**Nota**  
Se l'hai già AWS CLI installato, potrebbe essere necessario eseguire l'aggiornamento per ottenere le funzionalità più recenti. Per ulteriori informazioni, vedere [Installazione dell'interfaccia a riga di AWS comando](https://docs.aws.amazon.com/cli/latest/userguide/installing.html) nella *Guida AWS Command Line Interface per l'utente*. Per verificare la versione di AWS CLI, esegui il seguente comando:  

```
aws --version
```
Gli esercizi di questo tutorial richiedono la seguente AWS CLI versione o successiva:  

```
aws-cli/1.16.63
```

**Per configurare il AWS CLI**

1. Scarica e configura la AWS CLI. Per le istruzioni, consulta i seguenti argomenti nella *Guida per l'utente dell'AWS Command Line Interface *: 
   + [Installazione dell' AWS Command Line Interface](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-set-up.html)
   + [Configurazione della AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html)

1. Aggiungi un profilo denominato per l'utente amministratore nel file di AWS CLI configurazione. Questo profilo viene utilizzato durante l'esecuzione dei AWS CLI comandi. Per ulteriori informazioni sui profili denominati, consulta [Profili denominati](https://docs.aws.amazon.com/cli/latest/userguide/cli-multiple-profiles.html) in *Guida per l'utente dell'AWS Command Line Interface *.

   ```
   [profile adminuser]
   aws_access_key_id = adminuser access key ID
   aws_secret_access_key = adminuser secret access key
   region = aws-region
   ```

   Per un elenco delle AWS regioni disponibili, consulta [AWS Regioni ed endpoint](https://docs.aws.amazon.com/general/latest/gr/rande.html) in. *Riferimenti generali di Amazon Web Services*

1. Verifica la configurazione digitando il comando help riportato di seguito al prompt dei comandi: 

   ```
   aws help
   ```

Dopo aver configurato un AWS account AWS CLI, puoi provare l'esercizio successivo, in cui configurerai un'applicazione di esempio e verificherai la end-to-end configurazione.

## Approfondimenti
<a name="setting-up-next-step-3"></a>

[Creare ed eseguire un servizio gestito per l'applicazione Apache Flink](get-started-exercise.md)

# Creare ed eseguire un servizio gestito per l'applicazione Apache Flink
<a name="get-started-exercise"></a>

In questo esercizio, viene creata un'applicazione del servizio gestito per Apache Flink con flussi di dati come origine e come sink.

**Topics**
+ [

## Crea due flussi di dati Amazon Kinesis
](#get-started-exercise-1)
+ [

## Scrittura di record di esempio nel flusso di input
](#get-started-exercise-2)
+ [

## Scarica ed esamina il codice Java per lo streaming di Apache Flink
](#get-started-exercise-5)
+ [

## Compilate il codice dell'applicazione
](#get-started-exercise-5.5)
+ [

## Caricate il codice Java di streaming Apache Flink
](#get-started-exercise-6)
+ [

## Crea ed esegui l'applicazione Managed Service for Apache Flink
](#get-started-exercise-7)

## Crea due flussi di dati Amazon Kinesis
<a name="get-started-exercise-1"></a>

Prima di creare un Amazon Managed Service per Apache Flink per questo esercizio, crea due flussi di dati Kinesis (e). `ExampleInputStream` `ExampleOutputStream` L'applicazione utilizza questi flussi per i flussi di origine e di destinazione dell'applicazione.

Puoi creare questi flussi utilizzando la console Amazon Kinesis o il comando AWS CLI seguente. Per istruzioni sulla console, consulta [Creazione e aggiornamento dei flussi di dati](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html). 

**Per creare i flussi di dati (AWS CLI)**

1. Per creare il primo stream (`ExampleInputStream`), usa il seguente comando Amazon Kinesis `create-stream` AWS CLI .

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleInputStream \
   --shard-count 1 \
   --region us-west-2 \
   --profile adminuser
   ```

1. Per creare il secondo flusso utilizzato dall'applicazione per scrivere l'output, esegui lo stesso comando, modificando il nome del flusso in `ExampleOutputStream`.

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleOutputStream \
   --shard-count 1 \
   --region us-west-2 \
   --profile adminuser
   ```

## Scrittura di record di esempio nel flusso di input
<a name="get-started-exercise-2"></a>

In questa sezione, viene utilizzato uno script Python per scrivere record di esempio nel flusso per l'applicazione da elaborare.

**Nota**  
Questa sezione richiede [AWS SDK per Python (Boto)](https://aws.amazon.com/developers/getting-started/python/).

1. Crea un file denominato `stock.py` con i seguenti contenuti:

   ```
    
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "EVENT_TIME": datetime.datetime.now().isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
           "PRICE": round(random.random() * 100, 2),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

1. Successivamente nel tutorial, esegui lo script `stock.py` per inviare dati all'applicazione. 

   ```
   $ python stock.py
   ```

## Scarica ed esamina il codice Java per lo streaming di Apache Flink
<a name="get-started-exercise-5"></a>

Il codice dell'applicazione Java per questi esempi è disponibile da. GitHub Per scaricare il codice dell'applicazione, esegui le operazioni descritte di seguito:

1. Clona il repository remoto con il comando seguente:

   ```
   git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples.git
   ```

1. Passa alla directory `GettingStarted`.

Il codice dell'applicazione si trova nei file `CustomSinkStreamingJob.java` e `CloudWatchLogSink.java`. Tieni presente quanto segue riguardo al codice dell'applicazione:
+ L'applicazione utilizza un'origine Kinesis per leggere dal flusso di origine. Il seguente snippet crea il sink Kinesis:

  ```
  return env.addSource(new FlinkKinesisConsumer<>(inputStreamName,
                  new SimpleStringSchema(), inputProperties));
  ```

## Compilate il codice dell'applicazione
<a name="get-started-exercise-5.5"></a>

In questa sezione, viene utilizzato il compilatore Apache Maven per creare il codice Java per l'applicazione. Per ulteriori informazioni sull'installazione di Apache Maven e Java Development Kit (JDK), consulta [Prerequisiti per il completamento degli esercizi](tutorial-stock-data.md#setting-up-prerequisites).

L'applicazione Java richiede i componenti seguenti:
+ Un file [Project Object Model (pom.xml)](https://maven.apache.org/guides/introduction/introduction-to-the-pom.html). Questo file contiene informazioni sulla configurazione e le dipendenze dell'applicazione, tra cui le librerie Amazon Managed Service per Apache Flink.
+ Un metodo `main` contenente la logica dell'applicazione.

**Nota**  
**Per utilizzare il connettore Kinesis per la seguente applicazione, è necessario scaricare il codice sorgente del connettore e crearlo come descritto nella documentazione di [Apache](https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/connectors/kinesis.html) Flink.**

**Per creare e compilare il codice dell'applicazione**

1. Crea un' Java/Maven applicazione nel tuo ambiente di sviluppo. Per ulteriori informazioni su come creare un'applicazione, consulta la documentazione per l'ambiente di sviluppo:
   + [ Creazione del primo progetto Java (Eclipse Java Neon)](https://help.eclipse.org/neon/index.jsp?topic=%2Forg.eclipse.jdt.doc.user%2FgettingStarted%2Fqs-3.htm)
   + [ Creazione, esecuzione e compressione della prima applicazione Java (IntelliJ Idea)](https://www.jetbrains.com/help/idea/creating-and-running-your-first-java-application.html)

1. Utilizzare il codice seguente per un file denominato `StreamingJob.java`. 

   ```
    
   package com.amazonaws.services.kinesisanalytics;
   
   import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
   import org.apache.flink.api.common.serialization.SimpleStringSchema;
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
   import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
   import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
   
   import java.io.IOException;
   import java.util.Map;
   import java.util.Properties;
   
   public class StreamingJob {
   
       private static final String region = "us-east-1";
       private static final String inputStreamName = "ExampleInputStream";
       private static final String outputStreamName = "ExampleOutputStream";
   
       private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
           Properties inputProperties = new Properties();
           inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
           inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
   
           return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
       }
   
       private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
               throws IOException {
           Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
           return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
                   applicationProperties.get("ConsumerConfigProperties")));
       }
   
       private static FlinkKinesisProducer<String> createSinkFromStaticConfig() {
           Properties outputProperties = new Properties();
           outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
           outputProperties.setProperty("AggregationEnabled", "false");
   
           FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties);
           sink.setDefaultStream(outputStreamName);
           sink.setDefaultPartition("0");
           return sink;
       }
   
       private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException {
           Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
           FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(),
                   applicationProperties.get("ProducerConfigProperties"));
   
           sink.setDefaultStream(outputStreamName);
           sink.setDefaultPartition("0");
           return sink;
       }
   
       public static void main(String[] args) throws Exception {
           // set up the streaming execution environment
           final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   
           /*
            * if you would like to use runtime configuration properties, uncomment the
            * lines below
            * DataStream<String> input = createSourceFromApplicationProperties(env);
            */
   
           DataStream<String> input = createSourceFromStaticConfig(env);
   
           /*
            * if you would like to use runtime configuration properties, uncomment the
            * lines below
            * input.addSink(createSinkFromApplicationProperties())
            */
   
           input.addSink(createSinkFromStaticConfig());
   
           env.execute("Flink Streaming Java API Skeleton");
       }
   }
   ```

   Notare quanto segue riguardo l'esempio di codice precedente:
   + Questo file contiene il metodo `main` che definisce la funzionalità dell'applicazione.
   + L'applicazione crea connettori di origine e sink per accedere alle risorse esterne utilizzando un oggetto `StreamExecutionEnvironment`. 
   + L'applicazione crea connettori di origine e sink utilizzando proprietà statiche. Per usare proprietà dell'applicazione dinamiche, utilizza i metodi `createSourceFromApplicationProperties` e `createSinkFromApplicationProperties` per creare i connettori. Questi metodi leggono le proprietà dell'applicazione per configurare il connettori.

1. Per usare il codice dell'applicazione, compila il codice e comprimilo in un file JAR. È possibile compilare e creare un pacchetto del codice in uno di due modi:
   + Utilizzare lo strumento Maven della riga di comando. Crea il file JAR eseguendo il comando seguente nella directory che contiene il file `pom.xml`:

     ```
     mvn package
     ```
   + Utilizza il tuo ambiente di sviluppo. Per informazioni dettagliate, consulta la documentazione relativa all'ambiente di sviluppo.

   È possibile caricare il pacchetto come un file JAR, oppure comprimere il pacchetto e caricarlo come un file ZIP. Se create l'applicazione utilizzando AWS CLI, specificate il tipo di contenuto del codice (JAR o ZIP).

1. Se si verificano errori durante la compilazione, verifica che la variabile di ambiente `JAVA_HOME` sia impostata correttamente.

Se l'applicazione viene compilata correttamente, viene creato il seguente file:

`target/java-getting-started-1.0.jar`

## Caricate il codice Java di streaming Apache Flink
<a name="get-started-exercise-6"></a>

In questa sezione, viene creato un bucket Amazon Simple Storage Service (Amazon S3) e caricato il codice dell'applicazione.

**Per caricare il codice dell'applicazione**

1. Apri la console Amazon S3 all'indirizzo. [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)

1. Seleziona **Crea bucket**.

1. Inserisci **ka-app-code-*<username>*** nel campo **Nome bucket**. Aggiungi un suffisso al nome del bucket, ad esempio il nome utente, per renderlo globalmente univoco. Scegli **Next (Successivo)**.

1. Nella fase **Configura opzioni**, non modificare le impostazioni e scegli **Successivo**.

1. Nella fase **Imposta autorizzazioni**, non modificare le impostazioni e scegli **Successivo**.

1. Seleziona **Crea bucket**.

1. **Nella console Amazon S3, scegli il *<username>* bucket **ka-app-code-** e scegli Carica.**

1. Nella fase **Seleziona file**, scegli **Aggiungi file**. Individua il file `java-getting-started-1.0.jar` creato nella fase precedente. Scegli **Next (Successivo)**.

1. Nella fase **Imposta autorizzazioni**, non modificare le impostazioni. Scegli **Next (Successivo)**.

1. Nella fase **Imposta proprietà**, non modificare le impostazioni. Scegli **Carica**.

Il codice dell'applicazione è ora archiviato in un bucket Amazon S3 accessibile dall'applicazione.

## Crea ed esegui l'applicazione Managed Service for Apache Flink
<a name="get-started-exercise-7"></a>

È possibile creare ed eseguire un'applicazione del servizio gestito per Apache Flink utilizzando la console o la AWS CLI.

**Nota**  
Quando crei l'applicazione utilizzando la console, le tue risorse AWS Identity and Access Management (IAM) e Amazon CloudWatch Logs vengono create automaticamente. Quando crei l'applicazione utilizzando AWS CLI, crei queste risorse separatamente.

**Topics**
+ [

### Crea ed esegui l'applicazione (Console)
](#get-started-exercise-7-console)
+ [

### Crea ed esegui l'applicazione (AWS CLI)
](#get-started-exercise-7-cli)

### Crea ed esegui l'applicazione (Console)
<a name="get-started-exercise-7-console"></a>

Segui questi passaggi per creare, configurare, aggiornare ed eseguire l'applicazione utilizzando la console.

#### Creazione dell’applicazione
<a name="get-started-exercise-7-console-create"></a>

1. [Apri la console Kinesis in /kinesis. https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. Sul pannello di controllo di Amazon Kinesis, scegli **Crea applicazione di analisi**.

1. Nella pagina **Kinesis Analytics - Create application (Kinesis Analytics - Crea applicazione)**, fornire i dettagli dell'applicazione come segue:
   + Per **Nome applicazione**, immetti **MyApplication**.
   + Per **Descrizione**, inserisci **My java test app**.
   + Per **Runtime**, scegliere **Apache Flink 1.6**.

1. Per **Autorizzazioni di accesso**, scegli **Crea/aggiorna `kinesis-analytics-MyApplication-us-west-2` per il ruolo IAM**.

1. Scegli **Crea applicazione**.

**Nota**  
Quando crei un'applicazione Amazon Managed Service for Apache Flink utilizzando la console, hai la possibilità di creare un ruolo e una policy IAM per la tua applicazione. L'applicazione utilizza questo ruolo e questa policy per accedere alle sue risorse dipendenti. Queste risorse IAM sono denominate utilizzando il nome dell'applicazione e la Regione come segue:  
Policy: `kinesis-analytics-service-MyApplication-us-west-2`
Ruolo: `kinesis-analytics-MyApplication-us-west-2`

#### Modifica la policy IAM
<a name="get-started-exercise-7-console-iam"></a>

Modifica la policy IAM per aggiungere le autorizzazioni per accedere ai flussi di dati Kinesis.

1. Aprire la console IAM all'indirizzo [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. Seleziona **Policy**. Scegli la policy **`kinesis-analytics-service-MyApplication-us-west-2`** creata dalla console nella sezione precedente. 

1. Nella pagina **Riepilogo**, scegli **Modifica policy**. Scegli la scheda **JSON**.

1. Aggiungi alla policy la sezione evidenziata del seguente esempio di policy. Sostituisci l'account di esempio IDs (*012345678901*) con l'ID del tuo account.

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:s3:::ka-app-code-username/java-getting-started-1.0.jar"
               ]
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:*"
               ]
           },
           {
               "Sid": "ListCloudwatchLogStreams",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogStreams"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
               ]
           },
           {
               "Sid": "PutCloudwatchLogs",
               "Effect": "Allow",
               "Action": [
                   "logs:PutLogEvents"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
               ]
           },
           {
               "Sid": "ReadInputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream"
           },
           {
               "Sid": "WriteOutputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream"
           }
       ]
   }
   ```

------

#### Configura l'applicazione
<a name="get-started-exercise-7-console-configure"></a>

1. Nella **MyApplication**pagina, scegli **Configura**.

1. Nella pagina **Configura applicazione**, fornisci la **Posizione del codice**:
   + Per **Bucket Amazon S3**, inserisci **ka-app-code-*<username>***.
   + Per **Percorso dell'oggetto Amazon S3**, inserisci **java-getting-started-1.0.jar**

1. In **Accesso alle risorse dell'applicazione**, per **Autorizzazioni di accesso**, scegli **Crea/aggiorna `kinesis-analytics-MyApplication-us-west-2` per il ruolo IAM**.

1. In **Proprietà**, per **ID gruppo**, inserisci **ProducerConfigProperties**.

1. Immetti i valori e le proprietà dell'applicazione seguenti:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/streams/latest/dev/get-started-exercise.html)

1. In **Monitoraggio**, accertati che il **Monitoraggio del livello dei parametri** sia impostato su **Applicazione**.

1. Per la **CloudWatch registrazione**, seleziona la casella di controllo **Abilita**.

1. Scegliere **Aggiorna**.

**Nota**  
Quando scegli di abilitare la CloudWatch registrazione, Managed Service for Apache Flink crea un gruppo di log e un flusso di log per te. I nomi di tali risorse sono i seguenti:   
Gruppo di log: `/aws/kinesis-analytics/MyApplication`
Flusso di log: `kinesis-analytics-log-stream`

#### Esecuzione dell'applicazione.
<a name="get-started-exercise-7-console-run"></a>

1. **Nella **MyApplication**pagina, scegli Esegui.** Conferma l'operazione.

1. Quando l'applicazione è in esecuzione, aggiorna la pagina. La console mostra il **Grafico dell'applicazione**.

#### Arresta l'applicazione
<a name="get-started-exercise-7-console-stop"></a>

Nella **MyApplication**pagina, scegli **Stop**. Conferma l'operazione.

#### Aggiornamento dell'applicazione
<a name="get-started-exercise-7-console-update"></a>

Tramite la console, puoi aggiornare le impostazioni dell'applicazione, ad esempio le proprietà dell'applicazione, le impostazioni di monitoraggio e la posizione o il nome di file del JAR dell'applicazione. Puoi anche ricaricare il JAR dell'applicazione dal bucket Amazon S3 se è necessario aggiornare il codice dell'applicazione.

Nella **MyApplication**pagina, scegli **Configura**. Aggiorna le impostazioni dell'applicazione e scegli **Aggiorna**.

### Crea ed esegui l'applicazione (AWS CLI)
<a name="get-started-exercise-7-cli"></a>

In questa sezione, si utilizza AWS CLI per creare ed eseguire l'applicazione Managed Service for Apache Flink. Managed Service for Apache Flink utilizza il `kinesisanalyticsv2` AWS CLI comando per creare e interagire con le applicazioni Managed Service for Apache Flink.

#### Creazione di una policy di autorizzazione
<a name="get-started-exercise-7-cli-policy"></a>

Innanzitutto, crea una policy di autorizzazione con due istruzioni: una che concede le autorizzazioni per l'operazione `read` sul flusso di origine e un'altra che concede le autorizzazioni per operazioni `write` sul flusso di sink. Collega quindi la policy a un ruolo IAM (che verrà creato nella sezione successiva). Pertanto, quando il servizio gestito per Apache Flink assume il ruolo, il servizio disporrà delle autorizzazioni necessarie per leggere dal flusso di origine e scrivere nel flusso di sink.

Utilizza il codice seguente per creare la policy di autorizzazione `KAReadSourceStreamWriteSinkStream`. Sostituisci `username` con il nome utente utilizzato per creare il bucket Amazon S3 per archiviare il codice dell'applicazione. Sostituisci l'ID dell'account in Amazon Resource Names (ARNs) (`012345678901`) con l'ID del tuo account.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "S3",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:GetObjectVersion"
            ],
            "Resource": ["arn:aws:s3:::ka-app-code-username",
                "arn:aws:s3:::ka-app-code-username/*"
            ]
        },
        {
            "Sid": "ReadInputStream",
            "Effect": "Allow",
            "Action": "kinesis:*",
            "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream"
        },
        {
            "Sid": "WriteOutputStream",
            "Effect": "Allow",
            "Action": "kinesis:*",
            "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream"
        }
    ]
}
```

------

Per step-by-step istruzioni su come creare una politica di autorizzazioni, consulta il [Tutorial: Create and Attach Your First Customer Managed Policy](https://docs.aws.amazon.com/IAM/latest/UserGuide/tutorial_managed-policies.html#part-two-create-policy) nella *IAM User Guide*.

**Nota**  
Per accedere ad altri AWS servizi, puoi utilizzare il AWS SDK per Java. Il servizio gestito per Apache Flink imposta automaticamente le credenziali richieste dall'SDK su quelle del ruolo IAM di esecuzione del servizio associato all'applicazione. Non sono richieste fasi aggiuntive.

#### Creazione di un ruolo IAM
<a name="get-started-exercise-7-cli-role"></a>

In questa sezione, crei un ruolo IAM che Managed Service for Apache Flink può assumere per leggere un flusso di origine e scrivere nel flusso sink.

Il servizio gestito per Apache Flink non può accedere al tuo flusso senza autorizzazioni. Queste autorizzazioni possono essere assegnate con un ruolo IAM. Ad ogni ruolo IAM sono collegate due policy. La policy di attendibilità concede al servizio gestito per Apache Flink l'autorizzazione per assumere il ruolo e la policy di autorizzazione determina cosa può fare il servizio assumendo questo ruolo.

Collega la policy di autorizzazione creata nella sezione precedente a questo ruolo.

**Per creare un ruolo IAM**

1. Aprire la console IAM all'indirizzo [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. Nel riquadro di navigazione, seleziona **Ruoli**, quindi **Crea nuovo ruolo**.

1. In **Seleziona tipo di identità attendibile**, scegli **Servizio AWS **. In **Scegli il servizio che utilizzerà questo ruolo**, scegli **Kinesis**. In **Seleziona il tuo caso d'uso**, scegli **Analisi dei dati Kinesis**.

   Scegli **Successivo: Autorizzazioni**.

1. Nella pagina **Allega policy di autorizzazione**, seleziona **Successivo: esamina**. Collega le policy di autorizzazione dopo aver creato il ruolo.

1. Nella pagina **Crea ruolo**, immetti **KA-stream-rw-role** per **Nome ruolo**. Scegli **Crea ruolo**.

   È stato creato un nuovo ruolo IAM denominato `KA-stream-rw-role`. Successivamente, aggiorna le policy di attendibilità e di autorizzazione per il ruolo.

1. Collega la policy di autorizzazione al ruolo.
**Nota**  
Per questo esercizio, il servizio gestito per Apache Flink assume questo ruolo per la lettura di dati da un flusso di dati Kinesis (origine) e la scrittura dell'output in un altro flusso di dati Kinesis. Pertanto, devi collegare la policy creata nella fase precedente, [Creazione di una policy di autorizzazione](#get-started-exercise-7-cli-policy).

   1. Nella pagina **Riepilogo**, scegli la scheda **Autorizzazioni**.

   1. Scegliere **Collega policy**.

   1. Nella casella di ricerca, immetti **KAReadSourceStreamWriteSinkStream** (la policy creata nella sezione precedente).

   1. Scegli la policy **KAReadInputStreamWriteOutputStream** e seleziona **Collega policy**.

È stato creato il ruolo di esecuzione del servizio che l'applicazione utilizzerà per accedere alle risorse. Prendi nota dell'ARN del nuovo ruolo.

Per step-by-step istruzioni sulla creazione di un ruolo, consulta [Creating an IAM Role (Console)](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user.html#roles-creatingrole-user-console) nella *IAM User Guide*.

#### Creazione dell'applicazione del servizio gestito per Apache Flink
<a name="get-started-exercise-7-cli-create"></a>

1. Salvare il seguente codice JSON in un file denominato `create_request.json`. Sostituisci l'ARN del ruolo di esempio con l'ARN per il ruolo creato in precedenza. Sostituisci il suffisso dell'ARN del bucket (`username`) con il suffisso scelto nella sezione precedente. Sostituisci l'ID account di esempio (`012345678901`) nel ruolo di esecuzione del servizio con il tuo ID account.

   ```
   {
       "ApplicationName": "test",
       "ApplicationDescription": "my java test app",
       "RuntimeEnvironment": "FLINK-1_6",
       "ServiceExecutionRole": "arn:aws:iam::012345678901:role/KA-stream-rw-role",
       "ApplicationConfiguration": {
           "ApplicationCodeConfiguration": {
               "CodeContent": {
                   "S3ContentLocation": {
                       "BucketARN": "arn:aws:s3:::ka-app-code-username",
                       "FileKey": "java-getting-started-1.0.jar"
                   }
               },
               "CodeContentType": "ZIPFILE"
           },
           "EnvironmentProperties":  { 
            "PropertyGroups": [ 
               { 
                  "PropertyGroupId": "ProducerConfigProperties",
                  "PropertyMap" : {
                       "flink.stream.initpos" : "LATEST",
                       "aws.region" : "us-west-2",
                       "AggregationEnabled" : "false"
                  }
               },
               { 
                  "PropertyGroupId": "ConsumerConfigProperties",
                  "PropertyMap" : {
                       "aws.region" : "us-west-2"
                  }
               }
            ]
         }
       }
   }
   ```

1. Esegui l'operazione [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_CreateApplication.html) con la richiesta precedente per creare l'applicazione: 

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json
   ```

L'applicazione è ora creata. Avvia l'applicazione nella fase successiva.

#### Avvio dell'applicazione
<a name="get-started-exercise-7-cli-start"></a>

In questa sezione, viene utilizzata l'operazione [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html) per avviare l'applicazione.

**Per avviare l'applicazione**

1. Salvare il seguente codice JSON in un file denominato `start_request.json`.

   ```
   {
       "ApplicationName": "test",
       "RunConfiguration": {
           "ApplicationRestoreConfiguration": { 
            "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT"
            }
       }
   }
   ```

1. Esegui l'operazione [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html) con la richiesta precedente per avviare l'applicazione:

   ```
   aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json
   ```

L'applicazione è ora in esecuzione. Puoi controllare i parametri del servizio gestito per Apache Flink sulla CloudWatch console Amazon per verificare che l'applicazione funzioni.

#### Interruzione dell'applicazione
<a name="get-started-exercise-7-cli-stop"></a>

In questa sezione, viene utilizzata l'operazione [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html) per interrompere l'applicazione.

**Per interrompere l'applicazione**

1. Salvare il seguente codice JSON in un file denominato `stop_request.json`.

   ```
   {"ApplicationName": "test"
   }
   ```

1. Esegui l'operazione [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html) con la seguente richiesta di interrompere l'applicazione:

   ```
   aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json
   ```

L'applicazione è ora interrotta.

# Tutorial: utilizzo AWS Lambda con Amazon Kinesis Data Streams
<a name="tutorial-stock-data-lambda"></a>

In questa esercitazione, è possibile creare una funzione Lambda per utilizzare gli eventi da un flusso di dati Kinesis. In questo scenario di esempio, un'applicazione personalizzata scrive i record in un flusso di dati Kinesis. AWS Lambda quindi esegue il polling di questo flusso di dati e, quando rileva nuovi record di dati, richiama la funzione Lambda. AWS Lambda esegue quindi la funzione Lambda assumendo il ruolo di esecuzione specificato al momento della creazione della funzione Lambda.

Per istruzioni dettagliate, consulta [Tutorial: Using AWS Lambda with Amazon Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis-example.html). 

**Nota**  
Questo tutorial presuppone che tu abbia una certa conoscenza delle operazioni di base di Lambda e AWS Lambda della console. Se non l'hai già fatto, segui le istruzioni in [Getting Started with AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/getting-started.html) per creare la tua prima funzione Lambda.

# Usa la soluzione di AWS streaming di dati per Amazon Kinesis
<a name="examples-streaming-solution"></a>

La soluzione AWS Streaming Data per Amazon Kinesis configura automaticamente i AWS servizi necessari per acquisire, archiviare, elaborare e distribuire dati in streaming con facilità. La soluzione offre diverse opzioni per risolvere i casi d'uso dei dati di streaming che utilizzano più AWS servizi, tra cui Kinesis Data AWS Lambda Streams, Amazon API Gateway e Amazon Managed Service for Apache Flink. 

Ogni soluzione include i seguenti componenti:
+ Un CloudFormation pacchetto per distribuire l'esempio completo.
+ Una CloudWatch dashboard per la visualizzazione delle metriche delle applicazioni.
+ CloudWatch allarmi sulle metriche applicative più rilevanti.
+ tutti i ruoli e le politiche IAM necessari.

La soluzione è disponibile qui: [Soluzione di streaming di dati per Amazon Kinesis](https://aws.amazon.com/solutions/implementations/aws-streaming-data-solution-for-amazon-kinesis/)