

# Advanced topics
<a name="model-monitor-advanced-topics"></a>

The following sections contain more advanced tasks that explain how to customize monitoring using preprocessing and postprocessing scripts, how to build your own container, and how to use CloudFormation to create a monitoring schedule.

**Topics**
+ [Custom monitoring schedules](model-monitor-custom-monitoring-schedules.md)
+ [Create a Monitoring Schedule for a Real-time Endpoint with a CloudFormation Custom Resource](model-monitor-cloudformation-monitoring-schedules.md)

# Custom monitoring schedules
<a name="model-monitor-custom-monitoring-schedules"></a>

In addition to the built-in monitoring mechanisms, you can create your own custom monitoring schedules and procedures using preprocessing and postprocessing scripts, or by using or building your own container.

**Topics**
+ [Preprocessing and Postprocessing](model-monitor-pre-and-post-processing.md)
+ [Support for Your Own Containers With Amazon SageMaker Model Monitor](model-monitor-byoc-containers.md)

# Preprocessing and Postprocessing
<a name="model-monitor-pre-and-post-processing"></a>

You can use custom preprocessing and postprocessing Python scripts to transform the input to your model monitor or extend the code after a successful monitoring run. Upload these scripts to Amazon S3 and reference them when creating your model monitor.

The following example shows how you can customize monitoring schedules with preprocessing and postprocessing scripts. Replace *user placeholder text* with your own information.

```
import boto3, os
from sagemaker import get_execution_role, Session
from sagemaker.model_monitor import CronExpressionGenerator, DefaultModelMonitor

# Upload pre and postprocessor scripts
session = Session()
bucket_name = session.default_bucket()
bucket = boto3.Session().resource("s3").Bucket(bucket_name)
prefix = "demo-sagemaker-model-monitor"
bucket.Object(os.path.join(prefix, "preprocessor.py")).upload_file("preprocessor.py")
bucket.Object(os.path.join(prefix, "postprocessor.py")).upload_file("postprocessor.py")
# upload_file() returns None, so build the S3 URIs of the scripts explicitly
pre_processor_script = "s3://{}/{}/preprocessor.py".format(bucket_name, prefix)
post_processor_script = "s3://{}/{}/postprocessor.py".format(bucket_name, prefix)

# Get execution role
role = get_execution_role()  # IAM role used to run the monitoring jobs

# Instance type
instance_type = "instance-type"
# instance_type = "ml.m5.xlarge" # Example

# Create a monitoring schedule with pre and postprocessing
my_default_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type=instance_type,
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
)

s3_report_path = "s3://{}/{}".format(session.default_bucket(), "reports")
monitor_schedule_name = "monitor-schedule-name"
endpoint_name = "endpoint-name"
my_default_monitor.create_monitoring_schedule(
    post_analytics_processor_script=post_processor_script,
    record_preprocessor_script=pre_processor_script,
    monitor_schedule_name=monitor_schedule_name,
    # use endpoint_input for real-time endpoint
    endpoint_input=endpoint_name,
    # or use batch_transform_input for batch transform jobs
    # batch_transform_input=batch_transform_name,
    output_s3_uri=s3_report_path,
    statistics=my_default_monitor.baseline_statistics(),
    constraints=my_default_monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)
```

**Topics**
+ [Preprocessing Script](#model-monitor-pre-processing-script)
+ [Custom Sampling](#model-monitor-pre-processing-custom-sampling)
+ [Postprocessing Script](#model-monitor-post-processing-script)

## Preprocessing Script
<a name="model-monitor-pre-processing-script"></a>

Use preprocessing scripts when you need to transform the inputs to your model monitor.

For example, suppose the output of your model is an array `[1.0, 2.1]`. The Amazon SageMaker Model Monitor container only works with tabular or flattened JSON structures, like `{"prediction0": 1.0, "prediction1": 2.1}`. You could use a preprocessing script like the following to transform the array into the correct JSON structure.

```
def preprocess_handler(inference_record):
    input_data = inference_record.endpoint_input.data
    output_data = inference_record.endpoint_output.data.rstrip("\n")
    data = output_data + "," + input_data
    return { str(i).zfill(20) : d for i, d in enumerate(data.split(",")) }
```
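Outside the Model Monitor container, you can sanity-check such a handler by calling it with a stand-in object; the `SimpleNamespace` record below is a hypothetical test double, not part of the Model Monitor SDK.

```python
from types import SimpleNamespace

def preprocess_handler(inference_record):
    input_data = inference_record.endpoint_input.data
    output_data = inference_record.endpoint_output.data.rstrip("\n")
    data = output_data + "," + input_data
    return {str(i).zfill(20): d for i, d in enumerate(data.split(","))}

# Hypothetical stand-in for the record object the container passes in
record = SimpleNamespace(
    endpoint_input=SimpleNamespace(data="1.0,2.1"),
    endpoint_output=SimpleNamespace(data="0.5\n"),
)
flat = preprocess_handler(record)
print(flat)  # {'00000000000000000000': '0.5', '00000000000000000001': '1.0', '00000000000000000002': '2.1'}
```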

In another example, suppose your model has optional features and you use `-1` to denote that the optional feature has a missing value. If you have a data quality monitor, you may want to remove the `-1` from the input value array so that it isn't included in the monitor's metric calculations. You could use a script like the following to remove those values.

```
def preprocess_handler(inference_record):
    input_data = inference_record.endpoint_input.data
    # Captured CSV fields are strings, so compare against "-1"
    return {i: None if x == "-1" else x for i, x in enumerate(input_data.split(","))}
```

Your preprocessing script receives an `inference_record` as its only input. The following code snippet shows an example of an `inference_record`.

```
{
  "captureData": {
    "endpointInput": {
      "observedContentType": "text/csv",
      "mode": "INPUT",
      "data": "132,25,113.2,96,269.9,107,,0,0,0,0,0,0,1,0,1,0,0,1",
      "encoding": "CSV"
    },
    "endpointOutput": {
      "observedContentType": "text/csv; charset=utf-8",
      "mode": "OUTPUT",
      "data": "0.01076381653547287",
      "encoding": "CSV"
    }
  },
  "eventMetadata": {
    "eventId": "feca1ab1-8025-47e3-8f6a-99e3fdd7b8d9",
    "inferenceTime": "2019-11-20T23:33:12Z"
  },
  "eventVersion": "0"
}
```

The following code snippet shows the full class structure for an `inference_record`.

```
KEY_EVENT_METADATA = "eventMetadata"
KEY_EVENT_METADATA_EVENT_ID = "eventId"
KEY_EVENT_METADATA_EVENT_TIME = "inferenceTime"
KEY_EVENT_METADATA_CUSTOM_ATTR = "customAttributes"

KEY_EVENTDATA_ENCODING = "encoding"
KEY_EVENTDATA_DATA = "data"

KEY_GROUND_TRUTH_DATA = "groundTruthData"

KEY_EVENTDATA = "captureData"
KEY_EVENTDATA_ENDPOINT_INPUT = "endpointInput"
KEY_EVENTDATA_ENDPOINT_OUTPUT = "endpointOutput"

KEY_EVENTDATA_BATCH_OUTPUT = "batchTransformOutput"
KEY_EVENTDATA_OBSERVED_CONTENT_TYPE = "observedContentType"
KEY_EVENTDATA_MODE = "mode"

KEY_EVENT_VERSION = "eventVersion"

class EventConfig:
    def __init__(self, endpoint, variant, start_time, end_time):
        self.endpoint = endpoint
        self.variant = variant
        self.start_time = start_time
        self.end_time = end_time


class EventMetadata:
    def __init__(self, event_metadata_dict):
        self.event_id = event_metadata_dict.get(KEY_EVENT_METADATA_EVENT_ID, None)
        self.event_time = event_metadata_dict.get(KEY_EVENT_METADATA_EVENT_TIME, None)
        self.custom_attribute = event_metadata_dict.get(KEY_EVENT_METADATA_CUSTOM_ATTR, None)


class EventData:
    def __init__(self, data_dict):
        self.encoding = data_dict.get(KEY_EVENTDATA_ENCODING, None)
        self.data = data_dict.get(KEY_EVENTDATA_DATA, None)
        self.observedContentType = data_dict.get(KEY_EVENTDATA_OBSERVED_CONTENT_TYPE, None)
        self.mode = data_dict.get(KEY_EVENTDATA_MODE, None)

    def as_dict(self):
        ret = {
            KEY_EVENTDATA_ENCODING: self.encoding,
            KEY_EVENTDATA_DATA: self.data,
            KEY_EVENTDATA_OBSERVED_CONTENT_TYPE: self.observedContentType,
        }
        return ret


class CapturedData:
    def __init__(self, event_dict):
        self.event_metadata = None
        self.endpoint_input = None
        self.endpoint_output = None
        self.batch_transform_output = None
        self.ground_truth = None
        self.event_version = None
        self.event_dict = event_dict
        self._event_dict_postprocessed = False
        
        if KEY_EVENT_METADATA in event_dict:
            self.event_metadata = EventMetadata(event_dict[KEY_EVENT_METADATA])
        if KEY_EVENTDATA in event_dict:
            if KEY_EVENTDATA_ENDPOINT_INPUT in event_dict[KEY_EVENTDATA]:
                self.endpoint_input = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_ENDPOINT_INPUT])
            if KEY_EVENTDATA_ENDPOINT_OUTPUT in event_dict[KEY_EVENTDATA]:
                self.endpoint_output = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_ENDPOINT_OUTPUT])
            if KEY_EVENTDATA_BATCH_OUTPUT in event_dict[KEY_EVENTDATA]:
                self.batch_transform_output = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_BATCH_OUTPUT])

        if KEY_GROUND_TRUTH_DATA in event_dict:
            self.ground_truth = EventData(event_dict[KEY_GROUND_TRUTH_DATA])
        if KEY_EVENT_VERSION in event_dict:
            self.event_version = event_dict[KEY_EVENT_VERSION]

    def as_dict(self):
        if self._event_dict_postprocessed is True:
            return self.event_dict
        if KEY_EVENTDATA in self.event_dict:
            if KEY_EVENTDATA_ENDPOINT_INPUT in self.event_dict[KEY_EVENTDATA]:
                self.event_dict[KEY_EVENTDATA][KEY_EVENTDATA_ENDPOINT_INPUT] = self.endpoint_input.as_dict()
            if KEY_EVENTDATA_ENDPOINT_OUTPUT in self.event_dict[KEY_EVENTDATA]:
                self.event_dict[KEY_EVENTDATA][
                    KEY_EVENTDATA_ENDPOINT_OUTPUT
                ] = self.endpoint_output.as_dict()
            if KEY_EVENTDATA_BATCH_OUTPUT in self.event_dict[KEY_EVENTDATA]:
                self.event_dict[KEY_EVENTDATA][KEY_EVENTDATA_BATCH_OUTPUT] = self.batch_transform_output.as_dict()
        
        self._event_dict_postprocessed = True
        return self.event_dict

    def __str__(self):
        return str(self.as_dict())
```

## Custom Sampling
<a name="model-monitor-pre-processing-custom-sampling"></a>

You can also apply a custom sampling strategy in your preprocessing script. To do this, configure Model Monitor's first-party, pre-built container to ignore a percentage of the records according to your specified sampling rate. In the following example, the handler samples 10 percent of the records by returning the record in 10 percent of handler calls and an empty list otherwise.

```
import random

def preprocess_handler(inference_record):
    # we set up a sampling rate of 0.1
    if random.random() > 0.1:
        # return an empty list
        return []
    input_data = inference_record.endpoint_input.data
    # Captured CSV fields are strings, so compare against "-1"
    return {i: None if x == "-1" else x for i, x in enumerate(input_data.split(","))}
```
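Because the handler uses `random.random()`, the sampling is probabilistic. One quick way to check the effective rate is to call the handler many times with a stand-in record; the `SimpleNamespace` double below is hypothetical, and the comparison against `"-1"` assumes the captured CSV fields arrive as strings.

```python
import random
from types import SimpleNamespace

def preprocess_handler(inference_record):
    # sampling rate of 0.1: keep the record in roughly 10% of calls
    if random.random() > 0.1:
        return []
    input_data = inference_record.endpoint_input.data
    return {i: None if x == "-1" else x for i, x in enumerate(input_data.split(","))}

random.seed(42)  # deterministic for this check only
record = SimpleNamespace(endpoint_input=SimpleNamespace(data="1,2,-1"))
kept = sum(1 for _ in range(10_000) if preprocess_handler(record))
print(kept / 10_000)  # close to 0.1
```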

### Custom logging for preprocessing script
<a name="model-monitor-pre-processing-custom-logging"></a>

If your preprocessing script returns an error, check the exception messages logged to CloudWatch to debug it. You can access a CloudWatch logger through the `preprocess_handler` interface and log any information you need from your script, which is useful when debugging your preprocessing script. The following example shows how to use the `preprocess_handler` interface to log to CloudWatch.

```
def preprocess_handler(inference_record, logger):
    logger.info(f"I'm a processing record: {inference_record}")
    logger.debug(f"I'm debugging a processing record: {inference_record}")
    logger.warning(f"I'm processing record with missing value: {inference_record}")
    logger.error(f"I'm a processing record with bad value: {inference_record}")
    return inference_record
```

## Postprocessing Script
<a name="model-monitor-post-processing-script"></a>

Use a postprocessing script when you want to extend the code following a successful monitoring run.

```
def postprocess_handler():
    print("Hello from post-proc script!")
```
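A postprocessing handler can do more than print. As a minimal illustration, the sketch below appends a completion timestamp to a local file; the file path and its use are purely hypothetical, not part of the Model Monitor contract.

```python
import datetime

def postprocess_handler():
    # Illustrative only: record that the monitoring run finished
    with open("/tmp/monitoring_runs.log", "a") as f:
        timestamp = datetime.datetime.now(datetime.timezone.utc).isoformat()
        f.write("run completed at {}\n".format(timestamp))

postprocess_handler()
```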

# Support for Your Own Containers With Amazon SageMaker Model Monitor
<a name="model-monitor-byoc-containers"></a>

Amazon SageMaker Model Monitor provides a prebuilt container with the ability to analyze the data captured from endpoints or batch transform jobs for tabular datasets. If you want to bring your own container, Model Monitor provides extension points that you can use.

Under the hood, when you create a `MonitoringSchedule`, Model Monitor ultimately starts processing jobs, so your container needs to be aware of the processing job contract documented in the [How to Build Your Own Processing Container (Advanced Scenario)](build-your-own-processing-container.md) topic. Model Monitor starts the processing job on your behalf per the schedule, and when invoking it sets up additional environment variables so that your container has enough context to process the data for that particular execution of the scheduled monitoring. For additional information on container inputs, see [Container Contract Inputs](model-monitor-byoc-contract-inputs.md).

In the container, using these environment variables as context, your custom code can analyze the dataset for the current period. After this analysis is complete, you can choose to emit reports to be uploaded to an S3 bucket. The reports that the prebuilt container generates are documented in [Container Contract Outputs](model-monitor-byoc-contract-outputs.md). If you want the visualization of the reports to work in SageMaker Studio, follow the same format. You can also choose to emit completely custom reports.

You can also emit CloudWatch metrics from the container by following the instructions in [CloudWatch Metrics for Bring Your Own Containers](model-monitor-byoc-cloudwatch.md).

**Topics**
+ [Container Contract Inputs](model-monitor-byoc-contract-inputs.md)
+ [Container Contract Outputs](model-monitor-byoc-contract-outputs.md)
+ [CloudWatch Metrics for Bring Your Own Containers](model-monitor-byoc-cloudwatch.md)

# Container Contract Inputs
<a name="model-monitor-byoc-contract-inputs"></a>

The Amazon SageMaker Model Monitor platform invokes your container code according to a specified schedule. If you choose to write your own container code, the following environment variables are available to it. With this context, you can analyze the current dataset, evaluate the constraints if you choose, and emit metrics, as applicable.

The available environment variables are the same for real-time endpoints and batch transform jobs, except for the `dataset_format` variable. If you are using a real-time endpoint, the `dataset_format` variable supports the following options:

```
{\"sagemakerCaptureJson\": {\"captureIndexNames\": [\"endpointInput\",\"endpointOutput\"]}}
```

If you are using a batch transform job, the `dataset_format` variable supports the following options:

```
{\"csv\": {\"header\": [\"true\",\"false\"]}}
```

```
{\"json\": {\"line\": [\"true\",\"false\"]}}
```

```
{\"parquet\": {}}
```

The following code sample shows the complete set of environment variables available for your container code (and uses the `dataset_format` format for a real-time endpoint).

```
"Environment": {
 "dataset_format": "{\"sagemakerCaptureJson\": {\"captureIndexNames\": [\"endpointInput\",\"endpointOutput\"]}}",
 "dataset_source": "/opt/ml/processing/endpointdata",
 "end_time": "2019-12-01T16:20:00Z",
 "output_path": "/opt/ml/processing/resultdata",
 "publish_cloudwatch_metrics": "Disabled",
 "sagemaker_endpoint_name": "endpoint-name",
 "sagemaker_monitoring_schedule_name": "schedule-name",
 "start_time": "2019-12-01T15:20:00Z"
}
```
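Inside your container's entry point, these values are read like any other environment variables. The following sketch shows one way to do that; the fallback defaults are illustrative, not part of the contract.

```python
import json
import os

# Context that Model Monitor injects into the processing job environment
dataset_format = json.loads(os.environ.get(
    "dataset_format",
    '{"sagemakerCaptureJson": {"captureIndexNames": ["endpointInput", "endpointOutput"]}}'
))
dataset_source = os.environ.get("dataset_source", "/opt/ml/processing/endpointdata")
output_path = os.environ.get("output_path", "/opt/ml/processing/resultdata")
publish_metrics = os.environ.get("publish_cloudwatch_metrics", "Disabled") == "Enabled"
start_time = os.environ.get("start_time")
end_time = os.environ.get("end_time")

print(dataset_format["sagemakerCaptureJson"]["captureIndexNames"])
```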

Parameters 


| Parameter Name | Description | 
| --- | --- | 
| dataset\_format |  For a job started from a `MonitoringSchedule` backed by an `Endpoint`, this is `sagemakerCaptureJson` with the capture indices `endpointInput`, `endpointOutput`, or both. For a batch transform job, this specifies the data format: CSV, JSON, or Parquet.  | 
| dataset\_source |  If you are using a real-time endpoint, the local path at which the data for the monitoring period, as specified by `start_time` and `end_time`, is available. At this path, the data is laid out as `/{endpoint-name}/{variant-name}/yyyy/mm/dd/hh`. More data than specified by the start and end times is sometimes downloaded. It is up to the container code to parse the data as required.  | 
| output\_path |  The local path to write output reports and other files to. You specify this parameter in the `CreateMonitoringSchedule` request as `MonitoringOutputConfig.MonitoringOutput[0].LocalPath`. It is uploaded to the `S3Uri` path specified in `MonitoringOutputConfig.MonitoringOutput[0].S3Uri`.  | 
| publish\_cloudwatch\_metrics |  For a job launched by `CreateMonitoringSchedule`, this parameter is set to `Enabled`. The container can choose to write the Amazon CloudWatch output file at `[filepath]`.  | 
| sagemaker\_endpoint\_name |  If you are using a real-time endpoint, the name of the `Endpoint` that this scheduled job was launched for.  | 
| sagemaker\_monitoring\_schedule\_name |  The name of the `MonitoringSchedule` that launched this job.  | 
| sagemaker\_endpoint\_datacapture\_prefix |  If you are using a real-time endpoint, the prefix specified in the `DataCaptureConfig` parameter of the `Endpoint`. The container can use this if it needs to directly access more data than SageMaker AI has already downloaded at the `dataset_source` path.  | 
| start\_time, end\_time |  The time window for this analysis run. For example, for a job scheduled to run at 05:00 UTC on 2020-02-20, `start_time` is `2020-02-19T06:00:00Z` and `end_time` is `2020-02-20T05:00:00Z`.  | 
| baseline\_constraints |  The local path of the baseline constraints file specified in `BaselineConfig.ConstraintsResource.S3Uri`. Available only if this parameter was specified in the `CreateMonitoringSchedule` request.  | 
| baseline\_statistics |  The local path of the baseline statistics file specified in `BaselineConfig.StatisticsResource.S3Uri`. Available only if this parameter was specified in the `CreateMonitoringSchedule` request.  | 

# Container Contract Outputs
<a name="model-monitor-byoc-contract-outputs"></a>

The container can analyze the data available at the `dataset_source` path and write reports to the path in `output_path`. The container code can write any reports that suit your needs.

If you use the following structure and contract, certain output files are treated specially by SageMaker AI in the visualization and API. This applies only to tabular datasets.

Output Files for Tabular Datasets


| File Name | Description | 
| --- | --- | 
| statistics.json |  This file is expected to have columnar statistics for each feature in the dataset that is analyzed. The schema for this file is available in the next section.  | 
| constraints.json |  This file is expected to have the constraints on the features observed. The schema for this file is available in the next section.  | 
| constraint\_violations.json |  This file is expected to have the list of violations found in the current set of data as compared to the baseline statistics and constraints files specified in the `baseline_constraints` and `baseline_statistics` paths.  | 

In addition, if the `publish_cloudwatch_metrics` value is `Enabled`, the container code can emit Amazon CloudWatch metrics to this location: `/opt/ml/output/metrics/cloudwatch`. The schema for these files is described in the following sections.
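A container that follows this contract might finish its analysis by writing the three report files to the `output_path` location. The sketch below writes minimal, illustrative payloads; the statistics values are placeholders, and the path is redirected to `/tmp` so it runs outside a container.

```python
import json
import os

output_path = "/tmp/resultdata"  # inside the container this comes from the output_path variable
os.makedirs(output_path, exist_ok=True)

# Minimal placeholder report bodies, matching the file names SageMaker AI recognizes
reports = {
    "statistics.json": {"version": 0, "dataset": {"item_count": 100}, "features": []},
    "constraints.json": {"version": 0, "features": []},
    "constraint_violations.json": {"violations": []},
}
for name, body in reports.items():
    with open(os.path.join(output_path, name), "w") as f:
        json.dump(body, f)
```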

**Topics**
+ [Schema for Statistics (statistics.json file)](model-monitor-byoc-statistics.md)
+ [Schema for Constraints (constraints.json file)](model-monitor-byoc-constraints.md)

# Schema for Statistics (statistics.json file)
<a name="model-monitor-byoc-statistics"></a>

The schema defined in the `statistics.json` file specifies the statistical parameters to be calculated for the baseline and for the captured data. It also configures the buckets used by [KLL](https://datasketches.apache.org/docs/KLL/KLLSketch.html), a compact quantile sketch with a lazy compaction scheme.

```
{
    "version": 0,
    # dataset level stats
    "dataset": {
        "item_count": number
    },
    # feature level stats
    "features": [
        {
            "name": "feature-name",
            "inferred_type": "Fractional" | "Integral",
            "numerical_statistics": {
                "common": {
                    "num_present": number,
                    "num_missing": number
                },
                "mean": number,
                "sum": number,
                "std_dev": number,
                "min": number,
                "max": number,
                "distribution": {
                    "kll": {
                        "buckets": [
                            {
                                "lower_bound": number,
                                "upper_bound": number,
                                "count": number
                            }
                        ],
                        "sketch": {
                            "parameters": {
                                "c": number,
                                "k": number
                            },
                            "data": [
                                [
                                    num,
                                    num,
                                    num,
                                    num
                                ],
                                [
                                    num,
                                    num
                                ],
                                [
                                    num,
                                    num
                                ]
                            ]
                        }#sketch
                    }#KLL
                }#distribution
            }#num_stats
        },
        {
            "name": "feature-name",
            "inferred_type": "String",
            "string_statistics": {
                "common": {
                    "num_present": number,
                    "num_missing": number
                },
                "distinct_count": number,
                "distribution": {
                    "categorical": {
                         "buckets": [
                                {
                                    "value": "string",
                                    "count": number
                                }
                          ]
                     }
                }
            },
            #provision for custom stats
        }
    ]
}
```

**Notes**  
The specified metrics are recognized by SageMaker AI in later visualizations. The container can emit more metrics if required.
The [KLL sketch](https://datasketches.apache.org/docs/KLL/KLLSketch.html) is the recognized sketch. Custom containers can write their own representation, but it won't be recognized by SageMaker AI in visualizations.
By default, the distribution is materialized in 10 buckets. You can't change this.
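To make the schema concrete, the following sketch computes the `numerical_statistics` block for a single feature using only the Python standard library. It omits the KLL distribution, and the sample values and feature name are made up.

```python
import statistics

values = [1.0, 2.5, None, 4.0]  # made-up feature values; one is missing
present = [v for v in values if v is not None]

feature_stats = {
    "name": "feature-0",  # illustrative feature name
    "inferred_type": "Fractional",
    "numerical_statistics": {
        "common": {
            "num_present": len(present),
            "num_missing": len(values) - len(present),
        },
        "mean": statistics.fmean(present),
        "sum": sum(present),
        "std_dev": statistics.pstdev(present),
        "min": min(present),
        "max": max(present),
    },
}
print(feature_stats["numerical_statistics"]["mean"])  # 2.5
```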

# Schema for Constraints (constraints.json file)
<a name="model-monitor-byoc-constraints"></a>

A constraints.json file expresses the constraints that a dataset must satisfy, and Amazon SageMaker Model Monitor containers can use it to evaluate datasets. Prebuilt containers can generate the constraints.json file automatically for a baseline dataset. If you bring your own container, you can give it similar abilities, or you can create the constraints.json file in some other way. The following is the schema for the constraints file that the prebuilt container uses. Your own container can adopt the same format or extend it as required.

```
{
    "version": 0,
    "features":
    [
        {
            "name": "string",
            "inferred_type": "Integral" | "Fractional" |
                             "String" | "Unknown",
            "completeness": number,
            "num_constraints":
            {
                "is_non_negative": boolean
            },
            "string_constraints":
            {
                "domains":
                [
                    "list of",
                    "observed values",
                    "for small cardinality"
                ]
            },
            "monitoringConfigOverrides":
            {}
        }
    ],
    "monitoring_config":
    {
        "evaluate_constraints": "Enabled",
        "emit_metrics": "Enabled",
        "datatype_check_threshold": 0.1,
        "domain_content_threshold": 0.1,
        "distribution_constraints":
        {
            "perform_comparison": "Enabled",
            "comparison_threshold": 0.1,
            "comparison_method": "Simple" | "Robust",
            "categorical_comparison_threshold": 0.1,
            "categorical_drift_method": "LInfinity" | "ChiSquared"
        }
    }
}
```
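As an illustration of how a container might apply one of these constraints, the sketch below checks observed values of a string feature against the baseline `domains` list and compares the out-of-domain fraction with `domain_content_threshold`. The feature name and data are made up, and this is only a simplified version of what the prebuilt container does.

```python
constraints = {
    "features": [
        {
            "name": "color",  # illustrative feature
            "inferred_type": "String",
            "string_constraints": {"domains": ["red", "green", "blue"]},
        }
    ],
    "monitoring_config": {"domain_content_threshold": 0.1},
}

observed = ["red", "blue", "purple", "red"]  # made-up captured values

domain = set(constraints["features"][0]["string_constraints"]["domains"])
out_of_domain = [v for v in observed if v not in domain]
fraction = len(out_of_domain) / len(observed)

threshold = constraints["monitoring_config"]["domain_content_threshold"]
violation = fraction > threshold
print(violation)  # True: 25% of observed values fall outside the baseline domain
```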

The `monitoring_config` object contains options for the monitoring job. The following table describes each option.

Monitoring Constraints

[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-byoc-constraints.html)

# CloudWatch Metrics for Bring Your Own Containers
<a name="model-monitor-byoc-cloudwatch"></a>

If the `publish_cloudwatch_metrics` value is `Enabled` in the `Environment` map in the `/opt/ml/processing/processingjobconfig.json` file, the container code emits Amazon CloudWatch metrics in this location: `/opt/ml/output/metrics/cloudwatch`. 

The schema for this file is closely based on the CloudWatch `PutMetricData` API. The namespace is not specified in the file. It defaults to one of the following:
+ For real-time endpoints: `/aws/sagemaker/Endpoint/data-metrics`
+ For batch transform jobs: `/aws/sagemaker/ModelMonitoring/data-metrics`

However, you can specify dimensions. We recommend you add the following dimensions at minimum:
+ `Endpoint` and `MonitoringSchedule` for real-time endpoints
+ `MonitoringSchedule` for batch transform jobs

The following JSON snippets show how to set your dimensions.

For a real-time endpoint, see the following JSON snippet which includes the `Endpoint` and `MonitoringSchedule` dimensions:

```
{ 
    "MetricName": "", # Required
    "Timestamp": "2019-11-26T03:00:00Z", # Required
    "Dimensions" : [{"Name":"Endpoint","Value":"endpoint_0"},{"Name":"MonitoringSchedule","Value":"schedule_0"}],
    "Value": Float,
    # Either the Value or the StatisticValues field can be populated and not both.
    "StatisticValues": {
        "SampleCount": Float,
        "Sum": Float,
        "Minimum": Float,
        "Maximum": Float
    },
    "Unit": "Count", # Optional
}
```

For a batch transform job, see the following JSON snippet which includes the `MonitoringSchedule` dimension:

```
{ 
    "MetricName": "", # Required
    "Timestamp": "2019-11-26T03:00:00Z", # Required
    "Dimensions" : [{"Name":"MonitoringSchedule","Value":"schedule_0"}],
    "Value": Float,
    # Either the Value or the StatisticValues field can be populated and not both.
    "StatisticValues": {
        "SampleCount": Float,
        "Sum": Float,
        "Minimum": Float,
        "Maximum": Float
    },
    "Unit": "Count", # Optional
}
```
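To emit one of these metrics, the container appends the JSON object as a line to a file under `/opt/ml/output/metrics/cloudwatch`. In the sketch below the directory is redirected to `/tmp` so it runs outside a container, and the metric name and file name are illustrative.

```python
import json
import os

metrics_dir = "/tmp/metrics/cloudwatch"  # in the container: /opt/ml/output/metrics/cloudwatch
os.makedirs(metrics_dir, exist_ok=True)

metric = {
    "MetricName": "feature_baseline_drift_total",  # illustrative metric name
    "Timestamp": "2019-11-26T03:00:00Z",
    "Dimensions": [{"Name": "MonitoringSchedule", "Value": "schedule_0"}],
    "Value": 3.0,
    "Unit": "Count",
}
# Append one metric object per line (the file name here is illustrative)
with open(os.path.join(metrics_dir, "metrics.jsonl"), "a") as f:
    f.write(json.dumps(metric) + "\n")
```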

# Create a Monitoring Schedule for a Real-time Endpoint with a CloudFormation Custom Resource
<a name="model-monitor-cloudformation-monitoring-schedules"></a>

If you are using a real-time endpoint, you can use a CloudFormation custom resource to create a monitoring schedule. The custom resource is implemented in Python. To deploy it, see [Python Lambda deployment](https://docs.aws.amazon.com/lambda/latest/dg/lambda-python-how-to-create-deployment-package.html).

## Custom Resource
<a name="model-monitor-cloudformation-custom-resource"></a>

Start by adding a custom resource to your CloudFormation template. This points to an AWS Lambda function that you create in the next step.

This resource enables you to customize the parameters of the monitoring schedule. You can add or remove parameters by modifying the CloudFormation resource and the Lambda function in the following examples.

```
{
    "AWSTemplateFormatVersion": "2010-09-09",
    "Resources": {
        "MonitoringSchedule": {
            "Type": "Custom::MonitoringSchedule",
            "Version": "1.0",
            "Properties": {
                "ServiceToken": "arn:aws:lambda:us-west-2:111111111111:function:lambda-name",
                "ScheduleName": "YourScheduleName",
                "EndpointName": "YourEndpointName",
                "BaselineConstraintsUri": "s3://your-baseline-constraints/constraints.json",
                "BaselineStatisticsUri": "s3://your-baseline-stats/statistics.json",
                "PostAnalyticsProcessorSourceUri": "s3://your-post-processor/postprocessor.py",
                "RecordPreprocessorSourceUri": "s3://your-preprocessor/preprocessor.py",
                "InputLocalPath": "/opt/ml/processing/endpointdata",
                "OutputLocalPath": "/opt/ml/processing/localpath",
                "OutputS3URI": "s3://your-output-uri",
                "ImageURI": "111111111111.dkr.ecr.us-west-2.amazonaws.com/your-image",
                "ScheduleExpression": "cron(0 * ? * * *)",
                "PassRoleArn": "arn:aws:iam::111111111111:role/AmazonSageMaker-ExecutionRole"
            }
        }
    }
}
```

## Lambda Custom Resource Code
<a name="model-monitor-cloudformation-lambda-custom-resource-code"></a>

This CloudFormation custom resource uses the [Custom Resource Helper](https://github.com/aws-cloudformation/custom-resource-helper) AWS library, which you can install with `pip install crhelper`.

CloudFormation invokes this Lambda function during creation and deletion of the stack. The function is responsible for creating and deleting the monitoring schedule, using the parameters defined in the custom resource described in the preceding section.

```
import boto3
import botocore
import logging

from crhelper import CfnResource
from botocore.exceptions import ClientError


logger = logging.getLogger(__name__)
sm = boto3.client('sagemaker')

# cfnhelper makes it easier to implement a CloudFormation custom resource
helper = CfnResource()

# CFN Handlers

def handler(event, context):
    helper(event, context)


@helper.create
def create_handler(event, context):
    """
    Called when CloudFormation custom resource sends the create event
    """
    create_monitoring_schedule(event)


@helper.delete
def delete_handler(event, context):
    """
    Called when CloudFormation custom resource sends the delete event
    """
    schedule_name = get_schedule_name(event)
    delete_monitoring_schedule(schedule_name)


@helper.poll_create
def poll_create(event, context):
    """
    Return true if the resource has been created and false otherwise so
    CloudFormation polls again.
    """
    schedule_name = get_schedule_name(event)
    logger.info('Polling for creation of schedule: %s', schedule_name)
    return is_schedule_ready(schedule_name)

@helper.update
def update_handler(event, context):
    """
    Not currently implemented, but crhelper raises an error if an update
    handler isn't registered
    """
    pass

# Helper Functions

def get_schedule_name(event):
    return event['ResourceProperties']['ScheduleName']

def create_monitoring_schedule(event):
    schedule_name = get_schedule_name(event)
    monitoring_schedule_config = create_monitoring_schedule_config(event)

    logger.info('Creating monitoring schedule with name: %s', schedule_name)

    sm.create_monitoring_schedule(
        MonitoringScheduleName=schedule_name,
        MonitoringScheduleConfig=monitoring_schedule_config)

def is_schedule_ready(schedule_name):
    is_ready = False

    schedule = sm.describe_monitoring_schedule(MonitoringScheduleName=schedule_name)
    status = schedule['MonitoringScheduleStatus']

    if status == 'Scheduled':
        logger.info('Monitoring schedule (%s) is ready', schedule_name)
        is_ready = True
    elif status == 'Pending':
        logger.info('Monitoring schedule (%s) still creating, waiting and polling again...', schedule_name)
    else:
        raise Exception('Monitoring schedule ({}) has unexpected status: {}'.format(schedule_name, status))

    return is_ready

def create_monitoring_schedule_config(event):
    props = event['ResourceProperties']

    return {
        "ScheduleConfig": {
            "ScheduleExpression": props["ScheduleExpression"],
        },
        "MonitoringJobDefinition": {
            "BaselineConfig": {
                "ConstraintsResource": {
                    "S3Uri": props['BaselineConstraintsUri'],
                },
                "StatisticsResource": {
                    "S3Uri": props['BaselineStatisticsUri'],
                }
            },
            "MonitoringInputs": [
                {
                    "EndpointInput": {
                        "EndpointName": props["EndpointName"],
                        "LocalPath": props["InputLocalPath"],
                    }
                }
            ],
            "MonitoringOutputConfig": {
                "MonitoringOutputs": [
                    {
                        "S3Output": {
                            "S3Uri": props["OutputS3URI"],
                            "LocalPath": props["OutputLocalPath"],
                        }
                    }
                ],
            },
            "MonitoringResources": {
                "ClusterConfig": {
                    "InstanceCount": 1,
                    "InstanceType": "ml.t3.medium",
                    "VolumeSizeInGB": 50,
                }
            },
            "MonitoringAppSpecification": {
                "ImageUri": props["ImageURI"],
                "RecordPreprocessorSourceUri": props['RecordPreprocessorSourceUri'],
                "PostAnalyticsProcessorSourceUri": props['PostAnalyticsProcessorSourceUri'],
            },
            "StoppingCondition": {
                "MaxRuntimeInSeconds": 300
            },
            "RoleArn": props["PassRoleArn"],
        }
    }


def delete_monitoring_schedule(schedule_name):
    logger.info('Deleting schedule: %s', schedule_name)
    try:
        sm.delete_monitoring_schedule(MonitoringScheduleName=schedule_name)
    except ClientError as e:
        if e.response['Error']['Code'] == 'ResourceNotFound':
            logger.info('Resource not found, nothing to delete')
        else:
            logger.error('Unexpected error while trying to delete monitoring schedule')
            raise e
```