

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用 Processing 对 TB 级机器学习 SageMaker 数据集进行分布式特征工程
<a name="use-sagemaker-processing-for-distributed-feature-engineering-of-terabyte-scale-ml-datasets"></a>

*Chris Boomhower，Amazon Web Services*

## Summary
<a name="use-sagemaker-processing-for-distributed-feature-engineering-of-terabyte-scale-ml-datasets-summary"></a>

许多 TB 级或更大的数据集通常由分层文件夹结构构成，数据集中的文件有时会共享相互依存关系。因此，机器学习 (ML) 工程师和数据科学家必须做出深思熟虑的决策，为模型训练和推理准备此类数据。此模式演示了如何将手动宏分片和微分片技术与 Amazon Processing 和虚拟 CPU (vCPU) 并行化相结合，为复杂的大数据 ML 数据集高效扩展功能工程 SageMaker 流程。 

这种模式将*宏分片*定义为在多台计算机上拆分数据目录进行处理，将*微分片*定义为将每台计算机上的数据分割至多个处理线程中。该模式通过使用 Amazon 和 [PhysioNet MIM](https://physionet.org/content/mimic3wdb/1.0/) IC-II SageMaker I 数据集中的时间序列波形记录样本来演示这些技术。通过采用这种模式技术，您可以最大限度地减少特征工程的处理时间和成本，同时最大限度地提高资源利用率和吞吐量效率。无论数据类型如何，这些优化都依赖于亚马逊弹性计算云 (Amazon EC2) 实例上的分布式 SageMaker 处理，CPUs 而对于类似的大型数据集，v 也是如此。

## 先决条件和限制
<a name="use-sagemaker-processing-for-distributed-feature-engineering-of-terabyte-scale-ml-datasets-prereqs"></a>

**先决条件**
+ 如果您想为自己的数据集实现此模式，则可以访问 SageMaker 笔记本实例或 SageMaker Studio。如果您是首次使用亚马逊 SageMaker ，请参阅 AWS 文档 SageMaker中的[亚马逊入门](https://docs.aws.amazon.com/sagemaker/latest/dg/gs.html)。
+ SageMaker Studio，如果你想用 [PhysioNet MIMIC-III](https://physionet.org/content/mimic3wdb/1.0/) 样本数据实现这种模式。 
+ 该模式使用 SageMaker 处理，但不需要任何运行 SageMaker 处理作业的经验。

**限制**
+ 这种模式非常适合包含相互依赖文件的机器学习数据集。手动宏分片和并行运行多个单实例 Processing SageMaker 作业对这些相互依赖关系的益处最大。对于不存在此类相互依赖关系的数据集，Processing 中的`ShardedByS3Key` SageMaker 功能可能是宏分片的更好替代方案，因为它会将分片数据发送到由同一 Processing 作业管理的多个实例。但是，你可以在两种场景中实现这种模式的微分片策略，以最好地利用实例 v。CPUs

**产品版本**
+ 亚马逊 SageMaker Python 软件开发工具包版本 2

## 架构
<a name="use-sagemaker-processing-for-distributed-feature-engineering-of-terabyte-scale-ml-datasets-architecture"></a>

**目标技术堆栈**
+ Amazon Simple Storage Service（Amazon S3）
+ Amazon SageMaker

**目标架构**

*宏分片和分布式实例 EC2 *

该架构中代表的 10 个并行进程反映了 MIMIC-III 数据集结构。（为了简化逻辑示意图，流程用省略号表示。） 当您使用手动宏分片时，类似架构适用于任何数据集。就 MIMIC-III 而言，您可以毫不费力地单独处理每个患者组文件夹，从而充分利用数据集的原始结构。下图中的记录组块出现在左侧 (1)。鉴于数据分布性质，按患者群体进行分片是有意义的。

![\[微分片和分布式实例的架构 EC2\]](http://docs.aws.amazon.com/zh_cn/prescriptive-guidance/latest/patterns/images/pattern-img/e7a90b31-de8f-41fd-bb3f-c7c6100fc306/images/c19a8f87-ac59-458e-89cb-50be17ca4a0c.png)


但是，按患者组手动分片意味着每个患者组文件夹都需要单独的处理作业，如您在图 (2) 的中间部分所见，而不是具有多个 EC2 实例的单个处理作业。由于 MIMIC-III 的数据包括二进制波形文件和匹配的基于文本的头文件，并且需要依赖 [wfdb 库](https://wfdb.readthedocs.io/en/latest/)提取二进制数据，因此特定患者的所有记录都必须在同一个实例上可用。要确保每个二进制波形文件的关联头文件也存在，唯一的方法是实现手动分片，以在自己的处理作业中运行每个分片，并指定 `s3_data_distribution_type='FullyReplicated'` 何时定义处理作业输入。或者，如果所有数据都位于一个目录中，并且文件之间不存在依赖关系，则更合适的选择可能是启动具有多个 EC2 实例并`s3_data_distribution_type='ShardedByS3Key'`指定的单个处理作业。指定`ShardedByS3Key `为 Amazon S3 数据分配类型 SageMaker 可自动管理跨实例的数据分片。 

为每个文件夹启动 Processing 作业是经济高效的预处理数据的方式，因为同时运行多个实例可以节省时间。为进一步节省成本和时间，您可以在每个处理作业中使用微分片。 

*微分片和 parallel v CPUs*

在每个 Processing 作业中，对分组的数据进行进一步划分，以最大限度地利用 SageMaker 完全托管 EC2 实例CPUs 上的所有可用 v。图 (2) 中间部分的方块描述了每个主要处理任务中的情况。患者记录文件夹的内容是扁平化的，并根据实例上可用 v 的数量CPUs 进行平均分割。分割文件夹内容后，大小均匀的一组文件将分布在所有 v 中CPUs 进行处理。处理完成后，每个 vCPU 结果将合并到每个处理作业的单个数据文件中。 

在随附的代码中，这些概念在 `src/feature-engineering-pass1/preprocessing.py` 文件的下一节中进行了介绍。

```
def chunks(lst, n):
    """
    Yield successive n-sized chunks from lst.
    
    :param lst: list of elements to be divided
    :param n: number of elements per chunk
    :type lst: list
    :type n: int
    :return: generator comprising evenly sized chunks
    :rtype: class 'generator'
    """
    for i in range(0, len(lst), n):
        yield lst[i:i + n]
 
 
# Generate list of data files on machine
data_dir = input_dir
d_subs = next(os.walk(os.path.join(data_dir, '.')))[1]
file_list = []
for ds in d_subs:
    file_list.extend(os.listdir(os.path.join(data_dir, ds, '.')))
dat_list = [os.path.join(re.split('_|\.', f)[0].replace('n', ''), f[:-4]) for f in file_list if f[-4:] == '.dat']
 
# Split list of files into sub-lists
cpu_count = multiprocessing.cpu_count()
splits = int(len(dat_list) / cpu_count)
if splits == 0: splits = 1
dat_chunks = list(chunks(dat_list, splits))
 
# Parallelize processing of sub-lists across CPUs
ws_df_list = Parallel(n_jobs=-1, verbose=0)(delayed(run_process)(dc) for dc in dat_chunks)
 
# Compile and pickle patient group dataframe
ws_df_group = pd.concat(ws_df_list)
ws_df_group = ws_df_group.reset_index().rename(columns={'index': 'signal'})
ws_df_group.to_json(os.path.join(output_dir, group_data_out))
```

`chunks` 函数首先被定义为使用给定列表，方法是将其分成大小均匀的长度 `n `，然后将这些结果作为生成器返回。接下来，通过编译所有存在的二进制波形文件列表，在患者文件夹中对数据进行扁平化。完成此操作后，将获得 EC2 实例上CPUs 可用的 v 数。CPUs [通过调用，将二进制波形文件列表平均分成这些 v`chunks`，然后使用 joblib 的 Parallel 类在自己的 vCPU 上处理每个波形子列表。](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html)处理任务会自动将结果合并到一个数据帧列表中，然后处理任务会进一步处理， SageMaker 然后在任务完成后将其写入 Amazon S3。在此示例中，处理任务有 10 个文件写入 Amazon S3（每个任务一个）。

当所有初始处理任务完成后，辅助处理任务（如图 (3) 右侧的方块所示）将合并每个主处理任务生成的输出文件，并将合并后的输出写入 Amazon S3 (4)。

## 工具
<a name="use-sagemaker-processing-for-distributed-feature-engineering-of-terabyte-scale-ml-datasets-tools"></a>

**工具**
+ [Python](https://www.python.org/) — 用于此模式的示例代码是 Python（版本 3）。
+ [SageMaker Stud](https://docs.aws.amazon.com/sagemaker/latest/dg/studio.html) io — Amazon SageMaker Studio 是一个用于机器学习的基于 Web 的集成开发环境 (IDE)，允许您构建、训练、调试、部署和监控您的机器学习模型。您可以在 Studio 中 SageMaker 使用 Jupyter 笔记本来运行 SageMaker 处理作业。
+ [SageMaker 处理](https://docs.aws.amazon.com/sagemaker/latest/dg/processing-job.html) — Amazon P SageMaker rocessing 提供了一种运行数据处理工作负载的简化方法。在这种模式中，特征工程代码是通过使用 SageMaker 处理作业大规模实现的。

**代码**

随附的 .zip 文件提供了此模式的完整代码。以下部分介绍为此模式构建架构的步骤。附件中的示例代码介绍了每个步骤。

## 操作说明
<a name="use-sagemaker-processing-for-distributed-feature-engineering-of-terabyte-scale-ml-datasets-epics"></a>

### 设置你的 SageMaker Studio 环境
<a name="set-up-your-sagemaker-studio-environment"></a>


| Task | 说明 | 所需技能 | 
| --- | --- | --- | 
| 访问亚马逊 SageMaker 工作室。 | 按照[亚马逊 SageMaker 文档](https://docs.aws.amazon.com/sagemaker/latest/dg/onboard-quick-start.html)中提供的说明使用您的 AWS 账户登录 SageMaker Studio。 | 数据科学家、机器学习工程师 | 
| 安装 wget 实用程序。 | 如果您已使用新的 SageMaker Studio 配置或以前从未在 Studio 中 SageMaker 使用过这些实用程序，请安装 *wg* et。 要进行安装，请在 SageMaker Studio 控制台中打开终端窗口并运行以下命令：<pre>sudo yum install wget</pre> | 数据科学家、机器学习工程师 | 
| 下载并解压缩示例代码。 | 在*附件*部分下载 `attachments.zip` 文件。在终端窗口，导航至下载文件并提取其内容的文件夹：<pre>unzip attachment.zip</pre>导航到提取 .zip 文件的文件夹，然后提取 `Scaled-Processing.zip` 文件的内容。<pre>unzip Scaled-Processing.zip</pre> | 数据科学家、机器学习工程师 | 
| 从 physionet.org 下载示例数据集，并将其上传到 Amazon S3。 | 在包含 `get_data.ipynb` 文件的文件夹中运行 Jupyter Notebook `Scaled-Processing`。此笔记本从 p [hysionet.org](https://physionet.org) 下载示例 MIMIC-III 数据集，然后将其上传到亚马逊 S3 中的 Studio 会话存储桶 SageMaker 。 | 数据科学家、机器学习工程师 | 

### 配置第一项预处理脚本
<a name="configure-the-first-preprocessing-script"></a>


| Task | 说明 | 所需技能 | 
| --- | --- | --- | 
| 扁平化所有子目录的文件层次结构。 | 在 MIMIC-III 等大型数据集，文件通常分布在多个子目录中，即使在逻辑父组中也是如此。您应将脚本配置为将所有子目录中的所有组文件扁平化，如下面的代码所示。<pre># Generate list of .dat files on machine<br />data_dir = input_dir<br />d_subs = next(os.walk(os.path.join(data_dir, '.')))[1]<br />file_list = []<br />for ds in d_subs:<br />    file_list.extend(os.listdir(os.path.join(data_dir, ds, '.')))<br />dat_list = [os.path.join(re.split('_|\.', f)[0].replace('n', ''), f[:-4]) for f in file_list if f[-4:] == '.dat']</pre>    此操作说明中的示例代码片段来自于 `src/feature-engineering-pass1/preprocessing.py` 文件，附件中提供了此文件。 | 数据科学家、机器学习工程师 | 
| 根据 vCPU 数量将文件划分至子组。 | 应根据运行脚本的实例上CPUs 存在的 v 数，将文件分成大小均匀的子组或块。在此步骤中，可实现类似于以下代码的代码。<pre># Split list of files into sub-lists<br />cpu_count = multiprocessing.cpu_count()<br />splits = int(len(dat_list) / cpu_count)<br />if splits == 0: splits = 1<br />dat_chunks = list(chunks(dat_list, splits))</pre> | 数据科学家、机器学习工程师 | 
| 在 v 上并行处理子组 CPUs | 应将脚本逻辑配置为并行处理所有的子组。为此，请按如下方式使用 Joblib 库 的 `Parallel ` 类和 `delayed ` 方法。 <pre># Parallelize processing of sub-lists across CPUs<br />ws_df_list = Parallel(n_jobs=-1, verbose=0)(delayed(run_process)(dc) for dc in dat_chunks)</pre> | 数据科学家、机器学习工程师 | 
| 将单个文件组的输出保存至 Amazon S3。 | 并行 vCPU 处理完成后，应合并每个 vCPU 的结果，并将其上传到文件组的 S3 存储桶路径。在此步骤，可以使用类似于以下代码的代码。<pre># Compile and pickle patient group dataframe<br />ws_df_group = pd.concat(ws_df_list)<br />ws_df_group = ws_df_group.reset_index().rename(columns={'index': 'signal'})<br />ws_df_group.to_json(os.path.join(output_dir, group_data_out))</pre> | 数据科学家、机器学习工程师 | 

### 配置第二预处理脚本
<a name="configure-the-second-preprocessing-script"></a>


| Task | 说明 | 所需技能 | 
| --- | --- | --- | 
| 合并运行第一个脚本的所有 Processing 作业所生成的数据文件。 | 前面的脚本为每个 SageMaker 处理作业输出一个文件，该作业处理数据集中的一组文件。 接下来，您需要将这些输出文件合并至一个对象，并将单个输出数据集写入 Amazon S3。文件中对此进行了演示，此 `src/feature-engineering-pass1p5/preprocessing.py` 文件载于附件，如下所示。<pre>def write_parquet(wavs_df, path):<br />    """<br />    Write waveform summary dataframe to S3 in parquet format.<br />    <br />    :param wavs_df: waveform summary dataframe<br />    :param path: S3 directory prefix<br />    :type wavs_df: pandas dataframe<br />    :type path: str<br />    :return: None<br />    """<br />    extra_args = {"ServerSideEncryption": "aws:kms"}<br />    wr.s3.to_parquet(<br />        df=wavs_df,<br />        path=path,<br />        compression='snappy',<br />        s3_additional_kwargs=extra_args)<br /> <br /> <br />def combine_data():<br />    """<br />    Get combined data and write to parquet.<br />    <br />    :return: waveform summary dataframe<br />    :rtype: pandas dataframe<br />    """<br />    wavs_df = get_data()<br />    wavs_df = normalize_signal_names(wavs_df)<br />    write_parquet(wavs_df, "s3://{}/{}/{}".format(bucket_xform, dataset_prefix, pass1p5out_data))<br /> <br />    return wavs_df<br /> <br /> <br />wavs_df = combine_data()</pre> | 数据科学家、机器学习工程师 | 

### 运行处理作业
<a name="run-processing-jobs"></a>


| Task | 说明 | 所需技能 | 
| --- | --- | --- | 
| 运行第一项处理作业。 | 若要执行宏分片，请为每个文件组运行单独的处理作业。Microsharding 是在每个 Processing 作业中执行，因为每个作业都会运行您的第一个脚本。以下代码演示了如何为以下片段（包含在 `notebooks/FeatExtract_Pass1.ipynb`）中的每个文件组目录启动处理作业。<pre>pat_groups = list(range(30,40))<br />ts = str(int(time.time()))<br /> <br />for group in pat_groups:<br />    sklearn_processor = SKLearnProcessor(framework_version='0.20.0',<br />                                     role=role,<br />                                     instance_type='ml.m5.4xlarge',<br />                                     instance_count=1,<br />                                     volume_size_in_gb=5)<br />    sklearn_processor.run(<br />        code='../src/feature-engineering-pass1/preprocessing.py',<br />        job_name='-'.join(['scaled-processing-p1', str(group), ts]),<br />        arguments=[<br />            "input_path", "/opt/ml/processing/input",<br />            "output_path", "/opt/ml/processing/output",<br />            "group_data_out", "ws_df_group.json"<br />        ],<br />        inputs=<br />        [<br />            ProcessingInput(<br />                source=f's3://{sess.default_bucket()}/data_inputs/{group}',<br />                destination='/opt/ml/processing/input',<br />                s3_data_distribution_type='FullyReplicated'<br />            )<br />        ],<br />        outputs=<br />        [<br />            ProcessingOutput(<br />                source='/opt/ml/processing/output',<br />                destination=f's3://{sess.default_bucket()}/data_outputs/{group}'<br />            )<br />        ],<br />        wait=False<br />    )</pre> | 数据科学家、机器学习工程师 | 
| 运行第二个处理作业。 | 要合并第一组处理作业生成的输出并执行任何其他计算以进行预处理，请使用单 SageMaker 个 Processing 作业运行第二个脚本。以下代码演示了这一点（包含在`notebooks/FeatExtract_Pass1p5.ipynb`）。<pre>ts = str(int(time.time()))<br />bucket = sess.default_bucket()<br />     <br />sklearn_processor = SKLearnProcessor(framework_version='0.20.0',<br />                                 role=role,<br />                                 instance_type='ml.t3.2xlarge',<br />                                 instance_count=1,<br />                                 volume_size_in_gb=5)<br />sklearn_processor.run(<br />    code='../src/feature-engineering-pass1p5/preprocessing.py',<br />    job_name='-'.join(['scaled-processing', 'p1p5', ts]),<br />    arguments=['bucket', bucket,<br />               'pass1out_prefix', 'data_outputs',<br />               'pass1out_data', 'ws_df_group.json',<br />               'pass1p5out_data', 'waveform_summary.parquet',<br />               'statsdata_name', 'signal_stats.csv'],<br />    wait=True<br />)</pre> | 数据科学家、机器学习工程师 | 

## 相关资源
<a name="use-sagemaker-processing-for-distributed-feature-engineering-of-terabyte-scale-ml-datasets-resources"></a>
+ [使用快速入门登录 Amazon SageMaker Studio](https://docs.aws.amazon.com/sagemaker/latest/dg/onboard-quick-start.html)（SageMaker 文档）
+ [流程数据](https://docs.aws.amazon.com/sagemaker/latest/dg/processing-job.html)（SageMaker 文档） 
+ [使用 scikit-learn 进行数据处理（文档](https://docs.aws.amazon.com/sagemaker/latest/dg/use-scikit-learn-processing-container.html)）SageMaker  
+ [joblib.Parallel 文档](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html)
+ Moody, B., Moody, G., Villarroel, M., Clifford, G. D., & Silva, I. (2020). [MIMIC-III 波形数据库](https://doi.org/10.13026/c2607m)（版本 1.0）。 *PhysioNet*。
+ Johnson, A. E. W., Pollard, T. J., Shen, L., Lehman, L. H., Feng, M., Ghassemi, M., Moody, B., Szolovits, P., Celi, L. A., & Mark, R. G. (2016). [MIMIC-III，可免费访问的重症监护数据库](https://dx.doi.org/10.1038/sdata.2016.35)。Scientific Data, 3, 160035.
+ [MIMIC-III Waveform Database 许可证](https://physionet.org/content/mimic3wdb/1.0/LICENSE.txt)

## 附件
<a name="attachments-e7a90b31-de8f-41fd-bb3f-c7c6100fc306"></a>

要访问与此文档相关联的其他内容，请解压以下文件：[attachment.zip](samples/p-attach/e7a90b31-de8f-41fd-bb3f-c7c6100fc306/attachments/attachment.zip)