

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 将 Python Pika 与 Amazon MQ for RabbitMQ 结合使用
<a name="amazon-mq-rabbitmq-pika"></a>

 以下教程说明如何使用 TLS 设置 [Python Pika](https://github.com/pika/pika)，并将 TLS 配置为连接到 Amazon MQ for RabbitMQ 代理。Pika 是适用于 RabbitMQ 的 AMQP 0-9-1 协议的 Python 实现。本教程将指导您安装 Pika、声明队列、设置发布者以向代理的默认交易所发送消息，以及设置使用者以接收来自队列的消息。

**Topics**
+ [先决条件](#amazon-mq-rabbitmq-pika-prerequisites)
+ [Permissions](#amazon-mq-rabbitmq-pika-permissions)
+ [步骤一：创建基本的 Python Pika 客户端](#amazon-mq-rabbitmq-pika-basic-client)
+ [步骤二：创建发布者并发送消息](#amazon-mq-rabbitmq-pika-publisher-basic-publish)
+ [第三步：创建消费者并接收消息](#amazon-mq-rabbitmq-pika-consumer-basic-get)
+ [步骤四：（可选）设置事件循环并使用消息](#amazon-mq-rabbitmq-pika-consumer-basic-consume)
+ [接下来做什么？](#amazon-mq-rabbitmq-pika-whats-next)

## 先决条件
<a name="amazon-mq-rabbitmq-pika-prerequisites"></a>

 要完成本教程中的步骤，您需要满足以下先决条件：
+ Amazon MQ for RabbitMQ 代理。有关更多信息，请参阅[创建 Amazon MQ for RabbitMQ 代理](getting-started-rabbitmq.md#create-rabbitmq-broker)。
+ 安装适用于操作系统的 [Python 3](https://www.python.org/downloads/)。
+ 使用 Python `pip` 安装 [Pika](https://pika.readthedocs.io/en/stable/)。要安装 Pika，请打开新的终端窗口并运行以下内容。

  ```
  $ python3 -m pip install pika
  ```

## Permissions
<a name="amazon-mq-rabbitmq-pika-permissions"></a>

在本教程中，您需要至少一个 Amazon MQ for RabbitMQ 代理用户，该用户具有写入和读取虚拟主机的权限。下表将必要的最低权限描述为正则表达式 (regexp) 模式。


| 标签 | 配置正则表达式 | 写入正则表达式 | 读取正则表达式 | 
| --- | --- | --- | --- | 
| none |  | .\$1 | .\$1 | 

 列出的用户权限仅向用户提供读写权限，而不授予在代理上执行管理操作的管理插件访问权限。您可以通过提供限制用户访问指定队列的 regexp 模式来进一步限制权限。例如，如果将读取 regexp 模式更改为 `^[hello world].*`，用户只拥有从以 `hello world` 开头的队列中读取的权限。

有关创建 RabbitMQ 用户和管理用户标记和权限的更多信息，请参阅 [Amazon MQ for RabbitMQ 代理用户](rabbitmq-simple-auth-broker-users.md#rabbitmq-basic-elements-user)。

## 步骤一：创建基本的 Python Pika 客户端
<a name="amazon-mq-rabbitmq-pika-basic-client"></a>

请执行以下操作来创建 Python Pika 客户端基类，该基类定义构造函数，并在与 Amazon MQ for RabbitMQ 代理交互时提供 TLS 配置所需的 SSL 上下文。

1.  打开新的终端窗口，为项目创建新目录，然后导航到该目录。

   ```
   $ mkdir pika-tutorial
   $ cd pika-tutorial
   ```

1.  创建一个名为 `basicClient.py` 的文件，其中包含以下 Python 代码。

   ```
   import ssl
   import pika
   
   class BasicPikaClient:
   
       def __init__(self, rabbitmq_broker_id, rabbitmq_user, rabbitmq_password, region):
   
           # SSL Context for TLS configuration of Amazon MQ for RabbitMQ
           ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
           ssl_context.set_ciphers('ECDHE+AESGCM:!ECDSA')
   
           url = f"amqps://{rabbitmq_user}:{rabbitmq_password}@{rabbitmq_broker_id}.mq.{region}.amazonaws.com:5671"
           parameters = pika.URLParameters(url)
           parameters.ssl_options = pika.SSLOptions(context=ssl_context)
   
           self.connection = pika.BlockingConnection(parameters)
           self.channel = self.connection.channel()
   ```

 现在，您可以为继承自 `BasicPikaClient` 的发布者和使用者定义其他类。

## 步骤二：创建发布者并发送消息
<a name="amazon-mq-rabbitmq-pika-publisher-basic-publish"></a>

 要创建声明队列的发布者并发送单条消息，请执行以下操作。

1.  复制以下代码示例的内容，并将其作为 `publisher.py` 保存在本地上一个步骤创建的相同目录中。

   ```
   from basicClient import BasicPikaClient
   
   class BasicMessageSender(BasicPikaClient):
   
       def declare_queue(self, queue_name):
           print(f"Trying to declare queue({queue_name})...")
           self.channel.queue_declare(queue=queue_name)
   
       def send_message(self, exchange, routing_key, body):
           channel = self.connection.channel()
           channel.basic_publish(exchange=exchange,
                                 routing_key=routing_key,
                                 body=body)
           print(f"Sent message. Exchange: {exchange}, Routing Key: {routing_key}, Body: {body}")
   
       def close(self):
           self.channel.close()
           self.connection.close()
   
   if __name__ == "__main__":
   
       # Initialize Basic Message Sender which creates a connection
       # and channel for sending messages.
       basic_message_sender = BasicMessageSender(
           "<broker-id>",
           "<username>",
           "<password>",
           "<region>"
       )
   
       # Declare a queue
       basic_message_sender.declare_queue("hello world queue")
   
       # Send a message to the queue.
       basic_message_sender.send_message(exchange="", routing_key="hello world queue", body=b'Hello World!')
   
       # Close connections.
       basic_message_sender.close()
   ```

    `BasicMessageSender` 类继承自 `BasicPikaClient` 并实施了用于声明队列、向队列发送消息和关闭连接的其他方法。代码示例使用等于队列名称的路由密钥，将消息路由到默认交换器。

1.  在 `if __name__ == "__main__":` 下，将传递给 `BasicMessageSender` 构造函数语句的参数替换为以下信息。
   +  **`<broker-id>`** – Amazon MQ 为代理生成的唯一 ID。您可以通过代理 ARN 解析 ID。例如，给定以下 ARN `arn:aws:mq:us-east-2:123456789012:broker:MyBroker:b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9`，代理 ID 将为 `b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9`。
   +  **`<username>`** - 具有足够权限向代理写入消息的代理用户的用户名。
   +  **`<password>`** - 具有足够权限向代理写入消息的代理用户的密码。
   +  **`<region>`**— 您创建适用于 RabbitMQ 的亚马逊 MQ 经纪商所在的 AWS 区域。例如 `us-west-2`。

1.  在创建 `publisher.py` 的同一目录中运行以下命令。

   ```
   $ python3 publisher.py
   ```

   如果代码运行成功，您将在终端窗口中看到以下输出。

   ```
   Trying to declare queue(hello world queue)...
   Sent message. Exchange: , Routing Key: hello world queue, Body: b'Hello World!'
   ```

## 第三步：创建消费者并接收消息
<a name="amazon-mq-rabbitmq-pika-consumer-basic-get"></a>

 要创建从队列中接收单条消息的使用者，请执行以下操作。

1.  复制以下代码示例的内容，并将其作为 `consumer.py` 保存在本地同一目录中。

   ```
   from basicClient import BasicPikaClient
   
   class BasicMessageReceiver(BasicPikaClient):
   
       def get_message(self, queue):
           method_frame, header_frame, body = self.channel.basic_get(queue)
           if method_frame:
               print(method_frame, header_frame, body)
               self.channel.basic_ack(method_frame.delivery_tag)
               return method_frame, header_frame, body
           else:
               print('No message returned')
   
       def close(self):
           self.channel.close()
           self.connection.close()
   
   
   if __name__ == "__main__":
   
       # Create Basic Message Receiver which creates a connection
       # and channel for consuming messages.
       basic_message_receiver = BasicMessageReceiver(
           "<broker-id>",
           "<username>",
           "<password>",
           "<region>"
       )
   
       # Consume the message that was sent.
       basic_message_receiver.get_message("hello world queue")
   
       # Close connections.
       basic_message_receiver.close()
   ```

    与您在上一步中创建的发布者类似，`BasicMessageReceiver`它继承`BasicPikaClient`并实现了用于接收单条消息和关闭连接的其他方法。

1.  在 `if __name__ == "__main__":` 语句下，将传递给 `BasicMessageReceiver` 构造函数的参数替换为您的信息。

1.  在您的项目目录中运行以下命令。

   ```
   $ python3 consumer.py
   ```

   如果代码运行成功，您将在终端窗口中看到消息正文以及包括路由密钥的标头。

   ```
   <Basic.GetOk(['delivery_tag=1', 'exchange=', 'message_count=0', 'redelivered=False', 'routing_key=hello world queue'])> <BasicProperties> b'Hello World!'
   ```

## 步骤四：（可选）设置事件循环并使用消息
<a name="amazon-mq-rabbitmq-pika-consumer-basic-consume"></a>

 要使用队列中的多条消息，请使用 Pika 的 [https://pika.readthedocs.io/en/stable/modules/channel.html#pika.channel.Channel.basic_consume](https://pika.readthedocs.io/en/stable/modules/channel.html#pika.channel.Channel.basic_consume) 方法和回调函数，如下所示 

1.  在 `consumer.py` 中，将以下方法定义添加到 `BasicMessageReceiver` 类。

   ```
   def consume_messages(self, queue):
       def callback(ch, method, properties, body):
           print(" [x] Received %r" % body)
   
       self.channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=True)
   
       print(' [*] Waiting for messages. To exit press CTRL+C')
       self.channel.start_consuming()
   ```

1.  在 `consumer.py` 中的 `if __name__ == "__main__":` 下，调用您在上一个步骤中定义的 `consume_messages` 方法。

   ```
   if __name__ == "__main__":
   
       # Create Basic Message Receiver which creates a connection and channel for consuming messages.
       basic_message_receiver = BasicMessageReceiver(
           "<broker-id>",
           "<username>",
           "<password>",
           "<region>"
       )
   
       # Consume the message that was sent.
       # basic_message_receiver.get_message("hello world queue")
   
       # Consume multiple messages in an event loop.
       basic_message_receiver.consume_messages("hello world queue")
   
       # Close connections.
       basic_message_receiver.close()
   ```

1.  再次运行 `consumer.py`，如果运行成功，队列消息将显示在终端窗口中。

   ```
   [*] Waiting for messages. To exit press CTRL+C
   [x] Received b'Hello World!'
   [x] Received b'Hello World!'
   ...
   ```

## 接下来做什么？
<a name="amazon-mq-rabbitmq-pika-whats-next"></a>
+  有关其他支持的 RabbitMQ 客户端库的更多信息，请参阅 RabbitMQ 网站上的 [RabbitMQ 客户端文档](https://www.rabbitmq.com/clients.html)。