

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 使用 `@step` 裝飾的函式建立管道
<a name="pipelines-step-decorator-create-pipeline"></a>

您可以建立一個管道，方法是使用 `@step` 裝飾項目將 Python 函式轉換為管道步驟、在這些函式之間建立相依性以建立管道圖形 (或有向無環圖 (DAG))，然後將該圖形的分葉節點做為步驟清單傳遞至管道。下列各節會使用範例詳細說明此程序。

**Topics**
+ [將函式轉換為步驟](#pipelines-step-decorator-run-pipeline-convert)
+ [在步驟之間建立相依性](#pipelines-step-decorator-run-pipeline-link)
+ [使用 `ConditionStep` 搭配 `@step` 裝飾的步驟](#pipelines-step-decorator-condition)
+ [使用步驟的 `DelayedReturn` 輸出定義管道](#pipelines-step-define-delayed)
+ [建立管道](#pipelines-step-decorator-pipeline-create)

## 將函式轉換為步驟
<a name="pipelines-step-decorator-run-pipeline-convert"></a>

若要使用 `@step` 裝飾項目建立步驟，請使用 `@step` 註釋函式。下列範例顯示預先處理資料的 `@step` 裝飾函式。

```
from sagemaker.workflow.function_step import step

@step
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    return procesed_dataframe
    
step_process_result = preprocess(raw_data)
```

當您調用 `@step` 裝飾函式時，SageMaker AI 會傳回 `DelayedReturn` 執行個體，而不是執行該函式。`DelayedReturn` 執行個體是該函式實際傳回的 Proxy。`DelayedReturn` 執行個體可以做為引數傳遞至另一個函式，或直接做為步驟傳遞至管道執行個體。如需 `DelayedReturn` 類別的相關資訊，請參閱 [sagemaker.workflow.function\$1step.DelayedReturn](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.function_step.DelayedReturn)。

## 在步驟之間建立相依性
<a name="pipelines-step-decorator-run-pipeline-link"></a>

當您在兩個步驟之間建立相依性時，您可以在管道圖中的步驟之間建立連線。下列各節介紹在管道步驟之間建立相依性的多種方式。

### 透過輸入引數的資料相依性
<a name="pipelines-step-decorator-run-pipeline-link-interstep"></a>

將某個函式的 `DelayedReturn` 輸出做為輸入傳遞至另一個函式，會自動在管道 DAG 中建立資料相依性。在下列範例中，將 `preprocess` 函式的 `DelayedReturn` 輸出傳遞至 `train` 函式，會在 `preprocess` 與 `train` 之間建立相依性。

```
from sagemaker.workflow.function_step import step

@step
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    return procesed_dataframe

@step
def train(training_data):
    ...
    return trained_model

step_process_result = preprocess(raw_data)    
step_train_result = train(step_process_result)
```

前一個範例定義了使用 `@step` 裝飾的訓練函式。調用此函式時，其會接收預先處理管道步驟的 `DelayedReturn` 輸出做為輸入。調用訓練函式會傳回另一個 `DelayedReturn`。執行個體。此執行個體會保留形成管道 DAG 之函式 (即此範例中 `preprocess` 的步驟) 中定義之所有先前步驟的相關資訊。

在前一個範例中，`preprocess` 函式會傳回單一值。如需清單或元組等更複雜的傳回類型，請參閱[限制](pipelines-step-decorator-limit.md)。

### 定義自訂相依性
<a name="pipelines-step-decorator-run-pipeline-link-custom"></a>

在前一個範例中，`train` 函式收到 `preprocess` 的 `DelayedReturn` 輸出並建立了相依性。如果您想要明確定義相依性，而不傳遞前一個步驟輸出，請使用 `add_depends_on` 函式搭配步驟。您可以使用 `get_step()` 函式，從其 `DelayedReturn` 執行個體擷取基礎步驟，然後使用相依性做為輸入來呼叫 `add_depends_on`\$1on。若要檢視 `get_step()` 函式定義，請參閱 [sagemaker.workflow.step\$1outputs.get\$1step](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.step_outputs.get_step)。下列範例展示如何使用 `get_step()` 和 `add_depends_on()` 建立 `preprocess` 與 `train` 之間的相依性。

```
from sagemaker.workflow.step_outputs import get_step

@step
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    processed_data = ..
    return s3.upload(processed_data)

@step
def train():
    training_data = s3.download(....)
    ...
    return trained_model

step_process_result = preprocess(raw_data)    
step_train_result = train()

get_step(step_train_result).add_depends_on([step_process_result])
```

### 將資料從 `@step` 裝飾函式傳遞至傳統管道步驟，或從中將資料傳遞至該函式
<a name="pipelines-step-decorator-run-pipeline-link-pass"></a>

您可以建立一個管道，其中包含 `@step` 裝飾步驟和傳統管道步驟，以及在它們之間傳遞資料。例如，您可以使用 `ProcessingStep` 來處理資料，並將其結果傳遞至 `@step` 裝飾的訓練函式。在下列範例中，`@step` 裝飾的訓練步驟會參考處理步驟的輸出。

```
# Define processing step

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

sklearn_processor = SKLearnProcessor(
    framework_version='1.2-1',
    role='arn:aws:iam::123456789012:role/SagemakerExecutionRole',
    instance_type='ml.m5.large',
    instance_count='1',
)

inputs = [
    ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
]
outputs = [
    ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
    ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
    ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
]

process_step = ProcessingStep(
    name="MyProcessStep",
    step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'),
)
```

```
# Define a @step-decorated train step which references the 
# output of a processing step

@step
def train(train_data_path, test_data_path):
    ...
    return trained_model
    
step_train_result = train(
   process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
   process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
)
```

## 使用 `ConditionStep` 搭配 `@step` 裝飾的步驟
<a name="pipelines-step-decorator-condition"></a>

Pipelines 支援 `ConditionStep` 類別，其會評估先前步驟的結果，以決定要在管道中採取的動作。您也可以使用 `ConditionStep` 搭配 `@step` 裝飾的步驟。若要使用任何 `@step` 裝飾步驟的輸出搭配 `ConditionStep`，請輸入該步驟的輸出做為 `ConditionStep` 的引數。在下列範例中，條件步驟會收到 `@step` 裝飾的模型評估步驟的輸出。

```
# Define steps

@step(name="evaluate")
def evaluate_model():
    # code to evaluate the model
    return {
        "rmse":rmse_value
    }
    
@step(name="register")
def register_model():
    # code to register the model
    ...
```

```
# Define ConditionStep

from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.fail_step import FailStep

conditionally_register = ConditionStep(
    name="conditional_register",
    conditions=[
        ConditionGreaterThanOrEqualTo(
            # Output of the evaluate step must be json serializable
            left=evaluate_model()["rmse"],  # 
            right=5,
        )
    ],
    if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")],
    else_steps=[register_model()],
)
```

## 使用步驟的 `DelayedReturn` 輸出定義管道
<a name="pipelines-step-define-delayed"></a>

無論您是否使用 `@step` 裝飾項目，您都會以相同的方式定義管道。將 `DelayedReturn` 執行個體傳遞至您的管道時，您不需要傳遞完整步驟清單以建置管道。SDK 會根據您定義的相依性，自動推斷先前的步驟。管道圖中包含您傳遞至管道的 `Step` 物件或 `DelayedReturn` 物件的所有先前步驟。在下列範例中，管道會收到 `train` 函式的 `DelayedReturn` 物件。SageMaker AI 會將 `preprocess` 步驟做為 `train` 的前一個步驟傳遞至管道圖。

```
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name="<pipeline-name>",
    steps=[step_train_result],
    sagemaker_session=<sagemaker-session>,
)
```

如果步驟之間沒有資料或自訂相依性，而且您平行執行多個步驟，則管道圖具有多個分葉節點。將清單中的所有這些分葉節點傳遞至管道定義中的 `steps` 引數，如下列範例所示：

```
@step
def process1():
    ...
    return data
    
@step
def process2():
   ...
   return data
   
step_process1_result = process1()
step_process2_result = process2()

pipeline = Pipeline(
    name="<pipeline-name>",
    steps=[step_process1_result, step_process2_result],
    sagemaker_session=sagemaker-session,
)
```

當管道執行時，這兩個步驟都會平行執行。

您只能將圖形的分葉節點傳遞至管道，因為分葉節點包含透過資料或自訂相依性定義之所有先前步驟的相關資訊。當其編譯管道時，SageMaker AI 也會推斷形成管道圖的所有後續步驟，並將每個步驟做為個別步驟新增至管道。

## 建立管道
<a name="pipelines-step-decorator-pipeline-create"></a>

呼叫 `pipeline.create()` 來建立管道，如下列程式碼片段所示。如需 `create()` 的詳細資訊，請參閱 [sagemaker.workflow.pipeline.Pipeline.create](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.pipeline.Pipeline.create)。

```
role = "pipeline-role"
pipeline.create(role)
```

當您呼叫 `pipeline.create()` 時，SageMaker AI 會編譯所有定義為管道執行個體一部分的步驟。SageMaker AI 會將序列化函式、引數和所有其他步驟相關成品上傳至 Amazon S3。

根據下列結構，資料位於 S3 儲存貯體中：

```
s3_root_uri/
    pipeline_name/
        sm_rf_user_ws/
            workspace.zip  # archive of the current working directory (workdir)
        step_name/
            timestamp/
                arguments/                # serialized function arguments
                function/                 # serialized function
                pre_train_dependencies/   # any dependencies and pre_execution scripts provided for the step       
        execution_id/
            step_name/
                results     # returned output from the serialized function including the model
```

`s3_root_uri` 定義在 SageMaker AI 組態檔案中，並套用至整個管道。如果未定義，則會使用預設的 SageMaker AI 儲存貯體。

**注意**  
每次 SageMaker AI 編譯管道時，SageMaker AI 都會將步驟的序列化函式、引數和相依性儲存在以目前時間加上時間戳記的資料夾中。每次執行 `pipeline.create()`、`pipeline.update()`、`pipeline.upsert()` 或 `pipeline.definition()` 時都會發生這種情況。