

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# `@step`으로 데코레이션된 함수를 사용하여 파이프라인 만들기
<a name="pipelines-step-decorator-create-pipeline"></a>

`@step` 데코레이터를 사용하여 Python 함수를 파이프라인 단계로 변환하고, 이러한 함수 간에 종속성을 만들어 파이프라인 그래프(또는 방향성 비순환 그래프(DAG))를 만들고, 해당 그래프의 리프 노드를 파이프라인에 단계 목록으로 전달하여 파이프라인을 만들 수 있습니다. 다음 섹션에서는 예시와 함께 이 절차에 대해 자세히 설명합니다.

**Topics**
+ [함수를 단계로 변환](#pipelines-step-decorator-run-pipeline-convert)
+ [단계 간 종속성 만들기](#pipelines-step-decorator-run-pipeline-link)
+ [`@step`으로 데코레이션된 단계와 함께 `ConditionStep` 사용](#pipelines-step-decorator-condition)
+ [단계의 `DelayedReturn` 출력을 사용하여 파이프라인 정의](#pipelines-step-define-delayed)
+ [파이프라인 생성](#pipelines-step-decorator-pipeline-create)

## 함수를 단계로 변환
<a name="pipelines-step-decorator-run-pipeline-convert"></a>

`@step` 데코레이터를 사용하여 단계를 만들려면 함수에 `@step`을 주석으로 지정합니다. 다음 예시에서는 데이터를 사전 처리하는 `@step`으로 데코레이션된 함수를 보여줍니다.

```
from sagemaker.workflow.function_step import step

@step
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    return procesed_dataframe
    
step_process_result = preprocess(raw_data)
```

`@step`으로 데코레이션된 함수를 간접 호출하면 SageMaker AI는 함수를 실행하는 대신 `DelayedReturn` 인스턴스를 반환합니다. `DelayedReturn` 인스턴스는 해당 함수의 실제 반환에 대한 프록시입니다. `DelayedReturn` 인스턴스는 인수로 다른 함수에 전달하거나 파이프라인 인스턴스에 단계로 직접 전달할 수 있습니다. `DelayedReturn` 클래스에 대한 자세한 내용은 [sagemaker.workflow.function\$1step.DelayedReturn](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.function_step.DelayedReturn)을 참조하세요.

## 단계 간 종속성 만들기
<a name="pipelines-step-decorator-run-pipeline-link"></a>

두 단계 간에 종속성을 만들 때 파이프라인 그래프의 단계 간에 연결을 만듭니다. 다음 섹션에서는 파이프라인 단계 간에 종속성을 만들 수 있는 여러 방법을 소개합니다.

### 입력 인수를 통한 데이터 종속성
<a name="pipelines-step-decorator-run-pipeline-link-interstep"></a>

한 함수의 `DelayedReturn` 출력을 다른 함수에 대한 입력으로 전달하면 파이프라인 DAG에 데이터 종속성이 자동으로 만들어집니다. 다음 예시에서는 `preprocess` 함수의 `DelayedReturn` 출력을 `train` 함수에 전달하면 `preprocess`와 `train` 사이에 종속성이 만들어집니다.

```
from sagemaker.workflow.function_step import step

@step
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    return procesed_dataframe

@step
def train(training_data):
    ...
    return trained_model

step_process_result = preprocess(raw_data)    
step_train_result = train(step_process_result)
```

이전 예시에서는 `@step`으로 데코레이션된 훈련 함수를 정의합니다. 이 함수가 간접 호출되면 사전 처리 파이프라인 단계의 `DelayedReturn` 출력을 입력으로 수신합니다. 훈련 함수를 간접 호출하면 다른 `DelayedReturn` 인스턴스가 반환됩니다. 이 인스턴스는 파이프라인 DAG를 형성하는 해당 함수에 정의된 모든 이전 단계(예: 이 예시의 `preprocess` 단계)에 대한 정보를 보유합니다.

이전 예시에서 `preprocess` 함수는 단일 값을 반환합니다. 목록 또는 튜플과 같은 더 복잡한 반환 유형은 [제한 사항](pipelines-step-decorator-limit.md) 섹션을 참조하세요.

### 사용자 지정 종속성 정의
<a name="pipelines-step-decorator-run-pipeline-link-custom"></a>

이전 예시에서 `train` 함수는 `preprocess`의 `DelayedReturn` 출력을 수신하고 종속성을 만들었습니다. 이전 단계 출력을 전달하지 않고 종속성을 명시적으로 정의하려면 단계와 함께 `add_depends_on` 함수를 사용합니다. `get_step()` 함수를 사용하여 `DelayedReturn` 인스턴스에서 기본 단계를 검색한 다음, 종속성을 입력으로 사용하여 `add_depends_on`\$1on을 직접 호출할 수 있습니다. `get_step()` 함수 정의를 보려면 [sagemaker.workflow.step\$1outputs.get\$1step](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.step_outputs.get_step)을 참조하세요. 다음 예시에서는 `get_step()` 및 `add_depends_on()`을 사용하여 `preprocess` 및 `train` 간의 종속성을 만드는 방법을 보여줍니다.

```
from sagemaker.workflow.step_outputs import get_step

@step
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    processed_data = ..
    return s3.upload(processed_data)

@step
def train():
    training_data = s3.download(....)
    ...
    return trained_model

step_process_result = preprocess(raw_data)    
step_train_result = train()

get_step(step_train_result).add_depends_on([step_process_result])
```

### `@step`으로 데코레이션된 함수에서 기존 파이프라인 단계로 데이터를 주고 받습니다.
<a name="pipelines-step-decorator-run-pipeline-link-pass"></a>

`@step`으로 데코레이션된 단계와 기존 파이프라인 단계가 포함된 파이프라인을 만들고 이들 간에 데이터를 전달할 수 있습니다. 예를 들어 `ProcessingStep`을 사용하여 데이터를 처리하고 결과를 `@step`으로 데코레이션된 훈련 함수에 전달할 수 있습니다. 다음 예시에서는 `@step`으로 데코레이션된 훈련 단계가 처리 단계의 출력을 참조합니다.

```
# Define processing step

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

sklearn_processor = SKLearnProcessor(
    framework_version='1.2-1',
    role='arn:aws:iam::123456789012:role/SagemakerExecutionRole',
    instance_type='ml.m5.large',
    instance_count='1',
)

inputs = [
    ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
]
outputs = [
    ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
    ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
    ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
]

process_step = ProcessingStep(
    name="MyProcessStep",
    step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'),
)
```

```
# Define a @step-decorated train step which references the 
# output of a processing step

@step
def train(train_data_path, test_data_path):
    ...
    return trained_model
    
step_train_result = train(
   process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
   process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
)
```

## `@step`으로 데코레이션된 단계와 함께 `ConditionStep` 사용
<a name="pipelines-step-decorator-condition"></a>

Pipelines은 파이프라인에서 수행할 작업을 결정하기 위해 이전 단계의 결과를 평가하는 `ConditionStep` 클래스를 지원합니다. `@step`으로 데코레이션된 단계와 함께 `ConditionStep`을 사용할 수도 있습니다. `ConditionStep`으로 데코레이션된 단계와 함께 `@step`을 사용하려면 해당 단계의 출력을 `ConditionStep`에 대한 인수로 입력합니다. 다음 예시에서는 조건 단계가 `@step`으로 데코레이션된 모델 평가 단계의 출력을 수신합니다.

```
# Define steps

@step(name="evaluate")
def evaluate_model():
    # code to evaluate the model
    return {
        "rmse":rmse_value
    }
    
@step(name="register")
def register_model():
    # code to register the model
    ...
```

```
# Define ConditionStep

from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.fail_step import FailStep

conditionally_register = ConditionStep(
    name="conditional_register",
    conditions=[
        ConditionGreaterThanOrEqualTo(
            # Output of the evaluate step must be json serializable
            left=evaluate_model()["rmse"],  # 
            right=5,
        )
    ],
    if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")],
    else_steps=[register_model()],
)
```

## 단계의 `DelayedReturn` 출력을 사용하여 파이프라인 정의
<a name="pipelines-step-define-delayed"></a>

`@step` 데코레이터 사용 여부와 관계없이 파이프라인을 동일한 방식으로 정의합니다. `DelayedReturn` 인스턴스를 파이프라인에 전달할 때 파이프라인을 빌드하기 위해 전체 단계 목록을 전달할 필요가 없습니다. SDK는 사용자가 정의한 종속성을 기반으로 이전 단계를 자동으로 추론합니다. 파이프라인에 전달한 `Step` 객체 또는 `DelayedReturn` 객체의 이전 단계는 모두 파이프라인 그래프에 포함됩니다. 다음 예시에서는 파이프라인이 `train` 함수에 대한 `DelayedReturn` 객체를 수신합니다. SageMaker AI는 파이프라인 그래프에 `train` 이전 단계로 `preprocess` 단계를 추가합니다.

```
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name="<pipeline-name>",
    steps=[step_train_result],
    sagemaker_session=<sagemaker-session>,
)
```

단계 간에 데이터 또는 사용자 지정 종속성이 없고 여러 단계를 병렬로 실행하는 경우 파이프라인 그래프에 둘 이상의 리프 노드가 있습니다. 다음 예시와 같이 목록에 있는 이러한 모든 리프 노드를 파이프라인 정의의 `steps` 인수에 전달합니다.

```
@step
def process1():
    ...
    return data
    
@step
def process2():
   ...
   return data
   
step_process1_result = process1()
step_process2_result = process2()

pipeline = Pipeline(
    name="<pipeline-name>",
    steps=[step_process1_result, step_process2_result],
    sagemaker_session=sagemaker-session,
)
```

파이프라인이 실행되면 두 단계가 병렬로 실행됩니다.

리프 노드에는 데이터 또는 사용자 지정 종속성을 통해 정의된 모든 이전 단계에 대한 정보가 포함되어 있으므로 그래프의 리프 노드만 파이프라인에 전달합니다. 파이프라인을 컴파일할 때 SageMaker AI는 파이프라인 그래프를 구성하는 모든 후속 단계를 추론하고 각 단계를 파이프라인에 별도의 단계로 추가합니다.

## 파이프라인 생성
<a name="pipelines-step-decorator-pipeline-create"></a>

다음 코드 조각과 같이 `pipeline.create()`를 직접 호출하여 파이프라인을 만듭니다. `create()`에 대한 자세한 내용은 [sagemaker.workflow.pipeline.Pipeline.create](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.pipeline.Pipeline.create)를 참조하세요.

```
role = "pipeline-role"
pipeline.create(role)
```

`pipeline.create()`를 직접적으로 호출하면 SageMaker AI는 파이프라인 인스턴스의 일부로 정의된 모든 단계를 컴파일합니다. SageMaker AI는 직렬화된 함수, 인수 및 기타 모든 단계 관련 아티팩트를 Amazon S3에 업로드합니다.

데이터는 다음 구조에 따라 S3 버킷에 상주합니다.

```
s3_root_uri/
    pipeline_name/
        sm_rf_user_ws/
            workspace.zip  # archive of the current working directory (workdir)
        step_name/
            timestamp/
                arguments/                # serialized function arguments
                function/                 # serialized function
                pre_train_dependencies/   # any dependencies and pre_execution scripts provided for the step       
        execution_id/
            step_name/
                results     # returned output from the serialized function including the model
```

`s3_root_uri`는 SageMaker AI 구성 파일에 정의되어 있으며 전체 파이프라인에 적용됩니다. 정의되지 않은 경우 기본 SageMaker AI 버킷이 사용됩니다.

**참고**  
SageMaker AI가 파이프라인을 컴파일할 때마다 SageMaker AI는 타임스탬프가 현재 시간인 폴더에 단계의 직렬화된 함수, 인수 및 종속성을 저장합니다. 이는 `pipeline.create()`, `pipeline.update()`, `pipeline.upsert()` 또는 `pipeline.definition()`을 실행할 때마다 발생합니다.