翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
ローカルメッセージをパブリッシュ/サブスクライブする
パブリッシュ/サブスクライブ (pubsub) メッセージを使用すると、トピックにメッセージを送受信できます。コンポーネントはメッセージをトピックに発行して、他のコンポーネントにメッセージを送信できます。その後、そのトピックをサブスクライブしているコンポーネントは、受信したメッセージに対応できます。
注記
このパブリッシュ/サブスクライブ IPC サービスを使用して、 AWS IoT Core MQTT を公開またはサブスクライブすることはできません。 AWS IoT Core MQTT とメッセージを交換する方法の詳細については、「」を参照してくださいAWS IoT Core MQTT メッセージを発行/サブスクライブする。
最小 SDK バージョン
次の表に、ローカルトピックとの間でメッセージを発行およびサブスクライブするために使用 AWS IoT Device SDK する必要がある の最小バージョンを示します。
| SDK | 最小バージョン |
|---|---|
|
v1.2.10 |
|
|
v1.5.3 |
|
|
v1.17.0 |
|
|
v1.12.0 |
Authorization
ローカルのパブリッシュ/サブスクライブメッセージングをカスタムコンポーネントで使用するには、コンポーネントがトピックにメッセージを送受信できるようにする承認ポリシーを定義する必要があります。承認ポリシーの定義については、「コンポーネントに IPC オペレーションの実行を許可する」を参照してください。
パブリッシュ/サブスクライブメッセージングの承認ポリシーには以下のプロパティがあります。
IPC サービス識別子: aws.greengrass.ipc.pubsub
| 運用 | 説明 | リソース |
|---|---|---|
|
|
コンポーネントが指定したトピックにメッセージを発行できるようにします。 |
このトピック文字列は、MQTT トピックのワイルドカード ( |
|
|
コンポーネントが、指定したトピックに関するメッセージをサブスクライブできるようにします。 |
Greengrass nucleus v2.6.0 以降では、MQTT トピックワイルドカード ( |
|
|
コンポーネントが、指定したトピックのメッセージを発行およびサブスクライブできるようにします。 |
Greengrass nucleus v2.6.0 以降では、MQTT トピックワイルドカード ( |
承認ポリシーの例
次の承認ポリシーの例を参照して、コンポーネントの承認ポリシーの設定に役立てることができます。
例承認ポリシーの例
以下の承認ポリシーの例では、コンポーネントがすべてのトピックを公開およびサブスクライブすることを許可します。
{ "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.MyLocalPubSubComponent:pubsub:1": { "policyDescription": "Allows access to publish/subscribe to all topics.", "operations": [ "aws.greengrass#PublishToTopic", "aws.greengrass#SubscribeToTopic" ], "resources": [ "*" ] } } } }
PublishToTopic
トピックへのメッセージの発行
リクエスト
このオペレーションのリクエストには以下のパラメータがあります。
topic-
メッセージの発行先として指定するトピック。
publishMessage(Python:publish_message)-
発行するメッセージ。このオブジェクト (
PublishMessage) には、次の情報が含まれます。jsonMessageとbinaryMessageのどちらか 1 つを指定する必要があります。jsonMessage(Python:json_message)-
(オプション) JSON メッセージ。このオブジェクト (
JsonMessage) には、次の情報が含まれます。message-
オブジェクトとしての JSON メッセージ。
context-
メッセージが公開されたトピックなど、メッセージのコンテキスト。
この機能は、Greengrass nucleus コンポーネントの v2.6.0 以降で利用できます。次の表に、メッセージコンテキストへのアクセスに使用する必要がある AWS IoT Device SDK の最小バージョンを示します。
SDK 最小バージョン v1.9.3
v1.11.3
v1.18.4
v1.12.0
注記
AWS IoT Greengrass Core ソフトウェアは、
PublishToTopicおよびSubscribeToTopicオペレーションで同じメッセージオブジェクトを使用します。 AWS IoT Greengrass Core ソフトウェアは、サブスクライブ時にこのコンテキストオブジェクトをメッセージに設定し、発行するメッセージではこのコンテキストオブジェクトを無視します。このオブジェクト (
MessageContext) には、次の情報が含まれます。topic-
メッセージが発行されたトピック。
binaryMessage(Python:binary_message)-
(オプション) バイナリメッセージ。このオブジェクト (
BinaryMessage) には、次の情報が含まれます。message-
BLOB としてのバイナリメッセージ。
context-
メッセージが公開されたトピックなど、メッセージのコンテキスト。
この機能は、Greengrass nucleus コンポーネントの v2.6.0 以降で利用できます。次の表に、メッセージコンテキストへのアクセスに使用する必要がある AWS IoT Device SDK の最小バージョンを示します。
SDK 最小バージョン v1.9.3
v1.11.3
v1.18.4
v1.12.0
注記
AWS IoT Greengrass Core ソフトウェアは、
PublishToTopicおよびSubscribeToTopicオペレーションで同じメッセージオブジェクトを使用します。 AWS IoT Greengrass Core ソフトウェアは、サブスクライブ時にこのコンテキストオブジェクトをメッセージに設定し、発行するメッセージではこのコンテキストオブジェクトを無視します。このオブジェクト (
MessageContext) には、次の情報が含まれます。topic-
メッセージが発行されたトピック。
応答
このオペレーションはレスポンスで一切の情報を提供しません。
例
以下の例では、カスタムコンポーネントコードでこのオペレーションを呼び出す方法を示します。
SubscribeToTopic
トピックのメッセージをサブスクライブする。
このオペレーションはサブスクリプションオペレーションで、イベントメッセージのストリームをサブスクライブするというものです。このオペレーションを使用するには、イベントメッセージ、エラー、およびストリームクロージャを処理する関数を使用して、ストリームレスポンスハンドラーを定義します。(詳細については、IPC イベントストリームへのサブスクライブ を参照してください)。
イベントメッセージの種類: SubscriptionResponseMessage
リクエスト
このオペレーションのリクエストには以下のパラメータがあります。
topic-
サブスクライブ先のトピック。
注記
Greengrass nucleus v2.6.0 以降では、このトピックは MQTT トピックのワイルドカード (
#と+) をサポートします。 receiveMode(Python:receive_mode)-
(オプション) コンポーネントが自身からのメッセージを受信するかどうかを指定する動作。この動作を変更して、コンポーネントが自身のメッセージに基づいて動作できるようにすることができます。デフォルトの動作は、トピックに MQTT ワイルドカードが含まれているかどうかによって異なります。次のオプションから選択します。
-
RECEIVE_ALL_MESSAGES– サブスクライブするコンポーネントからのメッセージを含む、トピックに一致するすべてのメッセージを受信します。このモードは、MQTT ワイルドカードを含まないトピックをサブスクライブする場合のデフォルトオプションです。
-
RECEIVE_MESSAGES_FROM_OTHERS– サブスクライブするコンポーネントからのメッセージを除き、トピックに一致するすべてのメッセージを受信します。このモードは、MQTT ワイルドカードを含むトピックをサブスクライブする場合のデフォルトオプションです。
この機能は、Greengrass nucleus コンポーネントの v2.6.0 以降で利用できます。次の表 AWS IoT Device SDK に、受信モードの設定に使用する必要がある の最小バージョンを示します。
SDK 最小バージョン v1.9.3
v1.11.3
v1.18.4
v1.12.0
-
応答
このオペレーションのレスポンスには以下の情報が含まれます。
messages-
メッセージのストリーム。このオブジェクト (
SubscriptionResponseMessage) には、次の情報が含まれます。各メッセージにはjsonMessageまたはbinaryMessageが含まれます。jsonMessage(Python:json_message)-
(オプション) JSON メッセージ。このオブジェクト (
JsonMessage) には、次の情報が含まれます。message-
オブジェクトとしての JSON メッセージ。
context-
メッセージが公開されたトピックなど、メッセージのコンテキスト。
この機能は、Greengrass nucleus コンポーネントの v2.6.0 以降で利用できます。次の表に、メッセージコンテキストへのアクセスに使用する必要がある AWS IoT Device SDK の最小バージョンを示します。
SDK 最小バージョン v1.9.3
v1.11.3
v1.18.4
v1.12.0
注記
AWS IoT Greengrass Core ソフトウェアは、
PublishToTopicおよびSubscribeToTopicオペレーションで同じメッセージオブジェクトを使用します。 AWS IoT Greengrass Core ソフトウェアは、サブスクライブ時にこのコンテキストオブジェクトをメッセージに設定し、発行するメッセージではこのコンテキストオブジェクトを無視します。このオブジェクト (
MessageContext) には、次の情報が含まれます。topic-
メッセージが発行されたトピック。
binaryMessage(Python:binary_message)-
(オプション) バイナリメッセージ。このオブジェクト (
BinaryMessage) には、次の情報が含まれます。message-
BLOB としてのバイナリメッセージ。
context-
メッセージが公開されたトピックなど、メッセージのコンテキスト。
この機能は、Greengrass nucleus コンポーネントの v2.6.0 以降で利用できます。次の表に、メッセージコンテキストへのアクセスに使用する必要がある AWS IoT Device SDK の最小バージョンを示します。
SDK 最小バージョン v1.9.3
v1.11.3
v1.18.4
v1.12.0
注記
AWS IoT Greengrass Core ソフトウェアは、
PublishToTopicおよびSubscribeToTopicオペレーションで同じメッセージオブジェクトを使用します。 AWS IoT Greengrass Core ソフトウェアは、サブスクライブ時にこのコンテキストオブジェクトをメッセージに設定し、発行するメッセージではこのコンテキストオブジェクトを無視します。このオブジェクト (
MessageContext) には、次の情報が含まれます。topic-
メッセージが発行されたトピック。
topicName(Python:topic_name)-
メッセージが発行されたトピック。
注記
このプロパティは現在使用されていません。Greengrass nucleus v2.6.0 以降では、
SubscriptionResponseMessageからの(jsonMessage|binaryMessage).context.topicの値を取得して、メッセージが発行されたトピックを取得できます。
例
以下の例では、カスタムコンポーネントコードでこのオペレーションを呼び出す方法を示します。
例
コンポーネントのパブリッシュ/サブスクライブ IPC サービスの使用方法については、以下の例を参照してください。
以下の recipe の例は、コンポーネントをすべてのトピックに発行できるようにします。
以下の Java アプリケーションの例は、パブリッシュ/サブスクライブ IPC サービスを使用して、他コンポーネントにメッセージを発行する方法を示します。
/* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0 */ package com.example.ipc.pubsub; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient; import software.amazon.awssdk.aws.greengrass.model.*; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class PubSubPublisher { public static void main(String[] args) { String message = "Hello from the pub/sub publisher (Java)."; String topic = "test/topic/java"; try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) { GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection); while (true) { PublishToTopicRequest publishRequest = new PublishToTopicRequest(); PublishMessage publishMessage = new PublishMessage(); BinaryMessage binaryMessage = new BinaryMessage(); binaryMessage.setMessage(message.getBytes(StandardCharsets.UTF_8)); publishMessage.setBinaryMessage(binaryMessage); publishRequest.setPublishMessage(publishMessage); publishRequest.setTopic(topic); CompletableFuture<PublishToTopicResponse> futureResponse = ipcClient .publishToTopic(publishRequest, Optional.empty()).getResponse(); try { futureResponse.get(10, TimeUnit.SECONDS); System.out.println("Successfully published to topic: " + topic); } catch (TimeoutException e) { System.err.println("Timeout occurred while publishing to topic: " + topic); } catch (ExecutionException e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { System.err.println("Execution exception while publishing to topic: " + topic); } throw e; } Thread.sleep(5000); } } catch (InterruptedException e) { System.out.println("Publisher interrupted."); } catch (Exception e) { System.err.println("Exception occurred when using IPC."); e.printStackTrace(); System.exit(1); } } }
以下の recipe の例は、コンポーネントをすべてのトピックをサブスクライブできるようにします。
以下の Java アプリケーションの例は、パブリッシュ/サブスクライブ IPC サービスを使用して、他コンポーネントのメッセージをサブスクライブする方法を示します。
/* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0 */ package com.example.ipc.pubsub; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient; import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler; import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicRequest; import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicResponse; import software.amazon.awssdk.aws.greengrass.model.SubscriptionResponseMessage; import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection; import software.amazon.awssdk.eventstreamrpc.StreamResponseHandler; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class PubSubSubscriber { public static void main(String[] args) { String topic = "test/topic/java"; try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) { GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection); SubscribeToTopicRequest subscribeRequest = new SubscribeToTopicRequest(); subscribeRequest.setTopic(topic); SubscribeToTopicResponseHandler operationResponseHandler = ipcClient .subscribeToTopic(subscribeRequest, Optional.of(new SubscribeResponseHandler())); CompletableFuture<SubscribeToTopicResponse> futureResponse = operationResponseHandler.getResponse(); try { futureResponse.get(10, TimeUnit.SECONDS); System.out.println("Successfully subscribed to topic: " + topic); } catch (TimeoutException e) { System.err.println("Timeout occurred while subscribing to topic: " + topic); throw e; } catch (ExecutionException e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while subscribing to topic: " + topic); } else { System.err.println("Execution exception while subscribing to topic: " + topic); } throw e; } // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } } catch (Exception e) { System.err.println("Exception occurred when using IPC."); e.printStackTrace(); System.exit(1); } } private static class SubscribeResponseHandler implements StreamResponseHandler<SubscriptionResponseMessage> { @Override public void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) { try { String message = new String(subscriptionResponseMessage.getBinaryMessage() .getMessage(), StandardCharsets.UTF_8); System.out.println("Received new message: " + message); } catch (Exception e) { e.printStackTrace(); } } @Override public boolean onStreamError(Throwable error) { System.err.println("Received a stream error."); error.printStackTrace(); return false; // Return true to close stream, false to keep stream open. } @Override public void onStreamClosed() { System.out.println("Subscribe to topic stream closed."); } } }
以下の recipe の例は、コンポーネントをすべてのトピックに発行できるようにします。
以下の Python アプリケーションの例は、パブリッシュ/サブスクライブ IPC サービスを使用して、他コンポーネントにメッセージを発行する方法を示します。
import concurrent.futures import sys import time import traceback import awsiot.greengrasscoreipc from awsiot.greengrasscoreipc.model import ( PublishToTopicRequest, PublishMessage, BinaryMessage, UnauthorizedError ) topic = "test/topic/python" message = "Hello from the pub/sub publisher (Python)." TIMEOUT = 10 try: ipc_client = awsiot.greengrasscoreipc.connect() while True: request = PublishToTopicRequest() request.topic = topic publish_message = PublishMessage() publish_message.binary_message = BinaryMessage() publish_message.binary_message.message = bytes(message, "utf-8") request.publish_message = publish_message operation = ipc_client.new_publish_to_topic() operation.activate(request) future_response = operation.get_response() try: future_response.result(TIMEOUT) print('Successfully published to topic: ' + topic) except concurrent.futures.TimeoutError: print('Timeout occurred while publishing to topic: ' + topic, file=sys.stderr) except UnauthorizedError as e: print('Unauthorized error while publishing to topic: ' + topic, file=sys.stderr) raise e except Exception as e: print('Exception while publishing to topic: ' + topic, file=sys.stderr) raise e time.sleep(5) except InterruptedError: print('Publisher interrupted.') except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)
以下の recipe の例は、コンポーネントをすべてのトピックをサブスクライブできるようにします。
以下の Python アプリケーションの例は、パブリッシュ/サブスクライブ IPC サービスを使用して、他コンポーネントのメッセージをサブスクライブする方法を示します。
import concurrent.futures import sys import time import traceback import awsiot.greengrasscoreipc import awsiot.greengrasscoreipc.client as client from awsiot.greengrasscoreipc.model import ( SubscribeToTopicRequest, SubscriptionResponseMessage, UnauthorizedError ) topic = "test/topic/python" TIMEOUT = 10 class StreamHandler(client.SubscribeToTopicStreamHandler): def __init__(self): super().__init__() def on_stream_event(self, event: SubscriptionResponseMessage) -> None: try: message = str(event.binary_message.message, "utf-8") print("Received new message: " + message) except: traceback.print_exc() def on_stream_error(self, error: Exception) -> bool: print("Received a stream error.", file=sys.stderr) traceback.print_exc() return False # Return True to close stream, False to keep stream open. def on_stream_closed(self) -> None: print('Subscribe to topic stream closed.') try: ipc_client = awsiot.greengrasscoreipc.connect() request = SubscribeToTopicRequest() request.topic = topic handler = StreamHandler() operation = ipc_client.new_subscribe_to_topic(handler) operation.activate(request) future_response = operation.get_response() try: future_response.result(TIMEOUT) print('Successfully subscribed to topic: ' + topic) except concurrent.futures.TimeoutError as e: print('Timeout occurred while subscribing to topic: ' + topic, file=sys.stderr) raise e except UnauthorizedError as e: print('Unauthorized error while subscribing to topic: ' + topic, file=sys.stderr) raise e except Exception as e: print('Exception while subscribing to topic: ' + topic, file=sys.stderr) raise e # Keep the main thread alive, or the process will exit. try: while True: time.sleep(10) except InterruptedError: print('Subscribe interrupted.') except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)
以下の recipe の例は、コンポーネントをすべてのトピックに発行できるようにします。
以下の C++ アプリケーションの例は、パブリッシュ/サブスクライブ IPC サービスを使用して、他コンポーネントにメッセージを発行する方法を示します。
#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { String message("Hello from the pub/sub publisher (C++)."); String topic("test/topic/cpp"); int timeout = 10; ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } while (true) { PublishToTopicRequest request; Vector<uint8_t> messageData({message.begin(), message.end()}); BinaryMessage binaryMessage; binaryMessage.SetMessage(messageData); PublishMessage publishMessage; publishMessage.SetBinaryMessage(binaryMessage); request.SetTopic(topic); request.SetPublishMessage(publishMessage); auto operation = ipcClient.NewPublishToTopic(); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (response) { std::cout << "Successfully published to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to publish to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } std::this_thread::sleep_for(std::chrono::seconds(5)); } return 0; }
以下の recipe の例は、コンポーネントをすべてのトピックをサブスクライブできるようにします。
以下の C++ アプリケーションの例は、パブリッシュ/サブスクライブ IPC サービスを使用して、他コンポーネントのメッセージをサブスクライブする方法を示します。
#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class SubscribeResponseHandler : public SubscribeToTopicStreamHandler { public: virtual ~SubscribeResponseHandler() {} private: void OnStreamEvent(SubscriptionResponseMessage *response) override { auto jsonMessage = response->GetJsonMessage(); if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) { auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable(); std::cout << "Received new message: " << messageString << std::endl; } else { auto binaryMessage = response->GetBinaryMessage(); if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) { auto messageBytes = binaryMessage.value().GetMessage().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); std::cout << "Received new message: " << messageString << std::endl; } } } bool OnStreamError(OperationError *error) override { std::cout << "Received an operation error: "; if (error->GetMessage().has_value()) { std::cout << error->GetMessage().value(); } std::cout << std::endl; return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { std::cout << "Subscribe to topic stream closed." << std::endl; } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { String topic("test/topic/cpp"); int timeout = 10; ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } SubscribeToTopicRequest request; request.SetTopic(topic); auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToTopic(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (response) { std::cout << "Successfully subscribed to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to subscribe to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }
以下の recipe の例は、コンポーネントをすべてのトピックに発行できるようにします。
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubPublisherRust", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubPublisherRust:pubsub:1": { "policyDescription": "Allows access to publish to all topics.", "operations": ["aws.greengrass#PublishToTopic"], "resources": ["*"] } } } } }, "Manifests": [ { "Platform": { "os": "linux", "runtime": "*" }, "Lifecycle": { "run": "{artifacts:path}/publish_to_topic" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubPublisherRust/1.0.0/publish_to_topic", "Permission": { "Execute": "OWNER" } } ] } ] }
次の Rust アプリケーション例は、パブリッシュ/サブスクライブ IPC サービスを使用して他のコンポーネントにメッセージを発行する方法を示しています。
use gg_sdk::Sdk; fn main() { let sdk = Sdk::init(); sdk.connect().expect("Failed to establish IPC connection"); let message = b"Hello, World"; let topic = "my/topic"; sdk.publish_to_topic_binary(topic, message) .expect("Failed to publish to topic"); println!("Successfully published to topic: {topic}"); }
以下の recipe の例は、コンポーネントをすべてのトピックをサブスクライブできるようにします。
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubSubscriberRust", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubSubscriberRust:pubsub:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": ["aws.greengrass#SubscribeToTopic"], "resources": ["*"] } } } } }, "Manifests": [ { "Platform": { "os": "linux", "runtime": "*" }, "Lifecycle": { "run": "{artifacts:path}/subscribe_to_topic" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberRust/1.0.0/subscribe_to_topic", "Permission": { "Execute": "OWNER" } } ] } ] }
次の Rust アプリケーションの例は、パブリッシュ/サブスクライブ IPC サービスを使用して他のコンポーネントからのメッセージをサブスクライブする方法を示しています。
use gg_sdk::{Sdk, SubscribeToTopicPayload}; use std::{thread, time::Duration}; fn main() { let sdk = Sdk::init(); sdk.connect().expect("Failed to establish IPC connection"); let topic = "my/topic"; let callback = |topic: &str, payload: SubscribeToTopicPayload| match payload { SubscribeToTopicPayload::Binary(message) => { let message = String::from_utf8_lossy(message); println!("Received new message on topic {topic}: {message}"); } SubscribeToTopicPayload::Json(_) => { println!("Received new message on topic {topic}: (JSON message)"); } }; let _sub = sdk .subscribe_to_topic(topic, &callback) .expect("Failed to subscribe to topic"); println!("Successfully subscribed to topic: {topic}"); // Keep the main thread alive, or the process will exit. loop { thread::sleep(Duration::from_secs(10)); } }
以下の recipe の例は、コンポーネントをすべてのトピックに発行できるようにします。
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubPublisherC", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubPublisherC:pubsub:1": { "policyDescription": "Allows access to publish to all topics.", "operations": ["aws.greengrass#PublishToTopic"], "resources": ["*"] } } } } }, "Manifests": [ { "Platform": { "os": "linux", "runtime": "*" }, "Lifecycle": { "run": "{artifacts:path}/sample_publish_to_topic" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubPublisherC/1.0.0/sample_publish_to_topic", "Permission": { "Execute": "OWNER" } } ] } ] }
次の C アプリケーション例は、パブリッシュ/サブスクライブ IPC サービスを使用して他のコンポーネントにメッセージを発行する方法を示しています。
#include <gg/error.h> #include <gg/ipc/client.h> #include <gg/sdk.h> #include <stdio.h> #include <stdlib.h> int main(void) { gg_sdk_init(); GgError err = ggipc_connect(); if (err != GG_ERR_OK) { fprintf(stderr, "Failed to establish IPC connection.\n"); exit(-1); } GgBuffer message = GG_STR("Hello, World"); GgBuffer topic = GG_STR("my/topic"); err = ggipc_publish_to_topic_binary(topic, message); if (err != GG_ERR_OK) { fprintf( stderr, "Failed to publish to topic: %.*s\n", (int) topic.len, topic.data ); exit(-1); } printf( "Successfully published to topic: %.*s\n", (int) topic.len, topic.data ); }
以下の recipe の例は、コンポーネントをすべてのトピックをサブスクライブできるようにします。
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubSubscriberC", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubSubscriberC:pubsub:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": ["aws.greengrass#SubscribeToTopic"], "resources": ["*"] } } } } }, "Manifests": [ { "Platform": { "os": "linux", "runtime": "*" }, "Lifecycle": { "run": "{artifacts:path}/sample_subscribe_to_topic" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberC/1.0.0/sample_subscribe_to_topic", "Permission": { "Execute": "OWNER" } } ] } ] }
次の C アプリケーション例は、パブリッシュ/サブスクライブ IPC サービスを使用して他のコンポーネントからのメッセージをサブスクライブする方法を示しています。
#include <assert.h> #include <gg/error.h> #include <gg/ipc/client.h> #include <gg/object.h> #include <gg/sdk.h> #include <gg/types.h> #include <unistd.h> #include <stdio.h> #include <stdlib.h> static void on_subscription_response( void *ctx, GgBuffer topic, GgObject payload, GgIpcSubscriptionHandle handle ) { (void) ctx; (void) handle; if (gg_obj_type(payload) == GG_TYPE_BUF) { GgBuffer message = gg_obj_into_buf(payload); printf( "Received new message on topic %.*s: %.*s\n", (int) topic.len, topic.data, (int) message.len, message.data ); } else { assert(gg_obj_type(payload) == GG_TYPE_MAP); printf( "Received new message on topic %.*s: (JSON message)\n", (int) topic.len, topic.data ); } } int main(void) { gg_sdk_init(); GgError err = ggipc_connect(); if (err != GG_ERR_OK) { fprintf(stderr, "Failed to establish IPC connection.\n"); exit(-1); } GgBuffer topic = GG_STR("my/topic"); GgIpcSubscriptionHandle handle; err = ggipc_subscribe_to_topic( topic, on_subscription_response, NULL, &handle ); if (err != GG_ERR_OK) { fprintf( stderr, "Failed to subscribe to topic: %.*s\n", (int) topic.len, topic.data ); exit(-1); } printf( "Successfully subscribed to topic: %.*s\n", (int) topic.len, topic.data ); // Keep the main thread alive, or the process will exit. while (1) { sleep(10); } // To stop subscribing, close the stream. ggipc_close_subscription(handle); }
以下の recipe の例は、コンポーネントをすべてのトピックに発行できるようにします。
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubPublisherCpp", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubPublisherCpp:pubsub:1": { "policyDescription": "Allows access to publish to all topics.", "operations": ["aws.greengrass#PublishToTopic"], "resources": ["*"] } } } } }, "Manifests": [ { "Platform": { "os": "linux", "runtime": "*" }, "Lifecycle": { "run": "{artifacts:path}/sample_cpp_publish_to_topic" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubPublisherCpp/1.0.0/sample_cpp_publish_to_topic", "Permission": { "Execute": "OWNER" } } ] } ] }
以下の C++ アプリケーションの例は、パブリッシュ/サブスクライブ IPC サービスを使用して、他コンポーネントにメッセージを発行する方法を示します。
#include <gg/ipc/client.hpp> #include <iostream> int main() { auto &client = gg::ipc::Client::get(); auto error = client.connect(); if (error) { std::cerr << "Failed to establish IPC connection.\n"; exit(-1); } std::string_view message = "Hello, World"; std::string_view topic = "my/topic"; error = client.publish_to_topic(topic, message); if (error) { std::cerr << "Failed to publish to topic: " << topic << "\n"; exit(-1); } std::cout << "Successfully published to topic: " << topic << "\n"; }
以下の recipe の例は、コンポーネントをすべてのトピックをサブスクライブできるようにします。
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubSubscriberCpp", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubSubscriberCpp:pubsub:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": ["aws.greengrass#SubscribeToTopic"], "resources": ["*"] } } } } }, "Manifests": [ { "Platform": { "os": "linux", "runtime": "*" }, "Lifecycle": { "run": "{artifacts:path}/sample_cpp_subscribe_to_topic" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/sample_cpp_subscribe_to_topic", "Permission": { "Execute": "OWNER" } } ] } ] }
次の C++ アプリケーションの例は、パブリッシュ/サブスクライブ IPC サービスを使用して他のコンポーネントからのメッセージをサブスクライブする方法を示しています。
#include <gg/ipc/client.hpp> #include <gg/object.hpp> #include <unistd.h> #include <cassert> #include <iostream> class ResponseHandler : public gg::ipc::LocalTopicCallback { void operator()( std::string_view topic, gg::Object payload, gg::ipc::Subscription &handle ) override { (void) handle; if (payload.index() == GG_TYPE_BUF) { std::cout << "Received new message on topic " << topic << ": " << get<gg::Buffer>(payload) << "\n"; } else { assert(payload.index() == GG_TYPE_MAP); std::cout << "Received new message on topic " << topic << ": (JSON message)\n"; } } }; int main() { auto &client = gg::ipc::Client::get(); auto error = client.connect(); if (error) { std::cerr << "Failed to establish IPC connection.\n"; exit(-1); } std::string_view topic = "my/topic"; static ResponseHandler handler; error = client.subscribe_to_topic(topic, handler); if (error) { std::cerr << "Failed to subscribe to topic: " << topic << "\n"; exit(-1); } std::cout << "Successfully subscribed to topic: " << topic << "\n"; // Keep the main thread alive, or the process will exit. while (1) { sleep(10); } }