

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# TensorFlow 훈련 스크립트 수정
<a name="model-parallel-customize-training-script-tf"></a>

이 섹션에서는 TensorFlow 훈련 스크립트를 수정하여 자동 파티셔닝과 수동 파티셔닝을 위한 SageMaker 모델 병렬화 라이브러리를 구성하는 방법을 알아봅니다. 이 예제 모음에는 하이브리드 모델 및 데이터 병렬화를 위해 Horovod와 통합된 예제도 포함되어 있습니다.

**참고**  
라이브러리에서 지원하는 TensorFlow 버전을 찾으려면 [지원되는 프레임워크 및 AWS 리전](distributed-model-parallel-support.md)을/를 참조하세요.

라이브러리를 사용하기 위해 훈련 스크립트를 수정해야 하는 필수 사항은 [TensorFlow를 사용한 자동 분할](#model-parallel-customize-training-script-tf-23)에 나열되어 있습니다.

Horovod에서 하이브리드 모델 및 데이터 병렬화를 사용하도록 훈련 스크립트를 수정하는 방법을 알아보려면 [하이브리드 모델 및 데이터 병렬화를 위한 TensorFlow 및 Horovod를 사용한 자동 분할](#model-parallel-customize-training-script-tf-2.3)을/를 참조하세요.

수동 파티셔닝을 사용하려는 경우에도 [TensorFlow를 사용한 수동 분할](#model-parallel-customize-training-script-tf-manual)을/를 검토하세요.

다음 주제에서는 TensorFlow 모델의 자동 파티셔닝과 수동 파티셔닝을 위한 SageMaker의 모델 병렬화 라이브러리를 구성하는 데 사용할 수 있는 훈련 스크립트의 예를 보여줍니다.

**참고**  
자동 파티셔닝은 기본적으로 활성화되어 있습니다. 달리 지정하지 않는 한, 예제 스크립트는 자동 파티셔닝을 사용합니다.

**Topics**
+ [TensorFlow를 사용한 자동 분할](#model-parallel-customize-training-script-tf-23)
+ [하이브리드 모델 및 데이터 병렬화를 위한 TensorFlow 및 Horovod를 사용한 자동 분할](#model-parallel-customize-training-script-tf-2.3)
+ [TensorFlow를 사용한 수동 분할](#model-parallel-customize-training-script-tf-manual)
+ [지원되지 않는 프레임워크 기능](#model-parallel-tf-unsupported-features)

## TensorFlow를 사용한 자동 분할
<a name="model-parallel-customize-training-script-tf-23"></a>

SageMaker의 모델 병렬화 라이브러리로 TensorFlow 모델을 실행하려면 다음과 같은 훈련 스크립트 변경이 필요합니다.

1. [https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#smp.init](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#smp.init)을 사용하여 라이브러리를 가져오고 초기화합니다.

1. Keras 모델 클래스 [https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_tensorflow.html](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_tensorflow.html)에서 상속하여 Keras 모델을 정의합니다. `smp.DistributedModel` 객체의 호출 메서드에서 모델 출력을 반환합니다. 호출 메서드에서 반환되는 모든 텐서는 모델 병렬 장치 간에 브로드캐스트되므로 통신 오버헤드가 발생하므로 호출 메서드 외부에서 필요하지 않은 텐서(예: 중간 활성화)는 반환되지 않아야 합니다.

1. `tf.Dataset.batch()` 메서드에서 `drop_remainder=True`로 설정합니다. 이는 배치 크기를 항상 마이크로배치 수로 나눌 수 있도록 하기 위한 것입니다.

1. `smp.dp_rank()`를 사용하여 데이터 파이프라인에서 무작위 작업을 시드(Seed) 하세요. 예를 들어 서로 다른 모델 파티션을 포함하는 GPU 간의 데이터 샘플의 일관성을 보장하기 위한 `shuffle(ds, seed=smp.dp_rank())`를 시드합니다.

1. 순방향 및 역방향 로직을 Step Function에 넣고 이를 `smp.step`로 데코레이트하세요.

1. `reduce_mean`과 같은 [https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#StepOutput](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#StepOutput) 메서드를 사용하여 마이크로배치의 출력값에 대해 후처리를 수행합니다. [https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#smp.init](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#smp.init) 함수는 `smp.DistributedModel`의 출력값에 따라 달라지는 반환 값이 있어야 합니다.

1. 평가 단계가 있는 경우에도 마찬가지로 `smp.step`로 데코레이팅된 함수 안에 순방향 로직을 배치하고 [`StepOutput` API](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#StepOutput)를 사용하여 출력값을 후처리하세요.

SageMaker의 모델 병렬화 라이브러리 API에 대한 자세한 내용은 [API 설명서](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smd_model_parallel.html)를 참조하세요.

다음 Python 스크립트는 변경 후의 훈련 스크립트의 예입니다.

```
import tensorflow as tf

# smdistributed: Import TF2.x API
import smdistributed.modelparallel.tensorflow as smp

# smdistributed: Initialize
smp.init()

# Download and load MNIST dataset.
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data(
    "MNIST-data-%d" % smp.rank()
)
x_train, x_test = x_train / 255.0, x_test / 255.0

# Add a channels dimension
x_train = x_train[..., tf.newaxis]
x_test = x_test[..., tf.newaxis]

# smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder
# in batching to make sure batch size is always divisible by number of microbatches
train_ds = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .shuffle(10000, seed=smp.dp_rank())
    .batch(256, drop_remainder=True)
)

# smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API 
class MyModel(smp.DistributedModel):
    def __init__(self):
        super(MyModel, self).__init__()
        # define layers

    def call(self, x, training=None):
        # define forward pass and return the model output

model = MyModel()

loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.Adam()
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy")

# smdistributed: Define smp.step. Return any tensors needed outside
@smp.step
def get_grads(images, labels):
    predictions = model(images, training=True)
    loss = loss_object(labels, predictions)

    grads = optimizer.get_gradients(loss, model.trainable_variables)
    return grads, loss, predictions


@tf.function
def train_step(images, labels):
    gradients, loss, predictions = get_grads(images, labels)

    # smdistributed: Accumulate the gradients across microbatches
    gradients = [g.accumulate() for g in gradients]
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    # smdistributed: Merge predictions and average losses across microbatches
    train_accuracy(labels, predictions.merge())
    return loss.reduce_mean()


for epoch in range(5):
    # Reset the metrics at the start of the next epoch
    train_accuracy.reset_states()
    for images, labels in train_ds:
        loss = train_step(images, labels)
    accuracy = train_accuracy.result()
```

훈련 스크립트 준비를 마쳤으면 [2단계: SageMaker Python SDK를 사용하여 훈련 작업 시작](model-parallel-sm-sdk.md)으로 진행하세요. 하이브리드 모델 및 데이터 병렬화 훈련 작업을 실행하려면 다음 섹션으로 계속합니다.

## 하이브리드 모델 및 데이터 병렬화를 위한 TensorFlow 및 Horovod를 사용한 자동 분할
<a name="model-parallel-customize-training-script-tf-2.3"></a>

하이브리드 모델 및 데이터 병렬화를 위해 SageMaker 모델 병렬화 라이브러리를 Horovod와 함께 사용할 수 있습니다. 라이브러리가 하이브리드 병렬화를 위해 모델을 분할하는 방법에 대한 자세한 내용은 [파이프라인 병렬화(PyTorch 및 TensorFlow에서 사용 가능)](model-parallel-intro.md#model-parallel-intro-pp)을/를 참조하세요.

이 단계에서는 SageMaker 모델 병렬화 라이브러리에 맞게 훈련 스크립트를 수정하는 방법을 중점적으로 다룹니다.

[2단계: SageMaker Python SDK를 사용하여 훈련 작업 시작](model-parallel-sm-sdk.md)에서 설정할 하이브리드 병렬화 구성을 선택하도록 훈련 스크립트를 올바르게 설정하려면 데이터 병렬 순위와 모델 병렬 순위를 각각 자동으로 감지하는 라이브러리의 도우미 함수 `smp.dp_rank()` 및 `smp.mp_rank()`를 사용하세요.

라이브러리가 지원하는 모든 MPI 프리미티브를 찾으려면 SageMaker Python SDK 설명서에서 [MPI 기본](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#mpi-basics)을 참조하세요.

스크립트에 필요한 변경 사항은 다음과 같습니다.
+ `hvd.allreduce` 추가
+ Horovod에서 요구하는 대로 첫 번째 배치 이후에 변수를 브로드캐스팅합니다
+ `smp.dp_rank()`를 사용하여 데이터 파이프라인의 셔플링 및/또는 샤딩 작업을 시드(seed) 합니다.

**참고**  
Horovod를 사용할 때는 훈련 스크립트에서 `hvd.init`을 직접 호출해서는 안 됩니다. 대신 [2단계: SageMaker Python SDK를 사용하여 훈련 작업 시작](model-parallel-sm-sdk.md)의 SageMaker Python SDK `modelparallel` 파라미터에서 `"horovod"`을 `True`로 설정해야 합니다. 이렇게 하면 라이브러리가 모델 파티션의 디바이스 할당을 기반으로 Horovod를 내부적으로 초기화할 수 있습니다. 훈련 스크립트에서 `hvd.init()`을 직접 호출하면 문제가 발생할 수 있습니다.

**참고**  
훈련 스크립트에서 직접 `hvd.DistributedOptimizer` API를 사용하면 API가 `AllReduce` 작업을 암시적으로 `smp.step` 내부에 배치하기 때문에 훈련 성능과 속도가 저하될 수 있습니다. 다음 예와 같이 `smp.step`에서 반환된 그래디언트에 따라 `accumulate()` 또는 `reduce_mean()`을 호출한 후 `hvd.allreduce`를 직접 호출하여 Horovod와 함께 모델 병렬화 라이브러리를 사용하는 것이 좋습니다.

SageMaker의 모델 병렬화 라이브러리 API에 대한 자세한 내용은 [API 설명서](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smd_model_parallel.html)를 참조하세요.

```
import tensorflow as tf
import horovod.tensorflow as hvd

# smdistributed: Import TF2.x API 
import smdistributed.modelparallel.tensorflow as smp

# smdistributed: Initialize
smp.init()

# Download and load MNIST dataset.
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data(
    "MNIST-data-%d" % smp.rank()
)
x_train, x_test = x_train / 255.0, x_test / 255.0

# Add a channels dimension
x_train = x_train[..., tf.newaxis]
x_test = x_test[..., tf.newaxis]

# smdistributed: Seed the shuffle with smp.dp_rank(), and drop_remainder
# in batching to make sure batch size is always divisible by number of microbatches
train_ds = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .shuffle(10000, seed=smp.dp_rank())
    .batch(256, drop_remainder=True)
)

# smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API 
class MyModel(smp.DistributedModel):
    def __init__(self):
        super(MyModel, self).__init__()
        # define layers

    def call(self, x, training=None):
        # define forward pass and return model outputs


model = MyModel()

loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.Adam()
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy")

# smdistributed: Define smp.step. Return any tensors needed outside
@smp.step
def get_grads(images, labels):
    predictions = model(images, training=True)
    loss = loss_object(labels, predictions)

    grads = optimizer.get_gradients(loss, model.trainable_variables)
    return grads, loss, predictions


@tf.function
def train_step(images, labels, first_batch):
    gradients, loss, predictions = get_grads(images, labels)

    # smdistributed: Accumulate the gradients across microbatches
    # Horovod: AllReduce the accumulated gradients
    gradients = [hvd.allreduce(g.accumulate()) for g in gradients]
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    # Horovod: Broadcast the variables after first batch 
    if first_batch:
        hvd.broadcast_variables(model.variables, root_rank=0)
        hvd.broadcast_variables(optimizer.variables(), root_rank=0)

    # smdistributed: Merge predictions across microbatches
    train_accuracy(labels, predictions.merge())
    return loss.reduce_mean()


for epoch in range(5):
    # Reset the metrics at the start of the next epoch
    train_accuracy.reset_states()

    for batch, (images, labels) in enumerate(train_ds):
        loss = train_step(images, labels, tf.constant(batch == 0))
```

## TensorFlow를 사용한 수동 분할
<a name="model-parallel-customize-training-script-tf-manual"></a>

`smp.partition` 컨텍스트 관리자를 사용하여 작업을 특정 파티션에 배치합니다. `smp.partition` 컨텍스트에 배치되지 않은 모든 작업은 `default_partition`에 배치됩니다. SageMaker의 모델 병렬화 라이브러리 API에 대한 자세한 내용은 [API 설명서](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smd_model_parallel.html)를 참조하세요.

```
import tensorflow as tf

# smdistributed: Import TF2.x API.
import smdistributed.modelparallel.tensorflow as smp

# smdistributed: Initialize
smp.init()

# Download and load MNIST dataset.
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data(
    "MNIST-data-%d" % smp.rank()
)
x_train, x_test = x_train / 255.0, x_test / 255.0

# Add a channels dimension
x_train = x_train[..., tf.newaxis]
x_test = x_test[..., tf.newaxis]

# smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder
# in batching to make sure batch size is always divisible by number of microbatches.
train_ds = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .shuffle(10000, seed=smp.dp_rank())
    .batch(256, drop_remainder=True)
)

# smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API.
class MyModel(smp.DistributedModel):
    def __init__(self):
         # define layers

    def call(self, x):
        with smp.partition(0):
            x = self.layer0(x)
        with smp.partition(1):
            return self.layer1(x)


model = MyModel()

loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.Adam()
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy")

# smdistributed: Define smp.step. Return any tensors needed outside
@smp.step
def get_grads(images, labels):
    predictions = model(images, training=True)
    loss = loss_object(labels, predictions)

    grads = optimizer.get_gradients(loss, model.trainable_variables)
    return grads, loss, predictions


@tf.function
def train_step(images, labels):
    gradients, loss, predictions = get_grads(images, labels)

    # smdistributed: Accumulate the gradients across microbatches
    gradients = [g.accumulate() for g in gradients]
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    # smdistributed: Merge predictions and average losses across microbatches
    train_accuracy(labels, predictions.merge())
    return loss.reduce_mean()


for epoch in range(5):
    # Reset the metrics at the start of the next epoch
    train_accuracy.reset_states()
    for images, labels in train_ds:
        loss = train_step(images, labels)
    accuracy = train_accuracy.result()
```

## 지원되지 않는 프레임워크 기능
<a name="model-parallel-tf-unsupported-features"></a>

다음 TensorFlow 기능은 라이브러리에서 지원되지 않습니다.
+ 현재 `tf.GradientTape()`은 지원되지 않습니다. 대신 `Optimizer.get_gradients()` 또는 `Optimizer.compute_gradients()`를 사용하여 그래디언트를 계산할 수 있습니다.
+ 현재 `tf.train.Checkpoint.restore()` API는 지원되지 않습니다. 체크포인팅의 경우 대신 `smp.CheckpointManager`를 사용합니다. 이는 동일한 API와 기능을 제공합니다. `smp.CheckpointManager`을 이용한 체크포인트 복원은 첫 번째 단계 이후에 이루어져야 한다는 점에 유의하세요.