翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
AWS IoT Core MQTT メッセージを発行/サブスクライブする
AWS IoT Core MQTT メッセージング IPC サービスを使用すると、 との間で MQTT メッセージを送受信できます AWS IoT Core。コンポーネントは、 にメッセージを発行 AWS IoT Core し、トピックをサブスクライブして、他のソースからの MQTT メッセージに対応できます。MQTT の AWS IoT Core 実装の詳細については、「 AWS IoT Core デベロッパーガイド」の「MQTT」を参照してください。
注記
この MQTT メッセージング IPC サービスを使用すると、 とメッセージを交換できます AWS IoT Core。コンポーネント間でメッセージを交換する方法の詳細については、「ローカルメッセージをパブリッシュ/サブスクライブする」を参照してください。
最小 SDK バージョン
次の表 AWS IoT Device SDK に、MQTT メッセージの発行とサブスクライブに使用する必要がある の最小バージョンを示します AWS IoT Core。
| SDK | 最小バージョン |
|---|---|
|
v1.2.10 |
|
|
v1.5.3 |
|
|
v1.17.0 |
|
|
v1.12.0 |
Authorization
カスタムコンポーネントで AWS IoT Core MQTT メッセージングを使用するには、コンポーネントがトピックに関するメッセージを送受信できるようにする承認ポリシーを定義する必要があります。承認ポリシーの定義については、「コンポーネントに IPC オペレーションの実行を許可する」を参照してください。
AWS IoT Core MQTT メッセージングの承認ポリシーには、次のプロパティがあります。
IPC サービス識別子: aws.greengrass.ipc.mqttproxy
| 運用 | 説明 | リソース |
|---|---|---|
|
|
コンポーネントが、指定した MQTT トピック AWS IoT Core で にメッセージを発行できるようにします。 |
|
|
|
コンポーネントが、指定したトピック AWS IoT Core で からのメッセージをサブスクライブできるようにします。 |
|
|
|
コンポーネントが、指定したトピックの AWS IoT Core MQTT メッセージを発行およびサブスクライブできるようにします。 |
|
MQTT 認可ポリシーの AWS IoT Core MQTT ワイルドカード
MQTT IPC 認可ポリシーで AWS IoT Core MQTT ワイルドカードを使用できます。これによりコンポーネントは、承認ポリシーで許可するトピックフィルターに一致するトピックを発行およびサブスクライブできます。例えば、コンポーネントの承認ポリシーで test/topic/# へのアクセス権が付与されている場合、コンポーネントは test/topic/# をサブスクライブでき、test/topic/filter を発行およびサブスクライブできます。
AWS IoT Core MQTT 認可ポリシーのレシピ変数
v2.6.0 以降の Greengrass nucleus を使用している場合、承認ポリシーの {iot:thingName} recipe 変数を使用できます。この機能を使用すると、コアデバイスのグループに対して 1 つの承認ポリシーを設定できます。各コアデバイスは自身の名前を含むトピックにのみアクセスできます。例えば、コンポーネントに次のトピックリソースへのアクセスを許可できます。
devices/{iot:thingName}/messages
詳細については、「レシピ変数」および「マージ更新で recipe 変数を使用する」を参照してください。
承認ポリシーの例
次の承認ポリシーの例を参照して、コンポーネントの承認ポリシー設定の参考にできます。
例アクセスが制限されていない承認ポリシーの例
以下の承認ポリシーの例では、コンポーネントがすべてのトピックを公開およびサブスクライブすることを許可します。
例アクセスが制限されている承認ポリシーの例
次の承認ポリシーの例では、コンポーネントが、factory/1/events および factory/1/actions という名前の 2 つのトピックを公開およびサブスクライブすることを許可します。
例コアデバイスのグループに対する承認ポリシーの例
重要
この例では、v2.6.0 以降の Greengrass nucleus コンポーネントで利用できる機能を使用しています。Greengrass nucleus v2.6.0 では、コンポーネント設定に、ほとんどの recipe 変数 ({iot:thingName} など) のサポートが追加されました。
次の承認ポリシーの例は、コンポーネントが実行されているコアデバイスの名前を含むトピックを、コンポーネントが発行およびサブスクライブできるようにします。
PublishToIoTCore
トピック AWS IoT Core で MQTT メッセージを に発行します。
MQTT メッセージを に発行する場合 AWS IoT Core、1 秒あたり 100 トランザクションのクォータがあります。このクォータを超えると、メッセージは Greengrass デバイスでの処理のためにキューに入れられます。また、1 秒あたり 512 KB のデータクォータと、1 秒あたり 20,000 パブリッシュ (一部は 2,000) のアカウント全体のクォータがあります AWS リージョン。 AWS IoT Core の MQTT メッセージブローカー制限の詳細については、「AWS IoT Core メッセージブローカーとプロトコルの制限とクォータ」を参照してください。
これらのクォータを超えると、Greengrass デバイスはメッセージの発行を制限します AWS IoT Core。メッセージはメモリ内のスプーラに保存されます。デフォルトでは、スプーラに割り当てられるメモリは 2.5 MB です。スプーラがいっぱいになると、新しいメッセージは拒否されます。スプーラのサイズを増やすことができます。詳細については、Greengrass nucleus ドキュメントの「設定」を参照してください。スプーラがいっぱいになり、割り当てられるメモリを増やす必要がないように、公開要求は 1 秒あたり 100 要求以下に制限してください。
アプリケーションがメッセージをより高いレートで送信したり、より大きなメッセージを送信したりする必要がある場合は、ストリームマネージャー を使用して Kinesis データストリームにメッセージを送信することを検討してください。ストリームマネージャーコンポーネントは、大量のデータを AWS クラウド に転送するように設計されています。詳細については、「Greengrass コアデバイスでのデータストリームの管理」を参照してください。
リクエスト
このオペレーションのリクエストには以下のパラメータがあります。
topicName(Python:topic_name)-
メッセージの発行先として指定するトピック。
qos-
使用する MQTT QoS。この列挙型 (
QOS) には以下の値があります。-
AT_MOST_ONCE– QoS 0。MQTT メッセージが配信されるのは 1 回以下です。 -
AT_LEAST_ONCE– QoS 1。MQTT メッセージが配信されるのは 1 回以上です。
-
payload-
(オプション) BLOB としてのメッセージペイロード。
MQTT 5 を使用する際、Greengrass nucleus の v2.10.0 以降で以下の機能が使用できます。MQTT 3.1.1 を使用している場合、これらの機能は無視されます。次の表に、これらの機能にアクセスするために使用する必要がある AWS IoT デバイス SDK の最小バージョンを示します。
| SDK | 最小バージョン |
|---|---|
| AWS IoT Device SDK for Python |
v1.15.0 |
| AWS IoT Device SDK for Java |
v1.13.0 |
| AWS IoT Device SDK for C++ |
v1.24.0 |
| AWS IoT Device SDK for JavaScript |
v1.13.0 |
payloadFormat-
(オプション) メッセージペイロードのフォーマット。
payloadFormatを設定しない場合、タイプはBYTESとみなされます。この列挙型には以下の値があります。-
BYTES— ペイロードのコンテンツは、バイナリ BLOB です。 -
UTF8— ペイロードのコンテンツは UTF8 の文字列です。
-
retain-
(オプション) 発行時に MQTT 保持オプションを
trueに設定するか否かを示します。 userProperties-
(オプション) 送信するアプリケーション固有の
UserPropertyオブジェクトのリストです。UserPropertyオブジェクトは次のように定義されます。UserProperty: key: string value: string messageExpiryIntervalSeconds-
(オプション) メッセージが期限切れとなり、サーバーによって削除されるまでの秒数です。この値を設定しない場合、メッセージに有効期限は設定されません。
correlationData-
(オプション) リクエストに付加される情報で、リクエストとレスポンスの関連付けに使用できます。
responseTopic-
(オプション) レスポンスメッセージに使用するトピックです。
contentType-
(オプション) メッセージのコンテンツタイプを示すアプリケーション固有の識別子です。
応答
このオペレーションはレスポンスで一切の情報を提供しません。
例
以下の例では、カスタムコンポーネントコードでこのオペレーションを呼び出す方法を示します。
SubscribeToIoTCore
トピックまたはトピックフィルター AWS IoT Core で から MQTT メッセージをサブスクライブします。 AWS IoT Greengrass Core ソフトウェアは、コンポーネントがライフサイクルの終了に達するとサブスクリプションを削除します。
このオペレーションはサブスクリプションオペレーションで、イベントメッセージのストリームをサブスクライブするというものです。このオペレーションを使用するには、イベントメッセージ、エラー、およびストリームクロージャを処理する関数を使用して、ストリームレスポンスハンドラーを定義します。(詳細については、IPC イベントストリームへのサブスクライブ を参照してください)。
イベントメッセージの種類: IoTCoreMessage
リクエスト
このオペレーションのリクエストには以下のパラメータがあります。
topicName(Python:topic_name)-
サブスクライブ先のトピック。MQTT トピックのワイルドカード (
#および+) を使用して、複数のトピックにサブスクライブできます。 qos-
使用する MQTT QoS。この列挙型 (
QOS) には以下の値があります。-
AT_MOST_ONCE– QoS 0。MQTT メッセージが配信されるのは 1 回以下です。 -
AT_LEAST_ONCE– QoS 1。MQTT メッセージが配信されるのは 1 回以上です。
-
応答
このオペレーションのレスポンスには以下の情報が含まれます。
messages-
MQTT メッセージのストリーム。このオブジェクト (
IoTCoreMessage) には、次の情報が含まれます。message-
MQTT メッセージ。このオブジェクト (
MQTTMessage) には、次の情報が含まれます。topicName(Python:topic_name)-
メッセージが発行されたトピック。
payload-
(オプション) BLOB としてのメッセージペイロード。
MQTT 5 を使用する際、Greengrass nucleus の v2.10.0 以降で以下の機能が使用できます。MQTT 3.1.1 を使用している場合、これらの機能は無視されます。次の表に、これらの機能にアクセスするために使用する必要がある AWS IoT デバイス SDK の最小バージョンを示します。
SDK 最小バージョン AWS IoT Device SDK for Python v2 v1.15.0 AWS IoT Device SDK for Java v2 v1.13.0 AWS IoT Device SDK for C++ v2 v1.24.0 AWS IoT Device SDK for JavaScript v2 v1.13.0 payloadFormat-
(オプション) メッセージペイロードのフォーマット。
payloadFormatを設定しない場合、タイプはBYTESとみなされます。この列挙型には以下の値があります。-
BYTES— ペイロードのコンテンツは、バイナリ BLOB です。 -
UTF8— ペイロードのコンテンツは UTF8 の文字列です。
-
retain-
(オプション) 発行時に MQTT 保持オプションを
trueに設定するか否かを示します。 userProperties-
(オプション) 送信するアプリケーション固有の
UserPropertyオブジェクトのリストです。UserPropertyオブジェクトは次のように定義されます。UserProperty: key: string value: string messageExpiryIntervalSeconds-
(オプション) メッセージが期限切れとなり、サーバーによって削除されるまでの秒数です。この値を設定しない場合、メッセージに有効期限は設定されません。
correlationData-
(オプション) リクエストに付加される情報で、リクエストとレスポンスの関連付けに使用できます。
responseTopic-
(オプション) レスポンスメッセージに使用するトピックです。
contentType-
(オプション)メッセージのコンテンツタイプのアプリケーション固有の識別子です。
例
以下の例では、カスタムコンポーネントコードでこのオペレーションを呼び出す方法を示します。
例
コンポーネントで AWS IoT Core MQTT IPC サービスを使用する方法については、次の例を参照してください。
以下の recipe の例は、コンポーネントをすべてのトピックに発行できるようにします。
次の C++ アプリケーションの例は、 AWS IoT Core MQTT IPC サービスを使用してメッセージを発行する方法を示しています AWS IoT Core。
#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { String message("Hello from the Greengrass IPC MQTT publisher (C++)."); String topic("test/topic/cpp"); QOS qos = QOS_AT_LEAST_ONCE; int timeout = 10; ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } while (true) { PublishToIoTCoreRequest request; Vector<uint8_t> messageData({message.begin(), message.end()}); request.SetTopicName(topic); request.SetPayload(messageData); request.SetQos(qos); auto operation = ipcClient.NewPublishToIoTCore(); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (response) { std::cout << "Successfully published to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to publish to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } std::this_thread::sleep_for(std::chrono::seconds(5)); } return 0; }
以下の recipe の例は、コンポーネントをすべてのトピックをサブスクライブできるようにします。
次の C++ アプリケーションの例は、 AWS IoT Core MQTT IPC サービスを使用してメッセージをサブスクライブする方法を示しています AWS IoT Core。
#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IoTCoreResponseHandler : public SubscribeToIoTCoreStreamHandler { public: virtual ~IoTCoreResponseHandler() {} private: void OnStreamEvent(IoTCoreMessage *response) override { auto message = response->GetMessage(); if (message.has_value() && message.value().GetPayload().has_value()) { auto messageBytes = message.value().GetPayload().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); std::string messageTopic = message.value().GetTopicName().value().c_str(); std::cout << "Received new message on topic: " << messageTopic << std::endl; std::cout << "Message: " << messageString << std::endl; } } bool OnStreamError(OperationError *error) override { std::cout << "Received an operation error: "; if (error->GetMessage().has_value()) { std::cout << error->GetMessage().value(); } std::cout << std::endl; return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { std::cout << "Subscribe to IoT Core stream closed." << std::endl; } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { String topic("test/topic/cpp"); QOS qos = QOS_AT_LEAST_ONCE; int timeout = 10; ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } SubscribeToIoTCoreRequest request; request.SetTopicName(topic); request.SetQos(qos); auto streamHandler = MakeShared<IoTCoreResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToIoTCore(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (response) { std::cout << "Successfully subscribed to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to subscribe to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }
以下の recipe の例は、コンポーネントをすべてのトピックに発行できるようにします。
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.IoTCorePublisherRust", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes MQTT messages to IoT Core.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.IoTCorePublisherRust:mqttproxy:1": { "policyDescription": "Allows access to publish to all topics.", "operations": ["aws.greengrass#PublishToIoTCore"], "resources": ["*"] } } } } }, "Manifests": [ { "Platform": { "os": "linux", "runtime": "*" }, "Lifecycle": { "run": "{artifacts:path}/publish_to_iot_core" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.IoTCorePublisherRust/1.0.0/publish_to_iot_core", "Permission": { "Execute": "OWNER" } } ] } ] }
次の Rust アプリケーション例は、 AWS IoT Core MQTT IPC サービスを使用してメッセージを発行する方法を示しています AWS IoT Core。
use gg_sdk::{Qos, Sdk}; fn main() { let sdk = Sdk::init(); sdk.connect().expect("Failed to establish IPC connection"); let message = b"Hello, World"; let topic = "my/topic"; let qos = Qos::AtLeastOnce; sdk.publish_to_iot_core(topic, message, qos) .expect("Failed to publish to topic"); println!("Successfully published to topic: {topic}"); }
以下の recipe の例は、コンポーネントをすべてのトピックをサブスクライブできるようにします。
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.IoTCoreSubscriberRust", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to MQTT messages from IoT Core.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.IoTCoreSubscriberRust:mqttproxy:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": ["aws.greengrass#SubscribeToIoTCore"], "resources": ["*"] } } } } }, "Manifests": [ { "Platform": { "os": "linux", "runtime": "*" }, "Lifecycle": { "run": "{artifacts:path}/subscribe_to_iot_core" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.IoTCoreSubscriberRust/1.0.0/subscribe_to_iot_core", "Permission": { "Execute": "OWNER" } } ] } ] }
次の Rust アプリケーションの例は、 AWS IoT Core MQTT IPC サービスを使用してメッセージをサブスクライブする方法を示しています AWS IoT Core。
use gg_sdk::{Qos, Sdk}; use std::{thread, time::Duration}; fn main() { let sdk = Sdk::init(); sdk.connect().expect("Failed to establish IPC connection"); let topic = "my/topic"; let qos = Qos::AtLeastOnce; let callback = |topic: &str, payload: &[u8]| { let message = String::from_utf8_lossy(payload); println!("Received new message on topic {topic}: {message}"); }; let _sub = sdk .subscribe_to_iot_core(topic, qos, &callback) .expect("Failed to subscribe to topic"); println!("Successfully subscribed to topic: {topic}"); // Keep the main thread alive, or the process will exit. loop { thread::sleep(Duration::from_secs(10)); } }
以下の recipe の例は、コンポーネントをすべてのトピックに発行できるようにします。
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.IoTCorePublisherC", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes MQTT messages to IoT Core.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.IoTCorePublisherC:mqttproxy:1": { "policyDescription": "Allows access to publish to all topics.", "operations": ["aws.greengrass#PublishToIoTCore"], "resources": ["*"] } } } } }, "Manifests": [ { "Platform": { "os": "linux", "runtime": "*" }, "Lifecycle": { "run": "{artifacts:path}/sample_publish_to_iot_core" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.IoTCorePublisherC/1.0.0/sample_publish_to_iot_core", "Permission": { "Execute": "OWNER" } } ] } ] }
次の C アプリケーション例は、 AWS IoT Core MQTT IPC サービスを使用してメッセージを発行する方法を示しています AWS IoT Core。
#include <gg/error.h> #include <gg/ipc/client.h> #include <gg/sdk.h> #include <stdio.h> #include <stdlib.h> int main(void) { gg_sdk_init(); GgError err = ggipc_connect(); if (err != GG_ERR_OK) { fprintf(stderr, "Failed to establish IPC connection.\n"); exit(-1); } GgBuffer message = GG_STR("Hello, World"); GgBuffer topic = GG_STR("my/topic"); uint8_t qos = 1; err = ggipc_publish_to_iot_core(topic, message, qos); if (err != GG_ERR_OK) { fprintf( stderr, "Failed to publish to topic: %.*s\n", (int) topic.len, topic.data ); exit(-1); } printf( "Successfully published to topic: %.*s\n", (int) topic.len, topic.data ); }
以下の recipe の例は、コンポーネントをすべてのトピックをサブスクライブできるようにします。
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.IoTCoreSubscriberC", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to MQTT messages from IoT Core.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.IoTCoreSubscriberC:mqttproxy:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": ["aws.greengrass#SubscribeToIoTCore"], "resources": ["*"] } } } } }, "Manifests": [ { "Platform": { "os": "linux", "runtime": "*" }, "Lifecycle": { "run": "{artifacts:path}/sample_subscribe_to_iot_core" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.IoTCoreSubscriberC/1.0.0/sample_subscribe_to_iot_core", "Permission": { "Execute": "OWNER" } } ] } ] }
次の C アプリケーション例は、 AWS IoT Core MQTT IPC サービスを使用してメッセージをサブスクライブする方法を示しています AWS IoT Core。
#include <gg/error.h> #include <gg/ipc/client.h> #include <gg/sdk.h> #include <unistd.h> #include <stdio.h> #include <stdlib.h> static void on_subscription_response( void *ctx, GgBuffer topic, GgBuffer payload, GgIpcSubscriptionHandle handle ) { (void) ctx; (void) handle; printf( "Received new message on topic %.*s: %.*s\n", (int) topic.len, topic.data, (int) payload.len, payload.data ); } int main(void) { gg_sdk_init(); GgError err = ggipc_connect(); if (err != GG_ERR_OK) { fprintf(stderr, "Failed to establish IPC connection.\n"); exit(-1); } GgBuffer topic = GG_STR("my/topic"); uint8_t qos = 1; GgIpcSubscriptionHandle handle; err = ggipc_subscribe_to_iot_core( topic, qos, on_subscription_response, NULL, &handle ); if (err != GG_ERR_OK) { fprintf( stderr, "Failed to subscribe to topic: %.*s\n", (int) topic.len, topic.data ); exit(-1); } printf( "Successfully subscribed to topic: %.*s\n", (int) topic.len, topic.data ); // Keep the main thread alive, or the process will exit. while (1) { sleep(10); } // To stop subscribing, close the subscription handle. ggipc_close_subscription(handle); }
以下の recipe の例は、コンポーネントをすべてのトピックに発行できるようにします。
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.IoTCorePublisherCpp", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes MQTT messages to IoT Core.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.IoTCorePublisherCpp:mqttproxy:1": { "policyDescription": "Allows access to publish to all topics.", "operations": ["aws.greengrass#PublishToIoTCore"], "resources": ["*"] } } } } }, "Manifests": [ { "Platform": { "os": "linux", "runtime": "*" }, "Lifecycle": { "run": "{artifacts:path}/sample_cpp_publish_to_iot_core" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.IoTCorePublisherCpp/1.0.0/sample_cpp_publish_to_iot_core", "Permission": { "Execute": "OWNER" } } ] } ] }
次の C++ アプリケーションの例は、 AWS IoT Core MQTT IPC サービスを使用してメッセージを発行する方法を示しています AWS IoT Core。
#include <gg/ipc/client.hpp> #include <iostream> int main() { auto &client = gg::ipc::Client::get(); auto error = client.connect(); if (error) { std::cerr << "Failed to establish IPC connection.\n"; exit(-1); } std::string_view message = "Hello, World"; std::string_view topic = "my/topic"; uint8_t qos = 1; error = client.publish_to_iot_core(topic, message, qos); if (error) { std::cerr << "Failed to publish to topic: " << topic << "\n"; exit(-1); } std::cout << "Successfully published to topic: " << topic << "\n"; }
以下の recipe の例は、コンポーネントをすべてのトピックをサブスクライブできるようにします。
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.IoTCoreSubscriberCpp", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to MQTT messages from IoT Core.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.IoTCoreSubscriberCpp:mqttproxy:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": ["aws.greengrass#SubscribeToIoTCore"], "resources": ["*"] } } } } }, "Manifests": [ { "Platform": { "os": "linux", "runtime": "*" }, "Lifecycle": { "run": "{artifacts:path}/sample_cpp_subscribe_to_iot_core" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.IoTCoreSubscriberCpp/1.0.0/sample_cpp_subscribe_to_iot_core", "Permission": { "Execute": "OWNER" } } ] } ] }
次の C++ アプリケーションの例は、 AWS IoT Core MQTT IPC サービスを使用してメッセージをサブスクライブする方法を示しています AWS IoT Core。
#include <gg/ipc/client.hpp> #include <unistd.h> #include <iostream> class ResponseHandler : public gg::ipc::IotTopicCallback { void operator()( std::string_view topic, gg::Buffer payload, gg::ipc::Subscription &handle ) override { (void) handle; std::cout << "Received new message on topic " << topic << ": " << payload << "\n"; } }; int main() { auto &client = gg::ipc::Client::get(); auto error = client.connect(); if (error) { std::cerr << "Failed to establish IPC connection.\n"; exit(-1); } std::string_view topic = "my/topic"; uint8_t qos = 1; static ResponseHandler handler; error = client.subscribe_to_iot_core(topic, qos, handler); if (error) { std::cerr << "Failed to subscribe to topic: " << topic << "\n"; exit(-1); } std::cout << "Successfully subscribed to topic: " << topic << "\n"; // Keep the main thread alive, or the process will exit. while (1) { sleep(10); } }