Publicar/assinar mensagens locais - AWS IoT Greengrass

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Publicar/assinar mensagens locais

O sistema de mensagens de publicação/assinatura (pubsub) permitem que você envie e receba mensagens em tópicos. Os componentes podem publicar mensagens em tópicos para enviar mensagens para outros componentes. Em seguida, os componentes inscritos nesse tópico podem agir nas mensagens que recebem.

nota

Você não pode usar esse serviço de publish/subscribe IPC para publicar ou assinar o AWS IoT Core MQTT. Para obter mais informações sobre como trocar mensagens com o AWS IoT Core MQTT, consultePublique/assine mensagens MQTT AWS IoT Core.

Versões mínimas do SDK

A tabela a seguir lista as versões mínimas do AWS IoT Device SDK que você deve usar para publicar e assinar mensagens de e para tópicos locais.

Autorização

Para usar publish/subscribe mensagens locais em um componente personalizado, você deve definir políticas de autorização que permitam que seu componente envie e receba mensagens para tópicos. Para obter informações sobre a definição de políticas de autorização, consulte Autorizar componentes a realizar operações de IPC.

As políticas de autorização para publish/subscribe mensagens têm as seguintes propriedades.

Identificador de serviço IPC: aws.greengrass.ipc.pubsub

Operation Description Recursos

aws.greengrass#PublishToTopic

Permite que um componente publique mensagens nos tópicos que você especificar.

Uma string de tópicos, como test/topic. Use um * para corresponder a qualquer combinação de caracteres em um tópico.

Essa string de tópico não aceita caracteres curinga de tópico do MQTT (# e +).

aws.greengrass#SubscribeToTopic

Permite que um componente publique mensagens nos tópicos do MQTT que você especificar.

Uma string de tópicos, como test/topic. Use um * para corresponder a qualquer combinação de caracteres em um tópico.

No núcleo do Greengrass v2.6.0 e versões posteriores, você pode se inscrever em tópicos que contenham curingas de tópicos do MQTT (# e +). Essa string de tópico aceita curingas de tópico MQTT como caracteres literais. Por exemplo, se a política de autorização de um componente conceder acesso a test/topic/#, o componente pode se inscrever no test/topic/#, mas não pode se inscrever no test/topic/filter.

*

Permite que um componente publique e assine mensagens para os tópicos que você especificar.

Uma string de tópicos, como test/topic. Use um * para corresponder a qualquer combinação de caracteres em um tópico.

No núcleo do Greengrass v2.6.0 e versões posteriores, você pode se inscrever em tópicos que contenham curingas de tópicos do MQTT (# e +). Essa string de tópico aceita curingas de tópico MQTT como caracteres literais. Por exemplo, se a política de autorização de um componente conceder acesso a test/topic/#, o componente pode se inscrever no test/topic/#, mas não pode se inscrever no test/topic/filter.

Exemplos de política de autorização

Consulte o exemplo de política de autorização a seguir para configurar políticas de autorização para seus componentes.

exemplo Exemplo de política de autorização

O exemplo de política de autorização a seguir permite que um componente publique e assine em todos os tópicos.

{ "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.MyLocalPubSubComponent:pubsub:1": { "policyDescription": "Allows access to publish/subscribe to all topics.", "operations": [ "aws.greengrass#PublishToTopic", "aws.greengrass#SubscribeToTopic" ], "resources": [ "*" ] } } } }

PublishToTopic

Publique uma mensagem em um tópico.

Solicitação

A solicitação dessa operação tem os seguintes parâmetros:

topic

O tópico no qual publicar a mensagem.

publishMessage (Python: publish_message)

A mensagem a ser publicada. Esse objeto, PublishMessage, contém as informações a seguir. Você deve especificar jsonMessage ou binaryMessage.

jsonMessage (Python: json_message)

(Opcional) Uma mensagem JSON. Esse objeto, JsonMessage, contém as seguintes informações:

message

A mensagem JSON como um objeto.

context

O contexto da mensagem, como o tópico em que a mensagem foi publicada.

Esse recurso está disponível para a versão 2.6.0 e posterior do componente de núcleo do Greengrass. A tabela a seguir lista as versões mínimas do AWS IoT Device SDK que você deve usar para acessar o contexto da mensagem.

nota

O software AWS IoT Greengrass Core usa os mesmos objetos de mensagem nas SubscribeToTopic operações PublishToTopic e. O software AWS IoT Greengrass Core define esse objeto de contexto nas mensagens quando você se inscreve e ignora esse objeto de contexto nas mensagens que você publica.

Esse objeto, MessageContext, contém as seguintes informações:

topic

O tópico em que a mensagem foi publicada.

binaryMessage (Python: binary_message)

(Opcional) Uma mensagem binária. Esse objeto, BinaryMessage, contém as seguintes informações:

message

A mensagem binária como uma bolha.

context

O contexto da mensagem, como o tópico em que a mensagem foi publicada.

Esse recurso está disponível para a versão 2.6.0 e posterior do componente de núcleo do Greengrass. A tabela a seguir lista as versões mínimas do AWS IoT Device SDK que você deve usar para acessar o contexto da mensagem.

nota

O software AWS IoT Greengrass Core usa os mesmos objetos de mensagem nas SubscribeToTopic operações PublishToTopic e. O software AWS IoT Greengrass Core define esse objeto de contexto nas mensagens quando você se inscreve e ignora esse objeto de contexto nas mensagens que você publica.

Esse objeto, MessageContext, contém as seguintes informações:

topic

O tópico em que a mensagem foi publicada.

Resposta

Essa operação não fornece nenhuma informação em sua resposta.

Exemplos

Os exemplos a seguir demonstram como chamar essa operação no código do componente personalizado.

Java (IPC client V2)
exemplo Exemplo: publicar uma mensagem binária
package com.aws.greengrass.docs.samples.ipc; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2; import software.amazon.awssdk.aws.greengrass.model.BinaryMessage; import software.amazon.awssdk.aws.greengrass.model.PublishMessage; import software.amazon.awssdk.aws.greengrass.model.PublishToTopicRequest; import software.amazon.awssdk.aws.greengrass.model.PublishToTopicResponse; import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError; import java.nio.charset.StandardCharsets; public class PublishToTopicV2 { public static void main(String[] args) { String topic = args[0]; String message = args[1]; try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) { PublishToTopicV2.publishBinaryMessageToTopic(ipcClient, topic, message); System.out.println("Successfully published to topic: " + topic); } catch (Exception e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { System.err.println("Exception occurred when using IPC."); } e.printStackTrace(); System.exit(1); } } public static PublishToTopicResponse publishBinaryMessageToTopic( GreengrassCoreIPCClientV2 ipcClient, String topic, String message) throws InterruptedException { BinaryMessage binaryMessage = new BinaryMessage().withMessage(message.getBytes(StandardCharsets.UTF_8)); PublishMessage publishMessage = new PublishMessage().withBinaryMessage(binaryMessage); PublishToTopicRequest publishToTopicRequest = new PublishToTopicRequest().withTopic(topic).withPublishMessage(publishMessage); return ipcClient.publishToTopic(publishToTopicRequest); } }
Python (IPC client V2)
exemplo Exemplo: publicar uma mensagem binária
import sys import traceback from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 from awsiot.greengrasscoreipc.model import ( PublishMessage, BinaryMessage ) def main(): args = sys.argv[1:] topic = args[0] message = args[1] try: ipc_client = GreengrassCoreIPCClientV2() publish_binary_message_to_topic(ipc_client, topic, message) print('Successfully published to topic: ' + topic) except Exception: print('Exception occurred', file=sys.stderr) traceback.print_exc() exit(1) def publish_binary_message_to_topic(ipc_client, topic, message): binary_message = BinaryMessage(message=bytes(message, 'utf-8')) publish_message = PublishMessage(binary_message=binary_message) return ipc_client.publish_to_topic(topic=topic, publish_message=publish_message) if __name__ == '__main__': main()
C++ (IPC client V1)
exemplo Exemplo: publicar uma mensagem binária
#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { // Handle connection to IPC service. } void OnDisconnectCallback(RpcError error) override { // Handle disconnection from IPC service. } bool OnErrorCallback(RpcError error) override { // Handle IPC service connection error. return true; } }; int main() { ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } String topic("my/topic"); String message("Hello, World!"); int timeout = 10; PublishToTopicRequest request; Vector<uint8_t> messageData({message.begin(), message.end()}); BinaryMessage binaryMessage; binaryMessage.SetMessage(messageData); PublishMessage publishMessage; publishMessage.SetBinaryMessage(binaryMessage); request.SetTopic(topic); request.SetPublishMessage(publishMessage); auto operation = ipcClient.NewPublishToTopic(); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (!response) { // Handle error. auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); (void)error; // Handle operation error. } else { // Handle RPC error. } } return 0; }
JavaScript
exemplo Exemplo: publicar uma mensagem binária
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc"; import {BinaryMessage, PublishMessage, PublishToTopicRequest} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model"; class PublishToTopic { private ipcClient : greengrasscoreipc.Client private readonly topic : string; private readonly messageString : string; constructor() { // define your own constructor, e.g. this.topic = "<define_your_topic>"; this.messageString = "<define_your_message_string>"; this.publishToTopic().then(r => console.log("Started workflow")); } private async publishToTopic() { try { this.ipcClient = await getIpcClient(); const binaryMessage : BinaryMessage = { message: this.messageString } const publishMessage : PublishMessage = { binaryMessage: binaryMessage } const request : PublishToTopicRequest = { topic: this.topic, publishMessage: publishMessage } this.ipcClient.publishToTopic(request).finally(() => console.log(`Published message ${publishMessage.binaryMessage?.message} to topic`)) } catch (e) { // parse the error depending on your use cases throw e } } } export async function getIpcClient(){ try { const ipcClient = greengrasscoreipc.createClient(); await ipcClient.connect() .catch(error => { // parse the error depending on your use cases throw error; }); return ipcClient } catch (err) { // parse the error depending on your use cases throw err } } // starting point const publishToTopic = new PublishToTopic();
Rust
exemplo Exemplo: publicar uma mensagem binária
use gg_sdk::Sdk; fn main() { let sdk = Sdk::init(); sdk.connect().expect("Failed to establish IPC connection"); let message = b"Hello, World"; let topic = "my/topic"; sdk.publish_to_topic_binary(topic, message) .expect("Failed to publish to topic"); println!("Successfully published to topic: {topic}"); }
C
exemplo Exemplo: publicar uma mensagem binária
#include <gg/error.h> #include <gg/ipc/client.h> #include <gg/sdk.h> #include <stdio.h> #include <stdlib.h> int main(void) { gg_sdk_init(); GgError err = ggipc_connect(); if (err != GG_ERR_OK) { fprintf(stderr, "Failed to establish IPC connection.\n"); exit(-1); } GgBuffer message = GG_STR("Hello, World"); GgBuffer topic = GG_STR("my/topic"); err = ggipc_publish_to_topic_binary(topic, message); if (err != GG_ERR_OK) { fprintf( stderr, "Failed to publish to topic: %.*s\n", (int) topic.len, topic.data ); exit(-1); } printf( "Successfully published to topic: %.*s\n", (int) topic.len, topic.data ); }
C++ (Component SDK)
exemplo Exemplo: publicar uma mensagem binária
#include <gg/ipc/client.hpp> #include <iostream> int main() { auto &client = gg::ipc::Client::get(); auto error = client.connect(); if (error) { std::cerr << "Failed to establish IPC connection.\n"; exit(-1); } std::string_view message = "Hello, World"; std::string_view topic = "my/topic"; error = client.publish_to_topic(topic, message); if (error) { std::cerr << "Failed to publish to topic: " << topic << "\n"; exit(-1); } std::cout << "Successfully published to topic: " << topic << "\n"; }

SubscribeToTopic

Assine mensagens sobre um tópico.

Essa operação é uma operação de assinatura em que você assina um fluxo de mensagens de eventos. Para usar essa operação, defina um manipulador de resposta de fluxo com funções que manipulam mensagens de eventos, erros e encerramento de fluxo. Para obter mais informações, consulte Inscrever-se nos fluxos de eventos da IPC.

Tipo de mensagem do evento: SubscriptionResponseMessage

Solicitação

A solicitação dessa operação tem os seguintes parâmetros:

topic

O tópico a ser assinado.

nota

No Greengrass nucleus v2.6.0 e versões posteriores, este tópico oferece suporte aos curingas do tópico MQTT (e). # +

receiveMode (Python: receive_mode)

(Opcional) O comportamento que especifica se o componente recebe mensagens de si mesmo. Você pode alterar esse comportamento para permitir que um componente atue em suas próprias mensagens. O comportamento padrão depende se o tópico contém um curinga MQTT. Escolha uma das seguintes opções:

  • RECEIVE_ALL_MESSAGES: receba todas as mensagens que correspondam ao tópico, incluindo mensagens do componente que se inscreve.

    Esse modo é a opção padrão quando você se inscreve em um tópico que não contém um curinga MQTT.

  • RECEIVE_MESSAGES_FROM_OTHERS: receba todas as mensagens que correspondam ao tópico, exceto as mensagens do componente que se inscreve.

    Esse modo é a opção padrão quando você se inscreve em um tópico que contém um curinga MQTT.

Esse recurso está disponível para a versão 2.6.0 e posterior do componente de núcleo do Greengrass. A tabela a seguir lista as versões mínimas do AWS IoT Device SDK que você deve usar para definir o modo de recepção.

Resposta

A resposta dessa operação tem as seguintes informações:

messages

O fluxo de mensagens. Esse objeto, SubscriptionResponseMessage, contém as informações a seguir. Cada mensagem contém jsonMessage oubinaryMessage.

jsonMessage (Python: json_message)

(Opcional) Uma mensagem JSON. Esse objeto, JsonMessage, contém as seguintes informações:

message

A mensagem JSON como um objeto.

context

O contexto da mensagem, como o tópico em que a mensagem foi publicada.

Esse recurso está disponível para a versão 2.6.0 e posterior do componente de núcleo do Greengrass. A tabela a seguir lista as versões mínimas do AWS IoT Device SDK que você deve usar para acessar o contexto da mensagem.

nota

O software AWS IoT Greengrass Core usa os mesmos objetos de mensagem nas SubscribeToTopic operações PublishToTopic e. O software AWS IoT Greengrass Core define esse objeto de contexto nas mensagens quando você se inscreve e ignora esse objeto de contexto nas mensagens que você publica.

Esse objeto, MessageContext, contém as seguintes informações:

topic

O tópico em que a mensagem foi publicada.

binaryMessage (Python: binary_message)

(Opcional) Uma mensagem binária. Esse objeto, BinaryMessage, contém as seguintes informações:

message

A mensagem binária como uma bolha.

context

O contexto da mensagem, como o tópico em que a mensagem foi publicada.

Esse recurso está disponível para a versão 2.6.0 e posterior do componente de núcleo do Greengrass. A tabela a seguir lista as versões mínimas do AWS IoT Device SDK que você deve usar para acessar o contexto da mensagem.

nota

O software AWS IoT Greengrass Core usa os mesmos objetos de mensagem nas SubscribeToTopic operações PublishToTopic e. O software AWS IoT Greengrass Core define esse objeto de contexto nas mensagens quando você se inscreve e ignora esse objeto de contexto nas mensagens que você publica.

Esse objeto, MessageContext, contém as seguintes informações:

topic

O tópico em que a mensagem foi publicada.

topicName (Python: topic_name)

O tópico em que a mensagem foi publicada.

nota

No momento, essa propriedade não é usada. No Greengrass nucleus v2.6.0 e versões posteriores, você pode obter o (jsonMessage|binaryMessage).context.topic valor de a para obter o tópico em SubscriptionResponseMessage que a mensagem foi publicada.

Exemplos

Os exemplos a seguir demonstram como chamar essa operação no código do componente personalizado.

Java (IPC client V2)
exemplo Exemplo: Inscrever-se em publish/subscribe mensagens locais
package com.aws.greengrass.docs.samples.ipc; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2; import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler; import software.amazon.awssdk.aws.greengrass.model.*; import java.nio.charset.StandardCharsets; import java.util.Optional; public class SubscribeToTopicV2 { public static void main(String[] args) { String topic = args[0]; try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) { SubscribeToTopicRequest request = new SubscribeToTopicRequest().withTopic(topic); GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToTopicResponse, SubscribeToTopicResponseHandler> response = ipcClient.subscribeToTopic(request, SubscribeToTopicV2::onStreamEvent, Optional.of(SubscribeToTopicV2::onStreamError), Optional.of(SubscribeToTopicV2::onStreamClosed)); SubscribeToTopicResponseHandler responseHandler = response.getHandler(); System.out.println("Successfully subscribed to topic: " + topic); // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } // To stop subscribing, close the stream. responseHandler.closeStream(); } catch (Exception e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { System.err.println("Exception occurred when using IPC."); } e.printStackTrace(); System.exit(1); } } public static void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) { try { BinaryMessage binaryMessage = subscriptionResponseMessage.getBinaryMessage(); String message = new String(binaryMessage.getMessage(), StandardCharsets.UTF_8); String topic = binaryMessage.getContext().getTopic(); System.out.printf("Received new message on topic %s: %s%n", topic, message); } catch (Exception e) { System.err.println("Exception occurred while processing subscription response " + "message."); e.printStackTrace(); } } public static boolean onStreamError(Throwable error) { System.err.println("Received a stream error."); error.printStackTrace(); return false; // Return true to close stream, false to keep stream open. } public static void onStreamClosed() { System.out.println("Subscribe to topic stream closed."); } }
Python (IPC client V2)
exemplo Exemplo: Inscrever-se em publish/subscribe mensagens locais
import sys import time import traceback from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 from awsiot.greengrasscoreipc.model import ( SubscriptionResponseMessage, UnauthorizedError ) def main(): args = sys.argv[1:] topic = args[0] try: ipc_client = GreengrassCoreIPCClientV2() # Subscription operations return a tuple with the response and the operation. _, operation = ipc_client.subscribe_to_topic(topic=topic, on_stream_event=on_stream_event, on_stream_error=on_stream_error, on_stream_closed=on_stream_closed) print('Successfully subscribed to topic: ' + topic) # Keep the main thread alive, or the process will exit. try: while True: time.sleep(10) except InterruptedError: print('Subscribe interrupted.') # To stop subscribing, close the stream. operation.close() except UnauthorizedError: print('Unauthorized error while subscribing to topic: ' + topic, file=sys.stderr) traceback.print_exc() exit(1) except Exception: print('Exception occurred', file=sys.stderr) traceback.print_exc() exit(1) def on_stream_event(event: SubscriptionResponseMessage) -> None: try: message = str(event.binary_message.message, 'utf-8') topic = event.binary_message.context.topic print('Received new message on topic %s: %s' % (topic, message)) except: traceback.print_exc() def on_stream_error(error: Exception) -> bool: print('Received a stream error.', file=sys.stderr) traceback.print_exc() return False # Return True to close stream, False to keep stream open. def on_stream_closed() -> None: print('Subscribe to topic stream closed.') if __name__ == '__main__': main()
C++ (IPC client V1)
exemplo Exemplo: Inscrever-se em publish/subscribe mensagens locais
#include <iostream> #include </crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class SubscribeResponseHandler : public SubscribeToTopicStreamHandler { public: virtual ~SubscribeResponseHandler() {} private: void OnStreamEvent(SubscriptionResponseMessage *response) override { auto jsonMessage = response->GetJsonMessage(); if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) { auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable(); // Handle JSON message. } else { auto binaryMessage = response->GetBinaryMessage(); if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) { auto messageBytes = binaryMessage.value().GetMessage().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); // Handle binary message. } } } bool OnStreamError(OperationError *error) override { // Handle error. return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { // Handle close. } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { // Handle connection to IPC service. } void OnDisconnectCallback(RpcError error) override { // Handle disconnection from IPC service. } bool OnErrorCallback(RpcError error) override { // Handle IPC service connection error. return true; } }; int main() { ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } String topic("my/topic"); int timeout = 10; SubscribeToTopicRequest request; request.SetTopic(topic); //SubscribeResponseHandler streamHandler; auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToTopic(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (!response) { // Handle error. auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); (void)error; // Handle operation error. } else { // Handle RPC error. } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }
JavaScript
exemplo Exemplo: Inscrever-se em publish/subscribe mensagens locais
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc"; import {SubscribeToTopicRequest, SubscriptionResponseMessage} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model"; import {RpcError} from "aws-iot-device-sdk-v2/dist/eventstream_rpc"; class SubscribeToTopic { private ipcClient : greengrasscoreipc.Client private readonly topic : string; constructor() { // define your own constructor, e.g. this.topic = "<define_your_topic>"; this.subscribeToTopic().then(r => console.log("Started workflow")); } private async subscribeToTopic() { try { this.ipcClient = await getIpcClient(); const subscribeToTopicRequest : SubscribeToTopicRequest = { topic: this.topic, } const streamingOperation = this.ipcClient.subscribeToTopic(subscribeToTopicRequest, undefined); // conditionally apply options streamingOperation.on("message", (message: SubscriptionResponseMessage) => { // parse the message depending on your use cases, e.g. if(message.binaryMessage && message.binaryMessage.message) { const receivedMessage = message.binaryMessage?.message.toString(); } }); streamingOperation.on("streamError", (error : RpcError) => { // define your own error handling logic }) streamingOperation.on("ended", () => { // define your own logic }) await streamingOperation.activate(); // Keep the main thread alive, or the process will exit. await new Promise((resolve) => setTimeout(resolve, 10000)) } catch (e) { // parse the error depending on your use cases throw e } } } export async function getIpcClient(){ try { const ipcClient = greengrasscoreipc.createClient(); await ipcClient.connect() .catch(error => { // parse the error depending on your use cases throw error; }); return ipcClient } catch (err) { // parse the error depending on your use cases throw err } } // starting point const subscribeToTopic = new SubscribeToTopic();
Rust
exemplo Exemplo: Inscrever-se em publish/subscribe mensagens locais
use gg_sdk::{Sdk, SubscribeToTopicPayload}; use std::{thread, time::Duration}; fn main() { let sdk = Sdk::init(); sdk.connect().expect("Failed to establish IPC connection"); let topic = "my/topic"; let callback = |topic: &str, payload: SubscribeToTopicPayload| match payload { SubscribeToTopicPayload::Binary(message) => { let message = String::from_utf8_lossy(message); println!("Received new message on topic {topic}: {message}"); } SubscribeToTopicPayload::Json(_) => { println!("Received new message on topic {topic}: (JSON message)"); } }; let _sub = sdk .subscribe_to_topic(topic, &callback) .expect("Failed to subscribe to topic"); println!("Successfully subscribed to topic: {topic}"); // Keep the main thread alive, or the process will exit. loop { thread::sleep(Duration::from_secs(10)); } }
C
exemplo Exemplo: Inscrever-se em publish/subscribe mensagens locais
#include <assert.h> #include <gg/error.h> #include <gg/ipc/client.h> #include <gg/object.h> #include <gg/sdk.h> #include <gg/types.h> #include <unistd.h> #include <stdio.h> #include <stdlib.h> static void on_subscription_response( void *ctx, GgBuffer topic, GgObject payload, GgIpcSubscriptionHandle handle ) { (void) ctx; (void) handle; if (gg_obj_type(payload) == GG_TYPE_BUF) { GgBuffer message = gg_obj_into_buf(payload); printf( "Received new message on topic %.*s: %.*s\n", (int) topic.len, topic.data, (int) message.len, message.data ); } else { assert(gg_obj_type(payload) == GG_TYPE_MAP); printf( "Received new message on topic %.*s: (JSON message)\n", (int) topic.len, topic.data ); } } int main(void) { gg_sdk_init(); GgError err = ggipc_connect(); if (err != GG_ERR_OK) { fprintf(stderr, "Failed to establish IPC connection.\n"); exit(-1); } GgBuffer topic = GG_STR("my/topic"); GgIpcSubscriptionHandle handle; err = ggipc_subscribe_to_topic( topic, on_subscription_response, NULL, &handle ); if (err != GG_ERR_OK) { fprintf( stderr, "Failed to subscribe to topic: %.*s\n", (int) topic.len, topic.data ); exit(-1); } printf( "Successfully subscribed to topic: %.*s\n", (int) topic.len, topic.data ); // Keep the main thread alive, or the process will exit. while (1) { sleep(10); } // To stop subscribing, close the stream. ggipc_close_subscription(handle); }
C++ (Component SDK)
exemplo Exemplo: Inscrever-se em publish/subscribe mensagens locais
#include <gg/ipc/client.hpp> #include <gg/object.hpp> #include <unistd.h> #include <cassert> #include <iostream> class ResponseHandler : public gg::ipc::LocalTopicCallback { void operator()( std::string_view topic, gg::Object payload, gg::ipc::Subscription &handle ) override { (void) handle; if (payload.index() == GG_TYPE_BUF) { std::cout << "Received new message on topic " << topic << ": " << get<gg::Buffer>(payload) << "\n"; } else { assert(payload.index() == GG_TYPE_MAP); std::cout << "Received new message on topic " << topic << ": (JSON message)\n"; } } }; int main() { auto &client = gg::ipc::Client::get(); auto error = client.connect(); if (error) { std::cerr << "Failed to establish IPC connection.\n"; exit(-1); } std::string_view topic = "my/topic"; static ResponseHandler handler; error = client.subscribe_to_topic(topic, handler); if (error) { std::cerr << "Failed to subscribe to topic: " << topic << "\n"; exit(-1); } std::cout << "Successfully subscribed to topic: " << topic << "\n"; // Keep the main thread alive, or the process will exit. while (1) { sleep(10); } }

Exemplos

Use os exemplos a seguir para aprender a usar o serviço publish/subscribe IPC em seus componentes.

O exemplo de receita a seguir permite que o componente seja publicado em todos os tópicos.

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubPublisherJava", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubPublisherJava:pubsub:1": { "policyDescription": "Allows access to publish to all topics.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Lifecycle": { "Run": "java -jar {artifacts:path}/PubSubPublisher.jar" } } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.PubSubPublisherJava ComponentVersion: '1.0.0' ComponentDescription: A component that publishes messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: 'com.example.PubSubPublisherJava:pubsub:1': policyDescription: Allows access to publish to all topics. operations: - 'aws.greengrass#PublishToTopic' resources: - '*' Manifests: - Lifecycle: Run: |- java -jar {artifacts:path}/PubSubPublisher.jar

O exemplo de aplicação Java a seguir demonstra como usar o serviço IPC de publicação/assinatura para publicar mensagens em outros componentes.

/* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0 */ package com.example.ipc.pubsub; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient; import software.amazon.awssdk.aws.greengrass.model.*; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class PubSubPublisher { public static void main(String[] args) { String message = "Hello from the pub/sub publisher (Java)."; String topic = "test/topic/java"; try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) { GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection); while (true) { PublishToTopicRequest publishRequest = new PublishToTopicRequest(); PublishMessage publishMessage = new PublishMessage(); BinaryMessage binaryMessage = new BinaryMessage(); binaryMessage.setMessage(message.getBytes(StandardCharsets.UTF_8)); publishMessage.setBinaryMessage(binaryMessage); publishRequest.setPublishMessage(publishMessage); publishRequest.setTopic(topic); CompletableFuture<PublishToTopicResponse> futureResponse = ipcClient .publishToTopic(publishRequest, Optional.empty()).getResponse(); try { futureResponse.get(10, TimeUnit.SECONDS); System.out.println("Successfully published to topic: " + topic); } catch (TimeoutException e) { System.err.println("Timeout occurred while publishing to topic: " + topic); } catch (ExecutionException e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { System.err.println("Execution exception while publishing to topic: " + topic); } throw e; } Thread.sleep(5000); } } catch (InterruptedException e) { System.out.println("Publisher interrupted."); } catch (Exception e) { System.err.println("Exception occurred when using IPC."); e.printStackTrace(); System.exit(1); } } }

O exemplo de receita a seguir permite que o componente seja inscrito em todos os tópicos.

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubSubscriberJava", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubSubscriberJava:pubsub:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": [ "aws.greengrass#SubscribeToTopic" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Lifecycle": { "Run": "java -jar {artifacts:path}/PubSubSubscriber.jar" } } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.PubSubSubscriberJava ComponentVersion: '1.0.0' ComponentDescription: A component that subscribes to messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: 'com.example.PubSubSubscriberJava:pubsub:1': policyDescription: Allows access to subscribe to all topics. operations: - 'aws.greengrass#SubscribeToTopic' resources: - '*' Manifests: - Lifecycle: Run: |- java -jar {artifacts:path}/PubSubSubscriber.jar

O exemplo de aplicação Java a seguir demonstra como usar o serviço IPC de publicação/assinatura para assinar mensagens em outros componentes.

/* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0 */ package com.example.ipc.pubsub; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient; import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler; import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicRequest; import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicResponse; import software.amazon.awssdk.aws.greengrass.model.SubscriptionResponseMessage; import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection; import software.amazon.awssdk.eventstreamrpc.StreamResponseHandler; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class PubSubSubscriber { public static void main(String[] args) { String topic = "test/topic/java"; try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) { GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection); SubscribeToTopicRequest subscribeRequest = new SubscribeToTopicRequest(); subscribeRequest.setTopic(topic); SubscribeToTopicResponseHandler operationResponseHandler = ipcClient .subscribeToTopic(subscribeRequest, Optional.of(new SubscribeResponseHandler())); CompletableFuture<SubscribeToTopicResponse> futureResponse = operationResponseHandler.getResponse(); try { futureResponse.get(10, TimeUnit.SECONDS); System.out.println("Successfully subscribed to topic: " + topic); } catch (TimeoutException e) { System.err.println("Timeout occurred while subscribing to topic: " + topic); throw e; } catch (ExecutionException e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while subscribing to topic: " + topic); } else { System.err.println("Execution exception while subscribing to topic: " + topic); } throw e; } // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } } catch (Exception e) { System.err.println("Exception occurred when using IPC."); e.printStackTrace(); System.exit(1); } } private static class SubscribeResponseHandler implements StreamResponseHandler<SubscriptionResponseMessage> { @Override public void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) { try { String message = new String(subscriptionResponseMessage.getBinaryMessage() .getMessage(), StandardCharsets.UTF_8); System.out.println("Received new message: " + message); } catch (Exception e) { e.printStackTrace(); } } @Override public boolean onStreamError(Throwable error) { System.err.println("Received a stream error."); error.printStackTrace(); return false; // Return true to close stream, false to keep stream open. } @Override public void onStreamClosed() { System.out.println("Subscribe to topic stream closed."); } } }

O exemplo de receita a seguir permite que o componente seja publicado em todos os tópicos.

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubPublisherPython", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubPublisherPython:pubsub:1": { "policyDescription": "Allows access to publish to all topics.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Platform": { "os": "linux" }, "Lifecycle": { "install": "python3 -m pip install --user awsiotsdk", "Run": "python3 -u {artifacts:path}/pubsub_publisher.py" } }, { "Platform": { "os": "windows" }, "Lifecycle": { "install": "py -3 -m pip install --user awsiotsdk", "Run": "py -3 -u {artifacts:path}/pubsub_publisher.py" } } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.PubSubPublisherPython ComponentVersion: 1.0.0 ComponentDescription: A component that publishes messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: com.example.PubSubPublisherPython:pubsub:1: policyDescription: Allows access to publish to all topics. operations: - aws.greengrass#PublishToTopic resources: - "*" Manifests: - Platform: os: linux Lifecycle: install: python3 -m pip install --user awsiotsdk Run: python3 -u {artifacts:path}/pubsub_publisher.py - Platform: os: windows Lifecycle: install: py -3 -m pip install --user awsiotsdk Run: py -3 -u {artifacts:path}/pubsub_publisher.py

O exemplo de aplicação Python a seguir demonstra como usar o serviço IPC de publicação/assinatura para publicar mensagens em outros componentes.

import concurrent.futures import sys import time import traceback import awsiot.greengrasscoreipc from awsiot.greengrasscoreipc.model import ( PublishToTopicRequest, PublishMessage, BinaryMessage, UnauthorizedError ) topic = "test/topic/python" message = "Hello from the pub/sub publisher (Python)." TIMEOUT = 10 try: ipc_client = awsiot.greengrasscoreipc.connect() while True: request = PublishToTopicRequest() request.topic = topic publish_message = PublishMessage() publish_message.binary_message = BinaryMessage() publish_message.binary_message.message = bytes(message, "utf-8") request.publish_message = publish_message operation = ipc_client.new_publish_to_topic() operation.activate(request) future_response = operation.get_response() try: future_response.result(TIMEOUT) print('Successfully published to topic: ' + topic) except concurrent.futures.TimeoutError: print('Timeout occurred while publishing to topic: ' + topic, file=sys.stderr) except UnauthorizedError as e: print('Unauthorized error while publishing to topic: ' + topic, file=sys.stderr) raise e except Exception as e: print('Exception while publishing to topic: ' + topic, file=sys.stderr) raise e time.sleep(5) except InterruptedError: print('Publisher interrupted.') except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)

O exemplo de receita a seguir permite que o componente seja inscrito em todos os tópicos.

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubSubscriberPython", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubSubscriberPython:pubsub:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": [ "aws.greengrass#SubscribeToTopic" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Platform": { "os": "linux" }, "Lifecycle": { "install": "python3 -m pip install --user awsiotsdk", "Run": "python3 -u {artifacts:path}/pubsub_subscriber.py" } }, { "Platform": { "os": "windows" }, "Lifecycle": { "install": "py -3 -m pip install --user awsiotsdk", "Run": "py -3 -u {artifacts:path}/pubsub_subscriber.py" } } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.PubSubSubscriberPython ComponentVersion: 1.0.0 ComponentDescription: A component that subscribes to messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: com.example.PubSubSubscriberPython:pubsub:1: policyDescription: Allows access to subscribe to all topics. operations: - aws.greengrass#SubscribeToTopic resources: - "*" Manifests: - Platform: os: linux Lifecycle: install: python3 -m pip install --user awsiotsdk Run: python3 -u {artifacts:path}/pubsub_subscriber.py - Platform: os: windows Lifecycle: install: py -3 -m pip install --user awsiotsdk Run: py -3 -u {artifacts:path}/pubsub_subscriber.py

O exemplo de aplicação Python a seguir demonstra como usar o serviço IPC de publicação/assinatura para assinar mensagens em outros componentes.

import concurrent.futures import sys import time import traceback import awsiot.greengrasscoreipc import awsiot.greengrasscoreipc.client as client from awsiot.greengrasscoreipc.model import ( SubscribeToTopicRequest, SubscriptionResponseMessage, UnauthorizedError ) topic = "test/topic/python" TIMEOUT = 10 class StreamHandler(client.SubscribeToTopicStreamHandler): def __init__(self): super().__init__() def on_stream_event(self, event: SubscriptionResponseMessage) -> None: try: message = str(event.binary_message.message, "utf-8") print("Received new message: " + message) except: traceback.print_exc() def on_stream_error(self, error: Exception) -> bool: print("Received a stream error.", file=sys.stderr) traceback.print_exc() return False # Return True to close stream, False to keep stream open. def on_stream_closed(self) -> None: print('Subscribe to topic stream closed.') try: ipc_client = awsiot.greengrasscoreipc.connect() request = SubscribeToTopicRequest() request.topic = topic handler = StreamHandler() operation = ipc_client.new_subscribe_to_topic(handler) operation.activate(request) future_response = operation.get_response() try: future_response.result(TIMEOUT) print('Successfully subscribed to topic: ' + topic) except concurrent.futures.TimeoutError as e: print('Timeout occurred while subscribing to topic: ' + topic, file=sys.stderr) raise e except UnauthorizedError as e: print('Unauthorized error while subscribing to topic: ' + topic, file=sys.stderr) raise e except Exception as e: print('Exception while subscribing to topic: ' + topic, file=sys.stderr) raise e # Keep the main thread alive, or the process will exit. try: while True: time.sleep(10) except InterruptedError: print('Subscribe interrupted.') except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)

O exemplo de receita a seguir permite que o componente seja publicado em todos os tópicos.

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubPublisherCpp", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubPublisherCpp:pubsub:1": { "policyDescription": "Allows access to publish to all topics.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Lifecycle": { "Run": "{artifacts:path}/greengrassv2_pubsub_publisher" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubPublisherCpp/1.0.0/greengrassv2_pubsub_publisher", "Permission": { "Execute": "OWNER" } } ] } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.PubSubPublisherCpp ComponentVersion: 1.0.0 ComponentDescription: A component that publishes messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: com.example.PubSubPublisherCpp:pubsub:1: policyDescription: Allows access to publish to all topics. operations: - aws.greengrass#PublishToTopic resources: - "*" Manifests: - Lifecycle: Run: "{artifacts:path}/greengrassv2_pubsub_publisher" Artifacts: - URI: s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubPublisherCpp/1.0.0/greengrassv2_pubsub_publisher Permission: Execute: OWNER

O exemplo de aplicação C++ a seguir demonstra como usar o serviço IPC de publicação/assinatura para publicar mensagens em outros componentes.

#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { String message("Hello from the pub/sub publisher (C++)."); String topic("test/topic/cpp"); int timeout = 10; ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } while (true) { PublishToTopicRequest request; Vector<uint8_t> messageData({message.begin(), message.end()}); BinaryMessage binaryMessage; binaryMessage.SetMessage(messageData); PublishMessage publishMessage; publishMessage.SetBinaryMessage(binaryMessage); request.SetTopic(topic); request.SetPublishMessage(publishMessage); auto operation = ipcClient.NewPublishToTopic(); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (response) { std::cout << "Successfully published to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to publish to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } std::this_thread::sleep_for(std::chrono::seconds(5)); } return 0; }

O exemplo de receita a seguir permite que o componente seja inscrito em todos os tópicos.

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubSubscriberCpp", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubSubscriberCpp:pubsub:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": [ "aws.greengrass#SubscribeToTopic" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Lifecycle": { "Run": "{artifacts:path}/greengrassv2_pub_sub_subscriber" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pub_sub_subscriber", "Permission": { "Execute": "OWNER" } } ] } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.PubSubSubscriberCpp ComponentVersion: 1.0.0 ComponentDescription: A component that subscribes to messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: com.example.PubSubSubscriberCpp:pubsub:1: policyDescription: Allows access to subscribe to all topics. operations: - aws.greengrass#SubscribeToTopic resources: - "*" Manifests: - Lifecycle: Run: "{artifacts:path}/greengrassv2_pub_sub_subscriber" Artifacts: - URI: s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pub_sub_subscriber Permission: Execute: OWNER

O exemplo de aplicação C++ a seguir demonstra como usar o serviço IPC de publicação/assinatura para assinar mensagens em outros componentes.

#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class SubscribeResponseHandler : public SubscribeToTopicStreamHandler { public: virtual ~SubscribeResponseHandler() {} private: void OnStreamEvent(SubscriptionResponseMessage *response) override { auto jsonMessage = response->GetJsonMessage(); if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) { auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable(); std::cout << "Received new message: " << messageString << std::endl; } else { auto binaryMessage = response->GetBinaryMessage(); if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) { auto messageBytes = binaryMessage.value().GetMessage().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); std::cout << "Received new message: " << messageString << std::endl; } } } bool OnStreamError(OperationError *error) override { std::cout << "Received an operation error: "; if (error->GetMessage().has_value()) { std::cout << error->GetMessage().value(); } std::cout << std::endl; return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { std::cout << "Subscribe to topic stream closed." << std::endl; } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { String topic("test/topic/cpp"); int timeout = 10; ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } SubscribeToTopicRequest request; request.SetTopic(topic); auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToTopic(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (response) { std::cout << "Successfully subscribed to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to subscribe to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }

O exemplo de receita a seguir permite que o componente seja publicado em todos os tópicos.

{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubPublisherRust", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubPublisherRust:pubsub:1": { "policyDescription": "Allows access to publish to all topics.", "operations": ["aws.greengrass#PublishToTopic"], "resources": ["*"] } } } } }, "Manifests": [ { "Platform": { "os": "linux", "runtime": "*" }, "Lifecycle": { "run": "{artifacts:path}/publish_to_topic" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubPublisherRust/1.0.0/publish_to_topic", "Permission": { "Execute": "OWNER" } } ] } ] }

O exemplo a seguir do aplicativo Rust demonstra como usar o serviço IPC de publicação/assinatura para publicar mensagens em outros componentes.

use gg_sdk::Sdk; fn main() { let sdk = Sdk::init(); sdk.connect().expect("Failed to establish IPC connection"); let message = b"Hello, World"; let topic = "my/topic"; sdk.publish_to_topic_binary(topic, message) .expect("Failed to publish to topic"); println!("Successfully published to topic: {topic}"); }

O exemplo de receita a seguir permite que o componente seja inscrito em todos os tópicos.

{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubSubscriberRust", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubSubscriberRust:pubsub:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": ["aws.greengrass#SubscribeToTopic"], "resources": ["*"] } } } } }, "Manifests": [ { "Platform": { "os": "linux", "runtime": "*" }, "Lifecycle": { "run": "{artifacts:path}/subscribe_to_topic" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberRust/1.0.0/subscribe_to_topic", "Permission": { "Execute": "OWNER" } } ] } ] }

O exemplo a seguir do aplicativo Rust demonstra como usar o serviço IPC de publicação/assinatura para assinar mensagens de outros componentes.

use gg_sdk::{Sdk, SubscribeToTopicPayload}; use std::{thread, time::Duration}; fn main() { let sdk = Sdk::init(); sdk.connect().expect("Failed to establish IPC connection"); let topic = "my/topic"; let callback = |topic: &str, payload: SubscribeToTopicPayload| match payload { SubscribeToTopicPayload::Binary(message) => { let message = String::from_utf8_lossy(message); println!("Received new message on topic {topic}: {message}"); } SubscribeToTopicPayload::Json(_) => { println!("Received new message on topic {topic}: (JSON message)"); } }; let _sub = sdk .subscribe_to_topic(topic, &callback) .expect("Failed to subscribe to topic"); println!("Successfully subscribed to topic: {topic}"); // Keep the main thread alive, or the process will exit. loop { thread::sleep(Duration::from_secs(10)); } }

O exemplo de receita a seguir permite que o componente seja publicado em todos os tópicos.

{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubPublisherC", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubPublisherC:pubsub:1": { "policyDescription": "Allows access to publish to all topics.", "operations": ["aws.greengrass#PublishToTopic"], "resources": ["*"] } } } } }, "Manifests": [ { "Platform": { "os": "linux", "runtime": "*" }, "Lifecycle": { "run": "{artifacts:path}/sample_publish_to_topic" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubPublisherC/1.0.0/sample_publish_to_topic", "Permission": { "Execute": "OWNER" } } ] } ] }

O exemplo de aplicativo C a seguir demonstra como usar o serviço IPC de publicação/assinatura para publicar mensagens em outros componentes.

#include <gg/error.h> #include <gg/ipc/client.h> #include <gg/sdk.h> #include <stdio.h> #include <stdlib.h> int main(void) { gg_sdk_init(); GgError err = ggipc_connect(); if (err != GG_ERR_OK) { fprintf(stderr, "Failed to establish IPC connection.\n"); exit(-1); } GgBuffer message = GG_STR("Hello, World"); GgBuffer topic = GG_STR("my/topic"); err = ggipc_publish_to_topic_binary(topic, message); if (err != GG_ERR_OK) { fprintf( stderr, "Failed to publish to topic: %.*s\n", (int) topic.len, topic.data ); exit(-1); } printf( "Successfully published to topic: %.*s\n", (int) topic.len, topic.data ); }

O exemplo de receita a seguir permite que o componente seja inscrito em todos os tópicos.

{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubSubscriberC", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubSubscriberC:pubsub:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": ["aws.greengrass#SubscribeToTopic"], "resources": ["*"] } } } } }, "Manifests": [ { "Platform": { "os": "linux", "runtime": "*" }, "Lifecycle": { "run": "{artifacts:path}/sample_subscribe_to_topic" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberC/1.0.0/sample_subscribe_to_topic", "Permission": { "Execute": "OWNER" } } ] } ] }

O exemplo de aplicativo C a seguir demonstra como usar o serviço IPC de publicação/assinatura para assinar mensagens de outros componentes.

#include <assert.h> #include <gg/error.h> #include <gg/ipc/client.h> #include <gg/object.h> #include <gg/sdk.h> #include <gg/types.h> #include <unistd.h> #include <stdio.h> #include <stdlib.h> static void on_subscription_response( void *ctx, GgBuffer topic, GgObject payload, GgIpcSubscriptionHandle handle ) { (void) ctx; (void) handle; if (gg_obj_type(payload) == GG_TYPE_BUF) { GgBuffer message = gg_obj_into_buf(payload); printf( "Received new message on topic %.*s: %.*s\n", (int) topic.len, topic.data, (int) message.len, message.data ); } else { assert(gg_obj_type(payload) == GG_TYPE_MAP); printf( "Received new message on topic %.*s: (JSON message)\n", (int) topic.len, topic.data ); } } int main(void) { gg_sdk_init(); GgError err = ggipc_connect(); if (err != GG_ERR_OK) { fprintf(stderr, "Failed to establish IPC connection.\n"); exit(-1); } GgBuffer topic = GG_STR("my/topic"); GgIpcSubscriptionHandle handle; err = ggipc_subscribe_to_topic( topic, on_subscription_response, NULL, &handle ); if (err != GG_ERR_OK) { fprintf( stderr, "Failed to subscribe to topic: %.*s\n", (int) topic.len, topic.data ); exit(-1); } printf( "Successfully subscribed to topic: %.*s\n", (int) topic.len, topic.data ); // Keep the main thread alive, or the process will exit. while (1) { sleep(10); } // To stop subscribing, close the stream. ggipc_close_subscription(handle); }

O exemplo de receita a seguir permite que o componente seja publicado em todos os tópicos.

{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubPublisherCpp", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubPublisherCpp:pubsub:1": { "policyDescription": "Allows access to publish to all topics.", "operations": ["aws.greengrass#PublishToTopic"], "resources": ["*"] } } } } }, "Manifests": [ { "Platform": { "os": "linux", "runtime": "*" }, "Lifecycle": { "run": "{artifacts:path}/sample_cpp_publish_to_topic" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubPublisherCpp/1.0.0/sample_cpp_publish_to_topic", "Permission": { "Execute": "OWNER" } } ] } ] }

O exemplo de aplicação C++ a seguir demonstra como usar o serviço IPC de publicação/assinatura para publicar mensagens em outros componentes.

#include <gg/ipc/client.hpp> #include <iostream> int main() { auto &client = gg::ipc::Client::get(); auto error = client.connect(); if (error) { std::cerr << "Failed to establish IPC connection.\n"; exit(-1); } std::string_view message = "Hello, World"; std::string_view topic = "my/topic"; error = client.publish_to_topic(topic, message); if (error) { std::cerr << "Failed to publish to topic: " << topic << "\n"; exit(-1); } std::cout << "Successfully published to topic: " << topic << "\n"; }

O exemplo de receita a seguir permite que o componente seja inscrito em todos os tópicos.

{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubSubscriberCpp", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubSubscriberCpp:pubsub:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": ["aws.greengrass#SubscribeToTopic"], "resources": ["*"] } } } } }, "Manifests": [ { "Platform": { "os": "linux", "runtime": "*" }, "Lifecycle": { "run": "{artifacts:path}/sample_cpp_subscribe_to_topic" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/sample_cpp_subscribe_to_topic", "Permission": { "Execute": "OWNER" } } ] } ] }

O exemplo de aplicativo C++ a seguir demonstra como usar o serviço IPC de publicação/assinatura para assinar mensagens de outros componentes.

#include <gg/ipc/client.hpp> #include <gg/object.hpp> #include <unistd.h> #include <cassert> #include <iostream> class ResponseHandler : public gg::ipc::LocalTopicCallback { void operator()( std::string_view topic, gg::Object payload, gg::ipc::Subscription &handle ) override { (void) handle; if (payload.index() == GG_TYPE_BUF) { std::cout << "Received new message on topic " << topic << ": " << get<gg::Buffer>(payload) << "\n"; } else { assert(payload.index() == GG_TYPE_MAP); std::cout << "Received new message on topic " << topic << ": (JSON message)\n"; } } }; int main() { auto &client = gg::ipc::Client::get(); auto error = client.connect(); if (error) { std::cerr << "Failed to establish IPC connection.\n"; exit(-1); } std::string_view topic = "my/topic"; static ResponseHandler handler; error = client.subscribe_to_topic(topic, handler); if (error) { std::cerr << "Failed to subscribe to topic: " << topic << "\n"; exit(-1); } std::cout << "Successfully subscribed to topic: " << topic << "\n"; // Keep the main thread alive, or the process will exit. while (1) { sleep(10); } }