

# 评估 AWS Glue Studio 中 ETL 作业的数据质量
<a name="tutorial-data-quality"></a>

在本教程中，您将开始使用 AWS Glue Studio 中的 AWS Glue 数据质量。您将了解如何执行以下操作：
+  使用数据质量定义语言（DQDL）规则构建器创建规则集。
+  指定数据质量操作、要输出的数据以及数据质量结果的输出位置。
+  查看数据质量结果。

 要通过示例进行练习，请查看博客文章[https://aws.amazon.com/blogs/big-data/getting-started-with-aws-glue-data-quality-for-etl-pipelines/](https://aws.amazon.com/blogs/big-data/getting-started-with-aws-glue-data-quality-for-etl-pipelines/)。

## 步骤 1：将“评估数据质量”转换节点添加到可视化作业
<a name="tutorial-data-quality-step1"></a>

在此步骤中，将“评估数据质量”节点添加到可视作业编辑器中。

**添加数据质量节点**

1.  在 AWS Glue Studio 控制台中，从**创建作业**部分选择**源和目标可视化**，然后选择**创建**。

1.  选择要应用数据质量转换的节点。通常，这将是转换节点或数据来源。

1.  选择“\$1”图标打开左侧的资源面板。然后在搜索栏中搜索**评估数据质量**，然后从搜索结果中选择**评估数据质量**。

1.  可视化作业编辑器将显示来自所选节点的**评估数据质量**转换节点分支。在控制台的右侧，**Transform**（转换）选项卡自动打开。如果您需要更改父节点，请选择**节点属性**选项卡，然后从下拉菜单中选择父节点。

    当您选择新的父节点时，将在父节点和 **Evaluate Data Quality**（评估数据质量）节点之间建立新的连接。移除所有不需要的父节点。只能将一个父节点连接到一个 **Evaluate Data Quality**（评估数据质量）节点。

1.  评估数据质量转换支持多个父数据集，因此您可以跨多个数据集验证数据质量规则。支持多个数据集的规则包括 ReferentialIntegrity、DatasetMatch、SchemaMatch、RowCountMatch 和 AggregateMatch。

   向“评估数据质量”转换添加多个输入时，需要选择“主要”输入。您的主要输入是要验证数据质量的数据集。所有其他节点或输入都被视为引用。

   您可以使用“评估数据质量”转换来标识未通过数据质量检查的特定记录。我们建议您选择主数据集，因为标记不良记录的新列会添加到主数据集中。

1.  您可以为输入数据来源指定别名。在使用 ReferentialIntegrity 规则时，别名提供了另一种引用输入源的方式。由于只能将一个数据来源指定为主要源，因此您添加的每个额外数据来源都需要一个别名。

   在以下示例中，ReferentialIntegrity 规则通过别名指定输入数据来源，并与主数据来源进行一对一比较。

   ```
   Rules = [
   	ReferentialIntegrity “Aliasname.name” = 1
   ]
   ```

## 步骤 2：使用 DQDL 创建规则
<a name="tutorial-data-quality-step2"></a>

在本步骤中，您将使用 DQDL 来创建规则。在本教程中，您使用**完整性**规则类型创建单个规则。此规则类型检查列中完整（非空）值与给定表达式的百分比。有关使用 DQDL 的更多信息，请参阅 [DQDL](https://docs.aws.amazon.com/glue/latest/dg/dqdl.html)。

1.  在**转换**选项卡中，选择**插入**按钮添加**规则类型**。这会将规则类型添加到规则编辑器中，您可以在其中输入规则的参数。
**注意**  
 编辑规则时，请确保规则在方括号内，并确保规则用逗号分隔。例如，完整的规则表达式将如下所示：  

   ```
   Rules= [
       Completeness "year">0.8, Completeness "month">0.8
   ]
   ```
 此示例为名为“年”和“月”的列指定了完整性参数。为了让规则通过，这些列的“完成”值必须大于 80%，或者每个相应列的实例中的数据必须超过 80%。

    在此示例中，搜索并插入 **Completeness**（完整性）规则类型。这会将规则类型添加到规则编辑器中。此规则类型具有以下语法：`Completeness <COL_NAME> <EXPRESSION>`。

   大多数规则类型都要求您提供表达式作为参数才能创建布尔响应。有关支持的 DQDL 表达式的更多信息，请参阅 [DQDL expressions](https://docs.aws.amazon.com/glue/latest/dg/dqdl.html#dqdl-syntax)。接下来，您将添加列名。

1.  在 DQDL 规则生成器中，选择**架构**选项卡。使用搜索栏在输入架构中查找列名。输入架构显示列名和数据类型。

1.  在规则编辑器中，单击规则类型的右侧，在要插入列的位置插入光标。或者，您也可以在规则中输入列的名称。

    例如，在输入架构列表的列列表中，选择该列旁边的**插入**按钮（在本示例中为**年份**）。这会将该列添加到规则中。

1.  然后，在规则编辑器中添加一个表达式来评估规则。由于**完整性**规则类型会根据给定表达式检查列中完整（非空）值的百分比，因此请输入一个表达式（例如 `> 0.8`）。该规则检查该列的完整值（非空值）是否大于 80%。

## 步骤 3：配置数据质量输出
<a name="tutorial-data-quality-step3"></a>

 创建数据质量规则后，您可以选择其他选项来指定数据质量节点输出：

1.  在 **Data quality transform output**（数据质量转换输出）中，从以下选项中选择：
   +  **原始数据** — 选择输出原始输入数据。当您选择此选项时，作业中会添加一个新的子节点“rowLevelOutcomes”。该架构与作为输入传递给转换的主数据集的架构相匹配。如果您只想在出现质量问题时传递数据并使作业失败，则此选项非常有用。

     另一个用例是您想要检测未通过数据质量检查的不良记录。要检测不良记录，请选择**添加新列以指示数据质量错误**选项。此操作将在“rowLevelOutcomes”转换的架构中添加四个新列。
     +  **DataQualityRulesPass**（字符串数组）— 提供一组通过数据质量检查的规则。
     +  **DataQualityRulesFail**（字符串数组）— 提供一组未通过数据质量检查的规则。
     +  **DataQualityRulesSkip**（字符串数组）— 提供已跳过的规则数组。以下规则无法识别错误记录，因为它们是在数据集级别应用的。
       +  AggregateMatch 
       +  ColumnCount 
       +  ColumnExists 
       +  ColumnNamesMatchPattern 
       +  CustomSql 
       +  RowCount 
       +  RowCountMatch 
       +  StandardDeviation 
       +  平均值 
       +  ColumnCorrelation 
     +  **DataQualityEvaluationResult** — 在行级别提供“通过”或“失败”状态。请注意：您的总体结果可能为“失败”，但某项记录可能会通过。例如，RowCount 规则可能已失败，但所有其他规则可能都已成功。在这种情况下，此字段状态为“已通过”。

1.  **数据质量结果** — 选择输出配置的规则及其通过或失败状态。如果要将结果写入 Amazon S3 或其他数据库，此选项非常有用。

1. **数据质量输出设置**（可选）— 选择**数据质量输出设置**以显示**数据质量结果位置**字段。然后，选择**浏览**搜索要设置为数据质量输出目标的 Amazon S3 位置。

## 步骤 4：配置数据质量操作
<a name="tutorial-data-quality-step4"></a>

 您可以使用操作将指标发布到 CloudWatch 或根据特定条件停止作业。只有在创建规则后，操作才可用。当您选择此选项时，同样的指标也会发布到 Amazon EventBridge。您可以使用这些选项来[创建通知警报](https://docs.aws.amazon.com/glue/latest/dg/data-quality-alerts.html)。
+  **规则集失败时** — 您可以选择在作业运行时，如果规则集失败该怎么做。如果您希望在数据质量失败时作业失败，请通过选择以下选项之一来选择作业何时失败。默认情况下，此操作处于未选中状态，即使数据质量规则失败，作业也将完成运行。
  +  **无** — 如果选择**无**（默认值），则作业不会失败，即使规则集失败，作业仍会继续运行。
  +  **将数据加载到目标后作业失败** — 作业失败且未保存任何数据。要保存结果，请选择用于保存数据质量结果的 Amazon S3 位置。
  +  **在不加载到目标数据的情况下作业失败** — 当发生数据质量错误时，此选项会立即使作业失败。它不加载任何数据目标，包括数据质量转换的结果。

## 步骤 5：查看数据质量结果
<a name="tutorial-data-quality-step5"></a>

 运行作业后，选择**数据质量**选项卡查看数据质量结果。

1.  对于每个运行作业，查看数据质量结果。每个节点显示数据质量状态和状态详细信息。选择节点可查看所有规则和每条规则的状态。

1.  选择**下载结果**可下载包含有关作业运行和数据质量结果信息的 CSV 文件。

1.  如果您运行了多个带有数据质量结果的作业，则可以按日期和时间范围筛选结果。选择*按日期和时间范围筛选*以展开筛选窗口。

1.  您可以选择 Relative range（相对范围）或者 Absolute range（绝对范围）。对于绝对范围，请使用日历选择日期并输入开始时间和结束时间的值。完成后，选择**应用**。

## 自动数据质量
<a name="automatic-data-quality"></a>

 当您创建以 Amazon S3 为目标的 AWS Glue ETL 作业时，AWS Glue ETL 会自动启用一条数据质量规则，该规则检查正在加载的数据是否至少有一列。此规则旨在确保加载的数据不为空或损坏。但是，如果此规则失败，作业并不会失败；相反，您会注意到数据质量分数下降。此外，默认情况下启用异常检测，用于监控数据中的列数。如果列数有任何变化或异常，AWS Glue ETL 将通知您这些异常。此功能有助于您识别数据的潜在问题并采取适当的措施。要查看数据质量规则及其配置，您可以单击 AWS Glue ETL 作业中的 Amazon S3 目标。将显示规则的配置，如提供的屏幕截图所示。

![\[屏幕截图显示可供选择的自动数据质量选项。\]](http://docs.aws.amazon.com/zh_cn/glue/latest/dg/images/automatic-data-quality.png)


 您可以通过选择**编辑数据质量配置**来添加其他数据质量规则。

## 聚合指标
<a name="data-quality-aggregated-metrics"></a>

您可能需要聚合指标，例如在规则级别或规则集级别通过、未通过、跳过的记录条数，来构建控制面板。要获取每条规则的聚合指标和规则指标，首先要向 `EvaluateDataQuality` 函数添加 `publishAggregatedMetrics` 选项以启用聚合指标。

可能的 `additional_options` `publishAggregatedMetrics` 选项包括 `ENABLED` 和 `DISABLED`。例如：

```
EvaluateDataQualityMultiframe = EvaluateDataQuality().process_rows(
    frame=medicare_dyf,
    ruleset=EvaluateDataQuality_ruleset,
    publishing_options={
        "dataQualityEvaluationContext": "EvaluateDataQualityMultiframe",
        "enableDataQualityCloudWatchMetrics": False,
        "enableDataQualityResultsPublishing": False,
    },
    additional_options={"publishAggregatedMetrics.status": "ENABLED"},
)
```

若未指定，`publishAggregatedMetrics.status` 默认为 `DISABLED`，现在将计算规则指标和聚合指标。AWS Glue 交互式会话和 Glue ETL 作业目前支持此功能。Glue Catalog Data Quality API 中不支持此功能。

### 检索聚合指标结果
<a name="data-quality-aggregated-metrics-results"></a>

当 `additionalOptions` `"publishAggregatedMetrics.status": "ENABLED"` 时，可以在两个地方得到结果：

1. 提供 `resultId` 时通过 `GetDataQualityResult()` 返回 `AggregatedMetrics` 和 `RuleMetrics`，其中 `AggregatedMetrics` 和 `RuleMetrics` 包括：

   **聚合指标：**
   + 处理的总行数
   + 通过的总行数
   + 未通过的总行数
   + 处理的总规则数
   + 通过的总规则数
   + 未通过的总规则数  
![\[屏幕截图显示的是 AWS Glue 数据质量评估的聚合指标和规则指标结构。\]](http://docs.aws.amazon.com/zh_cn/glue/latest/dg/images/data-quality-aggregated-metrics.png)

   此外，在规则级别还提供了以下指标：

   **规则指标：**
   + 通过的行数
   + 未通过的行数
   + 跳过的行数
   + 处理的总行数

1. 返回 `AggregatedMetrics` 作为附加数据框，并扩充 `RuleOutcomes` 数据框进行以纳入 `RuleMetrics`。

### 实施示例
<a name="data-quality-aggregated-metrics-example"></a>

以下示例说明如何在 Scala 中实现聚合指标：

```
// Script generated for node Evaluate Data Quality
val EvaluateDataQuality_node1741974822533_ruleset = """
  # Example rules: Completeness "colA" between 0.4 and 0.8, ColumnCount > 10
  Rules = [
      IsUnique "customer_identifier",
      RowCount > 10,
      Completeness "customer_identifier" > 0.5
  ]
"""

val EvaluateDataQuality_node1741974822533 = EvaluateDataQuality.processRows(frame=ChangeSchema_node1742850392012, ruleset=EvaluateDataQuality_node1741974822533_ruleset, publishingOptions=JsonOptions("""{"dataQualityEvaluationContext": "EvaluateDataQuality_node1741974822533", "enableDataQualityCloudWatchMetrics": "true", "enableDataQualityResultsPublishing": "true"}"""), additionalOptions=JsonOptions("""{"compositeRuleEvaluation.method":"ROW","observations.scope":"ALL","performanceTuning.caching":"CACHE_NOTHING", "publishAggregatedMetrics.status": "ENABLED"}"""))

println("--------------------------------ROW LEVEL OUTCOMES--------------------------------")
val rowLevelOutcomes_node = EvaluateDataQuality_node1741974822533("rowLevelOutcomes")

rowLevelOutcomes_node.show(10)

 println("--------------------------------RULE LEVEL OUTCOMES--------------------------------")

val ruleOutcomes_node = EvaluateDataQuality_node1741974822533("ruleOutcomes")

ruleOutcomes_node.show()

 println("--------------------------------AGGREGATED METRICS--------------------------------")

val aggregatedMetrics_node = EvaluateDataQuality_node1741974822533("aggregatedMetrics")

aggregatedMetrics_node.show()
```

### 结果示例
<a name="data-quality-aggregated-metrics-sample-results"></a>

结果按如下方式返回：

```
{
    "Rule": "IsUnique \"customer_identifier\"",
    "Outcome": "Passed",
    "FailureReason": null,
    "EvaluatedMetrics": {
        "Column.customer_identifier.Uniqueness": 1
    },
    "EvaluatedRule": "IsUnique \"customer_identifier\"",
    "PassedCount": 10,
    "FailedCount": 0,
    "SkippedCount": 0,
    "TotalCount": 10
}
{
    "Rule": "RowCount > 10",
    "Outcome": "Failed",
    "FailureReason": "Value: 10 does not meet the constraint requirement!",
    "EvaluatedMetrics": {
        "Dataset.*.RowCount": 10
    },
    "EvaluatedRule": "RowCount > 10",
    "PassedCount": 0,
    "FailedCount": 0,
    "SkippedCount": 10,
    "TotalCount": 10
}
{
    "Rule": "Completeness \"customer_identifier\" > 0.5",
    "Outcome": "Passed",
    "FailureReason": null,
    "EvaluatedMetrics": {
        "Column.customer_identifier.Completeness": 1
    },
    "EvaluatedRule": "Completeness \"customer_identifier\" > 0.5",
    "PassedCount": 10,
    "FailedCount": 0,
    "SkippedCount": 0,
    "TotalCount": 10
}
```

聚合指标如下所示：

```
{ "TotalRowsProcessed": 10, "PassedRows": 10, "FailedRows": 0, "TotalRulesProcessed": 3, "RulesPassed": 2, "RulesFailed": 1 }
```