

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Inserimento conveniente di dati IoT direttamente in Amazon S3 con AWS IoT Greengrass
<a name="cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass"></a>

*Sebastian Viviani e Rizwan Syed, Amazon Web Services*

## Riepilogo
<a name="cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass-summary"></a>

Questo modello mostra come importare in modo conveniente dati Internet of Things (IoT) direttamente in un bucket Amazon Simple Storage Service (Amazon S3) utilizzando un dispositivo AWS IoT Greengrass versione 2. Il dispositivo esegue un componente personalizzato che legge i dati IoT e li salva in una memoria persistente (ovvero un disco o un volume locale). Quindi, il dispositivo comprime i dati IoT in un file Apache Parquet e carica periodicamente i dati su un bucket S3.

La quantità e la velocità dei dati IoT che acquisisci sono limitate solo dalle funzionalità hardware perimetrali e dalla larghezza di banda della rete. Puoi usare Amazon Athena per analizzare in modo conveniente i dati acquisiti. [Athena supporta i file compressi di Apache Parquet e la visualizzazione dei dati utilizzando Amazon Managed Grafana.](https://docs.aws.amazon.com/grafana/latest/userguide/what-is-Amazon-Managed-Service-Grafana.html)

## Prerequisiti e limitazioni
<a name="cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass-prereqs"></a>

**Prerequisiti**
+ Un account AWS attivo
+ Un [edge gateway](https://docs.aws.amazon.com/greengrass/v1/developerguide/quick-start.html) che funziona su [AWS IoT Greengrass versione 2](https://docs.aws.amazon.com/greengrass/v2/developerguide/greengrass-v2-whats-new.html) e raccoglie dati dai sensori (le fonti di dati e il processo di raccolta dei dati esulano dall'ambito di questo modello, ma è possibile utilizzare quasi tutti i tipi di dati dei sensori. Questo modello utilizza un broker [MQTT](https://mqtt.org/) locale con sensori o gateway che pubblicano dati localmente.)
+ [Componenti](https://docs.aws.amazon.com/greengrass/v2/developerguide/develop-greengrass-components.html)[, [ruoli](https://docs.aws.amazon.com/greengrass/v1/developerguide/service-role.html) e dipendenze SDK di AWS IoT Greengrass](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html#installation)
+ Un [componente di gestione dello stream](https://docs.aws.amazon.com/greengrass/v2/developerguide/stream-manager-component.html) per caricare i dati nel bucket S3
+ [SDK AWS per](https://aws.amazon.com/sdk-for-java/) Java, SDK [AWS JavaScript per o SDK](https://aws.amazon.com/sdk-for-javascript/) [AWS per Python (Boto3) per](https://docs.aws.amazon.com/pythonsdk/) eseguire APIs

**Limitazioni**
+ I dati in questo modello non vengono caricati in tempo reale nel bucket S3. Esiste un periodo di ritardo ed è possibile configurare il periodo di ritardo. I dati vengono temporaneamente memorizzati nel buffer nel dispositivo periferico e quindi caricati una volta scaduto il periodo.
+ L'SDK è disponibile solo in Java, Node.js e Python.

## Architecture
<a name="cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass-architecture"></a>

**Stack tecnologico Target**
+ Simple Storage Service (Amazon S3)
+ AWS IoT Greengrass
+ Broker MQTT
+ Componente Stream Manager

**Architettura Target**

Il diagramma seguente mostra un'architettura progettata per importare i dati dei sensori IoT e archiviarli in un bucket S3.

![\[Diagramma architetturale\]](http://docs.aws.amazon.com/it_it/prescriptive-guidance/latest/patterns/images/pattern-img/b9032ae2-fffb-4750-b161-09810e19d878/images/8c28e639-5dcf-4950-b4a6-8015ec1a2894.png)


Il diagramma mostra il flusso di lavoro seguente:

1. Gli aggiornamenti di più sensori (ad esempio, temperatura e valvola) vengono pubblicati su un broker MQTT locale.

1. Il compressore di file Parquet sottoscritto a questi sensori aggiorna gli argomenti e riceve questi aggiornamenti.

1. Il compressore di file Parquet memorizza gli aggiornamenti localmente.

1. Trascorso il periodo, i file memorizzati vengono compressi in file Parquet e passati allo stream manager per essere caricati nel bucket S3 specificato.

1. Lo stream manager carica i file Parquet nel bucket S3.

**Nota**  
Lo stream manager (`StreamManager`) è un componente gestito. Per esempi di come esportare dati in Amazon S3, consulta [Stream manager](https://docs.aws.amazon.com/greengrass/v2/developerguide/stream-manager-component.html) nella documentazione di AWS IoT Greengrass. [Puoi utilizzare un broker MQTT locale come componente o un altro broker come Eclipse Mosquitto.](https://mosquitto.org/)

## Tools (Strumenti)
<a name="cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass-tools"></a>

**Strumenti AWS**
+ [Amazon Athena](https://docs.aws.amazon.com/athena/latest/ug/what-is.html) è un servizio di query interattivo che ti aiuta ad analizzare i dati direttamente in Amazon S3 utilizzando SQL standard.
+ [Amazon Simple Storage Service (Amazon S3)](https://docs.aws.amazon.com/AmazonS3/latest/userguide/Welcome.html) è un servizio di archiviazione degli oggetti basato sul cloud che consente di archiviare, proteggere e recuperare qualsiasi quantità di dati.
+ [AWS IoT Greengrass](https://docs.aws.amazon.com/greengrass/v2/developerguide/what-is-iot-greengrass.html) è un servizio cloud e di runtime IoT edge open source che ti aiuta a creare, distribuire e gestire applicazioni IoT sui tuoi dispositivi.

**Altri strumenti**
+ [Apache Parquet](https://parquet.apache.org/) è un formato di file di dati open source orientato alle colonne progettato per l'archiviazione e il recupero.
+ [MQTT](https://docs.aws.amazon.com/iot/latest/developerguide/mqtt.html) (Message Queuing Telemetry Transport) è un protocollo di messaggistica leggero progettato per dispositivi con limitazioni.

## Best practice
<a name="cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass-best-practices"></a>

**Utilizza il formato di partizione corretto per i dati caricati**

Non ci sono requisiti specifici per i nomi dei prefissi root nel bucket S3 (ad esempio, `"myAwesomeDataSet/"` or`"dataFromSource"`), ma ti consigliamo di utilizzare una partizione e un prefisso significativi in modo che sia facile comprendere lo scopo del set di dati.

Ti consigliamo inoltre di utilizzare il giusto partizionamento in Amazon S3 in modo che le query vengano eseguite in modo ottimale sul set di dati. Nell'esempio seguente, i dati vengono partizionati in formato HIVE in modo da ottimizzare la quantità di dati analizzati da ciascuna query Athena. Ciò migliora le prestazioni e riduce i costi.

`s3://<ingestionBucket>/<rootPrefix>/year=YY/month=MM/day=DD/HHMM_<suffix>.parquet`

## Epiche
<a name="cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass-epics"></a>

### Configurare l'ambiente
<a name="set-up-your-environment"></a>


| Operazione | Description | Competenze richieste | 
| --- | --- | --- | 
| Crea un bucket S3. | [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/prescriptive-guidance/latest/patterns/cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass.html) | Sviluppatore di app | 
| Aggiungi le autorizzazioni IAM al bucket S3. | Per concedere agli utenti l'accesso in scrittura al bucket e al prefisso S3 che hai creato in precedenza, aggiungi la seguente policy IAM al tuo ruolo AWS IoT Greengrass:<pre>{<br />    "Version": "2012-10-17",		 	 	 <br />    "Statement": [<br />        {<br />            "Sid": "S3DataUpload",<br />            "Effect": "Allow",<br />            "Action": [<br />                "s3:List*",<br />                "s3:Put*"<br />            ],<br />            "Resource": [<br />                "arn:aws:s3:::<ingestionBucket>",<br />                "arn:aws:s3:::<ingestionBucket>/<prefix>/*"<br />            ]<br />        }<br />    ]<br />}</pre>Per ulteriori informazioni, consulta [Creazione di una policy IAM per accedere alle risorse di Amazon S3 nella documentazione](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/AuroraMySQL.Integrating.Authorizing.IAM.S3CreatePolicy.html) di Aurora.[Successivamente, aggiorna la policy delle risorse (se necessario) per il bucket S3 per consentire l'accesso in scrittura con i principali AWS corretti.](https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-bucket-user-policy-specifying-principal-intro.html) | Sviluppatore di app | 

### Crea e distribuisci il componente AWS IoT Greengrass
<a name="build-and-deploy-the-aws-iot-greengrass-component"></a>


| Operazione | Description | Competenze richieste | 
| --- | --- | --- | 
| Aggiorna la ricetta del componente. | [Aggiorna la configurazione del componente](https://docs.aws.amazon.com/greengrass/v2/developerguide/update-component-configurations.html) quando [crei una distribuzione](https://docs.aws.amazon.com/greengrass/v2/developerguide/create-deployments.html) in base al seguente esempio:<pre>{<br />  "region": "<region>",<br />  "parquet_period": <period>,<br />  "s3_bucket": "<s3Bucket>",<br />  "s3_key_prefix": "<s3prefix>"<br />}</pre>Sostituisci `<region>` con la tua regione AWS, `<period>` con il tuo intervallo periodico, `<s3Bucket>` con il tuo bucket S3 e `<s3prefix>` con il tuo prefisso. | Sviluppatore di app | 
| Crea il componente. | Esegui una delle seguenti operazioni:[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/prescriptive-guidance/latest/patterns/cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass.html) | Sviluppatore di app | 
| Aggiornate il client MQTT. | Il codice di esempio non utilizza l'autenticazione perché il componente si connette localmente al broker. Se lo scenario è diverso, aggiorna la sezione client MQTT secondo necessità. Inoltre, effettuate le seguenti operazioni:[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/prescriptive-guidance/latest/patterns/cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass.html) | Sviluppatore di app | 

### Aggiungi il componente al dispositivo core AWS IoT Greengrass versione 2
<a name="add-the-component-to-the-aws-iot-greengrass-version-2-core-device"></a>


| Operazione | Description | Competenze richieste | 
| --- | --- | --- | 
| Aggiorna la distribuzione del dispositivo principale. | Se la distribuzione del dispositivo core AWS IoT Greengrass versione 2 esiste già, [rivedi](https://docs.aws.amazon.com/greengrass/v2/developerguide/revise-deployments.html) la distribuzione. Se la distribuzione non esiste, [crea una nuova](https://docs.aws.amazon.com/greengrass/v2/developerguide/create-deployments.html) distribuzione.Per assegnare al componente il nome corretto, [aggiorna la configurazione del gestore dei registri](https://docs.aws.amazon.com/greengrass/v2/developerguide/log-manager-component.html) per il nuovo componente (se necessario) in base a quanto segue:<pre>{<br />  "logsUploaderConfiguration": {<br />    "systemLogsConfiguration": {<br />    ...<br />    },<br />    "componentLogsConfigurationMap": {<br />      "<com.iot.ingest.parquet>": {<br />        "minimumLogLevel": "INFO",<br />        "diskSpaceLimit": "20",<br />        "diskSpaceLimitUnit": "MB",<br />        "deleteLogFileAfterCloudUpload": "false"<br />      }<br />      ...<br />    }<br />  },<br />  "periodicUploadIntervalSec": "300"<br />}</pre>Infine, completa la revisione della distribuzione per il tuo dispositivo principale AWS IoT Greengrass. | Sviluppatore di app | 

### Verifica l'inserimento dei dati nel bucket S3
<a name="verify-data-ingestion-into-the-s3-bucket"></a>


| Operazione | Description | Competenze richieste | 
| --- | --- | --- | 
| Controlla i log per il volume AWS IoT Greengrass. | Verifica quanto segue:[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/prescriptive-guidance/latest/patterns/cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass.html) | Sviluppatore di app | 
| Controlla il bucket S3. | Verifica se i dati vengono caricati nel bucket S3. Puoi vedere i file che vengono caricati in ogni momento.Puoi anche verificare se i dati vengono caricati nel bucket S3 interrogando i dati nella sezione successiva. | Sviluppatore di app | 

### Configurare l'interrogazione da Athena
<a name="set-up-querying-from-athena"></a>


| Operazione | Description | Competenze richieste | 
| --- | --- | --- | 
| Crea un database e una tabella. | [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/prescriptive-guidance/latest/patterns/cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass.html) | Sviluppatore di app | 
| Concedi ad Athena l'accesso ai dati. | [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/prescriptive-guidance/latest/patterns/cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass.html) | Sviluppatore di app | 

## risoluzione dei problemi
<a name="cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass-troubleshooting"></a>


| Problema | Soluzione | 
| --- | --- | 
| Il client MQTT non riesce a connettersi | [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/prescriptive-guidance/latest/patterns/cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass.html) | 
| Il client MQTT non riesce a sottoscrivere | Convalida le autorizzazioni sul broker MQTT. Se disponi di un broker MQTT di AWS, consulta broker [MQTT 3.1.1 (Moquette) e broker [MQTT 5 (EMQX](https://docs.aws.amazon.com/greengrass/v2/developerguide/mqtt-broker-emqx-component.html))](https://docs.aws.amazon.com/greengrass/v2/developerguide/mqtt-broker-moquette-component.html). | 
| I file Parquet non vengono creati | [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/prescriptive-guidance/latest/patterns/cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass.html) | 
| Gli oggetti non vengono caricati nel bucket S3 | [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/prescriptive-guidance/latest/patterns/cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass.html) | 

## Risorse correlate
<a name="cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass-resources"></a>
+ [DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html)(documentazione Pandas)
+ Documentazione [Apache Parquet (documentazione Parquet](https://parquet.apache.org/docs/))
+ [Sviluppa componenti AWS IoT Greengrass](https://docs.aws.amazon.com/greengrass/v2/developerguide/develop-greengrass-components.html) (Guida per sviluppatori AWS IoT Greengrass, versione 2)
+ [Distribuisci i componenti di AWS IoT Greengrass](https://docs.aws.amazon.com/greengrass/v2/developerguide/manage-deployments.html) sui dispositivi (AWS IoT Greengrass Developer Guide, versione 2)
+ [Interagisci con dispositivi IoT locali](https://docs.aws.amazon.com/greengrass/v2/developerguide/interact-with-local-iot-devices.html) (AWS IoT Greengrass Developer Guide, versione 2)
+ [Broker MQTT 3.1.1 (Moquette) (Guida](https://docs.aws.amazon.com/greengrass/v2/developerguide/mqtt-broker-moquette-component.html) per sviluppatori AWS IoT Greengrass, versione 2)
+ [Broker MQTT 5 (EMQX) (Guida](https://docs.aws.amazon.com/greengrass/v2/developerguide/mqtt-broker-emqx-component.html) per sviluppatori AWS IoT Greengrass, versione 2)

## Informazioni aggiuntive
<a name="cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass-additional"></a>

**Analisi dei costi**

Il seguente scenario di analisi dei costi dimostra come l'approccio di ingestione dei dati coperto da questo modello può influire sui costi di inserimento dei dati nel cloud AWS. Gli esempi di prezzo in questo scenario si basano sui prezzi al momento della pubblicazione. I prezzi sono soggetti a modifiche. Inoltre, i costi possono variare in base alla regione AWS, alle quote di servizio AWS e ad altri fattori correlati all'ambiente cloud.

*Set di segnali di ingresso*

Questa analisi utilizza il seguente set di segnali di ingresso come base per confrontare i costi di ingestione dell'IoT con altre alternative disponibili.


| 
| 
| Numero di segnali | Frequenza | Dati per segnale | 
| --- |--- |--- |
| 125 | 25 Hz | 8 byte | 

In questo scenario, il sistema riceve 125 segnali. Ogni segnale è di 8 byte e si verifica ogni 40 millisecondi (25 Hz). Questi segnali possono provenire singolarmente o raggruppati in un payload comune. Hai la possibilità di dividere e comprimere questi segnali in base alle tue esigenze. Puoi anche determinare la latenza. La latenza è il periodo di tempo necessario per ricevere, accumulare e importare i dati.

A scopo di confronto, l'operazione di importazione per questo scenario si basa nella regione `us-east-1` AWS. Il confronto dei costi si applica solo ai servizi AWS. Altri costi, come l'hardware o la connettività, non vengono presi in considerazione nell'analisi.

*Confronti dei costi*

La tabella seguente mostra il costo mensile in dollari USA (USD) per ogni metodo di ingestione.


| 
| 
| Metodo | Costo mensile | 
| --- |--- |
| AWS SiteWise *IoT\$1* | 331,77 DOLLARI | 
| AWS IoT SiteWise Edge con pacchetto di elaborazione dati (mantenimento di tutti i dati all'edge) | 200 DOLLARI | 
| Regole di AWS IoT Core e Amazon S3 per l'accesso ai dati grezzi | 84,54 DOLLARI | 
| Compressione dei file Parquet a livello periferico e caricamento su Amazon S3 | 0,5 DOLLARI | 

\$1I dati devono essere sottoposti a downsampling per rispettare le quote di servizio. Ciò significa che si verifica una perdita di dati con questo metodo.

*Metodi alternativi*

Questa sezione mostra i costi equivalenti per i seguenti metodi alternativi:
+ **AWS IoT SiteWise**: ogni segnale deve essere caricato in un messaggio individuale. Pertanto, il numero totale di messaggi al mese è di 125 × 25 × 3600 × 24 × 30, ovvero 8,1 miliardi di messaggi al mese. Tuttavia, AWS IoT SiteWise può gestire solo 10 punti dati al secondo per proprietà. Supponendo che i dati vengano sottoposti a downsampling a 10 Hz, il numero di messaggi al mese viene ridotto a 125×10×3600×24×30, ovvero 3,24 miliardi. Se utilizzi il componente Publisher che raggruppa le misurazioni in gruppi di 10 (a 1 USD per milione di messaggi), ottieni un costo mensile di 324 USD al mese. Supponendo che ogni messaggio sia di 8 byte (1 Kb/125), si tratta di 25,92 GB di spazio di archiviazione dati. Ciò aggiunge un costo mensile di 7,77 USD al mese. Il costo totale per il primo mese è di 331,77 USD e aumenta di 7,77 USD ogni mese.
+ **AWS IoT SiteWise Edge con pacchetto di elaborazione dati, inclusi tutti i modelli e i segnali completamente elaborati sull'edge (ovvero, nessuna ingestione nel cloud)**: puoi utilizzare il pacchetto di elaborazione dati come alternativa per ridurre i costi e configurare tutti i modelli che vengono calcolati all'edge. Questo può funzionare solo per l'archiviazione e la visualizzazione, anche se non viene eseguito alcun calcolo reale. In questo caso, è necessario utilizzare un hardware potente per l'edge gateway. C'è un costo fisso di 200 USD al mese.
+ **Inserimento diretto in AWS IoT Core tramite MQTT e una regola IoT per archiviare i dati grezzi in Amazon S3** — Supponendo che tutti i segnali siano pubblicati in un payload comune, il numero totale di messaggi pubblicati su AWS IoT Core è di 25×3600×24×30, ovvero 64,8 milioni al mese. A 1 USD per milione di messaggi, si tratta di un costo mensile di 64,8 USD al mese. Con 0,15 USD per milione di attivazioni di regole e con una regola per messaggio, si aggiunge un costo mensile di 19,44 USD al mese. Al costo di 0,023 USD per Gb di storage in Amazon S3, ciò aggiunge altri 1,5 USD al mese (in aumento ogni mese per riflettere i nuovi dati). Il costo totale per il primo mese è di 84,54 USD e aumenta di 1,5 USD ogni mese.
+ Compressione **dei dati all'edge in un file Parquet e caricamento su Amazon S3 (metodo proposto**): il rapporto di compressione dipende dal tipo di dati. Con gli stessi dati industriali testati per MQTT, i dati di output totali per un mese intero sono 1,2 Gb. Questo costa 0,03 USD al mese. I rapporti di compressione (utilizzando dati casuali) descritti in altri benchmark sono dell'ordine del 66 percento (più vicini allo scenario peggiore). Il totale dei dati è di 21 Gb e costa 0,5 USD al mese.

**Generatore di file Parquet**

Il seguente esempio di codice mostra la struttura di un generatore di file Parquet scritto in Python. L'esempio di codice è solo a scopo illustrativo e non funzionerà se incollato nel tuo ambiente.

```
import queue
import paho.mqtt.client as mqtt
import pandas as pd

#queue for decoupling the MQTT thread
messageQueue = queue.Queue()
client = mqtt.Client()
streammanager = StreamManagerClient()

def feederListener(topic, message):
    payload = {
        "topic" : topic,
        "payload" : message,
    }
    messageQueue.put_nowait(payload)

def on_connect(client_instance, userdata, flags, rc):
    client.subscribe("#",qos=0)

def on_message(client, userdata, message):
    feederListener(topic=str(message.topic), message=str(message.payload.decode("utf-8")))


filename = "tempfile.parquet"
streamname = "mystream"
destination_bucket= "amzn-s3-demo-bucket"
keyname="mykey"
period= 60

client.on_connect = on_connect
client.on_message = on_message
streammanager.create_message_stream(
            MessageStreamDefinition(name=streamname, strategy_on_full=StrategyOnFull.OverwriteOldestData)
        )


while True:
   try:
       message = messageQueue.get(timeout=myArgs.mqtt_timeout)
   except (queue.Empty):
       logger.warning("MQTT message reception timed out")

   currentTimestamp = getCurrentTime()
   if currentTimestamp >= nextUploadTimestamp:
       df = pd.DataFrame.from_dict(accumulator)
       df.to_parquet(filename)
       s3_export_task_definition = S3ExportTaskDefinition(input_url=filename, bucket=destination_bucket, key=key_name)
       streammanager.append_message(streamname, Util.validate_and_serialize_to_json_bytes(s3_export_task_definition))
       accumulator = {}
       nextUploadTimestamp += period
   else:
        accumulator.append(message)
```