本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
使用 @step 裝飾的函式建立管道
您可以建立一個管道,方法是使用 @step 裝飾項目將 Python 函式轉換為管道步驟、在這些函式之間建立相依性以建立管道圖形 (或有向無環圖 (DAG)),然後將該圖形的分葉節點做為步驟清單傳遞至管道。下列各節會使用範例詳細說明此程序。
將函式轉換為步驟
若要使用 @step 裝飾項目建立步驟,請使用 @step 註釋函式。下列範例顯示預先處理資料的 @step 裝飾函式。
from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe step_process_result = preprocess(raw_data)
當您調用 @step 裝飾函式時,SageMaker AI 會傳回 DelayedReturn 執行個體,而不是執行該函式。DelayedReturn 執行個體是該函式實際傳回的 Proxy。DelayedReturn 執行個體可以做為引數傳遞至另一個函式,或直接做為步驟傳遞至管道執行個體。如需 DelayedReturn 類別的相關資訊,請參閱 sagemaker.workflow.function_step.DelayedReturn
在步驟之間建立相依性
當您在兩個步驟之間建立相依性時,您可以在管道圖中的步驟之間建立連線。下列各節介紹在管道步驟之間建立相依性的多種方式。
透過輸入引數的資料相依性
將某個函式的 DelayedReturn 輸出做為輸入傳遞至另一個函式,會自動在管道 DAG 中建立資料相依性。在下列範例中,將 preprocess 函式的 DelayedReturn 輸出傳遞至 train 函式,會在 preprocess 與 train 之間建立相依性。
from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe @step def train(training_data): ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train(step_process_result)
前一個範例定義了使用 @step 裝飾的訓練函式。調用此函式時,其會接收預先處理管道步驟的 DelayedReturn 輸出做為輸入。調用訓練函式會傳回另一個 DelayedReturn。執行個體。此執行個體會保留形成管道 DAG 之函式 (即此範例中 preprocess 的步驟) 中定義之所有先前步驟的相關資訊。
在前一個範例中,preprocess 函式會傳回單一值。如需清單或元組等更複雜的傳回類型,請參閱限制。
定義自訂相依性
在前一個範例中,train 函式收到 preprocess 的 DelayedReturn 輸出並建立了相依性。如果您想要明確定義相依性,而不傳遞前一個步驟輸出,請使用 add_depends_on 函式搭配步驟。您可以使用 get_step() 函式,從其 DelayedReturn 執行個體擷取基礎步驟,然後使用相依性做為輸入來呼叫 add_depends_on_on。若要檢視 get_step() 函式定義,請參閱 sagemaker.workflow.step_outputs.get_stepget_step() 和 add_depends_on() 建立 preprocess 與 train 之間的相依性。
from sagemaker.workflow.step_outputs import get_step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... processed_data = .. return s3.upload(processed_data) @step def train(): training_data = s3.download(....) ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train() get_step(step_train_result).add_depends_on([step_process_result])
將資料從 @step 裝飾函式傳遞至傳統管道步驟,或從中將資料傳遞至該函式
您可以建立一個管道,其中包含 @step 裝飾步驟和傳統管道步驟,以及在它們之間傳遞資料。例如,您可以使用 ProcessingStep 來處理資料,並將其結果傳遞至 @step 裝飾的訓練函式。在下列範例中,@step 裝飾的訓練步驟會參考處理步驟的輸出。
# Define processing step from sagemaker.sklearn.processing import SKLearnProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep sklearn_processor = SKLearnProcessor( framework_version='1.2-1', role='arn:aws:iam::123456789012:role/SagemakerExecutionRole', instance_type='ml.m5.large', instance_count='1', ) inputs = [ ProcessingInput(source=input_data, destination="/opt/ml/processing/input"), ] outputs = [ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ] process_step = ProcessingStep( name="MyProcessStep", step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'), )
# Define a@step-decorated train step which references the # output of a processing step @step def train(train_data_path, test_data_path): ... return trained_model step_train_result = train( process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri, )
使用 ConditionStep 搭配 @step 裝飾的步驟
Pipelines 支援 ConditionStep 類別,其會評估先前步驟的結果,以決定要在管道中採取的動作。您也可以使用 ConditionStep 搭配 @step 裝飾的步驟。若要使用任何 @step 裝飾步驟的輸出搭配 ConditionStep,請輸入該步驟的輸出做為 ConditionStep 的引數。在下列範例中,條件步驟會收到 @step 裝飾的模型評估步驟的輸出。
# Define steps @step(name="evaluate") def evaluate_model(): # code to evaluate the model return { "rmse":rmse_value } @step(name="register") def register_model(): # code to register the model ...
# Define ConditionStep from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo from sagemaker.workflow.fail_step import FailStep conditionally_register = ConditionStep( name="conditional_register", conditions=[ ConditionGreaterThanOrEqualTo( # Output of the evaluate step must be json serializable left=evaluate_model()["rmse"], # right=5, ) ], if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")], else_steps=[register_model()], )
使用步驟的 DelayedReturn 輸出定義管道
無論您是否使用 @step 裝飾項目,您都會以相同的方式定義管道。將 DelayedReturn 執行個體傳遞至您的管道時,您不需要傳遞完整步驟清單以建置管道。SDK 會根據您定義的相依性,自動推斷先前的步驟。管道圖中包含您傳遞至管道的 Step 物件或 DelayedReturn 物件的所有先前步驟。在下列範例中,管道會收到 train 函式的 DelayedReturn 物件。SageMaker AI 會將 preprocess 步驟做為 train 的前一個步驟傳遞至管道圖。
from sagemaker.workflow.pipeline import Pipeline pipeline = Pipeline( name="<pipeline-name>", steps=[step_train_result], sagemaker_session=<sagemaker-session>, )
如果步驟之間沒有資料或自訂相依性,而且您平行執行多個步驟,則管道圖具有多個分葉節點。將清單中的所有這些分葉節點傳遞至管道定義中的 steps 引數,如下列範例所示:
@step def process1(): ... return data @step def process2(): ... return data step_process1_result = process1() step_process2_result = process2() pipeline = Pipeline( name="<pipeline-name>", steps=[step_process1_result, step_process2_result], sagemaker_session=sagemaker-session, )
當管道執行時,這兩個步驟都會平行執行。
您只能將圖形的分葉節點傳遞至管道,因為分葉節點包含透過資料或自訂相依性定義之所有先前步驟的相關資訊。當其編譯管道時,SageMaker AI 也會推斷形成管道圖的所有後續步驟,並將每個步驟做為個別步驟新增至管道。
建立管道
呼叫 pipeline.create() 來建立管道,如下列程式碼片段所示。如需 create() 的詳細資訊,請參閱 sagemaker.workflow.pipeline.Pipeline.create
role = "pipeline-role" pipeline.create(role)
當您呼叫 pipeline.create() 時,SageMaker AI 會編譯所有定義為管道執行個體一部分的步驟。SageMaker AI 會將序列化函式、引數和所有其他步驟相關成品上傳至 Amazon S3。
根據下列結構,資料位於 S3 儲存貯體中:
s3_root_uri/pipeline_name/ sm_rf_user_ws/ workspace.zip # archive of the current working directory (workdir)step_name/timestamp/ arguments/ # serialized function arguments function/ # serialized function pre_train_dependencies/ # any dependencies and pre_execution scripts provided for the stepexecution_id/step_name/ results # returned output from the serialized function including the model
s3_root_uri 定義在 SageMaker AI 組態檔案中,並套用至整個管道。如果未定義,則會使用預設的 SageMaker AI 儲存貯體。
注意
每次 SageMaker AI 編譯管道時,SageMaker AI 都會將步驟的序列化函式、引數和相依性儲存在以目前時間加上時間戳記的資料夾中。每次執行 pipeline.create()、pipeline.update()、pipeline.upsert() 或 pipeline.definition() 時都會發生這種情況。