

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

# Pré-processamento e pós-processamento
<a name="model-monitor-pre-and-post-processing"></a>

Você pode usar scripts Python personalizados de pré-processamento e pós-processamento para transformar a entrada do seu monitor de modelo ou estender o código após uma execução bem-sucedida do monitoramento. Faça o upload desses scripts para o Amazon S3 e faça referência a eles ao criar seu monitor de modelo.

O exemplo a seguir mostra como personalizar as programações de monitoramento com scripts de pré-processamento e pós-processamento. {{user placeholder text}}Substitua por suas próprias informações.

```
import boto3, os
from sagemaker import get_execution_role, Session
from sagemaker.model_monitor import CronExpressionGenerator, DefaultModelMonitor

# Upload pre and postprocessor scripts
session = Session()
bucket = boto3.Session().resource("s3").Bucket(session.default_bucket())
prefix = "{{demo-sagemaker-model-monitor}}"
pre_processor_script = bucket.Object(os.path.join(prefix, "{{preprocessor}}.py")).upload_file("{{preprocessor}}.py")
post_processor_script = bucket.Object(os.path.join(prefix, "{{postprocessor}}.py")).upload_file("{{postprocessor}}.py")

# Get execution role
role = get_execution_role() # can be an empty string

# Instance type
instance_type = "{{instance-type}}"
# instance_type = "ml.m5.xlarge" # Example

# Create a monitoring schedule with pre and postprocessing
my_default_monitor = DefaultModelMonitor(
    role=role,
    instance_count={{1}},
    instance_type=instance_type,
    volume_size_in_gb={{20}},
    max_runtime_in_seconds={{3600}},
)

s3_report_path = "s3://{}/{}".format(bucket, "{{reports}}")
monitor_schedule_name = "{{monitor-schedule-name}}"
endpoint_name = "{{endpoint-name}}"
my_default_monitor.create_monitoring_schedule(
    post_analytics_processor_script=post_processor_script,
    record_preprocessor_script=pre_processor_script,
    monitor_schedule_name=monitor_schedule_name,
    # use endpoint_input for real-time endpoint
    endpoint_input=endpoint_name,
    # or use batch_transform_input for batch transform jobs
    # batch_transform_input=batch_transform_name,
    output_s3_uri=s3_report_path,
    statistics=my_default_monitor.baseline_statistics(),
    constraints=my_default_monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)
```

**Topics**
+ [Script de pré-processamento](#model-monitor-pre-processing-script)
+ [Amostragem personalizada](#model-monitor-pre-processing-custom-sampling)
+ [Script de pós-processamento](#model-monitor-post-processing-script)

## Script de pré-processamento
<a name="model-monitor-pre-processing-script"></a>

Use scripts de pré-processamento quando precisar transformar as entradas do seu monitor do modelo.

Por exemplo, suponha que a saída do seu modelo seja uma matriz `[1.0, 2.1]`. O contêiner Amazon SageMaker Model Monitor só funciona com estruturas JSON tabulares ou achatadas, como. `{“{{prediction0}}”: 1.0, “{{prediction1}}” : 2.1}` Você pode usar um script de pré-processamento como o seguinte para transformar a matriz na estrutura JSON correta:

```
def preprocess_handler(inference_record):
    input_data = inference_record.endpoint_input.data
    output_data = inference_record.endpoint_output.data.rstrip("\n")
    data = output_data + "," + input_data
    return { str(i).zfill(20) : d for i, d in enumerate(data.split(",")) }
```

Em outro exemplo, suponha que seu modelo tenha atributos opcionais e você use `-1` para indicar que o atributo opcional tem um valor ausente. Se você tiver um monitor de qualidade de dados, talvez queira remover o `-1` da matriz de valores de entrada para que não seja incluído nos cálculos métricos do monitor. Você pode usar um script como o seguinte para remover esses valores:

```
def preprocess_handler(inference_record):
    input_data = inference_record.endpoint_input.data
    return {i : None if x == -1 else x for i, x in enumerate(input_data.split(","))}
```

Seu script de pré-processamento recebe um `inference_record` como única entrada. O trecho de código a seguir mostra um exemplo de um `inference_record`.

```
{
  "captureData": {
    "endpointInput": {
      "observedContentType": "text/csv",
      "mode": "INPUT",
      "data": "{{132,25,113.2,96,269.9,107,,0,0,0,0,0,0,1,0,1,0,0,1}}",
      "encoding": "CSV"
    },
    "endpointOutput": {
      "observedContentType": "text/csv; charset=utf-8",
      "mode": "OUTPUT",
      "data": "{{0.01076381653547287}}",
      "encoding": "CSV"
    }
  },
  "eventMetadata": {
    "eventId": "{{feca1ab1-8025-47e3-8f6a-99e3fdd7b8d9}}",
    "inferenceTime": "{{2019-11-20T23:33:12Z}}"
  },
  "eventVersion": "{{0}}"
}
```

O trecho de código a seguir mostra a estrutura de classe completa de um `inference_record`.

```
KEY_EVENT_METADATA = "eventMetadata"
KEY_EVENT_METADATA_EVENT_ID = "eventId"
KEY_EVENT_METADATA_EVENT_TIME = "inferenceTime"
KEY_EVENT_METADATA_CUSTOM_ATTR = "customAttributes"

KEY_EVENTDATA_ENCODING = "encoding"
KEY_EVENTDATA_DATA = "data"

KEY_GROUND_TRUTH_DATA = "groundTruthData"

KEY_EVENTDATA = "captureData"
KEY_EVENTDATA_ENDPOINT_INPUT = "endpointInput"
KEY_EVENTDATA_ENDPOINT_OUTPUT = "endpointOutput"

KEY_EVENTDATA_BATCH_OUTPUT = "batchTransformOutput"
KEY_EVENTDATA_OBSERVED_CONTENT_TYPE = "observedContentType"
KEY_EVENTDATA_MODE = "mode"

KEY_EVENT_VERSION = "eventVersion"

class EventConfig:
    def __init__(self, endpoint, variant, start_time, end_time):
        self.endpoint = endpoint
        self.variant = variant
        self.start_time = start_time
        self.end_time = end_time


class EventMetadata:
    def __init__(self, event_metadata_dict):
        self.event_id = event_metadata_dict.get(KEY_EVENT_METADATA_EVENT_ID, None)
        self.event_time = event_metadata_dict.get(KEY_EVENT_METADATA_EVENT_TIME, None)
        self.custom_attribute = event_metadata_dict.get(KEY_EVENT_METADATA_CUSTOM_ATTR, None)


class EventData:
    def __init__(self, data_dict):
        self.encoding = data_dict.get(KEY_EVENTDATA_ENCODING, None)
        self.data = data_dict.get(KEY_EVENTDATA_DATA, None)
        self.observedContentType = data_dict.get(KEY_EVENTDATA_OBSERVED_CONTENT_TYPE, None)
        self.mode = data_dict.get(KEY_EVENTDATA_MODE, None)

    def as_dict(self):
        ret = {
            KEY_EVENTDATA_ENCODING: self.encoding,
            KEY_EVENTDATA_DATA: self.data,
            KEY_EVENTDATA_OBSERVED_CONTENT_TYPE: self.observedContentType,
        }
        return ret


class CapturedData:
    def __init__(self, event_dict):
        self.event_metadata = None
        self.endpoint_input = None
        self.endpoint_output = None
        self.batch_transform_output = None
        self.ground_truth = None
        self.event_version = None
        self.event_dict = event_dict
        self._event_dict_postprocessed = False
        
        if KEY_EVENT_METADATA in event_dict:
            self.event_metadata = EventMetadata(event_dict[KEY_EVENT_METADATA])
        if KEY_EVENTDATA in event_dict:
            if KEY_EVENTDATA_ENDPOINT_INPUT in event_dict[KEY_EVENTDATA]:
                self.endpoint_input = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_ENDPOINT_INPUT])
            if KEY_EVENTDATA_ENDPOINT_OUTPUT in event_dict[KEY_EVENTDATA]:
                self.endpoint_output = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_ENDPOINT_OUTPUT])
            if KEY_EVENTDATA_BATCH_OUTPUT in event_dict[KEY_EVENTDATA]:
                self.batch_transform_output = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_BATCH_OUTPUT])

        if KEY_GROUND_TRUTH_DATA in event_dict:
            self.ground_truth = EventData(event_dict[KEY_GROUND_TRUTH_DATA])
        if KEY_EVENT_VERSION in event_dict:
            self.event_version = event_dict[KEY_EVENT_VERSION]

    def as_dict(self):
        if self._event_dict_postprocessed is True:
            return self.event_dict
        if KEY_EVENTDATA in self.event_dict:
            if KEY_EVENTDATA_ENDPOINT_INPUT in self.event_dict[KEY_EVENTDATA]:
                self.event_dict[KEY_EVENTDATA][KEY_EVENTDATA_ENDPOINT_INPUT] = self.endpoint_input.as_dict()
            if KEY_EVENTDATA_ENDPOINT_OUTPUT in self.event_dict[KEY_EVENTDATA]:
                self.event_dict[KEY_EVENTDATA][
                    KEY_EVENTDATA_ENDPOINT_OUTPUT
                ] = self.endpoint_output.as_dict()
            if KEY_EVENTDATA_BATCH_OUTPUT in self.event_dict[KEY_EVENTDATA]:
                self.event_dict[KEY_EVENTDATA][KEY_EVENTDATA_BATCH_OUTPUT] = self.batch_transform_output.as_dict()
        
        self._event_dict_postprocessed = True
        return self.event_dict

    def __str__(self):
        return str(self.as_dict())
```

## Amostragem personalizada
<a name="model-monitor-pre-processing-custom-sampling"></a>

Você também pode aplicar uma estratégia de amostragem personalizada em seu script de pré-processamento. Para fazer isso, configure o contêiner pré-criado original do Model Monitor para ignorar uma porcentagem dos registros de acordo com a taxa de amostragem especificada. No exemplo a seguir, o manipulador coleta amostras de 10% dos registros retornando o registro em 10% das chamadas do manipulador e, caso contrário, uma lista vazia.

```
import random

def preprocess_handler(inference_record):
    # we set up a sampling rate of 0.1
    if random.random() > 0.1:
        # return an empty list
        return []
    input_data = inference_record.endpoint_input.data
    return {i : None if x == -1 else x for i, x in enumerate(input_data.split(","))}
```

### Registro personalizado para script de pré-processamento
<a name="model-monitor-pre-processing-custom-logging"></a>

 Se o script de pré-processamento retornar um erro, verifique as mensagens de exceção registradas CloudWatch para depuração. Você pode acessar o logger por CloudWatch meio da `preprocess_handler` interface. Você pode registrar todas as informações necessárias do seu script em CloudWatch. Isso pode ser útil ao depurar seu script de pré-processamento. O exemplo a seguir mostra como você pode usar a `preprocess_handler` interface para fazer login em CloudWatch 

```
def preprocess_handler(inference_record, logger):
    logger.info(f"I'm a processing record: {inference_record}")
    logger.debug(f"I'm debugging a processing record: {inference_record}")
    logger.warning(f"I'm processing record with missing value: {inference_record}")
    logger.error(f"I'm a processing record with bad value: {inference_record}")
    return inference_record
```

## Script de pós-processamento
<a name="model-monitor-post-processing-script"></a>

Use um script de pós-processamento quando quiser estender o código após uma execução de monitoramento bem-sucedida.

```
def postprocess_handler():
    print("Hello from post-proc script!")
```