

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# Fanout Amazon SNS 事件 AWS 到事件分叉管道
<a name="sns-fork-pipeline-as-subscriber"></a>


|  | 
| --- |
| 对于事件归档和分析，Amazon SNS 现在建议使用其与 Amazon Data Firehose 的本机集成。您可以将 Firehose 传输流订阅 SNS 主题，这样您就可以向存档和分析终端节点发送通知，例如亚马逊简单存储服务 (Amazon S3) 存储桶、亚马逊 Redshift 表、亚马逊 OpenSearch 服务（服务）等。OpenSearch 将 Amazon SNS 与 Firehose 传输流配合使用是一种完全托管且无需代码的解决方案，您无需使用任何功能。 AWS Lambda 有关更多信息，请参阅 [扇出到 Firehose 传输流](sns-firehose-as-subscriber.md)。 | 

您可以使用 Amazon SNS 构建事件驱动的应用程序，这些应用程序使用订阅者服务自动执行工作以响应发布者服务所触发的事件。此架构模式可提高服务的可重用性、可互操作性和可扩展性。但是，将事件处理分解为可满足常见事件处理要求的管道（例如，事件存储、备份、搜索、分析和重放）可能会非常耗费人力。

为了加快事件驱动型应用程序的开发，您可以订阅 Amazon SNS 主题的事件处理管道（由事件 AWS 分叉管道提供支持）。 AWS Event Fork Pipelines 是一套基于[AWS 无服务器应用程序模型](https://aws.amazon.com/serverless/sam/) (SA AWS M) 的开源[嵌套](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-template-nested-applications.html)应用程序，您可以直接从 Ev [AWS ent Fork Pipelines 套件](https://serverlessrepo.aws.amazon.com/applications?query=aws-event-fork-pipelines)（选择**显示创建自定义 IAM 角色或资源策略的应用程序**）将其部署到您的 AWS 账户中。

有关 E AWS vent Fork Pipelines 用例，请参阅[部署和测试 Amazon SNS Event Fork Pipelines 示例应用程序](sns-deploy-test-fork-pipelines-sample-application.md)。

**Topics**
+ [AWS 事件分叉管道的工作原理](#how-sns-fork-works)
+ [部署 AWS 事件分叉管道](#deploying-sns-fork-pipelines)
+ [部署和测试 Amazon SNS Event Fork Pipelines 示例应用程序](sns-deploy-test-fork-pipelines-sample-application.md)
+ [将 AWS 事件分叉管道订阅到 Amazon SNS 主题](sns-subscribe-event-fork-pipelines.md)

## AWS 事件分叉管道的工作原理
<a name="how-sns-fork-works"></a>

AWS Event Fork Pipelines 是一种无服务器设计模式。但是，它也是一套基于 S AWS AM 的嵌套无服务器应用程序（您可以将其直接从 AWS Serverless Application Repository (S AWS AR) 部署到您的， AWS 账户 以丰富您的事件驱动平台）。您可以根据架构的需要单独部署这些嵌套的应用程序。

**Topics**
+ [事件存储与备份管线](#sns-fork-event-storage-and-backup-pipeline)
+ [事件搜索与分析管线](#sns-fork-event-search-and-analytics-pipeline)
+ [事件重播管线](#sns-fork-event-replay-pipeline)

下图显示了一个由三个嵌套应用程序补充的 AWS Event Fork Pipelines 应用程序。根据您的架构要求，您可以在 AWS SAR 上独立部署 AWS Event Fork Pipelines 套件中的任何管道。

![\[AWS 事件分叉管道架构，展示了如何通过三个不同的管道筛选和处理来自 Amazon SNS 主题的事件：事件存储和备份、事件搜索和分析以及事件重播。这些管线被描绘成垂直堆叠的框，每个管线都独立并行处理来自同一 Amazon SNS 主题的事件。\]](http://docs.aws.amazon.com/zh_cn/sns/latest/dg/images/sns-fork-pipeline-as-subscriber-how-it-works.png)


为每个管线订阅了相同的 Amazon SNS 主题，并允许管线在事件发布到主题时并行处理这些事件。每个管线都是独立的，并且可以设置其自己的[订阅筛选策略](sns-subscription-filter-policies.md)。这允许管线仅处理它感兴趣的部分事件（而不是发布到主题的所有事件）。

**注意**  
由于您将三个 AWS 事件分叉管道放置在常规事件处理管道旁边（可能已经订阅了您的 Amazon SNS 主题），因此您无需更改当前消息发布者的任何部分即可在现有 AWS 工作负载中利用事件分叉管道。

### 事件存储与备份管线
<a name="sns-fork-event-storage-and-backup-pipeline"></a>

下图显示了[事件存储与备份管线](https://serverlessrepo.aws.amazon.com/applications/arn:aws:serverlessrepo:us-east-1:077246666028:applications~fork-event-storage-backup-pipeline)。您可以为此管线订阅 Amazon SNS 主题来自动备份流经系统的事件。

该管道由一个用于缓冲由 Amazon SNS 主题传送的事件的 Amazon SQS 队列、一个自动轮询队列中这些事件并将其推送到流中的 AWS Lambda 函数以及一个持久备份流加载的事件的 Amazon S3 存储桶组成。

![\[Fork-Event-Storage-Backup-Pipeline，旨在处理和备份 Amazon SNS 主题中的事件。流程从 Amazon SNS 主题开始，事件通过该流程扇出到 Amazon SQS 队列。然后，这些经过筛选的事件由 Lambda 函数处理，该函数会将它们转发到 Data Firehose。在将事件加载到 Amazon S3 备份存储桶之前，Firehose 流负责缓冲、转换和压缩这些事件。最后，可以使用 Amazon Athena 来查询存储的数据。该图使用一系列图标和箭头来说明从一项服务到另一项服务的流程，清楚地标记了管线的每个组件。\]](http://docs.aws.amazon.com/zh_cn/sns/latest/dg/images/sns-fork-event-storage-and-backup-pipeline.png)


要微调 Firehose 流的行为，可将其配置为在将事件加载到存储桶之前对事件进行缓冲、转换和压缩。在加载事件时，可以使用 Amazon Athena 通过标准 SQL 查询来查询存储桶。您也可以将管道配置为重用现有 Amazon S3 存储桶或创建一个新的存储桶。

### 事件搜索与分析管线
<a name="sns-fork-event-search-and-analytics-pipeline"></a>

下图显示了[事件搜索与分析管线](https://serverlessrepo.aws.amazon.com/applications/arn:aws:serverlessrepo:us-east-1:077246666028:applications~fork-event-search-analytics-pipeline)。您可以为此管线订阅 Amazon SNS 主题以便在搜索域中为流经系统的事件编制索引，然后对这些事件进行分析。

该管道由一个用于缓冲由 Amazon SNS 主题传送的事件的 Amazon SQS 队列、一个轮询队列中的事件并将其推 OpenSearch 送到流中的 AWS Lambda 函数、一个为 Firehose 流加载的事件编制索引的 Amazon S3 域以及一个存储无法在搜索域中编制索引的死信事件的 Amazon S3 存储桶组成。

![\[AWS 架构中的事件搜索和分析管道。它从左边开始，Amazon SNS 主题接收所有事件。然后，这些事件通过表示“扇出筛选出的事件”的虚线汇入到 Amazon SQS 队列中。该队列中的事件由 Lambda 函数处理，该函数随后将事件转到 Data Firehose 流。Data Firehose 将事件定向到两个目的地：一条路由通向 Amazon Elasticsearch Service 进行索引，另一条路由将无法处理的事件或“死信”事件发送到 Amazon S3 死信存储桶。在最右边，Elasticsearch Service 的输出馈送到 Kibana 控制面板中进行分析和可视化。整个流程是水平布局的，每个组件都通过显示数据流方向的线条相连。\]](http://docs.aws.amazon.com/zh_cn/sns/latest/dg/images/sns-fork-event-search-and-analytics-pipeline.png)


要在事件缓冲、转换和压缩方面微调 Firehose 流，您可以配置此管线。

您还可以配置管道是应重复使用您的现有 OpenSearch 域 AWS 账户 还是为您创建一个新域。在搜索域中为事件编制索引时，您可以使用 Kibana 对事件运行分析并实时更新可视化控制面板。

### 事件重播管线
<a name="sns-fork-event-replay-pipeline"></a>

下图显示了[事件重播管线](https://serverlessrepo.aws.amazon.com/applications/arn:aws:serverlessrepo:us-east-1:077246666028:applications~fork-event-replay-pipeline)。要记录系统在过去 14 天内处理过的事件（例如，当您的平台需要从故障中恢复时），您可以为此管道订阅 Amazon SNS 主题，然后重新处理事件。

该管道由一个 Amazon SQS 队列和一个 AWS Lambda 函数组成，该队列用于缓冲由 Amazon SNS 主题传送的事件，以及一个用于轮询队列中的事件并将其重新驱动到您的常规事件处理管道的函数，该管道也已订阅您的主题。

![\[流程图格式的事件重播管线。从左到右，它以 Amazon SNS 主题开头，该主题将经过筛选的事件分发给两个并行进程。上方的流程代表您的常规事件处理管线，其中包括处理事件的 Amazon SQS 队列。标记为 “fork-event-replay-pipeline” 的下层流程包括一个 Amazon SQS 重播队列，事件在由 Lambda 重播函数处理之前会暂时存储在该队列中。此 Lambda 函数能够根据重播功能的启用还是禁用，将事件重新导入您的常规事件处理管线中或将其保留以供重播。该图还表明，操作员可以控制启用或禁用事件重播功能。\]](http://docs.aws.amazon.com/zh_cn/sns/latest/dg/images/sns-fork-event-replay-pipeline.png)


**注意**  
默认情况下，重播功能已禁用，而不会重新导入您的事件。如果您需要重新处理事件，则必须启用 Amazon SQS 重播队列作为 AWS Lambda 重播函数的事件源。

## 部署 AWS 事件分叉管道
<a name="deploying-sns-fork-pipelines"></a>

[AWS Event Fork Pipelines 套件](https://serverlessrepo.aws.amazon.com/applications?query=aws-event-fork-pipelines)**（选择 “显示创建自定义 IAM 角色或资源策略**的应用程序”）在中作为一组公共应用程序提供 AWS Serverless Application Repository，您可以从中使用[AWS Lambda 控制台](https://console.aws.amazon.com/lambda/)手动部署和测试它们。有关使用 AWS Lambda 控制台部署管道的信息，请参阅[将 AWS 事件分叉管道订阅到 Amazon SNS 主题](sns-subscribe-event-fork-pipelines.md)。

在生产场景中，我们建议在整个应用程序的 AWS SAM 模板中嵌入 AWS 事件分支管道。嵌套应用程序功能允许您通过将资源添加到 AWS SAM 模板、引用嵌套应用程序的 S AWS AR `ApplicationId` 和`[AWS::Serverless::Application](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-template.html#serverless-sam-template-application)`来实现此`SemanticVersion`目的。

例如，您可以将以下 YAML 代码段添加到 SA AWS M 模板的`Resources`部分，从而将事件存储和备份管道用作嵌套应用程序。

```
Backup:   
    Type: AWS::Serverless::Application
  Properties:
    Location:
      ApplicationId: arn:aws:serverlessrepo:us-east-2:123456789012:applications/fork-event-storage-backup-pipeline
      SemanticVersion: 1.0.0
    Parameters: 
      #The ARN of the Amazon SNS topic whose messages should be backed up to the Amazon S3 bucket.
      TopicArn: !Ref MySNSTopic
```

指定参数值时，您可以使用 AWS CloudFormation 内部函数来引用模板中的其他资源。例如，在上面的 YAML 片段中，`TopicArn`参数引用了模板中其他地方定义的`[AWS::SNS::Topic](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-sns-topic.html)` AWS SAM 资源`MySNSTopic`。有关更多信息，请参阅 *AWS CloudFormation 用户指南*中的[内置函数参考](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/intrinsic-function-reference.html)。

**注意**  
您的 AWS SAR 应用程序的 AWS Lambda 控制台页面包括**复制为 SAM 资源**按钮，该按钮可将嵌套 SA AWS R 应用程序所需的 YAML 复制到剪贴板。

# 部署和测试 Amazon SNS Event Fork Pipelines 示例应用程序
<a name="sns-deploy-test-fork-pipelines-sample-application"></a>

为了加快事件驱动型应用程序的开发，您可以订阅 Amazon SNS 主题的事件处理管道（由事件 AWS 分叉管道提供支持）。 AWS Event Fork Pipelines 是一套基于[AWS 无服务器应用程序模型](https://aws.amazon.com/serverless/sam/) (SA AWS M) 的开源[嵌套](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-template-nested-applications.html)应用程序，您可以直接从 Ev [AWS ent Fork Pipelines 套件](https://serverlessrepo.aws.amazon.com/applications?query=aws-event-fork-pipelines)（选择**显示创建自定义 IAM 角色或资源策略的应用程序**）将其部署到您的 AWS 账户中。有关更多信息，请参阅 [AWS 事件分叉管道的工作原理](sns-fork-pipeline-as-subscriber.md#how-sns-fork-works)。

本页介绍如何使用部署和测试 E AWS vent Fork Pipelines 示例应用程序。 AWS 管理控制台 

**重要**  
为避免在部署完 AWS 事件分支管道示例应用程序后产生不必要的费用，请删除其 CloudFormation 堆栈。有关更多信息，请参阅 *AWS CloudFormation 用户指南*中的[在 CloudFormation 控制台上删除堆栈](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/cfn-console-delete-stack.html)。

# AWS 事件分叉管道用例示例
<a name="example-sns-fork-use-case"></a>

以下场景描述了一个使用 Event Fork Pipelines AWS 的事件驱动型无服务器电子商务应用程序。您可以在中使用此[示例电子商务应用程序](https://serverlessrepo.aws.amazon.com/applications/arn:aws:serverlessrepo:us-east-1:077246666028:applications~fork-example-ecommerce-checkout-api)， AWS Serverless Application Repository 然后 AWS 账户 使用控制台将其部署到您的 AWS Lambda 控制台中，您可以在其中对其进行测试并检查其源代码 GitHub。

![\[集成 AWS 服务的无服务器电子商务应用程序的架构。它描绘了从电子商务用户通过 API Gateway 下订单到包括订单存储、搜索分析和重播在内的不同处理管道的流程，展示了如何通过 Amazon SNS、Lambda、Amazon SQS、DynamoDB 和 Kibana 管理和分析事件。\]](http://docs.aws.amazon.com/zh_cn/sns/latest/dg/images/sns-fork-example-use-case.png)


该电子商务应用程序通过 RESTful API Gateway托管并由该 AWS Lambda 功能支持的API接受买家的订单`CheckoutApiBackendFunction`。此函数将收到的所有订单发布到名为 `CheckoutEventsTopic` 的 Amazon SNS 主题，该主题转而将订单分散到四个不同的管道。

第一个管道是由电子商务应用程序的拥有者设计和实现的常规结算处理管道。该管道具有用于缓冲所有已收到订单的 Amazon SQS 队列`CheckoutQueue`、一个名为`CheckoutFunction`的 AWS Lambda 函数，用于轮询队列以处理这些订单，还有一个用于安全保存所有已下订单的 DynamoDB 表`CheckoutTable`。

## 应用 AWS 事件分叉管道
<a name="applying-sns-fork-pipelines"></a>

电子商务应用程序的组件处理核心业务逻辑。但是，电子商务应用程序拥有者还需满足：
+ **合规性** - 安全的、压缩的静态加密备份，清理敏感信息
+ **弹性** - 在执行过程中断的情况下重播最近的订单
+ **可搜索性** - 对已下订单运行分析并生成指标

应用程序所有者无需实现此事件处理逻辑，而是可以订阅 `CheckoutEventsTopic` Amazon SNS 主题的 AWS Event Fork Pipelines
+ [事件存储与备份管线](sns-fork-pipeline-as-subscriber.md#sns-fork-event-storage-and-backup-pipeline)配置为转换数据以删除信用卡详细信息，缓冲数据 60 秒，使用 GZIP 压缩数据，并使用 Amazon S3 的原定设置客户自主管理型密钥来加密数据。此密钥由 () 管理 AWS 并由 AWS Key Management Service (AWS KMS) 提供支持。

  有关更多信息，请参阅《Amazon Data Firehose 开发人员指南》**中的[为您的目的地选择 Amazon S3](https://docs.aws.amazon.com/firehose/latest/dev/create-destination.html#create-destination-s3)、[Amazon Data Firehose 数据转换](https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html)和[配置设置](https://docs.aws.amazon.com/firehose/latest/dev/create-configure.html)。
+ 为[事件搜索与分析管线](sns-fork-pipeline-as-subscriber.md#sns-fork-event-search-and-analytics-pipeline)配置了一个 30 秒的索引重试持续时间、一个用于存储无法在搜索域中编制索引的订单的存储桶和一个用来限制已编制索引的订单集的筛选策略。

  有关更多信息，请参阅 *Amazon Data Firehose 开发者*指南中的[为您的目的地选择 OpenSearch 服务](https://docs.aws.amazon.com/firehose/latest/dev/create-destination.html#create-destination-elasticsearch)。
+ [事件重播管线](sns-fork-pipeline-as-subscriber.md#sns-fork-event-replay-pipeline) 为配置了常规订单处理管道（由电子商务应用程序拥有者设计和实施）的 Amazon SQS 队列部分。

  有关更多信息，请参阅 *Amazon Simple Queue Service 开发人员指南*中的[队列名称和 URL](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-general-identifiers.html#queue-name-url)。

在事件搜索与分析管道的配置中设置以下 JSON 筛选策略。它仅匹配总金额为 100 美元或更多的传入订单。有关更多信息，请参阅 [Amazon SNS 消息筛选](sns-message-filtering.md)。

```
{				
   "amount": [{ "numeric": [ ">=", 100 ] }]
}
```

使用 E AWS vent Fork Pipelines 模式，电子商务应用程序所有者可以避免开发开销，这种开销通常是在为事件处理编写非微分逻辑之后出现的。取而代之的是，她可以将 AWS 事件分叉管道直接从部署 AWS Serverless Application Repository 到她身上 AWS 账户。

# 第 1 步：部署 Amazon SNS 示例应用程序
<a name="deploy-sample-application"></a>

1. 登录 [AWS Lambda 控制台](https://console.aws.amazon.com/lambda/)。

1. 在导航面板上，选择 **Functions (函数)**，然后选择 **Create function (创建函数)**。

1. 在 **Create function (创建函数)** 页面上，执行以下操作：

   1. 依次选择 **Browse serverless app repository（浏览无服务器应用程序存储库）**、**Public applications（公共应用程序）**、**Show apps that create custom roles or resource policies（显示创建 IAM 角色或资源策略的应用程序）**。

   1. 搜索 `fork-example-ecommerce-checkout-api`，然后选择该应用程序。

1. 在 **fork-example-ecommerce-checkout-api** 页面上，执行以下操作：

   1. 在 **Application settings (应用程序设置)** 部分中，输入 **Application name (应用程序名称)**（例如，`fork-example-ecommerce-my-app`）。
**注意**  
要稍后轻松找到您的资源，请保留前缀 `fork-example-ecommerce`。
对于每个部署，应用程序名称必须唯一。如果您重复使用应用程序名称，则部署将仅更新先前部署的 CloudFormation 堆栈（而不是创建新的堆栈）。

   1. （可选）输入以下**LogLevel**设置之一以执行应用程序的 Lambda 函数：
      + `DEBUG`
      + `ERROR`
      + `INFO`（默认值）
      + `WARNING`

1. 选择 **I acknowledge that this app creates custom IAM roles, resource policies and deploys nested applications (我确认此应用程序创建自定义 IAM 角色和资源策略并部署嵌套应用程序)**，然后在页面底部选择 **Deploy (部署)**。

在 **fork-example-ecommerce-的部署状态*my-app***页面上，Lambda 会显示 “**您的应用程序正在部署中**” 状态。

在**资源**部分中， CloudFormation 开始创建堆栈并显示每个资源的 **CREATE\$1IN\$1PROGRESS** 状态。该过程完成后， CloudFormation 将显示 “**创建\$1完成**” 状态。

**注意**  
部署所有资源可能需要 20-30 分钟。

部署完成后，Lambda 将显示 **Your application has been deployed（您的应用程序已部署完成）**状态。

# 步骤 2：执行与 SNS 关联的示例应用程序
<a name="execute-sample-application"></a>

1. 在 AWS Lambda 控制台的导航面板上，选择**应用程序**。

1. 在 **Applications (应用程序)** 页面上的搜索字段中，搜索 `serverlessrepo-fork-example-ecommerce-my-app`，然后选择该应用程序。

1. 在 **Resources (资源)** 部分中，执行以下操作：

   1. 例如，要查找类型为的资源 **ApiGatewayRestApi**，请按**类型**对资源进行排序`ServerlessRestApi`，然后展开该资源。

   1. 将显示两个嵌套资源，分别是 “**ApiGateway部署**” 和 “**ApiGateway阶段**”。

   1. 复制链接 **Prod API endpoint (Prod API 终端节点)** 并为其附加 `/checkout`，例如：

      ```
      https://abcdefghij.execute-api.us-east-2.amazonaws.com/Prod/checkout
      ```

1. 将以下 JSON 复制到名为 `test_event.json` 的文件中。

   ```
   {
      "id": 15311,
      "date": "2019-03-25T23:41:11-08:00",
      "status": "confirmed",
      "customer": {
         "id": 65144,		
   	 "quantity": 2,
         "price": 25.00,
         "subtotal": 50.00
      }]
   }
   ```

1. 要将 HTTPS 请求发送到您的 API 端点，请通过执行 `curl` 命令来将示例事件负载作为输入传递，例如：

   ```
   curl -d "$(cat test_event.json)" https://abcdefghij.execute-api.us-east-2.amazonaws.com/Prod/checkout
   ```

   API 将返回以下空响应，并指示已成功执行：

   ```
   { }
   ```

# 第 3 步：验证 Amazon SNS 应用程序和管道性能
<a name="verify-sample-application-pipelines"></a>

## 步骤 1：验证示例签出管道的执行
<a name="verify-execution-checkout-pipeline"></a>

1. 登录 [Amazon DynamoDB 控制台](https://console.aws.amazon.com/dynamodb/)。

1. 在导航面板上，选择**表**。

1. 搜索 `serverlessrepo-fork-example` 并选择 `CheckoutTable`。

1. 在表详细信息页面上，选择**项目**，然后选择已创建的项目。

   将显示存储的属性。

## 步骤 2：验证事件存储与备份管道的执行
<a name="verify-execution-event-storage-backup-pipeline"></a>

1. 登录 [Amazon S3 控制台](https://console.aws.amazon.com/s3/)。

1. 在导航面板上，选择 **Buckets (存储桶)**。

1. 搜索 `serverlessrepo-fork-example`，然后选择 `CheckoutBucket`。

1. 导航目录层次结构，直到找到扩展名为 `.gz` 的文件。

1. 要下载该文件，请依次选择 **Actions (操作)** 和 **Open (打开)**。

1. 为管道配置了一个 Lambda 函数，此函数将清理信用卡信息以实现合规性。

   要验证存储的 JSON 负载不包含任何信用卡信息，请解压缩该文件。

## 步骤 3：验证事件搜索与分析管道的执行
<a name="verify-execution-event-search-analytics-pipeline"></a>

1. 登录到[OpenSearch 服务控制台](https://console.aws.amazon.com/aos/)。

1. 在导航面板上的 **My domains (我的域)** 下，选择前缀为 `serverl-analyt` 的域。

1. 为管道配置了一个 Amazon SNS 订阅筛选策略，该策略设置一个数值匹配条件。

   ****要验证该事件是否因为指的是价值高于100美元的订单而被索引，请在**服务器分析*abcdefgh1ijk*页面上选择指数，checkout\$1even** ts。****

## 步骤 4：验证事件重播管道的执行
<a name="verify-execution-event-replay-pipeline"></a>

1. 登录 [Amazon SQS 控制台](https://console.aws.amazon.com/sqs/)。

1. 在队列列表中，搜索 `serverlessrepo-fork-example` 并选择 `ReplayQueue`。

1. 选择**发送和接收消息**。

1. 在 **fork-example-ecommerce-*my-app*... ReplayP-ReplayQueue-中发送和接收消息*123ABCD4E5F6***对话框中，选择**轮询留**言。

1. 要验证事件是否已入队，请选择队列中显示的消息旁边的 **More Details (更多详细信息)**。

# 步骤 4：模拟问题并重播事件以进行恢复
<a name="simulate-issue-replay-events-for-recovery"></a>

## 步骤 1：启用模拟的问题并发送第二个 API 请求
<a name="enable-simulated-issue-send-second-api-request"></a>

1. 登录 [AWS Lambda 控制台](https://console.aws.amazon.com/lambda/)。

1. 在导航面板上，选择 **Functions (函数)**。

1. 搜索 `serverlessrepo-fork-example` 并选择 `CheckoutFunction`。

1. 在 **fork-example-ecommerce-*my-app*-CheckoutFunction-*ABCDEF*...** **页面的**环境变量**部分中，将 **BUG\$1** ENABLED 变量设置**为 true**，然后选择保存。**

1. 将以下 JSON 复制到名为 `test_event_2.json` 的文件中。

   ```
   {
   	   "id": 9917,
   	   "date": "2019-03-26T21:11:10-08:00",
   	   "status": "confirmed",
   	   "customer": {
   	      "id": 56999,
   "quantity": 1,
   	      "price": 75.00,
   	      "subtotal": 75.00
   	   }]
   	}
   ```

1. 要将 HTTPS 请求发送到您的 API 端点，请通过执行 `curl` 命令来将示例事件负载作为输入传递，例如：

   ```
   curl -d "$(cat test_event_2.json)" https://abcdefghij.execute-api.us-east-2.amazonaws.com/Prod/checkout
   ```

   API 将返回以下空响应，并指示已成功执行：

   ```
   { }
   ```

## 步骤 2：验证模拟数据损坏
<a name="verify-simulated-data-corruption"></a>

1. 登录 [Amazon DynamoDB 控制台](https://console.aws.amazon.com/dynamodb/)。

1. 在导航面板上，选择**表**。

1. 搜索 `serverlessrepo-fork-example` 并选择 `CheckoutTable`。

1. 在表详细信息页面上，选择**项目**，然后选择已创建的项目。

   将显示存储的属性，其中一些标记为 **CORRUPTED\$1 (已损坏\$1)**

## 步骤 3：禁用模拟的问题
<a name="disable-simulated-issue"></a>

1. 登录 [AWS Lambda 控制台](https://console.aws.amazon.com/lambda/)。

1. 在导航面板上，选择 **Functions (函数)**。

1. 搜索 `serverlessrepo-fork-example` 并选择 `CheckoutFunction`。

1. 在 **fork-example-ecommerce-*my-app*-CheckoutFunction-*ABCDEF*...** **页面的**环境变量**部分中，将 **BUG\$1** ENABLED 变量设置**为 false**，然后选择保存。**

## 步骤 4：启用重播以从问题中恢复
<a name="enable-replay-recover-from-simulated-issue"></a>

1. 在 AWS Lambda 控制台的导航面板上，选择**功能**。

1. 搜索 `serverlessrepo-fork-example` 并选择 `ReplayFunction`。

1. 展开 **Designer** 部分，选择 **SQS** 磁贴，然后在 **SQS** 部分中，选择 **Enabled (启用)**。
**注意**  
启用 Amazon SQS 事件源触发器大约需要 1 分钟。

1. 选择**保存**。

1. 要查看已恢复的属性，请返回到 Amazon DynamoDB 控制台。

1. 要禁用重播，请返回 AWS Lambda 控制台并禁用 Amazon SQS 事件源触发器。`ReplayFunction`

# 将 AWS 事件分叉管道订阅到 Amazon SNS 主题
<a name="sns-subscribe-event-fork-pipelines"></a>

为了加快事件驱动型应用程序的开发，您可以订阅 Amazon SNS 主题的事件处理管道（由事件 AWS 分叉管道提供支持）。 AWS Event Fork Pipelines 是一套基于[AWS 无服务器应用程序模型](https://aws.amazon.com/serverless/sam/) (SA AWS M) 的开源[嵌套](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-template-nested-applications.html)应用程序，您可以直接从 Ev [AWS ent Fork Pipelines 套件](https://serverlessrepo.aws.amazon.com/applications?query=aws-event-fork-pipelines)（选择**显示创建自定义 IAM 角色或资源策略的应用程序**）将其部署到您的 AWS 账户中。有关更多信息，请参阅 [AWS 事件分叉管道的工作原理](sns-fork-pipeline-as-subscriber.md#how-sns-fork-works)。

本节介绍如何使用部署管道，然后 AWS 管理控制台 将 AWS 事件分叉管道订阅 Amazon SNS 主题。在开始之前，请[创建 Amazon SNS 主题](sns-create-topic.md)。

要删除构成管道的资源，请在 AWS Lambda 控制台的**应用程序**页面上找到管道，展开 **SAM 模板部分**，选择**CloudFormation堆栈**，然后选择**其他操作**，**删除堆栈**。

# 订阅事件存储与备份管道并将其部署到 Amazon SNS
<a name="deploy-event-storage-backup-pipeline"></a>


|  | 
| --- |
| 对于事件归档和分析，Amazon SNS 现在建议使用其与 Amazon Data Firehose 的本机集成。您可以将 Firehose 传输流订阅 SNS 主题，这样您就可以向存档和分析终端节点发送通知，例如亚马逊简单存储服务 (Amazon S3) 存储桶、亚马逊 Redshift 表、亚马逊 OpenSearch 服务（服务）等。OpenSearch 将 Amazon SNS 与 Firehose 传输流配合使用是一种完全托管且无需代码的解决方案，您无需使用任何功能。 AWS Lambda 有关更多信息，请参阅 [扇出到 Firehose 传输流](sns-firehose-as-subscriber.md)。 | 

本教程说明如何部署[事件存储与备份管道](sns-fork-pipeline-as-subscriber.md#sns-fork-event-storage-and-backup-pipeline)并为该管道订阅 Amazon SNS 主题。此过程会自动将与管道关联的 AWS SAM 模板转换为 CloudFormation 堆栈，然后将该堆栈部署到您的 AWS 账户。此过程还会创建和配置构成事件存储与备份管道的资源集，包括以下内容：
+ Amazon SQS 队列
+ Lambda 函数
+ Firehose 传输流
+ Amazon S3 备份存储桶

有关将流配置为以 Amazon S3 存储桶作为目标的更多信息，请参阅 *Amazon Data Firehose API Reference* 中的 `[S3DestinationConfiguration](https://docs.aws.amazon.com/firehose/latest/APIReference/API_S3DestinationConfiguration.html)`。

有关转换事件以及配置事件缓冲、事件压缩和事件加密的详细信息，请参阅 *Amazon Data Firehose 开发人员指南*中的[创建传输流](https://docs.aws.amazon.com/firehose/latest/dev/basic-create.html)。

有关筛选事件的更多信息，请参阅本指南中的 [Amazon SNS 订阅筛选策略](sns-subscription-filter-policies.md)。

1. 登录 [AWS Lambda 控制台](https://console.aws.amazon.com/lambda/)。

1. 在导航面板上，选择 **Functions (函数)**，然后选择 **Create function (创建函数)**。

1. 在 **Create function (创建函数)** 页面上，执行以下操作：

   1. 依次选择 **Browse serverless app repository（浏览无服务器应用程序存储库）**、**Public applications（公共应用程序）**、**Show apps that create custom roles or resource policies（显示创建 IAM 角色或资源策略的应用程序）**。

   1. 搜索 `fork-event-storage-backup-pipeline`，然后选择该应用程序。

1. 在 **fork-event-storage-backup-pipelin** e 页面上，执行以下操作：

   1. 在 **Application settings (应用程序设置)** 部分中，输入 **Application name (应用程序名称)**（例如，`my-app-backup`）。
**注意**  
对于每个部署，应用程序名称必须唯一。如果您重复使用应用程序名称，则部署将仅更新先前部署的 CloudFormation 堆栈（而不是创建新的堆栈）。

   1. （可选）对于 **BucketArn**，请输入加载传入事件的 Amazon S3 存储桶的 ARN。如果您未输入值，则会在您的 AWS 账户中创建一个新的 Amazon S3 存储桶。

   1. （可选）对于 **DataTransformationFunctionArn**，输入用于转换传入事件的 Lambda 函数的 ARN。如果您不输入值，则将禁用数据转换。

   1. （可选）输入以下**LogLevel**设置之一以执行应用程序的 Lambda 函数：
      + `DEBUG`
      + `ERROR`
      + `INFO`（默认值）
      + `WARNING`

   1. 对于 **TopicArn**，输入要订阅此分叉管道实例的 Amazon SNS 主题的 ARN。

   1. （可选）对于**StreamBufferingIntervalInSeconds**和 **StreamBufferingSizeInMBs**，输入用于配置传入事件缓冲的值。如果您不输入任何值，则使用 300 秒和 5 MB。

   1. （可选）输入以下**StreamCompressionFormat**设置之一以压缩传入的事件：
      + `GZIP`
      + `SNAPPY`
      + `UNCOMPRESSED`（默认值）
      + `ZIP`

   1. （可选）对于 **StreamPrefix**，输入字符串前缀以命名存储在 Amazon S3 备份存储桶中的文件。如果您不输入值，则不使用任何前缀。

   1. （可选）对于 **SubscriptionFilterPolicy**，输入 JSON 格式的 Amazon SNS 订阅筛选策略，用于筛选传入的事件。筛选策略决定在 OpenSearch 服务索引中对哪些事件进行索引。如果您不输入值，则不使用筛选（为所有事件编制索引）。

   1. （可选）对于 **SubscriptionFilterPolicyScope**，输入字符串`MessageBody`或`MessageAttributes`以启用基于负载或基于属性的邮件筛选。

   1. 选择 **I acknowledge that this app creates custom IAM roles, resource policies and deploys nested applications (我确认此应用程序创建自定义 IAM 角色和资源策略并部署嵌套应用程序)**，然后选择 **Deploy (部署)**。

在 “**部署状态 *my-app***” 页面上，Lambda 会显示 “**您的应用程序正在部署中**” 状态。

在**资源**部分中， CloudFormation 开始创建堆栈并显示每个资源的 **CREATE\$1IN\$1PROGRESS** 状态。该过程完成后， CloudFormation 将显示 “**创建\$1完成**” 状态。

部署完成后，Lambda 将显示 **Your application has been deployed（您的应用程序已部署完成）**状态。

发布到您的 Amazon SNS 主题的消息将存储在由事件存储与备份管道自动预置的 Amazon S3 备份存储桶中。

# 订阅事件搜索与分析管道并将其部署到 Amazon SNS
<a name="deploy-event-search-analytics-pipeline"></a>


|  | 
| --- |
| 对于事件归档和分析，Amazon SNS 现在建议使用其与 Amazon Data Firehose 的本机集成。您可以将 Firehose 传输流订阅 SNS 主题，这样您就可以向存档和分析终端节点发送通知，例如亚马逊简单存储服务 (Amazon S3) 存储桶、亚马逊 Redshift 表、亚马逊 OpenSearch 服务（服务）等。OpenSearch 将 Amazon SNS 与 Firehose 传输流配合使用是一种完全托管且无需代码的解决方案，您无需使用任何功能。 AWS Lambda 有关更多信息，请参阅 [扇出到 Firehose 传输流](sns-firehose-as-subscriber.md)。 | 

本教程说明如何部署[事件搜索与分析管道](sns-fork-pipeline-as-subscriber.md#sns-fork-event-search-and-analytics-pipeline)并为该管道订阅 Amazon SNS 主题。此过程会自动将与管道关联的 AWS SAM 模板转换为 CloudFormation 堆栈，然后将该堆栈部署到您的 AWS 账户。此过程还会创建和配置构成事件搜索与分析管道的资源集，包括以下内容：
+ Amazon SQS 队列
+ Lambda 函数
+ Firehose 传输流
+ 亚马逊 OpenSearch 服务域名
+ Amazon S3 死信存储桶

有关将流配置为以索引作为目标的更多信息，请参阅《Amazon Data Firehose API Reference》**中的 `[ElasticsearchDestinationConfiguration](https://docs.aws.amazon.com/firehose/latest/APIReference/API_ElasticsearchDestinationConfiguration.html)`。

有关转换事件以及配置事件缓冲、事件压缩和事件加密的详细信息，请参阅 *Amazon Data Firehose 开发人员指南*中的[创建传输流](https://docs.aws.amazon.com/firehose/latest/dev/basic-create.html)。

有关筛选事件的更多信息，请参阅本指南中的 [Amazon SNS 订阅筛选策略](sns-subscription-filter-policies.md)。

1. 登录 [AWS Lambda 控制台](https://console.aws.amazon.com/lambda/)。

1. 在导航面板上，选择 **Functions (函数)**，然后选择 **Create function (创建函数)**。

1. 在 **Create function (创建函数)** 页面上，执行以下操作：

   1. 依次选择 **Browse serverless app repository（浏览无服务器应用程序存储库）**、**Public applications（公共应用程序）**、**Show apps that create custom roles or resource policies（显示创建 IAM 角色或资源策略的应用程序）**。

   1. 搜索 `fork-event-search-analytics-pipeline`，然后选择该应用程序。

1. 在 **fork-event-search-analytics-pipelin** e 页面上，执行以下操作：

   1. 在 **Application settings (应用程序设置)** 部分中，输入 **Application name (应用程序名称)**（例如，`my-app-search`）。
**注意**  
对于每个部署，应用程序名称必须唯一。如果您重复使用应用程序名称，则部署将仅更新先前部署的 CloudFormation 堆栈（而不是创建新的堆栈）。

   1. （可选）对于 **DataTransformationFunctionArn**，输入用于转换传入事件的 Lambda 函数的 ARN。如果您不输入值，则将禁用数据转换。

   1. （可选）输入以下**LogLevel**设置之一以执行应用程序的 Lambda 函数：
      + `DEBUG`
      + `ERROR`
      + `INFO`（默认值）
      + `WARNING`

   1. （可选）对于 **SearchDomainArn**，输入 OpenSearch 服务域的 ARN，该域是一个配置所需计算和存储功能的集群。如果您不输入值，则使用默认配置创建新域。

   1. 对于 **TopicArn**，输入要订阅此分叉管道实例的 Amazon SNS 主题的 ARN。

   1. 对于 **SearchIndexName**，输入用于事件搜索和分析的 OpenSearch 服务索引的名称。
**注意**  
以下配额适用于索引名称：  
不能包含大写字母
不能包含以下字符：`\ / * ? " < > | ` , #`
不能以下列字符开头：`- + _`
不能为以下内容：`. ..`
长度不能超过 80 个字符
长度不能超过 255 个字节
不能包含冒号（来自 OpenSearch 服务 7.0）

   1. （可选）为 OpenSearch 服务索引的轮换周期输入以下**SearchIndexRotationPeriod**设置之一：
      + `NoRotation`（默认值）
      + `OneDay`
      + `OneHour`
      + `OneMonth`
      + `OneWeek`

      索引轮换将时间戳附加到索引名称，从而促进旧数据的到期。

   1. 对于 **SearchTypeName**，输入用于在索引中组织事件的 OpenSearch 服务类型的名称。
**注意**  
OpenSearch 服务类型名称可以包含任何字符（空字节除外），但不能以开头`_`。
对于 S OpenSearch ervice 6.x，每个索引只能有一个类型。如果您为具有其他类型的现有索引指定新类型，Firehose 会返回运行时错误。

   1. （可选）对于**StreamBufferingIntervalInSeconds**和 **StreamBufferingSizeInMBs**，输入用于配置传入事件缓冲的值。如果您不输入任何值，则使用 300 秒和 5 MB。

   1. （可选）输入以下**StreamCompressionFormat**设置之一以压缩传入的事件：
      + `GZIP`
      + `SNAPPY`
      + `UNCOMPRESSED`（默认值）
      + `ZIP`

   1. （可选）对于 **StreamPrefix**，输入字符串前缀以命名存储在 Amazon S3 死信存储桶中的文件。如果您不输入值，则不使用任何前缀。

   1. （可选）对于 **StreamRetryDurationInSecons**，输入 Firehose 无法索引服务索引中的 OpenSearch 事件时的重试持续时间。如果您不输入值，则使用 300 秒。

   1. （可选）对于 **SubscriptionFilterPolicy**，输入 JSON 格式的 Amazon SNS 订阅筛选策略，用于筛选传入的事件。筛选策略决定在 OpenSearch 服务索引中对哪些事件进行索引。如果您不输入值，则不使用筛选（为所有事件编制索引）。

   1. 选择 **I acknowledge that this app creates custom IAM roles, resource policies and deploys nested applications (我确认此应用程序创建自定义 IAM 角色和资源策略并部署嵌套应用程序)**，然后选择 **Deploy (部署)**。

在 “**部署状态 *my-app-search***” 页面上，Lambda 会显示 “**您的应用程序正在部署中**” 状态。

在**资源**部分中， CloudFormation 开始创建堆栈并显示每个资源的 **CREATE\$1IN\$1PROGRESS** 状态。该过程完成后， CloudFormation 将显示 “**创建\$1完成**” 状态。

部署完成后，Lambda 将显示 **Your application has been deployed（您的应用程序已部署完成）**状态。

发布到您的 Amazon SNS 主题的消息将在事件搜索和分析管 OpenSearch 道自动配置的服务索引中编制索引。如果该管道无法为事件编制索引，它会将其存储在 S3 死信存储桶中。

# 使用 Amazon SNS 集成，部署事件重播管道
<a name="deploy-event-replay-pipeline"></a>

本教程说明如何部署[事件重播管道](sns-fork-pipeline-as-subscriber.md#sns-fork-event-replay-pipeline)并为该管道订阅 Amazon SNS 主题。此过程会自动将与管道关联的 AWS SAM 模板转换为 CloudFormation 堆栈，然后将该堆栈部署到您的 AWS 账户。此过程还会创建和配置构成事件重播管道的资源集，包括一个 Amazon SQS 队列和一个 Lambda 函数。

有关筛选事件的更多信息，请参阅本指南中的 [Amazon SNS 订阅筛选策略](sns-subscription-filter-policies.md)。

1. 登录 [AWS Lambda 控制台](https://console.aws.amazon.com/lambda/)。

1. 在导航面板上，选择 **Functions (函数)**，然后选择 **Create function (创建函数)**。

1. 在 **Create function (创建函数)** 页面上，执行以下操作：

   1. 依次选择 **Browse serverless app repository（浏览无服务器应用程序存储库）**、**Public applications（公共应用程序）**、**Show apps that create custom roles or resource policies（显示创建 IAM 角色或资源策略的应用程序）**。

   1. 搜索 `fork-event-replay-pipeline`，然后选择该应用程序。

1. 在 **fork-event-replay-pipeline** 页面中，执行以下操作：

   1. 在 **Application settings (应用程序设置)** 部分中，输入 **Application name (应用程序名称)**（例如，`my-app-replay`）。
**注意**  
对于每个部署，应用程序名称必须唯一。如果您重复使用应用程序名称，则部署将仅更新先前部署的 CloudFormation 堆栈（而不是创建新的堆栈）。

   1. （可选）输入以下**LogLevel**设置之一以执行应用程序的 Lambda 函数：
      + `DEBUG`
      + `ERROR`
      + `INFO`（默认值）
      + `WARNING`

   1. （可选 **ReplayQueueRetentionPeriodInSeconds**）输入 Amazon SQS 重播队列保留消息的时间长度（以秒为单位）。如果您不输入值，则使用 1209600 秒（14 天）。

   1. 对于 **TopicArn**，输入要订阅此分叉管道实例的 Amazon SNS 主题的 ARN。

   1. 对于 **DestinationQueueName**，输入 Lambda 重播函数向其转发消息的 Amazon SQS 队列的名称。

   1. （可选）对于 **SubscriptionFilterPolicy**，输入 JSON 格式的 Amazon SNS 订阅筛选策略，用于筛选传入的事件。筛选策略决定缓冲哪些事件以进行重播。如果您不输入值，则不使用筛选（缓冲所有事件以进行重播）。

   1. 选择 **I acknowledge that this app creates custom IAM roles, resource policies and deploys nested applications (我确认此应用程序创建自定义 IAM 角色和资源策略并部署嵌套应用程序)**，然后选择 **Deploy (部署)**。

在 “**部署状态 *my-app-replay***” 页面上，Lambda 会显示 “**您的应用程序正在部署中**” 状态。

在**资源**部分中， CloudFormation 开始创建堆栈并显示每个资源的 **CREATE\$1IN\$1PROGRESS** 状态。该过程完成后， CloudFormation 将显示 “**创建\$1完成**” 状态。

部署完成后，Lambda 将显示 **Your application has been deployed (您的应用程序已部署完成)**状态。

将在由事件重播管道自动预置的 Amazon SQS 索引中缓冲发布到您的 Amazon SNS 主题的消息以进行重播。

**注意**  
默认情况下，禁用重播。要启用重播，请导航到 Lambda 控制台上的函数页面，展开 **Designer** 部分，选择 **SQS** 磁贴，然后在 **SQS** 部分中，选择 **Enabled（启用）**。