

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 调用远程函数
<a name="train-remote-decorator-invocation"></a>

要在 @remote 装饰器中调用函数，请使用以下任一方法：
+ [使用 @remote 装饰器调用函数](#train-remote-decorator-invocation-decorator).
+ [使用 `RemoteExecutor` API 调用函数](#train-remote-decorator-invocation-api).

如果您使用 @remote 装饰器方法调用函数，则训练作业将等待函数完成后再开始新任务。但是，如果您使用 `RemoteExecutor` API，则可以并行运行多个作业。以下部分说明了这两种调用函数的方式。

## 使用 @remote 装饰器调用函数
<a name="train-remote-decorator-invocation-decorator"></a>

你可以使用 @remote 装饰器来注释一个函数。 SageMaker AI 会将装饰器内部的代码转换为 SageMaker 训练作业。之后，训练作业将在装饰器内部调用该函数并等待作业完成。以下代码示例演示如何导入所需的库、启动 A SageMaker I 实例以及如何使用 @remote 装饰器对矩阵乘法进行注释。

```
from sagemaker.remote_function import remote
import numpy as np

@remote(instance_type="{{ml.m5.large}}")
def matrix_multiply(a, b):
    return np.matmul(a, b)
    
a = np.array([[1, 0],
             [0, 1]])
b = np.array([1, 2])

assert (matrix_multiply(a, b) == np.array([1,2])).all()
```

装饰器的定义如下所示。

```
def remote(
    *,
    **kwarg):
        ...
```

当您调用装饰函数时， SageMaker Python SDK 会将错误引发的所有异常加载到本地内存中。在以下代码示例中，对 divide 函数的首次调用成功完成，并将结果加载到本地内存中。在第二次调用 divide 函数时，代码返回一个错误，并将该错误加载到本地内存中。

```
from sagemaker.remote_function import remote
import pytest

@remote()
def divide(a, b):
    return a/b

# the underlying job is completed successfully 
# and the function return is loaded
assert divide(10, 5) == 2

# the underlying job fails with "AlgorithmError" 
# and the function exception is loaded into local memory 
with pytest.raises(ZeroDivisionError):
    divide(10, 0)
```

**注意**  
装饰函数作为远程作业运行。如果线程中断，则底层作业将不会停止。

### 如何更改局部变量的值
<a name="train-remote-decorator-invocation-decorator-value"></a>

装饰器函数在远程计算机上运行。在装饰函数中更改非局部变量或输入参数将不会更改本地值。

在以下代码示例中，列表和字典已附加到装饰器函数中。调用装饰器函数时，此情况不会改变。

```
a = []

@remote
def func():
    a.append(1)

# when func is invoked, a in the local memory is not modified        
func() 
func()

# a stays as []
    
a = {}
@remote
def func(a):
    # append new values to the input dictionary
    a["key-2"] = "value-2"
    
a = {"key": "value"}
func(a)

# a stays as {"key": "value"}
```

要更改在装饰器函数内部声明的局部变量的值，请从函数返回该变量。以下代码示例说明，在从函数返回局部变量时，该变量的值会发生变化。

```
a = {"key-1": "value-1"}

@remote
def func(a):
    a["key-2"] = "value-2"
    return a

a = func(a)

-> {"key-1": "value-1", "key-2": "value-2"}
```

### 数据序列化和反序列化
<a name="train-remote-decorator-invocation-input-output"></a>

当您调用远程函数时， SageMaker AI 会在输入和输出阶段自动序列化您的函数参数。使用 c [loud](https://github.com/cloudpipe/cloudpickle) pickle 对函数参数和返回值进行序列化。 SageMaker AI 支持序列化以下 Python 对象和函数。
+ 内置 Python 对象，包括字典、列表、浮点数、整数、字符串、布尔值和元组
+ Numpy 数组
+ Pandas Dataframes
+ Scikit-learn 数据集和估算器
+ PyTorch 模型
+ TensorFlow 模型
+ 的助推器等级 XGBoost

可使用以下各项，但有一些限制。
+ Dask DataFrames
+  XGBoost Dmatrix 类
+ TensorFlow 数据集和子类
+ PyTorch 模型

以下部分包含使用先前 Python 类的最佳实践，但远程函数存在一些限制，以及有关 SageMaker AI 将序列化数据存储在何处以及如何管理其访问权限的信息。

#### 有关能够有限地支持远程数据序列化的 Python 类的最佳实践
<a name="train-remote-decorator-invocation-input-output-bestprac"></a>

您可以使用此部分中列出的 Python 类，但有一些限制。后续部分将讨论有关如何使用以下 Python 类的最佳实践。
+ [Dask](https://www.dask.org/) DataFrames
+ 这 XGBoost DMatric 堂课
+ TensorFlow 数据集和子类
+ PyTorch 模型

##### 适用于 Dask 的最佳实践
<a name="train-remote-decorator-invocation-input-output-bestprac-dask"></a>

[Dask](https://www.dask.org/) 是一个用于 Python 中的并行计算的开源库。此部分说明了以下内容。
+ 如何 DataFrame 将 Dask 传递给你的远程函数
+ 如何将汇总统计数据从 Dask DataFrame 转换为 Pandas DataFrame

##### 如何 DataFrame 将 Dask 传递给你的远程函数
<a name="train-remote-decorator-invocation-input-output-bestprac-dask-pass"></a>

[Dask DataFrames](https://docs.dask.org/en/latest/dataframe.html) 通常用于处理大型数据集，因为它们可以容纳需要比可用内存更多的数据集。这是因为 Dask DataFrame 不会将您的本地数据加载到内存中。如果您将 Dask DataFrame 作为函数参数传递给远程函数，Dask 可能会传递对本地磁盘或云存储中数据的引用，而不是数据本身。以下代码显示了在远程函数中传递一个 Dask DataFrame 的示例，该函数将在空 DataFrame函数上运行。

```
#Do not pass a Dask DataFrame  to your remote function as follows
def clean(df: dask.DataFrame ):
    cleaned = df[] \ ...
```

只有当你使用时，Dask 才会将 Dask 中的数据加载 DataFrame 到内存中。 DataFrame 如果要在远程函数中使用 Dask DataFrame ，请提供数据的路径。之后，Dask 将直接从您在代码运行时指定的数据路径中读取数据集。

以下代码示例显示了如何在远程函数`clean`中使用 Dask DataFrame。在代码示例中，`raw_data_path`传递给 clean 而不是 Dask DataFrame。在代码运行时，直接从 `raw_data_path` 中指定的 Amazon S3 存储桶的位置读取数据集。然后，该`persist`函数将数据集保存在内存中以方便后续`random_split`函数，并使用 Dask DataFrame API 函数将数据集写回 S3 存储桶中的输出数据路径。

```
import dask.dataframe as dd

@remote(
   instance_type='{{ml.m5.24xlarge}}',
   volume_size={{300}}, 
   keep_alive_period_in_seconds={{600}})
#pass the data path to your remote function rather than the Dask DataFrame  itself
def clean(raw_data_path: str, output_data_path: str: split_ratio: list[float]):
    df = dd.read_parquet(raw_data_path) #pass the path to your DataFrame 
    cleaned = df[(df.column_a >= 1) & (df.column_a < 5)]\
        .drop(['column_b', 'column_c'], axis=1)\
        .persist() #keep the data in memory to facilitate the following random_split operation

    train_df, test_df = cleaned.random_split(split_ratio, random_state=10)

    train_df.to_parquet(os.path.join(output_data_path, 'train')
    test_df.to_parquet(os.path.join(output_data_path, 'test'))
    
clean("{{s3://amzn-s3-demo-bucket/raw/}}", "{{s3://amzn-s3-demo-bucket/cleaned/}}", split_ratio={{[0.7, 0.3]}})
```

##### 如何将汇总统计数据从 Dask DataFrame 转换为 Pandas DataFrame
<a name="train-remote-decorator-invocation-input-output-bestprac-dask-pd"></a>

 DataFrame 通过调用以下示例代码所示`compute`的方法， DataFrame 可以将来自 Dask 的汇总统计数据转换为 Pandas。在示例中，S3 存储桶包含一个无法放入内存或 Pandas 数据框的大型 Dask DataFrame 。在以下示例中，远程函数扫描数据集，并将 DataFrame包含输出统计信息的 Dask 返回`describe`到 Pandas DataFrame。

```
executor = RemoteExecutor(
    instance_type='{{ml.m5.24xlarge}}',
    volume_size={{300}}, 
    keep_alive_period_in_seconds={{600}})

future = executor.submit(lambda: dd.read_parquet("{{s3://amzn-s3-demo-bucket/raw/}}").describe().compute())

future.result()
```

##### XGBoost DMatric 课堂最佳实践
<a name="train-remote-decorator-invocation-input-output-bestprac-xgboost"></a>

DMatrix 是用于加载数据的内部数据结构。 XGBoost 不能为了在计算会话之间轻松移动而对 DMatrix 对象进行封存。直接传递 DMatrix 实例将失败，并显示为`SerializationError`。

##### 如何将数据对象传递给远程函数并使用它进行训练 XGBoost
<a name="train-remote-decorator-invocation-input-output-bestprac-xgboost-pass"></a>

要将 Pandas DataFrame 转换为 DMatrix 实例并在远程函数中使用它进行训练，请将其直接传递给远程函数，如以下代码示例所示。

```
import xgboost as xgb

@remote
def train(df, params):
    #Convert a pandas dataframe into a DMatrix DataFrame and use it for training
    dtrain = DMatrix(df) 
    return xgb.train(dtrain, params)
```

##### TensorFlow 数据集和子类的最佳实践
<a name="train-remote-decorator-invocation-input-output-bestprac-tf"></a>

TensorFlow 数据集和子类是训练期间 TensorFlow 用来加载数据的内部对象。 TensorFlow 不能为了在计算会话之间轻松移动而对数据集和子类进行封存。直接传递 Tensorflow 数据集或子类将失败，并显示 `SerializationError`。使用 Tensorflow I/O APIs 从存储中加载数据，如以下代码示例所示。

```
import tensorflow as tf
import tensorflow_io as tfio

@remote
def train(data_path: str, params):
    
    dataset = tf.data.TextLineDataset(tf.data.Dataset.list_files(f"{data_path}/*.txt"))
    ...
    
train("{{s3://amzn-s3-demo-bucket/data}}", {})
```

##### PyTorch 模型的最佳实践
<a name="train-remote-decorator-invocation-input-output-bestprac-pytorch"></a>

PyTorch 模型是可序列化的，可以在本地环境和远程函数之间传递。如果您的本地环境和远程环境具有不同的设备类型，例如（GPUs 和 CPUs），则无法将经过训练的模型返回到本地环境。例如，如果以下代码是在本地环境中开发的， GPUs 但未使用但在实例中运行 GPUs，则直接返回经过训练的模型将导致`DeserializationError`。

```
# Do not return a model trained on GPUs to a CPU-only environment as follows

@remote(instance_type='{{ml.g4dn.xlarge}}')
def train(...):
    if torch.cuda.is_available():
        device = torch.device("cuda")
    else:
        device = torch.device("cpu") # a device without GPU capabilities
    
    model = Net().to(device)
    
    # train the model
    ...
    
    return model
    
model = train(...) #returns a DeserializationError if run on a device with GPU
```

要将在 GPU 环境中训练的模型返回到仅包含 CPU 功能的模型，请 I/O APIs直接使用该 PyTorch 模型，如下面的代码示例所示。

```
import s3fs

model_path = "{{s3://amzn-s3-demo-bucket/folder/}}"

@remote(instance_type='{{ml.g4dn.xlarge}}')
def train(...):
    if torch.cuda.is_available():
        device = torch.device("cuda")
    else:
        device = torch.device("cpu")
    
    model = Net().to(device)
    
    # train the model
    ...
    
    fs = s3fs.FileSystem()
    with fs.open(os.path.join(model_path, 'model.pt'), 'wb') as file:
        torch.save(model.state_dict(), file) #this writes the model in a device-agnostic way (CPU vs GPU)
    
train(...) #use the model to train on either CPUs or GPUs

model = Net()
fs = s3fs.FileSystem()with fs.open(os.path.join(model_path, 'model.pt'), 'rb') as file:
    model.load_state_dict(torch.load(file, map_location=torch.device('cpu')))
```

#### SageMaker AI 存储序列化数据的位置
<a name="train-remote-decorator-invocation-input-output-storage"></a>

当您调用远程函数时， SageMaker AI 会在输入和输出阶段自动序列化您的函数参数和返回值。此序列化数据存储在 S3 存储桶的根目录下。您可以在配置文件中指定根目录 `<s3_root_uri>`。这将自动为您生成参数 `job_name`。

在根目录下， SageMaker AI 会创建一个`<job_name>`文件夹，其中包含您当前的工作目录、序列化函数、序列化函数的参数、结果以及调用序列化函数时出现的任何异常。

在 `<job_name>` 下，目录 `workdir` 包含当前工作目录的压缩存档。压缩存档包括工作目录中的所有 Python 文件和 `requirements.txt` 文件，该文件指定运行 Remote 函数所需的任何依赖项。

以下是您在配置文件中指定的 S3 存储桶下的文件夹结构示例。

```
{{<s3_root_uri>}}/ # specified by s3_root_uri or S3RootUri
    <job_name>/ #automatically generated for you
        workdir/workspace.zip # archive of the current working directory (workdir)
        function/ # serialized function
        arguments/ # serialized function arguments
        results/ # returned output from the serialized function including the model
        exception/ # any exceptions from invoking the serialized function
```

您在 S3 存储桶中指定的根目录不适用于长期存储。序列化数据与序列化期间使用的 Python 版本和机器学习 (ML) 框架版本紧密相关。如果升级 Python 版本或机器学习框架，则可能无法使用序列化数据。可以改为执行以下操作。
+ 以与 Python 版本和机器学习框架无关的格式存储模型和模型构件。
+ 如果您升级 Python 或机器学习框架，请访问长期存储中的模型结果。

**重要**  
要在指定时长后删除序列化数据，请在 S3 存储桶上设置[生命周期配置](https://docs.aws.amazon.com/AmazonS3/latest/userguide/how-to-set-lifecycle-configuration-intro.html)。

**注意**  
使用 Python [pickle](https://docs.python.org/3/library/pickle.html) 模块序列化的文件的可移植性可能低于其他数据格式（包括 CSV、Parquet 和 JSON）的可移植性。请小心加载来自未知来源的经过 pickle 处理的文件。

有关 Remote 函数的配置文件中应包含的内容的更多信息，请参阅[配置文件](https://docs.aws.amazon.com/sagemaker/latest/dg/train-remote-decorator-config.html)。

#### 对序列化数据的访问权限
<a name="train-remote-decorator-invocation-input-output-access"></a>

管理员可以为序列化数据提供设置，包括其位置和配置文件中的任何加密设置。默认情况下，序列化数据使用 AWS Key Management Service (AWS KMS) 密钥加密。管理员也可以使用[存储桶策略](https://docs.aws.amazon.com/AmazonS3/latest/userguide/example-bucket-policies.html)限制对配置文件中指定的根目录的访问权限。可以跨项目和作业共享并使用配置文件。有关更多信息，请参阅[配置文件](https://docs.aws.amazon.com/sagemaker/latest/dg/train-remote-decorator-config.html)。

## 使用 `RemoteExecutor` API 调用函数
<a name="train-remote-decorator-invocation-api"></a>

您可以使用 `RemoteExecutor` API 来调用函数。 SageMaker AI Python SDK 会将`RemoteExecutor`调用中的代码转换为 SageMaker 人工智能训练作业。之后，训练作业将以异步操作的形式调用该函数并返回 future。如果您使用 `RemoteExecutor` API，则可以并行运行多个训练作业。有关 Python 中的 future 的更多信息，请参阅 [Futures](https://docs.python.org/3/library/asyncio-future.html)。

以下代码示例演示如何导入所需的库、定义函数、启动 SageMaker AI 实例以及如何使用 API 提交并行运行`2`作业的请求。

```
from sagemaker.remote_function import RemoteExecutor

def matrix_multiply(a, b):
    return np.matmul(a, b)


a = np.array([[1, 0],
             [0, 1]])
b = np.array([1, 2])

with RemoteExecutor(max_parallel_job=2, instance_type="{{ml.m5.large}}") as e:
    future = e.submit(matrix_multiply, a, b)

assert (future.result() == np.array([1,2])).all()
```

`RemoteExecutor` 类是 [concurrent.futures.Executor](https://docs.python.org/3/library/concurrent.futures.html) 库的实现。

以下代码示例说明如何定义一个函数并使用 `RemoteExecutorAPI` 调用该函数。在此示例中，`RemoteExecutor` 将提交所有 `4` 作业，但仅并行提交 `2`。最后两个作业将重用集群，且开销最小。

```
from sagemaker.remote_function.client import RemoteExecutor

def divide(a, b):
    return a/b 

with RemoteExecutor(max_parallel_job=2, keep_alive_period_in_seconds=60) as e:
    futures = [e.submit(divide, a, 2) for a in [3, 5, 7, 9]]

for future in futures:
    print(future.result())
```

`max_parallel_job` 参数仅用作速率限制机制，而不会优化计算资源分配。在上一个代码示例中，在提交任何作业之前，`RemoteExecutor` 不会为两个并行作业预留计算资源。有关 @remote 装饰器的 `max_parallel_job` 或其他参数的更多信息，请参阅 [Remote 函数类和方法规范](https://sagemaker.readthedocs.io/en/stable/remote_function/sagemaker.remote_function.html)。

### `RemoteExecutor` API 的 Future 类
<a name="train-remote-decorator-invocation-api-future"></a>

Future 类是一个公共类，它表示异步调用训练作业时的返回函数。Future 类实现了 [concurrent.futures.Future](https://docs.python.org/3/library/concurrent.futures.html) 类。此类可用于对底层作业进行操作并将数据加载到内存中。