

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Scrittura di dati su Amazon Kinesis Data Streams
<a name="building-producers"></a>

Un *producer* è un'applicazione che scrive dati su Flusso di dati Amazon Kinesis. È possibile creare produttori per Kinesis Data Streams AWS SDK per Java utilizzando e la Kinesis Producer Library (KPL).

Se sei nuovo per il flusso di dati Kinesis, ti consigliamo di familiarizzare prima con i concetti chiave e la terminologia introdotti in [Cos'è Amazon Kinesis Data Streams?](introduction.md) e [Utilizza il AWS CLI per eseguire operazioni di Amazon Kinesis Data Streams](getting-started.md).

**Importante**  
Il flusso di dati Kinesis supporta le modifiche al periodo di conservazione dei record di dati del tuo flusso di dati. Per ulteriori informazioni, consulta [Modifica il periodo di conservazione dei dati](kinesis-extended-retention.md).

Per inserire dati nel flusso, devi specificare il nome del flusso, una chiave di partizione e il blob di dati da aggiungere al flusso. La chiave di partizione viene utilizzata per determinare a quale shard nel flusso viene aggiunto il record di dati.

Tutti i dati nello shard vengono inviati allo stesso lavoratore che elabora lo shard. Quale chiave di partizione utilizzi dipende dalla logica dell'applicazione. Il numero di chiavi di partizione normalmente deve essere molto superiore al numero di shard. Questo perché la chiave di partizione viene utilizzata per determinare come mappare un record di dati a un determinato shard. Se hai un numero sufficiente di chiavi di partizione, i dati possono essere distribuiti in modo uniforme negli shard in un flusso.

**Topics**
+ [Sviluppa produttori utilizzando Amazon Kinesis Producer Library (KPL)](developing-producers-with-kpl.md)
+ [Sviluppa i produttori utilizzando l'API Amazon Kinesis Data Streams con AWS SDK per Java](developing-producers-with-sdk.md)
+ [Scrivi su Amazon Kinesis Data Streams utilizzando Kinesis Agent](writing-with-agents.md)
+ [Scrivi su Kinesis Data AWS Streams utilizzando altri servizi](using-other-services.md)
+ [Scrivi su Kinesis Data Streams utilizzando integrazioni di terze parti](using-other-services-third-party.md)
+ [Risolvi i problemi dei produttori di Amazon Kinesis Data Streams](troubleshooting-producers.md)
+ [Ottimizza i produttori di Kinesis Data Streams](advanced-producers.md)

# Sviluppa produttori utilizzando Amazon Kinesis Producer Library (KPL)
<a name="developing-producers-with-kpl"></a>

Un producer del flusso di dati Amazon Kinesis è qualsiasi applicazione che inserisce record di dati utente in un flusso di dati Kinesis (anche detto *importazione dei dati*). Amazon Kinesis Producer Library (KPL) semplifica lo sviluppo di applicazioni di produzione, permettendo agli sviluppatori di ottenere un throughput di scrittura elevato su un flusso di dati Kinesis. 

Puoi monitorare il KPL con Amazon CloudWatch. Per ulteriori informazioni, consulta [Monitora la libreria Kinesis Producer con Amazon CloudWatch](monitoring-with-kpl.md).

**Topics**
+ [Rivedi il ruolo del KPL](#developing-producers-with-kpl-role)
+ [Scopri i vantaggi dell'utilizzo di KPL](#developing-producers-with-kpl-advantage)
+ [Comprendi quando non usare KPL](#developing-producers-with-kpl-when)
+ [Installa KPL](kinesis-kpl-dl-install.md)
+ [Esegui la migrazione da KPL 0.x a KPL 1.x](kpl-migration-1x.md)
+ [Transizione ai certificati Amazon Trust Services (ATS) per la KPL](kinesis-kpl-upgrades.md)
+ [Piattaforme supportate da KPL](kinesis-kpl-supported-plats.md)
+ [Concetti chiave di KPL](kinesis-kpl-concepts.md)
+ [Integra KPL con il codice del produttore](kinesis-kpl-integration.md)
+ [Scrivi nel tuo flusso di dati Kinesis utilizzando KPL](kinesis-kpl-writing.md)
+ [Configurazione della libreria Amazon Kinesis Producer](kinesis-kpl-config.md)
+ [Implementa la deaggregazione dei consumatori](kinesis-kpl-consumer-deaggregation.md)
+ [Usa KPL con Amazon Data Firehose](kpl-with-firehose.md)
+ [Utilizzate il KPL con lo Schema Registry AWS Glue](kpl-with-schemaregistry.md)
+ [Configura la configurazione del proxy KPL](kpl-proxy-configuration.md)
+ [Politica del ciclo di vita della versione KPL](kpl-version-lifecycle-policy.md)

**Nota**  
Consigliamo di eseguire l'aggiornamento alla versione KPL più recente. La KCL viene regolarmente aggiornata con versioni più recenti che includono le ultime patch di dipendenza e sicurezza, correzioni di bug e nuove funzionalità retrocompatibili. Per maggiori informazioni, consulta [https://github.com/awslabs/amazon-kinesis-producer/releases/](https://github.com/awslabs/amazon-kinesis-producer/releases/).

## Rivedi il ruolo del KPL
<a name="developing-producers-with-kpl-role"></a>

KPL è una easy-to-use libreria altamente configurabile che ti aiuta a scrivere su un flusso di dati Kinesis. Funge da intermediario tra il codice dell'applicazione producer e le operazioni API del flusso di dati Kinesis. La KPL esegue le seguenti attività primarie: 
+ Scrive su uno o più flussi di dati Kinesis con un meccanismo di tentativi automatico e configurabile
+ Raccoglie record e utilizza `PutRecords` per scrivere più record in più shard per richiesta
+ Unisce i record degli utenti per aumentare la dimensione del payload e migliorare il throughput
+ Si integra perfettamente con la [Kinesis Client Library](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) (KCL) per disaggregare i record raggruppati nel consumer
+ Invia i CloudWatch parametri di Amazon per tuo conto per fornire visibilità sulle prestazioni dei produttori

Tieni presente che KPL è diverso dall'API Kinesis Data Streams disponibile in. [AWS SDKs](https://aws.amazon.com/tools/) L'API del flusso di dati Kinesis consente di gestire molti aspetti del flusso di dati Kinesis (inclusa la creazione di flussi, il ripartizionamento e l'inserimento e l'estrazione di record), mentre la KPL fornisce un livello di astrazione specificamente per l'importazione dei dati. Per ulteriori informazioni sulle API del flusso di dati Kinesis, consulta la [Documentazione di riferimento delle API di Amazon Kinesis](https://docs.aws.amazon.com/kinesis/latest/APIReference/).

## Scopri i vantaggi dell'utilizzo di KPL
<a name="developing-producers-with-kpl-advantage"></a>

L'elenco seguente rappresenta alcuni dei principali vantaggi dell'utilizzo della KPL per lo sviluppo di producer del flusso di dati Kinesis.

La KPL può essere utilizzata in casi di utilizzo sincroni o asincroni. Suggeriamo di utilizzare le prestazioni più elevate dell'interfaccia asincrona, a meno che non vi sia un motivo specifico per l'utilizzo del comportamento sincrono. Per ulteriori informazioni su questi due casi d'uso e sul codice di esempio, consulta [Scrivi nel tuo flusso di dati Kinesis utilizzando KPL](kinesis-kpl-writing.md).

 **Vantaggi in termini di prestazioni**   
La KPL può aiutare a creare producer ad alte prestazioni. Considera una situazione in cui le istanze Amazon EC2 servono come proxy per la raccolta di eventi a 100 byte provenienti da centinaia o migliaia di dispositivi a bassa potenza e record di scrittura in un flusso di dati Kinesis. Queste istanze EC2 devono scrivere migliaia di eventi al secondo ciascuna per il flusso di dati. Per ottenere il throughput necessario, i produttori devono implementare una logica complicata, ad esempio esecuzione di batch o multithreading, in aggiunta al tentativo di disaggregare la logica e i record da parte del consumer. La KPL esegue tutte queste attività per tuo conto. 

 **Consumer-Side Ease of Use (Facilità utilizzo lato consumer)**   
Per gli sviluppatori lato consumer che utilizzano la KCL in Java, la KPL si integra senza sforzo aggiuntivo. Quando la KCL recupera un record del flusso di dati Kinesis aggregato composto da più record utente della KPL, automaticamente richiama la KPL per estrarre i record utente individuali prima di restituirli all'utente.   
Per gli sviluppatori lato consumer che non utilizzano la KCL ma invece utilizzano l'operazione API `GetRecords` direttamente, una libreria Java della KPL è disponibile per estrarre i record utente individuali prima di restituirli all'utente. 

 **Monitoraggio producer**   
Puoi raccogliere, monitorare e analizzare i tuoi produttori di Kinesis Data Streams utilizzando CloudWatch Amazon e KPL. Il KPL emette velocità effettiva, errori e altre metriche per tuo CloudWatch conto ed è configurabile per il monitoraggio a livello di stream, shard o produttore.

 **Asynchronous Architecture (Architettura asincrona)**   
Poiché il KPL può memorizzare nel buffer i record prima di inviarli a Kinesis Data Streams, non impone all'applicazione chiamante di bloccarsi e attendere la conferma che il record è arrivato al server prima di continuare l'esecuzione. Una chiamata per inserire un record nella KPL restituisce sempre immediatamente e non attende l'invio del record né di ricevere una risposta dal server. Al contrario, viene creato un oggetto `Future` che riceve il risultato di inviare il record al flusso di dati Kinesis in un secondo momento. Questo è lo stesso comportamento dei client asincroni nell'SDK. AWS 

## Comprendi quando non usare KPL
<a name="developing-producers-with-kpl-when"></a>

La KPL può subire un ulteriore ritardo di elaborazione fino a `RecordMaxBufferedTime` all'interno della libreria (configurabile dall'utente). Valori più elevati di `RecordMaxBufferedTime` risultano in creazione di pacchetti più veloce e prestazioni migliori. Le applicazioni che non possono tollerare questo ritardo aggiuntivo potrebbero dover utilizzare direttamente l' AWS SDK. Per ulteriori informazioni sull'utilizzo dell' AWS SDK con Kinesis Data Streams, consulta. [Sviluppa i produttori utilizzando l'API Amazon Kinesis Data Streams con AWS SDK per Java](developing-producers-with-sdk.md) Per ulteriori informazioni su `RecordMaxBufferedTime` e altre proprietà configurabili dall'utente della KPL, consulta [Configurazione della libreria Amazon Kinesis Producer](kinesis-kpl-config.md).

# Installa KPL
<a name="kinesis-kpl-dl-install"></a>

Amazon fornisce file binari predefiniti della Amazon Kinesis Producer Library (KPL) C\$1\$1 per macOS, Windows e distribuzioni Linux recenti (per i dettagli sulla piattaforma supportata, consulta la sezione successiva). Questi file binari vengono confezionati come parte dei file.jar Java e vengono automaticamente richiamati e utilizzati se utilizzi Maven per installare il pacchetto. Per individuare le versioni più recenti della KPL e della KCL, utilizza i seguenti collegamenti di ricerca Maven:
+ [KPL](https://search.maven.org/#search|ga|1|amazon-kinesis-producer)
+ [KCL](https://search.maven.org/#search|ga|1|amazon-kinesis-client)

I file binari Linux sono stati compilati con GNU Compiler Collection (GCC) e collegati staticamente a libstdc\$1\$1 su Linux. Dovrebbero funzionare su qualsiasi distribuzione Linux a 64 bit, che include una versione glibc 2.5 o successiva.

Gli utenti di distribuzioni Linux precedenti possono creare il KPL utilizzando le istruzioni di compilazione fornite insieme al codice sorgente. GitHub Per scaricare il KPL da GitHub, consulta [Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-producer) Producer Library.

**Importante**  
Amazon Kinesis Producer Library (KPL) 0.x sarà disponibile il 30 end-of-support gennaio 2026. **Ti consigliamo vivamente di** migrare le tue applicazioni KPL utilizzando la versione 0.x all'ultima versione KPL prima del 30 gennaio 2026. [Per trovare la versione KPL più recente, consulta la pagina KPL su Github.](https://github.com/awslabs/amazon-kinesis-producer) Per informazioni sulla migrazione da KPL 0.x a KPL 1.x, consulta. [Esegui la migrazione da KPL 0.x a KPL 1.x](kpl-migration-1x.md)

# Esegui la migrazione da KPL 0.x a KPL 1.x
<a name="kpl-migration-1x"></a>

Questo argomento fornisce step-by-step istruzioni per migrare il consumatore da KPL 0.x a KPL 1.x. KPL 1.x introduce il supporto per la versione AWS SDK per Java 2.x pur mantenendo la compatibilità dell'interfaccia con le versioni precedenti. Non è necessario aggiornare la logica di elaborazione dei dati di base per migrare a KPL 1.x. 

1. **Assicurati di avere i seguenti prerequisiti:**
   + Java Development Kit (JDK) 8 o versione successiva
   + AWS SDK per Java 2.x
   + Maven o Gradle per la gestione delle dipendenze

1. **Aggiungi dipendenze**

   Se utilizzi Maven, aggiungi la seguente dipendenza al tuo file pom.xml. Assicurati di aver aggiornato GroupId `com.amazonaws` da `software.amazon.kinesis` a e la `1.x.x` versione all'ultima versione KPL. 

   ```
   <dependency>
       <groupId>software.amazon.kinesis</groupId>
       <artifactId>amazon-kinesis-producer</artifactId>
       <version>1.x.x</version> <!-- Use the latest version -->
   </dependency>
   ```

   Se stai usando Gradle, aggiungi quanto segue al tuo file. `build.gradle` Assicurati di sostituirlo `1.x.x` con l'ultima versione di KPL. 

   ```
   implementation 'software.amazon.kinesis:amazon-kinesis-producer:1.x.x'
   ```

   Puoi verificare la versione più recente di KPL sul [Maven](https://central.sonatype.com/search?q=amazon-kinesis-producer) Central Repository. 

1. **Aggiorna le dichiarazioni di importazione per KPL**

   KPL 1.x utilizza la versione AWS SDK per Java 2.x e utilizza un nome di pacchetto aggiornato che inizia con`software.amazon.kinesis`, rispetto al nome del pacchetto nel KPL precedente che inizia con. `com.amazonaws.services.kinesis`

   Sostituisci l'importazione per con. `com.amazonaws.services.kinesis` `software.amazon.kinesis` La tabella seguente elenca le importazioni che è necessario sostituire.  
**Importa sostituzioni**    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/streams/latest/dev/kpl-migration-1x.html)

1. **Aggiorna le dichiarazioni di importazione per le classi del fornitore di credenziali AWS **

   Durante la migrazione a KPL 1.x, è necessario aggiornare i pacchetti e le classi importati nel codice dell'applicazione KPL basati sulla versione AWS SDK per Java 1.x con quelli corrispondenti basati sulla 2.x. AWS SDK per Java Le importazioni comuni nell'applicazione KPL sono le classi di fornitori di credenziali. Consulta [le modifiche al provider di credenziali](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-client-credentials.html) nella documentazione della guida alla migrazione AWS SDK per Java 2.x per l'elenco completo delle modifiche al provider di credenziali. Ecco le comuni modifiche di importazione che potresti dover apportare alle tue applicazioni KPL. 

   **Importa in KPL 0.x**

   ```
   import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
   ```

   **Importazione in KPL 1.x**

   ```
   import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
   ```

   Se importi altri provider di credenziali basati sulla versione AWS SDK per Java 1.x, devi aggiornarli con quelli equivalenti alla AWS SDK per Java versione 2.x. Se non ne hai importato nessuno classes/packages dalla versione AWS SDK per Java 1.x, puoi ignorare questo passaggio.

1. **Aggiorna la configurazione del provider di credenziali nella configurazione KPL**

   La configurazione del provider di credenziali in KPL 1.x richiede i provider di credenziali 2.x. AWS SDK per Java Se stai passando i provider di credenziali per la versione AWS SDK per Java 1.x `KinesisProducerConfiguration` sovrascrivendo il provider di credenziali predefinito, devi aggiornarlo con i provider di credenziali 2.x. AWS SDK per Java Vedi Modifiche al [provider di credenziali nella documentazione della guida alla migrazione AWS SDK per Java 2.x per l'elenco completo delle modifiche](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-client-credentials.html) al provider di credenziali. Se non hai sovrascritto il provider di credenziali predefinito nella configurazione KPL, puoi ignorare questo passaggio.

   Ad esempio, se stai sovrascrivendo il provider di credenziali predefinito per KPL con il codice seguente:

   ```
   KinesisProducerConfiguration config = new KinesisProducerConfiguration();
   // SDK v1 default credentials provider
   config.setCredentialsProvider(new DefaultAWSCredentialsProviderChain());
   ```

   È necessario aggiornarli con il codice seguente per utilizzare il provider di credenziali AWS SDK per Java 2.x:

   ```
   KinesisProducerConfiguration config = new KinesisProducerConfiguration();
   // New SDK v2 default credentials provider
   config.setCredentialsProvider(DefaultCredentialsProvider.create());
   ```

# Transizione ai certificati Amazon Trust Services (ATS) per la KPL
<a name="kinesis-kpl-upgrades"></a>

Il 9 febbraio 2018, alle 9.00 PST, Flusso di dati Amazon Kinesis ha installato i certificati ATS. [Per continuare a scrivere record su Kinesis Data Streams utilizzando Amazon Kinesis Producer Library (KPL), è necessario aggiornare l'installazione di KPL alla versione 0.12.6 o successiva.](http://search.maven.org/#artifactdetails|com.amazonaws|amazon-kinesis-producer|0.12.6|jar) Questa modifica riguarda tutte le regioni. AWS 

Per informazioni sul passaggio ad ATS, consulta [How to Prepare for AWS's Move to Its Own Certificate Authority](https://aws.amazon.com/blogs/security/how-to-prepare-for-aws-move-to-its-own-certificate-authority/).

In caso di problemi e se hai bisogno di supporto tecnico, [crea un caso](https://console.aws.amazon.com/support/v1#/case/create) utilizzando il Centro di supporto AWS .

# Piattaforme supportate da KPL
<a name="kinesis-kpl-supported-plats"></a>

Amazon Kinesis Producer Library (KPL) è scritta in C\$1\$1 e viene eseguita come processo secondario rispetto al processo utente principale. I file binari nativi precompilati a 64 bit sono nel rilascio di Java e sono gestiti dal wrapper Java.

Il pacchetto Java viene eseguito senza la necessità di installare librerie aggiuntive per i seguenti sistemi operativi:
+ Distribuzioni Linux con kernel 2.6.18 (settembre 2006) e versioni successive
+ Apple iOS X 10.9 e versioni successive
+ Windows Server 2008 e versione successiva
**Importante**  
Windows Server 2008 e versioni successive è supportato per tutte le versioni KPL fino alla versione 0.14.0.   
La piattaforma Windows NON è supportata a partire dalla versione KPL 0.14.0 o successiva.

Tenere presente che la KPL è solo a 64 bit.

## Codice sorgente
<a name="kinesis-kpl-supported-plats-source-code"></a>

Se i file binari forniti nell'installazione della KPL non sono sufficienti per l'ambiente, il core della KPL sarà scritto come modulo C\$1\$1. Il codice sorgente per il modulo C\$1\$1 e l'interfaccia Java sono rilasciati con Amazon Public License e sono disponibili GitHub presso [Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-producer) Producer Library. Sebbene la KPL possa essere utilizzata su qualsiasi piattaforma per la quale sono disponibili un compilatore C\$1\$1 conforme agli standard e JRE, Amazon non supporta ufficialmente alcuna piattaforma che non si trova nell'elenco delle piattaforme supportate.

# Concetti chiave di KPL
<a name="kinesis-kpl-concepts"></a>

Le seguenti sezioni contengono i concetti e la terminologia necessari per comprendere e sfruttare la Amazon Kinesis Producer Library (KPL).

**Topics**
+ [Registri](#kinesis-kpl-concepts-records)
+ [Batching](#kinesis-kpl-concepts-batching)
+ [Aggregazione](#kinesis-kpl-concepts-aggretation)
+ [Raccolta](#kinesis-kpl-concepts-collection)

## Registri
<a name="kinesis-kpl-concepts-records"></a>

In questa guida, si distingue tra *record utente della KPL* e *record del flusso di dati Kinesis*. Quando utilizziamo il termine *record* senza un qualificatore, ci riferiamo a un *record utente dell KPL*. Quando facciamo riferimento a un record del flusso di dati Kinesis, diciamo esplicitamente *record del flusso di dati Kinesis*.

Un record utente della KPL è un blob di dati che ha un significato particolare per l'utente. Alcuni esempi includono un blob JSON che rappresenta un evento di interfaccia utente su un sito Web o una voce di log da un server Web.

Un record del flusso di dati Kinesis è un'istanza della struttura dati `Record` definita dall'API del servizio del flusso di dati Kinesis. Contiene una chiave di partizione, un numero di sequenza e un blob di dati. 

## Batching
<a name="kinesis-kpl-concepts-batching"></a>

*Batching* si riferisce all'esecuzione di una singola operazione su più elementi anziché eseguire ripetutamente l'operazione su ogni singolo elemento. 

In questo contesto, un "elemento" è un record e l'operazione è inviarlo al flusso di dati Kinesis. In una situazione non di batching, è necessario posizionare ogni record in un record del flusso di dati Kinesis separato ed effettuare una richiesta HTTP per inviarlo al flusso di dati Kinesis. Con batching, ogni richiesta HTTP è in grado di eseguire più record invece di uno solo.

La KPL supporta due tipi di batching:
+ *Aggregazione*: l'archiviazione di più record in un singolo record del flusso di dati Kinesis. 
+ *Raccolta*: l'utilizzo dell'operazione API `PutRecords` per inviare più record del flusso di dati Kinesis a una o più partizioni nel flusso di dati Kinesis. 

I due tipi di batching KPL sono stati progettati per coesistere e possono essere attivati o disattivati in modo indipendente. Come impostazione predefinita, entrambi sono abilitati.

## Aggregazione
<a name="kinesis-kpl-concepts-aggretation"></a>

*Aggregazione* si riferisce all'archiviazione di più record in un record del flusso di dati Kinesis. L'aggregazione consente ai clienti di aumentare il numero di record inviati per chiamata API e ciò aumenta anche il throughput producer.

Le partizioni del flusso di dati Kinesis supportano fino a 1.000 record del flusso di dati Kinesis al secondo o 1 MB di velocità di trasmissione effettiva. I record al secondo del flusso di dati Kinesis vincola i clienti a record di dimensioni inferiori a 1 KB. L'aggregazione dei record consente ai clienti di combinare più record in un singolo record del flusso di dati Kinesis. Questo consente ai clienti di migliorare il loro throughput per shard. 

Si consideri il caso di uno shard nella regione us-east-1 attualmente in esecuzione a una velocità costante di 1.000 record al secondo, con record di 512 byte ciascuno. Con l'aggregazione della KPL, è possibile impacchettare 1.000 record in soli 10 record del flusso di dati Kinesis, riducendo le richieste al secondo (RPS) a 10 (a 50 KB ognuna).

## Raccolta
<a name="kinesis-kpl-concepts-collection"></a>

*Raccolta* si riferisce alla creazione di batch di più record del flusso di dati Kinesis e al loro invio in una singola richiesta HTTP con una chiamata all'operazione API `PutRecords`, invece che inviare ogni record del flusso di dati Kinesis nella propria richiesta HTTP.

Questo aumenta il throughput rispetto all'utilizzo di nessuna raccolta perché riduce i costi di molte richieste HTTP separate. Infatti, `PutRecords` è stato specificamente progettato per questo scopo.

La raccolta differisce dall'aggregazione in quanto lavora con gruppi di record del flusso di dati Kinesis. I record del flusso di dati Kinesis raccolti possono comunque contenere più record da parte dell'utente. La relazione può essere visualizzata come segue:

```
record 0 --|
record 1   |        [ Aggregation ]
    ...    |--> Amazon Kinesis record 0 --|
    ...    |                              |
record A --|                              |
                                          |
    ...                   ...             |
                                          |
record K --|                              |
record L   |                              |      [ Collection ]
    ...    |--> Amazon Kinesis record C --|--> PutRecords Request
    ...    |                              |
record S --|                              |
                                          |
    ...                   ...             |
                                          |
record AA--|                              |
record BB  |                              |
    ...    |--> Amazon Kinesis record M --|
    ...    |
record ZZ--|
```

# Integra KPL con il codice del produttore
<a name="kinesis-kpl-integration"></a>

Amazon Kinesis Producer Library (KPL) viene eseguita in un processo separato e comunica con il processo utente principale tramite IPC. Questa architettura talvolta viene definita un [microservizio](http://en.wikipedia.org/wiki/Microservices) ed è scelta per due motivi:

**1) Il processo utente non si interrompe anche se la KPL riporta un arresto anomalo**  
Il processo può avere attività non correlate al flusso di dati Kinesis e può essere in grado di continuare l'operazione anche se la KPL si arresta in maniera anomala. È anche possibile che il processo dell'utente principale riavvi la KPL e passi a uno stato di funzionamento completo (questa funzionalità è nei wrapper ufficiali).

Un esempio è un server Web che invia i parametri al flusso di dati Kinesis; il server può continuare a servire le pagine anche se la parte del flusso di dati Kinesis ha smesso di funzionare. Interrompere l'intero server a causa di un bug nella KPL provocherebbe interruzioni inutili.

**2) I client arbitrari possono essere supportati**  
Ci sono sempre clienti che utilizzano linguaggi diversi da quelli ufficialmente supportati. Questi clienti dovrebbero essere in grado di utilizzare facilmente la KPL.

## Matrice di utilizzo consigliata
<a name="kinesis-kpl-integration-usage"></a>

La seguente matrice di utilizzo elenca le impostazioni consigliate per i diversi utenti e consiglia se e come utilizzare il KPL. Ricorda che se l'aggregazione è abilitata, la disaggregazione deve essere utilizzata per estrarre i record nel lato consumer. 


| Linguaggio lato producer | Linguaggio lato consumer | Versione della KCL | Logica di checkpoint | Puoi utilizzare la KPL? | Avvertenze | 
| --- | --- | --- | --- | --- | --- | 
| Tutto tranne Java | \$1 | \$1 | \$1 | No | N/D | 
| Java | Java | Utilizza Java SDK direttamente | N/D | Sì | Se l'aggregazione viene utilizzata, è necessario utilizzare la libreria di disaggregazione fornita dopo le chiamate GetRecords. | 
| Java | Tutto tranne Java | Utilizza SDK direttamente | N/D | Sì | È necessario disabilitare l'aggregazione.  | 
| Java | Java | 1.3.x | N/D | Sì | È necessario disabilitare l'aggregazione. | 
| Java | Java  | 1.4.x | Chiama il checkpoint senza alcun argomento | Sì | Nessuno | 
| Java | Java | 1.4.x | Chiama il checkpoint con un numero di sequenza esplicito | Sì | Disabilita l'aggregazione o modifica il codice per utilizzare numeri di sequenza estesi per il checkpoint. | 
| Java | Tutto tranne Java  | 1.3.x \$1 daemon multilingua \$1 wrapper specifici della lingua | N/D | Sì | È necessario disabilitare l'aggregazione.  | 

# Scrivi nel tuo flusso di dati Kinesis utilizzando KPL
<a name="kinesis-kpl-writing"></a>

Le sezioni seguenti mostrano un esempio di codice in una progressione dal produttore più semplice al codice completamente asincrono.

## Codice del produttore Barebones
<a name="kinesis-kpl-writing-code"></a>

Il codice seguente è tutto quello che serve per scrivere un producer che lavora al minimo. I record utente di Amazon Kinesis Producer Library (KPL) vengono elaborati in background.

```
// KinesisProducer gets credentials automatically like 
// DefaultAWSCredentialsProviderChain. 
// It also gets region automatically from the EC2 metadata service. 
KinesisProducer kinesis = new KinesisProducer();  
// Put some records 
for (int i = 0; i < 100; ++i) {
    ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8"));
    // doesn't block       
    kinesis.addUserRecord("myStream", "myPartitionKey", data); 
}  
// Do other stuff ...
```

## Rispondi ai risultati in modo sincrono
<a name="kinesis-kpl-writing-synchronous"></a>

Nell'esempio precedente, il codice non ha controllato se i record utente della KPL sono riusciti. La KPL esegue qualsiasi tentativo necessario per tenere conto degli errori. Tuttavia, se desideri controllare i risultati, puoi esaminarli utilizzando gli oggetti `Future` restituiti da `addUserRecord`, come nell'esempio seguente (esempio precedente mostrato per contesto):

```
KinesisProducer kinesis = new KinesisProducer();  

// Put some records and save the Futures 
List<Future<UserRecordResult>> putFutures = new LinkedList<Future<UserRecordResult>>(); 
for (int i = 0; i < 100; i++) {
    ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8"));
    // doesn't block 
    putFutures.add(
        kinesis.addUserRecord("myStream", "myPartitionKey", data)); 
}  

// Wait for puts to finish and check the results 
for (Future<UserRecordResult> f : putFutures) {
    UserRecordResult result = f.get(); // this does block     
    if (result.isSuccessful()) {         
        System.out.println("Put record into shard " + 
                            result.getShardId());     
    } else {
        for (Attempt attempt : result.getAttempts()) {
            // Analyze and respond to the failure         
        }
    }
}
```

## Rispondi ai risultati in modo asincrono
<a name="kinesis-kpl-writing-asynchronous"></a>

L'esempio precedente è la chiamata `get()` a un `Future` oggetto, che blocca il runtime. Se non vuoi bloccare il runtime, puoi usare un callback asincrono, come mostrato nell'esempio seguente:

```
KinesisProducer kinesis = new KinesisProducer();

FutureCallback<UserRecordResult> myCallback = new FutureCallback<UserRecordResult>() {     
    @Override public void onFailure(Throwable t) {
        /* Analyze and respond to the failure  */ 
    };     
    @Override public void onSuccess(UserRecordResult result) { 
        /* Respond to the success */ 
    };
};

for (int i = 0; i < 100; ++i) {
    ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8"));      
    ListenableFuture<UserRecordResult> f = kinesis.addUserRecord("myStream", "myPartitionKey", data);     
    // If the Future is complete by the time we call addCallback, the callback will be invoked immediately.
    Futures.addCallback(f, myCallback); 
}
```

# Configurazione della libreria Amazon Kinesis Producer
<a name="kinesis-kpl-config"></a>

Nonostante le impostazioni predefinite dovrebbero funzionare perfettamente per la maggior parte dei casi d'uso, è consigliabile modificare alcune delle impostazioni predefinite per adattare il comportamento di `KinesisProducer` alle proprie esigenze. Un'istanza di classe `KinesisProducerConfiguration` può essere passata al costruttore `KinesisProducer` per fare ciò, per esempio:

```
KinesisProducerConfiguration config = new KinesisProducerConfiguration()
        .setRecordMaxBufferedTime(3000)
        .setMaxConnections(1)
        .setRequestTimeout(60000)
        .setRegion("us-west-1");
        
final KinesisProducer kinesisProducer = new KinesisProducer(config);
```

Puoi anche caricare una configurazione da un file proprietà:

```
KinesisProducerConfiguration config = KinesisProducerConfiguration.fromPropertiesFile("default_config.properties");
```

Puoi sostituire qualsiasi percorso e nome di file al quale ha accesso il processo utente. Inoltre puoi chiamare metodi impostati sull'istanza `KinesisProducerConfiguration` creata in questo modo per personalizzare la configurazione.

Il file delle proprietà deve specificare i parametri usando i loro nomi in PascalCase. I nomi corrispondono a quelli utilizzati nei metodi impostati nella classe `KinesisProducerConfiguration`. Esempio:

```
RecordMaxBufferedTime = 100
MaxConnections = 4
RequestTimeout = 6000
Region = us-west-1
```

Per ulteriori informazioni sulle regole di utilizzo dei parametri di configurazione e sui limiti di valore, vedere il [file di esempio delle proprietà di configurazione su GitHub](https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties).

Nota che dopo che `KinesisProducer` è stato inizializzato, modificare l'istanza `KinesisProducerConfiguration` che è stata utilizzata non ha un effetto ulteriore. `KinesisProducer` non supporta al momento la riconfigurazione automatica.

# Implementa la deaggregazione dei consumatori
<a name="kinesis-kpl-consumer-deaggregation"></a>

A partire dalla versione 1.4.0, la KCL supporta la disaggregazione automatica di record utente della KPL. Il codice dell'applicazione consumer scritto con versioni precedenti della KCL verrà compilato senza alcuna modifica dopo l'aggiornamento della KCL. Tuttavia, se l'aggregazione della KPL viene utilizzata nel lato producer, bisogna tenere conto di quanto segue riguardo i checkpoint: tutti i sottorecord in un record aggregato hanno lo stesso numero di sequenza, quindi i dati aggiuntivi devono essere memorizzati nel checkpoint se è necessario distinguere tra sottorecord. Questi dati supplementari vengono definiti come *numero sottosuccessione*.

**Topics**
+ [Esegui la migrazione dalle versioni precedenti di KCL](#kinesis-kpl-consumer-deaggregation-migration)
+ [Usa le estensioni KCL per la disaggregazione KPL](#kinesis-kpl-consumer-deaggregation-extensions)
+ [Usa direttamente GetRecords](#kinesis-kpl-consumer-deaggregation-getrecords)

## Esegui la migrazione dalle versioni precedenti di KCL
<a name="kinesis-kpl-consumer-deaggregation-migration"></a>

Non è necessario modificare le chiamate esistenti per eseguire il checkpoint con aggregazione. È comunque garantito che è possibile recuperare tutti i record archiviati nel flusso di dati Kinesis. Il KCL offre ora due nuove operazioni di checkpoint per supportare casi d'uso particolari, descritti di seguito.

Se il codice esistente è stato scritto per KCL prima del supporto KPL e l'operazione di checkpoint viene chiamata senza argomenti, equivale a controllare il numero di sequenza dell'ultimo record utente KPL del batch. Se l'operazione di checkpoint viene chiamata con una stringa di numero di sequenza, è equivalente all'esecuzione del checkpoint nel numero di sequenza del batch fornito assieme al numero di sottosuccessione 0 (zero).

Chiamare la nuova operazione di checkpoint della KCL `checkpoint()` senza alcun argomento è equivalente dal punto di vista semantico all'esecuzione di checkpoint nel numero di sequenza dell'ultima chiamata `Record` nel batch, assieme al numero di sottosuccessione implicito 0 (zero). 

Chiamare la nuova operazione di checkpoint della KCL `checkpoint(Record record)` è equivalente dal punto di vista semantico all'esecuzione di checkpoint nel numero di sequenza del `Record` fornito, assieme al numero di sottosuccessione implicito 0 (zero). Se la chiamata `Record` è effettivamente un `UserRecord`, sul numero di sequenza `UserRecord` e sul numero di sottosuccessione viene eseguito il checkpoint. 

Se si richiama la nuova operazione checkpoint della KCL `checkpoint(String sequenceNumber, long subSequenceNumber)` sul numero di sequenza assieme al numero fornito di sottosuccessione viene eseguito il checkpoint. 

In questi casi, dopo che il checkpoint è stato memorizzato nella tabella di checkpoint di Amazon DynamoDB, la KCL può riprendere correttamente i record di recupero anche quando l'applicazione si interrompe e riavvia. Se più record sono contenuti nella sequenza, il recupero viene eseguito a partire dal prossimo record di numero di sottosuccessione nel record con la sequenza sulla quale è stato eseguito il checkpoint più recentemente. Se il checkpoint più recente includeva l'ultimo record di numero di sottosuccessione del precedente record di numero di sequenza, il recupero parte con il record con il prossimo numero di sequenza. 

La sezione successiva illustra i dettagli del checkpoint di sequenza e sequenza per i consumatori, che devono evitare di saltare e duplicare i record. Se il salto (o duplicazione) di record quando interrompi e riavvii l'elaborazione del record del consumer non è importante, puoi eseguire il codice esistente senza alcuna modifica.

## Usa le estensioni KCL per la disaggregazione KPL
<a name="kinesis-kpl-consumer-deaggregation-extensions"></a>

La disaggregazione KPL può comportare il checkpoint di sottosequenza. Per facilitare l'utilizzo del checkpoint della sottosuccessione, una classe `UserRecord` è stata aggiunta alla KCL:

```
public class UserRecord extends Record {     
    public long getSubSequenceNumber() {
    /* ... */
    }      
    @Override 
    public int hashCode() {
    /* contract-satisfying implementation */ 
    }      
    @Override 
    public boolean equals(Object obj) {
    /* contract-satisfying implementation */ 
    } 
}
```

Questa classe viene ora utilizzata invece di `Record`. Questo non interrompe il codice esistente perché è una sottoclasse di `Record`. La classe `UserRecord` rappresenta entrambi i sottorecord effettivi e i record standard, non aggregati. I record non aggregati non possono essere considerati come record aggregati con un solo sottorecord.

Inoltre, due operazioni nuove vengono aggiunte a `IRecordProcessorCheckpointer`:

```
public void checkpoint(Record record); 
public void checkpoint(String sequenceNumber, long subSequenceNumber);
```

Per iniziare a utilizzare il checkpoint del numero di sottosuccessione, puoi eseguire la seguente conversione. Modifica il seguente codice di modulo:

```
checkpointer.checkpoint(record.getSequenceNumber());
```

Nuovo codice di modulo:

```
checkpointer.checkpoint(record);
```

È consigliabile utilizzare il modulo `checkpoint(Record record)` per il checkpoint della sottosuccessione. Tuttavia, se stai già memorizzando `sequenceNumbers` in stringhe da utilizzare per il checkpoint, devi anche memorizzare `subSequenceNumber`, come nell'esempio seguente:

```
String sequenceNumber = record.getSequenceNumber(); 
long subSequenceNumber = ((UserRecord) record).getSubSequenceNumber();  // ... do other processing  
checkpointer.checkpoint(sequenceNumber, subSequenceNumber);
```

Il cast from to ha `UserRecord` sempre successo `Record` perché l'implementazione utilizza sempre. `UserRecord` Se non è necessario eseguire calcoli aritmetici sui numeri di sequenza, questo approccio non è consigliato.

Durante l'elaborazione dei record utente della KPL, la KCL scrive il numero di sottosequenza in Amazon DynamoDB come un campo extra per ogni riga. Le versioni precedenti della KCL hanno utilizzato `AFTER_SEQUENCE_NUMBER` per recuperare record durante la ripresa dei checkpoint. L'attuale KCL con il supporto KPL utilizza invece `AT_SEQUENCE_NUMBER`. Quando viene recuperato al numero di sequenza sul quale è stato eseguito il checkpoint, viene controllato il numero di sequenza sul quale è stato eseguito il checkpoint e i sottorecord vengono interrotti, in base alle esigenze (potenzialmente tutti se l'ultimo sottorecord è quello sul quale è stato eseguito il checkpoint). I record non aggregati possono essere considerati come record non aggregati con un sottorecord singolo, perciò lo stesso algoritmo funziona sia per i record aggregati sia non aggregati.

## Usa direttamente GetRecords
<a name="kinesis-kpl-consumer-deaggregation-getrecords"></a>

Puoi anche scegliere di non utilizzare la KCL, ma invece richiamare l'operazione API `GetRecords` direttamente per recuperare i record del flusso di dati Kinesis. Per decomprimere questi record recuperati nei record utente della KPL originali, richiama una delle seguenti operazioni statiche in `UserRecord.java`:

```
public static List<Record> deaggregate(List<Record> records)

public static List<UserRecord> deaggregate(List<UserRecord> records, BigInteger startingHashKey, BigInteger endingHashKey)
```

La prima operazione utilizza il valore predefinito `0` (zero) per `startingHashKey` e il valore predefinito `2^128 -1` per `endingHashKey`.

Ognuna di queste operazioni disaggrega l'elenco fornito di record del flusso di dati Kinesis in un elenco di record utente della KPL. Qualsiasi record utente KPL la cui chiave hash esplicita o chiave di partizione non rientra nell'intervallo di `startingHashKey` (incluso) e `endingHashKey`(incluso) viene eliminato dalla lista dei record restituiti.

# Usa KPL con Amazon Data Firehose
<a name="kpl-with-firehose"></a>

Se utilizzi la Kinesis Producer Library (KPL) per scrivere i dati su un flusso di dati Kinesis, puoi utilizzare l'aggregazione per abbinare i record che scrivi al flusso di dati Kinesis. Se poi utilizzate quel flusso di dati come fonte per il flusso di distribuzione Firehose, Firehose disaggrega i record prima di consegnarli alla destinazione. Se configuri il flusso di distribuzione per trasformare i dati, Firehose disaggrega i record prima di inviarli a. AWS Lambda Per ulteriori informazioni, consulta [Scrittura su Amazon Firehose tramite il flusso di dati Kinesis](https://docs.aws.amazon.com/firehose/latest/dev/writing-with-kinesis-streams.html).

# Utilizzate il KPL con lo Schema Registry AWS Glue
<a name="kpl-with-schemaregistry"></a>

Puoi integrare i tuoi flussi di dati Kinesis con lo Schema Registry. AWS Glue Lo AWS Glue Schema Registry ti consente di scoprire, controllare ed evolvere centralmente gli schemi, garantendo al contempo che i dati prodotti siano convalidati continuamente da uno schema registrato. Uno schema definisce la struttura e il formato di un registro di dati. Uno schema è una specifica con versioni per la pubblicazione, il consumo o l'archiviazione dei dati in modo affidabile. Lo AWS Glue Schema Registry consente di migliorare la qualità e la governance end-to-end dei dati all'interno delle applicazioni di streaming. Per ulteriori informazioni, consulta [Registro degli schemi di AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html). Uno dei modi per configurare questa integrazione è utilizzare le librerie KPL e Kinesis Client Library (KCL) in Java. 

**Importante**  
Attualmente, l'integrazione di Kinesis Data AWS Glue Streams e del registro dello schema è supportata solo per i flussi di dati Kinesis che utilizzano produttori KPL implementati in Java. Il supporto multilingue non viene fornito. 

Per istruzioni dettagliate su come configurare l'integrazione di Kinesis Data Streams con Schema Registry utilizzando KPL, consulta la [sezione «Interazione con i dati KPL/KCL utilizzando le librerie» in Caso d'uso: integrazione di Amazon Kinesis Data Streams](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds) con il registro dello schema Glue. AWS 

# Configura la configurazione del proxy KPL
<a name="kpl-proxy-configuration"></a>

Per le applicazioni che non possono connettersi direttamente a Internet, tutti i client AWS SDK supportano l'uso di proxy HTTP o HTTPS. In un ambiente aziendale tipico, tutto il traffico di rete in uscita deve passare attraverso server proxy. Se l'applicazione utilizza Kinesis Producer Library (KPL) per raccogliere e inviare dati AWS in un ambiente che utilizza server proxy, l'applicazione richiederà la configurazione del proxy KPL. KPL è una libreria di alto livello basata sull'SDK AWS Kinesis. È suddivisa in un processo nativo e un wrapper. Il processo nativo esegue tutti i processi di elaborazione e invio dei record, mentre il wrapper gestisce il processo nativo e comunica con esso. Per ulteriori informazioni, consulta [Implementazione di producer efficienti e affidabili con la Amazon Kinesis Producer Library](https://aws.amazon.com/blogs/big-data/implementing-efficient-and-reliable-producers-with-the-amazon-kinesis-producer-library/). 

Il wrapper è scritto in Java e il processo nativo è scritto in C\$1\$1 con l'uso dell'SDK Kinesis. La versione 0.14.7 e successive della KPL ora supportano la configurazione proxy nel wrapper Java, che può passare tutte le configurazioni proxy al processo nativo. [Per ulteriori informazioni, consulta https://github.com/awslabs/amazon-kinesis-producer/0.14.7. releases/tag/v](https://github.com/awslabs/amazon-kinesis-producer/releases/tag/v0.14.7)

Puoi utilizzare il codice seguente per aggiungere configurazioni proxy alle applicazioni KPL.

```
KinesisProducerConfiguration configuration = new KinesisProducerConfiguration();
// Next 4 lines used to configure proxy 
configuration.setProxyHost("10.0.0.0"); // required
configuration.setProxyPort(3128); // default port is set to 443
configuration.setProxyUserName("username"); // no default 
configuration.setProxyPassword("password"); // no default

KinesisProducer kinesisProducer = new KinesisProducer(configuration);
```

# Politica del ciclo di vita della versione KPL
<a name="kpl-version-lifecycle-policy"></a>

Questo argomento descrive la politica del ciclo di vita delle versioni per Amazon Kinesis Producer Library (KPL). AWS fornisce regolarmente nuove versioni per le versioni KPL per supportare nuove funzionalità e miglioramenti, correzioni di bug, patch di sicurezza e aggiornamenti delle dipendenze. Ti consigliamo di rimanere up-to-date con le versioni KPL per stare al passo con le funzionalità più recenti, gli aggiornamenti di sicurezza e le dipendenze sottostanti. **Non** consigliamo l'uso continuato di una versione KPL non supportata.

Il ciclo di vita delle principali versioni KPL è costituito dalle tre fasi seguenti:
+ **Disponibilità generale (GA)**: durante questa fase, la versione principale è completamente supportata. AWS fornisce regolarmente versioni secondarie e patch che includono il supporto per nuove funzionalità o aggiornamenti delle API per Kinesis Data Streams, oltre a correzioni di bug e sicurezza.
+ **Modalità di manutenzione**: AWS limita il rilascio delle versioni delle patch per risolvere solo le correzioni di bug critici e i problemi di sicurezza. La versione principale non riceverà aggiornamenti per le nuove funzionalità o APIs per Kinesis Data Streams.
+ **E nd-of-support** — La versione principale non riceverà più aggiornamenti o versioni. Le versioni pubblicate in precedenza continueranno a essere disponibili tramite gestori di pacchetti pubblici e il codice rimarrà attivo GitHub. L'uso di una versione raggiunta end-of-support viene effettuato a discrezione dell'utente. Ti consigliamo di eseguire l'aggiornamento alla versione principale più recente.


| Versione principale | Fase attuale | Data di rilascio | Data della modalità di manutenzione | End-of-support data | 
| --- | --- | --- | --- | --- | 
| KPL 0.x | modalità di manutenzione | 2015-06-02 | 2025-04-17 | 2026-01-30 | 
| KPL 1.x | Disponibilità generale | 2024-12-15 | -- | -- | 

# Sviluppa i produttori utilizzando l'API Amazon Kinesis Data Streams con AWS SDK per Java
<a name="developing-producers-with-sdk"></a>

Puoi sviluppare produttori utilizzando l'API Amazon Kinesis Data Streams AWS con l'SDK for Java. Se sei nuovo per il flusso di dati Kinesis, ti consigliamo di familiarizzare prima con i concetti chiave e la terminologia introdotti in [Cos'è Amazon Kinesis Data Streams?](introduction.md) e [Utilizza il AWS CLI per eseguire operazioni di Amazon Kinesis Data Streams](getting-started.md).

Questi esempi parlano dell'[API del flusso di dati Kinesis](https://docs.aws.amazon.com/kinesis/latest/APIReference/) e utilizzano l'[SDK AWS per Java](https://aws.amazon.com/sdk-for-java/) per aggiungere (inserire) dati a un flusso. Tuttavia, per la maggior parte dei casi d'uso, è preferibile fare riferimento alla libreria KPL del flusso di dati Kinesis. Per ulteriori informazioni, consulta [Sviluppa produttori utilizzando Amazon Kinesis Producer Library (KPL)](developing-producers-with-kpl.md).

Il codice di esempio Java in questo capitolo illustra come eseguire le operazioni API del flusso di dati Kinesis di base ed è suddiviso logicamente in tipo di operazioni. Questi esempi non rappresentano codici pronti per la produzione, poiché non eseguono un controllo per tutte le possibili eccezioni o spiegano tutte le possibili considerazioni relative alle prestazioni e alla sicurezza. Inoltre, è possibile chiamare l'[API di SDK](https://docs.aws.amazon.com/kinesis/latest/APIReference/) utilizzando altri linguaggi di programmazione. Per ulteriori informazioni su tutto ciò che è disponibile AWS SDKs, consulta [Inizia a sviluppare con Amazon Web Services](https://aws.amazon.com/developers/getting-started/).

Ogni attività ha dei requisiti preliminari; ad esempio, non è possibile aggiungere dati a un flusso se non si è creato un flusso, che a sua volta presuppone la creazione di un client. Per ulteriori informazioni, consulta [Crea e gestisci flussi di dati Kinesis](working-with-streams.md).

**Topics**
+ [Aggiungi dati a uno stream](#kinesis-using-sdk-java-add-data-to-stream)
+ [Interagisci con i dati utilizzando lo AWS Glue Schema Registry](kinesis-integration-glue-schema-registry.md)

## Aggiungi dati a uno stream
<a name="kinesis-using-sdk-java-add-data-to-stream"></a>

Una volta che il flusso è stato creato, è possibile aggiungere dati sotto forma di record. Un record è una struttura di dati che contiene i dati da elaborare sotto forma di un blob di dati. Una volta che i dati sono stati archiviati nel record, il flusso di dati Kinesis non li ispeziona, interpreta o modifica in alcun modo. Ogni record, inoltre, è associato a un numero di sequenza e a una chiave di partizione.

Sono disponibili due diverse operazioni nell'API del flusso di dati Kinesis che consentono di aggiungere dati a un flusso, [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) e [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html). L'operazione `PutRecords` invia più record al flusso in ogni richiesta HTTP e l'operazione unica `PutRecord` invia i record al flusso uno alla volta (ogni record necessita di una richiesta HTTP separata). È preferibile utilizzare l'operazione `PutRecords` per la maggior parte delle applicazioni perché questa raggiunge un maggiore throughput per producer dati. Per ulteriori informazioni su ciascuna di queste operazioni, vedere le sottosezioni separate di seguito.

**Topics**
+ [Aggiungi più record con PutRecords](#kinesis-using-sdk-java-putrecords)
+ [Aggiungi un singolo record con PutRecord](#kinesis-using-sdk-java-putrecord)

È necessario tenere sempre a mente che mentre l'applicazione origine aggiunge dati al flusso utilizzando l'API del flusso di dati Kinesis è probabile che ci siano una o più applicazioni consumer che elaborano simultaneamente i dati dal flusso. Per informazioni su come i consumer ottengono i dati utilizzando l'API del flusso di dati Kinesis, consulta [Ottieni dati da un flusso](developing-consumers-with-sdk.md#kinesis-using-sdk-java-get-data).

**Importante**  
[Modifica il periodo di conservazione dei dati](kinesis-extended-retention.md)

### Aggiungi più record con PutRecords
<a name="kinesis-using-sdk-java-putrecords"></a>

L'operazione [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) invia più record a SDK in una singola richiesta. Utilizzando `PutRecords`, i producer possono ottenere maggiori livelli di velocità di trasmissione effettiva durante l'invio di dati al loro flusso di dati Kinesis. Ogni richiesta `PutRecords` può supportare fino a 500 record. Ciascun record nella richiesta può avere una dimensione massima pari a 1 MB, con un limite a 5 MB, per l'intera richiesta, incluse le chiavi di partizione. Come con l'operazione singola `PutRecord` descritta di seguito, `PutRecords` utilizza numeri di sequenza e chiavi di partizione. Tuttavia, il parametro `PutRecord` `SequenceNumberForOrdering` non è incluso in una chiamata `PutRecords`. L'operazione `PutRecords` tenta di elaborare tutti i record secondo l'ordine naturale della richiesta. 

Ogni record di dati dispone di un numero di sequenza univoco. Il numero di sequenza viene assegnato dal flusso di dati Kinesis dopo aver chiamato `client.putRecords` per aggiungere i record di dati al flusso. I numeri di sequenza per la stessa chiave di partizione di solito aumentano nel tempo; più è lungo il periodo di tempo tra le richieste `PutRecords`, più aumentano i numeri di sequenza.

**Nota**  
i numeri di sequenza non possono essere utilizzati come indici per set di dati all'interno dello stesso flusso. Per separare i set di dati logicamente, utilizza le chiavi di partizione o crea un flusso separato per ogni set di dati.

Una richiesta `PutRecords` può includere record con diverse chiavi di partizione. L'ambito della richiesta è un flusso; ogni richiesta può includere qualsiasi combinazione di chiavi di partizione e record fino al raggiungimento dei limiti della richiesta. Le richieste effettuate con molte diverse chiavi di partizione a flussi con diversi shard sono in genere più veloci delle richieste con un piccolo numero di chiavi di partizione su un esiguo numero di shard. Il numero di chiavi di partizione deve essere molto più grande del numero di shard per ridurre la latenza e massimizzare il throughput.

#### PutRecords esempio
<a name="kinesis-using-sdk-java-putrecords-example"></a>

Il codice seguente crea 100 record di dati con chiavi di partizione sequenziali e li mette in un flusso chiamato `DataStream`. 

```
        AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();
        
        clientBuilder.setRegion(regionName);
        clientBuilder.setCredentials(credentialsProvider);
        clientBuilder.setClientConfiguration(config);
        
        AmazonKinesis kinesisClient = clientBuilder.build();
 
        PutRecordsRequest putRecordsRequest  = new PutRecordsRequest();
        putRecordsRequest.setStreamName(streamName);
        List <PutRecordsRequestEntry> putRecordsRequestEntryList  = new ArrayList<>(); 
        for (int i = 0; i < 100; i++) {
            PutRecordsRequestEntry putRecordsRequestEntry  = new PutRecordsRequestEntry();
            putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(i).getBytes()));
            putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i));
            putRecordsRequestEntryList.add(putRecordsRequestEntry); 
        }

        putRecordsRequest.setRecords(putRecordsRequestEntryList);
        PutRecordsResult putRecordsResult  = kinesisClient.putRecords(putRecordsRequest);
        System.out.println("Put Result" + putRecordsResult);
```

La risposta `PutRecords` include una matrice di risposta `Records`. Ogni record nella matrice di risposta è direttamente correlata a un record nella matrice di richiesta in base all'ordine naturale, dall'alto al basso della richiesta e della risposta. La matrice di risposta `Records` include sempre lo stesso numero di record della matrice di richiesta.

#### Gestisci i guasti durante l'utilizzo PutRecords
<a name="kinesis-using-sdk-java-putrecords-handling-failures"></a>

Per impostazione predefinita, l'errore nei singoli record all'interno di una richiesta non blocca l'elaborazione dei record successivi in una richiesta `PutRecords`. Ciò significa che una matrice `Records` di risposta include record elaborati e non. È necessario rilevare i record non elaborati correttamente e includerli nella chiamata successiva. 

I record elaborati correttamente includono valori `SequenceNumber` e `ShardID`, mentre quelli non elaborati correttamente includono valori `ErrorCode` e `ErrorMessage`. Il parametro `ErrorCode` riflette il tipo di errore e può coincidere con uno dei seguenti valori: `ProvisionedThroughputExceededException` o `InternalFailure`. `ErrorMessage` offre informazioni più dettagliate sull'eccezione `ProvisionedThroughputExceededException`, incluso l'ID dell'account, il nome del flusso e l'ID dello shard del record oggetto di throttling. L'esempio seguente ha tre record in una richiesta `PutRecords`. Il secondo record ha generato un errore che si riflette nella risposta. 

**Example PutRecords Sintassi della richiesta**  

```
{
    "Records": [
        {
    	"Data": "XzxkYXRhPl8w",
	    "PartitionKey": "partitionKey1"
        },
        {
    	"Data": "AbceddeRFfg12asd",
	    "PartitionKey": "partitionKey1"	
        },
        {
    	"Data": "KFpcd98*7nd1",
	    "PartitionKey": "partitionKey3"
        }
    ],
    "StreamName": "myStream"
}
```

**Example PutRecords Sintassi di risposta**  

```
{
    "FailedRecordCount”: 1,
    "Records": [
        {
	    "SequenceNumber": "21269319989900637946712965403778482371",
	    "ShardId": "shardId-000000000001"

        },
        {
	    “ErrorCode":”ProvisionedThroughputExceededException”,
	    “ErrorMessage": "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111."

        },
        {
	    "SequenceNumber": "21269319989999637946712965403778482985",
	    "ShardId": "shardId-000000000002"
        }
    ]
}
```

I record non elaborati correttamente possono essere inclusi nelle richieste `PutRecords` successive. In primo luogo, controlla il parametro `FailedRecordCount` in `putRecordsResult` per confermare la presenza di record non elaborati. In questo caso, ogni `putRecordsEntry` che ha un `ErrorCode` che non è `null` deve essere aggiunta a una richiesta successiva. Per un esempio di questo tipo di gestore, fai riferimento al seguente codice.

**Example PutRecords gestore degli errori**  

```
PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
putRecordsRequest.setStreamName(myStreamName);
List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();
for (int j = 0; j < 100; j++) {
    PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
    putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(j).getBytes()));
    putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", j));
    putRecordsRequestEntryList.add(putRecordsRequestEntry);
}

putRecordsRequest.setRecords(putRecordsRequestEntryList);
PutRecordsResult putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest);

while (putRecordsResult.getFailedRecordCount() > 0) {
    final List<PutRecordsRequestEntry> failedRecordsList = new ArrayList<>();
    final List<PutRecordsResultEntry> putRecordsResultEntryList = putRecordsResult.getRecords();
    for (int i = 0; i < putRecordsResultEntryList.size(); i++) {
        final PutRecordsRequestEntry putRecordRequestEntry = putRecordsRequestEntryList.get(i);
        final PutRecordsResultEntry putRecordsResultEntry = putRecordsResultEntryList.get(i);
        if (putRecordsResultEntry.getErrorCode() != null) {
            failedRecordsList.add(putRecordRequestEntry);
        }
    }
    putRecordsRequestEntryList = failedRecordsList;
    putRecordsRequest.setRecords(putRecordsRequestEntryList);
    putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest);
}
```

### Aggiungi un singolo record con PutRecord
<a name="kinesis-using-sdk-java-putrecord"></a>

Ogni chiamata a [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html) opera su un singolo record. È preferibile usare l'operazione `PutRecords` descritta in [Aggiungi più record con PutRecords](#kinesis-using-sdk-java-putrecords) a meno che l'applicazione in uso non abbia specificamente bisogno di inviare sempre record singoli per ogni richiesta, o nel caso in cui per altre ragioni l'operazione `PutRecords` non può essere utilizzata.

Ogni record di dati dispone di un numero di sequenza univoco. Il numero di sequenza viene assegnato dal flusso di dati Kinesis dopo aver chiamato `client.putRecord` per aggiungere i record di dati al flusso. I numeri di sequenza per la stessa chiave di partizione di solito aumentano nel tempo; più è lungo il periodo di tempo tra le richieste `PutRecord`, più aumentano i numeri di sequenza.

 Quando le immissioni si verificano in rapida successione, non è garantito che i numeri di sequenza restituiti aumentino perché le operazioni di introduzione appaiono sostanzialmente come simultanee al flusso di dati Kinesis. Per garantire un aumento rigoroso dei numeri di sequenza per la stessa chiave di partizione, utilizzare il parametro `SequenceNumberForOrdering`, come mostrato nel codice di esempio [PutRecord esempio](#kinesis-using-sdk-java-putrecord-example). 

 Che venga utilizzato o meno il `SequenceNumberForOrdering`, i record che il flusso di dati Kinesis riceve tramite una chiamata `GetRecords` sono rigorosamente ordinati in base al numero di sequenza. 

**Nota**  
i numeri di sequenza non possono essere utilizzati come indici per set di dati all'interno dello stesso flusso. Per separare i set di dati logicamente, utilizza le chiavi di partizione o crea un flusso separato per ogni set di dati.

Per raggruppare i dati nel flusso viene utilizzata una chiave di partizione. Un record di dati viene assegnato a un shard all'interno del flusso in base alla sua chiave di partizione. Nello specifico, il flusso di dati Kinesis utilizza la chiave di partizione come input a una funzione hash che mappi la chiave di partizione (e i dati associati) a una determinata partizione.

 Come conseguenza di questo meccanismo di hashing, tutti i record di dati con la stessa chiave di partizione vengono mappati allo stesso shard all'interno del flusso. Tuttavia, se il numero di chiavi di partizione supera il numero di shard, alcuni shard necessariamente conterranno record con chiavi di partizione diverse. Dal punto di vista di progettazione, per assicurare che tutti gli shard siano ben utilizzati, il numero di shard (specificato dal metodo `setShardCount` di `CreateStreamRequest`) deve essere notevolmente inferiore al numero di chiavi di partizione univoche e la quantità di flusso di dati verso un'unica chiave di partizione deve essere notevolmente inferiore alla capacità dello shard. 

#### PutRecord esempio
<a name="kinesis-using-sdk-java-putrecord-example"></a>

Il codice seguente crea dieci record di dati, distribuiti su due chiavi di partizione, e li mette in un flusso chiamato `myStreamName`.

```
for (int j = 0; j < 10; j++) 
{
  PutRecordRequest putRecordRequest = new PutRecordRequest();
  putRecordRequest.setStreamName( myStreamName );
  putRecordRequest.setData(ByteBuffer.wrap( String.format( "testData-%d", j ).getBytes() ));
  putRecordRequest.setPartitionKey( String.format( "partitionKey-%d", j/5 ));  
  putRecordRequest.setSequenceNumberForOrdering( sequenceNumberOfPreviousRecord );
  PutRecordResult putRecordResult = client.putRecord( putRecordRequest );
  sequenceNumberOfPreviousRecord = putRecordResult.getSequenceNumber();
}
```

Il codice di esempio precedente usa `setSequenceNumberForOrdering` per garantire un ordinamento in aumento rigoroso all'interno di ciascuna chiave di partizione. Per usare questo parametro efficacemente, impostare il `SequenceNumberForOrdering` del record corrente (record *n*) sul numero di sequenza del record precedente (record *n-1*). Per ottenere il numero di sequenza di un record che è stato aggiunto al flusso, chiamare `getSequenceNumber` sul risultato di `putRecord`.

Il parametro `SequenceNumberForOrdering` garantisce garantisce un numero di sequenza strettamente crescente per la stessa chiave di partizione. `SequenceNumberForOrdering` non fornisce ordinazione di record su più chiavi di partizione. 

# Interagisci con i dati utilizzando lo AWS Glue Schema Registry
<a name="kinesis-integration-glue-schema-registry"></a>

Puoi integrare i tuoi flussi di dati Kinesis con lo Schema Registry. AWS Glue Lo AWS Glue Schema Registry ti consente di scoprire, controllare ed evolvere centralmente gli schemi, garantendo al contempo che i dati prodotti siano convalidati continuamente da uno schema registrato. Uno schema definisce la struttura e il formato di un registro di dati. Uno schema è una specifica con versioni per la pubblicazione, il consumo o l'archiviazione dei dati in modo affidabile. Lo AWS Glue Schema Registry consente di migliorare la qualità e la governance end-to-end dei dati all'interno delle applicazioni di streaming. Per ulteriori informazioni, consulta [Registro degli schemi di AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html). Uno dei modi per configurare questa integrazione è tramite `PutRecord` Kinesis APIs Data Streams disponibile AWS in Java SDK. `PutRecords` 

Per istruzioni dettagliate su come configurare l'integrazione di Kinesis Data Streams con il registro dello schema PutRecords utilizzando Kinesis Data Streams, consulta PutRecord la sezione «Interazione con i dati utilizzando APIs Kinesis Data Streams» [in Caso d'uso: integrazione di Amazon](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds) Kinesis Data APIs Streams con il registro dello schema Glue. AWS 

# Scrivi su Amazon Kinesis Data Streams utilizzando Kinesis Agent
<a name="writing-with-agents"></a>

L'agente Kinesis è un'applicazione software Java autonoma che offre un modo semplice per raccogliere e inviare dati al flusso di dati Kinesis. L'agente controlla costantemente un set di file e invia i nuovi dati al flusso. L'agente gestisce la rotazione dei file, i checkpoint e i tentativi in caso di errori. Fornisce tutti i dati in un modo affidabile, tempestivo e semplice. Inoltre, emette i CloudWatch parametri di Amazon per aiutarti a monitorare e risolvere meglio il processo di streaming.

Come impostazione predefinita, i record vengono analizzati da ciascun file in base alla nuova riga di caratteri (`'\n'`). Tuttavia, l'agente può anche essere configurato per analizzare record a più righe (consulta [Specificare le impostazioni di configurazione dell'agente](#agent-config-settings)). 

Puoi installare l'agente su ambienti server basati su Linux, come server Web, server di log e server di database. Dopo aver installato l'agente, configurarlo specificando i file da monitorare e il flusso per i dati. Dopo che l'agente è configurato, raccoglie i dati dal file in modo durevole e li invia al flusso in modo affidabile.

**Topics**
+ [Completa i prerequisiti per Kinesis Agent](#prereqs)
+ [Download e installazione dell'agente](#download-install)
+ [Configura e avvia l'agente](#config-start)
+ [Specificare le impostazioni di configurazione dell'agente](#agent-config-settings)
+ [Monitora più directory di file e scrivi su più flussi](#sim-writes)
+ [Usa l'agente per pre-elaborare i dati](#pre-processing)
+ [Usa i comandi CLI dell'agente](#cli-commands)
+ [Domande frequenti](#agent-faq)

## Completa i prerequisiti per Kinesis Agent
<a name="prereqs"></a>
+ Il sistema operativo deve essere Amazon Linux AMI con versione 2015.09 o successiva o Red Hat Enterprise Linux versione 7 o successiva.
+ Se utilizzi Amazon EC2 per eseguire l'agente, avvia l'istanza EC2.
+ Gestisci AWS le tue credenziali utilizzando uno dei seguenti metodi:
  + Specifica un ruolo IAM quando avvii l'istanza EC2.
  + Specificate AWS le credenziali quando configurate l'agente (vedi [awsAccessKeyID](#awsAccessKeyId) e [awsSecretAccesschiave](#awsSecretAccessKey)).
  + Modifica `/etc/sysconfig/aws-kinesis-agent` per specificare la regione e le chiavi di AWS accesso.
  + [Se la tua istanza EC2 si trova in un AWS account diverso, crea un ruolo IAM per fornire l'accesso al servizio Kinesis Data Streams e specifica quel ruolo quando configuri l'agente (vedi [AssumeroLearn](#assumeRoleARN) e Id). assumeRoleExternal](#assumeRoleExternalId) Utilizza uno dei metodi precedenti per specificare AWS le credenziali di un utente nell'altro account che ha il permesso di assumere questo ruolo.
+ Il ruolo o AWS le credenziali IAM specificati devono disporre dell'autorizzazione per eseguire l'operazione Kinesis Data [PutRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html)Streams affinché l'agente possa inviare dati al tuo stream. Se abiliti il CloudWatch monitoraggio per l'agente, è necessaria anche l'autorizzazione a eseguire l' CloudWatch [PutMetricData](https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_PutMetricData.html)operazione. Per ulteriori informazioni, vedere [Controllo dell'accesso alle risorse Amazon Kinesis Data Streams tramite IAM](controlling-access.md)[Monitora lo stato dell'agente Kinesis Data Streams con Amazon CloudWatch](agent-health.md), e [Controllo degli CloudWatch accessi](https://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/UsingIAM.html).

## Download e installazione dell'agente
<a name="download-install"></a>

Innanzitutto connettiti all'istanza, Per ulteriori informazioni, consulta [Connect to Your Instance](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-connect-to-instance-linux.html) nella *Amazon EC2 User Guide*. In caso di problemi di connessione, consulta [Risoluzione dei problemi di connessione alla tua istanza](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/TroubleshootingInstancesConnecting.html) nella Guida per l'*utente di Amazon EC2*.

**Per configurare l'agente utilizzando l'AMI Amazon Linux**  
Utilizzare il seguente comando per scaricare e installare l'agente:

```
sudo yum install –y aws-kinesis-agent
```

**Per configurare l'agente utilizzando Red Hat Enterprise Linux**  
Utilizzare il seguente comando per scaricare e installare l'agente:

```
sudo yum install –y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn2.noarch.rpm
```

**Per configurare l'agente utilizzando GitHub**

1. Scarica l'agente da [awlabs/ amazon-kinesis-agent](https://github.com/awslabs/amazon-kinesis-agent). 

1. Installare l'agente spostandosi nella directory di download ed eseguendo il comando seguente:

   ```
   sudo ./setup --install
   ```

**Configurazione dell'agente in un container Docker**  
L'agente Kinesis può essere eseguito anche in un container tramite la base container [amazonlinux](https://docs.aws.amazon.com/AmazonECR/latest/userguide/amazon_linux_container_image.html). Utilizza il seguente Dockerfile e poi esegui `docker build`.

```
FROM amazonlinux

RUN yum install -y aws-kinesis-agent which findutils
COPY agent.json /etc/aws-kinesis/agent.json

CMD ["start-aws-kinesis-agent"]
```

## Configura e avvia l'agente
<a name="config-start"></a>

**Configurazione e avvio dell'agente**

1. Aprire e modificare il file di configurazione (come superutente se vengono utilizzate le autorizzazioni predefinite di accesso al file): `/etc/aws-kinesis/agent.json` 

   In questo file di configurazione, specificare i file ( `"filePattern"` ) dai quali l'agente raccoglie i dati e il nome del flusso ( `"kinesisStream"` ) al quale l'agente invia i dati. Si noti che il nome del file è un modello e l'agente riconosce le rotazioni dei file. Puoi ruotare i file o creare nuovi file non più di una volta al secondo. L'agente usa il timestamp di creazione di file per stabilire quale file tracciare e inserire nel flusso; la creazione di nuovi file o la rotazione di file più frequentemente di una volta al secondo non consente all'agente di distinguerli in modo corretto.

   ```
   { 
      "flows": [
           { 
               "filePattern": "/tmp/app.log*", 
               "kinesisStream": "yourkinesisstream"
           } 
      ] 
   }
   ```

1. Avvia l'agente manualmente:

   ```
   sudo service aws-kinesis-agent start
   ```

1. (Facoltativo) Configurare l'agente per iniziare l'avvio del sistema:

   ```
   sudo chkconfig aws-kinesis-agent on
   ```

L'agente è ora in esecuzione come servizio di sistema in background. Controlla costantemente i file specificati e invia i dati al flusso specificato. L'attività dell'agente viene registrata in `/var/log/aws-kinesis-agent/aws-kinesis-agent.log`. 

## Specificare le impostazioni di configurazione dell'agente
<a name="agent-config-settings"></a>

L'agente supporta le due impostazioni di configurazione obbligatorie `filePattern` e `kinesisStream`, più impostazioni di configurazione opzionali per funzionalità aggiuntive. Puoi specificare la configurazione obbligatoria e opzionale in `/etc/aws-kinesis/agent.json`.

Quando modifichi il file di configurazione, devi arrestare e avviare l'agente, utilizzando i comandi seguenti:

```
sudo service aws-kinesis-agent stop
sudo service aws-kinesis-agent start
```

In alternativa, potresti utilizzare il comando seguente:

```
sudo service aws-kinesis-agent restart
```

Seguono le impostazioni di configurazione generali.


| Impostazione di configurazione | Description | 
| --- | --- | 
| <a name="assumeRoleARN"></a>assumeRoleARN |  L'ARN del ruolo da assegnare all'utente. Per ulteriori informazioni, consulta [Delegare l'accesso tra AWS account utilizzando i ruoli IAM](https://docs.aws.amazon.com/IAM/latest/UserGuide/tutorial_cross-account-with-roles.html) nella *Guida per l'utente IAM*.  | 
| <a name="assumeRoleExternalId"></a>assumeRoleExternalId |  Si è verificato un identificatore opzionale che determina chi può assumere il ruolo. Per ulteriori informazioni, consulta [Come utilizzare un ID esterno](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user_externalid.html) nella *Guida per l’utente di IAM*.  | 
| <a name="awsAccessKeyId"></a>awsAccessKeyId |  AWS ID della chiave di accesso che sostituisce le credenziali predefinite. Questa impostazione ha la precedenza su tutti gli altri provider di credenziali.  | 
| <a name="awsSecretAccessKey"></a>awsSecretAccessKey |  AWS chiave segreta che sostituisce le credenziali predefinite. Questa impostazione ha la precedenza su tutti gli altri provider di credenziali.  | 
| cloudwatch.emitMetrics |  Consente all'agente di emettere metriche su CloudWatch if set (true). Impostazione predefinita: true  | 
| cloudwatch.endpoint |  L'endpoint regionale per. CloudWatch Impostazione predefinita: `monitoring.us-east-1.amazonaws.com`  | 
| kinesis.endpoint |  L'endpoint regionale per il flusso di dati Kinesis. Impostazione predefinita: `kinesis.us-east-1.amazonaws.com`  | 

Seguono le impostazioni di configurazione del flusso.


| Impostazione di configurazione | Description | 
| --- | --- | 
| dataProcessingOptions |  L'elenco delle opzioni di elaborazione applicate a ciascun record analizzato prima dell'invio al flusso. Le opzioni di elaborazione vengono eseguite nell'ordine specificato. Per ulteriori informazioni, consulta [Usa l'agente per pre-elaborare i dati](#pre-processing).  | 
| kinesisStream |  [Obbligatorio] Il nome del flusso.  | 
| filePattern |  [Obbligatorio] La directory e il modello di file che devono corrispondere per essere rilevati dall'agente. Per tutti i file che corrispondono a questo modello, le autorizzazioni di lettura devono essere concesse a `aws-kinesis-agent-user`. Per la directory contenente i file, le autorizzazioni di lettura ed esecuzione devono essere concesse `aws-kinesis-agent-user`.  | 
| initialPosition |  La posizione iniziale dalla quale è iniziata l'analisi del file. I valori validi sono `START_OF_FILE` e `END_OF_FILE`. Impostazione predefinita: `END_OF_FILE`  | 
| maxBufferAgeMillis |  Il tempo massimo, in millisecondi, in cui l'agente effettua il buffer dati prima di inviarli al flusso. Intervallo valore: da 1.000 a 900.000 (da 1 secondo a 15 minuti) Impostazione predefinita: 60.000 (1 minuto)  | 
| maxBufferSizeBytes |  Le dimensioni massime, in byte, in cui l'agente effettua il buffer dati prima di inviarli al flusso. Intervallo valore: da 1 a 4.194.304 (4 MB) Impostazione predefinita: 4.194.304 (4 MB)  | 
| maxBufferSizeRecords |  Il numero massimo di record in cui l'agente effettua il buffer dati prima di inviarli al flusso. Intervallo valore: da 1 a 500 Impostazione predefinita: 500  | 
| minTimeBetweenFilePollsMillis |  L'intervallo di tempo, in millisecondi, in cui l'agente esegue il polling e analizza i dati nuovi nei file monitorati. Intervallo valore: 1 o più Impostazione predefinita: 100  | 
| multiLineStartPattern |  Il modello per identificare l'inizio di un record. Un record è composto da una riga corrispondente al modello e da tutte le righe successive non corrispondenti al modello. I valori validi sono espressioni regolari. Come impostazione predefinita, ogni nuova riga nei file di log viene analizzata come un record.  | 
| partitionKeyOption |  Il metodo per generare la chiave di partizione. I valori validi sono `RANDOM` (integer generato casualmente) e `DETERMINISTIC` (un valore hash calcolato dai dati). Impostazione predefinita: `RANDOM`  | 
| skipHeaderLines |  Il numero di righe necessarie perché l'agente salti l'analisi all'inizio dei file monitorati. Intervallo valore: 0 o più Impostazione predefinita: 0 (zero)  | 
| truncatedRecordTerminator |  La stringa che l'agente utilizza per troncare un record analizzato quando le dimensioni del record eccedono il limite di dimensioni del record . (1.000 KB) Impostazione predefinita: `'\n'` (nuova riga)  | 

## Monitora più directory di file e scrivi su più flussi
<a name="sim-writes"></a>

Specificando più impostazioni di configurazione del flusso, puoi configurare l'agente in modo che monitori più directory di file e invii dati a più flussi. Nel seguente esempio di configurazione, l'agente monitora due directory di file e invia i dati rispettivamente a un flusso Kinesis e a un flusso di distribuzione Firehose. Tieni presente che puoi specificare endpoint diversi per Kinesis Data Streams e Firehose in modo che Kinesis stream e Firehose non debbano necessariamente trovarsi nella stessa regione.

```
{
    "cloudwatch.emitMetrics": true,
    "kinesis.endpoint": "https://your/kinesis/endpoint", 
    "firehose.endpoint": "https://your/firehose/endpoint", 
    "flows": [
        {
            "filePattern": "/tmp/app1.log*", 
            "kinesisStream": "yourkinesisstream"
        }, 
        {
            "filePattern": "/tmp/app2.log*",
            "deliveryStream": "yourfirehosedeliverystream" 
        }
    ] 
}
```

Per informazioni più dettagliate sull'utilizzo dell'agente con Firehose, consulta [Writing to Amazon Data Firehose with](https://docs.aws.amazon.com/firehose/latest/dev/writing-with-agents.html) Kinesis Agent.

## Usa l'agente per pre-elaborare i dati
<a name="pre-processing"></a>

L'agente può preelaborare i record analizzati dai file monitorati prima di inviarli al flusso. È possibile abilitare questa funzionalità aggiungendo le impostazioni di configurazione `dataProcessingOptions` al flusso di file. Una o più opzioni di elaborazione possono essere aggiunte e saranno eseguite nell'ordine specificato.

L'agente supporta le seguenti opzioni di elaborazione elencate. Poiché l'agente è open source, è possibile sviluppare ulteriormente e ampliare le opzioni di elaborazione. Puoi scaricare l'agente dall’[agente Kinesis](https://github.com/awslabs/amazon-kinesis-agent).Opzioni di elaborazione

`SINGLELINE`  
Converte un record a più righe in un record a riga singola rimuovendo i caratteri di nuova riga, gli spazi iniziali e finali.  

```
{
    "optionName": "SINGLELINE"
}
```

`CSVTOJSON`  
Converte un record da un formato delimitatore separato a un formato JSON.  

```
{
    "optionName": "CSVTOJSON",
    "customFieldNames": [ "field1", "field2", ... ],
    "delimiter": "yourdelimiter"
}
```  
`customFieldNames`  
[Obbligatorio] I nomi di campo utilizzati come chiavi in ciascuna coppia chiave-valore JSON. Ad esempio, se specifichi `["f1", "f2"]`, il record "v1, v2" sarà convertito in `{"f1":"v1","f2":"v2"}`.  
`delimiter`  
La stringa utilizzata come delimitatore nel record. L'impostazione predefinita è una virgola (,).

`LOGTOJSON`  
Converte un record da un formato log a un formato JSON. I formati di log supportati sono **Apache Common Log**, **Apache Combined Log**, **Apache Error Log** e **RFC3164 Syslog**.  

```
{
    "optionName": "LOGTOJSON",
    "logFormat": "logformat",
    "matchPattern": "yourregexpattern",
    "customFieldNames": [ "field1", "field2", … ]
}
```  
`logFormat`  
[Obbligatorio] Il formato di inserimento dei log. I seguenti sono i valori possibili:  
+ `COMMONAPACHELOG` - Il formato Apache Common Log. Ogni voce di log ha il seguente modello come impostazione predefinita: "`%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes}`".
+ `COMBINEDAPACHELOG`: il formato Apache Combined Log. Ogni voce di log ha il seguente modello come impostazione predefinita: "`%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes} %{referrer} %{agent}`".
+ `APACHEERRORLOG`: il formato Apache Error Log. Ogni voce di log ha il seguente modello come impostazione predefinita: "`[%{timestamp}] [%{module}:%{severity}] [pid %{processid}:tid %{threadid}] [client: %{client}] %{message}`".
+ `SYSLOG`— Il formato RFC3164 Syslog. Ogni voce di log ha il seguente modello come impostazione predefinita: "`%{timestamp} %{hostname} %{program}[%{processid}]: %{message}`".  
`matchPattern`  
Il modello di espressione regolare utilizzato per estrarre valori dalle voci di log. Questa impostazione viene utilizzata se la tua voce di log non è in uno dei formati di log predefiniti. Se questa impostazione viene utilizzata, devi specificare anche `customFieldNames`.  
`customFieldNames`  
I nomi di campo obbligatori utilizzati come chiavi in ciascuna coppia chiave-valore JSON. Puoi utilizzare questa impostazione per definire i nomi dei campi per i valori estratti da `matchPattern` oppure sovrascrivere i nomi dei campi predefiniti dei formati di log predefiniti.

**Example : configurazione LOGTOJSON**  <a name="example-logtojson"></a>
Questo è un esempio di una configurazione `LOGTOJSON` per una voce Apache Common Log convertita in formato JSON:  

```
{
    "optionName": "LOGTOJSON",
    "logFormat": "COMMONAPACHELOG"
}
```
Prima della conversione:  

```
64.242.88.10 - - [07/Mar/2004:16:10:02 -0800] "GET /mailman/listinfo/hsdivision HTTP/1.1" 200 6291
```
Dopo la conversione:  

```
{"host":"64.242.88.10","ident":null,"authuser":null,"datetime":"07/Mar/2004:16:10:02 -0800","request":"GET /mailman/listinfo/hsdivision HTTP/1.1","response":"200","bytes":"6291"}
```

**Example : configurazione LOGTOJSON con campi personalizzati**  <a name="example-logtojson-custom-fields"></a>
Ecco un altro esempio di configurazione `LOGTOJSON`:  

```
{
    "optionName": "LOGTOJSON",
    "logFormat": "COMMONAPACHELOG",
    "customFieldNames": ["f1", "f2", "f3", "f4", "f5", "f6", "f7"]
}
```
Con questa impostazione di configurazione, la stessa voce Apache Common Log dall'esempio precedente viene convertita in formato JSON come segue:  

```
{"f1":"64.242.88.10","f2":null,"f3":null,"f4":"07/Mar/2004:16:10:02 -0800","f5":"GET /mailman/listinfo/hsdivision HTTP/1.1","f6":"200","f7":"6291"}
```

**Example : convertire la voce Apache Common Log**  <a name="example-apache-common-log-entry"></a>
La seguente configurazione di flusso converte una voce Apache Common Log in record a riga singola in formato JSON:  

```
{ 
    "flows": [
        {
            "filePattern": "/tmp/app.log*", 
            "kinesisStream": "my-stream",
            "dataProcessingOptions": [
                {
                    "optionName": "LOGTOJSON",
                    "logFormat": "COMMONAPACHELOG"
                }
            ]
        }
    ] 
}
```

**Example : convertire record a più righe**  <a name="example-convert-multiline"></a>
La seguente configurazione del flusso analizza i record a più righe la cui prima riga inizia con "`[SEQUENCE=`". Ogni record viene convertito in un record a riga singola. Quindi, i valori vengono estratti dal record in base a un delimitatore di schede. I valori estratti sono mappati in valori `customFieldNames` specificati per formare un record a riga singola in formato JSON.  

```
{ 
    "flows": [
        {
            "filePattern": "/tmp/app.log*", 
            "kinesisStream": "my-stream",
            "multiLineStartPattern": "\\[SEQUENCE=",
            "dataProcessingOptions": [
                {
                    "optionName": "SINGLELINE"
                },
                {
                    "optionName": "CSVTOJSON",
                    "customFieldNames": [ "field1", "field2", "field3" ],
                    "delimiter": "\\t"
                }
            ]
        }
    ] 
}
```

**Example : configurazione LOGTOJSON con modello corrispondente**  <a name="example-logtojson-match-pattern"></a>
Questo è un esempio di una configurazione `LOGTOJSON` per una voce Apache Common Log convertita in formato JSON, con l'ultimo campo (byte) omesso:  

```
{
    "optionName": "LOGTOJSON",
    "logFormat": "COMMONAPACHELOG",
    "matchPattern": "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3})",
    "customFieldNames": ["host", "ident", "authuser", "datetime", "request", "response"]
}
```
Prima della conversione:  

```
123.45.67.89 - - [27/Oct/2000:09:27:09 -0400] "GET /java/javaResources.html HTTP/1.0" 200
```
Dopo la conversione:  

```
{"host":"123.45.67.89","ident":null,"authuser":null,"datetime":"27/Oct/2000:09:27:09 -0400","request":"GET /java/javaResources.html HTTP/1.0","response":"200"}
```

## Usa i comandi CLI dell'agente
<a name="cli-commands"></a>

Avviare automaticamente l'agente all'avvio del sistema: 

```
sudo chkconfig aws-kinesis-agent on
```

Controlla lo stato dell'agente: 

```
sudo service aws-kinesis-agent status
```

Interrompi l'agente: 

```
sudo service aws-kinesis-agent stop
```

Leggi il file di log dell'agente da questa posizione:

```
/var/log/aws-kinesis-agent/aws-kinesis-agent.log
```

Disinstalla l'agente:

```
sudo yum remove aws-kinesis-agent
```

## Domande frequenti
<a name="agent-faq"></a>

### Esiste un agente Kinesis per Windows?
<a name="agent-faq-1"></a>

[L'agente Kinesis per Windows](https://docs.aws.amazon.com/kinesis-agent-windows/latest/userguide/what-is-kinesis-agent-windows.html) è un software diverso dall'agente Kinesis per piattaforme Linux.

### Perché Kinesis Agent sta rallentando e aumentando? and/or `RecordSendErrors`
<a name="agent-faq-2"></a>

Di solito ciò è dovuto alla limitazione di Kinesis. Controlla la `WriteProvisionedThroughputExceeded` metrica per Kinesis Data Streams o la `ThrottledRecords` metrica per Firehose Delivery Streams. Qualsiasi aumento rispetto a 0 di questi parametri indica che è necessario aumentare i limiti dei flussi. Per ulteriori informazioni, consulta [Limiti del flusso di dati Kinesis](https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html) e [Flussi di consegna di Amazon Firehose](https://docs.aws.amazon.com/firehose/latest/dev/limits.html).

Una volta esclusa la limitazione, verifica se Kinesis Agent è configurato in modo da monitorare grandi quantità di file di piccole dimensioni. Si verifica un ritardo nel momento in cui Kinesis Agent esegue il tail di un nuovo file, quindi Kinesis Agent dovrebbe eseguire la coda su una piccola quantità di file più grandi. Prova a consolidare i tuoi file di log in file più grandi.

### Perché ricevo delle `java.lang.OutOfMemoryError` eccezioni?
<a name="agent-faq-4"></a>

Kinesis Agent non dispone di memoria sufficiente per gestire il carico di lavoro corrente. Prova ad aumentare, `JAVA_START_HEAP` inserire `/usr/bin/start-aws-kinesis-agent` e `JAVA_MAX_HEAP` riavviare l'agente.

### Perché `IllegalStateException : connection pool shut down` ricevo delle eccezioni?
<a name="agent-faq-5"></a>

Kinesis Agent non dispone di connessioni sufficienti per gestire il carico di lavoro corrente. Prova ad aumentare `maxConnections` e `maxSendingThreads` a inserire le impostazioni generali della configurazione dell'agente su. `/etc/aws-kinesis/agent.json` Il valore predefinito per questi campi è 12 volte superiore ai processori di runtime disponibili. Consulta [AgentConfiguration.java](https://github.com/awslabs/amazon-kinesis-agent/blob/master/src/com/amazon/kinesis/streaming/agent/config/AgentConfiguration.java) per ulteriori informazioni sulle impostazioni avanzate delle configurazioni degli agenti. 

### Come posso eseguire il debug di un altro problema con Kinesis Agent?
<a name="agent-faq-6"></a>

`DEBUG`i log di livello possono essere abilitati in. `/etc/aws-kinesis/log4j.xml`

### Come devo configurare Kinesis Agent?
<a name="agent-faq-7"></a>

Più piccolo è`maxBufferSizeBytes`, più frequentemente Kinesis Agent invierà i dati. Ciò può essere utile in quanto riduce i tempi di consegna dei record, ma aumenta anche le richieste al secondo a Kinesis. 

### Perché Kinesis Agent invia record duplicati?
<a name="agent-faq-8"></a>

Ciò si verifica a causa di un'errata configurazione nella coda dei file. Assicurati che ognuno corrisponda `fileFlow’s filePattern` a un solo file. Ciò può verificarsi anche se la `logrotate` modalità utilizzata è `copytruncate` attiva. Prova a passare alla modalità predefinita o crea per evitare duplicazioni. Per ulteriori informazioni sulla gestione dei record duplicati, vedere [Gestione dei record duplicati](https://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-duplicates.html).

# Scrivi su Kinesis Data AWS Streams utilizzando altri servizi
<a name="using-other-services"></a>

I seguenti AWS servizi possono integrarsi direttamente con Amazon Kinesis Data Streams per scrivere dati su flussi di dati Kinesis. Consulta le informazioni relative a ciascun servizio che ti interessa e consulta i riferimenti forniti.

**Topics**
+ [Scrivi su Kinesis Data Streams utilizzando AWS Amplify](using-other-services-amplify.md)
+ [Scrivi su Kinesis Data Streams utilizzando Amazon Aurora](using-other-services-aurora.md)
+ [Scrivi su Kinesis Data Streams utilizzando Amazon CloudFront](using-other-services-CloudFront.md)
+ [Scrivi su Kinesis Data Streams CloudWatch utilizzando Amazon Logs](using-other-services-cw-logs.md)
+ [Scrivi su Kinesis Data Streams utilizzando Amazon Connect](using-other-services-connect.md)
+ [Scrivi su Kinesis Data Streams utilizzando AWS Database Migration Service](using-other-services-migration.md)
+ [Scrivi su Kinesis Data Streams utilizzando Amazon DynamoDB](using-other-services-ddb.md)
+ [Scrivi su Kinesis Data Streams utilizzando Amazon EventBridge](using-other-services-eventbridges.md)
+ [Scrivi su Kinesis Data Streams utilizzando AWS IoT Core](using-other-services-iot-core.md)
+ [Scrivi su Kinesis Data Streams utilizzando Amazon Relational Database Service](using-other-services-rds.md)
+ [Scrivi su Kinesis Data Streams utilizzando Amazon Pinpoint](using-other-services-pinpoint.md)
+ [Scrivi su Kinesis Data Streams utilizzando Amazon Quantum Ledger Database (Amazon QLDB)](using-other-services-quantum-ledger.md)

# Scrivi su Kinesis Data Streams utilizzando AWS Amplify
<a name="using-other-services-amplify"></a>

Puoi utilizzare Amazon Kinesis Data Streams per lo streaming di dati dalle tue applicazioni mobili create AWS con Amplify per l'elaborazione in tempo reale. Puoi quindi creare pannelli di controllo in tempo reale, acquisire eccezioni e generare avvisi, generare consigli e prendere altre decisioni aziendali o operative in tempo reale. Puoi anche inviare dati ad altri servizi come Amazon Simple Storage Service, Amazon DynamoDB e Amazon Redshift.

Per ulteriori informazioni, consulta [Utilizzo di Amazon Kinesis](https://docs.amplify.aws/react/build-a-backend/more-features/analytics/streaming-data/) nel *Centro per sviluppatori di AWS Amplify*. 

# Scrivi su Kinesis Data Streams utilizzando Amazon Aurora
<a name="using-other-services-aurora"></a>

Puoi utilizzare Flusso di dati Amazon Kinesis per monitorare le attività sui cluster di database Amazon Aurora. Con i flussi di attività di database, il cluster di database Aurora invia le attività a un Flusso di dati Amazon Kinesis in tempo reale. È quindi possibile creare applicazioni per la gestione della conformità che utilizzano queste attività, le controllano e generano avvisi. Puoi utilizzare Amazon Firehose anche per archiviare i dati.

Per ulteriori informazioni, consulta [Flussi di attività del database](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.html) nella *Guida per gli sviluppatori di Amazon Aurora*. 

# Scrivi su Kinesis Data Streams utilizzando Amazon CloudFront
<a name="using-other-services-CloudFront"></a>

Puoi usare Amazon Kinesis Data CloudFront Streams con log in tempo reale e ottenere informazioni sulle richieste fatte a una distribuzione in tempo reale. Puoi quindi creare il tuo [consumer Kinesis Data Stream o utilizzare Amazon Data](https://docs.aws.amazon.com/streams/latest/dev/building-consumers.html) Firehose per inviare i dati di log ad Amazon S3, Amazon Redshift, Amazon Service o a un servizio di elaborazione dei log di terze OpenSearch parti.

Per ulteriori informazioni, consulta la sezione [Log in tempo reale](https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/real-time-logs.html) nella *Amazon CloudFront Developer Guide*. 

# Scrivi su Kinesis Data Streams CloudWatch utilizzando Amazon Logs
<a name="using-other-services-cw-logs"></a>

Puoi utilizzare CloudWatch gli abbonamenti per accedere a un feed di eventi di registro in tempo reale da Amazon CloudWatch Logs e inviarlo a un flusso di dati Kinesis per l'elaborazione, l'analisi e il caricamento su altri sistemi. 

Per ulteriori informazioni, consulta [Elaborazione in tempo reale dei dati di log con abbonamenti](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Subscriptions.html) nella *Amazon CloudWatch Logs User Guide.* 

# Scrivi su Kinesis Data Streams utilizzando Amazon Connect
<a name="using-other-services-connect"></a>

Puoi utilizzare il flusso di dati Kinesis per esportare i record dei contatti e gli eventi degli agenti in tempo reale dalla tua istanza Amazon Connect. Puoi anche abilitare lo streaming di dati dai profili dei clienti di Amazon Connect per ricevere automaticamente aggiornamenti a un flusso di dati Kinesis sulla creazione di nuovi profili o modifiche a quelli esistenti.

Puoi quindi creare applicazioni consumer per elaborare e analizzare i dati in tempo reale. Ad esempio, utilizzando i record di contatto e i dati del profilo cliente, è possibile conservare i dati dei sistemi di origine, come CRMs gli strumenti di automazione del marketing, up-to-date con le informazioni più recenti. Utilizzando i dati sugli eventi degli agenti, puoi creare pannelli di controllo che visualizzano informazioni ed eventi sugli agenti e attivare notifiche personalizzate di attività specifiche degli agenti.

Per ulteriori informazioni, consulta [Streaming di dati per la tua istanza](https://docs.aws.amazon.com/connect/latest/adminguide/data-streaming.html), [Configurazione dell'esportazione in tempo reale](https://docs.aws.amazon.com/connect/latest/adminguide/set-up-real-time-export.html) e [Flussi di eventi degli agenti](https://docs.aws.amazon.com/connect/latest/adminguide/agent-event-streams.html) nella *Guida per l'amministratore di Amazon Connect*. 

# Scrivi su Kinesis Data Streams utilizzando AWS Database Migration Service
<a name="using-other-services-migration"></a>

Puoi utilizzarlo AWS Database Migration Service per migrare i dati verso un flusso di dati Kinesis. Puoi quindi creare applicazioni consumer per elaborare e analizzare i record di dati in tempo reale. Puoi anche inviare facilmente dati downstream ad altri servizi come Amazon Simple Storage Service, Amazon DynamoDB e Amazon Redshift

Per ulteriori informazioni, consulta [Utilizzo del flusso di dati Kinesis](https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Target.Kinesis.html) nella *Guida per l'utente di AWS Database Migration Service *. 

# Scrivi su Kinesis Data Streams utilizzando Amazon DynamoDB
<a name="using-other-services-ddb"></a>

Utilizza Flusso di dati Amazon Kinesis per acquisire le modifiche apportate a Amazon DynamoDB. Kinesis Data Streams acquisisce le modifiche a livello di elemento in qualsiasi tabella DynamoDB e le replica in un flusso dei dati Kinesis. Le tue applicazioni consumer possono accedere a questo flusso per visualizzare le modifiche a livello di elemento in tempo reale e fornire tali modifiche a valle o agire in base al contenuto.

Per ulteriori informazioni, consulta [Funzionamento del flusso di dati Kinesis con DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/kds.html) nella *Guida per gli sviluppatori di Amazon DynamoDB*. 

# Scrivi su Kinesis Data Streams utilizzando Amazon EventBridge
<a name="using-other-services-eventbridges"></a>

Utilizzando Kinesis Data Streams, AWS puoi inviare EventBridge eventi di chiamata [API](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-events.html) in un flusso, creare applicazioni consumer ed elaborare grandi quantità di dati. Puoi anche utilizzare Kinesis Data Streams come EventBridge destinazione in Pipes e fornire record in un flusso da una delle fonti disponibili dopo il filtraggio e l'arricchimento opzionali.

Per ulteriori informazioni, consulta [Send events to an Amazon Kinesis stream](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-relay-events-kinesis-stream.html) and [EventBridge Pipes](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes.html) nella *Amazon EventBridge User* Guide. 

# Scrivi su Kinesis Data Streams utilizzando AWS IoT Core
<a name="using-other-services-iot-core"></a>

È possibile scrivere dati in tempo reale dai messaggi MQTT in AWS IoT Core utilizzando le azioni AWS IoT Rule. È quindi possibile creare applicazioni che elaborano i dati, ne analizzano il contenuto e generano avvisi e li distribuiscono in applicazioni di analisi o altri servizi AWS , 

Per ulteriori informazioni, consultal flusso di dati Kinesis[https://docs.aws.amazon.com/iot/latest/developerguide/kinesis-rule-action.html](https://docs.aws.amazon.com/iot/latest/developerguide/kinesis-rule-action.html) nella *Guida per gli sviluppatori di AWS IoT Core*. 

# Scrivi su Kinesis Data Streams utilizzando Amazon Relational Database Service
<a name="using-other-services-rds"></a>

Puoi utilizzare Flusso di dati Amazon Kinesis per monitorare le attività sulle istanze Amazon RDS. Utilizzando Database Activity Streams, Amazon RDS invia le attività a un flusso di dati Kinesis in tempo reale. È quindi possibile creare applicazioni per la gestione della conformità che utilizzano queste attività, le controllano e generano avvisi. Puoi anche utilizzare Amazon Data Firehose per archiviare i dati.

Per ulteriori informazioni, consulta [Flussi di attività del database](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/DBActivityStreams.html) nella *Guida per gli sviluppatori di Amazon RDS*. 

# Scrivi su Kinesis Data Streams utilizzando Amazon Pinpoint
<a name="using-other-services-pinpoint"></a>

È possibile configurare Amazon Pinpoint per l'invio dei dati degli eventi a Flusso di dati Amazon Kinesis. Amazon Pinpoint può inviare dati sugli eventi per campagne, percorsi e messaggi e-mail e SMS transazionali. Puoi quindi importare i dati in applicazioni di analisi o creare applicazioni personalizzate per i consumer che intraprendono azioni in base al contenuto degli eventi.

Per ulteriori informazioni, consulta [Eventi di streaming](https://docs.aws.amazon.com/pinpoint/latest/developerguide/event-streams.html) nella *Guida per gli sviluppatori di Amazon Pinpoint*. 

# Scrivi su Kinesis Data Streams utilizzando Amazon Quantum Ledger Database (Amazon QLDB)
<a name="using-other-services-quantum-ledger"></a>

Puoi creare uno stream in Amazon QLDB che acquisisce ogni revisione del documento inserita nel tuo journal e fornisce questi dati ad Amazon Kinesis Data Streams in tempo reale. Un flusso QLDB è un flusso continuo di dati dal diario del registro a una risorsa del flusso di dati Kinesis. Quindi, puoi utilizzare la piattaforma di streaming Kinesis o la Kinesis Client Library per consumare lo streaming, elaborare i record di dati e analizzare il contenuto dei dati. Un flusso QLDB scrive i dati sul flusso di dati Kinesis in tre tipi di record: `control`, `block summary` e `revision details`. 

Per ulteriori informazioni, consulta [Flussi](https://docs.aws.amazon.com/qldb/latest/developerguide/streams.html) nella *Guida per gli sviluppatori di Amazon QLDB*. 

# Scrivi su Kinesis Data Streams utilizzando integrazioni di terze parti
<a name="using-other-services-third-party"></a>

È possibile scrivere dati su Kinesis Data Streams utilizzando una delle seguenti opzioni di terze parti che si integrano con Kinesis Data Streams. Seleziona l'opzione su cui desideri saperne di più e trova risorse e link alla documentazione pertinente.

**Topics**
+ [Apache Flink](using-other-services-flink.md)
+ [Fluentd](using-other-services-Fluentd.md)
+ [Debezium](using-other-services-Debezium.md)
+ [Oracle GoldenGate](using-other-services-Oracle-GoldenGate.md)
+ [Connessione Kafka](using-other-services-kafka-connect.md)
+ [Adobe Experience](using-other-services-adobe.md)
+ [Striim](using-other-services-Striim.md)

# Apache Flink
<a name="using-other-services-flink"></a>

Apache Flink è un diffuso framework open source e motore di elaborazione distribuito per calcoli con stato su flussi di dati illimitati e limitati. Per ulteriori informazioni sulla scrittura sul flusso di dati Kinesis da Apache Flink, consulta [Connettore del flusso di dati Amazon Kinesis](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kinesis/). 

# Fluentd
<a name="using-other-services-Fluentd"></a>

Fluentd è un raccoglitore di dati open source per un livello di registrazione unificato. Per ulteriori informazioni sulla scrittura sul flusso di dati Kinesis da Fluentd. Per ulteriori informazioni, consulta [Elaborazione dei flussi con Kinesis](https://docs.fluentd.org/how-to-guides/kinesis-stream). 

# Debezium
<a name="using-other-services-Debezium"></a>

Debezium è una piattaforma open source distribuita per l'acquisizione dei dati relativi alle modifiche. Per ulteriori informazioni sulla scrittura sul flusso di dati Kinesis da Debezium, consulta [Streaming delle modifiche ai dati MySQL su Amazon Kinesis](https://debezium.io/blog/2018/08/30/streaming-mysql-data-changes-into-kinesis/). 

# Oracle GoldenGate
<a name="using-other-services-Oracle-GoldenGate"></a>

Oracle GoldenGate è un prodotto software che consente di replicare, filtrare e trasformare i dati da un database a un altro database. Per ulteriori informazioni sulla scrittura su Kinesis Data Streams GoldenGate da Oracle[, consulta Replica dei dati su Kinesis Data Stream utilizzando](https://blogs.oracle.com/dataintegration/post/data-replication-to-aws-kinesis-data-stream-using-oracle-goldengate) Oracle. GoldenGate 

# Connessione Kafka
<a name="using-other-services-kafka-connect"></a>

Kafka Connect è uno strumento per lo streaming scalabile e affidabile di dati tra Apache Kafka e altri sistemi. Per ulteriori informazioni sulla scrittura di dati da Apache Kafka al flusso di dati Kinesis, consulta [Connettore Kafka per Kinesis](https://github.com/awslabs/kinesis-kafka-connector). 

# Adobe Experience
<a name="using-other-services-adobe"></a>

La piattaforma Adobe Experience consente alle organizzazioni di centralizzare e standardizzare i dati dei clienti da qualsiasi sistema. Quindi applica data science e machine learning per migliorare notevolmente la progettazione e la distribuzione di esperienze ricche e personalizzate. Per ulteriori informazioni sulla scrittura di dati dalla piattaforma Adobe Experience al flusso di dati Kinesis, consulta [Come creare una connessione ad Amazon Kinesis](https://experienceleague.adobe.com/docs/experience-platform/destinations/catalog/cloud-storage/amazon-kinesis.html?lang=en). 

# Striim
<a name="using-other-services-Striim"></a>

Striim è una piattaforma completa in memoria per la raccolta end-to-end, il filtraggio, la trasformazione, l'arricchimento, l'aggregazione, l'analisi e la fornitura di dati in tempo reale. Per ulteriori informazioni su come scrivere dati sul flusso di dati Kinesis da Striim, consulta [Writer Kinesis](https://www.striim.com/docs/en/kinesis-writer.html). 

# Risolvi i problemi dei produttori di Amazon Kinesis Data Streams
<a name="troubleshooting-producers"></a>

**Topics**
+ [La mia applicazione di produzione sta scrivendo a una velocità inferiore al previsto](#producer-writing-at-slower-rate)
+ [Ricevo un errore di autorizzazione della chiave master KMS non autorizzata](#unauthorized-kms-producer)
+ [Risolvi altri problemi comuni per i produttori](#misc-troubleshooting-producer)

## La mia applicazione di produzione sta scrivendo a una velocità inferiore al previsto
<a name="producer-writing-at-slower-rate"></a>

**Topics**
+ [Limiti di servizio superati](#service-limits-exceeded)
+ [Voglio ottimizzare il mio produttore](#producer-optimization)
+ [Uso improprio delle operazioni `flushSync()`](#misuse-tag)

### Limiti di servizio superati
<a name="service-limits-exceeded"></a>

Per scoprire se vengono superate le restrizioni dei servizi, controlla se il producer sta generando eccezioni di throughput dal servizio e convalida le operazioni API sulle quali viene effettuato il throttling. Ricorda che ci sono limiti diversi in base alla chiamata, consulta [Quote e limiti](service-sizes-and-limits.md). Ad esempio, in aggiunta ai limiti a livello di shard per le operazioni di lettura e scrittura più comunemente noti, ci sono i seguenti limiti a livello di flusso:
+ [CreateStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_CreateStream.html)
+ [DeleteStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeleteStream.html)
+ [ListStreams](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListStreams.html)
+ [GetShardIterator](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html)
+ [MergeShards](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_MergeShards.html)
+ [DescribeStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStream.html)
+ [DescribeStreamSummary](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStreamSummary.html)

Le operazioni `CreateStream`, `DeleteStream`, `ListStreams`, `GetShardIterator`, e `MergeShards` sono limitate a 5 chiamate al secondo. L'operazione `DescribeStream` è limitata a 10 chiamate al secondo. L'operazione `DescribeStreamSummary` è limitata a 20 chiamate al secondo.

Se queste chiamate non sono il problema, assicurati di aver selezionato una chiave di partizione che consente di distribuire in modo uniforme le operazioni *put* e che non disponi di una determinata chiave di partizione che va contro le restrizioni dei servizi quando il resto non lo fa. Ciò richiede di misurare i picchi di throughput e considerare il numero di shard nel flusso. Per ulteriori informazioni sulla gestione dei flussi, consulta [Crea e gestisci flussi di dati Kinesis](working-with-streams.md).

**Suggerimento**  
Ricordati di arrotondare al kilobyte più vicino nei calcoli di throttling del throughput quando utilizzi l'operazione di record singolo [PutRecord](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html), mentre l'operazione multi-record [PutRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) arrotonda la somma cumulativa dei record in ciascuna chiamata. Ad esempio, su una richiesta `PutRecords` di 600 record di 1,1 KB non verrà effettuato il throttling. 

### Voglio ottimizzare il mio produttore
<a name="producer-optimization"></a>

Prima di iniziare a ottimizzare il tuo produttore, completa le seguenti attività chiave. In primo luogo, identifica il throughput maggiore desiderato in termini di dimensioni di record e record al secondo. Quindi, scarta la capacità di streaming come fattore di limitazione ([Limiti di servizio superati](#service-limits-exceeded)). Se hai escluso la capacità di streaming, utilizza i seguenti suggerimenti per la risoluzione dei problemi e linee guida sull'ottimizzazione per due tipi di produttori comuni.

**Produttore di grandi dimensioni**

Un producer di grandi dimensioni in genere è in esecuzione da un server on-premise o un'istanza Amazon EC2. Ai clienti che necessitano di un throughput maggiore da un producer di grandi dimensioni in genere interessa la latenza per record. Le strategie per gestire la latenza includono: se il cliente è in grado di creare record in microbatch/buffer, utilizza la [Amazon Kinesis Producer Library](https://docs.aws.amazon.com/kinesis/latest/dev/developing-producers-with-kpl.html) (che dispone di una logica di aggregazione avanzata), l'operazione multi-record o aggrega i record in un file più grande prima di utilizzare l'operazione [PutRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html)a record singolo. [PutRecord](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html) Se non sei in grado di eseguire un batch/buffer, utilizza più thread per scrivere nel servizio di Flussi di dati Kinesis nello stesso momento. Questi AWS SDK per Java e altri SDKs includono client asincroni che possono eseguire questa operazione con pochissimo codice.

**Produttore di piccole dimensioni**

Un producer di piccole dimensioni solitamente è un'applicazione mobile, un dispositivo IoT o un client Web. Se si tratta di un'app per dispositivi mobili, ti consigliamo di utilizzare l'`PutRecords`operazione o il Kinesis Recorder nel dispositivo mobile. AWS SDKs Per ulteriori informazioni, consulta la Guida AWS Mobile SDK per Android introduttiva e la Guida AWS Mobile SDK for iOS introduttiva. Le applicazioni mobili devono gestire le connessioni intermittenti intrinsecamente e hanno bisogno di un batch di put, ad esempio `PutRecords`. Se non sei in grado di eseguire un batch per qualsiasi motivo, consulta le informazioni sul Produttore di grandi dimensioni qui sopra. Se il producer è un browser, la quantità di dati generati è generalmente inferiore. Tuttavia, stai avviando le operazioni *put* nel percorso critico dell'applicazione, che non è consigliabile.

### Uso improprio delle operazioni `flushSync()`
<a name="misuse-tag"></a>

Un utilizzo `flushSync()` errato può influire in modo significativo sulle prestazioni di scrittura. L'`flushSync()`operazione è progettata per scenari di arresto per garantire che tutti i record memorizzati nel buffer vengano inviati prima del termine dell'applicazione KPL. Se hai implementato questa operazione dopo ogni operazione di scrittura, può aggiungere una notevole latenza aggiuntiva, circa 500 ms per scrittura. Assicurati di aver implementato `flushSync()` solo l'arresto dell'applicazione per evitare inutili ritardi aggiuntivi nelle prestazioni di scrittura. 

## Ricevo un errore di autorizzazione della chiave master KMS non autorizzata
<a name="unauthorized-kms-producer"></a>

Questo errore si verifica quando un'applicazione producer scrive in un flusso crittografato senza autorizzazioni sulla chiave master KMS. Per assegnare le autorizzazioni a un'applicazione per accedere a una chiave KMS, consulta [Utilizzo delle policy della chiave in AWS KMS](https://docs.aws.amazon.com/kms/latest/developerguide/key-policies.html) e [Utilizzo delle policy IAM con AWS KMS](https://docs.aws.amazon.com/kms/latest/developerguide/iam-policies.html).

## Risolvi altri problemi comuni per i produttori
<a name="misc-troubleshooting-producer"></a>
+ [Perché il mio flusso di dati Kinesis restituisce un errore interno del server 500?](https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-data-stream-500-error/)
+ [Come posso risolvere gli errori di timeout durante la scrittura da Flink al flusso di dati Kinesis?](https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-flink-timeout/)
+ [Come posso risolvere gli errori di limitazione nel flusso di dati Kinesis?](https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-data-stream-throttling-errors/)
+ [Perché il mio flusso di dati Kinesis è limitato?](https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-data-stream-throttling/)
+ [Come posso inserire i record di dati in un flusso di dati Kinesis utilizzando la KPL?](https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-data-stream-kpl/)

# Ottimizza i produttori di Kinesis Data Streams
<a name="advanced-producers"></a>

Puoi ottimizzare ulteriormente i tuoi produttori di Amazon Kinesis Data Streams in base al comportamento specifico che vedi. Consulta i seguenti argomenti per identificare le soluzioni.

**Topics**
+ [Personalizza i tentativi KPL e il comportamento dei limiti di velocità](kinesis-producer-adv-retries-rate-limiting.md)
+ [Applica le migliori pratiche all'aggregazione KPL](kinesis-producer-adv-aggregation.md)

# Personalizza i tentativi KPL e il comportamento dei limiti di velocità
<a name="kinesis-producer-adv-retries-rate-limiting"></a>

Quando aggiungi record utente di Amazon Kinesis Producer Library (KPL) utilizzando l'`addUserRecord()`operazione KPL, a un record viene assegnato un timestamp e aggiunto a un buffer con una scadenza impostata dal parametro di configurazione. `RecordMaxBufferedTime` Questa combinazione stamp/deadline temporale imposta la priorità del buffer. I record vengono svuotati dal buffer in base ai seguenti criteri:
+ Priorità buffer
+ Configurazione aggregazione
+ Configurazione della raccolta

I parametri di configurazione di aggregazione e raccolta che influiscono sul comportamento del buffer sono i seguenti:
+ `AggregationMaxCount`
+ `AggregationMaxSize`
+ `CollectionMaxCount`
+ `CollectionMaxSize`

I record scaricati vengono quindi inviati al flusso di dati Kinesis come record del flusso di dati Amazon Kinesis utilizzando una chiamata all'operazione dell'API del flusso di dati Kinesis `PutRecords`. L'operazione `PutRecords` invia le richieste al tuo flusso che occasionalmente restituiscono errori totali o parziali. I record con esito negativo vengono automaticamente aggiunti al buffer della KPL. La nuova scadenza è impostata sul minimo di questi due valori: 
+ Metà della configurazione `RecordMaxBufferedTime` corrente
+ Il valore del time-to-live record

Questa strategia consente di includere i record utente KPL riprovati nelle successive chiamate all'API Kinesis Data Streams, per migliorare il throughput e ridurre la complessità, rafforzando al contempo il valore del record Kinesis Data Streams. time-to-live Non esiste un algoritmo di backoff, pertanto questa strategia di nuovi tentativi è relativamente aggressiva. Lo spamming causato da un numero di nuovi tentativi eccessivo viene impedito dalla limitazione della frequenza, discussa nella sezione successiva.

## Limitazione della velocità
<a name="kinesis-producer-adv-retries-rate-limiting-rate-limit"></a>

La KPL include una funzione di limitazione della frequenza, che limita la velocità di trasmissione effettiva della partizione inviata da un singolo producer. La limitazione della frequenza viene implementata utilizzando un algoritmo bucket di token con bucket separati sia per i byte sia per i record . Ogni scrittura riuscita in un flusso di dati Kinesis aggiunge uno o più token a ciascun bucket, fino a una determinata soglia. Questa soglia è configurabile ma, per impostazione predefinita, è impostata su un valore del 50% superiore rispetto al limite di shard effettivo, per permettere la saturazione dello shard da un singolo producer. 

È possibile ridurre questo limite per ridurre lo spamming dovuto a un numero eccessivo di nuovi tentativi. Tuttavia, la best practice per ogni producer è riprovare per ottenere il massimo rendimento in modo aggressivo e gestire qualsiasi throttling risultante considerato eccessivo espandendo la capacità del flusso e implementando una strategia di chiave di partizione appropriata.

# Applica le migliori pratiche all'aggregazione KPL
<a name="kinesis-producer-adv-aggregation"></a>

Sebbene lo schema dei numeri di sequenza dei record Amazon Kinesis Data Streams risultanti rimanga lo stesso, l'aggregazione fa sì che l'indicizzazione dei record utente di Amazon Kinesis Producer Library (KPL) contenuti in un record Kinesis Data Streams aggregato inizi da 0 (zero); tuttavia, purché non si faccia affidamento sui numeri di sequenza per identificare in modo univoco i record utente KPL, il codice puoi ignorarlo, come l'aggregazione (dei tuoi record utente KPL in un record Kinesis Data Streams) e la successiva disaggregazione (di un record Kinesis Data Streams nei tuoi record utente KPL) se ne occupa automaticamente per te. Questo vale sia che il consumatore utilizzi KCL o l' AWS SDK. Per utilizzare questa funzionalità di aggregazione, dovrai inserire la parte Java di KPL nella tua build se il tuo consumatore è stato scritto utilizzando l'API fornita nell'SDK. AWS 

Per utilizzare i numeri di sequenza come identificatori univoci dei tuoi record utente della KPL, ti consigliamo di usare le operazioni `public int hashCode()` e `public boolean equals(Object obj)` fornite in `Record` e `UserRecord` per abilitare il confronto dei record utente della KPL. Inoltre, per esaminare il numero di sequenza secondaria del tuo record utente &KPL;, è possibile trasmetterlo a un'istanza `UserRecord` e recuperare il suo numero di sequenza secondaria.

Per ulteriori informazioni, consulta [Implementa la deaggregazione dei consumatori](kinesis-kpl-consumer-deaggregation.md).