

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# Lift-and-shift 使用 @step 装饰器的 Python 代码
<a name="pipelines-step-decorator"></a>

`@step` 装饰器是将本地机器学习 (ML) 代码转换为一个或多个管道步骤的功能。您可以像编写任何 ML 项目一样编写 ML 函数。在本地测试或使用装饰器作为训练作业进行测试后，您可以通过添加`@remote`装饰器将该函数转换为 A SageMaker I 管道步骤。`@step`然后，您可以将 `@step` 装饰函数调用的输出作为一个步骤传递给 Pipelines，以创建并运行管道。您还可以使用 `@step` 装饰器串联一系列函数，创建多步骤有向无环图 (DAG) 管道。

使用 `@step` 装饰器的设置与使用 `@remote` 装饰器的设置相同。关于如何[设置环境](https://docs.aws.amazon.com/sagemaker/latest/dg/train-remote-decorator.html#train-remote-decorator-env)和[使用配置文件](https://docs.aws.amazon.com/sagemaker/latest/dg/train-remote-decorator-config.html)设置默认值，请参阅远程功能文档。有关 `@step` 装饰器的更多信息，请参阅 [sagemaker.workflow.function\$1step.step](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.function_step.step)。

要查看演示如何使用 `@step` 装饰器的示例笔记本，请参阅 [@step 装饰器示例笔记本](https://github.com/aws/amazon-sagemaker-examples/tree/main/sagemaker-pipelines/step-decorator)。

下文将介绍如何使用 `@step` 装饰器注释本地 ML 代码以创建步骤、使用步骤创建和运行管道，以及如何根据使用场景定制体验。

**Topics**
+ [使用 `@step` 装饰函数创建管道](pipelines-step-decorator-create-pipeline.md)
+ [运行管道](pipelines-step-decorator-run-pipeline.md)
+ [配置您的管道](pipelines-step-decorator-cfg-pipeline.md)
+ [最佳实践](pipelines-step-decorator-best.md)
+ [限制](pipelines-step-decorator-limit.md)

# 使用 `@step` 装饰函数创建管道
<a name="pipelines-step-decorator-create-pipeline"></a>

您可以使用 `@step` 装饰器将 Python 函数转换为管道步骤，在这些函数之间创建依赖关系以创建管道图（或有向无环图 (DAG)），并将该图的叶节点作为步骤列表传递给管道，从而创建管道。下文将结合示例详细解释这一程序。

**Topics**
+ [将函数转换为步骤](#pipelines-step-decorator-run-pipeline-convert)
+ [在各步骤之间建立依赖关系](#pipelines-step-decorator-run-pipeline-link)
+ [使用 `ConditionStep` 和 `@step` 装饰步骤](#pipelines-step-decorator-condition)
+ [使用步骤的 `DelayedReturn` 输出定义管道](#pipelines-step-define-delayed)
+ [创建管道](#pipelines-step-decorator-pipeline-create)

## 将函数转换为步骤
<a name="pipelines-step-decorator-run-pipeline-convert"></a>

要使用 `@step` 装饰器创建一个步骤，请使用 `@step` 对功能进行注释。下面的示例展示了一个预处理数据的 `@step` 装饰功能。

```
from sagemaker.workflow.function_step import step

@step
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    return procesed_dataframe
    
step_process_result = preprocess(raw_data)
```

当你调用`@step`装饰过的函数时， SageMaker AI 会返回一个`DelayedReturn`实例，而不是运行该函数。`DelayedReturn` 实例是该功能实际返回值的代理。`DelayedReturn` 实例可以作为参数传递给其他函数，也可以作为步骤直接传递给管道实例。有关该`DelayedReturn`课程的信息，请参阅 [sagemaker.workflow.function\$1step。 DelayedReturn](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.function_step.DelayedReturn)。

## 在各步骤之间建立依赖关系
<a name="pipelines-step-decorator-run-pipeline-link"></a>

在两个步骤之间创建依赖关系时，就在管道图中的步骤之间创建了连接。下文将介绍在管道步骤之间创建依赖关系的多种方法。

### 通过输入参数实现数据依赖
<a name="pipelines-step-decorator-run-pipeline-link-interstep"></a>

将一个函数的 `DelayedReturn` 输出作为另一个函数的输入，会自动在管道 DAG 中创建数据依赖关系。在下面的示例中，将 `preprocess` 函数的 `DelayedReturn` 输出传递给 `train` 函数会在 `preprocess` 和 `train` 之间产生依赖关系。

```
from sagemaker.workflow.function_step import step

@step
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    return procesed_dataframe

@step
def train(training_data):
    ...
    return trained_model

step_process_result = preprocess(raw_data)    
step_train_result = train(step_process_result)
```

上一个示例定义了一个训练函数，该函数用 `@step` 修饰。调用该函数时，它会接收预处理管道步骤的 `DelayedReturn` 输出作为输入。调用训练函数会返回另一个 `DelayedReturn` 实例。该实例保存了该函数中定义的所有先前步骤的信息（即本例中的 `preprocess` 步骤），这些步骤构成了管道 DAG。

在上一个示例中，`preprocess` 函数返回一个值。有关列表或元组等更复杂的返回类型，请参阅 [限制](pipelines-step-decorator-limit.md)。

### 定义自定义依赖关系
<a name="pipelines-step-decorator-run-pipeline-link-custom"></a>

在上一个示例中，`train` 函数接收了 `preprocess` 的 `DelayedReturn` 输出，并创建了一个依赖关系。如果想明确定义依赖关系，而不传递前一步的输出结果，请在步骤中使用 `add_depends_on` 函数。您可以使用 `get_step()` 函数从其 `DelayedReturn` 实例中获取基础步骤，然后将依赖关系作为输入调用 `add_depends_on`\$1on。要查看 `get_step()` 函数定义，请参阅 [sagemaker.workflow.step\$1outputs.get\$1step](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.step_outputs.get_step)。下面的示例演示了如何使用 `get_step()` 和 `add_depends_on()` 在 `preprocess` 和 `train` 之间创建依赖关系。

```
from sagemaker.workflow.step_outputs import get_step

@step
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    processed_data = ..
    return s3.upload(processed_data)

@step
def train():
    training_data = s3.download(....)
    ...
    return trained_model

step_process_result = preprocess(raw_data)    
step_train_result = train()

get_step(step_train_result).add_depends_on([step_process_result])
```

### 将数据从 `@step` 装饰函数传递到传统管道步骤
<a name="pipelines-step-decorator-run-pipeline-link-pass"></a>

您可以创建一个包含 `@step` 装饰步骤和传统管道步骤的管道，并在两者之间传递数据。例如，您可以使用 `ProcessingStep` 处理数据，并将处理结果传递给 `@step` 装饰的训练函数。在下面的示例中，`@step` 装饰的训练步骤引用了处理步骤的输出。

```
# Define processing step

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

sklearn_processor = SKLearnProcessor(
    framework_version='1.2-1',
    role='arn:aws:iam::123456789012:role/SagemakerExecutionRole',
    instance_type='ml.m5.large',
    instance_count='1',
)

inputs = [
    ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
]
outputs = [
    ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
    ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
    ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
]

process_step = ProcessingStep(
    name="MyProcessStep",
    step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'),
)
```

```
# Define a @step-decorated train step which references the 
# output of a processing step

@step
def train(train_data_path, test_data_path):
    ...
    return trained_model
    
step_train_result = train(
   process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
   process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
)
```

## 使用 `ConditionStep` 和 `@step` 装饰步骤
<a name="pipelines-step-decorator-condition"></a>

管道支持一个 `ConditionStep` 类，该类会评估前几个步骤的结果，以决定在管道中采取什么行动。您也可以在 `@step` 装饰的步骤中使用 `ConditionStep`。要使用 `ConditionStep` 中任何 `@step` 装饰步骤的输出，请将该步骤的输出作为 `ConditionStep` 的参数输入。在下面的示例中，条件步骤接收 `@step` 装饰模型评测步骤的输出。

```
# Define steps

@step(name="evaluate")
def evaluate_model():
    # code to evaluate the model
    return {
        "rmse":rmse_value
    }
    
@step(name="register")
def register_model():
    # code to register the model
    ...
```

```
# Define ConditionStep

from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.fail_step import FailStep

conditionally_register = ConditionStep(
    name="conditional_register",
    conditions=[
        ConditionGreaterThanOrEqualTo(
            # Output of the evaluate step must be json serializable
            left=evaluate_model()["rmse"],  # 
            right=5,
        )
    ],
    if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")],
    else_steps=[register_model()],
)
```

## 使用步骤的 `DelayedReturn` 输出定义管道
<a name="pipelines-step-define-delayed"></a>

无论是否使用 `@step` 装饰器，定义管道的方式都是一样的。向管道传递 `DelayedReturn` 实例时，无需传递完整的步骤列表来构建管道。SDK 会根据您定义的依赖关系自动推导出前面的步骤。您传递给管道的所有上一步骤 `Step` 对象或相关 `DelayedReturn` 对象都包含在管道图中。在下面的示例中，管道为 `train` 函数接收 `DelayedReturn` 对象。 SageMaker AI 将该`preprocess`步骤作为上一步添加到管道图中。`train`

```
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name="<pipeline-name>",
    steps=[step_train_result],
    sagemaker_session=<sagemaker-session>,
)
```

如果步骤之间没有数据或自定义依赖关系，并且并行运行多个步骤，管道图就会有多个叶节点。如下例所示，将所有这些叶节点以列表形式传递给管道定义中的 `steps` 参数：

```
@step
def process1():
    ...
    return data
    
@step
def process2():
   ...
   return data
   
step_process1_result = process1()
step_process2_result = process2()

pipeline = Pipeline(
    name="<pipeline-name>",
    steps=[step_process1_result, step_process2_result],
    sagemaker_session=sagemaker-session,
)
```

管道运行时，两个步骤并行。

您只需将图的叶节点传递给管道，因为叶节点包含通过数据或自定义依赖关系定义的所有先前步骤的信息。在编译管道时， SageMaker AI 还会推断出构成管道图的所有后续步骤，并将每个步骤作为单独的步骤添加到管道中。

## 创建管道
<a name="pipelines-step-decorator-pipeline-create"></a>

通过调用 `pipeline.create()` 创建管道，如以下代码所示。有关 `create()` 的详细信息，请参阅 [sagemaker.workflow.pipeline.Pipeline.create](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.pipeline.Pipeline.create)。

```
role = "pipeline-role"
pipeline.create(role)
```

当你调用时`pipeline.create()`， SageMaker AI 会编译所有定义为管道实例一部分的步骤。 SageMaker AI 将序列化函数、参数和所有其他与步骤相关的项目上传到 Amazon S3。

数据按照以下结构存放在 S3 存储桶中：

```
s3_root_uri/
    pipeline_name/
        sm_rf_user_ws/
            workspace.zip  # archive of the current working directory (workdir)
        step_name/
            timestamp/
                arguments/                # serialized function arguments
                function/                 # serialized function
                pre_train_dependencies/   # any dependencies and pre_execution scripts provided for the step       
        execution_id/
            step_name/
                results     # returned output from the serialized function including the model
```

`s3_root_uri`在 SageMaker AI 配置文件中定义，适用于整个管道。如果未定义，则使用默认 SageMaker AI 存储桶。

**注意**  
每次 SageMaker AI 编译管道时， SageMaker AI 都会将步骤的序列化函数、参数和依赖项保存在带有当前时间戳的文件夹中。每次运行 `pipeline.create()`、`pipeline.update()`、`pipeline.upsert()` 或 `pipeline.definition()` 时都会出现这种情况。

# 运行管道
<a name="pipelines-step-decorator-run-pipeline"></a>

以下页面介绍如何使用 Amazon Pipelines 运行 SageMaker 管道，无论是使用 SageMaker AI 资源还是本地运行。

使用该`pipeline.start()`函数开始新的管道运行，就像传统的 A SageMaker I 管道运行一样。有关 `start()` 函数的信息，请参阅 [sagemaker.workflow.pipeline.Pipeline.start](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.pipeline.Pipeline.start)。

**注意**  
使用 `@step` 装饰器定义的步骤将作为训练作业运行。因此，请注意以下限制：  
账户中的实例限制和训练作业限制。相应更新您的限制，以避免任何节流或资源限制问题。
管道中每运行一个训练步骤的相关货币成本。有关更多详情，请参阅 [Amazon SageMaker 定价](https://aws.amazon.com/sagemaker/pricing/)。

## 从本地运行的管道中读取结果
<a name="pipelines-step-decorator-run-pipeline-retrieve"></a>

要查看管道运行的任何步骤的结果，请使用 [execution.result()](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.pipeline._PipelineExecution.result           )，如以下代码段所示：

```
execution = pipeline.start()
execution.result(step_name="train")
```

**注意**  
Pipelines 在本地模式下不支持 `execution.result()`。

每次只能检索一个步骤的结果。如果步骤名称由 SageMaker AI 生成，则可以通过以下方式调用`list_steps`来检索步骤名称：

```
execution.list_step()
```

## 在本地运行管道
<a name="pipelines-step-decorator-run-pipeline-local"></a>

您可以像运行传统管道步骤一样，在本地运行带有 `@step` 装饰步骤的管道。有关本地模式管道运行的详细信息，请参阅 [使用本地模式运行管道](pipelines-local-mode.md)。要使用本地模式，请在管道定义中使用 `LocalPipelineSession` 代替 `SageMakerSession`，如下例所示：

```
from sagemaker.workflow.function_step import step
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import LocalPipelineSession

@step
def train():
    training_data = s3.download(....)
    ...
    return trained_model
    
step_train_result = train()

local_pipeline_session = LocalPipelineSession()

local_pipeline = Pipeline(
    name="<pipeline-name>",
    steps=[step_train_result],
    sagemaker_session=local_pipeline_session # needed for local mode
)

local_pipeline.create(role_arn="role_arn")

# pipeline runs locally
execution = local_pipeline.start()
```

# 配置您的管道
<a name="pipelines-step-decorator-cfg-pipeline"></a>

建议您使用 SageMaker AI 配置文件来设置管道的默认值。有关 SageMaker AI 配置文件的信息，请参阅[在 SageMaker Python SDK 中配置和使用默认值](https://sagemaker.readthedocs.io/en/stable/overview.html#configuring-and-using-defaults-with-the-sagemaker-python-sdk)。添加到配置文件中的任何配置都适用于管道中的所有步骤。如果您要覆盖任何步骤的选项，请在 `@step` 装饰器参数中提供新值。下面的主题介绍了如何设置配置文件。

配置文件中 `@remote` 装饰器的配置与 `@step` 装饰器的配置完全相同。要在配置文件中设置管道角色 ARN 和管道标签，请使用以下代码段中的 `Pipeline` 部分：

```
SchemaVersion: '1.0'
SageMaker:
  Pipeline:
    RoleArn: 'arn:aws:iam::555555555555:role/IMRole'
    Tags:
    - Key: 'tag_key'
      Value: 'tag_value'
```

在配置文件中设置的大部分默认值，都可以通过向 `@step` 装饰器传递新值来覆盖。例如，您可以覆盖配置文件中为预处理步骤设置的实例类型，如下例所示：

```
@step(instance_type="ml.m5.large")
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    return procesed_dataframe
```

一些参数不是`@step`装饰器参数列表的一部分，只能通过 SageMaker AI 配置文件为整个管道配置这些参数。它们列举如下：
+ `sagemaker_session`(`sagemaker.session.Session`): SageMaker AI 委托服务调用的底层 SageMaker AI 会话。如果未指定，则使用默认配置创建会话，如下所示：

  ```
  SageMaker:
    PythonSDK:
      Modules:
        Session:
          DefaultS3Bucket: 'default_s3_bucket'
          DefaultS3ObjectKeyPrefix: 'key_prefix'
  ```
+ `custom_file_filter`（`CustomFileFilter)`：`CustomFileFilter` 对象，用于指定要包含在管道步骤中的本地目录和文件。如果未指定，默认为 `None`。要使 `custom_file_filter` 生效，您必须将 `IncludeLocalWorkdir` 设置为 `True`。下面的示例显示了忽略所有笔记本文件以及名为 `data` 的文件和目录的配置。

  ```
  SchemaVersion: '1.0'
  SageMaker:
    PythonSDK:
      Modules:
        RemoteFunction:
          IncludeLocalWorkDir: true
          CustomFileFilter: 
            IgnoreNamePatterns: # files or directories to ignore
            - "*.ipynb" # all notebook files
            - "data" # folder or file named "data"
  ```

  有关如何使用 `IncludeLocalWorkdir` 和 `CustomFileFilter` 的更多详情，请参阅 [将模块化代码用于 @remote 装饰器](train-remote-decorator-modular.md)。
+ `s3_root_uri (str)`：Amazon S3 根文件夹， SageMaker AI 将代码档案和数据上传到该文件夹。如果未指定，则使用默认 SageMaker AI 存储桶。
+ `s3_kms_key (str)`：用于加密输入和输出数据的键。您只能在 SageMaker AI 配置文件中配置此参数，并且该参数适用于管道中定义的所有步骤。如果未指定，默认为 `None`。请参阅下面的 S3 KMS 密钥配置示例片段：

  ```
  SchemaVersion: '1.0'
  SageMaker:
    PythonSDK:
      Modules:
        RemoteFunction:
          S3KmsKeyId: 's3kmskeyid'
          S3RootUri: 's3://amzn-s3-demo-bucket/my-project
  ```

# 最佳实践
<a name="pipelines-step-decorator-best"></a>

以下各节建议了在管道步骤中使用 `@step` 装饰器时应遵循的最佳实践。

## 使用暖池
<a name="pipelines-step-decorator-best-warmpool"></a>

要加快管道步骤运行速度，可使用为训练作业提供的热池功能。您可以通过为 `@step` 装饰器提供 `keep_alive_period_in_seconds` 参数来开启暖池功能，如以下代码段所示：

```
@step(
   keep_alive_period_in_seconds=900
)
```

有关暖池的更多信息，请参阅 [SageMaker AI 管理的暖池](train-warm-pools.md)。

## 目录结构
<a name="pipelines-step-decorator-best-dir"></a>

建议您在使用 `@step` 装饰器时使用代码模块。将调用步骤函数和定义管道的 `pipeline.py` 模块放在工作区的根部。建议的结构如下：

```
.
├── config.yaml # the configuration file that define the infra settings
├── requirements.txt # dependencies
├── pipeline.py  # invoke @step-decorated functions and define the pipeline here
├── steps/
| ├── processing.py
| ├── train.py
├── data/
├── test/
```

# 限制
<a name="pipelines-step-decorator-limit"></a>

以下各节概述了在管道步骤中使用 `@step` 装饰器时应注意的限制。

## 函数参数限制
<a name="pipelines-step-decorator-arg"></a>

向 `@step` 装饰函数传递输入参数时，会受到以下限制：
+ 您可以将 `DelayedReturn`、`Properties`（其他类型的步骤）、`Parameter` 和 `ExecutionVariable` 对象作为参数传递给 `@step` 装饰函数。但 `@step` 装饰的函数不支持将 `JsonGet` 和 `Join` 对象作为参数。
+ 您不能通过 `@step` 函数直接访问管道变量。下面的示例会产生一个错误：

  ```
  param = ParameterInteger(name="<parameter-name>", default_value=10)
  
  @step
  def func():
      print(param)
  
  func() # this raises a SerializationError
  ```
+ 您不能将管道变量嵌套到另一个对象中，并将其传递给 `@step` 函数。下面的示例会产生一个错误：

  ```
  param = ParameterInteger(name="<parameter-name>", default_value=10)
  
  @step
  def func(arg):
      print(arg)
  
  func(arg=(param,)) # this raises a SerializationError because param is nested in a tuple
  ```
+ 由于函数的输入和输出是序列化的，因此可以作为函数的输入或输出传递的数据类型受到限制。更多详情，请参阅 [调用远程函数](train-remote-decorator-invocation.md) 中的*数据序列化和反序列化*部分。同样的限制也适用于 `@step` 装饰函数。
+ 任何具有 boto 客户端的对象都不能被序列化，因此不能将此类对象作为 `@step` 装饰函数的输入或输出。例如， SageMaker Python SDK 客户端类（例如`Estimator``Predictor`、和）`Processor`无法序列化。

## 功能导入
<a name="pipelines-step-decorator-best-import"></a>

应在函数内部而不是外部导入步骤所需的库。如果在全局范围内导入，就有可能在序列化函数时发生导入碰撞。例如，`sklearn.pipeline.Pipeline` 可以被 `sagemaker.workflow.pipeline.Pipeline` 覆盖。

## 引用函数返回值的子成员
<a name="pipelines-step-decorator-best-child"></a>

如果引用 `@step` 装饰函数返回值的子成员，则会受到以下限制：
+ 如果 `DelayedReturn` 对象表示元组、列表或 dict，则可以用 `[]` 引用子成员，如下例所示：

  ```
  delayed_return[0]
  delayed_return["a_key"]
  delayed_return[1]["a_key"]
  ```
+ 由于调用函数时无法知道基础元组或列表的确切长度，因此无法解包元组或列表输出。下面的示例会产生一个错误：

  ```
  a, b, c = func() # this raises ValueError
  ```
+ 您不能遍历 `DelayedReturn` 对象。以下示例会引发错误：

  ```
  for item in func(): # this raises a NotImplementedError
  ```
+ 您不能使用“`.`”引用任意子成员。下面的示例会产生一个错误：

  ```
  delayed_return.a_child # raises AttributeError
  ```

## 不支持的现有管道功能
<a name="pipelines-step-decorator-best-unsupported"></a>

您不能使用具有以下管道功能的 `@step` 装饰器：
+ [Pipeline 步骤缓存](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-caching.html)
+ [Property 文件](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-propertyfile.html#build-and-manage-propertyfile-property)