

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 高级主题
<a name="model-monitor-advanced-topics"></a>

以下各节包含更高级的任务，说明如何使用预处理和后处理脚本自定义监控、如何构建自己的容器以及如何使用 CloudFormation 来创建监控计划。

**Topics**
+ [自定义监控时间表](model-monitor-custom-monitoring-schedules.md)
+ [使用 CloudFormation 自定义资源为实时终端节点创建监控计划](model-monitor-cloudformation-monitoring-schedules.md)

# 自定义监控时间表
<a name="model-monitor-custom-monitoring-schedules"></a>

除了使用内置监控机制之外，还可以使用预处理和后处理脚本，或通过使用或构建您自己的容器来创建您自己的自定义监控计划和过程。

**Topics**
+ [预处理和后处理](model-monitor-pre-and-post-processing.md)
+ [使用 Amazon SageMaker 模型监视器支持您自己的容器](model-monitor-byoc-containers.md)

# 预处理和后处理
<a name="model-monitor-pre-and-post-processing"></a>

您可以使用自定义的预处理和后处理 Python 脚本将输入转换为 Model Monitor，或者在成功运行监控后扩展代码。将这些脚本上传到 Amazon S3，并在创建 Model Monitor 时引用它们。

以下示例说明了如何使用预处理和后处理脚本来自定义监控计划。*user placeholder text*用您自己的信息替换。

```
import boto3, os
from sagemaker import get_execution_role, Session
from sagemaker.model_monitor import CronExpressionGenerator, DefaultModelMonitor

# Upload pre and postprocessor scripts
session = Session()
bucket = boto3.Session().resource("s3").Bucket(session.default_bucket())
prefix = "demo-sagemaker-model-monitor"
pre_processor_script = bucket.Object(os.path.join(prefix, "preprocessor.py")).upload_file("preprocessor.py")
post_processor_script = bucket.Object(os.path.join(prefix, "postprocessor.py")).upload_file("postprocessor.py")

# Get execution role
role = get_execution_role() # can be an empty string

# Instance type
instance_type = "instance-type"
# instance_type = "ml.m5.xlarge" # Example

# Create a monitoring schedule with pre and postprocessing
my_default_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type=instance_type,
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
)

s3_report_path = "s3://{}/{}".format(bucket, "reports")
monitor_schedule_name = "monitor-schedule-name"
endpoint_name = "endpoint-name"
my_default_monitor.create_monitoring_schedule(
    post_analytics_processor_script=post_processor_script,
    record_preprocessor_script=pre_processor_script,
    monitor_schedule_name=monitor_schedule_name,
    # use endpoint_input for real-time endpoint
    endpoint_input=endpoint_name,
    # or use batch_transform_input for batch transform jobs
    # batch_transform_input=batch_transform_name,
    output_s3_uri=s3_report_path,
    statistics=my_default_monitor.baseline_statistics(),
    constraints=my_default_monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)
```

**Topics**
+ [预处理脚本](#model-monitor-pre-processing-script)
+ [自定义采样](#model-monitor-pre-processing-custom-sampling)
+ [后处理脚本](#model-monitor-post-processing-script)

## 预处理脚本
<a name="model-monitor-pre-processing-script"></a>

当需要将输入转换为 Model Monitor 时，请使用预处理脚本。

例如，假设模型的输出是一个数组 `[1.0, 2.1]`。Amazon SageMaker 模型监视器容器仅适用于表格或扁平化的 JSON 结构，例如。`{“prediction0”: 1.0, “prediction1” : 2.1}`您可以使用如下所示的预处理脚本将数组转换为正确的 JSON 结构。

```
def preprocess_handler(inference_record):
    input_data = inference_record.endpoint_input.data
    output_data = inference_record.endpoint_output.data.rstrip("\n")
    data = output_data + "," + input_data
    return { str(i).zfill(20) : d for i, d in enumerate(data.split(",")) }
```

在另一个示例中，假设您的模型具有可选特征，并且您使用 `-1` 来表示该可选特征有缺失值。如果您有数据质量监控器，则可能需要从输入值数组中删除 `-1`，使其不包含在监控器的指标计算中。您可以使用如下所示的脚本来删除这些值。

```
def preprocess_handler(inference_record):
    input_data = inference_record.endpoint_input.data
    return {i : None if x == -1 else x for i, x in enumerate(input_data.split(","))}
```

您的预处理脚本接收 `inference_record` 作为其唯一输入。下面的代码片段显示了 `inference_record` 示例。

```
{
  "captureData": {
    "endpointInput": {
      "observedContentType": "text/csv",
      "mode": "INPUT",
      "data": "132,25,113.2,96,269.9,107,,0,0,0,0,0,0,1,0,1,0,0,1",
      "encoding": "CSV"
    },
    "endpointOutput": {
      "observedContentType": "text/csv; charset=utf-8",
      "mode": "OUTPUT",
      "data": "0.01076381653547287",
      "encoding": "CSV"
    }
  },
  "eventMetadata": {
    "eventId": "feca1ab1-8025-47e3-8f6a-99e3fdd7b8d9",
    "inferenceTime": "2019-11-20T23:33:12Z"
  },
  "eventVersion": "0"
}
```

以下代码片段显示了 `inference_record` 的完整类结构。

```
KEY_EVENT_METADATA = "eventMetadata"
KEY_EVENT_METADATA_EVENT_ID = "eventId"
KEY_EVENT_METADATA_EVENT_TIME = "inferenceTime"
KEY_EVENT_METADATA_CUSTOM_ATTR = "customAttributes"

KEY_EVENTDATA_ENCODING = "encoding"
KEY_EVENTDATA_DATA = "data"

KEY_GROUND_TRUTH_DATA = "groundTruthData"

KEY_EVENTDATA = "captureData"
KEY_EVENTDATA_ENDPOINT_INPUT = "endpointInput"
KEY_EVENTDATA_ENDPOINT_OUTPUT = "endpointOutput"

KEY_EVENTDATA_BATCH_OUTPUT = "batchTransformOutput"
KEY_EVENTDATA_OBSERVED_CONTENT_TYPE = "observedContentType"
KEY_EVENTDATA_MODE = "mode"

KEY_EVENT_VERSION = "eventVersion"

class EventConfig:
    def __init__(self, endpoint, variant, start_time, end_time):
        self.endpoint = endpoint
        self.variant = variant
        self.start_time = start_time
        self.end_time = end_time


class EventMetadata:
    def __init__(self, event_metadata_dict):
        self.event_id = event_metadata_dict.get(KEY_EVENT_METADATA_EVENT_ID, None)
        self.event_time = event_metadata_dict.get(KEY_EVENT_METADATA_EVENT_TIME, None)
        self.custom_attribute = event_metadata_dict.get(KEY_EVENT_METADATA_CUSTOM_ATTR, None)


class EventData:
    def __init__(self, data_dict):
        self.encoding = data_dict.get(KEY_EVENTDATA_ENCODING, None)
        self.data = data_dict.get(KEY_EVENTDATA_DATA, None)
        self.observedContentType = data_dict.get(KEY_EVENTDATA_OBSERVED_CONTENT_TYPE, None)
        self.mode = data_dict.get(KEY_EVENTDATA_MODE, None)

    def as_dict(self):
        ret = {
            KEY_EVENTDATA_ENCODING: self.encoding,
            KEY_EVENTDATA_DATA: self.data,
            KEY_EVENTDATA_OBSERVED_CONTENT_TYPE: self.observedContentType,
        }
        return ret


class CapturedData:
    def __init__(self, event_dict):
        self.event_metadata = None
        self.endpoint_input = None
        self.endpoint_output = None
        self.batch_transform_output = None
        self.ground_truth = None
        self.event_version = None
        self.event_dict = event_dict
        self._event_dict_postprocessed = False
        
        if KEY_EVENT_METADATA in event_dict:
            self.event_metadata = EventMetadata(event_dict[KEY_EVENT_METADATA])
        if KEY_EVENTDATA in event_dict:
            if KEY_EVENTDATA_ENDPOINT_INPUT in event_dict[KEY_EVENTDATA]:
                self.endpoint_input = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_ENDPOINT_INPUT])
            if KEY_EVENTDATA_ENDPOINT_OUTPUT in event_dict[KEY_EVENTDATA]:
                self.endpoint_output = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_ENDPOINT_OUTPUT])
            if KEY_EVENTDATA_BATCH_OUTPUT in event_dict[KEY_EVENTDATA]:
                self.batch_transform_output = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_BATCH_OUTPUT])

        if KEY_GROUND_TRUTH_DATA in event_dict:
            self.ground_truth = EventData(event_dict[KEY_GROUND_TRUTH_DATA])
        if KEY_EVENT_VERSION in event_dict:
            self.event_version = event_dict[KEY_EVENT_VERSION]

    def as_dict(self):
        if self._event_dict_postprocessed is True:
            return self.event_dict
        if KEY_EVENTDATA in self.event_dict:
            if KEY_EVENTDATA_ENDPOINT_INPUT in self.event_dict[KEY_EVENTDATA]:
                self.event_dict[KEY_EVENTDATA][KEY_EVENTDATA_ENDPOINT_INPUT] = self.endpoint_input.as_dict()
            if KEY_EVENTDATA_ENDPOINT_OUTPUT in self.event_dict[KEY_EVENTDATA]:
                self.event_dict[KEY_EVENTDATA][
                    KEY_EVENTDATA_ENDPOINT_OUTPUT
                ] = self.endpoint_output.as_dict()
            if KEY_EVENTDATA_BATCH_OUTPUT in self.event_dict[KEY_EVENTDATA]:
                self.event_dict[KEY_EVENTDATA][KEY_EVENTDATA_BATCH_OUTPUT] = self.batch_transform_output.as_dict()
        
        self._event_dict_postprocessed = True
        return self.event_dict

    def __str__(self):
        return str(self.as_dict())
```

## 自定义采样
<a name="model-monitor-pre-processing-custom-sampling"></a>

您也可以在预处理脚本中应用自定义采样策略。为此，请将 Model Monitor 的第一方预构建容器配置为根据您指定的采样率忽略一定比例的记录。在以下示例中，处理程序通过在 10% 的处理程序调用中返回记录，否则返回空列表，从而对 10% 的记录进行采样。

```
import random

def preprocess_handler(inference_record):
    # we set up a sampling rate of 0.1
    if random.random() > 0.1:
        # return an empty list
        return []
    input_data = inference_record.endpoint_input.data
    return {i : None if x == -1 else x for i, x in enumerate(input_data.split(","))}
```

### 预处理脚本的自定义日志记录
<a name="model-monitor-pre-processing-custom-logging"></a>

 如果您的预处理脚本返回错误，请检查记录 CloudWatch 到调试的异常消息。您可以 CloudWatch 通过`preprocess_handler`界面访问记录器。您可以将脚本中所需的任何信息记录到 CloudWatch。这在调试预处理脚本时非常有用。以下示例显示了如何使用`preprocess_handler`界面登录到 CloudWatch 

```
def preprocess_handler(inference_record, logger):
    logger.info(f"I'm a processing record: {inference_record}")
    logger.debug(f"I'm debugging a processing record: {inference_record}")
    logger.warning(f"I'm processing record with missing value: {inference_record}")
    logger.error(f"I'm a processing record with bad value: {inference_record}")
    return inference_record
```

## 后处理脚本
<a name="model-monitor-post-processing-script"></a>

如果要在成功运行监控后扩展代码，请使用后处理脚本。

```
def postprocess_handler():
    print("Hello from post-proc script!")
```

# 使用 Amazon SageMaker 模型监视器支持您自己的容器
<a name="model-monitor-byoc-containers"></a>

Amazon SageMaker Model Monitor 提供了一个预建容器，能够分析从终端节点捕获的数据或表格数据集的批量转换任务。如果要自带容器，Model Monitor 为您提供了可利用的扩展点。

在后台，当您创建 `MonitoringSchedule` 时，Model Monitor 最终将启动处理作业。因此，容器需要了解[如何构建您自己的处理容器（高级方案）](build-your-own-processing-container.md)主题中记录的处理作业约定。请注意，Model Monitor 将按照计划代表您启动处理作业。在调用时，Model Monitor 会为您设置额外的环境变量，以便您的容器具有足够的上下文来处理已计划监控的特定执行的数据。有关容器输入的其他信息，请参阅 [容器约定输入](model-monitor-byoc-contract-inputs.md)。

在容器中，通过使用上述环境变量/上下文，您现在可以在自定义代码中分析当前周期的数据集。在此分析完成后，您可以选择发出要上传到 S3 存储桶的报告。预构建容器所生成的报告将记录在[容器约定输出](model-monitor-byoc-contract-outputs.md)中。如果您想在 SageMaker Studio 中实现报表的可视化，则应遵循相同的格式。还可以选择发出完全自定义的报告。

您还可以按照中的说明从容器中[CloudWatch 自带容器的指标](model-monitor-byoc-cloudwatch.md)发布 CloudWatch 指标。

**Topics**
+ [容器约定输入](model-monitor-byoc-contract-inputs.md)
+ [容器约定输出](model-monitor-byoc-contract-outputs.md)
+ [CloudWatch 自带容器的指标](model-monitor-byoc-cloudwatch.md)

# 容器约定输入
<a name="model-monitor-byoc-contract-inputs"></a>

Amazon SageMaker 模型监控器平台会根据指定的计划调用您的容器代码。如果您选择编写您自己的容器代码，则可以使用以下环境变量。在此上下文中，可以分析当前数据集或评估约束（如果您选择这样做）并发出指标（如果适用）。

除了 `dataset_format` 变量之外，实时端点和批量转换作业的可用环境变量相同。如果您使用的是实时端点，则 `dataset_format` 变量支持以下选项：

```
{\"sagemakerCaptureJson\": {\"captureIndexNames\": [\"endpointInput\",\"endpointOutput\"]}}
```

如果您使用的是批量转换作业，则 `dataset_format` 支持以下选项：

```
{\"csv\": {\"header\": [\"true\",\"false\"]}}
```

```
{\"json\": {\"line\": [\"true\",\"false\"]}}
```

```
{\"parquet\": {}}
```

以下代码示例显示了可用于容器代码的整套环境变量（并对实时端点使用 `dataset_format` 格式）。

```
"Environment": {
 "dataset_format": "{\"sagemakerCaptureJson\": {\"captureIndexNames\": [\"endpointInput\",\"endpointOutput\"]}}",
 "dataset_source": "/opt/ml/processing/endpointdata",
 "end_time": "2019-12-01T16: 20: 00Z",
 "output_path": "/opt/ml/processing/resultdata",
 "publish_cloudwatch_metrics": "Disabled",
 "sagemaker_endpoint_name": "endpoint-name",
 "sagemaker_monitoring_schedule_name": "schedule-name",
 "start_time": "2019-12-01T15: 20: 00Z"
}
```

参数 


| 参数名称 | 说明 | 
| --- | --- | 
| dataset\$1format |  对于从由 `Endpoint` 支持的 `MonitoringSchedule` 开始的作业，这是具有捕获索引 `endpointInput` 和/或 `endpointOutput` 的 `sageMakerCaptureJson`。对于批量转换作业，这指定了数据格式，无论是 CSV、JSON 还是 Parquet。  | 
| dataset\$1source |  如果您使用的是实时端点，则提供与由 `start_time` 和 `end_time` 指定的监控周期对应的数据所在的本地路径。在此路径中，数据在 ` /{endpoint-name}/{variant-name}/yyyy/mm/dd/hh` 中可用。 有时，我们下载的内容超出了开始时间和结束时间所指定的范围。由容器代码根据需要决定解析数据。  | 
| output\$1path |  用于写入输出报告和其他文件的本地路径。您必须在 `CreateMonitoringSchedule` 请求中将该参数指定为 `MonitoringOutputConfig.MonitoringOutput[0].LocalPath`。它将上传到 `MonitoringOutputConfig.MonitoringOutput[0].S3Uri` 中指定的 `S3Uri` 路径。  | 
| publish\$1cloudwatch\$1metrics |  对于由 `CreateMonitoringSchedule` 启动的作业，此参数设置为 `Enabled`。容器可以选择将 Amazon CloudWatch 输出文件写入到`[filepath]`。  | 
| sagemaker\$1endpoint\$1name |  如果您使用的是实时端点，则是为其启动此计划作业的 `Endpoint` 的名称。  | 
| sagemaker\$1monitoring\$1schedule\$1name |  启动了此作业的 `MonitoringSchedule` 的名称。  | 
| \$1sagemaker\$1endpoint\$1datacapture\$1prefix\$1 |  如果您使用的是实时端点，则是 `Endpoint` 的 `DataCaptureConfig` 参数中指定的前缀。如果容器需要直接访问的数据超过 SageMaker AI 在`dataset_source`路径上已下载的数据，则可以使用此功能。  | 
| start\$1time, end\$1time |  此分析的运行时段。例如，对于计划在 05:00 UTC 运行的作业和在 20/02/2020 运行的作业，`start_time` 为 2020-02-19T06:00:00Z，`end_time` 为 2020-02-20T05:00:00Z  | 
| baseline\$1constraints: |  ` BaselineConfig.ConstraintResource.S3Uri` 中指定的基准约束文件的本地路径。这仅在 `CreateMonitoringSchedule` 请求中指定了此参数时可用。  | 
| baseline\$1statistics |  `BaselineConfig.StatisticsResource.S3Uri` 中指定的基准统计数据文件的本地路径。这仅在 `CreateMonitoringSchedule` 请求中指定了此参数时可用。  | 

# 容器约定输出
<a name="model-monitor-byoc-contract-outputs"></a>

容器可以分析 `*dataset_source*` 路径中可用的数据，并将报告写入 `*output_path*.` 中的路径。容器代码可以编写任何报告来满足您的需求。

如果您使用以下结构和合约，AI 会在可视化和 AP SageMaker I 中对某些输出文件进行特殊处理。这仅适用于表格数据集。

表格数据集的输出文件


| 文件名称 | 说明 | 
| --- | --- | 
| statistics.json |  此文件应具有所分析数据集中每个特征的列式统计数据。下一节将介绍此文件的架构。  | 
| constraints.json |  此文件应对观察到的特征有约束。下一节将介绍此文件的架构。  | 
| constraints\$1violations.json |  此文件应包含在当前数据集中找到的相对于 `baseline_statistics` 路径中指定的基准统计数据文件和 `baseline_constaints` 路径中指定的约束文件的违规情况的列表。  | 

此外，如果`publish_cloudwatch_metrics`值为`"Enabled"`容器代码，则可以在此位置发出Amazon CloudWatch 指标：`/opt/ml/output/metrics/cloudwatch`。以下部分中描述了这些文件的架构。

**Topics**
+ [统计数据的架构（statistics.json 文件）](model-monitor-byoc-statistics.md)
+ [约束的架构（constraints.json 文件）](model-monitor-byoc-constraints.md)

# 统计数据的架构（statistics.json 文件）
<a name="model-monitor-byoc-statistics"></a>

`statistics.json` 文件中定义的架构指定要为基准和捕获的数据计算的统计参数。它还将存储桶配置为由 [KLL](https://datasketches.apache.org/docs/KLL/KLLSketch.html)（一个带有延迟压缩方案的非常紧凑的分位数草图）使用。

```
{
    "version": 0,
    # dataset level stats
    "dataset": {
        "item_count": number
    },
    # feature level stats
    "features": [
        {
            "name": "feature-name",
            "inferred_type": "Fractional" | "Integral",
            "numerical_statistics": {
                "common": {
                    "num_present": number,
                    "num_missing": number
                },
                "mean": number,
                "sum": number,
                "std_dev": number,
                "min": number,
                "max": number,
                "distribution": {
                    "kll": {
                        "buckets": [
                            {
                                "lower_bound": number,
                                "upper_bound": number,
                                "count": number
                            }
                        ],
                        "sketch": {
                            "parameters": {
                                "c": number,
                                "k": number
                            },
                            "data": [
                                [
                                    num,
                                    num,
                                    num,
                                    num
                                ],
                                [
                                    num,
                                    num
                                ][
                                    num,
                                    num
                                ]
                            ]
                        }#sketch
                    }#KLL
                }#distribution
            }#num_stats
        },
        {
            "name": "feature-name",
            "inferred_type": "String",
            "string_statistics": {
                "common": {
                    "num_present": number,
                    "num_missing": number
                },
                "distinct_count": number,
                "distribution": {
                    "categorical": {
                         "buckets": [
                                {
                                    "value": "string",
                                    "count": number
                                }
                          ]
                     }
                }
            },
            #provision for custom stats
        }
    ]
}
```

**注意**  
 SageMaker AI 将在以后的可视化更改中识别指定的指标。如果需要，容器可以发出更多的指标。
[KLL 草图](https://datasketches.apache.org/docs/KLL/KLLSketch.html)是公认的草图。自定义容器可以编写自己的表示形式，但是 SageMaker AI 在可视化中无法识别它。
默认情况下，分配将具体化到 10 个存储桶中。无法更改此设置。

# 约束的架构（constraints.json 文件）
<a name="model-monitor-byoc-constraints"></a>

constraints.json 文件用于表达数据集必须满足的约束条件。Amazon SageMaker 模型监控器容器可以使用 constraints.json 文件来评估数据集。利用预构建的容器，可以为基准数据集自动生成 constraints.json 文件。如果您创建了自己的容器，则可以为它提供类似的功能，也可以通过其他方式创建 constraints.json 文件。以下是预构建的容器使用的约束文件的架构。自带容器可以采用相同的格式或根据需要对其进行增强。

```
{
    "version": 0,
    "features":
    [
        {
            "name": "string",
            "inferred_type": "Integral" | "Fractional" | 
                    | "String" | "Unknown",
            "completeness": number,
            "num_constraints":
            {
                "is_non_negative": boolean
            },
            "string_constraints":
            {
                "domains":
                [
                    "list of",
                    "observed values",
                    "for small cardinality"
                ]
            },
            "monitoringConfigOverrides":
            {}
        }
    ],
    "monitoring_config":
    {
        "evaluate_constraints": "Enabled",
        "emit_metrics": "Enabled",
        "datatype_check_threshold": 0.1,
        "domain_content_threshold": 0.1,
        "distribution_constraints":
        {
            "perform_comparison": "Enabled",
            "comparison_threshold": 0.1,
            "comparison_method": "Simple"||"Robust",
            "categorical_comparison_threshold": 0.1,
            "categorical_drift_method": "LInfinity"||"ChiSquared"
        }
    }
}
```

`monitoring_config` 对象包含用于该特征监控作业的选项。下表描述了每个选项。

监控约束

[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/sagemaker/latest/dg/model-monitor-byoc-constraints.html)

# CloudWatch 自带容器的指标
<a name="model-monitor-byoc-cloudwatch"></a>

如果该`publish_cloudwatch_metrics`值`Enabled`在`/opt/ml/processing/processingjobconfig.json`文件`Environment`的地图中，则容器代码会在以下位置发出亚马逊 CloudWatch 指标：`/opt/ml/output/metrics/cloudwatch`。

此文件的架构紧密基于 CloudWatch `PutMetrics` API。此处未指定命名空间。它默认为以下内容：
+ `For real-time endpoints: /aws/sagemaker/Endpoint/data-metrics`
+ `For batch transform jobs: /aws/sagemaker/ModelMonitoring/data-metrics`

不过，您可以指定维度。建议您至少添加以下维度：
+ `Endpoint` 和 `MonitoringSchedule`（对于实时端点）
+ `MonitoringSchedule`（对于批量转换作业）

以下 JSON 片段展示了如何设置维度。

对于实时端点，请参阅以下 JSON 片段，其中包含 `Endpoint` 和 `MonitoringSchedule` 维度：

```
{ 
    "MetricName": "", # Required
    "Timestamp": "2019-11-26T03:00:00Z", # Required
    "Dimensions" : [{"Name":"Endpoint","Value":"endpoint_0"},{"Name":"MonitoringSchedule","Value":"schedule_0"}]
    "Value": Float,
    # Either the Value or the StatisticValues field can be populated and not both.
    "StatisticValues": {
        "SampleCount": Float,
        "Sum": Float,
        "Minimum": Float,
        "Maximum": Float
    },
    "Unit": "Count", # Optional
}
```

对于批量转换作业，请参阅以下 JSON 片段，其中包含 `MonitoringSchedule` 维度：

```
{ 
    "MetricName": "", # Required
    "Timestamp": "2019-11-26T03:00:00Z", # Required
    "Dimensions" : [{"Name":"MonitoringSchedule","Value":"schedule_0"}]
    "Value": Float,
    # Either the Value or the StatisticValues field can be populated and not both.
    "StatisticValues": {
        "SampleCount": Float,
        "Sum": Float,
        "Minimum": Float,
        "Maximum": Float
    },
    "Unit": "Count", # Optional
}
```

# 使用 CloudFormation 自定义资源为实时终端节点创建监控计划
<a name="model-monitor-cloudformation-monitoring-schedules"></a>

如果您使用的是实时终端节点，则可以使用 CloudFormation 自定义资源来创建监控计划。自定义资源位于 Python 中。要部署它，请参阅 [Python Lambda 部署](https://docs.aws.amazon.com/lambda/latest/dg/lambda-python-how-to-create-deployment-package.html)。

## 自定义资源
<a name="model-monitor-cloudformation-custom-resource"></a>

首先向 CloudFormation 模板添加自定义资源。这指向您下一步将创建的 AWS Lambda 函数。

此资源使您可以自定义监控计划的参数。您可以通过修改以下示例资源中的 CloudFormation 资源和 Lambda 函数来添加或删除更多参数。

```
{
    "AWSTemplateFormatVersion": "2010-09-09",
    "Resources": {
        "MonitoringSchedule": {
            "Type": "Custom::MonitoringSchedule",
            "Version": "1.0",
            "Properties": {
                "ServiceToken": "arn:aws:lambda:us-west-2:111111111111:function:lambda-name",
                "ScheduleName": "YourScheduleName",
                "EndpointName": "YourEndpointName",
                "BaselineConstraintsUri": "s3://your-baseline-constraints/constraints.json",
                "BaselineStatisticsUri": "s3://your-baseline-stats/statistics.json",
                "PostAnalyticsProcessorSourceUri": "s3://your-post-processor/postprocessor.py",
                "RecordPreprocessorSourceUri": "s3://your-preprocessor/preprocessor.py",
                "InputLocalPath": "/opt/ml/processing/endpointdata",
                "OutputLocalPath": "/opt/ml/processing/localpath",
                "OutputS3URI": "s3://your-output-uri",
                "ImageURI": "111111111111.dkr.ecr.us-west-2.amazonaws.com/your-image",
                "ScheduleExpression": "cron(0 * ? * * *)",
                "PassRoleArn": "arn:aws:iam::111111111111:role/AmazonSageMaker-ExecutionRole"
            }
        }
    }
}
```

## Lambda 自定义资源代码
<a name="model-monitor-cloudformation-lambda-custom-resource-code"></a>

此 CloudFormation 自定义资源使用[自定义资源助手](https://github.com/aws-cloudformation/custom-resource-helper) AWS 库，您可以使用 `pip install crhelper` pip 安装该库。

此 Lambda 函数由 CloudFormation 在创建和删除堆栈期间调用。此 Lambda 函数负责创建和删除监控计划，并使用上一部分中描述的自定义资源中定义的参数。

```
import boto3
import botocore
import logging

from crhelper import CfnResource
from botocore.exceptions import ClientError


logger = logging.getLogger(__name__)
sm = boto3.client('sagemaker')

# cfnhelper makes it easier to implement a CloudFormation custom resource
helper = CfnResource()

# CFN Handlers

def handler(event, context):
    helper(event, context)


@helper.create
def create_handler(event, context):
    """
    Called when CloudFormation custom resource sends the create event
    """
    create_monitoring_schedule(event)


@helper.delete
def delete_handler(event, context):
    """
    Called when CloudFormation custom resource sends the delete event
    """
    schedule_name = get_schedule_name(event)
    delete_monitoring_schedule(schedule_name)


@helper.poll_create
def poll_create(event, context):
    """
    Return true if the resource has been created and false otherwise so
    CloudFormation polls again.
    """
    schedule_name = get_schedule_name(event)
    logger.info('Polling for creation of schedule: %s', schedule_name)
    return is_schedule_ready(schedule_name)

@helper.update
def noop():
    """
    Not currently implemented but crhelper will throw an error if it isn't added
    """
    pass

# Helper Functions

def get_schedule_name(event):
    return event['ResourceProperties']['ScheduleName']

def create_monitoring_schedule(event):
    schedule_name = get_schedule_name(event)
    monitoring_schedule_config = create_monitoring_schedule_config(event)

    logger.info('Creating monitoring schedule with name: %s', schedule_name)

    sm.create_monitoring_schedule(
        MonitoringScheduleName=schedule_name,
        MonitoringScheduleConfig=monitoring_schedule_config)

def is_schedule_ready(schedule_name):
    is_ready = False

    schedule = sm.describe_monitoring_schedule(MonitoringScheduleName=schedule_name)
    status = schedule['MonitoringScheduleStatus']

    if status == 'Scheduled':
        logger.info('Monitoring schedule (%s) is ready', schedule_name)
        is_ready = True
    elif status == 'Pending':
        logger.info('Monitoring schedule (%s) still creating, waiting and polling again...', schedule_name)
    else:
        raise Exception('Monitoring schedule ({}) has unexpected status: {}'.format(schedule_name, status))

    return is_ready

def create_monitoring_schedule_config(event):
    props = event['ResourceProperties']

    return {
        "ScheduleConfig": {
            "ScheduleExpression": props["ScheduleExpression"],
        },
        "MonitoringJobDefinition": {
            "BaselineConfig": {
                "ConstraintsResource": {
                    "S3Uri": props['BaselineConstraintsUri'],
                },
                "StatisticsResource": {
                    "S3Uri": props['BaselineStatisticsUri'],
                }
            },
            "MonitoringInputs": [
                {
                    "EndpointInput": {
                        "EndpointName": props["EndpointName"],
                        "LocalPath": props["InputLocalPath"],
                    }
                }
            ],
            "MonitoringOutputConfig": {
                "MonitoringOutputs": [
                    {
                        "S3Output": {
                            "S3Uri": props["OutputS3URI"],
                            "LocalPath": props["OutputLocalPath"],
                        }
                    }
                ],
            },
            "MonitoringResources": {
                "ClusterConfig": {
                    "InstanceCount": 1,
                    "InstanceType": "ml.t3.medium",
                    "VolumeSizeInGB": 50,
                }
            },
            "MonitoringAppSpecification": {
                "ImageUri": props["ImageURI"],
                "RecordPreprocessorSourceUri": props['PostAnalyticsProcessorSourceUri'],
                "PostAnalyticsProcessorSourceUri": props['PostAnalyticsProcessorSourceUri'],
            },
            "StoppingCondition": {
                "MaxRuntimeInSeconds": 300
            },
            "RoleArn": props["PassRoleArn"],
        }
    }


def delete_monitoring_schedule(schedule_name):
    logger.info('Deleting schedule: %s', schedule_name)
    try:
        sm.delete_monitoring_schedule(MonitoringScheduleName=schedule_name)
    except ClientError as e:
        if e.response['Error']['Code'] == 'ResourceNotFound':
            logger.info('Resource not found, nothing to delete')
        else:
            logger.error('Unexpected error while trying to delete monitoring schedule')
            raise e
```