

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 使用 AWS IoT Greengrass，以經濟實惠的方式直接將 IoT 資料擷取至 Amazon S3 AWS IoT
<a name="cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass"></a>

*Sebastian Viviani 和 Rizwan Syed，Amazon Web Services*

## 總結
<a name="cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass-summary"></a>

此模式說明如何使用 AWS IoT Greengrass 第 2 版裝置，以經濟實惠的方式將物聯網 (IoT) 資料直接擷取至 Amazon Simple Storage Service (Amazon S3) 儲存貯體。裝置會執行自訂元件來讀取 IoT 資料，並將資料儲存在持久性儲存體 （即本機磁碟區） 中。然後，裝置會將 IoT 資料壓縮為 Apache Parquet 檔案，並定期將資料上傳至 S3 儲存貯體。

您擷取的 IoT 資料數量和速度受限於您的邊緣硬體功能和網路頻寬。您可以使用 Amazon Athena 以經濟實惠的方式分析擷取的資料。Athena 使用 [Amazon Managed Grafana ](https://docs.aws.amazon.com/grafana/latest/userguide/what-is-Amazon-Managed-Service-Grafana.html)支援壓縮的 Apache Parquet 檔案和資料視覺化。

## 先決條件和限制
<a name="cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass-prereqs"></a>

**先決條件**
+ 作用中的 AWS 帳戶
+ 在 [AWS IoT Greengrass 第 2 版](https://docs.aws.amazon.com/greengrass/v2/developerguide/greengrass-v2-whats-new.html)上執行並從感應器收集資料的[邊緣閘道](https://docs.aws.amazon.com/greengrass/v1/developerguide/quick-start.html) （資料來源和資料收集程序超出此模式的範圍，但您可以使用幾乎任何類型的感應器資料。 此模式使用本機 [MQTT](https://mqtt.org/) 代理程式搭配可在本機發佈資料的感應器或閘道。)
+ AWS IoT Greengrass [元件](https://docs.aws.amazon.com/greengrass/v2/developerguide/develop-greengrass-components.html)、[角色](https://docs.aws.amazon.com/greengrass/v1/developerguide/service-role.html)和 [SDK 相依性](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html#installation)
+ 將資料上傳至 S3 儲存貯體的[串流管理員元件](https://docs.aws.amazon.com/greengrass/v2/developerguide/stream-manager-component.html) 
+ [適用於 Java 的 AWS 開發套件](https://aws.amazon.com/sdk-for-java/)、[適用於 JavaScript 的 AWS 開發套件](https://aws.amazon.com/sdk-for-javascript/)或[適用於 Python 的 AWS 開發套件 (Boto3)](https://docs.aws.amazon.com/pythonsdk/) APIs

**限制**
+ 此模式中的資料不會即時上傳至 S3 儲存貯體。有延遲期間，您可以設定延遲期間。資料會在邊緣裝置中暫時緩衝，然後在期間過期後上傳。
+ 開發套件僅適用於 Java、Node.js 和 Python。

## Architecture
<a name="cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass-architecture"></a>

**目標技術堆疊**
+ Amazon S3
+ AWS IoT Greengrass
+ MQTT 代理程式
+ 串流管理員元件

**目標架構**

下圖顯示設計用來擷取 IoT 感應器資料的架構，並將該資料存放在 S3 儲存貯體中。

![\[架構圖\]](http://docs.aws.amazon.com/zh_tw/prescriptive-guidance/latest/patterns/images/pattern-img/b9032ae2-fffb-4750-b161-09810e19d878/images/8c28e639-5dcf-4950-b4a6-8015ec1a2894.png)


該圖顯示以下工作流程：

1. 多個感應器 （例如，溫度和閥） 更新會發佈至本機 MQTT 代理程式。

1. 訂閱這些感應器的 Parquet 檔案壓縮器會更新主題並接收這些更新。

1. Parquet 檔案壓縮器會在本機存放更新。

1. 期間經過之後，儲存的檔案會壓縮為 Parquet 檔案，並傳遞給串流管理員，以上傳到指定的 S3 儲存貯體。

1. 串流管理員會將 Parquet 檔案上傳至 S3 儲存貯體。

**注意**  
串流管理員 (`StreamManager`) 是受管元件。如需如何將資料匯出至 Amazon S3 的範例，請參閱 AWS IoT Greengrass 文件中的[串流管理員](https://docs.aws.amazon.com/greengrass/v2/developerguide/stream-manager-component.html)。您可以使用本機 MQTT 代理程式做為元件或其他代理程式，例如 [Eclipse Mosquitto](https://mosquitto.org/)。

## 工具
<a name="cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass-tools"></a>

**AWS 工具**
+ [Amazon Athena](https://docs.aws.amazon.com/athena/latest/ug/what-is.html) 是一種互動式查詢服務，可協助您使用標準 SQL 直接在 Amazon S3 中分析資料。
+ [Amazon Simple Storage Service (Amazon S3)](https://docs.aws.amazon.com/AmazonS3/latest/userguide/Welcome.html) 是一種雲端型物件儲存服務，可協助您儲存、保護和擷取任何數量的資料。
+ [AWS IoT Greengrass](https://docs.aws.amazon.com/greengrass/v2/developerguide/what-is-iot-greengrass.html) 是一種開放原始碼 IoT 邊緣執行期和雲端服務，可協助您在裝置上建置、部署和管理 IoT 應用程式。

**其他工具**
+ [Apache Parquet](https://parquet.apache.org/) 是一種開放原始碼資料欄導向的資料檔案格式，專為儲存和擷取而設計。
+ [MQTT](https://docs.aws.amazon.com/iot/latest/developerguide/mqtt.html) （訊息佇列遙測傳輸） 是一種輕量型傳訊通訊協定，專為受限裝置而設計。

## 最佳實務
<a name="cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass-best-practices"></a>

**針對上傳的資料使用正確的分割區格式**

S3 儲存貯體 （例如 `"myAwesomeDataSet/"`或 `"dataFromSource"`) 中沒有根字首名稱的特定要求，但我們建議您使用有意義的分割區和字首，以便輕鬆了解資料集的目的。

我們也建議您在 Amazon S3 中使用正確的分割，以便在資料集上以最佳方式執行查詢。在下列範例中，資料會以 HIVE 格式分割，以便最佳化每個 Athena 查詢掃描的資料量。這可改善效能並降低成本。

`s3://<ingestionBucket>/<rootPrefix>/year=YY/month=MM/day=DD/HHMM_<suffix>.parquet`

## 史詩
<a name="cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass-epics"></a>

### 設定您的環境
<a name="set-up-your-environment"></a>


| 任務 | Description | 所需的技能 | 
| --- | --- | --- | 
| 建立 S3 儲存貯體。 | [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/prescriptive-guidance/latest/patterns/cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass.html) | 應用程式開發人員 | 
| 將 IAM 許可新增至 S3 儲存貯體。 | 若要授予使用者對您先前建立的 S3 儲存貯體和字首的寫入存取權，請將下列 IAM 政策新增至您的 AWS IoT Greengrass 角色：<pre>{<br />    "Version": "2012-10-17",		 	 	 <br />    "Statement": [<br />        {<br />            "Sid": "S3DataUpload",<br />            "Effect": "Allow",<br />            "Action": [<br />                "s3:List*",<br />                "s3:Put*"<br />            ],<br />            "Resource": [<br />                "arn:aws:s3:::<ingestionBucket>",<br />                "arn:aws:s3:::<ingestionBucket>/<prefix>/*"<br />            ]<br />        }<br />    ]<br />}</pre>如需詳細資訊，請參閱 Aurora 文件中的[建立 IAM 政策以存取 Amazon S3 資源](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/AuroraMySQL.Integrating.Authorizing.IAM.S3CreatePolicy.html)。接著，更新 S3 儲存貯體的資源政策 （如有需要），以允許使用正確的 AWS [主體](https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-bucket-user-policy-specifying-principal-intro.html)進行寫入存取。 | 應用程式開發人員 | 

### 建置和部署 AWS IoT Greengrass 元件
<a name="build-and-deploy-the-aws-iot-greengrass-component"></a>


| 任務 | Description | 所需的技能 | 
| --- | --- | --- | 
| 更新 元件的配方。 | 根據下列範例[，在建立部署](https://docs.aws.amazon.com/greengrass/v2/developerguide/create-deployments.html)時[更新元件組態](https://docs.aws.amazon.com/greengrass/v2/developerguide/update-component-configurations.html)：<pre>{<br />  "region": "<region>",<br />  "parquet_period": <period>,<br />  "s3_bucket": "<s3Bucket>",<br />  "s3_key_prefix": "<s3prefix>"<br />}</pre>將 取代`<region>`為您的 AWS 區域、`<period>`將 取代為您的週期性間隔、`<s3Bucket>`將 取代為您的 S3 儲存貯體，並將 `<s3prefix>`取代為您的字首。 | 應用程式開發人員 | 
| 建立 元件。 | 執行以下任意一項：[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/prescriptive-guidance/latest/patterns/cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass.html) | 應用程式開發人員 | 
| 更新 MQTT 用戶端。 | 範例程式碼不使用身分驗證，因為元件會在本機連線至代理程式。如果您的案例不同，請視需要更新 MQTT 用戶端區段。此外，請執行下列動作：[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/prescriptive-guidance/latest/patterns/cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass.html) | 應用程式開發人員 | 

### 將元件新增至 AWS IoT Greengrass 第 2 版核心裝置
<a name="add-the-component-to-the-aws-iot-greengrass-version-2-core-device"></a>


| 任務 | Description | 所需的技能 | 
| --- | --- | --- | 
| 更新核心裝置的部署。 | 如果 AWS IoT Greengrass 第 2 版核心裝置的部署已存在，[請修改部署](https://docs.aws.amazon.com/greengrass/v2/developerguide/revise-deployments.html)。如果部署不存在，[請建立新的部署](https://docs.aws.amazon.com/greengrass/v2/developerguide/create-deployments.html)。若要為元件提供正確的名稱，請根據下列項目更新新元件的[日誌管理員組態](https://docs.aws.amazon.com/greengrass/v2/developerguide/log-manager-component.html) （如果需要）：<pre>{<br />  "logsUploaderConfiguration": {<br />    "systemLogsConfiguration": {<br />    ...<br />    },<br />    "componentLogsConfigurationMap": {<br />      "<com.iot.ingest.parquet>": {<br />        "minimumLogLevel": "INFO",<br />        "diskSpaceLimit": "20",<br />        "diskSpaceLimitUnit": "MB",<br />        "deleteLogFileAfterCloudUpload": "false"<br />      }<br />      ...<br />    }<br />  },<br />  "periodicUploadIntervalSec": "300"<br />}</pre>最後，完成 AWS IoT Greengrass 核心裝置的部署修訂。 | 應用程式開發人員 | 

### 驗證資料擷取至 S3 儲存貯體
<a name="verify-data-ingestion-into-the-s3-bucket"></a>


| 任務 | Description | 所需的技能 | 
| --- | --- | --- | 
| 檢查 AWS IoT Greengrass 磁碟區的日誌。 | 檢查以下各項：[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/prescriptive-guidance/latest/patterns/cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass.html) | 應用程式開發人員 | 
| 檢查 S3 儲存貯體。 | 驗證資料是否正在上傳到 S3 儲存貯體。您可以在每個期間看到正在上傳的檔案。您也可以在下一節中查詢資料，確認資料是否已上傳至 S3 儲存貯體。 | 應用程式開發人員 | 

### 從 Athena 設定查詢
<a name="set-up-querying-from-athena"></a>


| 任務 | Description | 所需的技能 | 
| --- | --- | --- | 
| 建立資料庫和資料表。 | [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/prescriptive-guidance/latest/patterns/cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass.html) | 應用程式開發人員 | 
| 授予 Athena 對資料的存取權。 | [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/prescriptive-guidance/latest/patterns/cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass.html) | 應用程式開發人員 | 

## 疑難排解
<a name="cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass-troubleshooting"></a>


| 問題 | 解決方案 | 
| --- | --- | 
| MQTT 用戶端無法連線 | [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/prescriptive-guidance/latest/patterns/cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass.html) | 
| MQTT 用戶端無法訂閱 | 驗證 MQTT 代理程式上的許可。如果您有來自 AWS 的 MQTT 代理程式，請參閱 [MQTT 3.1.1 代理程式 (Moquette)](https://docs.aws.amazon.com/greengrass/v2/developerguide/mqtt-broker-moquette-component.html) 和 [MQTT 5 代理程式 (EMQX)](https://docs.aws.amazon.com/greengrass/v2/developerguide/mqtt-broker-emqx-component.html)。 | 
| 不會建立 Parquet 檔案 | [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/prescriptive-guidance/latest/patterns/cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass.html) | 
| 物件不會上傳到 S3 儲存貯體 | [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/prescriptive-guidance/latest/patterns/cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass.html) | 

## 相關資源
<a name="cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass-resources"></a>
+ [DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) (Pandas 文件）
+ [Apache Parquet 文件](https://parquet.apache.org/docs/) (Parquet 文件）
+ [開發 AWS IoT Greengrass 元件](https://docs.aws.amazon.com/greengrass/v2/developerguide/develop-greengrass-components.html) (AWS IoT Greengrass 開發人員指南，第 2 版）
+ 將[ AWS IoT Greengrass 元件部署至裝置](https://docs.aws.amazon.com/greengrass/v2/developerguide/manage-deployments.html) (AWS IoT Greengrass 開發人員指南，第 2 版）
+ [與本機 IoT 裝置互動 ](https://docs.aws.amazon.com/greengrass/v2/developerguide/interact-with-local-iot-devices.html)(AWS IoT Greengrass 開發人員指南，第 2 版）
+ [MQTT 3.1.1 代理程式 (Moquette)](https://docs.aws.amazon.com/greengrass/v2/developerguide/mqtt-broker-moquette-component.html) (AWS IoT Greengrass 開發人員指南，第 2 版）
+ [MQTT 5 代理程式 (EMQX)](https://docs.aws.amazon.com/greengrass/v2/developerguide/mqtt-broker-emqx-component.html) (AWS IoT Greengrass 開發人員指南，第 2 版）

## 其他資訊
<a name="cost-effectively-ingest-iot-data-directly-into-amazon-s3-using-aws-iot-greengrass-additional"></a>

**成本分析**

下列成本分析案例示範此模式中涵蓋的資料擷取方法如何影響 AWS 雲端中的資料擷取成本。此案例中的定價範例是以發佈時的價格為基礎。價格可能變動。此外，您的成本可能會根據您的 AWS 區域、AWS 服務配額以及與雲端環境相關的其他因素而有所不同。

*輸入訊號集*

此分析使用下列一組輸入訊號，做為比較 IoT 擷取成本與其他可用替代方案的基礎。


| 
| 
| 訊號數量 | Frequency (頻率) | 每個訊號的資料 | 
| --- |--- |--- |
| 125 | 25 Hz | 8 位元組 | 

在此案例中，系統會接收 125 個訊號。每個訊號都是 8 個位元組，每 40 毫秒 (25 Hz) 就會發生。這些訊號可以個別出現，或在常見的承載中分組。您可以選擇根據您的需求分割和封裝這些訊號。您也可以判斷延遲。延遲包含接收、累積和擷取資料的期間。

為了比較，此案例的擷取操作是以 `us-east-1` AWS 區域為基礎。成本比較僅適用於 AWS 服務。硬體或連線能力等其他成本不會納入分析。

*成本比較*

下表顯示每個擷取方法的每月成本，以美元 (USD) 為單位。


| 
| 
| Method | 每月成本 | 
| --- |--- |
| AWS IoT SiteWise*\$1* | 331.77 USD | 
| AWS IoT SiteWise Edge 搭配資料處理套件 （將所有資料保留在邊緣） | 200 USD | 
| 用於存取原始資料的 AWS IoT Core 和 Amazon S3 規則 | 84.54 USD | 
| Parquet 檔案在邊緣壓縮並上傳至 Amazon S3 | 0.5 USD | 

\$1資料必須進行縮減取樣，以符合服務配額。這表示此方法有一些資料遺失。

*替代方法*

本節顯示下列替代方法的同等成本：
+ **AWS IoT SiteWise** – 每個訊號都必須以個別訊息上傳。因此，每月的訊息總數為 125 × 25 × 3600 × 24 × 30，或每月 81 億則訊息。不過，AWS IoT SiteWise 每個屬性每秒只能處理 10 個資料點。假設資料縮減取樣至 10 Hz，則每月訊息數量會減少至 125 × 10 × 3600 × 24 × 30，或 32.4 億。如果您使用 發佈者元件，以 10 個 （每百萬則訊息 1 USD) 的群組來封裝衡量值，則每月 324 USD 的費用。假設每則訊息為 8 個位元組 (1 Kb/125)，即 25.92 Gb 的資料儲存體。這會增加每月 7.77 USD 的成本。第一個月的總成本為 331.77 USD，每月增加 7.77 USD。
+ **AWS IoT SiteWise Edge 搭配資料處理套件，包括在邊緣完全處理的所有模型和訊號 （即沒有雲端擷取）** – 您可以使用資料處理套件做為替代方案，以降低成本並設定在邊緣計算的所有模型。即使未執行實際計算，這仍僅適用於儲存體和視覺化。在此情況下，邊緣閘道必須使用功能強大的硬體。每月固定費用為 200 USD。
+ **MQTT 直接擷取至 AWS IoT Core 和 IoT 規則，以將原始資料儲存在 Amazon S3 中** – 假設所有訊號都發佈在通用承載中，發佈至 AWS IoT Core 的訊息總數為每月 25×3600×24×30 或 6480 萬。每百萬則訊息 1 USD，即每月 64.8 USD 的成本。每百萬規則啟用 0.15 USD，每則訊息一個規則，每月增加 19.44 USD。在 Amazon S3 中，每 Gb 儲存的成本為 0.023 USD，每月增加 1.5 USD （每月增加以反映新資料）。第一個月的總成本為 84.54 USD，每月增加 1.5 USD。
+ **在 Parquet 檔案中的邊緣壓縮資料並上傳至 Amazon S3 （建議的方法**) – 壓縮率取決於資料類型。使用針對 MQTT 測試的相同工業資料，整個月的總輸出資料為 1.2 Gb。此費用為每月 0.03 USD。其他基準測試中描述的壓縮率 （使用隨機資料） 的順序為 66% （較接近最壞情況）。總資料為 21 Gb，每月費用為 0.5 USD。

**Parquet 檔案產生器**

下列程式碼範例顯示以 Python 撰寫的 Parquet 檔案產生器結構。程式碼範例僅供說明之用，如果貼入您的環境，則無法運作。

```
import queue
import paho.mqtt.client as mqtt
import pandas as pd

#queue for decoupling the MQTT thread
messageQueue = queue.Queue()
client = mqtt.Client()
streammanager = StreamManagerClient()

def feederListener(topic, message):
    payload = {
        "topic" : topic,
        "payload" : message,
    }
    messageQueue.put_nowait(payload)

def on_connect(client_instance, userdata, flags, rc):
    client.subscribe("#",qos=0)

def on_message(client, userdata, message):
    feederListener(topic=str(message.topic), message=str(message.payload.decode("utf-8")))


filename = "tempfile.parquet"
streamname = "mystream"
destination_bucket= "amzn-s3-demo-bucket"
keyname="mykey"
period= 60

client.on_connect = on_connect
client.on_message = on_message
streammanager.create_message_stream(
            MessageStreamDefinition(name=streamname, strategy_on_full=StrategyOnFull.OverwriteOldestData)
        )


while True:
   try:
       message = messageQueue.get(timeout=myArgs.mqtt_timeout)
   except (queue.Empty):
       logger.warning("MQTT message reception timed out")

   currentTimestamp = getCurrentTime()
   if currentTimestamp >= nextUploadTimestamp:
       df = pd.DataFrame.from_dict(accumulator)
       df.to_parquet(filename)
       s3_export_task_definition = S3ExportTaskDefinition(input_url=filename, bucket=destination_bucket, key=key_name)
       streammanager.append_message(streamname, Util.validate_and_serialize_to_json_bytes(s3_export_task_definition))
       accumulator = {}
       nextUploadTimestamp += period
   else:
        accumulator.append(message)
```