

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用 AWS DataOps 开发套件构建数据管道以提取、转换和分析 Google Analytics 数据
<a name="build-a-data-pipeline-to-ingest-transform-and-analyze-google-analytics-data-using-the-aws-dataops-development-kit"></a>

*Anton Kukushkin 和 Rudy Puig，Amazon Web Services*

## Summary
<a name="build-a-data-pipeline-to-ingest-transform-and-analyze-google-analytics-data-using-the-aws-dataops-development-kit-summary"></a>

此模式描述了如何使用 AWS DataOps 开发套件 (AWS DDK) 等构建数据管道来提取、转换和分析 Google Analytics 数据。 AWS 服务 AWS DDK 是一个开源开发框架，可帮助您在上 AWS面构建数据工作流程和现代数据架构。 AWS DDK 的主要目标之一是为您节省通常用于劳动密集型数据管道任务的时间和精力，例如协调管道、构建基础设施和创建基础架构。 DevOps 您可以将这些劳动密集型任务卸载到 AWS DDK，这样您就可以专注于编写代码和其他高价值的活动。

## 先决条件和限制
<a name="build-a-data-pipeline-to-ingest-transform-and-analyze-google-analytics-data-using-the-aws-dataops-development-kit-prereqs"></a>

**先决条件**
+ 活跃的 AWS 账户
+ 已[配置](https://docs.aws.amazon.com/appflow/latest/userguide/google-analytics.html)用于谷歌分析的 Amazon AppFlow 连接器
+ [Python](https://www.python.org/downloads/) 和 [pip](https://pip.pypa.io/en/stable/cli/pip_download/)（Python 的包管理器）
+ Git，已安装和[配置](https://git-scm.com/book/en/v2/Getting-Started-First-Time-Git-Setup)
+ AWS Command Line Interface (AWS CLI)，[已安装](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html)并[配置](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html)
+ AWS Cloud Development Kit (AWS CDK)，[已安装](https://docs.aws.amazon.com/cdk/v2/guide/getting_started.html#getting_started_install)

**产品版本**
+ Python 3.7 或更高版本
+ pip 9.0.3 或更高版本

## 架构
<a name="build-a-data-pipeline-to-ingest-transform-and-analyze-google-analytics-data-using-the-aws-dataops-development-kit-architecture"></a>

**技术堆栈**
+ Amazon AppFlow
+ Amazon Athena
+ Amazon CloudWatch
+ Amazon EventBridge
+ Amazon Simple Storage Service（Amazon S3）
+ Amazon Simple Queue Service（Amazon SQS）
+ AWS DataOps 开发套件 (AWS DDK)
+ AWS Lambda

**目标架构**

下图显示了摄取、转换和分析 Google Analytics 数据的事件驱动流程。

![使用 AWS 服务摄取、转换和分析 Google Analytics 数据。](http://docs.aws.amazon.com/zh_cn/prescriptive-guidance/latest/patterns/images/pattern-img/edf40222-2867-4d4a-9153-ab29785b6662/images/8c38b472-153b-4497-982c-8efb97d2f7a5.png)


下图显示了如下工作流：

1. 亚马逊 CloudWatch 计划的事件规则会调用亚马逊。 AppFlow

1. 亚马逊将谷 AppFlow 歌分析数据提取到 S3 存储桶中。

1. 在 S3 存储桶提取数据后，将生成中的 EventBridge 事件通知，由 CloudWatch 事件规则捕获，然后将其放入 Amazon SQS 队列中。

1. Lambda 函数使用来自 Amazon SQS 队列的事件，读取相应的 S3 对象，将对象转换为 Apache Parquet 格式，将转换后的对象写入 S3 存储桶，然后创建或更新表定义。 AWS Glue Data Catalog 

1. Athena 查询针对此表运行。

## 工具
<a name="build-a-data-pipeline-to-ingest-transform-and-analyze-google-analytics-data-using-the-aws-dataops-development-kit-tools"></a>

**AWS 工具**
+ [Amazon AppFlow](https://docs.aws.amazon.com/appflow/latest/userguide/what-is-appflow.html) 是一项完全托管的集成服务，使您能够在软件即服务 (SaaS) 应用程序之间安全地交换数据。
+ [Amazon Athena](https://docs.aws.amazon.com/athena/latest/ug/what-is.html) 是一种交互式查询服务，可帮助您使用标准 SQL 直接在 Amazon S3 中分析数据。
+ [Amazon CloudWatch](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/WhatIsCloudWatch.html) 可帮助您实时监控您的 AWS 资源和运行的应用程序 AWS 的指标。
+ [Amazon EventBridge](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-what-is.html) 是一项无服务器事件总线服务，可帮助您将应用程序与来自各种来源的实时数据连接起来。例如， AWS Lambda 函数、使用 API 目的地的 HTTP 调用端点或其他 AWS 账户目的地的事件总线。
+ [Amazon Simple Storage Service（Amazon S3）](https://docs.aws.amazon.com/AmazonS3/latest/userguide/Welcome.html)是一项基于云的对象存储服务，可帮助您存储、保护和检索任意数量的数据。
+ [Amazon Simple Queue Service（Amazon SQS）](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html)提供了一个安全、持久且可用的托管队列，它可帮助您集成和分离分布式软件系统与组件。
+ [AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/welcome.html) 是一项计算服务，可帮助您运行代码，无需预调配或管理服务器。它只在需要时运行您的代码，并自动进行扩展，因此您只需为使用的计算时间付费。
+ [AWS Cloud Development Kit (AWS CDK)](https://docs.aws.amazon.com/cdk/v2/guide/home.html)是一个框架，用于在代码中定义云基础架构并通过它进行配置 CloudFormation。
+ [AWS DataOps 开发套件 (AWS DDK)](https://github.com/awslabs/aws-ddk) 是一个开源开发框架，可帮助您在上 AWS面构建数据工作流程和现代数据架构。

**代码**

此模式的代码可在 GitHub [AWS DataOps 开发套件 (AWS DDK)](https://github.com/awslabs/aws-ddk) 和[使用亚马逊 AppFlow、Amazon Athena AWS DataOps 和开发套件存储库分析谷歌分析数据](https://github.com/aws-samples/aws-ddk-examples/tree/main/google-analytics-data-using-appflow/python)中找到。

## 操作说明
<a name="build-a-data-pipeline-to-ingest-transform-and-analyze-google-analytics-data-using-the-aws-dataops-development-kit-epics"></a>

### 准备环境
<a name="prepare-the-environment"></a>


| Task | 说明 | 所需技能 | 
| --- | --- | --- | 
| 克隆源代码。 | 要克隆源代码，请运行以下命令：<pre>git clone https://github.com/aws-samples/aws-ddk-examples.git</pre> | DevOps 工程师 | 
| 创建虚拟环境。 | 导航到源代码目录，然后运行以下命令创建虚拟环境：<pre>cd google-analytics-data-using-appflow/python && python3 -m venv .venv</pre> | DevOps 工程师 | 
| 安装依赖项。 | 要激活虚拟环境并安装依赖项，请运行以下命令：<pre>source .venv/bin/activate && pip install -r requirements.txt</pre> | DevOps 工程师 | 

### 部署使用您数据管线的应用程序
<a name="deploy-the-application-that-uses-your-data-pipeline"></a>


| Task | 说明 | 所需技能 | 
| --- | --- | --- | 
| 引导 环境。 | [See the AWS documentation website for more details](http://docs.aws.amazon.com/zh_cn/prescriptive-guidance/latest/patterns/build-a-data-pipeline-to-ingest-transform-and-analyze-google-analytics-data-using-the-aws-dataops-development-kit.html) | DevOps 工程师 | 
| 部署数据。 | 要部署数据管线，请运行 `cdk deploy --profile [AWS_PROFILE]` 命令。 | DevOps 工程师 | 

### 测试部署
<a name="test-the-deployment"></a>


| Task | 说明 | 所需技能 | 
| --- | --- | --- | 
| 验证堆栈状态。 | [See the AWS documentation website for more details](http://docs.aws.amazon.com/zh_cn/prescriptive-guidance/latest/patterns/build-a-data-pipeline-to-ingest-transform-and-analyze-google-analytics-data-using-the-aws-dataops-development-kit.html) | DevOps 工程师 | 

## 问题排查
<a name="build-a-data-pipeline-to-ingest-transform-and-analyze-google-analytics-data-using-the-aws-dataops-development-kit-troubleshooting"></a>


| 问题 | 解决方案 | 
| --- | --- | 
| 如果在创建 `AWS::AppFlow::Flow` 资源期间部署失败，您会收到以下错误：`Connector Profile with name ga-connection does not exist` | 确认您已为 Google Analytics（分析）创建了亚马逊 AppFlow 连接器并将其命名`ga-connection`。<br />有关说明，请参阅亚马逊 AppFlow 文档中的[谷歌分析](https://docs.aws.amazon.com/appflow/latest/userguide/google-analytics.html)。 | 

## 相关资源
<a name="build-a-data-pipeline-to-ingest-transform-and-analyze-google-analytics-data-using-the-aws-dataops-development-kit-resources"></a>
+ [AWS DataOps 开发套件 (AWS DDK) (GitHub)](https://github.com/awslabs/aws-ddk)
+ [AWS DDK 示例](https://github.com/aws-samples/aws-ddk-examples) () GitHub

## 附加信息
<a name="build-a-data-pipeline-to-ingest-transform-and-analyze-google-analytics-data-using-the-aws-dataops-development-kit-additional"></a>

AWS DDK 数据管道由一个或多个阶段组成。在以下代码示例中，您使用 `AppFlowIngestionStage` 从 Google Analytics 摄取数据，使用 `SqsToLambdaStage` 处理数据转换，使用 `AthenaSQLStage` 运行 Athena 查询。

首先，创建数据转换和摄取阶段，如以下代码示例所示：

```
        appflow_stage = AppFlowIngestionStage(
            self,
            id="appflow-stage",
            flow_name=flow.flow_name,
        )
        sqs_lambda_stage = SqsToLambdaStage(
            self,
            id="lambda-stage",
            lambda_function_props={
                "code": Code.from_asset("./ddk_app/lambda_handlers"),
                "handler": "handler.lambda_handler",
                "layers": [
                    LayerVersion.from_layer_version_arn(
                        self,
                        id="layer",
                        layer_version_arn=f"arn:aws:lambda:{self.region}:336392948345:layer:AWSDataWrangler-Python39:1",
                    )
                ],
                "runtime": Runtime.PYTHON_3_9,
            },
        )
        # Grant lambda function S3 read & write permissions
        bucket.grant_read_write(sqs_lambda_stage.function)
        # Grant Glue database & table permissions
        sqs_lambda_stage.function.add_to_role_policy(
            self._get_glue_db_iam_policy(database_name=database.database_name)
        )
        athena_stage = AthenaSQLStage(
            self,
            id="athena-sql",
            query_string=[
                (
                    "SELECT year, month, day, device, count(user_count) as cnt "
                    f"FROM {database.database_name}.ga_sample "
                    "GROUP BY year, month, day, device "
                    "ORDER BY cnt DESC "
                    "LIMIT 10; "
                )
            ],
            output_location=Location(
                bucket_name=bucket.bucket_name, object_key="query-results/"
            ),
            additional_role_policy_statements=[
                self._get_glue_db_iam_policy(database_name=database.database_name)
            ],
        )
```

接下来，使用该`DataPipeline`构造通过使用 EventBridge 规则将各个阶段 “连接” 在一起，如以下代码示例所示：

```
        (
            DataPipeline(self, id="ingestion-pipeline")
            .add_stage(
                stage=appflow_stage,
                override_rule=Rule(
                    self,
                    "schedule-rule",
                    schedule=Schedule.rate(Duration.hours(1)),
                    targets=appflow_stage.targets,
                ),
            )
            .add_stage(
                stage=sqs_lambda_stage,
                # By default, AppFlowIngestionStage stage emits an event after the flow run finishes successfully
                # Override rule below changes that behavior to call the the stage when data lands in the bucket instead
                override_rule=Rule(
                    self,
                    "s3-object-created-rule",
                    event_pattern=EventPattern(
                        source=["aws.s3"],
                        detail={
                            "bucket": {"name": [bucket.bucket_name]},
                            "object": {"key": [{"prefix": "ga-data"}]},
                        },
                        detail_type=["Object Created"],
                    ),
                    targets=sqs_lambda_stage.targets,
                ),
            )
            .add_stage(stage=athena_stage)
        )
```

有关更多代码示例，请参阅[使用亚马逊、Amazon AppFlow Athena AWS DataOps 和开发套件 GitHub 分析谷歌分析数据存储库](https://github.com/aws-samples/aws-ddk-examples/tree/main/google-analytics-data-using-appflow/python)。