

# 设置警报、部署和计划
<a name="data-quality-alerts"></a>

本主题介绍如何为 AWS Glue Data Quality 设置警报、部署和计划。

**Contents**
+ [在 Amazon EventBridge 集成中设置警报和通知](#data-quality-alerts-eventbridge)
  + [事件模式的其他配置选项](#data-quality-alerts-eventbridge-config-options)
  + [将通知格式化为电子邮件](#data-quality-alerts-eventbridge-format-notifications)
+ [在 CloudWatch 集成中设置警报和通知](#data-quality-alerts-cloudwatch)
+ [查询数据质量结果以构建控制面板](#data-quality-alerts-querying-results)
+ [使用 AWS CloudFormation 部署数据质量规则](#data-quality-deploy-cfn)
+ [计划数据质量规则](#data-quality-scheduling-rules)

## 在 Amazon EventBridge 集成中设置警报和通知
<a name="data-quality-alerts-eventbridge"></a>

AWS Glue Data Quality 支持发布 EventBridge 事件，这些事件是在 Data Quality 规则集评估运行完成后发出的。这样，当数据质量规则失效时，您可以轻松设置警报。

以下是您在 Data Catalog 中评估数据质量规则集时的示例事件。有了这些信息，您就可以查看 Amazon EventBridge 提供的数据。您可以发出其他 API 调用以获取更多详细信息。例如，使用结果 ID 调用 `get_data_quality_result` API 以获取特定执行的详细信息。

```
{
    "version":"0",
    "id":"abcdef00-1234-5678-9abc-def012345678",
    "detail-type":"Data Quality Evaluation Results Available",
    "source":"aws.glue-dataquality",
    "account":"123456789012",
    "time":"2017-09-07T18:57:21Z",
    "region":"us-west-2",
    "resources":[],
    "detail":{
        "context": {
                    "contextType": "GLUE_DATA_CATALOG",
                    "runId":"dqrun-12334567890",
                    "databaseName": "db-123",
                    "tableName": "table-123",
                    "catalogId": "123456789012"
                    },
        "resultID": "dqresult-12334567890",
        "rulesetNames": ["rulset1"],
        "state":"SUCCEEDED",
        "score": 1.00,
        "rulesSucceeded": 100,
        "rulesFailed": 0,
        "rulesSkipped": 0
    }
}
```

以下是您在评估 AWS Glue ETL 或 AWS Glue Studio 笔记本中的数据质量规则集时发布的示例事件。

```
{
    "version":"0",
    "id":"abcdef00-1234-5678-9abc-def012345678",
    "detail-type":"Data Quality Evaluation Results Available",
    "source":"aws.glue-dataquality",
    "account":"123456789012",
    "time":"2017-09-07T18:57:21Z",
    "region":"us-west-2",
    "resources":[],
    "detail":{
        "context": {
                    "contextType": "GLUE_JOB",
                    "jobId": "jr-12334567890",
                    "jobName": "dq-eval-job-1234",
                    "evaluationContext": "", 
                    }
        "resultID": "dqresult-12334567890",
        "rulesetNames": ["rulset1"],
        "state":"SUCCEEDED",
        "score": 1.00
        "rulesSucceeded": 100,
        "rulesFailed": 0,
        "rulesSkipped": 0
    }
}
```

要在 Data Catalog 和 ETL 作业中同时运行数据质量评估，必须保持选中默认处于选中状态的**将指标发布到 Amazon CloudWatch** 选项，EventBridge 才可以发布。

**设置 EventBridge 通知**

![\[AWS CloudFormation 中的数据质量属性\]](http://docs.aws.amazon.com/zh_cn/glue/latest/dg/images/data-quality-properties-cfn.png)


要接收发出的事件并定义目标，必须配置 Amazon EventBridge 规则。要创建规则，请执行以下操作：

1. 打开 Amazon EventBridge 控制台。

1. 在导航栏的**总线**部分下选择**规则**。

1. 选择 **Create Rule**。

1. 在**定义规则详细信息**中：

   1. 对于名称，请输入 `myDQRule`。

   1. 输入描述（可选）。

   1. 对于事件总线，请选择您的事件总线。如果您没有，请将其保留为默认值。

   1. 对于“规则类型”，选择**具有事件模式的规则**，然后选择**下一步**。

1. 在**构建事件模式**中：

   1. 对于事件源，选择** AWS 事件或 EventBridge 合作伙伴事件**。

   1. 跳过示例事件部分。

   1. 对于创建方法，选择**使用模式表单**。

   1. 对于事件模式：

      1. 为事件源选择 **AWS 服务**。

      1. 为 AWS 服务选择 **Glue Data Quality**。

      1. 为“事件类型”选择**可用的数据质量评估结果**。

      1. 为“特定状态”选择**失败**。然后，您将看到类似于以下内容的事件模式：

         ```
         {
           "source": ["aws.glue-dataquality"],
           "detail-type": ["Data Quality Evaluation Results Available"],
           "detail": {
             "state": ["FAILED"]
           }
         }
         ```

      1. 有关更多连接选项，请参阅 [事件模式的其他配置选项](#data-quality-alerts-eventbridge-config-options)。

1. 在**选择目标**上：

   1. 对于**目标类型**，选择 ** AWS 服务**。

   1. 使用**选择目标**下拉列表选择要连接的所需 AWS 服务（SNS、Lambda、SQS 等），然后选择**下一步**。

1. 在**配置标签**上，单击**添加新标签**以添加可选标签，然后选择**下一步**。

1. 您会看到所有选择的摘要页面。选择底部的**创建规则**。

### 事件模式的其他配置选项
<a name="data-quality-alerts-eventbridge-config-options"></a>

除了根据成功或失败筛选事件外，您可能还需要根据不同的参数进一步筛选事件。

为此，请转到“事件模式”部分，然后选择**编辑模式**以指定其他参数。请注意，事件模式中的字段区分大小写。以下是配置事件模式的示例。

要从评估特定规则集的特定表中捕获事件，请使用以下类型的模式：

```
{
  "source": ["aws.glue-dataquality"],
  "detail-type": ["Data Quality Evaluation Results Available"],
  "detail": {
    "context": {
      "contextType": ["GLUE_DATA_CATALOG"],
      "databaseName": "db-123",
       "tableName": "table-123",
    },
    "rulesetNames": ["ruleset1", "ruleset2"]
    "state": ["FAILED"]
  }
}
```

要在 ETL 体验中捕获来自特定作业的事件，请使用以下类型的模式：

```
{
  "source": ["aws.glue-dataquality"],
  "detail-type": ["Data Quality Evaluation Results Available"],
  "detail": {
    "context": {
      "contextType": ["GLUE_JOB"],
      "jobName": ["dq_evaluation_job1", "dq_evaluation_job2"]
    },
    "state": ["FAILED"]
  }
}
```

要捕获分数低于特定阈值（例如 70%）的事件，请执行以下操作：

```
{
  "source": ["aws.glue-dataquality"],
  "detail-type": ["Data Quality Evaluation Results Available"],
  "detail": {
    "score": [{
      "numeric": ["<=", 0.7]
    }]
  }
}
```

### 将通知格式化为电子邮件
<a name="data-quality-alerts-eventbridge-format-notifications"></a>

有时，您需要向业务团队发送格式完善的电子邮件通知。您可以使用 Amazon EventBridge 和 AWS Lambda 实现这一目标。

![\[格式化为电子邮件的数据质量通知\]](http://docs.aws.amazon.com/zh_cn/glue/latest/dg/images/data_quality_sample_email.png)


以下示例代码可用于格式化数据质量通知以生成电子邮件。

```
import boto3
import json
from datetime import datetime

sns_client = boto3.client('sns')
glue_client = boto3.client('glue')

sns_topic_arn = 'arn:aws:sns:<region-code>:<account-id>:<sns-topic-name>'



def lambda_handler(event, context):
    log_metadata = {}
    message_text = ""
    subject_text = ""

    if event['detail']['context']['contextType'] == 'GLUE_DATA_CATALOG':
        log_metadata['ruleset_name'] = str(event['detail']['rulesetNames'][0])
        log_metadata['tableName'] = str(event['detail']['context']['tableName'])
        log_metadata['databaseName'] = str(event['detail']['context']['databaseName'])
        log_metadata['runId'] = str(event['detail']['context']['runId'])
        log_metadata['resultId'] = str(event['detail']['resultId'])
        log_metadata['state'] = str(event['detail']['state'])
        log_metadata['score'] = str(event['detail']['score'])
        log_metadata['numRulesSucceeded'] = str(event['detail']['numRulesSucceeded'])
        log_metadata['numRulesFailed'] = str(event['detail']['numRulesFailed'])
        log_metadata['numRulesSkipped'] = str(event['detail']['numRulesSkipped'])

        message_text += "Glue Data Quality run details:\n"
        message_text += "ruleset_name: {}\n".format(log_metadata['ruleset_name'])
        message_text += "glue_table_name: {}\n".format(log_metadata['tableName'])
        message_text += "glue_database_name: {}\n".format(log_metadata['databaseName'])
        message_text += "run_id: {}\n".format(log_metadata['runId'])
        message_text += "result_id: {}\n".format(log_metadata['resultId'])
        message_text += "state: {}\n".format(log_metadata['state'])
        message_text += "score: {}\n".format(log_metadata['score'])
        message_text += "numRulesSucceeded: {}\n".format(log_metadata['numRulesSucceeded'])
        message_text += "numRulesFailed: {}\n".format(log_metadata['numRulesFailed'])
        message_text += "numRulesSkipped: {}\n".format(log_metadata['numRulesSkipped'])

        subject_text = "Glue Data Quality ruleset {} run details".format(log_metadata['ruleset_name'])

    else:
        log_metadata['ruleset_name'] = str(event['detail']['rulesetNames'][0])
        log_metadata['jobName'] = str(event['detail']['context']['jobName'])
        log_metadata['jobId'] = str(event['detail']['context']['jobId'])
        log_metadata['resultId'] = str(event['detail']['resultId'])
        log_metadata['state'] = str(event['detail']['state'])
        log_metadata['score'] = str(event['detail']['score'])

        log_metadata['numRulesSucceeded'] = str(event['detail']['numRulesSucceeded'])
        log_metadata['numRulesFailed'] = str(event['detail']['numRulesFailed'])
        log_metadata['numRulesSkipped'] = str(event['detail']['numRulesSkipped'])

        message_text += "Glue Data Quality run details:\n"
        message_text += "ruleset_name: {}\n".format(log_metadata['ruleset_name'])
        message_text += "glue_job_name: {}\n".format(log_metadata['jobName'])
        message_text += "job_id: {}\n".format(log_metadata['jobId'])
        message_text += "result_id: {}\n".format(log_metadata['resultId'])
        message_text += "state: {}\n".format(log_metadata['state'])
        message_text += "score: {}\n".format(log_metadata['score'])
        message_text += "numRulesSucceeded: {}\n".format(log_metadata['numRulesSucceeded'])
        message_text += "numRulesFailed: {}\n".format(log_metadata['numRulesFailed'])
        message_text += "numRulesSkipped: {}\n".format(log_metadata['numRulesSkipped'])

        subject_text = "Glue Data Quality ruleset {} run details".format(log_metadata['ruleset_name'])

    resultID = str(event['detail']['resultId'])
    response = glue_client.get_data_quality_result(ResultId=resultID)
    RuleResults = response['RuleResults']
    message_text += "\n\nruleset details evaluation steps results:\n\n"
    subresult_info = []

    for dic in RuleResults:
        subresult = "Name: {}\t\tResult: {}\t\tDescription: \t{}".format(dic['Name'], dic['Result'], dic['Description'])
        if 'EvaluationMessage' in dic:
            subresult += "\t\tEvaluationMessage: {}".format(dic['EvaluationMessage'])
        subresult_info.append({
            'Name': dic['Name'],
            'Result': dic['Result'],
            'Description': dic['Description'],
            'EvaluationMessage': dic.get('EvaluationMessage', '')
        })
        message_text += "\n" + subresult

    log_metadata['resultrun'] = subresult_info



    sns_client.publish(
        TopicArn=sns_topic_arn,
        Message=message_text,
        Subject=subject_text
    )

    return {
        'statusCode': 200,
        'body': json.dumps('Message published to SNS topic')
    }
```

## 在 CloudWatch 集成中设置警报和通知
<a name="data-quality-alerts-cloudwatch"></a>

我们推荐的方法是使用 Amazon EventBridge 设置数据质量警报，因为 Amazon EventBridge 需要一次性设置才能提醒客户。但是，由于熟悉的原因，有些客户更喜欢使用 Amazon CloudWatch。对于此类客户，我们提供与 Amazon CloudWatch 的集成。

每次 AWS Glue Data Quality 评估都会在每次数据质量运行时发出一对名为 `glue.data.quality.rules.passed`（表示通过的规则数量）和 `glue.data.quality.rules.failed`（表示失败的规则数）的指标。您可以使用此发出的指标创建警报，以便在给定的数据质量运行低于阈值时提醒用户。要开始设置可通过 Amazon SNS 通知发送电子邮件的警报，请按照以下步骤操作：

要开始设置可通过 Amazon SNS 通知发送电子邮件的警报，请按照以下步骤操作：

1. 打开 Amazon CloudWatch 控制台。

1. 在**指标**下选择**所有指标**。您将在自定义命名空间下看到一个名为 Glue Data Quality 的额外命名空间。
**注意**  
启动 AWS Glue Data Quality 运行时，请确保已启用**将指标发布到 Amazon CloudWatch**复选框。否则，该特定运行的指标将不会发布到 Amazon CloudWatch。

   在 `Glue Data Quality` 命名空间下，您可以看到每个表、每个规则集发出的指标。对于本主题，如果该值超过 1，我们将使用 `glue.data.quality.rules.failed` 规则和警报（表示，如果我们看到许多失败的规则评估大于 1，我们希望收到通知）。

1. 要创建警报，请在**警报**下选择**所有警报**。

1. 选择**创建警报**。

1. 选择**选择指标**。

1. 选择与您创建的表格相对应的 `glue.data.quality.rules.failed` 指标，然后选择**选择指标**。

1. 在**指定指标和条件**选项卡下的**指标**部分下：

   1. 对于 **Statistic（统计数据）**，选择 **Sum（总计）**。

   1. 对于**周期**，选择 **1 分钟**。

1. 在**条件**部分下：

   1. 对于**阈值类型**，选择**静态**。

   1. 对于**每当 glue.data.quality.rules.failed 为...**，选择**大于/等于**。

   1. 对于**不止于...**，输入 **1** 作为阈值。

   这些选择意味着，如果 `glue.data.quality.rules.failed` 指标发出的值大于或等于 1，我们将触发警报。但是，如果没有数据，我们会将其视为可接受。

1. 选择**下一步**。

1. 在**配置操作**中：

   1. 对于**警报状态触发器**，选择**报警中**。

   1. 对于**向以下 SNS 主题发送通知**部分，选择**创建新主题以通过新的 SNS 主题发送通知**。

   1. 对于**将接收通知的电子邮件端点**，请输入您的电子邮件地址。然后单击**创建主题**。

   1. 选择**下一步**。

1. 对于**警报名称**，输入 `myFirstDQAlarm`，然后选择**下一步**。

1. 您会看到所有选择的摘要页面。选择底部的**创建警报**。

现在，您可以从 Amazon CloudWatch 警报控制面板中看到正在创建的警报。

## 查询数据质量结果以构建控制面板
<a name="data-quality-alerts-querying-results"></a>

您可能需要构建一个控制面板来显示您的数据质量结果。有两种方式可执行此操作：

**使用以下代码设置 Amazon EventBridge，将数据写入 Amazon S3：**

```
import boto3
import json
from datetime import datetime


s3_client = boto3.client('s3')
glue_client = boto3.client('glue')


s3_bucket = 's3-bucket-name'

def write_logs(log_metadata):
    try:
        filename = datetime.now().strftime("%m%d%Y%H%M%S") + ".json"
        key_opts = {
            'year': datetime.now().year,
            'month': "{:02d}".format(datetime.now().month),
            'day': "{:02d}".format(datetime.now().day),
            'filename': filename
        }
        s3key = "gluedataqualitylogs/year={year}/month={month}/day={day}/{filename}".format(**key_opts)
        s3_client.put_object(Bucket=s3_bucket, Key=s3key, Body=json.dumps(log_metadata))
    except Exception as e:
        print(f'Error writing logs to S3: {e}')


def lambda_handler(event, context):
    log_metadata = {}
    message_text = ""
    subject_text = ""

    if event['detail']['context']['contextType'] == 'GLUE_DATA_CATALOG':
        log_metadata['ruleset_name'] = str(event['detail']['rulesetNames'][0])
        log_metadata['tableName'] = str(event['detail']['context']['tableName'])
        log_metadata['databaseName'] = str(event['detail']['context']['databaseName'])
        log_metadata['runId'] = str(event['detail']['context']['runId'])
        log_metadata['resultId'] = str(event['detail']['resultId'])
        log_metadata['state'] = str(event['detail']['state'])
        log_metadata['score'] = str(event['detail']['score'])
        log_metadata['numRulesSucceeded'] = str(event['detail']['numRulesSucceeded'])
        log_metadata['numRulesFailed'] = str(event['detail']['numRulesFailed'])
        log_metadata['numRulesSkipped'] = str(event['detail']['numRulesSkipped'])

        message_text += "Glue Data Quality run details:\n"
        message_text += "ruleset_name: {}\n".format(log_metadata['ruleset_name'])
        message_text += "glue_table_name: {}\n".format(log_metadata['tableName'])
        message_text += "glue_database_name: {}\n".format(log_metadata['databaseName'])
        message_text += "run_id: {}\n".format(log_metadata['runId'])
        message_text += "result_id: {}\n".format(log_metadata['resultId'])
        message_text += "state: {}\n".format(log_metadata['state'])
        message_text += "score: {}\n".format(log_metadata['score'])
        message_text += "numRulesSucceeded: {}\n".format(log_metadata['numRulesSucceeded'])
        message_text += "numRulesFailed: {}\n".format(log_metadata['numRulesFailed'])
        message_text += "numRulesSkipped: {}\n".format(log_metadata['numRulesSkipped'])

        subject_text = "Glue Data Quality ruleset {} run details".format(log_metadata['ruleset_name'])

    else:
        log_metadata['ruleset_name'] = str(event['detail']['rulesetNames'][0])
        log_metadata['jobName'] = str(event['detail']['context']['jobName'])
        log_metadata['jobId'] = str(event['detail']['context']['jobId'])
        log_metadata['resultId'] = str(event['detail']['resultId'])
        log_metadata['state'] = str(event['detail']['state'])
        log_metadata['score'] = str(event['detail']['score'])

        log_metadata['numRulesSucceeded'] = str(event['detail']['numRulesSucceeded'])
        log_metadata['numRulesFailed'] = str(event['detail']['numRulesFailed'])
        log_metadata['numRulesSkipped'] = str(event['detail']['numRulesSkipped'])

        message_text += "Glue Data Quality run details:\n"
        message_text += "ruleset_name: {}\n".format(log_metadata['ruleset_name'])
        message_text += "glue_job_name: {}\n".format(log_metadata['jobName'])
        message_text += "job_id: {}\n".format(log_metadata['jobId'])
        message_text += "result_id: {}\n".format(log_metadata['resultId'])
        message_text += "state: {}\n".format(log_metadata['state'])
        message_text += "score: {}\n".format(log_metadata['score'])
        message_text += "numRulesSucceeded: {}\n".format(log_metadata['numRulesSucceeded'])
        message_text += "numRulesFailed: {}\n".format(log_metadata['numRulesFailed'])
        message_text += "numRulesSkipped: {}\n".format(log_metadata['numRulesSkipped'])

        subject_text = "Glue Data Quality ruleset {} run details".format(log_metadata['ruleset_name'])

    resultID = str(event['detail']['resultId'])
    response = glue_client.get_data_quality_result(ResultId=resultID)
    RuleResults = response['RuleResults']
    message_text += "\n\nruleset details evaluation steps results:\n\n"
    subresult_info = []

    for dic in RuleResults:
        subresult = "Name: {}\t\tResult: {}\t\tDescription: \t{}".format(dic['Name'], dic['Result'], dic['Description'])
        if 'EvaluationMessage' in dic:
            subresult += "\t\tEvaluationMessage: {}".format(dic['EvaluationMessage'])
        subresult_info.append({
            'Name': dic['Name'],
            'Result': dic['Result'],
            'Description': dic['Description'],
            'EvaluationMessage': dic.get('EvaluationMessage', '')
        })
        message_text += "\n" + subresult

    log_metadata['resultrun'] = subresult_info

    write_logs(log_metadata)


    return {
        'statusCode': 200,
        'body': json.dumps('Message published to SNS topic')
    }
```

写入 Amazon S3 后，您可以使用 AWS Glue 爬网程序注册到 Athena 并查询表。

**在数据质量评估期间配置 Amazon S3 位置：**

在 AWS Glue Data Catalog 或 AWS Glue ETL 中运行数据质量任务时，您可以提供 Amazon S3 位置来将数据质量结果写入 Amazon S3。您可以使用以下语法通过引用目标来创建表，以读取数据质量结果。

请注意：必须分别运行 `CREATE EXTERNAL TABLE` 和 `MSCK REPAIR TABLE` 查询。

```
CREATE EXTERNAL TABLE <my_table_name>(
    catalogid string,
    databasename string,
    tablename string,
    dqrunid string,
    evaluationstartedon timestamp,
    evaluationcompletedon timestamp,
    rule string,
    outcome string,
    failurereason string,
    evaluatedmetrics string) 
PARTITIONED BY (
    `year` string,
    `month` string,
    `day` string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
    'paths'='catalogId,databaseName,dqRunId,evaluatedMetrics,evaluationCompletedOn,evaluationStartedOn,failureReason,outcome,rule,tableName')
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://glue-s3-dq-bucket-us-east-2-results/'
TBLPROPERTIES (
    'classification'='json',
    'compressionType'='none',
    'typeOfData'='file');
```

```
MSCK REPAIR TABLE <my_table_name>;
```

创建上表后，即可使用 Amazon Athena 运行分析查询。

## 使用 AWS CloudFormation 部署数据质量规则
<a name="data-quality-deploy-cfn"></a>

您可以使用 AWS CloudFormation 创建数据质量规则。有关更多信息，请参阅 [AWS CloudFormation for AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/populate-with-cloudformation-templates.html)。

## 计划数据质量规则
<a name="data-quality-scheduling-rules"></a>

您可以使用以下方法计划数据质量规则：
+  从 Data Catalog 中计划数据质量规则：任何代码用户都无法使用此选项轻松计划数据质量扫描。AWSGlue Data Quality 将在 Amazon EventBridge 中创建计划。要计划数据质量规则，请执行以下操作：
  +  导航到规则集并单击**运行**。
  +  在**运行频率**中，选择所需的计划并提供**任务名称**。此任务名称是您在 EventBridge 中计划的名称。
+ 使用 Amazon EventBridge 和 AWS Step Functions 来编排数据质量规则的评估和建议。