

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 使用 Snowflake Snowpipe、Amazon S3、Amazon SNS 和 Amazon Data Firehose 將資料串流擷取自動化至 Snowflake 資料庫
<a name="automate-data-stream-ingestion-into-a-snowflake-database-by-using-snowflake-snowpipe-amazon-s3-amazon-sns-and-amazon-data-firehose"></a>

*Bikash Chandra Rout，Amazon Web Services*

## 總結
<a name="automate-data-stream-ingestion-into-a-snowflake-database-by-using-snowflake-snowpipe-amazon-s3-amazon-sns-and-amazon-data-firehose-summary"></a>

此模式說明如何在 Amazon Web Services (AWS) 雲端上使用 服務來處理持續的資料串流，並將其載入 Snowflake 資料庫。模式使用 Amazon Data Firehose 將資料交付至 Amazon Simple Storage Service (Amazon S3)、Amazon Simple Notification Service (Amazon SNS) 在收到新資料時傳送通知，以及 Snowflake Snowpipe 將資料載入 Snowflake 資料庫。

透過遵循此模式，您可以在幾秒鐘內持續產生可用於分析的資料、避免多個手動`COPY`命令，以及完全支援載入時的半結構化資料。

## 先決條件和限制
<a name="automate-data-stream-ingestion-into-a-snowflake-database-by-using-snowflake-snowpipe-amazon-s3-amazon-sns-and-amazon-data-firehose-prereqs"></a>

**先決條件**
+ 作用中 AWS 帳戶。
+ 持續將資料傳送至 Firehose 交付串流的資料來源。
+ 從 Firehose 交付串流接收資料的現有 S3 儲存貯體。
+ 作用中的 Snowflake 帳戶。

**限制**
+ Snowflake Snowpipe 不會直接連線至 Firehose。

## Architecture
<a name="automate-data-stream-ingestion-into-a-snowflake-database-by-using-snowflake-snowpipe-amazon-s3-amazon-sns-and-amazon-data-firehose-architecture"></a>

![\[Firehose 擷取的資料會前往 Amazon S3、Amazon SNS、Snowflake Snowpipe 和 Snowflake 資料庫。\]](http://docs.aws.amazon.com/zh_tw/prescriptive-guidance/latest/patterns/images/pattern-img/0c6f473b-973f-4229-a12e-ef697ae9b299/images/0adee3fb-1b90-4f7d-b2d0-b3b958f62c75.png)


**技術堆疊**
+ Amazon Data Firehose
+ Amazon SNS
+ Amazon S3
+ Snowflake Snowpipe
+ Snowflake 資料庫

## 工具
<a name="automate-data-stream-ingestion-into-a-snowflake-database-by-using-snowflake-snowpipe-amazon-s3-amazon-sns-and-amazon-data-firehose-tools"></a>
+ [Amazon Data Firehose](https://docs.aws.amazon.com/firehose/latest/dev/what-is-this-service.html) 是一項全受管服務，可將即時串流資料交付至目的地，例如 Amazon S3、Amazon Redshift、Amazon OpenSearch Service、Splunk，以及受支援的第三方服務供應商擁有的任何自訂 HTTP 端點或 HTTP 端點。
+ [Amazon Simple Storage Service (Amazon S3)](https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html) 是網際網路的儲存體。
+ [Amazon Simple Notification Service (Amazon SNS)](https://docs.aws.amazon.com/sns/latest/dg/welcome.html) 會協調和管理消息傳遞或傳送到訂閱端點或客戶端。
+ [Snowflake](https://www.snowflake.com/) – Snowflake 是以Software-as-a-Service(SaaS) 提供的分析資料倉儲。
+ [Snowflake Snowpipe](https://docs.snowflake.com/en/user-guide/data-load-snowpipe-intro.html) – Snowpipe 會在 Snowflake 階段提供檔案時立即從檔案載入資料。

## 史詩
<a name="automate-data-stream-ingestion-into-a-snowflake-database-by-using-snowflake-snowpipe-amazon-s3-amazon-sns-and-amazon-data-firehose-epics"></a>

### 設定 Snowflake Snowpipe
<a name="set-up-a-snowflake-snowpipe"></a>


| 任務 | Description | 所需的技能 | 
| --- | --- | --- | 
| 在 Snowflake 中建立 CSV 檔案。 | 登入 Snowflake 並執行 `CREATE FILE FORMAT`命令，以使用指定的欄位分隔符號建立 CSV 檔案。如需此命令和其他 Snowflake 命令的詳細資訊，請參閱[其他資訊](#automate-data-stream-ingestion-into-a-snowflake-database-by-using-snowflake-snowpipe-amazon-s3-amazon-sns-and-amazon-data-firehose-additional)一節。 | 開發人員 | 
| 建立外部 Snowflake 階段。 | 執行 `CREATE STAGE`命令來建立參考您先前建立之 CSV 檔案的外部 Snowflake 階段。重要：您將需要 S3 儲存貯體的 URL、 AWS 存取金鑰和 AWS 私密存取金鑰。執行 `SHOW STAGES`命令以確認已建立 Snowflake 階段。 | 開發人員  | 
| 建立 Snowflake 目標資料表。 | 執行 `CREATE TABLE`命令來建立 Snowflake 資料表。 | 開發人員 | 
| 建立管道。 | 執行 `CREATE PIPE`命令；確定 `auto_ingest=true` 位於 命令中。執行 `SHOW PIPES`命令以確認已建立管道。複製並儲存`notification_channel`資料欄值。此值將用於設定 Amazon S3 事件通知。 | 開發人員 | 

### 設定 S3 儲存貯體
<a name="configure-the-s3-bucket"></a>


| 任務 | Description | 所需的技能 | 
| --- | --- | --- | 
| 建立 S3 儲存貯體的 30 天生命週期政策。 | 登入 AWS 管理主控台 並開啟 Amazon S3 主控台。選擇包含來自 Firehose 資料的 S3 儲存貯體。然後選擇 S3 儲存貯體中的**管理**索引標籤，然後選擇**新增生命週期規則**。在**生命週期規則**對話方塊中輸入規則的名稱，並為儲存貯體設定 30 天的生命週期規則。如需此案例和其他案例的協助，請參閱[相關資源](#automate-data-stream-ingestion-into-a-snowflake-database-by-using-snowflake-snowpipe-amazon-s3-amazon-sns-and-amazon-data-firehose-resources)一節。 | 系統管理員、開發人員 | 
| 為 S3 儲存貯體建立 IAM 政策。 | 開啟 AWS Identity and Access Management (IAM) 主控台，然後選擇**政策**。選擇 **Create policy** (建立政策)，然後選擇 **JSON** 標籤。將政策從[其他資訊](#automate-data-stream-ingestion-into-a-snowflake-database-by-using-snowflake-snowpipe-amazon-s3-amazon-sns-and-amazon-data-firehose-additional)區段複製並貼到 JSON 欄位。此政策將授予 `PutObject`和 `DeleteObject`許可，以及 `GetObjectVersion`、 `GetObject`和 `ListBucket`許可。選擇**檢閱政策**，輸入政策名稱，然後選擇**建立政策**。 | 系統管理員、開發人員 | 
| 將政策指派給 IAM 角色。 | 開啟 IAM 主控台，選擇**角色**，然後選擇**建立角色**。選擇**另一個 AWS 帳戶**做為信任的實體。輸入您的 AWS 帳戶 ID，然後選擇**需要外部 ID**。輸入您稍後要變更的預留位置 ID。選擇**下一步**，並指派您先前建立的 IAM 政策。然後建立 IAM 角色。 | 系統管理員、開發人員 | 
| 複製 IAM 角色的 Amazon Resource Name (ARN)。 | 開啟 IAM 主控台，然後選擇**角色**。選擇您先前建立的 IAM 角色，然後複製並存放**角色 ARN**。 | 系統管理員、開發人員 | 

### 在 Snowflake 中設定儲存整合
<a name="set-up-a-storage-integration-in-snowflake"></a>


| 任務 | Description | 所需的技能 | 
| --- | --- | --- | 
| 在 Snowflake 中建立儲存整合。 | 登入 Snowflake 並執行 `CREATE STORAGE INTEGRATION`命令。這將修改信任關係、授予對 Snowflake 的存取權，並提供 Snowflake 階段的外部 ID。 | 系統管理員、開發人員 | 
| 擷取 Snowflake 帳戶的 IAM 角色。 | 執行 `DESC INTEGRATION`命令來擷取 IAM 角色的 ARN。`<integration_ name>` 是您先前建立的 Snowflake 儲存整合的名稱。 | 系統管理員、開發人員 | 
| 記錄兩個資料欄值。 | 複製並儲存 `storage_aws_iam_user_arn`和 `storage_aws_external_id`資料欄的值。 | 系統管理員、開發人員 | 

### 允許 Snowflake Snowpipe 存取 S3 儲存貯體
<a name="allow-snowflake-snowpipe-to-access-the-s3-bucket"></a>


| 任務 | Description | 所需的技能 | 
| --- | --- | --- | 
| 修改 IAM 角色政策。 | 開啟 IAM 主控台，然後選擇**角色**。選擇您先前建立的 IAM 角色，然後選擇**信任關係**索引標籤。選擇**編輯信任關係**。`snowflake_external_id` 將 取代為您先前複製`storage_aws_external_id`的值。`snowflake_user_arn` 將 取代為您先前複製`storage_aws_iam_user_arn`的值。然後選擇**更新信任政策**。 | 系統管理員、開發人員 | 

### 開啟並設定 S3 儲存貯體的 SNS 通知
<a name="turn-on-and-configure-sns-notifications-for-the-s3-bucket"></a>


| 任務 | Description | 所需的技能 | 
| --- | --- | --- | 
| 開啟 S3 儲存貯體的事件通知。 | 開啟 Amazon S3 主控台並選擇您的儲存貯體。選擇**屬性**，然後在**進階設定** 下，選擇**事件**。選擇**新增通知**，然後輸入此事件的名稱。如果您未輸入名稱，則會使用全域唯一識別碼 (GUID)。 | 系統管理員、開發人員 | 
| 設定 S3 儲存貯體的 Amazon SNS 通知。 | 在**事件**下，選擇 **ObjectCreate （全部）**，然後在**傳送至**下拉式清單中選擇 **SQS 佇列**。在 **SNS** 清單中，選擇**新增 SQS 佇列 ARN**，然後貼上您先前複製`notification_channel`的值。然後選擇 **Save** (儲存)。 | 系統管理員、開發人員 | 
| 訂閱 Snowflake SQS 佇列至 SNS 主題。 | 訂閱 Snowflake SQS 佇列至您建立的 SNS 主題。如需此步驟的說明，請參閱[相關資源](#automate-data-stream-ingestion-into-a-snowflake-database-by-using-snowflake-snowpipe-amazon-s3-amazon-sns-and-amazon-data-firehose-resources)一節。 | 系統管理員、開發人員 | 

### 檢查 Snowflake 階段整合
<a name="check-the-snowflake-stage-integration"></a>


| 任務 | Description | 所需的技能 | 
| --- | --- | --- | 
| 檢查並測試 Snowpipe。 | 登入 Snowflake 並開啟 Snowflake 階段。將檔案放入 S3 儲存貯體，並檢查 Snowflake 資料表是否載入它們。當新物件出現在 SAmazon S33 會將 SNS 通知傳送至 Snowpipe。 | 系統管理員、開發人員 | 

## 相關資源
<a name="automate-data-stream-ingestion-into-a-snowflake-database-by-using-snowflake-snowpipe-amazon-s3-amazon-sns-and-amazon-data-firehose-resources"></a>
+ [管理儲存生命週期](https://docs.aws.amazon.com/AmazonS3/latest/user-guide/create-lifecycle.html)
+ [訂閱 Snowflake SQS 佇列至 Amazon SNS 主題](https://docs.snowflake.com/en/user-guide/data-load-snowpipe-auto-s3.html#prerequisite-create-an-amazon-sns-topic-and-subscription)

## 其他資訊
<a name="automate-data-stream-ingestion-into-a-snowflake-database-by-using-snowflake-snowpipe-amazon-s3-amazon-sns-and-amazon-data-firehose-additional"></a>

**建立檔案格式：**

```
CREATE FILE FORMAT <name>
TYPE = 'CSV'
FIELD_DELIMITER = '|'
SKIP_HEADER = 1;
```

**建立外部階段：**

```
externalStageParams (for Amazon S3) ::=
  URL = 's3://[//]'

  [ { STORAGE_INTEGRATION =  } | { CREDENTIALS = ( {  { AWS_KEY_ID = `` AWS_SECRET_KEY = `` [ AWS_TOKEN = `` ] } | AWS_ROLE = ``  } ) ) }` ]
  [ ENCRYPTION = ( [ TYPE = 'AWS_CSE' ] [ MASTER_KEY = '' ] |
                   [ TYPE = 'AWS_SSE_S3' ] |
                   [ TYPE = 'AWS_SSE_KMS' [ KMS_KEY_ID = '' ] |
                   [ TYPE = NONE ] )
```

**建立資料表：**

```
CREATE [ OR REPLACE ] [ { [ LOCAL | GLOBAL ] TEMP[ORARY] | VOLATILE } | TRANSIENT ] TABLE [ IF NOT EXISTS ]
  <table_name>
    ( <col_name> <col_type> [ { DEFAULT <expr>
                               | { AUTOINCREMENT | IDENTITY } [ ( <start_num> , <step_num> ) | START <num> INCREMENT <num> ] } ]
                                /* AUTOINCREMENT / IDENTITY supported only for numeric data types (NUMBER, INT, etc.) */
                            [ inlineConstraint ]
      [ , <col_name> <col_type> ... ]
      [ , outoflineConstraint ]
      [ , ... ] )
  [ CLUSTER BY ( <expr> [ , <expr> , ... ] ) ]
  [ STAGE_FILE_FORMAT = ( { FORMAT_NAME = '<file_format_name>'
                           | TYPE = { CSV | JSON | AVRO | ORC | PARQUET | XML } [ formatTypeOptions ] } ) ]
  [ STAGE_COPY_OPTIONS = ( copyOptions ) ]
  [ DATA_RETENTION_TIME_IN_DAYS = <num> ]
  [ COPY GRANTS ]
  [ COMMENT = '<string_literal>' ]
```

**顯示階段：**

```
SHOW STAGES;
```

**建立管道：**

```
CREATE [ OR REPLACE ] PIPE [ IF NOT EXISTS ] 
  [ AUTO_INGEST = [ TRUE | FALSE ] ]
  [ AWS_SNS_TOPIC =  ]
  [ INTEGRATION = '' ]
  [ COMMENT = '' ]
  AS
```

**顯示管道：**

```
SHOW PIPES [ LIKE '<pattern>' ]           
           [ IN { ACCOUNT | [ DATABASE ] <db_name> | [ SCHEMA ] <schema_name> } ]
```

**建立儲存整合：**

```
CREATE STORAGE INTEGRATION <integration_name>
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = S3
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = '<iam_role>'
  STORAGE_ALLOWED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/')
  [ STORAGE_BLOCKED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/') ]
```

範例：

```
create storage integration s3_int
  type = external_stage
  storage_provider = s3
  enabled = true
  storage_aws_role_arn = 'arn:aws:iam::001234567890:role/myrole'
  storage_allowed_locations = ('s3://amzn-s3-demo-bucket1/mypath1/', 's3://amzn-s3-demo-bucket2/mypath2/')
  storage_blocked_locations = ('s3://amzn-s3-demo-bucket1/mypath1/sensitivedata/', 's3://amzn-s3-demo-bucket2/mypath2/sensitivedata/');
```

如需此步驟的詳細資訊，請參閱[設定 Snowflake 儲存整合以從 Snowflake 文件存取 Amazon S3](https://docs.snowflake.com/en/user-guide/data-load-s3-config-storage-integration.html)。

**描述整合：**

```
DESC INTEGRATION <integration_name>;
```

**S3 儲存貯體政策：**

```
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
              "s3:PutObject",
              "s3:GetObject",
              "s3:GetObjectVersion",
              "s3:DeleteObject",
              "s3:DeleteObjectVersion"
            ],
            "Resource": "arn:aws:s3::://*"
        },
        {
            "Effect": "Allow",
            "Action": "s3:ListBucket",
            "Resource": "arn:aws:s3:::",
            "Condition": {
                "StringLike": {
                    "s3:prefix": [
                        "/*"
                    ]
                }
            }
        }
    ]
}
```