

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# SageMaker Processing を使用して、テラバイト規模の ML データセットの分散型特徴量エンジニアリングを実現
<a name="use-sagemaker-processing-for-distributed-feature-engineering-of-terabyte-scale-ml-datasets"></a>

*Amazon Web Services、Chris Boomhower*

## 概要
<a name="use-sagemaker-processing-for-distributed-feature-engineering-of-terabyte-scale-ml-datasets-summary"></a>

テラバイト規模またはそれ以上のデータセットの多くは階層的なフォルダ構造で構成されており、データセット内のファイルは相互に依存している場合があります。このため、機械学習 (ML) エンジニアとデータサイエンティストは、モデルトレーニングや推論のためにデータを準備するために、慎重に設計上の決定を下さなければなりません。このパターンは、手動のマクロシャーディングとマイクロシャーディングの手法を Amazon SageMaker Processing と仮想 CPU (vCPU) 並列化と組み合わせて使用して、複雑なビッグデータ ML データセットの特徴量エンジニアリングプロセスを効率的にスケーリングする方法を示しています。 

このパターンでは、データディレクトリを複数のマシンに分割して処理することを「*マクロシャーディング*」と定義し、「*マイクロシャーディング*」は各マシンのデータを複数の処理スレッドに分割することと定義されています。このパターンでは、Amazon SageMaker と「[PhysioNet MIMIC-III](https://physionet.org/content/mimic3wdb/1.0/)」データセットのサンプル時系列波形レコードを使用して、これらの手法を実証しています。このパターンの手法を実装することで、リソース利用率とスループット効率を最大化しながら、特徴量エンジニアリングの処理時間とコストを最小限に抑えることができます。これらの最適化では、データタイプに関係なく、類似の大規模データセットで、Amazon Elastic Compute Cloud (Amazon EC2) インスタンスと vCPUs で、分散された SageMaker 処理を使用します。

## 前提条件と制限
<a name="use-sagemaker-processing-for-distributed-feature-engineering-of-terabyte-scale-ml-datasets-prereqs"></a>

**前提条件**
+ 独自のデータセットにこのパターンを実装する場合は、SageMaker ノートブックインスタンスまたは SageMaker Studio にアクセスできます。Amazon SageMaker を初めて使用する場合は、AWS ドキュメントの「[Amazon SageMaker の入門](https://docs.aws.amazon.com/sagemaker/latest/dg/gs.html)」を参照してください。
+ SageMaker Studio、「[PhysioNet MIMIC-III](https://physionet.org/content/mimic3wdb/1.0/)」のサンプルデータを使用してこのパターンを実装したい場合。 
+ このパターンは SageMaker 処理を使用しますが、SageMaker 処理ジョブを実行した経験は必要ありません。

**制限事項**
+ このパターンは、相互に依存するファイルを含む ML データセットに非常に適しています。このような相互依存関係は、手動によるマクロシャーディングと、複数の単一インスタンスの SageMaker Processing ジョブをparallel 実行することから最もメリットがあります。そのような相互依存関係が存在しないデータセットの場合、SageMaker Processing の`ShardedByS3Key`機能は、同じ処理ジョブによって管理される複数のインスタンスにシャーディングされたデータを送信するため、マクロシャーディングのより良い代替となる可能性があります。ただし、このパターンのマイクロシャーディング戦略をどちらのシナリオでも実装して、インスタンス vCPUs を最大限に活用できます。

**製品バージョン**
+ Amazon SageMaker Python SDK バージョン 2

## アーキテクチャ
<a name="use-sagemaker-processing-for-distributed-feature-engineering-of-terabyte-scale-ml-datasets-architecture"></a>

**ターゲットテクノロジースタック**
+ Amazon Simple Storage Service (Amazon S3)
+ Amazon SageMaker

**ターゲットアーキテクチャ**

*マクロシャーディングと分散型 EC2 インスタンス*

このアーキテクチャで表される 10 個のparallel プロセスは、MIMIC-III データセットの構造を反映しています。(図を簡略化するため、プロセスは楕円で示されています)。手動マクロシャーディングを使用する場合も、同様のアーキテクチャがどのデータセットにも適用されます。MIMIC-III の場合、各患者グループフォルダを最小の労力で個別に処理することで、データセットの未加工の構造を活用できます。以下の図では、レコードグループブロックが左側に表示されています (1)。データが分散されていることを考えると、患者グループごとにシャードするのは理にかなっています。

![\[マイクロシャーディングと分散 EC2 インスタンスのアーキテクチャ\]](http://docs.aws.amazon.com/ja_jp/prescriptive-guidance/latest/patterns/images/pattern-img/e7a90b31-de8f-41fd-bb3f-c7c6100fc306/images/c19a8f87-ac59-458e-89cb-50be17ca4a0c.png)


ただし、患者グループごとに手動でシャーディングを行うと、図 (2) の中央のセクションでわかるように、複数の EC2 インスタンスによる単一の処理ジョブではなく、患者グループフォルダごとに個別の処理ジョブが必要になります。MIMIC-III のデータには、バイナリ波形ファイルと対応するテキストベースのヘッダーファイルの両方が含まれており、バイナリデータ抽出には「[wfdb ライブラリ](https://wfdb.readthedocs.io/en/latest/)」に依存する必要があるため、特定の患者のすべてのレコードを同じインスタンスで利用できるようにする必要があります。各バイナリ波形ファイルの関連ヘッダーファイルも確実に存在させる唯一の方法は、各シャードを独自の処理ジョブ内で実行する手動シャーディングを実装し、処理ジョブの入力を定義するときに`s3_data_distribution_type='FullyReplicated'`を指定することです。あるいは、すべてのデータが単一のディレクトリにあり、ファイル間に依存関係がない場合、より適切なオプションは、複数の EC2 インスタンスと`s3_data_distribution_type='ShardedByS3Key'`を指定して単一の処理ジョブを起動することかもしれません。Amazon S3 データ分散タイプとして`ShardedByS3Key `を指定すると、SageMaker はインスタンス間のデータシャーディングを自動的に管理するようになります。 

複数のインスタンスを同時に実行すると時間を節約できるため、データを前処理するにはフォルダごとに処理ジョブを起動するのがコスト効率の高い方法です。コストと時間をさらに節約するために、各処理ジョブ内でマイクロシャーディングを使用することもできます。 

*マイクロシャーディングとparallel vCPUs*

各処理ジョブ内で、グループ化されたデータはさらに分割され、SageMaker のフルマネージド EC2 インスタンスで使用可能なすべての vCPUs を最大限に活用します。図の中央のセクション (2) のブロックは、各主要処理ジョブ内で何が起こるかを表しています。患者記録フォルダの内容は、インスタンスで使用可能なvCPUs の数に基づいてフラット化され、均等に分割されます。フォルダの内容が分割されると、同じサイズのファイルセットがすべての vCPUs に分散されて処理されます。処理が完了すると、各 vCPU の結果は、処理ジョブごとに 1 つのデータファイルにまとめられます。 

添付のコードでは、これらの概念が`src/feature-engineering-pass1/preprocessing.py`ファイルの次のセクションに示されています。

```
def chunks(lst, n):
    """
    Yield successive n-sized chunks from lst.
    
    :param lst: list of elements to be divided
    :param n: number of elements per chunk
    :type lst: list
    :type n: int
    :return: generator comprising evenly sized chunks
    :rtype: class 'generator'
    """
    for i in range(0, len(lst), n):
        yield lst[i:i + n]
 
 
# Generate list of data files on machine
data_dir = input_dir
d_subs = next(os.walk(os.path.join(data_dir, '.')))[1]
file_list = []
for ds in d_subs:
    file_list.extend(os.listdir(os.path.join(data_dir, ds, '.')))
dat_list = [os.path.join(re.split('_|\.', f)[0].replace('n', ''), f[:-4]) for f in file_list if f[-4:] == '.dat']
 
# Split list of files into sub-lists
cpu_count = multiprocessing.cpu_count()
splits = int(len(dat_list) / cpu_count)
if splits == 0: splits = 1
dat_chunks = list(chunks(dat_list, splits))
 
# Parallelize processing of sub-lists across CPUs
ws_df_list = Parallel(n_jobs=-1, verbose=0)(delayed(run_process)(dc) for dc in dat_chunks)
 
# Compile and pickle patient group dataframe
ws_df_group = pd.concat(ws_df_list)
ws_df_group = ws_df_group.reset_index().rename(columns={'index': 'signal'})
ws_df_group.to_json(os.path.join(output_dir, group_data_out))
```

最初に関数`chunks`を定義し、与えられたリストを`n `の長さの均一な大きさのブロックに分割し、その結果をジェネレータとして返すことにより、与えられたリストを使用します。次に、存在するすべてのバイナリ波形ファイルのリストをコンパイルして、データを患者フォルダ全体でフラット化します。これが完了すると、EC2 インスタンスで使用可能な vCPUs の数が取得されます。バイナリ波形ファイルのリストは、`chunks`を呼び出すことによってこれらのvCPUに均等に分割され、その後、「[joblibのParallelクラス](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html)」を使用することによって、各波形サブリストがそれぞれのvCPUで処理されます。処理ジョブによって結果は自動的に 1 つのデータフレームリストにまとめられ、SageMaker はさらに処理してからジョブ完了時に Amazon S3 に書き込まれます。この例では、処理ジョブによって Amazon S3 に書き込まれるファイルが 10 個あります (ジョブごとに 1 つ)。

最初の処理ジョブがすべて完了すると、図 (3) の右側のブロックに示されているセカンダリ処理ジョブが、各プライマリ処理ジョブによって生成された出力ファイルを結合し、結合された出力を Amazon S3 (4) に書き込みます。

## ツール
<a name="use-sagemaker-processing-for-distributed-feature-engineering-of-terabyte-scale-ml-datasets-tools"></a>

**ツール**
+ [Python](https://www.python.org/) — このパターンに使用されるサンプルコードは Python (バージョン 3) です。
+ [SageMaker Studio](https://docs.aws.amazon.com/sagemaker/latest/dg/studio.html) – Amazon SageMaker Studio は、ウェブベースの機械学習用の統合開発環境 (IDE) です。この IDE を使うと、機械学習モデルを構築、トレーニング、デバッグ、デプロイ、モニタリングできます。SageMaker Processing ジョブは、SageMaker Studio 内の Jupyter Notebookを使用して実行します。
+ [SageMaker Processing](https://docs.aws.amazon.com/sagemaker/latest/dg/processing-job.html) — Amazon SageMaker Processingを使用すると、データ処理ワークロードを簡単に実行できます。このパターンでは、特徴量エンジニアリングコードは SageMaker Processing ジョブを使用して大規模に実装されます。

**Code**

添付の.zip ファイルには、このパターンの完全なコードが記載されています。次のセクションでは、このパターンのアーキテクチャを構築する手順について説明します。各ステップは、添付ファイルのサンプルコードで説明されています。

## エピック
<a name="use-sagemaker-processing-for-distributed-feature-engineering-of-terabyte-scale-ml-datasets-epics"></a>

### SageMaker Studio 環境をセットアップする
<a name="set-up-your-sagemaker-studio-environment"></a>


| タスク | 説明 | 必要なスキル | 
| --- | --- | --- | 
| Amazon SageMaker Studio へアクセスします。 | 「[Amazon SageMaker ドキュメント](https://docs.aws.amazon.com/sagemaker/latest/dg/onboard-quick-start.html)」に記載されている指示に従って、AWS アカウントの SageMaker Studio にオンボーディングします。 | データサイエンティスト、ML エンジニア | 
| wget ユーティリティをインストールします。 | 新しい SageMaker Studio 構成をオンボーディングした場合や、SageMaker Studio でこれらのユーティリティをこれまで使用したことがない場合は、「*wget*」をインストールしてください。 インストールするには、SageMaker Studio コンソールでターミナルウィンドウを開き、以下のコマンドを実行します。<pre>sudo yum install wget</pre> | データサイエンティスト、ML エンジニア | 
| サンプルコードをダウンロードして解凍します。 | 「*添付ファイル*」セクションから`attachments.zip`ファイルをダウンロードします。ターミナルウィンドウで、ファイルをダウンロードしたフォルダに移動し、その内容を抽出します。<pre>unzip attachment.zip</pre>`Scaled-Processing.zip`ファイルを抽出したフォルダに移動し、ファイルの内容を抽出します。<pre>unzip Scaled-Processing.zip</pre> | データサイエンティスト、ML エンジニア | 
| physionet.org からサンプルデータセットをダウンロードし、Amazon S3 にアップロードします。 | `Scaled-Processing`ファイルが含まれているフォルダ内で `get_data.ipynb` Jupyter Notebookを実行します。このノートブックは「[physionet.org](https://physionet.org)」からサンプル MIMIC-III データセットをダウンロードし、Amazon S3 の SageMaker Studio セッションバケットにアップロードします。 | データサイエンティスト、ML エンジニア | 

### 1 つ目の前処理スクリプトを設定します。
<a name="configure-the-first-preprocessing-script"></a>


| タスク | 説明 | 必要なスキル | 
| --- | --- | --- | 
| すべてのサブディレクトリにわたってファイル階層をフラット化する。 | MIMIC-III のような大規模なデータセットでは、論理的な親グループ内であってもファイルが複数のサブディレクトリに分散されることがよくあります。次のコードが示すように、スクリプトはすべてのサブディレクトリにあるすべてのグループファイルをフラット化するように設定する必要があります。<pre># Generate list of .dat files on machine<br />data_dir = input_dir<br />d_subs = next(os.walk(os.path.join(data_dir, '.')))[1]<br />file_list = []<br />for ds in d_subs:<br />    file_list.extend(os.listdir(os.path.join(data_dir, ds, '.')))<br />dat_list = [os.path.join(re.split('_|\.', f)[0].replace('n', ''), f[:-4]) for f in file_list if f[-4:] == '.dat']</pre>    このエピックのサンプルコードスニペットは、添付ファイルにある `src/feature-engineering-pass1/preprocessing.py` ファイルからのものです。 | データサイエンティスト、ML エンジニア | 
| vCPU 数に基づいてファイルをサブグループに分割します。 | ファイルは、スクリプトを実行するインスタンスに存在するvCPUs の数に応じて、同じサイズのサブグループまたはチャンクに分割する必要があります。このステップでは、次のようなコードを実装できます。<pre># Split list of files into sub-lists<br />cpu_count = multiprocessing.cpu_count()<br />splits = int(len(dat_list) / cpu_count)<br />if splits == 0: splits = 1<br />dat_chunks = list(chunks(dat_list, splits))</pre> | データサイエンティスト、ML エンジニア | 
| vCPUs 間のサブグループの処理を並列化します。 | スクリプトロジックは、すべてのサブグループを並行して処理するように設定する必要があります。そのためには、Joblib ライブラリの`Parallel `クラスと`delayed `メソッドを次のように使用します。 <pre># Parallelize processing of sub-lists across CPUs<br />ws_df_list = Parallel(n_jobs=-1, verbose=0)(delayed(run_process)(dc) for dc in dat_chunks)</pre> | データサイエンティスト、ML エンジニア | 
| Amazon S3 に単一ファイルグループの出力を保存します。 | 並列 vCPU 処理が完了したら、各 vCPU の結果を組み合わせて、ファイルグループの S3 バケットパスにアップロードする必要があります。このステップでは、次のようなコードを実行できます。<pre># Compile and pickle patient group dataframe<br />ws_df_group = pd.concat(ws_df_list)<br />ws_df_group = ws_df_group.reset_index().rename(columns={'index': 'signal'})<br />ws_df_group.to_json(os.path.join(output_dir, group_data_out))</pre> | データサイエンティスト、ML エンジニア | 

### 2 つ目の前処理スクリプトを設定します。
<a name="configure-the-second-preprocessing-script"></a>


| タスク | 説明 | 必要なスキル | 
| --- | --- | --- | 
| 最初のスクリプトを実行したすべての処理ジョブで生成されたデータファイルを結合します。 | 前述のスクリプトは、データセットのファイルグループを処理する SageMaker Processing ジョブごとに 1 つのファイルを出力します。 次に、これらの出力ファイルを 1 つのオブジェクトに結合し、1 つの出力データセットを Amazon S3 に書き込む必要があります。これは、添付ファイルにある`src/feature-engineering-pass1p5/preprocessing.py`ファイルに次のように示されています。<pre>def write_parquet(wavs_df, path):<br />    """<br />    Write waveform summary dataframe to S3 in parquet format.<br />    <br />    :param wavs_df: waveform summary dataframe<br />    :param path: S3 directory prefix<br />    :type wavs_df: pandas dataframe<br />    :type path: str<br />    :return: None<br />    """<br />    extra_args = {"ServerSideEncryption": "aws:kms"}<br />    wr.s3.to_parquet(<br />        df=wavs_df,<br />        path=path,<br />        compression='snappy',<br />        s3_additional_kwargs=extra_args)<br /> <br /> <br />def combine_data():<br />    """<br />    Get combined data and write to parquet.<br />    <br />    :return: waveform summary dataframe<br />    :rtype: pandas dataframe<br />    """<br />    wavs_df = get_data()<br />    wavs_df = normalize_signal_names(wavs_df)<br />    write_parquet(wavs_df, "s3://{}/{}/{}".format(bucket_xform, dataset_prefix, pass1p5out_data))<br /> <br />    return wavs_df<br /> <br /> <br />wavs_df = combine_data()</pre> | データサイエンティスト、ML エンジニア | 

### 処理ジョブの実行
<a name="run-processing-jobs"></a>


| タスク | 説明 | 必要なスキル | 
| --- | --- | --- | 
| 最初の処理ジョブを実行します。 | マクロシャーディングを実行するには、ファイルグループごとに個別の処理ジョブを実行します。マイクロシャーディングは各処理ジョブ内で実行されます。これは、各ジョブで最初のスクリプトが実行されるためです。次のコードは、次のスニペット (`notebooks/FeatExtract_Pass1.ipynb`に含まれている) 内の各ファイルグループディレクトリの処理ジョブを起動する方法を示しています。<pre>pat_groups = list(range(30,40))<br />ts = str(int(time.time()))<br /> <br />for group in pat_groups:<br />    sklearn_processor = SKLearnProcessor(framework_version='0.20.0',<br />                                     role=role,<br />                                     instance_type='ml.m5.4xlarge',<br />                                     instance_count=1,<br />                                     volume_size_in_gb=5)<br />    sklearn_processor.run(<br />        code='../src/feature-engineering-pass1/preprocessing.py',<br />        job_name='-'.join(['scaled-processing-p1', str(group), ts]),<br />        arguments=[<br />            "input_path", "/opt/ml/processing/input",<br />            "output_path", "/opt/ml/processing/output",<br />            "group_data_out", "ws_df_group.json"<br />        ],<br />        inputs=<br />        [<br />            ProcessingInput(<br />                source=f's3://{sess.default_bucket()}/data_inputs/{group}',<br />                destination='/opt/ml/processing/input',<br />                s3_data_distribution_type='FullyReplicated'<br />            )<br />        ],<br />        outputs=<br />        [<br />            ProcessingOutput(<br />                source='/opt/ml/processing/output',<br />                destination=f's3://{sess.default_bucket()}/data_outputs/{group}'<br />            )<br />        ],<br />        wait=False<br />    )</pre> | データサイエンティスト、ML エンジニア | 
| 2 つ目の処理ジョブを実行します。 | 最初の処理ジョブセットで生成された出力を結合し、前処理のために追加の計算を実行するには、1 つの SageMaker 処理ジョブを使用して 2 番目のスクリプトを実行します。次のコードはこれを示しています (`notebooks/FeatExtract_Pass1p5.ipynb`に含まれている)。<pre>ts = str(int(time.time()))<br />bucket = sess.default_bucket()<br />     <br />sklearn_processor = SKLearnProcessor(framework_version='0.20.0',<br />                                 role=role,<br />                                 instance_type='ml.t3.2xlarge',<br />                                 instance_count=1,<br />                                 volume_size_in_gb=5)<br />sklearn_processor.run(<br />    code='../src/feature-engineering-pass1p5/preprocessing.py',<br />    job_name='-'.join(['scaled-processing', 'p1p5', ts]),<br />    arguments=['bucket', bucket,<br />               'pass1out_prefix', 'data_outputs',<br />               'pass1out_data', 'ws_df_group.json',<br />               'pass1p5out_data', 'waveform_summary.parquet',<br />               'statsdata_name', 'signal_stats.csv'],<br />    wait=True<br />)</pre> | データサイエンティスト、ML エンジニア | 

## 関連リソース
<a name="use-sagemaker-processing-for-distributed-feature-engineering-of-terabyte-scale-ml-datasets-resources"></a>
+ [クイックスタートを使用して Amazon SageMaker Studio にオンボーディングする](https://docs.aws.amazon.com/sagemaker/latest/dg/onboard-quick-start.html) (SageMaker ドキュメント)
+ [プロセスデータ](https://docs.aws.amazon.com/sagemaker/latest/dg/processing-job.html) (SageMaker ドキュメンテーション) 
+ [scikit-learn を使ってデータを処理する](https://docs.aws.amazon.com/sagemaker/latest/dg/use-scikit-learn-processing-container.html) (SageMaker ドキュメンテーション) 
+ [JobLib. パラレルドキュメンテーション](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html)
+ ムーディ、B.、ムーディ、G.、ビジャロエル、M.、クリフォード、G.D.、シルバ、I. (2020)。[MIMIC-III](https://doi.org/10.13026/c2607m) 波形データベース (バージョン 1.0)。*フィジオネット*。
+ ジョンソン、A. E. W.、ポラード、T.J.、シェン、L.、リーマン、L.H.、フェン、M.、ガセミ、M.、ムーディ、B.、ゾロビッツ、P.、セリ、L.A.、マーク、R.G.（2016）。「[MIMIC-III](https://dx.doi.org/10.1038/sdata.2016.35)」は、無料でアクセスできる救命救急データベースです。科学データ、3、160035。
+ [MIMIC-III 波形データベースライセンス](https://physionet.org/content/mimic3wdb/1.0/LICENSE.txt)

## アタッチメント
<a name="attachments-e7a90b31-de8f-41fd-bb3f-c7c6100fc306"></a>

このドキュメントに関連する追加コンテンツにアクセスするには、次のファイルを解凍してください。「[attachment.zip](samples/p-attach/e7a90b31-de8f-41fd-bb3f-c7c6100fc306/attachments/attachment.zip)」