

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用 Snowflake Snowpipe、Amazon S3、Amazon SNS 和 Amazon Data Firehose 自动将数据流摄取至 Snowflake 数据库
<a name="automate-data-stream-ingestion-into-a-snowflake-database-by-using-snowflake-snowpipe-amazon-s3-amazon-sns-and-amazon-data-firehose"></a>

*Bikash Chandra Rout，Amazon Web Services*

## Summary
<a name="automate-data-stream-ingestion-into-a-snowflake-database-by-using-snowflake-snowpipe-amazon-s3-amazon-sns-and-amazon-data-firehose-summary"></a>

此模式描述了如何使用 Amazon Web Services（AWS）云上的服务处理连续数据流，并将其加载至 Snowflake 数据库。此模式使用 Amazon Data Firehose 将数据传输至 Amazon Simple Storage Service（Amazon S3），使用 Amazon Simple Notification Service（Amazon SNS）在收到新数据时发送通知，使用 Snowflake Snowpipe 将数据加载至 Snowflake 数据库。

通过遵循此模式，您可以在几秒钟内持续生成可供分析的数据，避免使用多个手动 `COPY` 命令，并且完全支持加载时的半结构化数据。

## 先决条件和限制
<a name="automate-data-stream-ingestion-into-a-snowflake-database-by-using-snowflake-snowpipe-amazon-s3-amazon-sns-and-amazon-data-firehose-prereqs"></a>

**先决条件**
+ 活跃 AWS 账户的.
+ 持续将数据发送至 Firehose 传输流的数据来源。
+ 从 Kinesis 传输流接收数据的现有 S3 存储桶。
+ 一个活跃的 Snowflake 账户。

**限制**
+ Snowflake Snowpipe 无法直接连接至 Firehose。

## 架构
<a name="automate-data-stream-ingestion-into-a-snowflake-database-by-using-snowflake-snowpipe-amazon-s3-amazon-sns-and-amazon-data-firehose-architecture"></a>

![\[Firehose 摄取的数据将进入 Amazon S3、Amazon SNS、Snowflake Snowpipe 和 Snowflake 数据库。\]](http://docs.aws.amazon.com/zh_cn/prescriptive-guidance/latest/patterns/images/pattern-img/0c6f473b-973f-4229-a12e-ef697ae9b299/images/0adee3fb-1b90-4f7d-b2d0-b3b958f62c75.png)


**技术堆栈**
+ Amazon Data Firehose
+ Amazon SNS
+ Amazon S3
+ Snowflake Snowpipe
+ Snowflake 数据库

## 工具
<a name="automate-data-stream-ingestion-into-a-snowflake-database-by-using-snowflake-snowpipe-amazon-s3-amazon-sns-and-amazon-data-firehose-tools"></a>
+ [Amazon Data Firehose 是一项完全托管](https://docs.aws.amazon.com/firehose/latest/dev/what-is-this-service.html)的服务，用于向亚马逊 S3、亚马逊 Redshift、 OpenSearch 亚马逊服务、Splunk 等目的地以及受支持的第三方服务提供商拥有的任何自定义 HTTP 终端节点或 HTTP 终端节点提供实时流数据。
+ [Amazon Simple Storage Service（Amazon S3）](https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html)是一种面向互联网的存储服务。
+ [Amazon Simple Notification Service (Amazon SNS)](https://docs.aws.amazon.com/sns/latest/dg/welcome.html) 可协调和管理向订阅端点或客户端传送或发送消息的过程。
+ [Snowflake](https://www.snowflake.com/) — Snowflake 是一个以（SaaS）形式提供的分析数据仓库。 Software-as-a-Service
+ [Snowflake Snowpipe](https://docs.snowflake.com/en/user-guide/data-load-snowpipe-intro.html) – 在 Snowflake 阶段，一旦文件可用，Snowpipe 将立即加载文件中的数据。

## 操作说明
<a name="automate-data-stream-ingestion-into-a-snowflake-database-by-using-snowflake-snowpipe-amazon-s3-amazon-sns-and-amazon-data-firehose-epics"></a>

### 设置 Snowflake Snowpipe
<a name="set-up-a-snowflake-snowpipe"></a>


| Task | 说明 | 所需技能 | 
| --- | --- | --- | 
| 在 Snowflake 中创建 CSV 格式文件。 | 登录 Snowflake 并运行 `CREATE FILE FORMAT` 命令，以创建具有指定字段分隔符的 CSV 文件。有关此命令和其他 Snowflake 命令的更多信息，请参阅[其他信息](#automate-data-stream-ingestion-into-a-snowflake-database-by-using-snowflake-snowpipe-amazon-s3-amazon-sns-and-amazon-data-firehose-additional)部分。 | 开发者版 | 
| 创建外部 Snowflake 阶段。 | 运行 `CREATE STAGE` 命令，以创建一个引用您之前创建的 CSV 文件的外部 Snowflake 阶段。重要：您将需要 S3 存储桶的 URL、 AWS 访问密钥和私有访问 AWS 密钥。运行 `SHOW STAGES` 命令，以验证 Snowflake 阶段是否已创建。 | 开发者版  | 
| 创建 Snowflake 目标表。 | 运行 `CREATE TABLE` 命令，以创建 Snowflake 表。 | 开发者版 | 
| 创建管道。 | 运行 `CREATE PIPE` 命令；确保命令中包含 `auto_ingest=true`。运行 `SHOW PIPES` 命令，以验证管道是否已创建。复制并保存 `notification_channel` 列的值。此值可用于配置 Amazon S3 事件通知。 | 开发者版 | 

### 配置 S3 存储桶
<a name="configure-the-s3-bucket"></a>


| Task | 说明 | 所需技能 | 
| --- | --- | --- | 
| 为 S3 存储桶创建 30 天的生命周期策略。 | 登录 AWS 管理控制台 并打开 Amazon S3 控制台。选择包含来自 Firehose 数据的 S3 存储桶。然后在 S3 存储桶中选择**管理**选项卡，再选择**添加生命周期规则**。在**生命周期规则**对话框内输入规则名称，并为存储桶配置 30 天生命周期规则。要获取有关此操作和其他操作的帮助，请参阅[相关资源](#automate-data-stream-ingestion-into-a-snowflake-database-by-using-snowflake-snowpipe-amazon-s3-amazon-sns-and-amazon-data-firehose-resources)部分。 | 系统管理员、开发人员 | 
| 为 S3 存储桶创建 IAM 策略。 | 打开 AWS Identity and Access Management (IAM) 控制台并选择**策略**。选择**创建策略**，然后选择 **JSON** 选项卡。将策略从[其他信息](#automate-data-stream-ingestion-into-a-snowflake-database-by-using-snowflake-snowpipe-amazon-s3-amazon-sns-and-amazon-data-firehose-additional)部分复制并粘贴至 JSON 字段。此策略将授予 `PutObject` 和 `DeleteObject` 权限，以及`GetObject`、`GetObjectVersion` 和 `ListBucket` 权限。选择**查看策略**，输入策略名称，然后选择**创建策略**。 | 系统管理员、开发人员 | 
| 将该策略分配至 IAM 角色。 | 打开 IAM 控制台，选择**角色**，然后选择**创建角色**。选择**其他 AWS 账户**作为可信实体。输入您的 AWS 账户 ID，然后选择 “**需要外部身份证**”。输入占位符 ID，稍后将对其进行更改。选择**下一步**，并分配您之前创建的 IAM 策略。然后创建 IAM 角色。 | 系统管理员、开发人员 | 
| 复制 IAM 角色的 Amazon 资源名称（ARN）。 | 打开 IAM 控制台，选择**角色**。选择您此前创建的 IAM 角色，然后复制并存储**角色 ARN**。 | 系统管理员、开发人员 | 

### 在 Snowflake 中设置存储集成
<a name="set-up-a-storage-integration-in-snowflake"></a>


| Task | 说明 | 所需技能 | 
| --- | --- | --- | 
| 在 Snowflake 创建存储集成。 | 登录 Snowflake，并运行 `CREATE STORAGE INTEGRATION` 命令。这将修改信任关系，授予 Snowflake 访问权限，并为您的 Snowflake 阶段提供外部 ID。 | 系统管理员、开发人员 | 
| 为您的 Snowflake 账户检索 IAM 角色。 | 运行 `DESC INTEGRATION` 命令，以检索 IAM 角色的 ARN。`<integration_ name>` 是您之前创建的 Snowflake 存储集成的名称。 | 系统管理员、开发人员 | 
| 记录两列的值。 | 复制并保存 `storage_aws_iam_user_arn` 和 `storage_aws_external_id` 列的值。 | 系统管理员、开发人员 | 

### 允许 Snowflake Snowpipe 访问 S3 存储桶
<a name="allow-snowflake-snowpipe-to-access-the-s3-bucket"></a>


| Task | 说明 | 所需技能 | 
| --- | --- | --- | 
| 修改 IAM 角色策略。 | 打开 IAM 控制台，选择**角色**。选择您此前创建的 IAM 角色，然后选择**信任关系**选项卡。选择**编辑信任关系**。将 `snowflake_external_id` 替换为您之前复制的 `storage_aws_external_id` 值。将 `snowflake_user_arn` 替换为您之前复制的 `storage_aws_iam_user_arn` 值。然后选择**更新信任策略**。 | 系统管理员、开发人员 | 

### 为 S3 存储桶开启并配置 SNS 通知
<a name="turn-on-and-configure-sns-notifications-for-the-s3-bucket"></a>


| Task | 说明 | 所需技能 | 
| --- | --- | --- | 
| 打开 S3 存储桶事件通知。 | 打开 Amazon S3 控制台并选择存储桶。选择**属性**，然后在**高级设置**下选择**事件**。选择**添加通知**，然后输入此事件名称。如果未输入名称，则使用全局唯一标识符 (GUID)。 | 系统管理员、开发人员 | 
| 为 S3 存储桶配置 Amazon SNS 通知。 | 在 “**事件**” 下，选择 **ObjectCreate （全部）**，然后在 “**发送至**” 下拉列表中选择 **SQS 队列**。在 **SNS** 列表中，选择**添加 SQS 队列 ARN**，然后粘贴之前复制的 `notification_channel` 值。然后选择**保存**。 | 系统管理员、开发人员 | 
| 为 Snowflake SQS 队列订阅 SNS 主题。 | 为 Snowflake SQS 队列订阅您创建的 SNS 主题。有关此步骤的帮助，请参阅[相关资源](#automate-data-stream-ingestion-into-a-snowflake-database-by-using-snowflake-snowpipe-amazon-s3-amazon-sns-and-amazon-data-firehose-resources)部分。 | 系统管理员、开发人员 | 

### 查看 Snowflake 阶段集成
<a name="check-the-snowflake-stage-integration"></a>


| Task | 说明 | 所需技能 | 
| --- | --- | --- | 
| 检查并测试 Snowpipe。 | 登录 Snowflake 并打开 Snowflake 阶段。将文件拖放至 S3 存储桶，然后检查 Snowflake 表是否已加载这些文件。当 S3 存储桶中显示新对象时，Amazon S3 将向 Snowpipe 发送 SNS 通知。 | 系统管理员、开发人员 | 

## 相关资源
<a name="automate-data-stream-ingestion-into-a-snowflake-database-by-using-snowflake-snowpipe-amazon-s3-amazon-sns-and-amazon-data-firehose-resources"></a>
+ [管理存储生命周期](https://docs.aws.amazon.com/AmazonS3/latest/user-guide/create-lifecycle.html)
+ [为 Snowflake SQS 队列订阅 Amazon SNS 主题。](https://docs.snowflake.com/en/user-guide/data-load-snowpipe-auto-s3.html#prerequisite-create-an-amazon-sns-topic-and-subscription)

## 附加信息
<a name="automate-data-stream-ingestion-into-a-snowflake-database-by-using-snowflake-snowpipe-amazon-s3-amazon-sns-and-amazon-data-firehose-additional"></a>

**创建文件格式：**

```
CREATE FILE FORMAT <name>
TYPE = 'CSV'
FIELD_DELIMITER = '|'
SKIP_HEADER = 1;
```

**创建外部阶段：**

```
externalStageParams (for Amazon S3) ::=
  URL = 's3://[//]'

  [ { STORAGE_INTEGRATION =  } | { CREDENTIALS = ( {  { AWS_KEY_ID = `` AWS_SECRET_KEY = `` [ AWS_TOKEN = `` ] } | AWS_ROLE = ``  } ) ) }` ]
  [ ENCRYPTION = ( [ TYPE = 'AWS_CSE' ] [ MASTER_KEY = '' ] |
                   [ TYPE = 'AWS_SSE_S3' ] |
                   [ TYPE = 'AWS_SSE_KMS' [ KMS_KEY_ID = '' ] |
                   [ TYPE = NONE ] )
```

**创建表：**

```
CREATE [ OR REPLACE ] [ { [ LOCAL | GLOBAL ] TEMP[ORARY] | VOLATILE } | TRANSIENT ] TABLE [ IF NOT EXISTS ]
  <table_name>
    ( <col_name> <col_type> [ { DEFAULT <expr>
                               | { AUTOINCREMENT | IDENTITY } [ ( <start_num> , <step_num> ) | START <num> INCREMENT <num> ] } ]
                                /* AUTOINCREMENT / IDENTITY supported only for numeric data types (NUMBER, INT, etc.) */
                            [ inlineConstraint ]
      [ , <col_name> <col_type> ... ]
      [ , outoflineConstraint ]
      [ , ... ] )
  [ CLUSTER BY ( <expr> [ , <expr> , ... ] ) ]
  [ STAGE_FILE_FORMAT = ( { FORMAT_NAME = '<file_format_name>'
                           | TYPE = { CSV | JSON | AVRO | ORC | PARQUET | XML } [ formatTypeOptions ] } ) ]
  [ STAGE_COPY_OPTIONS = ( copyOptions ) ]
  [ DATA_RETENTION_TIME_IN_DAYS = <num> ]
  [ COPY GRANTS ]
  [ COMMENT = '<string_literal>' ]
```

**显示阶段：**

```
SHOW STAGES;
```

**创建管道：**

```
CREATE [ OR REPLACE ] PIPE [ IF NOT EXISTS ] 
  [ AUTO_INGEST = [ TRUE | FALSE ] ]
  [ AWS_SNS_TOPIC =  ]
  [ INTEGRATION = '' ]
  [ COMMENT = '' ]
  AS
```

**显示管道：**

```
SHOW PIPES [ LIKE '<pattern>' ]           
           [ IN { ACCOUNT | [ DATABASE ] <db_name> | [ SCHEMA ] <schema_name> } ]
```

**创建存储集成：**

```
CREATE STORAGE INTEGRATION <integration_name>
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = S3
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = '<iam_role>'
  STORAGE_ALLOWED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/')
  [ STORAGE_BLOCKED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/') ]
```

示例：

```
create storage integration s3_int
  type = external_stage
  storage_provider = s3
  enabled = true
  storage_aws_role_arn = 'arn:aws:iam::001234567890:role/myrole'
  storage_allowed_locations = ('s3://amzn-s3-demo-bucket1/mypath1/', 's3://amzn-s3-demo-bucket2/mypath2/')
  storage_blocked_locations = ('s3://amzn-s3-demo-bucket1/mypath1/sensitivedata/', 's3://amzn-s3-demo-bucket2/mypath2/sensitivedata/');
```

有关此步骤的更多信息，请参阅 Snowflake 文档中的[配置 Snowflake 存储集成以访问 Amazon S3](https://docs.snowflake.com/en/user-guide/data-load-s3-config-storage-integration.html)。

**描述集成：**

```
DESC INTEGRATION <integration_name>;
```

**S3 存储桶策略：**

```
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
              "s3:PutObject",
              "s3:GetObject",
              "s3:GetObjectVersion",
              "s3:DeleteObject",
              "s3:DeleteObjectVersion"
            ],
            "Resource": "arn:aws:s3::://*"
        },
        {
            "Effect": "Allow",
            "Action": "s3:ListBucket",
            "Resource": "arn:aws:s3:::",
            "Condition": {
                "StringLike": {
                    "s3:prefix": [
                        "/*"
                    ]
                }
            }
        }
    ]
}
```