

# Fanout Amazon SNS events to AWS Event Fork Pipelines
<a name="sns-fork-pipeline-as-subscriber"></a>


|  | 
| --- |
| For event archiving and analytics, Amazon SNS now recommends using its native integration with Amazon Data Firehose. You can subscribe Firehose delivery streams to SNS topics, which allows you to send notifications to archiving and analytics endpoints such as Amazon Simple Storage Service (Amazon S3) buckets, Amazon Redshift tables, Amazon OpenSearch Service (OpenSearch Service), and more. Using Amazon SNS with Firehose delivery streams is a fully managed and codeless solution that doesn't require you to use AWS Lambda functions. For more information, see [Fanout to Firehose delivery streams](sns-firehose-as-subscriber.md). | 
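If you manage your resources as code, such a Firehose subscription can also be declared in an AWS CloudFormation or AWS SAM template. The following sketch assumes that the topic, the delivery stream, and a role that allows Amazon SNS to write to the stream are defined elsewhere in the same template (all resource names are illustrative):

```
# Illustrative sketch: subscribing an existing Firehose delivery stream
# to an Amazon SNS topic. MySNSTopic, MyDeliveryStream, and
# SnsFirehoseRole are assumed to be defined elsewhere in the template.
FirehoseSubscription:
  Type: AWS::SNS::Subscription
  Properties:
    TopicArn: !Ref MySNSTopic
    Protocol: firehose
    Endpoint: !GetAtt MyDeliveryStream.Arn
    # Role that grants Amazon SNS permission to write to the stream
    SubscriptionRoleArn: !GetAtt SnsFirehoseRole.Arn
```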

You can use Amazon SNS to build event-driven applications which use subscriber services to perform work automatically in response to events triggered by publisher services. This architectural pattern can make services more reusable, interoperable, and scalable. However, it can be labor-intensive to fork the processing of events into pipelines that address common event handling requirements, such as event storage, backup, search, analytics, and replay.

To accelerate the development of your event-driven applications, you can subscribe event-handling pipelines—powered by AWS Event Fork Pipelines—to Amazon SNS topics. AWS Event Fork Pipelines is a suite of open-source [nested applications](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-template-nested-applications.html), based on the [AWS Serverless Application Model](https://aws.amazon.com/serverless/sam/) (AWS SAM), which you can deploy directly from the [AWS Event Fork Pipelines suite](https://serverlessrepo.aws.amazon.com/applications?query=aws-event-fork-pipelines) (choose **Show apps that create custom IAM roles or resource policies**) into your AWS account.

For an AWS Event Fork Pipelines use case, see [Deploying and testing the Amazon SNS event fork pipelines sample application](sns-deploy-test-fork-pipelines-sample-application.md).

**Topics**
+ [How AWS Event Fork Pipelines works](#how-sns-fork-works)
+ [Deploying AWS Event Fork Pipelines](#deploying-sns-fork-pipelines)
+ [Deploying and testing the Amazon SNS event fork pipelines sample application](sns-deploy-test-fork-pipelines-sample-application.md)
+ [Subscribing AWS Event Fork Pipelines to an Amazon SNS topic](sns-subscribe-event-fork-pipelines.md)

## How AWS Event Fork Pipelines works
<a name="how-sns-fork-works"></a>

AWS Event Fork Pipelines is a serverless design pattern. It is also a suite of nested serverless applications, based on AWS SAM, that you can deploy directly from the AWS Serverless Application Repository (AWS SAR) to your AWS account in order to enrich your event-driven platforms. You can deploy these nested applications individually, as your architecture requires.

**Topics**
+ [The event storage and backup pipeline](#sns-fork-event-storage-and-backup-pipeline)
+ [The event search and analytics pipeline](#sns-fork-event-search-and-analytics-pipeline)
+ [The event replay pipeline](#sns-fork-event-replay-pipeline)

The following diagram shows an AWS Event Fork Pipelines application supplemented by three nested applications. You can deploy any of the pipelines from the AWS Event Fork Pipelines suite on the AWS SAR independently, as your architecture requires.

![\[The AWS Event Fork Pipelines architecture, showing how events from an Amazon SNS topic are filtered and processed through three distinct pipelines: Event Storage and Backup, Event Search and Analytics, and Event Replay. These pipelines are depicted as vertically stacked boxes, each independently processing events in parallel from the same Amazon SNS topic.\]](http://docs.aws.amazon.com/sns/latest/dg/images/sns-fork-pipeline-as-subscriber-how-it-works.png)


Each pipeline is subscribed to the same Amazon SNS topic, so it can process events in parallel as they are published to the topic. Each pipeline is independent and can set its own [Subscription Filter Policy](sns-subscription-filter-policies.md), which allows it to process only the subset of events that it is interested in, rather than all events published to the topic.
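As an illustration, a pipeline's Amazon SQS queue subscription with its own filter policy might be declared in a CloudFormation template as follows (the topic, queue, and message attribute names are hypothetical):

```
# Illustrative sketch: each pipeline's subscription can carry its own
# filter policy, so the pipeline receives only the events it needs.
PipelineSubscription:
  Type: AWS::SNS::Subscription
  Properties:
    TopicArn: !Ref MySNSTopic
    Protocol: sqs
    Endpoint: !GetAtt PipelineQueue.Arn
    FilterPolicy:
      event_type:
        - order_placed
```

A subscription without a `FilterPolicy` receives every event published to the topic.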

**Note**  
Because you place the three AWS Event Fork Pipelines alongside your regular event processing pipelines (possibly already subscribed to your Amazon SNS topic), you don’t need to change any portion of your current message publisher to take advantage of AWS Event Fork Pipelines in your existing workloads.

### The event storage and backup pipeline
<a name="sns-fork-event-storage-and-backup-pipeline"></a>

The following diagram shows the [Event Storage and Backup Pipeline](https://serverlessrepo.aws.amazon.com/applications/arn:aws:serverlessrepo:us-east-1:077246666028:applications~fork-event-storage-backup-pipeline). You can subscribe this pipeline to your Amazon SNS topic to automatically back up the events flowing through your system.

This pipeline consists of an Amazon SQS queue that buffers the events delivered by the Amazon SNS topic, an AWS Lambda function that automatically polls for these events in the queue and pushes them into an Amazon Data Firehose stream, and an Amazon S3 bucket that durably backs up the events loaded by the stream. 

![\[The Fork-Event-Storage-Backup-Pipeline, which is designed to process and back up events from an Amazon SNS topic. The flow starts with an Amazon SNS topic from which events are fanned out to an Amazon SQS queue. These filtered events are then processed by a Lambda function, which forwards them to an Amazon Data Firehose stream. The Firehose stream is responsible for buffering, transforming, and compressing the events before loading them into an Amazon S3 backup bucket. Finally, Amazon Athena can be used to query the stored data. The diagram uses a series of icons and arrows to illustrate the flow from one service to the next, clearly labeling each component of the pipeline.\]](http://docs.aws.amazon.com/sns/latest/dg/images/sns-fork-event-storage-and-backup-pipeline.png)


To fine-tune the behavior of your Firehose stream, you can configure it to buffer, transform, and compress your events prior to loading them into the bucket. As events are loaded, you can use Amazon Athena to query the bucket using standard SQL queries. You can also configure the pipeline to reuse an existing Amazon S3 bucket or create a new one.

### The event search and analytics pipeline
<a name="sns-fork-event-search-and-analytics-pipeline"></a>

The following diagram shows the [Event Search and Analytics Pipeline](https://serverlessrepo.aws.amazon.com/applications/arn:aws:serverlessrepo:us-east-1:077246666028:applications~fork-event-search-analytics-pipeline). You can subscribe this pipeline to your Amazon SNS topic to index the events that flow through your system in a search domain and then run analytics on them.

This pipeline consists of an Amazon SQS queue that buffers the events delivered by the Amazon SNS topic, an AWS Lambda function that polls events from the queue and pushes them into an Amazon Data Firehose stream, an Amazon OpenSearch Service domain that indexes the events loaded by the Firehose stream, and an Amazon S3 bucket that stores the dead-letter events that can't be indexed in the search domain.

![\[The Event Search and Analytics Pipeline within an AWS architecture. It starts on the left with the Amazon SNS topic receiving all events. These events are then funneled through a dashed line representing "fan out filtered events" into an Amazon SQS queue. From the queue, events are processed by a Lambda function which then forwards them to an Amazon Data Firehose stream. The Firehose stream directs the events into two destinations: one route leads to an Amazon OpenSearch Service domain for indexing, and the other route sends unprocessable or "dead-letter" events to an Amazon S3 dead-letter bucket. On the far right, the output from the OpenSearch Service domain feeds into a Kibana dashboard for analytics and visualization. The entire flow is laid out horizontally and each component is connected by lines showing the direction of data flow.\]](http://docs.aws.amazon.com/sns/latest/dg/images/sns-fork-event-search-and-analytics-pipeline.png)


You can configure this pipeline to fine-tune the behavior of your Firehose stream in terms of event buffering, transformation, and compression.

You can also configure whether the pipeline should reuse an existing OpenSearch domain in your AWS account or create a new one for you. As events are indexed in the search domain, you can use Kibana to run analytics on your events and update visual dashboards in real time. 

### The event replay pipeline
<a name="sns-fork-event-replay-pipeline"></a>

The following diagram shows the [Event Replay Pipeline](https://serverlessrepo.aws.amazon.com/applications/arn:aws:serverlessrepo:us-east-1:077246666028:applications~fork-event-replay-pipeline). You can subscribe this pipeline to your Amazon SNS topic to record the events that have been processed by your system for the past 14 days, and then reprocess them when needed (for example, when your platform must recover from a failure).

This pipeline consists of an Amazon SQS queue that buffers the events delivered by the Amazon SNS topic, and an AWS Lambda function that polls events from the queue and redrives them into your regular event processing pipeline, which is also subscribed to your topic.

![\[The Event Replay Pipeline in a flowchart format. From left to right, it begins with an Amazon SNS topic that distributes filtered events to two parallel processes. The upper flow represents your regular event processing pipeline, which includes an Amazon SQS queue that processes events. The lower flow, labeled as the "fork-event-replay-pipeline," includes an Amazon SQS replay queue where events are temporarily stored before being processed by a Lambda replay function. This Lambda function has the capability to re-drive events into your regular event processing pipeline or hold them for replay, based on whether the replay feature is enabled or disabled. The diagram also indicates that operators have control over enabling or disabling the event replay functionality.\]](http://docs.aws.amazon.com/sns/latest/dg/images/sns-fork-event-replay-pipeline.png)


**Note**  
By default, the replay function is disabled and doesn't redrive your events. If you need to reprocess events, you must enable the Amazon SQS replay queue as an event source for the AWS Lambda replay function.
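In AWS SAM terms, this on/off behavior can be sketched with the `Enabled` property of an Amazon SQS event source (the function and queue names below are illustrative, not the pipeline's actual resource names):

```
# Illustrative sketch: a replay function whose SQS event source is
# disabled by default; set Enabled to true to start redriving events.
ReplayFunction:
  Type: AWS::Serverless::Function
  Properties:
    Handler: index.handler
    Runtime: python3.12
    Events:
      ReplayQueueEvent:
        Type: SQS
        Properties:
          Queue: !GetAtt ReplayQueue.Arn
          Enabled: false
```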

## Deploying AWS Event Fork Pipelines
<a name="deploying-sns-fork-pipelines"></a>

The [AWS Event Fork Pipelines suite](https://serverlessrepo.aws.amazon.com/applications?query=aws-event-fork-pipelines) (choose **Show apps that create custom IAM roles or resource policies**) is available as a group of public applications in the AWS Serverless Application Repository, from where you can deploy and test them manually using the [AWS Lambda console](https://console.aws.amazon.com/lambda/). For information about deploying pipelines using the AWS Lambda console, see [Subscribing AWS Event Fork Pipelines to an Amazon SNS topic](sns-subscribe-event-fork-pipelines.md).

In a production scenario, we recommend embedding AWS Event Fork Pipelines within your overall application's AWS SAM template. The nested-application feature lets you do this by adding the resource `[AWS::Serverless::Application](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-template.html#serverless-sam-template-application)` to your AWS SAM template, referencing the AWS SAR `ApplicationId` and the `SemanticVersion` of the nested application.

For example, you can use the Event Storage and Backup Pipeline as a nested application by adding the following YAML snippet to the `Resources` section of your AWS SAM template.

```
Backup:
  Type: AWS::Serverless::Application
  Properties:
    Location:
      ApplicationId: arn:aws:serverlessrepo:us-east-2:123456789012:applications/fork-event-storage-backup-pipeline
      SemanticVersion: 1.0.0
    Parameters: 
      #The ARN of the Amazon SNS topic whose messages should be backed up to the Amazon S3 bucket.
      TopicArn: !Ref MySNSTopic
```

When you specify parameter values, you can use AWS CloudFormation intrinsic functions to reference other resources in your template. For example, in the YAML snippet above, the `TopicArn` parameter references the `[AWS::SNS::Topic](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-sns-topic.html)` resource `MySNSTopic`, defined elsewhere in the AWS SAM template. For more information, see the [Intrinsic Function Reference](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/intrinsic-function-reference.html) in the *AWS CloudFormation User Guide*.

**Note**  
The AWS Lambda console page for your AWS SAR application includes the **Copy as SAM Resource** button, which copies the YAML required for nesting an AWS SAR application to the clipboard.

# Deploying and testing the Amazon SNS event fork pipelines sample application
<a name="sns-deploy-test-fork-pipelines-sample-application"></a>

To accelerate the development of your event-driven applications, you can subscribe event-handling pipelines—powered by AWS Event Fork Pipelines—to Amazon SNS topics. AWS Event Fork Pipelines is a suite of open-source [nested applications](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-template-nested-applications.html), based on the [AWS Serverless Application Model](https://aws.amazon.com/serverless/sam/) (AWS SAM), which you can deploy directly from the [AWS Event Fork Pipelines suite](https://serverlessrepo.aws.amazon.com/applications?query=aws-event-fork-pipelines) (choose **Show apps that create custom IAM roles or resource policies**) into your AWS account. For more information, see [How AWS Event Fork Pipelines works](sns-fork-pipeline-as-subscriber.md#how-sns-fork-works).

This page shows how you can use the AWS Management Console to deploy and test the AWS Event Fork Pipelines sample application.

**Important**  
To avoid incurring unwanted costs after you finish deploying the AWS Event Fork Pipelines sample application, delete its CloudFormation stack. For more information, see [Deleting a Stack on the CloudFormation Console](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/cfn-console-delete-stack.html) in the *AWS CloudFormation User Guide*.

# AWS Event Fork Pipelines use case example
<a name="example-sns-fork-use-case"></a>

The following scenario describes an event-driven, serverless e-commerce application that uses AWS Event Fork Pipelines. You can find this [example e-commerce application](https://serverlessrepo.aws.amazon.com/applications/arn:aws:serverlessrepo:us-east-1:077246666028:applications~fork-example-ecommerce-checkout-api) in the AWS Serverless Application Repository and deploy it in your AWS account using the AWS Lambda console, where you can test it and examine its source code in GitHub.

![\[The architecture of a serverless e-commerce application that integrates AWS services. It depicts the flow from ecommerce users placing orders via an API Gateway to different processing pipelines including order storage, search analytics, and replay, showcasing how events are managed and analyzed through Amazon SNS, Lambda, Amazon SQS, DynamoDB, and Kibana.\]](http://docs.aws.amazon.com/sns/latest/dg/images/sns-fork-example-use-case.png)


This e-commerce application takes orders from buyers through a RESTful API hosted by API Gateway and backed by the AWS Lambda function `CheckoutApiBackendFunction`. This function publishes all received orders to an Amazon SNS topic named `CheckoutEventsTopic`, which, in turn, fans out the orders to four different pipelines.

The first pipeline is the regular checkout-processing pipeline designed and implemented by the owner of the e-commerce application. This pipeline has the Amazon SQS queue `CheckoutQueue` that buffers all received orders, an AWS Lambda function named `CheckoutFunction` that polls the queue to process these orders, and the DynamoDB table `CheckoutTable` that securely saves all placed orders.

## Applying AWS Event Fork Pipelines
<a name="applying-sns-fork-pipelines"></a>

The components of the e-commerce application handle the core business logic. However, the e-commerce application owner also needs to address the following:
+ **Compliance**—secure, compressed backups encrypted at rest and sanitization of sensitive information
+ **Resiliency**—replay of most recent orders in case of the disruption of the fulfillment process
+ **Searchability**—running analytics and generating metrics on placed orders

Instead of implementing this event processing logic, the application owner can subscribe AWS Event Fork Pipelines to the `CheckoutEventsTopic` Amazon SNS topic:
+ [The event storage and backup pipeline](sns-fork-pipeline-as-subscriber.md#sns-fork-event-storage-and-backup-pipeline) is configured to transform data to remove credit card details, buffer data for 60 seconds, compress it using GZIP, and encrypt it using the default AWS managed key for Amazon S3. This key is managed by AWS and powered by AWS Key Management Service (AWS KMS).

  For more information, see [Choose Amazon S3 For Your Destination](https://docs.aws.amazon.com/firehose/latest/dev/create-destination.html#create-destination-s3), [Amazon Data Firehose Data Transformation](https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html), and [Configure Settings](https://docs.aws.amazon.com/firehose/latest/dev/create-configure.html) in the *Amazon Data Firehose Developer Guide*.
+ [The event search and analytics pipeline](sns-fork-pipeline-as-subscriber.md#sns-fork-event-search-and-analytics-pipeline) is configured with an index retry duration of 30 seconds, a bucket for storing orders that fail to be indexed in the search domain, and a filter policy to restrict the set of indexed orders.

  For more information, see [Choose OpenSearch Service for your Destination](https://docs.aws.amazon.com/firehose/latest/dev/create-destination.html#create-destination-elasticsearch) in the *Amazon Data Firehose Developer Guide*.
+ [The event replay pipeline](sns-fork-pipeline-as-subscriber.md#sns-fork-event-replay-pipeline) is configured with the Amazon SQS queue that is part of the regular order-processing pipeline designed and implemented by the e-commerce application owner.

  For more information, see [Queue Name and URL](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-general-identifiers.html#queue-name-url) in the *Amazon Simple Queue Service Developer Guide*.
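As a sketch, such pipeline settings could be passed as parameters of nested applications in the application owner's AWS SAM template. Only `TopicArn` is a parameter confirmed elsewhere in this guide; the other parameter name, the account ID, and the version below are hypothetical, so check each application's page in the AWS Serverless Application Repository for the actual parameters:

```
# Illustrative sketch of nesting the search and analytics pipeline.
# SubscriptionFilterPolicy is a hypothetical parameter name; verify the
# actual names on the application's AWS SAR page.
SearchAndAnalytics:
  Type: AWS::Serverless::Application
  Properties:
    Location:
      ApplicationId: arn:aws:serverlessrepo:us-east-2:123456789012:applications/fork-event-search-analytics-pipeline
      SemanticVersion: 1.0.0
    Parameters:
      TopicArn: !Ref CheckoutEventsTopic
      SubscriptionFilterPolicy: '{"amount": [{"numeric": [">=", 100]}]}'
```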

The following JSON filter policy is set in the configuration for the Event Search and Analytics Pipeline. It matches only incoming orders in which the total amount is \$100 or higher. For more information, see [Amazon SNS message filtering](sns-message-filtering.md).

```
{
   "amount": [{ "numeric": [ ">=", 100 ] }]
}
```

Using the AWS Event Fork Pipelines pattern, the e-commerce application owner can avoid the development overhead that often follows coding undifferentiated logic for event handling. Instead, she can deploy AWS Event Fork Pipelines directly from the AWS Serverless Application Repository into her AWS account.

# Step 1: Deploying the sample Amazon SNS application
<a name="deploy-sample-application"></a>

1. Sign in to the [AWS Lambda console](https://console.aws.amazon.com/lambda/).

1. On the navigation panel, choose **Functions** and then choose **Create function**.

1. On the **Create function** page, do the following:

   1. Choose **Browse serverless app repository**, **Public applications**, **Show apps that create custom IAM roles or resource policies**.

   1. Search for `fork-example-ecommerce-checkout-api` and then choose the application.

1. On the **fork-example-ecommerce-checkout-api** page, do the following:

   1. In the **Application settings** section, enter an **Application name** (for example, `fork-example-ecommerce-my-app`).
**Note**  
To find your resources easily later, keep the prefix `fork-example-ecommerce`.
For each deployment, the application name must be unique. If you reuse an application name, the deployment will update only the previously deployed CloudFormation stack (rather than create a new one).

   1. (Optional) Enter one of the following **LogLevel** settings for the execution of your application's Lambda function:
      + `DEBUG`
      + `ERROR`
      + `INFO` (default)
      + `WARNING`

1. Choose **I acknowledge that this app creates custom IAM roles, resource policies and deploys nested applications.** and then, at the bottom of the page, choose **Deploy**.

On the **Deployment status for fork-example-ecommerce-*my-app*** page, Lambda displays the **Your application is being deployed** status.

In the **Resources** section, CloudFormation begins to create the stack and displays the **CREATE\_IN\_PROGRESS** status for each resource. When the process is complete, CloudFormation displays the **CREATE\_COMPLETE** status.

**Note**  
It might take 20-30 minutes for all resources to be deployed.

When the deployment is complete, Lambda displays the **Your application has been deployed** status.

# Step 2: Executing the SNS-linked sample application
<a name="execute-sample-application"></a>

1. In the AWS Lambda console, on the navigation panel, choose **Applications**.

1. On the **Applications** page, in the search field, search for `serverlessrepo-fork-example-ecommerce-my-app` and then choose the application.

1. In the **Resources** section, do the following:

   1. Sort the resources by **Type**, find the resource whose type is **ApiGateway RestApi** (for example, `ServerlessRestApi`), and then expand it.

      Two nested resources are displayed, of types **ApiGateway Deployment** and **ApiGateway Stage**.

   1. Copy the link **Prod API endpoint** and append `/checkout` to it, for example: 

      ```
      https://abcdefghij.execute-api.us-east-2.amazonaws.com/Prod/checkout
      ```

1. Copy the following JSON to a file named `test_event.json`.

   ```
   {
      "id": 15311,
      "date": "2019-03-25T23:41:11-08:00",
      "status": "confirmed",
      "customer": {
         "id": 65144,
         "quantity": 2,
         "price": 25.00,
         "subtotal": 50.00
      }
   }
   ```

1. To send an HTTPS request to your API endpoint, pass the sample event payload as input by executing a `curl` command, for example:

   ```
   curl -d "$(cat test_event.json)" https://abcdefghij.execute-api.us-east-2.amazonaws.com/Prod/checkout
   ```

   The API returns the following empty response, indicating a successful execution:

   ```
   { }
   ```

# Step 3: Verifying Amazon SNS application and pipeline performance
<a name="verify-sample-application-pipelines"></a>

## Step 1: Verifying the execution of the sample checkout pipeline
<a name="verify-execution-checkout-pipeline"></a>

1. Sign in to the [Amazon DynamoDB console](https://console.aws.amazon.com/dynamodb/).

1. On the navigation panel, choose **Tables**.

1. Search for `serverlessrepo-fork-example` and choose `CheckoutTable`.

1. On the table details page, choose **Items** and then choose the created item.

   The stored attributes are displayed.

## Step 2: Verifying the execution of the event storage and backup pipeline
<a name="verify-execution-event-storage-backup-pipeline"></a>

1. Sign in to the [Amazon S3 console](https://console.aws.amazon.com/s3/).

1. On the navigation panel, choose **Buckets**.

1. Search for `serverlessrepo-fork-example` and then choose `CheckoutBucket`.

1. Navigate the directory hierarchy until you find a file with the extension `.gz`.

1. To download the file, choose **Actions**, **Open**.

1. The pipeline is configured with a Lambda function that sanitizes credit card information for compliance reasons.

   To verify that the stored JSON payload doesn't contain any credit card information, decompress the file.

## Step 3: Verifying the execution of the event search and analytics pipeline
<a name="verify-execution-event-search-analytics-pipeline"></a>

1. Sign in to the [OpenSearch Service console](https://console.aws.amazon.com/aos/).

1. On the navigation panel, under **My domains**, choose the domain prefixed with `serverl-analyt`.

1. The pipeline is configured with an Amazon SNS subscription filter policy that sets a numeric matching condition.

   To verify that the event is indexed because it refers to an order of USD \$100 or more, on the **serverl-analyt-*abcdefgh1ijk*** page, choose **Indices**, **checkout\_events**.

## Step 4: Verifying the execution of the event replay pipeline
<a name="verify-execution-event-replay-pipeline"></a>

1. Sign in to the [Amazon SQS console](https://console.aws.amazon.com/sqs/).

1. In the list of queues, search for `serverlessrepo-fork-example` and choose `ReplayQueue`.

1. Choose **Send and receive messages**.

1. In the **Send and receive messages in fork-example-ecommerce-*my-app*...ReplayP-ReplayQueue-*123ABCD4E5F6*** dialog box, choose **Poll for messages**. 

1. To verify that the event is enqueued, choose **More Details** next to the message that appears in the queue.

# Step 4: Simulating an issue and replay events for recovery
<a name="simulate-issue-replay-events-for-recovery"></a>

## Step 1: Enable the simulated issue and send a second API request
<a name="enable-simulated-issue-send-second-api-request"></a>

1. Sign in to the [AWS Lambda console](https://console.aws.amazon.com/lambda/).

1. On the navigation panel, choose **Functions**.

1. Search for `serverlessrepo-fork-example` and choose `CheckoutFunction`.

1. On the **fork-example-ecommerce-*my-app*-CheckoutFunction-*ABCDEF*...** page, in the **Environment variables** section, set the **BUG\_ENABLED** variable to **true** and then choose **Save**.

1. Copy the following JSON to a file named `test_event_2.json`.

   ```
   {
      "id": 9917,
      "date": "2019-03-26T21:11:10-08:00",
      "status": "confirmed",
      "customer": {
         "id": 56999,
         "quantity": 1,
         "price": 75.00,
         "subtotal": 75.00
      }
   }
   ```

1. To send an HTTPS request to your API endpoint, pass the sample event payload as input by executing a `curl` command, for example:

   ```
   curl -d "$(cat test_event_2.json)" https://abcdefghij.execute-api.us-east-2.amazonaws.com/Prod/checkout
   ```

   The API returns the following empty response, indicating a successful execution:

   ```
   { }
   ```

## Step 2: Verify simulated data corruption
<a name="verify-simulated-data-corruption"></a>

1. Sign in to the [Amazon DynamoDB console](https://console.aws.amazon.com/dynamodb/).

1. On the navigation panel, choose **Tables**.

1. Search for `serverlessrepo-fork-example` and choose `CheckoutTable`.

1. On the table details page, choose **Items** and then choose the created item.

   The stored attributes are displayed, some marked as **CORRUPTED**.

## Step 3: Disable the simulated issue
<a name="disable-simulated-issue"></a>

1. Sign in to the [AWS Lambda console](https://console.aws.amazon.com/lambda/).

1. On the navigation panel, choose **Functions**.

1. Search for `serverlessrepo-fork-example` and choose `CheckoutFunction`.

1. On the **fork-example-ecommerce-*my-app*-CheckoutFunction-*ABCDEF*...** page, in the **Environment variables** section, set the **BUG\_ENABLED** variable to **false** and then choose **Save**.

## Step 4: Enable replay to recover from the issue
<a name="enable-replay-recover-from-simulated-issue"></a>

1. In the AWS Lambda console, on the navigation panel, choose **Functions**.

1. Search for `serverlessrepo-fork-example` and choose `ReplayFunction`.

1. Expand the **Designer** section, choose the **SQS** tile and then, in the **SQS** section, choose **Enabled**.
**Note**  
It takes approximately 1 minute for the Amazon SQS event source trigger to become enabled.

1. Choose **Save**.

1. To view the recovered attributes, return to the Amazon DynamoDB console.

1. To disable replay, return to the AWS Lambda console and disable the Amazon SQS event source trigger for `ReplayFunction`.

# Subscribing AWS Event Fork Pipelines to an Amazon SNS topic
<a name="sns-subscribe-event-fork-pipelines"></a>

To accelerate the development of your event-driven applications, you can subscribe event-handling pipelines—powered by AWS Event Fork Pipelines—to Amazon SNS topics. AWS Event Fork Pipelines is a suite of open-source [nested applications](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-template-nested-applications.html), based on the [AWS Serverless Application Model](https://aws.amazon.com/serverless/sam/) (AWS SAM), which you can deploy directly from the [AWS Event Fork Pipelines suite](https://serverlessrepo.aws.amazon.com/applications?query=aws-event-fork-pipelines) (choose **Show apps that create custom IAM roles or resource policies**) into your AWS account. For more information, see [How AWS Event Fork Pipelines works](sns-fork-pipeline-as-subscriber.md#how-sns-fork-works).

This section shows how you can use the AWS Management Console to deploy a pipeline and then subscribe AWS Event Fork Pipelines to an Amazon SNS topic. Before you begin, [create an Amazon SNS topic](sns-create-topic.md).

To delete the resources that comprise a pipeline, find the pipeline on the **Applications** page of the AWS Lambda console, expand the **SAM template** section, choose **CloudFormation stack**, and then choose **Other Actions**, **Delete Stack**.

# Deploying and subscribing the Event Storage and Backup Pipeline to Amazon SNS
<a name="deploy-event-storage-backup-pipeline"></a>


|  | 
| --- |
| For event archiving and analytics, Amazon SNS now recommends using its native integration with Amazon Data Firehose. You can subscribe Firehose delivery streams to SNS topics, which allows you to send notifications to archiving and analytics endpoints such as Amazon Simple Storage Service (Amazon S3) buckets, Amazon Redshift tables, Amazon OpenSearch Service (OpenSearch Service), and more. Using Amazon SNS with Firehose delivery streams is a fully managed and codeless solution that doesn't require you to use AWS Lambda functions. For more information, see [Fanout to Firehose delivery streams](sns-firehose-as-subscriber.md). | 

This page shows how to deploy the [Event Storage and Backup Pipeline](sns-fork-pipeline-as-subscriber.md#sns-fork-event-storage-and-backup-pipeline) and subscribe it to an Amazon SNS topic. This process automatically turns the AWS SAM template associated with the pipeline into a CloudFormation stack, and then deploys the stack into your AWS account. This process also creates and configures the set of resources that comprise the Event Storage and Backup Pipeline, including the following:
+ Amazon SQS queue
+ Lambda function
+ Firehose delivery stream
+ Amazon S3 backup bucket

For more information about configuring a stream with an Amazon S3 bucket as a destination, see `[S3DestinationConfiguration](https://docs.aws.amazon.com/firehose/latest/APIReference/API_S3DestinationConfiguration.html)` in the *Amazon Data Firehose API Reference*.

For more information about transforming events and about configuring event buffering, event compression, and event encryption, see [Creating a Delivery Stream](https://docs.aws.amazon.com/firehose/latest/dev/basic-create.html) in the *Amazon Data Firehose Developer Guide*.

For more information about filtering events, see [Amazon SNS subscription filter policies](sns-subscription-filter-policies.md) in this guide.
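As a sketch of what a subscription filter policy looks like, the following builds one as JSON for pasting into the **SubscriptionFilterPolicy** setting used in the steps below. The attribute names (`event_type`, `region`) are hypothetical; use the message attributes that your own publishers actually set:

```python
import json

# Hypothetical policy: accept only "order_placed" events published
# with a region attribute that starts with "eu-".
filter_policy = {
    "event_type": ["order_placed"],
    "region": [{"prefix": "eu-"}],
}

# The console field expects the policy as a JSON string.
filter_policy_json = json.dumps(filter_policy)
print(filter_policy_json)
```

Events whose attributes don't match every key in the policy are dropped by the subscription before they reach the pipeline.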

1. Sign in to the [AWS Lambda console](https://console.aws.amazon.com/lambda/).

1. On the navigation panel, choose **Functions** and then choose **Create function**.

1. On the **Create function** page, do the following:

   1. Choose **Browse serverless app repository**, **Public applications**, **Show apps that create custom IAM roles or resource policies**.

   1. Search for `fork-event-storage-backup-pipeline` and then choose the application.

1. On the **fork-event-storage-backup-pipeline** page, do the following:

   1. In the **Application settings** section, enter an **Application name** (for example, `my-app-backup`).
**Note**  
For each deployment, the application name must be unique. If you reuse an application name, the deployment will update only the previously deployed CloudFormation stack (rather than create a new one).

   1. (Optional) For **BucketArn**, enter the ARN of the Amazon S3 bucket into which incoming events are loaded. If you don't enter a value, a new Amazon S3 bucket is created in your AWS account.

   1. (Optional) For **DataTransformationFunctionArn**, enter the ARN of the Lambda function through which the incoming events are transformed. If you don't enter a value, data transformation is disabled.

   1. (Optional) Enter one of the following **LogLevel** settings for the execution of your application's Lambda function:
      + `DEBUG`
      + `ERROR`
      + `INFO` (default)
      + `WARNING`

   1. For **TopicArn**, enter the ARN of the Amazon SNS topic to which this instance of the fork pipeline is to be subscribed.

   1. (Optional) For **StreamBufferingIntervalInSeconds** and **StreamBufferingSizeInMBs**, enter the values for configuring the buffering of incoming events. If you don't enter any values, 300 seconds and 5 MB are used.

   1. (Optional) Enter one of the following **StreamCompressionFormat** settings for compressing incoming events:
      + `GZIP`
      + `SNAPPY`
      + `UNCOMPRESSED` (default)
      + `ZIP`

   1. (Optional) For **StreamPrefix**, enter the string prefix to name files stored in the Amazon S3 backup bucket. If you don't enter a value, no prefix is used.

   1. (Optional) For **SubscriptionFilterPolicy**, enter the Amazon SNS subscription filter policy, in JSON format, to be used for filtering incoming events. The filter policy decides which events are stored in the Amazon S3 backup bucket. If you don't enter a value, no filtering is used (all events are stored).

   1. (Optional) For **SubscriptionFilterPolicyScope**, enter the string `MessageBody` or `MessageAttributes` to enable payload-based or attribute-based message filtering. 

   1. Choose **I acknowledge that this app creates custom IAM roles, resource policies and deploys nested applications.** and then choose **Deploy**.

On the **Deployment status for *my-app-backup*** page, Lambda displays the **Your application is being deployed** status.

In the **Resources** section, CloudFormation begins to create the stack and displays the **CREATE\_IN\_PROGRESS** status for each resource. When the process is complete, CloudFormation displays the **CREATE\_COMPLETE** status.

When the deployment is complete, Lambda displays the **Your application has been deployed** status.

Messages published to your Amazon SNS topic are automatically stored in the Amazon S3 backup bucket provisioned by the Event Storage and Backup Pipeline.
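Firehose writes backed-up objects to Amazon S3 under a UTC date-based key prefix, preceded by any **StreamPrefix** you configured. The following sketch illustrates the prefix layout only; the full object name also includes the delivery stream name and a random suffix:

```python
from datetime import datetime, timezone

def backup_key_prefix(stream_prefix: str, when: datetime) -> str:
    """Illustrative sketch of the UTC date-based key prefix under which
    Firehose stores backed-up events in the Amazon S3 bucket."""
    return f"{stream_prefix}{when:%Y/%m/%d/%H}/"

# Example: StreamPrefix "sns-backup/" at 2024-05-01 09:00 UTC
print(backup_key_prefix("sns-backup/", datetime(2024, 5, 1, 9, tzinfo=timezone.utc)))
# → sns-backup/2024/05/01/09/
```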

# Deploying and subscribing the Event Search and Analytics Pipeline to Amazon SNS
<a name="deploy-event-search-analytics-pipeline"></a>


|  | 
| --- |
| For event archiving and analytics, Amazon SNS now recommends using its native integration with Amazon Data Firehose. You can subscribe Firehose delivery streams to SNS topics, which allows you to send notifications to archiving and analytics endpoints such as Amazon Simple Storage Service (Amazon S3) buckets, Amazon Redshift tables, Amazon OpenSearch Service (OpenSearch Service), and more. Using Amazon SNS with Firehose delivery streams is a fully-managed and codeless solution that doesn't require you to use AWS Lambda functions. For more information, see [Fanout to Firehose delivery streams](sns-firehose-as-subscriber.md). | 

This page shows how to deploy the [Event Search and Analytics Pipeline](sns-fork-pipeline-as-subscriber.md#sns-fork-event-search-and-analytics-pipeline) and subscribe it to an Amazon SNS topic. This process automatically turns the AWS SAM template associated with the pipeline into an AWS CloudFormation stack, and then deploys the stack into your AWS account. This process also creates and configures the set of resources that comprise the Event Search and Analytics Pipeline, including the following:
+ Amazon SQS queue
+ Lambda function
+ Firehose delivery stream
+ Amazon OpenSearch Service domain
+ Amazon S3 dead-letter bucket

For more information about configuring a stream with an index as a destination, see `[ElasticsearchDestinationConfiguration](https://docs.aws.amazon.com/firehose/latest/APIReference/API_ElasticsearchDestinationConfiguration.html)` in the *Amazon Data Firehose API Reference*.

For more information about transforming events and about configuring event buffering, event compression, and event encryption, see [Creating a Delivery Stream](https://docs.aws.amazon.com/firehose/latest/dev/basic-create.html) in the *Amazon Data Firehose Developer Guide*.

For more information about filtering events, see [Amazon SNS subscription filter policies](sns-subscription-filter-policies.md) in this guide.

1. Sign in to the [AWS Lambda console](https://console.aws.amazon.com/lambda/).

1. On the navigation panel, choose **Functions** and then choose **Create function**.

1. On the **Create function** page, do the following:

   1. Choose **Browse serverless app repository**, **Public applications**, **Show apps that create custom IAM roles or resource policies**.

   1. Search for `fork-event-search-analytics-pipeline` and then choose the application.

1. On the **fork-event-search-analytics-pipeline** page, do the following:

   1. In the **Application settings** section, enter an **Application name** (for example, `my-app-search`).
**Note**  
For each deployment, the application name must be unique. If you reuse an application name, the deployment will update only the previously deployed CloudFormation stack (rather than create a new one).

   1. (Optional) For **DataTransformationFunctionArn**, enter the ARN of the Lambda function used for transforming incoming events. If you don't enter a value, data transformation is disabled.

   1. (Optional) Enter one of the following **LogLevel** settings for the execution of your application's Lambda function:
      + `DEBUG`
      + `ERROR`
      + `INFO` (default)
      + `WARNING`

   1. (Optional) For **SearchDomainArn**, enter the ARN of the OpenSearch Service domain (the cluster that provides the needed compute and storage). If you don't enter a value, a new domain is created with the default configuration.

   1. For **TopicArn**, enter the ARN of the Amazon SNS topic to which this instance of the fork pipeline is to be subscribed.

   1. For **SearchIndexName**, enter the name of the OpenSearch Service index for event search and analytics.
**Note**  
The following restrictions apply to index names:  
Can't include uppercase letters
Can't include the following characters: ``\ / * ? " < > | ` , #``
Can't begin with the following characters: `- + _`
Can't be the following: `. ..`
Can't be longer than 80 characters
Can't be longer than 255 bytes
Can't contain a colon (from OpenSearch Service 7.0)

   1. (Optional) Enter one of the following **SearchIndexRotationPeriod** settings for the rotation period of the OpenSearch Service index:
      + `NoRotation` (default)
      + `OneDay`
      + `OneHour`
      + `OneMonth`
      + `OneWeek`

      Index rotation appends a timestamp to the index name, facilitating the expiration of old data. 

   1. For **SearchTypeName**, enter the name of the OpenSearch Service type for organizing the events in an index.
**Note**  
OpenSearch Service type names can contain any character (except null bytes) but can't begin with `_`.
For OpenSearch Service 6.x, there can be only one type per index. If you specify a new type for an existing index that already has another type, Firehose returns a runtime error.

   1. (Optional) For **StreamBufferingIntervalInSeconds** and **StreamBufferingSizeInMBs**, enter the values for configuring the buffering of incoming events. If you don't enter any values, 300 seconds and 5 MB are used.

   1. (Optional) Enter one of the following **StreamCompressionFormat** settings for compressing incoming events:
      + `GZIP`
      + `SNAPPY`
      + `UNCOMPRESSED` (default)
      + `ZIP`

   1. (Optional) For **StreamPrefix**, enter the string prefix to name files stored in the Amazon S3 dead-letter bucket. If you don't enter a value, no prefix is used.

   1. (Optional) For **StreamRetryDurationInSeconds**, enter the retry duration, in seconds, for cases when Firehose can't index events in the OpenSearch Service index. If you don't enter a value, 300 seconds is used.

   1. (Optional) For **SubscriptionFilterPolicy**, enter the Amazon SNS subscription filter policy, in JSON format, to be used for filtering incoming events. The filter policy decides which events are indexed in the OpenSearch Service index. If you don't enter a value, no filtering is used (all events are indexed).

   1. Choose **I acknowledge that this app creates custom IAM roles, resource policies and deploys nested applications.** and then choose **Deploy**.
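The index-name restrictions listed under **SearchIndexName** can be checked locally before you deploy. The following is a minimal sketch of such a check; the service performs the authoritative validation:

```python
# Characters the restrictions disallow; the trailing colon applies
# from OpenSearch Service 7.0.
FORBIDDEN_CHARS = set('\\/*?"<>|`,#:')

def is_valid_index_name(name: str) -> bool:
    """Local sketch of the OpenSearch Service index-name restrictions."""
    if not name or name in (".", ".."):
        return False
    if len(name) > 80 or len(name.encode("utf-8")) > 255:
        return False
    if name[0] in "-+_":          # can't begin with - + _
        return False
    if any(c.isupper() for c in name):
        return False
    return not any(c in FORBIDDEN_CHARS for c in name)

print(is_valid_index_name("sns-events"))   # valid
print(is_valid_index_name("SNS-Events"))   # invalid: uppercase letters
```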

On the **Deployment status for *my-app-search*** page, Lambda displays the **Your application is being deployed** status.

In the **Resources** section, CloudFormation begins to create the stack and displays the **CREATE\_IN\_PROGRESS** status for each resource. When the process is complete, CloudFormation displays the **CREATE\_COMPLETE** status.

When the deployment is complete, Lambda displays the **Your application has been deployed** status.

Messages published to your Amazon SNS topic are automatically indexed in the OpenSearch Service index provisioned by the Event Search and Analytics Pipeline. If the pipeline can't index an event, it stores the event in an Amazon S3 dead-letter bucket.
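When a **SearchIndexRotationPeriod** other than `NoRotation` is set, Firehose appends a timestamp to the index name, so that old indexes can be expired. The following sketch only approximates the naming for a few common periods; the exact timestamp formats are defined by Firehose:

```python
from datetime import datetime, timezone

def rotated_index_name(base: str, period: str, when: datetime) -> str:
    """Illustrative approximation of index-rotation naming; see the
    Firehose documentation for the authoritative formats."""
    formats = {
        "NoRotation": "",
        "OneHour": "-%Y-%m-%d-%H",
        "OneDay": "-%Y-%m-%d",
        "OneMonth": "-%Y-%m",
    }
    return base + when.strftime(formats[period])

now = datetime(2024, 5, 1, 9, tzinfo=timezone.utc)
print(rotated_index_name("sns-events", "OneDay", now))      # sns-events-2024-05-01
print(rotated_index_name("sns-events", "NoRotation", now))  # sns-events
```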

# Deploying the Event Replay Pipeline with Amazon SNS integration
<a name="deploy-event-replay-pipeline"></a>

This page shows how to deploy the [Event Replay Pipeline](sns-fork-pipeline-as-subscriber.md#sns-fork-event-replay-pipeline) and subscribe it to an Amazon SNS topic. This process automatically turns the AWS SAM template associated with the pipeline into an AWS CloudFormation stack, and then deploys the stack into your AWS account. This process also creates and configures the set of resources that comprise the Event Replay Pipeline, including an Amazon SQS queue and a Lambda function.

For more information about filtering events, see [Amazon SNS subscription filter policies](sns-subscription-filter-policies.md) in this guide.

1. Sign in to the [AWS Lambda console](https://console.aws.amazon.com/lambda/).

1. On the navigation panel, choose **Functions** and then choose **Create function**.

1. On the **Create function** page, do the following:

   1. Choose **Browse serverless app repository**, **Public applications**, **Show apps that create custom IAM roles or resource policies**.

   1. Search for `fork-event-replay-pipeline` and then choose the application.

1. On the **fork-event-replay-pipeline** page, do the following:

   1. In the **Application settings** section, enter an **Application name** (for example, `my-app-replay`).
**Note**  
For each deployment, the application name must be unique. If you reuse an application name, the deployment will update only the previously deployed CloudFormation stack (rather than create a new one).

   1. (Optional) Enter one of the following **LogLevel** settings for the execution of your application's Lambda function:
      + `DEBUG`
      + `ERROR`
      + `INFO` (default)
      + `WARNING`

   1. (Optional) For **ReplayQueueRetentionPeriodInSeconds**, enter the amount of time, in seconds, for which the Amazon SQS replay queue keeps the message. If you don't enter a value, 1,209,600 seconds (14 days) is used.

   1. For **TopicArn**, enter the ARN of the Amazon SNS topic to which this instance of the fork pipeline is to be subscribed.

   1. For **DestinationQueueName**, enter the name of the Amazon SQS queue to which the Lambda replay function forwards messages.

   1. (Optional) For **SubscriptionFilterPolicy**, enter the Amazon SNS subscription filter policy, in JSON format, to be used for filtering incoming events. The filter policy decides which events are buffered for replay. If you don't enter a value, no filtering is used (all events are buffered for replay).

   1. Choose **I acknowledge that this app creates custom IAM roles, resource policies and deploys nested applications.** and then choose **Deploy**.

On the **Deployment status for *my-app-replay*** page, Lambda displays the **Your application is being deployed** status.

In the **Resources** section, CloudFormation begins to create the stack and displays the **CREATE\_IN\_PROGRESS** status for each resource. When the process is complete, CloudFormation displays the **CREATE\_COMPLETE** status.

When the deployment is complete, Lambda displays the **Your application has been deployed** status.

Messages published to your Amazon SNS topic are automatically buffered for replay in the Amazon SQS queue provisioned by the Event Replay Pipeline.
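The default **ReplayQueueRetentionPeriodInSeconds** of 1,209,600 seconds corresponds to the Amazon SQS maximum retention of 14 days. A quick conversion if you want a shorter replay window:

```python
def days_to_seconds(days: int) -> int:
    """Convert a retention period in days to the seconds value expected
    by ReplayQueueRetentionPeriodInSeconds."""
    return days * 24 * 60 * 60

print(days_to_seconds(14))  # 1209600 -- the default (SQS maximum)
print(days_to_seconds(7))   # 604800
```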

**Note**  
By default, replay is disabled. To enable replay, navigate to the function's page on the Lambda console, expand the **Designer** section, choose the **SQS** tile and then, in the **SQS** section, choose **Enabled**.