

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Pre-elaborazione e post-elaborazione
<a name="model-monitor-pre-and-post-processing"></a>

È possibile utilizzare script Python personalizzati di pre-elaborazione e post-elaborazione per trasformare l'input al monitoraggio del modello o estendere il codice dopo un'esecuzione di monitoraggio riuscita. Carica questi script su Amazon S3 e utilizzali come riferimento durante la creazione del monitoraggio di modello.

L'esempio seguente mostra come personalizzare le pianificazioni di monitoraggio con script di pre-elaborazione e post-elaborazione. Sostituiscilo {{user placeholder text}} con le tue informazioni.

```
import boto3, os
from sagemaker import get_execution_role, Session
from sagemaker.model_monitor import CronExpressionGenerator, DefaultModelMonitor

# Upload pre and postprocessor scripts
session = Session()
bucket = boto3.Session().resource("s3").Bucket(session.default_bucket())
prefix = "{{demo-sagemaker-model-monitor}}"
pre_processor_script = bucket.Object(os.path.join(prefix, "{{preprocessor}}.py")).upload_file("{{preprocessor}}.py")
post_processor_script = bucket.Object(os.path.join(prefix, "{{postprocessor}}.py")).upload_file("{{postprocessor}}.py")

# Get execution role
role = get_execution_role() # can be an empty string

# Instance type
instance_type = "{{instance-type}}"
# instance_type = "ml.m5.xlarge" # Example

# Create a monitoring schedule with pre and postprocessing
my_default_monitor = DefaultModelMonitor(
    role=role,
    instance_count={{1}},
    instance_type=instance_type,
    volume_size_in_gb={{20}},
    max_runtime_in_seconds={{3600}},
)

s3_report_path = "s3://{}/{}".format(bucket, "{{reports}}")
monitor_schedule_name = "{{monitor-schedule-name}}"
endpoint_name = "{{endpoint-name}}"
my_default_monitor.create_monitoring_schedule(
    post_analytics_processor_script=post_processor_script,
    record_preprocessor_script=pre_processor_script,
    monitor_schedule_name=monitor_schedule_name,
    # use endpoint_input for real-time endpoint
    endpoint_input=endpoint_name,
    # or use batch_transform_input for batch transform jobs
    # batch_transform_input=batch_transform_name,
    output_s3_uri=s3_report_path,
    statistics=my_default_monitor.baseline_statistics(),
    constraints=my_default_monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)
```

**Topics**
+ [Script di pre-elaborazione](#model-monitor-pre-processing-script)
+ [Campionamento personalizzato](#model-monitor-pre-processing-custom-sampling)
+ [Script di post-elaborazione](#model-monitor-post-processing-script)

## Script di pre-elaborazione
<a name="model-monitor-pre-processing-script"></a>

Utilizza gli script di pre-elaborazione quando devi trasformare gli input che arrivano al monitoraggio del modello.

Per esempio, supponiamo che l'output del tuo modello sia una matrice `[1.0, 2.1]`. Il contenitore Amazon SageMaker Model Monitor funziona solo con strutture JSON tabulari o appiattite, come. `{“{{prediction0}}”: 1.0, “{{prediction1}}” : 2.1}` È possibile utilizzare uno script di pre-elaborazione come riportato di seguito per trasformare la matrice nella struttura JSON corretta.

```
def preprocess_handler(inference_record):
    input_data = inference_record.endpoint_input.data
    output_data = inference_record.endpoint_output.data.rstrip("\n")
    data = output_data + "," + input_data
    return { str(i).zfill(20) : d for i, d in enumerate(data.split(",")) }
```

In un altro caso, supponiamo che il modello abbia funzionalità opzionali e che si utilizzi `-1` per indicare che la funzionalità opzionale ha un valore mancante. Se disponi di un sistema di monitoraggio per la qualità dei dati, è consigliabile rimuovere `-1` dalla matrice dei valori di input in modo che non venga incluso nei calcoli dei parametri di monitoraggio. Per rimuovere tali valori puoi utilizzare uno script come quello riportato di seguito.

```
def preprocess_handler(inference_record):
    input_data = inference_record.endpoint_input.data
    return {i : None if x == -1 else x for i, x in enumerate(input_data.split(","))}
```

Lo script di pre-elaborazione riceve `inference_record` come unico input. Il seguente frammento di codice mostra un esempio di `inference_record`.

```
{
  "captureData": {
    "endpointInput": {
      "observedContentType": "text/csv",
      "mode": "INPUT",
      "data": "{{132,25,113.2,96,269.9,107,,0,0,0,0,0,0,1,0,1,0,0,1}}",
      "encoding": "CSV"
    },
    "endpointOutput": {
      "observedContentType": "text/csv; charset=utf-8",
      "mode": "OUTPUT",
      "data": "{{0.01076381653547287}}",
      "encoding": "CSV"
    }
  },
  "eventMetadata": {
    "eventId": "{{feca1ab1-8025-47e3-8f6a-99e3fdd7b8d9}}",
    "inferenceTime": "{{2019-11-20T23:33:12Z}}"
  },
  "eventVersion": "{{0}}"
}
```

Il seguente frammento di codice mostra l'intera struttura di classe di `inference_record`.

```
KEY_EVENT_METADATA = "eventMetadata"
KEY_EVENT_METADATA_EVENT_ID = "eventId"
KEY_EVENT_METADATA_EVENT_TIME = "inferenceTime"
KEY_EVENT_METADATA_CUSTOM_ATTR = "customAttributes"

KEY_EVENTDATA_ENCODING = "encoding"
KEY_EVENTDATA_DATA = "data"

KEY_GROUND_TRUTH_DATA = "groundTruthData"

KEY_EVENTDATA = "captureData"
KEY_EVENTDATA_ENDPOINT_INPUT = "endpointInput"
KEY_EVENTDATA_ENDPOINT_OUTPUT = "endpointOutput"

KEY_EVENTDATA_BATCH_OUTPUT = "batchTransformOutput"
KEY_EVENTDATA_OBSERVED_CONTENT_TYPE = "observedContentType"
KEY_EVENTDATA_MODE = "mode"

KEY_EVENT_VERSION = "eventVersion"

class EventConfig:
    def __init__(self, endpoint, variant, start_time, end_time):
        self.endpoint = endpoint
        self.variant = variant
        self.start_time = start_time
        self.end_time = end_time


class EventMetadata:
    def __init__(self, event_metadata_dict):
        self.event_id = event_metadata_dict.get(KEY_EVENT_METADATA_EVENT_ID, None)
        self.event_time = event_metadata_dict.get(KEY_EVENT_METADATA_EVENT_TIME, None)
        self.custom_attribute = event_metadata_dict.get(KEY_EVENT_METADATA_CUSTOM_ATTR, None)


class EventData:
    def __init__(self, data_dict):
        self.encoding = data_dict.get(KEY_EVENTDATA_ENCODING, None)
        self.data = data_dict.get(KEY_EVENTDATA_DATA, None)
        self.observedContentType = data_dict.get(KEY_EVENTDATA_OBSERVED_CONTENT_TYPE, None)
        self.mode = data_dict.get(KEY_EVENTDATA_MODE, None)

    def as_dict(self):
        ret = {
            KEY_EVENTDATA_ENCODING: self.encoding,
            KEY_EVENTDATA_DATA: self.data,
            KEY_EVENTDATA_OBSERVED_CONTENT_TYPE: self.observedContentType,
        }
        return ret


class CapturedData:
    def __init__(self, event_dict):
        self.event_metadata = None
        self.endpoint_input = None
        self.endpoint_output = None
        self.batch_transform_output = None
        self.ground_truth = None
        self.event_version = None
        self.event_dict = event_dict
        self._event_dict_postprocessed = False
        
        if KEY_EVENT_METADATA in event_dict:
            self.event_metadata = EventMetadata(event_dict[KEY_EVENT_METADATA])
        if KEY_EVENTDATA in event_dict:
            if KEY_EVENTDATA_ENDPOINT_INPUT in event_dict[KEY_EVENTDATA]:
                self.endpoint_input = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_ENDPOINT_INPUT])
            if KEY_EVENTDATA_ENDPOINT_OUTPUT in event_dict[KEY_EVENTDATA]:
                self.endpoint_output = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_ENDPOINT_OUTPUT])
            if KEY_EVENTDATA_BATCH_OUTPUT in event_dict[KEY_EVENTDATA]:
                self.batch_transform_output = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_BATCH_OUTPUT])

        if KEY_GROUND_TRUTH_DATA in event_dict:
            self.ground_truth = EventData(event_dict[KEY_GROUND_TRUTH_DATA])
        if KEY_EVENT_VERSION in event_dict:
            self.event_version = event_dict[KEY_EVENT_VERSION]

    def as_dict(self):
        if self._event_dict_postprocessed is True:
            return self.event_dict
        if KEY_EVENTDATA in self.event_dict:
            if KEY_EVENTDATA_ENDPOINT_INPUT in self.event_dict[KEY_EVENTDATA]:
                self.event_dict[KEY_EVENTDATA][KEY_EVENTDATA_ENDPOINT_INPUT] = self.endpoint_input.as_dict()
            if KEY_EVENTDATA_ENDPOINT_OUTPUT in self.event_dict[KEY_EVENTDATA]:
                self.event_dict[KEY_EVENTDATA][
                    KEY_EVENTDATA_ENDPOINT_OUTPUT
                ] = self.endpoint_output.as_dict()
            if KEY_EVENTDATA_BATCH_OUTPUT in self.event_dict[KEY_EVENTDATA]:
                self.event_dict[KEY_EVENTDATA][KEY_EVENTDATA_BATCH_OUTPUT] = self.batch_transform_output.as_dict()
        
        self._event_dict_postprocessed = True
        return self.event_dict

    def __str__(self):
        return str(self.as_dict())
```

## Campionamento personalizzato
<a name="model-monitor-pre-processing-custom-sampling"></a>

Puoi anche applicare una strategia di campionamento personalizzata nello script di pre-elaborazione. A tale scopo, configura il container proprietario e predefinito di Model Monitor in modo che ignori una percentuale dei record in base alla frequenza di campionamento specificata. Nell'esempio seguente, il gestore campiona il 10% dei record restituendo il record nel 10% delle chiamate al gestore e in caso contrario un elenco vuoto.

```
import random

def preprocess_handler(inference_record):
    # we set up a sampling rate of 0.1
    if random.random() > 0.1:
        # return an empty list
        return []
    input_data = inference_record.endpoint_input.data
    return {i : None if x == -1 else x for i, x in enumerate(input_data.split(","))}
```

### Registrazione personalizzata per lo script di pre-elaborazione
<a name="model-monitor-pre-processing-custom-logging"></a>

 Se lo script di preelaborazione restituisce un errore, controlla i messaggi di eccezione registrati in debug. CloudWatch È possibile accedere al logger tramite l'interfaccia. CloudWatch `preprocess_handler` Puoi registrare tutte le informazioni di cui hai bisogno dal tuo script a CloudWatch. Potrebbe essere utile per eseguire il debug dello script di pre-elaborazione. L'esempio seguente mostra come utilizzare l'`preprocess_handler`interfaccia per accedere a CloudWatch 

```
def preprocess_handler(inference_record, logger):
    logger.info(f"I'm a processing record: {inference_record}")
    logger.debug(f"I'm debugging a processing record: {inference_record}")
    logger.warning(f"I'm processing record with missing value: {inference_record}")
    logger.error(f"I'm a processing record with bad value: {inference_record}")
    return inference_record
```

## Script di post-elaborazione
<a name="model-monitor-post-processing-script"></a>

Utilizza uno script di post-elaborazione quando desideri estendere il codice dopo un'esecuzione di monitoraggio riuscita.

```
def postprocess_handler():
    print("Hello from post-proc script!")
```