

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# 고급 주제
<a name="model-monitor-advanced-topics"></a>

다음 섹션에는 사전 처리 및 사후 처리 스크립트를 사용하여 모니터링을 사용자 지정하는 방법, 자체 컨테이너를 빌드하는 방법,를 사용하여 모니터링 일정을 생성하는 방법을 설명하는 고급 작업이 포함되어 CloudFormation 있습니다.

**Topics**
+ [사용자 지정 모니터링 일정](model-monitor-custom-monitoring-schedules.md)
+ [CloudFormation 사용자 지정 리소스를 사용하여 실시간 엔드포인트에 대한 모니터링 일정 생성](model-monitor-cloudformation-monitoring-schedules.md)

# 사용자 지정 모니터링 일정
<a name="model-monitor-custom-monitoring-schedules"></a>

기본 제공 모니터링 메커니즘을 사용하는 것 외에도 사전 처리 및 사후 처리 스크립트를 사용하거나 자체 컨테이너를 사용하거나 빌드하여 자체 모니터링 일정과 절차를 생성할 수 있습니다.

**Topics**
+ [사전 처리 및 사후 처리](model-monitor-pre-and-post-processing.md)
+ [Amazon SageMaker Model Monitor에서 자체 컨테이너 지원](model-monitor-byoc-containers.md)

# 사전 처리 및 사후 처리
<a name="model-monitor-pre-and-post-processing"></a>

사용자 지정 사전 처리 및 사후 처리 Python 스크립트를 사용하여 모델 모니터에 대한 입력을 변환하거나 모니터링이 성공적으로 실행된 후 코드를 확장할 수 있습니다. 이 스크립트를 Amazon S3에 업로드한 다음 작업할 모델 모니터를 생성할 때 이를 참조하세요.

다음 예제는 사전 처리 스크립트와 사후 처리 스크립트를 사용하여 모니터링 일정을 사용자 지정하는 방법을 보여줍니다. *user placeholder text*를 사용자의 정보로 바꿉니다.

```
import boto3, os
from sagemaker import get_execution_role, Session
from sagemaker.model_monitor import CronExpressionGenerator, DefaultModelMonitor

# Upload pre and postprocessor scripts
session = Session()
bucket = boto3.Session().resource("s3").Bucket(session.default_bucket())
prefix = "demo-sagemaker-model-monitor"
pre_processor_script = bucket.Object(os.path.join(prefix, "preprocessor.py")).upload_file("preprocessor.py")
post_processor_script = bucket.Object(os.path.join(prefix, "postprocessor.py")).upload_file("postprocessor.py")

# Get execution role
role = get_execution_role() # can be an empty string

# Instance type
instance_type = "instance-type"
# instance_type = "ml.m5.xlarge" # Example

# Create a monitoring schedule with pre and postprocessing
my_default_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type=instance_type,
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
)

s3_report_path = "s3://{}/{}".format(bucket, "reports")
monitor_schedule_name = "monitor-schedule-name"
endpoint_name = "endpoint-name"
my_default_monitor.create_monitoring_schedule(
    post_analytics_processor_script=post_processor_script,
    record_preprocessor_script=pre_processor_script,
    monitor_schedule_name=monitor_schedule_name,
    # use endpoint_input for real-time endpoint
    endpoint_input=endpoint_name,
    # or use batch_transform_input for batch transform jobs
    # batch_transform_input=batch_transform_name,
    output_s3_uri=s3_report_path,
    statistics=my_default_monitor.baseline_statistics(),
    constraints=my_default_monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)
```

**Topics**
+ [사전 처리 스크립트](#model-monitor-pre-processing-script)
+ [사용자 지정 샘플링](#model-monitor-pre-processing-custom-sampling)
+ [사후 처리 스크립트](#model-monitor-post-processing-script)

## 사전 처리 스크립트
<a name="model-monitor-pre-processing-script"></a>

모델 모니터에 대한 입력을 변환해야 하는 경우 사전 처리 스크립트를 사용하세요.

예를 들어, 모델의 출력이 `[1.0, 2.1]`배열이라고 가정해 보겠습니다. Amazon SageMaker Model Monitor 컨테이너는 `{“prediction0”: 1.0, “prediction1” : 2.1}`와 같은 형태로 테이블 형식 또는 평면화된 JSON 구조에서만 작동합니다. 해당 배열은 다음과 같은 사전 처리 스크립트를 사용하여 올바른 JSON 구조로 변환이 가능합니다.

```
def preprocess_handler(inference_record):
    input_data = inference_record.endpoint_input.data
    output_data = inference_record.endpoint_output.data.rstrip("\n")
    data = output_data + "," + input_data
    return { str(i).zfill(20) : d for i, d in enumerate(data.split(",")) }
```

또 다른 예로, 작업 중인 모델에 선택사항 특징이 있는데, 해당 선택사항 특징에 누락된 값이 있음을 나타내기 위해 `-1`을 사용했다고 가정해 보겠습니다. 데이터 품질 모니터가 있는 경우, 입력 값 배열에서 `-1`을 제거하여 모니터의 지표 계산에 포함되지 않도록 하는 것이 좋습니다. 다음과 같은 스크립트를 사용하여 이러한 값을 제거할 수 있습니다.

```
def preprocess_handler(inference_record):
    input_data = inference_record.endpoint_input.data
    return {i : None if x == -1 else x for i, x in enumerate(input_data.split(","))}
```

사전 처리 스크립트는 `inference_record`를 유일한 입력으로 받아들입니다. 다음 코드 조각은 `inference_record`의 예제를 보여줍니다.

```
{
  "captureData": {
    "endpointInput": {
      "observedContentType": "text/csv",
      "mode": "INPUT",
      "data": "132,25,113.2,96,269.9,107,,0,0,0,0,0,0,1,0,1,0,0,1",
      "encoding": "CSV"
    },
    "endpointOutput": {
      "observedContentType": "text/csv; charset=utf-8",
      "mode": "OUTPUT",
      "data": "0.01076381653547287",
      "encoding": "CSV"
    }
  },
  "eventMetadata": {
    "eventId": "feca1ab1-8025-47e3-8f6a-99e3fdd7b8d9",
    "inferenceTime": "2019-11-20T23:33:12Z"
  },
  "eventVersion": "0"
}
```

다음 코드 조각은 `inference_record`의 전체 클래스 구조를 보여줍니다.

```
KEY_EVENT_METADATA = "eventMetadata"
KEY_EVENT_METADATA_EVENT_ID = "eventId"
KEY_EVENT_METADATA_EVENT_TIME = "inferenceTime"
KEY_EVENT_METADATA_CUSTOM_ATTR = "customAttributes"

KEY_EVENTDATA_ENCODING = "encoding"
KEY_EVENTDATA_DATA = "data"

KEY_GROUND_TRUTH_DATA = "groundTruthData"

KEY_EVENTDATA = "captureData"
KEY_EVENTDATA_ENDPOINT_INPUT = "endpointInput"
KEY_EVENTDATA_ENDPOINT_OUTPUT = "endpointOutput"

KEY_EVENTDATA_BATCH_OUTPUT = "batchTransformOutput"
KEY_EVENTDATA_OBSERVED_CONTENT_TYPE = "observedContentType"
KEY_EVENTDATA_MODE = "mode"

KEY_EVENT_VERSION = "eventVersion"

class EventConfig:
    def __init__(self, endpoint, variant, start_time, end_time):
        self.endpoint = endpoint
        self.variant = variant
        self.start_time = start_time
        self.end_time = end_time


class EventMetadata:
    def __init__(self, event_metadata_dict):
        self.event_id = event_metadata_dict.get(KEY_EVENT_METADATA_EVENT_ID, None)
        self.event_time = event_metadata_dict.get(KEY_EVENT_METADATA_EVENT_TIME, None)
        self.custom_attribute = event_metadata_dict.get(KEY_EVENT_METADATA_CUSTOM_ATTR, None)


class EventData:
    def __init__(self, data_dict):
        self.encoding = data_dict.get(KEY_EVENTDATA_ENCODING, None)
        self.data = data_dict.get(KEY_EVENTDATA_DATA, None)
        self.observedContentType = data_dict.get(KEY_EVENTDATA_OBSERVED_CONTENT_TYPE, None)
        self.mode = data_dict.get(KEY_EVENTDATA_MODE, None)

    def as_dict(self):
        ret = {
            KEY_EVENTDATA_ENCODING: self.encoding,
            KEY_EVENTDATA_DATA: self.data,
            KEY_EVENTDATA_OBSERVED_CONTENT_TYPE: self.observedContentType,
        }
        return ret


class CapturedData:
    def __init__(self, event_dict):
        self.event_metadata = None
        self.endpoint_input = None
        self.endpoint_output = None
        self.batch_transform_output = None
        self.ground_truth = None
        self.event_version = None
        self.event_dict = event_dict
        self._event_dict_postprocessed = False
        
        if KEY_EVENT_METADATA in event_dict:
            self.event_metadata = EventMetadata(event_dict[KEY_EVENT_METADATA])
        if KEY_EVENTDATA in event_dict:
            if KEY_EVENTDATA_ENDPOINT_INPUT in event_dict[KEY_EVENTDATA]:
                self.endpoint_input = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_ENDPOINT_INPUT])
            if KEY_EVENTDATA_ENDPOINT_OUTPUT in event_dict[KEY_EVENTDATA]:
                self.endpoint_output = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_ENDPOINT_OUTPUT])
            if KEY_EVENTDATA_BATCH_OUTPUT in event_dict[KEY_EVENTDATA]:
                self.batch_transform_output = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_BATCH_OUTPUT])

        if KEY_GROUND_TRUTH_DATA in event_dict:
            self.ground_truth = EventData(event_dict[KEY_GROUND_TRUTH_DATA])
        if KEY_EVENT_VERSION in event_dict:
            self.event_version = event_dict[KEY_EVENT_VERSION]

    def as_dict(self):
        if self._event_dict_postprocessed is True:
            return self.event_dict
        if KEY_EVENTDATA in self.event_dict:
            if KEY_EVENTDATA_ENDPOINT_INPUT in self.event_dict[KEY_EVENTDATA]:
                self.event_dict[KEY_EVENTDATA][KEY_EVENTDATA_ENDPOINT_INPUT] = self.endpoint_input.as_dict()
            if KEY_EVENTDATA_ENDPOINT_OUTPUT in self.event_dict[KEY_EVENTDATA]:
                self.event_dict[KEY_EVENTDATA][
                    KEY_EVENTDATA_ENDPOINT_OUTPUT
                ] = self.endpoint_output.as_dict()
            if KEY_EVENTDATA_BATCH_OUTPUT in self.event_dict[KEY_EVENTDATA]:
                self.event_dict[KEY_EVENTDATA][KEY_EVENTDATA_BATCH_OUTPUT] = self.batch_transform_output.as_dict()
        
        self._event_dict_postprocessed = True
        return self.event_dict

    def __str__(self):
        return str(self.as_dict())
```

## 사용자 지정 샘플링
<a name="model-monitor-pre-processing-custom-sampling"></a>

사전 처리 스크립트에 사용자 지정 샘플링 전략을 적용할 수도 있습니다. 이렇게 하려면, Model Monitor의 사전 구축된 자사 컨테이너를 구성하여 사용자가 지정한 샘플링 속도에 따라 해당 레코드의 백분율을 무시하도록 하세요. 다음 예제에서 핸들러는 핸들러 호출의 10%에서 레코드를 반환하고 나머지에 대해서는 빈 목록을 반환함으로써 레코드의 10%를 샘플링합니다.

```
import random

def preprocess_handler(inference_record):
    # we set up a sampling rate of 0.1
    if random.random() > 0.1:
        # return an empty list
        return []
    input_data = inference_record.endpoint_input.data
    return {i : None if x == -1 else x for i, x in enumerate(input_data.split(","))}
```

### 사전 처리 스크립트에 대한 사용자 지정 로깅
<a name="model-monitor-pre-processing-custom-logging"></a>

 사전 처리 스크립트에서 오류가 반환되는 경우 CloudWatch에 기록된 예외 메시지를 확인하여 디버깅하세요. 사용자는 `preprocess_handler`인터페이스를 통해 CloudWatch의 로거에 액세스할 수 있습니다. 해당 스크립트에서 필요한 모든 정보를 CloudWatch에 기록할 수 있습니다. 이는 사전 처리 스크립트를 디버깅할 때 유용할 수 있습니다. 다음 예제는 CloudWatch로 기록하기 위해 `preprocess_handler`인터페이스를 사용하는 방법을 보여줍니다.

```
def preprocess_handler(inference_record, logger):
    logger.info(f"I'm a processing record: {inference_record}")
    logger.debug(f"I'm debugging a processing record: {inference_record}")
    logger.warning(f"I'm processing record with missing value: {inference_record}")
    logger.error(f"I'm a processing record with bad value: {inference_record}")
    return inference_record
```

## 사후 처리 스크립트
<a name="model-monitor-post-processing-script"></a>

모니터링을 성공적으로 실행한 후 코드를 확장하려면 사후 처리 스크립트를 사용하세요.

```
def postprocess_handler():
    print("Hello from post-proc script!")
```

# Amazon SageMaker Model Monitor에서 자체 컨테이너 지원
<a name="model-monitor-byoc-containers"></a>

Amazon SageMaker Model Monitor는 테이블 형식 데이터세트에 대해 엔드포인트 또는 배치 변환 작업에서 캡처한 데이터를 분석할 수 있는 사전 구축된 컨테이너를 제공합니다. 사용자의 자체 컨테이너를 사용하고자 하는 경우, Model Monitor는 사용자가 활용할 수 있는 확장 지점을 제공합니다.

사용자가 `MonitoringSchedule`을 생성하면, Model Monitor가 내부에서 작동하면서 어느 시점에 처리 작업을 개시하게 됩니다. 따라서 컨테이너는 [자체 처리 컨테이너 빌드(고급 시나리오)](build-your-own-processing-container.md)항목에 문서화된 처리 작업 계약을 숙지해야 합니다. Model Monitor는 해당 일정에 따라 사용자를 대신하여 처리 작업을 개시합니다. 호출하는 동안, Model Monitor는 사용자의 컨테이너가 예약된 모니터링의 특정 실행을 위한 데이터를 처리하기에 충분한 컨텍스트를 갖추도록 추가적인 환경 변수를 설정합니다. 컨테이너 입력에 대한 자세한 내용은 [컨테이너 계약 입력](model-monitor-byoc-contract-inputs.md)섹션을 참조하세요.

컨테이너에서 위의 환경 변수/컨텍스트를 사용하여 사용자 지정 코드에서 현재 기간의 데이터세트를 분석할 수 있습니다. 이 분석이 완료되면, 보고서를 내보낸 다음 S3 버킷에 업로드되도록 선택할 수 있습니다. 사전 구축 컨테이너에서 생성되는 보고서는 [컨테이너 계약 출력](model-monitor-byoc-contract-outputs.md)에 설명되어 있습니다. 보고서의 시각화가 SageMaker Studio에서도 작동할 수 있게 하려면, 동일한 형식을 따라야 합니다. 완전히 사용자 지정된 보고서를 내보내는 방법을 선택할 수도 있습니다.

또한 [BYOC(Bring Your Own Container)에 대한 CloudWatch 지표](model-monitor-byoc-cloudwatch.md)의 지침에 따라 컨테이너에서 CloudWatch 지표를 내보낼 수도 있습니다.

**Topics**
+ [컨테이너 계약 입력](model-monitor-byoc-contract-inputs.md)
+ [컨테이너 계약 출력](model-monitor-byoc-contract-outputs.md)
+ [BYOC(Bring Your Own Container)에 대한 CloudWatch 지표](model-monitor-byoc-cloudwatch.md)

# 컨테이너 계약 입력
<a name="model-monitor-byoc-contract-inputs"></a>

Amazon SageMaker Model Monitor 플랫폼은 지정된 일정에 따라 사용자의 컨테이너 코드를 호출합니다. 사용자가 자체 컨테이너 코드를 작성하기로 선택한 경우, 다음 환경 변수를 사용할 수 있습니다. 이러한 컨텍스트에서, 사용자는 현재 데이터세트를 분석하거나, 또는 (해당 시) 지표를 내보내기로 선택한 경우 제약 조건을 평가할 수 있습니다.

`dataset_format` 변수를 제외하고, 실시간 엔드포인트 및 배치 변환 작업에 사용할 수 있는 환경 변수는 동일하게 제공됩니다. 사용자가 실시간 엔드포인트를 사용하는 경우, `dataset_format`변수는 다음 옵션을 지원합니다.

```
{\"sagemakerCaptureJson\": {\"captureIndexNames\": [\"endpointInput\",\"endpointOutput\"]}}
```

사용자가 배치 변환 작업을 사용하는 경우, `dataset_format`은 다음 옵션을 지원합니다.

```
{\"csv\": {\"header\": [\"true\",\"false\"]}}
```

```
{\"json\": {\"line\": [\"true\",\"false\"]}}
```

```
{\"parquet\": {}}
```

다음 코드 샘플은 사용자가 컨테이너 코드에 사용할 수 있고 실시간 엔드포인트에 대해 `dataset_format`형식을 사용하는 모든 환경 변수의 세트를 보여줍니다.

```
"Environment": {
 "dataset_format": "{\"sagemakerCaptureJson\": {\"captureIndexNames\": [\"endpointInput\",\"endpointOutput\"]}}",
 "dataset_source": "/opt/ml/processing/endpointdata",
 "end_time": "2019-12-01T16: 20: 00Z",
 "output_path": "/opt/ml/processing/resultdata",
 "publish_cloudwatch_metrics": "Disabled",
 "sagemaker_endpoint_name": "endpoint-name",
 "sagemaker_monitoring_schedule_name": "schedule-name",
 "start_time": "2019-12-01T15: 20: 00Z"
}
```

파라미터 


| 파라미터 이름 | 설명 | 
| --- | --- | 
| dataset\$1format |  `Endpoint`에서 지원되는 `MonitoringSchedule`에서 시작된 작업의 경우, 이는 캡처 인덱스 `endpointInput`또는 `endpointOutput`나 둘 모두를 가진 `sageMakerCaptureJson`입니다. 배치 변환 작업인 경우, 이는 CSV, JSON 또는 Parquet 등의 데이터 형식을 지정합니다.  | 
| dataset\$1source |  사용자가 실시간 엔드포인트를 사용하는 경우, `start_time`및 `end_time`으로 지정된 모니터링 기간에 해당하는 데이터가 포함되어 있는 로컬 경로를 사용할 수 있습니다. 이 경로에서는 ` /{endpoint-name}/{variant-name}/yyyy/mm/dd/hh`에서 데이터를 사용할 수 있습니다. 때때로 시작 및 종료 시간에서 지정된 것보다 더 많은 것을 다운로드합니다. 필요에 따라 데이터를 구문 분석하는 것은 컨테이너 코드에 달려 있습니다.  | 
| output\$1path |  출력 보고서 및 기타 파일을 작성할 로컬 경로입니다. `CreateMonitoringSchedule` 요청에서 `MonitoringOutputConfig.MonitoringOutput[0].LocalPath`로 이 파라미터를 지정합니다. `S3Uri`에 지정된 `MonitoringOutputConfig.MonitoringOutput[0].S3Uri`경로에 업로드됩니다.  | 
| publish\$1cloudwatch\$1metrics |  `CreateMonitoringSchedule`에서 시작한 작업의 경우 이 파라미터가 `Enabled`로 설정됩니다. 컨테이너는 Amazon CloudWatch 출력 파일을 `[filepath]`에서 작성하도록 선택할 수 있습니다.  | 
| sagemaker\$1endpoint\$1name |  사용자가 실시간 엔드포인트를 사용하는 경우, 이 예약된 작업이 시작된 대상에 해당하는 `Endpoint`의 이름입니다.  | 
| sagemaker\$1monitoring\$1schedule\$1name |  이 작업을 시작한 `MonitoringSchedule`의 이름입니다.  | 
| \$1sagemaker\$1endpoint\$1datacapture\$1prefix\$1 |  사용자가 실시간 엔드포인트를 사용하는 경우, `Endpoint`의 `DataCaptureConfig`매개변수에 지정되어 있는 접두사입니다. 컨테이너는 `dataset_source` 경로에서 SageMaker AI가 이미 다운로드 한 것보다 더 많은 양의 데이터에 직접 액세스해야 하는 경우에 이를 사용할 수 있습니다.  | 
| start\$1time, end\$1time |  이 분석이 실행되는 기간입니다. 예를 들어 05:00 UTC에 실행되도록 예약된 작업과 20/02/2020에 실행되는 작업의 경우, `start_time`은 2020-02-19T06:00:00Z이고 `end_time`는 2020-02-20T05:00:00Z가 됩니다.  | 
| baseline\$1constraints: |  ` BaselineConfig.ConstraintResource.S3Uri`에 지정된 기준 제약 조건 파일의 로컬 경로입니다. 이 파라미터가 `CreateMonitoringSchedule`요청에 지정된 경우에만 사용할 수 있습니다.  | 
| baseline\$1statistics |  `BaselineConfig.StatisticsResource.S3Uri`에 지정된 기준 통계 파일에 대한 로컬 경로입니다. 이 파라미터가 `CreateMonitoringSchedule`요청에 지정된 경우에만 사용할 수 있습니다.  | 

# 컨테이너 계약 출력
<a name="model-monitor-byoc-contract-outputs"></a>

컨테이너는 `*dataset_source*`경로에서 사용 가능한 데이터를 분석하고 `*output_path*.`의 경로에 대한 보고서를 작성할 수 있습니다.컨테이너 코드는 사용자의 필요에 맞는 보고서를 작성할 수 있습니다.

사용자가 다음 구조 및 계약을 사용하는 경우, 특정 출력 파일은 시각화와 API에서 SageMaker AI에 의해 특별히 처리됩니다. 이는 테이블 형식 데이터세트에만 적용됩니다.

테이블 형식 데이터세트의 출력 파일


| 파일 이름 | 설명 | 
| --- | --- | 
| statistics.json |  이 파일에는 분석되는 데이터세트의 각 기능에 대한 열 기반 통계가 있어야 합니다. 이 파일에 대한 스키마는 다음 섹션에서 사용할 수 있습니다.  | 
| constraints.json |  이 파일에는 관찰된 기능에 대한 제약 조건이 있어야 합니다. 이 파일에 대한 스키마는 다음 섹션에서 사용할 수 있습니다.  | 
| constraints\$1violations.json |  이 파일에는 `baseline_constaints`및 `baseline_statistics`경로에 지정된 기준 통계 및 제약 조건 파일과 비교하여 현재 데이터세트에서 발견된 위반 목록이 있어야 합니다.  | 

또한 `publish_cloudwatch_metrics`값이 `"Enabled"`인 경우, 컨테이너 코드는 이 위치(`/opt/ml/output/metrics/cloudwatch`)에서 Amazon CloudWatch 지표를 내보낼 수 있습니다. 이들 파일에 대한 스키마는 다음 섹션에 설명되어 있습니다.

**Topics**
+ [통계에 대한 스키마(statistics.json 파일)](model-monitor-byoc-statistics.md)
+ [제약 조건에 대한 스키마(constraints.json 파일)](model-monitor-byoc-constraints.md)

# 통계에 대한 스키마(statistics.json 파일)
<a name="model-monitor-byoc-statistics"></a>

`statistics.json` 파일에 정의된 스키마는 캡처된 기준 및 데이터에 대해 계산할 통계 파라미터를 지정합니다. 또한 [KLL](https://datasketches.apache.org/docs/KLL/KLLSketch.html)에서 사용할 버킷을 구성합니다.KLL은 압축 체계가 느슨한 매우 컴팩트한 분위 스케치입니다.

```
{
    "version": 0,
    # dataset level stats
    "dataset": {
        "item_count": number
    },
    # feature level stats
    "features": [
        {
            "name": "feature-name",
            "inferred_type": "Fractional" | "Integral",
            "numerical_statistics": {
                "common": {
                    "num_present": number,
                    "num_missing": number
                },
                "mean": number,
                "sum": number,
                "std_dev": number,
                "min": number,
                "max": number,
                "distribution": {
                    "kll": {
                        "buckets": [
                            {
                                "lower_bound": number,
                                "upper_bound": number,
                                "count": number
                            }
                        ],
                        "sketch": {
                            "parameters": {
                                "c": number,
                                "k": number
                            },
                            "data": [
                                [
                                    num,
                                    num,
                                    num,
                                    num
                                ],
                                [
                                    num,
                                    num
                                ][
                                    num,
                                    num
                                ]
                            ]
                        }#sketch
                    }#KLL
                }#distribution
            }#num_stats
        },
        {
            "name": "feature-name",
            "inferred_type": "String",
            "string_statistics": {
                "common": {
                    "num_present": number,
                    "num_missing": number
                },
                "distinct_count": number,
                "distribution": {
                    "categorical": {
                         "buckets": [
                                {
                                    "value": "string",
                                    "count": number
                                }
                          ]
                     }
                }
            },
            #provision for custom stats
        }
    ]
}
```

**참고**  
지정된 지표는 이후 시각화 변경 시 SageMaker AI에서 인식됩니다. 필요한 경우 컨테이너는 더 많은 지표를 내보낼 수 있습니다.
[KLL 스케치](https://datasketches.apache.org/docs/KLL/KLLSketch.html)는 인식된 스케치입니다. 사용자 지정 컨테이너는 자체 표현을 작성할 수 있지만, 시각화 시 SageMaker AI에서 인식되지는 않습니다.
기본적으로 배포는 10개의 버킷으로 분포를 구체화됩니다. 이 값은 변경할 수 없습니다.

# 제약 조건에 대한 스키마(constraints.json 파일)
<a name="model-monitor-byoc-constraints"></a>

constraints.json 파일은 데이터세트가 반드시 충족해야 하는 제약 조건을 표현하는 데 사용됩니다. Amazon SageMaker Model Monitor 컨테이너는 constraints.json 파일을 사용하여 데이터세트를 평가할 수 있습니다. 사전 구축된 컨테이너는 기준 데이터세트에 대한 constraints.json 파일을 자동으로 생성해주는 기능을 제공합니다. 자체 컨테이너를 사용하는 경우, 비슷한 기능을 제공하거나 기타 몇 가지 방법으로 containints.json 파일을 생성할 수 있습니다. 다음은 미리 빌드 컨테이너가 사용하는 제약 조건 파일의 스키마입니다. 자체 컨테이너를 사용하면 동일한 형식을 채택하거나 필요에 따라 형식을 개선할 수 있습니다.

```
{
    "version": 0,
    "features":
    [
        {
            "name": "string",
            "inferred_type": "Integral" | "Fractional" | 
                    | "String" | "Unknown",
            "completeness": number,
            "num_constraints":
            {
                "is_non_negative": boolean
            },
            "string_constraints":
            {
                "domains":
                [
                    "list of",
                    "observed values",
                    "for small cardinality"
                ]
            },
            "monitoringConfigOverrides":
            {}
        }
    ],
    "monitoring_config":
    {
        "evaluate_constraints": "Enabled",
        "emit_metrics": "Enabled",
        "datatype_check_threshold": 0.1,
        "domain_content_threshold": 0.1,
        "distribution_constraints":
        {
            "perform_comparison": "Enabled",
            "comparison_threshold": 0.1,
            "comparison_method": "Simple"||"Robust",
            "categorical_comparison_threshold": 0.1,
            "categorical_drift_method": "LInfinity"||"ChiSquared"
        }
    }
}
```

`monitoring_config` 객체에는 특징에 대한 작업을 모니터링하는 옵션이 포함되어 있습니다. 다음 표에 각 옵션이 설명되어 있습니다.

모니터링 제약 조건

[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ko_kr/sagemaker/latest/dg/model-monitor-byoc-constraints.html)

# BYOC(Bring Your Own Container)에 대한 CloudWatch 지표
<a name="model-monitor-byoc-cloudwatch"></a>

`publish_cloudwatch_metrics` 값이 `/opt/ml/processing/processingjobconfig.json`파일의 `Environment`맵에 있는 `Enabled`인 경우, 컨테이너 코드는 `/opt/ml/output/metrics/cloudwatch`위치에 Amazon CloudWatch 지표를 내보냅니다.

이 파일의 스키마는 `PutMetrics`API에 밀접한 기반을 두고 있습니다. 네임스페이스는 여기에서 지정되지 않았습니다. 기본값은 다음과 같습니다.
+ `For real-time endpoints: /aws/sagemaker/Endpoint/data-metrics`
+ `For batch transform jobs: /aws/sagemaker/ModelMonitoring/data-metrics`

그러나 차원을 지정할 수 있습니다. 최소한 다음 차원을 추가하는 것이 좋습니다.
+ 실시간 엔드포인트인 경우 `Endpoint`및 `MonitoringSchedule`
+ 배치 변환 작업인 경우 `MonitoringSchedule`

다음 JSON 코드 조각은 차원을 설정하는 방법을 보여줍니다.

실시간 엔드포인트인 경우, `Endpoint`및 `MonitoringSchedule`차원이 포함된 다음 JSON 코드 조각을 참조하세요.

```
{ 
    "MetricName": "", # Required
    "Timestamp": "2019-11-26T03:00:00Z", # Required
    "Dimensions" : [{"Name":"Endpoint","Value":"endpoint_0"},{"Name":"MonitoringSchedule","Value":"schedule_0"}]
    "Value": Float,
    # Either the Value or the StatisticValues field can be populated and not both.
    "StatisticValues": {
        "SampleCount": Float,
        "Sum": Float,
        "Minimum": Float,
        "Maximum": Float
    },
    "Unit": "Count", # Optional
}
```

배치 변환 작업인 경우, `MonitoringSchedule`차원이 포함된 다음 JSON 코드 조각을 참조하세요.

```
{ 
    "MetricName": "", # Required
    "Timestamp": "2019-11-26T03:00:00Z", # Required
    "Dimensions" : [{"Name":"MonitoringSchedule","Value":"schedule_0"}]
    "Value": Float,
    # Either the Value or the StatisticValues field can be populated and not both.
    "StatisticValues": {
        "SampleCount": Float,
        "Sum": Float,
        "Minimum": Float,
        "Maximum": Float
    },
    "Unit": "Count", # Optional
}
```

# CloudFormation 사용자 지정 리소스를 사용하여 실시간 엔드포인트에 대한 모니터링 일정 생성
<a name="model-monitor-cloudformation-monitoring-schedules"></a>

실시간 엔드포인트를 사용하는 경우 CloudFormation 사용자 지정 리소스를 사용하여 모니터링 일정을 생성할 수 있습니다. 사용자 지정 리소스는 Python에 있습니다. 배포 방법은 [Python Lambda 배포](https://docs.aws.amazon.com/lambda/latest/dg/lambda-python-how-to-create-deployment-package.html)를 참조하세요.

## 사용자 지정 리소스
<a name="model-monitor-cloudformation-custom-resource"></a>

먼저 CloudFormation 템플릿에 사용자 지정 리소스를 추가합니다. 이는 사용자가 다음 단계에서 생성하는 AWS Lambda 함수를 가리킵니다.

이 리소스를 사용하면 모니터링 일정에 대한 파라미터를 사용자 지정할 수 있습니다. 다음 예제 리소스에서 CloudFormation 리소스와 Lambda 함수를 수정하여 더 많은 파라미터를 추가하거나 제거할 수 있습니다.

```
{
    "AWSTemplateFormatVersion": "2010-09-09",
    "Resources": {
        "MonitoringSchedule": {
            "Type": "Custom::MonitoringSchedule",
            "Version": "1.0",
            "Properties": {
                "ServiceToken": "arn:aws:lambda:us-west-2:111111111111:function:lambda-name",
                "ScheduleName": "YourScheduleName",
                "EndpointName": "YourEndpointName",
                "BaselineConstraintsUri": "s3://your-baseline-constraints/constraints.json",
                "BaselineStatisticsUri": "s3://your-baseline-stats/statistics.json",
                "PostAnalyticsProcessorSourceUri": "s3://your-post-processor/postprocessor.py",
                "RecordPreprocessorSourceUri": "s3://your-preprocessor/preprocessor.py",
                "InputLocalPath": "/opt/ml/processing/endpointdata",
                "OutputLocalPath": "/opt/ml/processing/localpath",
                "OutputS3URI": "s3://your-output-uri",
                "ImageURI": "111111111111.dkr.ecr.us-west-2.amazonaws.com/your-image",
                "ScheduleExpression": "cron(0 * ? * * *)",
                "PassRoleArn": "arn:aws:iam::111111111111:role/AmazonSageMaker-ExecutionRole"
            }
        }
    }
}
```

## Lambda 사용자 지정 리소스 코드
<a name="model-monitor-cloudformation-lambda-custom-resource-code"></a>

이 CloudFormation 사용자 지정 리소스는를 사용하여 pip로 설치할 수 있는 [사용자 지정 리소스 헬퍼](https://github.com/aws-cloudformation/custom-resource-helper) AWS 라이브러리를 사용합니다`pip install crhelper`.

이 Lambda 함수는 스택을 생성하고 삭제하는 CloudFormation 동안에서 호출됩니다. 이 Lambda 함수는 모니터링 일정을 생성 및 삭제하고 이전 섹션에서 설명한 사용자 지정 리소스에 정의된 파라미터를 사용합니다.

```
import boto3
import botocore
import logging

from crhelper import CfnResource
from botocore.exceptions import ClientError


logger = logging.getLogger(__name__)
sm = boto3.client('sagemaker')

# cfnhelper makes it easier to implement a CloudFormation custom resource
helper = CfnResource()

# CFN Handlers

def handler(event, context):
    helper(event, context)


@helper.create
def create_handler(event, context):
    """
    Called when CloudFormation custom resource sends the create event
    """
    create_monitoring_schedule(event)


@helper.delete
def delete_handler(event, context):
    """
    Called when CloudFormation custom resource sends the delete event
    """
    schedule_name = get_schedule_name(event)
    delete_monitoring_schedule(schedule_name)


@helper.poll_create
def poll_create(event, context):
    """
    Return true if the resource has been created and false otherwise so
    CloudFormation polls again.
    """
    schedule_name = get_schedule_name(event)
    logger.info('Polling for creation of schedule: %s', schedule_name)
    return is_schedule_ready(schedule_name)

@helper.update
def noop():
    """
    Not currently implemented but crhelper will throw an error if it isn't added
    """
    pass

# Helper Functions

def get_schedule_name(event):
    return event['ResourceProperties']['ScheduleName']

def create_monitoring_schedule(event):
    schedule_name = get_schedule_name(event)
    monitoring_schedule_config = create_monitoring_schedule_config(event)

    logger.info('Creating monitoring schedule with name: %s', schedule_name)

    sm.create_monitoring_schedule(
        MonitoringScheduleName=schedule_name,
        MonitoringScheduleConfig=monitoring_schedule_config)

def is_schedule_ready(schedule_name):
    is_ready = False

    schedule = sm.describe_monitoring_schedule(MonitoringScheduleName=schedule_name)
    status = schedule['MonitoringScheduleStatus']

    if status == 'Scheduled':
        logger.info('Monitoring schedule (%s) is ready', schedule_name)
        is_ready = True
    elif status == 'Pending':
        logger.info('Monitoring schedule (%s) still creating, waiting and polling again...', schedule_name)
    else:
        raise Exception('Monitoring schedule ({}) has unexpected status: {}'.format(schedule_name, status))

    return is_ready

def create_monitoring_schedule_config(event):
    props = event['ResourceProperties']

    return {
        "ScheduleConfig": {
            "ScheduleExpression": props["ScheduleExpression"],
        },
        "MonitoringJobDefinition": {
            "BaselineConfig": {
                "ConstraintsResource": {
                    "S3Uri": props['BaselineConstraintsUri'],
                },
                "StatisticsResource": {
                    "S3Uri": props['BaselineStatisticsUri'],
                }
            },
            "MonitoringInputs": [
                {
                    "EndpointInput": {
                        "EndpointName": props["EndpointName"],
                        "LocalPath": props["InputLocalPath"],
                    }
                }
            ],
            "MonitoringOutputConfig": {
                "MonitoringOutputs": [
                    {
                        "S3Output": {
                            "S3Uri": props["OutputS3URI"],
                            "LocalPath": props["OutputLocalPath"],
                        }
                    }
                ],
            },
            "MonitoringResources": {
                "ClusterConfig": {
                    "InstanceCount": 1,
                    "InstanceType": "ml.t3.medium",
                    "VolumeSizeInGB": 50,
                }
            },
            "MonitoringAppSpecification": {
                "ImageUri": props["ImageURI"],
                "RecordPreprocessorSourceUri": props['PostAnalyticsProcessorSourceUri'],
                "PostAnalyticsProcessorSourceUri": props['PostAnalyticsProcessorSourceUri'],
            },
            "StoppingCondition": {
                "MaxRuntimeInSeconds": 300
            },
            "RoleArn": props["PassRoleArn"],
        }
    }


def delete_monitoring_schedule(schedule_name):
    logger.info('Deleting schedule: %s', schedule_name)
    try:
        sm.delete_monitoring_schedule(MonitoringScheduleName=schedule_name)
    except ClientError as e:
        if e.response['Error']['Code'] == 'ResourceNotFound':
            logger.info('Resource not found, nothing to delete')
        else:
            logger.error('Unexpected error while trying to delete monitoring schedule')
            raise e
```