

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 事务发件箱模式
<a name="transactional-outbox"></a>

## 意图
<a name="transactional-outbox-intent"></a>

事务发件箱模式解决了分布式系统中的双重写入操作问题，此问题出现于单个操作同时涉及数据库写入操作和消息或事件通知时。当应用程序向两个不同的系统写入数据时，便会发生双重写入操作；例如，当微服务需要在数据库中持久化数据并发送消息以通知其他系统时。其中一个操作失败便可能会导致数据不一致。

## 动机
<a name="transactional-outbox-motivation"></a>

当微服务在数据库更新后发送事件通知时，这两个操作应以原子方式运行，从而确保数据一致性和可靠性。
+ 如果数据库更新成功但事件通知失败，则下游服务将不知道有发生更改，系统可能会进入不一致的状态。
+ 如果数据库更新失败但发送了事件通知，则数据可能会损坏，由此可能会影响系统的可靠性。

## 适用性
<a name="transactional-outbox-applicability"></a>

在以下情况使用事务发件箱模式：
+ 您正在构建事件驱动的应用程序，其中的数据库更新会启动事件通知。
+ 您需要确保涉及两项服务的操作的原子性。
+ 您想实现[事件溯源模式](event-sourcing.md)。

## 问题和注意事项
<a name="transactional-outbox-issues"></a>
+ **重复消息**：事件处理服务可能会发送重复的消息或事件，因此建议您通过跟踪已处理的消息来令服务的使用具有幂等性。
+ **通知顺序**：按照服务更新数据库的同一顺序发送消息或事件。这对于事件源模式至关重要，在这种模式中，您可以使用事件存储来 point-in-time恢复数据存储。如果顺序不正确，则可能会影响数据的质量。如果未持久化通知顺序，则最终一致性和数据库回滚可能会将问题复杂化。
+ **事务回滚**：如果事务已回滚，请勿发送事件通知。
+ **服务级事务处理**：如果事务跨越需要数据存储更新的服务，则请使用 [saga 编排模式](saga-orchestration.md)来保持数据存储中的数据完整性。

## 实施
<a name="transactional-outbox-implementation"></a>

### 高级架构
<a name="transactional-implementation-high-level-arch"></a>

以下序列图显示了双重写入操作期间发生的事件顺序。

![\[双重写入操作期间的事件顺序\]](http://docs.aws.amazon.com/zh_cn/prescriptive-guidance/latest/cloud-design-patterns/images/outbox-1.png)


1. 航班服务写入数据库，并向付款服务发送出事件通知。

1. 消息代理将消息和事件传送至付款服务。消息代理中的任何故障都会导致付款服务无法接收更新。

如果航班数据库更新失败但通知已发出，则付款服务将根据事件通知处理付款。此操作将导致下游数据不一致。

### 使用 AWS 服务实施
<a name="transactional-implementation-aws-services"></a>

为了演示序列图中的模式，我们将使用以下 AWS 服务，如下图所示。
+ 微服务是通过使用 [AWS Lambda](https://aws.amazon.com/lambda/) 实现的。
+ 主数据库由 [Amazon Relational Database Service（Amazon RDS）](https://aws.amazon.com/rds/)管理。
+ [Amazon Simple Queue Service（Amazon SQS）](https://aws.amazon.com/sqs/)充当接收事件通知的消息代理。

![\[使用 Amazon RDS 和 AWS Lambda Amazon SQS 的交易发件箱模式\]](http://docs.aws.amazon.com/zh_cn/prescriptive-guidance/latest/cloud-design-patterns/images/outbox-2.png)


如果航班服务在提交事务后出现故障，则可能导致无法发送事件通知。

![\[提交操作后的事务失败\]](http://docs.aws.amazon.com/zh_cn/prescriptive-guidance/latest/cloud-design-patterns/images/outbox-3.png)


但是，事务可能会失败并回滚，但事件通知可能仍会发送，从而导致付款服务处理付款。

![\[提交操作后事务失败并回滚\]](http://docs.aws.amazon.com/zh_cn/prescriptive-guidance/latest/cloud-design-patterns/images/outbox-4.png)


要解决此问题，您可以使用发件箱表或更改数据捕获（CDC）。以下各部分将讨论这两个选项，以及如何使用亚马逊云科技服务实现它们。

#### 将发件箱表与关系数据库配合使用
<a name="transactional-implementation-rdb"></a>

发件箱表存储来自航班服务的所有事件，带有时间戳和序列号。

当航班表更新时，发件箱表也会在同一事务中更新。另一项服务（例如事件处理服务）从发件箱表中读取信息并将事件发送到 Amazon SQS。Amazon SQS 会向付款服务发送有关该事件的消息以供进一步处理。[Amazon SQS 标准队列](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/standard-queues.html)可保证消息至少传送一次，且不会丢失。但是，当您使用 Amazon SQS 标准队列时，同一条消息或事件可能会多次传送，因此您应确保事件通知服务是幂等性的（也就是说，多次处理同一条消息不会产生不利影响）。如果您要求消息只处理一次，并采用消息排序，则可以使用 [Amazon SQS 先入先出 (FIFO](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-fifo-queues.html)) 队列。

如果航班表更新失败，或发件箱表更新失败，则会回滚整个事务，因此不会出现下游数据不一致的情况。

![\[无下游数据不一致的回滚\]](http://docs.aws.amazon.com/zh_cn/prescriptive-guidance/latest/cloud-design-patterns/images/outbox-5.png)


在下图中，事务发件箱架构是使用 Amazon RDS 数据库实现的。当事件处理服务读取发件箱表时，它只识别已提交（成功）事务中的那些行，然后将事件的消息放入 SQS 队列中，由付款服务读取该队列以供进一步处理。这种设计解决了双重写入操作问题，并通过使用时间戳和序列号来持久化消息和事件的顺序。

![\[解决双重写入操作问题的设计\]](http://docs.aws.amazon.com/zh_cn/prescriptive-guidance/latest/cloud-design-patterns/images/outbox-6.png)


#### 使用更改数据捕获（CDC）
<a name="transactional-implementation-cdc"></a>

某些数据库支持发布项目级修改，以捕获已更改的数据。您可以识别已更改的项目，并相应地发送事件通知。这样可以节省创建用于跟踪更新的另一个表的开销。航班服务发起的事件存储在同一项目的另一个属性中。

[Amazon DynamoDB](https://aws.amazon.com/dynamodb/) 是一个键/值 NoSQL 数据库，支持 CDC 更新。在下面的序列图中，DynamoDB 发布了对 Amazon DynamoDB Streams 的项目级修改。事件处理服务从流中读取数据，并将事件通知发布到付款服务以供进一步处理。

![\[带有 DynamoDB 和 DynamoDB Streams 的事务发件箱\]](http://docs.aws.amazon.com/zh_cn/prescriptive-guidance/latest/cloud-design-patterns/images/outbox-7.png)


DynamoDB Streams 使用时间排序序列捕获与 DynamoDB 表中项目级更改相关的信息流。

您可以通过在 DynamoDB 表上启用流来实现事务发件箱模式。事件处理服务的 Lambda 函数与这些流存在关联。
+ 更新航班表后，DynamoDB Streams 会捕获已更改的数据，事件处理服务会轮询流以查找新记录。
+ 当新的流记录可用时，Lambda 函数会同步将事件的消息放入 SQS 队列中，便于进一步处理。您可以根据需要向 DynamoDB 项目添加属性，以捕获时间戳和序列号，从而提高实施的稳健性。

![\[使用 CDC 的事务发件箱\]](http://docs.aws.amazon.com/zh_cn/prescriptive-guidance/latest/cloud-design-patterns/images/outbox-8.png)


## 代码示例
<a name="transactional-implementation-code"></a>

### 使用发件箱表
<a name="transactional-implementation-code-outbox"></a>

本节中的代码示例显示了如何使用发件箱表实现事务发件箱模式。要查看完整的代码，请参阅此示例的[GitHub存储库](https://github.com/aws-samples/transactional-outbox-pattern)。

以下代码片段在单个事务中将数据库中的 `Flight` 实体和 `Flight` 事件保存在其各自的表中。

```
@PostMapping("/flights")
    @Transactional
    public Flight createFlight(@Valid @RequestBody Flight flight) {
        Flight savedFlight = flightRepository.save(flight);
        JsonNode flightPayload = objectMapper.convertValue(flight, JsonNode.class);
        FlightOutbox outboxEvent = new FlightOutbox(flight.getId().toString(), FlightOutbox.EventType.FLIGHT_BOOKED,
                flightPayload);
        outboxRepository.save(outboxEvent);
        return savedFlight;
    }
```

另一项服务负责定期扫描发件箱表中是否有新事件，将其发送到 Amazon SQS，如果 Amazon SQS 成功响应，则将其从表格中删除。轮询速率可在 `application.properties` 文件中配置。

```
@Scheduled(fixedDelayString = "${sqs.polling_ms}")
    public void forwardEventsToSQS() {
        List<FlightOutbox> entities = outboxRepository.findAllByOrderByIdAsc(Pageable.ofSize(batchSize)).toList();
        if (!entities.isEmpty()) {
            GetQueueUrlRequest getQueueRequest = GetQueueUrlRequest.builder()
                    .queueName(sqsQueueName)
                    .build();
            String queueUrl = this.sqsClient.getQueueUrl(getQueueRequest).queueUrl();
            List<SendMessageBatchRequestEntry> messageEntries = new ArrayList<>();
            entities.forEach(entity -> messageEntries.add(SendMessageBatchRequestEntry.builder()
                    .id(entity.getId().toString())
                    .messageGroupId(entity.getAggregateId())
                    .messageDeduplicationId(entity.getId().toString())
                    .messageBody(entity.getPayload().toString())
                    .build())
            );
            SendMessageBatchRequest sendMessageBatchRequest = SendMessageBatchRequest.builder()
                    .queueUrl(queueUrl)
                    .entries(messageEntries)
                    .build();
            sqsClient.sendMessageBatch(sendMessageBatchRequest);
            outboxRepository.deleteAllInBatch(entities);
        }
    }
```

### 使用更改数据捕获（CDC）
<a name="transactional-implementation-code-cdc"></a>

本节中的示例代码显示了如何使用 DynamoDB 的更改数据捕获（CDC）功能实现事务发件箱模式。要查看完整的代码，请参阅此示例的[GitHub存储库](https://github.com/aws-samples/transactional-outbox-pattern)。

以下 AWS Cloud Development Kit (AWS CDK) 代码片段创建一个 DynamoDB 航班表和 Amazon Kinesis 数据流（`cdcStream`），并将航班表配置为将其所有更新发送到该流。

```
Const cdcStream = new kinesis.Stream(this, 'flightsCDCStream', {
    streamName: 'flightsCDCStream'
})

const flightTable = new dynamodb.Table(this, 'flight', {
    tableName: 'flight',
    kinesisStream: cdcStream,
    partitionKey: {
        name: 'id',
        type: dynamodb.AttributeType.STRING,
    }

});
```

以下代码片段和配置定义一个 spring cloud stream 函数，该函数在 Kinesis 流中获取更新并将这些事件转发到 SQS 队列以进行进一步处理。

```
applications.properties
spring.cloud.stream.bindings.sendToSQS-in-0.destination=${kinesisstreamname}
spring.cloud.stream.bindings.sendToSQS-in-0.content-type=application/ddb
    
QueueService.java
@Bean
public Consumer<Flight> sendToSQS() {
    return this::forwardEventsToSQS;
}

public void forwardEventsToSQS(Flight flight) {
    GetQueueUrlRequest getQueueRequest = GetQueueUrlRequest.builder()
            .queueName(sqsQueueName)
            .build();
    String queueUrl = this.sqsClient.getQueueUrl(getQueueRequest).queueUrl();
    try {
        SendMessageRequest send_msg_request = SendMessageRequest.builder()
                .queueUrl(queueUrl)
                .messageBody(objectMapper.writeValueAsString(flight))
                .messageGroupId("1")
                .messageDeduplicationId(flight.getId().toString())
                .build();
        sqsClient.sendMessage(send_msg_request);
    } catch (IOException | AmazonServiceException e) {
        logger.error("Error sending message to SQS", e);
    }
}
```

## GitHub 存储库
<a name="transactional-implementation-github-repo"></a>

有关此模式示例架构的完整实现，请参见 GitHub存储库，网址为[https://github.com/aws-samples/transactional-outbox-pattern](https://github.com/aws-samples/transactional-outbox-pattern)。