

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 建置資料管道，以使用 AWS DataOps 開發套件擷取、轉換和分析 Google Analytics 資料
<a name="build-a-data-pipeline-to-ingest-transform-and-analyze-google-analytics-data-using-the-aws-dataops-development-kit"></a>

*Anton Kukushkin 和 Rudy Puig，Amazon Web Services*

## 總結
<a name="build-a-data-pipeline-to-ingest-transform-and-analyze-google-analytics-data-using-the-aws-dataops-development-kit-summary"></a>

此模式說明如何使用 AWS DataOps 開發套件 (AWS DDK) 和其他 建置資料管道來擷取、轉換和分析 Google Analytics 資料 AWS 服務。 AWS DDK 是一種開放原始碼開發架構，可協助您建置資料工作流程和現代資料架構 AWS。DDK AWS 的主要目標之一是節省您通常投入大量人力的資料管道任務的時間和精力，例如協調管道、建置基礎設施，以及在該基礎設施背後建立 DevOps。您可以將這些人力密集型任務卸載至 AWS DDK，以便專注於編寫程式碼和其他高價值活動。

## 先決條件和限制
<a name="build-a-data-pipeline-to-ingest-transform-and-analyze-google-analytics-data-using-the-aws-dataops-development-kit-prereqs"></a>

**先決條件**
+ 作用中 AWS 帳戶
+ 適用於 Google Analytics 的 Amazon AppFlow 連接器，已[設定](https://docs.aws.amazon.com/appflow/latest/userguide/google-analytics.html)
+ [Python](https://www.python.org/downloads/) 和 [pip](https://pip.pypa.io/en/stable/cli/pip_download/) (Python 的套件管理員）
+ Git，已安裝和[設定](https://git-scm.com/book/en/v2/Getting-Started-First-Time-Git-Setup)
+ AWS Command Line Interface (AWS CLI)，[已安裝](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html)並[設定](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html)
+ AWS Cloud Development Kit (AWS CDK)，[已安裝](https://docs.aws.amazon.com/cdk/v2/guide/getting_started.html#getting_started_install)

**產品版本**
+ Python 3.7 或更高版本
+ pip 9.0.3 或更新版本

## Architecture
<a name="build-a-data-pipeline-to-ingest-transform-and-analyze-google-analytics-data-using-the-aws-dataops-development-kit-architecture"></a>

**技術堆疊**
+ Amazon AppFlow
+ Amazon Athena
+ Amazon CloudWatch
+ Amazon EventBridge
+ Amazon Simple Storage Service (Amazon S3)
+ Amazon Simple Queue Service (Amazon SQS)
+ AWS DataOps 開發套件 (AWS DDK)
+ AWS Lambda

**目標架構**

下圖顯示擷取、轉換和分析 Google Analytics 資料的事件驅動程序。

![\[使用 AWS 服務擷取、轉換和分析 Google Analytics 資料。\]](http://docs.aws.amazon.com/zh_tw/prescriptive-guidance/latest/patterns/images/pattern-img/edf40222-2867-4d4a-9153-ab29785b6662/images/8c38b472-153b-4497-982c-8efb97d2f7a5.png)


該圖顯示以下工作流程：

1. Amazon CloudWatch 排程事件規則會叫用 Amazon AppFlow。

1. Amazon AppFlow 會將 Google Analytics 資料擷取到 S3 儲存貯體。

1. 在 S3 儲存貯體擷取資料之後，EventBridge 中的事件通知就會產生、由 CloudWatch Events 規則擷取，然後放入 Amazon SQS 佇列。

1. Lambda 函數會使用來自 Amazon SQS 佇列的事件、讀取個別的 S3 物件、將物件轉換為 Apache Parquet 格式、將轉換的物件寫入 S3 儲存貯體，然後建立或更新 AWS Glue Data Catalog 資料表定義。

1. Athena 查詢會針對資料表執行。

## 工具
<a name="build-a-data-pipeline-to-ingest-transform-and-analyze-google-analytics-data-using-the-aws-dataops-development-kit-tools"></a>

**AWS 工具**
+ [Amazon AppFlow](https://docs.aws.amazon.com/appflow/latest/userguide/what-is-appflow.html) 是一種全受管整合服務，可讓您在軟體即服務 (SaaS) 應用程式之間安全地交換資料。
+ [Amazon Athena](https://docs.aws.amazon.com/athena/latest/ug/what-is.html) 是一種互動式查詢服務，可協助您使用標準 SQL 直接在 Amazon S3 中分析資料。
+ [Amazon CloudWatch](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/WhatIsCloudWatch.html) 可協助您 AWS 即時監控 AWS 資源的指標，以及您執行的應用程式。
+ [Amazon EventBridge](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-what-is.html) 是一種無伺服器事件匯流排服務，可協助您將應用程式與來自各種來源的即時資料連線。例如， AWS Lambda 函數、使用 API 目的地的 HTTP 呼叫端點，或其他事件匯流排 AWS 帳戶。
+ [Amazon Simple Storage Service (Amazon S3)](https://docs.aws.amazon.com/AmazonS3/latest/userguide/Welcome.html) 是一種雲端型物件儲存服務，可協助您儲存、保護和擷取任何數量的資料。
+ [Amazon Simple Queue Service (Amazon SQS)](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html) 提供安全、耐用且可用的託管佇列，可協助您整合和分離分散式軟體系統和元件。
+ [AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/welcome.html) 是一項運算服務，可協助您執行程式碼，無需佈建或管理伺服器。它只會在需要時執行程式碼並自動擴展，因此您只需按使用的運算時間付費。
+ [AWS Cloud Development Kit (AWS CDK)](https://docs.aws.amazon.com/cdk/v2/guide/home.html) 是在程式碼中定義雲端基礎設施並透過其佈建的架構 CloudFormation。
+ [AWS DataOps 開發套件 (AWS DDK)](https://github.com/awslabs/aws-ddk) 是一種開放原始碼開發架構，可協助您在其中建置資料工作流程和現代資料架構 AWS。

**Code**

此模式的程式碼可在 GitHub [AWS DataOps 開發套件 (AWS DDK)](https://github.com/awslabs/aws-ddk) [和使用 Amazon AppFlow、Amazon Athena 和 AWS DataOps 開發套件儲存庫分析 Google Analytics 資料](https://github.com/aws-samples/aws-ddk-examples/tree/main/google-analytics-data-using-appflow/python)中找到。

## 史詩
<a name="build-a-data-pipeline-to-ingest-transform-and-analyze-google-analytics-data-using-the-aws-dataops-development-kit-epics"></a>

### 準備環境
<a name="prepare-the-environment"></a>


| 任務 | Description | 所需的技能 | 
| --- | --- | --- | 
| 複製原始程式碼。 | 若要複製原始程式碼，請執行下列命令：<pre>git clone https://github.com/aws-samples/aws-ddk-examples.git</pre> | DevOps 工程師 | 
| 建立虛擬環境。 | 導覽至原始程式碼目錄，然後執行下列命令來建立虛擬環境：<pre>cd google-analytics-data-using-appflow/python && python3 -m venv .venv</pre> | DevOps 工程師 | 
| 安裝相依性。 | 若要啟用虛擬環境並安裝相依性，請執行下列命令：<pre>source .venv/bin/activate && pip install -r requirements.txt</pre> | DevOps 工程師 | 

### 部署使用資料管道的應用程式
<a name="deploy-the-application-that-uses-your-data-pipeline"></a>


| 任務 | Description | 所需的技能 | 
| --- | --- | --- | 
| 引導環境。 | [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/prescriptive-guidance/latest/patterns/build-a-data-pipeline-to-ingest-transform-and-analyze-google-analytics-data-using-the-aws-dataops-development-kit.html) | DevOps 工程師 | 
| 部署資料。 | 若要部署資料管道，請執行 `cdk deploy --profile [AWS_PROFILE]`命令。 | DevOps 工程師 | 

### 測試部署
<a name="test-the-deployment"></a>


| 任務 | Description | 所需的技能 | 
| --- | --- | --- | 
| 驗證堆疊狀態。 | [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/prescriptive-guidance/latest/patterns/build-a-data-pipeline-to-ingest-transform-and-analyze-google-analytics-data-using-the-aws-dataops-development-kit.html) | DevOps 工程師 | 

## 疑難排解
<a name="build-a-data-pipeline-to-ingest-transform-and-analyze-google-analytics-data-using-the-aws-dataops-development-kit-troubleshooting"></a>


| 問題 | 解決方案 | 
| --- | --- | 
| 部署在建立 `AWS::AppFlow::Flow`來源期間失敗，您會收到下列錯誤： `Connector Profile with name ga-connection does not exist` | 確認您已建立適用於 Google Analytics 的 Amazon AppFlow 連接器，並將其命名為 `ga-connection`。如需說明，請參閱 Amazon AppFlow 文件中的 [Google Analytics](https://docs.aws.amazon.com/appflow/latest/userguide/google-analytics.html)。 | 

## 相關資源
<a name="build-a-data-pipeline-to-ingest-transform-and-analyze-google-analytics-data-using-the-aws-dataops-development-kit-resources"></a>
+ [AWS DataOps 開發套件 (AWS DDK)](https://github.com/awslabs/aws-ddk) (GitHub)
+ [AWS DDK 範例](https://github.com/aws-samples/aws-ddk-examples) (GitHub)

## 其他資訊
<a name="build-a-data-pipeline-to-ingest-transform-and-analyze-google-analytics-data-using-the-aws-dataops-development-kit-additional"></a>

AWS DDK 資料管道由一或多個階段組成。在下列程式碼範例中，您會使用 從 Google Analytics `AppFlowIngestionStage` 擷取資料、`SqsToLambdaStage`處理資料轉換，以及`AthenaSQLStage`執行 Athena 查詢。

首先，會建立資料轉換和擷取階段，如下列程式碼範例所示：

```
        appflow_stage = AppFlowIngestionStage(
            self,
            id="appflow-stage",
            flow_name=flow.flow_name,
        )
        sqs_lambda_stage = SqsToLambdaStage(
            self,
            id="lambda-stage",
            lambda_function_props={
                "code": Code.from_asset("./ddk_app/lambda_handlers"),
                "handler": "handler.lambda_handler",
                "layers": [
                    LayerVersion.from_layer_version_arn(
                        self,
                        id="layer",
                        layer_version_arn=f"arn:aws:lambda:{self.region}:336392948345:layer:AWSDataWrangler-Python39:1",
                    )
                ],
                "runtime": Runtime.PYTHON_3_9,
            },
        )
        # Grant lambda function S3 read & write permissions
        bucket.grant_read_write(sqs_lambda_stage.function)
        # Grant Glue database & table permissions
        sqs_lambda_stage.function.add_to_role_policy(
            self._get_glue_db_iam_policy(database_name=database.database_name)
        )
        athena_stage = AthenaSQLStage(
            self,
            id="athena-sql",
            query_string=[
                (
                    "SELECT year, month, day, device, count(user_count) as cnt "
                    f"FROM {database.database_name}.ga_sample "
                    "GROUP BY year, month, day, device "
                    "ORDER BY cnt DESC "
                    "LIMIT 10; "
                )
            ],
            output_location=Location(
                bucket_name=bucket.bucket_name, object_key="query-results/"
            ),
            additional_role_policy_statements=[
                self._get_glue_db_iam_policy(database_name=database.database_name)
            ],
        )
```

接下來，建構會使用 EventBridge `DataPipeline` 規則將階段 "wire" 在一起，如下列程式碼範例所示：

```
        (
            DataPipeline(self, id="ingestion-pipeline")
            .add_stage(
                stage=appflow_stage,
                override_rule=Rule(
                    self,
                    "schedule-rule",
                    schedule=Schedule.rate(Duration.hours(1)),
                    targets=appflow_stage.targets,
                ),
            )
            .add_stage(
                stage=sqs_lambda_stage,
                # By default, AppFlowIngestionStage stage emits an event after the flow run finishes successfully
                # Override rule below changes that behavior to call the the stage when data lands in the bucket instead
                override_rule=Rule(
                    self,
                    "s3-object-created-rule",
                    event_pattern=EventPattern(
                        source=["aws.s3"],
                        detail={
                            "bucket": {"name": [bucket.bucket_name]},
                            "object": {"key": [{"prefix": "ga-data"}]},
                        },
                        detail_type=["Object Created"],
                    ),
                    targets=sqs_lambda_stage.targets,
                ),
            )
            .add_stage(stage=athena_stage)
        )
```

如需更多程式碼範例，請參閱 GitHub [使用 Amazon AppFlow、Amazon Athena 和 AWS DataOps 開發套件儲存庫分析 Google Analytics 資料](https://github.com/aws-samples/aws-ddk-examples/tree/main/google-analytics-data-using-appflow/python)。