

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 調用遠端函式
<a name="train-remote-decorator-invocation"></a>

若要在 @remote 裝飾項目內部調用函式，請採用下列其中一種方法：
+ [利用 @remote 裝飾項目調用函式](#train-remote-decorator-invocation-decorator).
+ [使用 `RemoteExecutor` API 來調用函式](#train-remote-decorator-invocation-api).

如果您使用 @remote 裝飾項目方法來調用函式，則訓練工作將等待函式完成，然後再開始新工作。然而，如果您使用 `RemoteExecutor` API，則可平行執行多項工作。以下區段展示調用函式的兩種方法。

## 利用 @remote 裝飾項目調用函式
<a name="train-remote-decorator-invocation-decorator"></a>

您可以透過 @remote 裝飾項目來註釋函式。SageMaker AI 會轉換裝飾項目內部的代碼為 SageMaker 訓練任務。然後，訓練工作會調用裝飾項目內部的函式，並等待工作完成。下列代碼範例顯示如何匯入所需的程式庫、如何啟動 SageMaker AI 執行個體，以及如何使用 @remote 裝飾項目註釋矩陣乘法。

```
from sagemaker.remote_function import remote
import numpy as np

@remote(instance_type="{{ml.m5.large}}")
def matrix_multiply(a, b):
    return np.matmul(a, b)
    
a = np.array([[1, 0],
             [0, 1]])
b = np.array([1, 2])

assert (matrix_multiply(a, b) == np.array([1,2])).all()
```

裝飾項目定義如下。

```
def remote(
    *,
    **kwarg):
        ...
```

當您調用裝飾函式時，SageMaker Python SDK 會將由錯誤引發的任何例外狀況載入本機記憶體。下列代碼範例成功完成第一次呼叫除法函式，並將結果載入本機記憶體。在第二次呼叫除法函式時，代碼傳回錯誤，並將此錯誤載入本機記憶體。

```
from sagemaker.remote_function import remote
import pytest

@remote()
def divide(a, b):
    return a/b

# the underlying job is completed successfully 
# and the function return is loaded
assert divide(10, 5) == 2

# the underlying job fails with "AlgorithmError" 
# and the function exception is loaded into local memory 
with pytest.raises(ZeroDivisionError):
    divide(10, 0)
```

**注意**  
裝飾的函式以遠端工作方式執行。如執行緒被中斷，基礎工作將不會停止。

### 如何變更本機變數的值
<a name="train-remote-decorator-invocation-decorator-value"></a>

透過遠端機器執行裝飾項目函式。變更裝飾函式內部的非本機變數或輸入引數不會變更本機值。

在下列代碼範例，清單與字典會附加於裝飾項目函式內部。當調用裝飾項目函式時，這點不會變更。

```
a = []

@remote
def func():
    a.append(1)

# when func is invoked, a in the local memory is not modified        
func() 
func()

# a stays as []
    
a = {}
@remote
def func(a):
    # append new values to the input dictionary
    a["key-2"] = "value-2"
    
a = {"key": "value"}
func(a)

# a stays as {"key": "value"}
```

若要變更在裝飾項目函式內部宣告的本機變數值，請從函式傳回該變數。下列代碼範例示範當從函式傳回本機變數時，會變更其值。

```
a = {"key-1": "value-1"}

@remote
def func(a):
    a["key-2"] = "value-2"
    return a

a = func(a)

-> {"key-1": "value-1", "key-2": "value-2"}
```

### 資料序列化及還原序列化
<a name="train-remote-decorator-invocation-input-output"></a>

當您調用遠端函式時，SageMaker AI 會在輸入與輸出階段自動序列化函式引數。函式引數與傳回會利用 [cloudpickle](https://github.com/cloudpipe/cloudpickle) 來序列化。SageMaker AI 支援序列化下列 Python 物件與函式。
+ 內建 Python 物件，包含字典、清單、浮點數、ints、字串、布林值以及元組
+ Numpy 陣列
+ Pandas Dataframes
+ Scikit-learn 資料集與估算器
+ PyTorch 模型
+ TensorFlow 模型
+ XGBoost 的提升類別

以下內容可於部分限制下使用。
+ Dask DataFrames
+ XGBoost Dmatrix 類別
+ TensorFlow 資料集與子類別
+ PyTorch 模型

下個區段包含運用先前 Python 類別的最佳實務 (針對遠端函式有部分限制)、SageMaker AI 儲存序列化資料的位置，以及如何管理其存取權的資訊。

#### Python 類別的最佳實務 (針對遠端資料序列化提供有限支援)
<a name="train-remote-decorator-invocation-input-output-bestprac"></a>

您可以在有限制的情況使用本區段所列的 Python 類別。下個區段將討論使用下列 Python 類別的最佳實務。
+ [Dask](https://www.dask.org/) DataFrames
+ XGBoost DMatric 類別
+ TensorFlow 資料集與子類別
+ PyTorch 模型

##### Dask 最佳實務
<a name="train-remote-decorator-invocation-input-output-bestprac-dask"></a>

[Dask](https://www.dask.org/) 是開放原始碼程式庫，可用於 Python 平行運算。本區段顯示以下內容。
+ 如何傳遞 Dask DataFrame 至遠端函式
+ 如何將總結統計資料從 Dask DataFrame 轉換為 Pandas DataFrame

##### 如何傳遞 Dask DataFrame 至遠端函式
<a name="train-remote-decorator-invocation-input-output-bestprac-dask-pass"></a>

[Dask DataFrame](https://docs.dask.org/en/latest/dataframe.html) 通常用於處理大型資料集，因為即使資料集所需的記憶體比可用量更多，其仍可容納。這是因為 Dask DataFrame 不會將本機資料載入記憶體。如您將 Dask DataFrame 作為函式引數傳遞給遠端函式，Dask 可能會傳遞區域磁碟或雲端儲存空間中資料的參考，而不是資料本身。以下代碼範例顯示在遠端函式內部傳遞 Dask DataFrame，該函式將在空白 DataFrame 上操作。

```
#Do not pass a Dask DataFrame  to your remote function as follows
def clean(df: dask.DataFrame ):
    cleaned = df[] \ ...
```

僅當您使用 DataFrame 時，Dask 才會將資料從 Dask DataFrame 載入記憶體。若要在遠端函式內部使用 Dask DataFrame，請提供資料路徑。然後，Dask 將於執行代碼時，直接從您指定的資料路徑讀取資料集。

下列代碼範例示範如何在遠端函式 `clean` 內部使用 Dask DataFrame。在此代碼範例，`raw_data_path` 傳遞給清理，而非 Dask DataFrame。當代碼執行時，會從 `raw_data_path` 指定的 Amazon S3 儲存貯體位置直接讀取資料集。然後，`persist` 函式會將資料集保留於記憶體，以便執行後續 `random_split` 函式，並利用 Dask DataFrame API 函式寫回 S3 儲存貯體的輸出資料路徑。

```
import dask.dataframe as dd

@remote(
   instance_type='{{ml.m5.24xlarge}}',
   volume_size={{300}}, 
   keep_alive_period_in_seconds={{600}})
#pass the data path to your remote function rather than the Dask DataFrame  itself
def clean(raw_data_path: str, output_data_path: str: split_ratio: list[float]):
    df = dd.read_parquet(raw_data_path) #pass the path to your DataFrame 
    cleaned = df[(df.column_a >= 1) & (df.column_a < 5)]\
        .drop(['column_b', 'column_c'], axis=1)\
        .persist() #keep the data in memory to facilitate the following random_split operation

    train_df, test_df = cleaned.random_split(split_ratio, random_state=10)

    train_df.to_parquet(os.path.join(output_data_path, 'train')
    test_df.to_parquet(os.path.join(output_data_path, 'test'))
    
clean("{{s3://amzn-s3-demo-bucket/raw/}}", "{{s3://amzn-s3-demo-bucket/cleaned/}}", split_ratio={{[0.7, 0.3]}})
```

##### 如何將總結統計資料從 Dask DataFrame 轉換為 Pandas DataFrame
<a name="train-remote-decorator-invocation-input-output-bestprac-dask-pd"></a>

Dask DataFrame 的總結統計資料可透過調用 `compute` 方法轉換為 Pandas DataFrame，如以下範例代碼所示。在此範例，S3 儲存貯體包含大型 Dask DataFrame，且其無法放入記憶體或 Pandas dataframe。在以下範例，遠端函式掃描資料集，並傳回 Dask DataFrame (其中包含來自 `describe` 的輸出統計資料) 至 Pandas DataFrame。

```
executor = RemoteExecutor(
    instance_type='{{ml.m5.24xlarge}}',
    volume_size={{300}}, 
    keep_alive_period_in_seconds={{600}})

future = executor.submit(lambda: dd.read_parquet("{{s3://amzn-s3-demo-bucket/raw/}}").describe().compute())

future.result()
```

##### XGBoost DMatric 類別最佳實務
<a name="train-remote-decorator-invocation-input-output-bestprac-xgboost"></a>

DMatrix 是 XGBoost 用於載入資料的內部資料結構。為在運算工作階段之間輕鬆移動，無法保存 DMatrix 物件。直接傳遞 DMatrix 執行個體會失敗，並顯示 `SerializationError`。

##### 如何傳遞資料物件至遠端函式，並使用 XGBoost 進行訓練
<a name="train-remote-decorator-invocation-input-output-bestprac-xgboost-pass"></a>

若要轉換 Pandas DataFrame 為 DMatrix 執行個體，並將其用於遠端函式訓練，請將其直接傳遞至遠端函式，如下列代碼範例所示。

```
import xgboost as xgb

@remote
def train(df, params):
    #Convert a pandas dataframe into a DMatrix DataFrame and use it for training
    dtrain = DMatrix(df) 
    return xgb.train(dtrain, params)
```

##### TensorFlow 資料集與子類別最佳實務
<a name="train-remote-decorator-invocation-input-output-bestprac-tf"></a>

TensorFlow 資料集與子類別是內部物件，由 TensorFlow 在訓練期間用於載入資料。為在運算工作階段之間輕鬆移動，無法保存 TensorFlow 資料集與子類別。直接傳遞 Tensorflow 資料集或子類別會失敗，並顯示 `SerializationError`。使用 Tensorflow I/O API 從儲存載入資料，如下列代碼範例所示。

```
import tensorflow as tf
import tensorflow_io as tfio

@remote
def train(data_path: str, params):
    
    dataset = tf.data.TextLineDataset(tf.data.Dataset.list_files(f"{data_path}/*.txt"))
    ...
    
train("{{s3://amzn-s3-demo-bucket/data}}", {})
```

##### PyTorch 模型最佳實務
<a name="train-remote-decorator-invocation-input-output-bestprac-pytorch"></a>

PyTorch 模型可序列化，且可在本機環境與遠端函式之間傳遞。如本機環境與遠端環境採用不同裝置類型，例如 (GPU 與 CPU)，則無法將訓練過的模型傳回本機環境。例如，若下列代碼在無 GPU 的本機環境進行開發，但在具 GPU 的執行個體執行，則直接傳回訓練過的模型會導致 `DeserializationError`。

```
# Do not return a model trained on GPUs to a CPU-only environment as follows

@remote(instance_type='{{ml.g4dn.xlarge}}')
def train(...):
    if torch.cuda.is_available():
        device = torch.device("cuda")
    else:
        device = torch.device("cpu") # a device without GPU capabilities
    
    model = Net().to(device)
    
    # train the model
    ...
    
    return model
    
model = train(...) #returns a DeserializationError if run on a device with GPU
```

若要將在 GPU 環境訓練的模型傳回僅包含 CPU 功能的模型，請直接使用 PyTorch 模型 I/O API，如下列代碼範例所示。

```
import s3fs

model_path = "{{s3://amzn-s3-demo-bucket/folder/}}"

@remote(instance_type='{{ml.g4dn.xlarge}}')
def train(...):
    if torch.cuda.is_available():
        device = torch.device("cuda")
    else:
        device = torch.device("cpu")
    
    model = Net().to(device)
    
    # train the model
    ...
    
    fs = s3fs.FileSystem()
    with fs.open(os.path.join(model_path, 'model.pt'), 'wb') as file:
        torch.save(model.state_dict(), file) #this writes the model in a device-agnostic way (CPU vs GPU)
    
train(...) #use the model to train on either CPUs or GPUs

model = Net()
fs = s3fs.FileSystem()with fs.open(os.path.join(model_path, 'model.pt'), 'rb') as file:
    model.load_state_dict(torch.load(file, map_location=torch.device('cpu')))
```

#### SageMaker AI 儲存序列化資料的位置
<a name="train-remote-decorator-invocation-input-output-storage"></a>

當您調用遠端函式時，SageMaker AI 會在輸入與輸出階段自動序列化函式引數並傳回值。此序列化資料會儲存於 S3 儲存貯體的根目錄。您可在組態檔案指定根目錄 `<s3_root_uri>`。系統會自動為您產生參數 `job_name`。

SageMaker AI 會在根目錄建立 `<job_name>` 資料夾，其中包含您目前的工作目錄、序列化函式、序列化函式的引數、結果，以及因調用序列化函式而產生的任何例外狀況。

在 `<job_name>` 下方，目錄 `workdir` 會包含目前工作目錄的已壓縮封存。已壓縮封存包含工作目錄與 `requirements.txt` 檔案的任何 Python 檔案，該檔案會指定執行遠端函式所需的任何相依性。

以下範例針對您在組態檔案指定的 S3 儲存貯體顯示其資料夾結構。

```
{{<s3_root_uri>}}/ # specified by s3_root_uri or S3RootUri
    <job_name>/ #automatically generated for you
        workdir/workspace.zip # archive of the current working directory (workdir)
        function/ # serialized function
        arguments/ # serialized function arguments
        results/ # returned output from the serialized function including the model
        exception/ # any exceptions from invoking the serialized function
```

您在 S3 儲存貯體指定的根目錄並不適用長期儲存。序列化資料與序列化期間所用的 Python 版本與機器學習 (ML) 架構版本緊密關聯。如您升級 Python 版本或機器學習 (ML) 架構，則可能無法使用序列化資料。相反地，請執行下列動作。
+ 以與 Python 版本與機器學習 (ML) 架構無關的格式儲存模型及模型成品。
+ 如您升級 Python 或機器學習 (ML) 架構，請從長期儲存存取模型結果。

**重要**  
若要在指定的時間量後刪除序列化資料，請在 S3 儲存貯體設定[存留期組態](https://docs.aws.amazon.com/AmazonS3/latest/userguide/how-to-set-lifecycle-configuration-intro.html)。

**注意**  
相較於其他資料格式 (包含 CSV、Parquet 與 JSON)，使用 Python [保存](https://docs.python.org/3/library/pickle.html)模組進行序列化的檔案可攜性較低。當從未知來源載入保存檔案時，請小心。

如需更多資訊了解遠端函式組態檔案所應包含的內容，請參閱[組態檔案](https://docs.aws.amazon.com/sagemaker/latest/dg/train-remote-decorator-config.html)。

#### 存取序列化資料
<a name="train-remote-decorator-invocation-input-output-access"></a>

管理員可為序列化資料提供設定，包含其位置及組態檔案的任何加密設定。根據預設，序列化資料會使用 AWS Key Management Service (AWS KMS) 金鑰加密。管理員也可利用[儲存貯體政策](https://docs.aws.amazon.com/AmazonS3/latest/userguide/example-bucket-policies.html)來限制存取您在組態檔案指定的根目錄。可在專案與工作之間共用及使用組態檔案。如需更多資訊，請參閱[組態檔案](https://docs.aws.amazon.com/sagemaker/latest/dg/train-remote-decorator-config.html)。

## 使用 `RemoteExecutor` API 來調用函式
<a name="train-remote-decorator-invocation-api"></a>

您可以透過 `RemoteExecutor` 來調用函式。SageMaker AI Python SDK 會將 `RemoteExecutor` 呼叫內部的代碼轉換為 SageMaker AI 訓練任務。然後，訓練工作會調用該函式作為非同步操作，並傳回未來。如果您使用 `RemoteExecutor` API，則可平行執行多個訓練工作。有關 Python 未來的更多相關資訊，請參閱[未來](https://docs.python.org/3/library/asyncio-future.html)。

下列代碼範例顯示如何匯入必要的程式庫、定義函式、啟動 SageMaker AI 執行個體，以及使用 API 提交請求以便平行執行 `2` 任務。

```
from sagemaker.remote_function import RemoteExecutor

def matrix_multiply(a, b):
    return np.matmul(a, b)


a = np.array([[1, 0],
             [0, 1]])
b = np.array([1, 2])

with RemoteExecutor(max_parallel_job=2, instance_type="{{ml.m5.large}}") as e:
    future = e.submit(matrix_multiply, a, b)

assert (future.result() == np.array([1,2])).all()
```

`RemoteExecutor` 類別是 [concurrent.futures.Executor](https://docs.python.org/3/library/concurrent.futures.html) 程式庫的實作。

下列代碼範例示範如何定義函式並使用 `RemoteExecutorAPI` 來呼叫函式。在此範例，`RemoteExecutor` 將總共提交 `4` 項任務，但僅 `2` 個為平行處理。最後兩個任務將以最小額外負荷重複使用叢集。

```
from sagemaker.remote_function.client import RemoteExecutor

def divide(a, b):
    return a/b 

with RemoteExecutor(max_parallel_job=2, keep_alive_period_in_seconds=60) as e:
    futures = [e.submit(divide, a, 2) for a in [3, 5, 7, 9]]

for future in futures:
    print(future.result())
```

`max_parallel_job` 參數僅做為速率限制機制，而不會最佳化運算資源配置。在先前的代碼範例，在提交任何工作之前，`RemoteExecutor` 不會為兩個 平行工作保留運算資源。如需更多相關資訊了解 `max_parallel_job` 或 @remote 裝飾項目的其他參數，請參閱[遠端函式類別與方法規格](https://sagemaker.readthedocs.io/en/stable/remote_function/sagemaker.remote_function.html)。

### `RemoteExecutor` API 的未來類別
<a name="train-remote-decorator-invocation-api-future"></a>

未來類別是公有類別，代表訓練工作於非同步調用時的傳回函式。未來類別實作 [concurrent.futures.Future](https://docs.python.org/3/library/concurrent.futures.html) 類別。此類可用來針對基礎工作進行操作，並載入資料至記憶體。