

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# EKS 上の Amazon EMR によるタスクリカバリとスケーリング操作のための Flink ジョブの再起動時間の最適化
<a name="jobruns-flink-restart"></a>

タスクが失敗したり、スケーリング操作が発生したりすると、Flink は最後に完了したチェックポイントからタスクを再実行しようとします。チェックポイントの状態のサイズと並列タスクの数によっては、再起動プロセスの実行に 1 分以上かかる場合があります。再起動中は、ジョブのバックログタスクが蓄積されることがあります。ただし、Flink が実行グラフの回復と再開の速度を最適化してジョブの安定性を高める方法はいくつかあります。

このページでは、Amazon EMR Flink がスポットインスタンスでタスクリカバリまたはスケーリング操作中のジョブの再起動時間を改善できるいくつかの方法について説明します。スポットインスタンスとは、割引価格で利用可能な未使用のコンピューティング能力のことです。これには、不定期に中断するなど独自の動作があるため、EKS 上の Amazon EMR が廃止やジョブの再起動を実行する方法を含めて、Amazon EMR on EKS がこれらをどのように処理するかを理解することが重要です。

**Topics**
+ [タスクローカルリカバリ](#flink-restart-task-local)
+ [Amazon EBS ボリュームマウントによるタスクローカルリカバリ](#flink-restart-task-local-ebs)
+ [汎用ログベースのインクリメンタルチェックポイント](#flink-restart-log-check)
+ [きめ細かなリカバリ](#flink-restart-fine-grained)
+ [アダプティブスケジューラーに組み込まれた再起動メカニズム](#flink-restart-combined)

## タスクローカルリカバリ
<a name="flink-restart-task-local"></a>

**注記**  
EKS 6.14.0 以降では、Amazon EMR 上の Flink によるタスクローカルリカバリをサポートしています。

Flink チェックポイントでは、各タスクが Flink が Amazon S3 などの分散ストレージに書き込む状態のスナップショットを作成します。復旧時には、タスクは分散ストレージから状態を復元します。分散ストレージはすべてのノードからアクセスできるため、耐障害性があり、再スケーリング中に状態を再分散できます。

ただし、リモート分散ストアには欠点もあります。すべてのタスクはネットワーク経由でリモートロケーションから状態を読み取る必要があります。そのため、タスクの回復やスケーリング操作中に大きな状態の回復時間が長くなる可能性があります。

リカバリ時間が長いというこの問題は、*タスクローカルリカバリ*によって解決できます。タスクはチェックポイントの状態を、ローカルディスクなど、タスクのローカルにあるセカンダリストレージに書き込みます。また、プライマリストレージ（この場合は Amazon S3）に状態が保存されます。復元中、スケジューラーは、タスクが以前に実行されていたのと同じタスクマネージャー上でタスクをスケジュールし、リモート状態ストアから読み取るのではなく、ローカル状態ストアから復元できるようにします。詳細については、「*Apache Flink ドキュメント*」の「[Apache Flink Documentation](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/large_state_tuning/#task-local-recovery)」を参照してください。

サンプルジョブを使ったベンチマークテストでは、タスクローカルリカバリを有効にすると、リカバリ時間が数分から数秒に短縮されたことがわかりました。

タスクローカルリカバリを有効にするには、`flink-conf.yaml` ファイルで次の設定を行います。チェックポイント間隔の値をミリ秒単位で指定します。

```
    state.backend.local-recovery: true
    state.backend: hasmap or rocksdb
    state.checkpoints.dir: s3://STORAGE-BUCKET-PATH/checkpoint
    execution.checkpointing.interval: 15000
```

## Amazon EBS ボリュームマウントによるタスクローカルリカバリ
<a name="flink-restart-task-local-ebs"></a>

**注記**  
EKS 6.15.0 以降では、Amazon EMR 上の Flink を用いた Amazon EBS でのタスクローカルリカバリをサポートしています。

EKS 上の Amazon EMR で Flink を使用すると、Amazon EBS ボリュームを TaskManager ポッドに自動的にプロビジョニングして、タスクローカルリカバリを行うことができます。デフォルトのオーバーレイマウントには 10 GB のボリュームが付属しており、状態の低いジョブには十分です。状態が大きいジョブでは、*EBS ボリュームの自動マウント*オプションを有効にできます。TaskManager ポッドはポッド作成時に自動的に作成およびマウントされ、ポッドの削除時に削除されます。

以下のステップを使用して、EKS 上の Amazon EMR 内の Flink 用に自動 EBS ボリュームマウントを有効にします。

1. 次のステップで使用する以下の変数の値をエクスポートします。

   ```
   export AWS_REGION=aa-example-1 
   export FLINK_EKS_CLUSTER_NAME=my-cluster
   export AWS_ACCOUNT_ID=111122223333
   ```

1. クラスター用の `kubeconfig` YAML ファイルを作成または更新します。

   ```
   aws eks update-kubeconfig --name $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
   ```

1. お使いの Amazon EKS クラスター上の Amazon EBS Container Storage Interface (CSI) ドライバー 用 IAM サービスアカウントを作成します。

   ```
   eksctl create iamserviceaccount \
      --name ebs-csi-controller-sa \
      --namespace kube-system \
      --region $AWS_REGION \
      --cluster $FLINK_EKS_CLUSTER_NAME\
      --role-name TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} \
      --role-only \
      --attach-policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy \
      --approve
   ```

1. 以下のコマンドで、Amazon EBS CSI ドライバーを作成します。

   ```
   eksctl create addon \
      --name aws-ebs-csi-driver \
      --region $AWS_REGION \
      --cluster $FLINK_EKS_CLUSTER_NAME \
      --service-account-role-arn arn:aws:iam::${AWS_ACCOUNT_ID}:role/TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
   ```

1. 以下のコマンドで、Amazon EBS ストレージクラスを作成します。

   ```
   cat ≪ EOF ≫ storage-class.yaml
   apiVersion: storage.k8s.io/v1
   kind: StorageClass
   metadata:
     name: ebs-sc
   provisioner: ebs.csi.aws.com
   volumeBindingMode: WaitForFirstConsumer
   EOF
   ```

   次に、クラスを適用します。

   ```
   kubectl apply -f storage-class.yaml
   ```

1. サービスアカウントを作成するオプションを使用して、Amazon EMR Flink Kubernetes オペレーターを Helm インストールします。これにより、Flink デプロイで使用する `emr-containers-sa-flink` を作成します。

   ```
   helm install flink-kubernetes-operator flink-kubernetes-operator/ \
      --set jobServiceAccount.create=true \
      --set rbac.jobRole.create=true \
      --set rbac.jobRoleBinding.create=true
   ```

1. Flink ジョブを送信してタスクローカルリカバリ用 EBS ボリュームの自動プロビジョニングを有効にするには、お使いの `flink-conf.yaml` ファイルに以下の構成を設定します。ジョブの状態サイズに合わせてサイズ制限を調整します。`serviceAccount` を `emr-containers-sa-flink` に設定します。チェックポイント間隔の値をミリ秒単位で指定します。`executionRoleArn` は省略してください。

   ```
   flinkConfiguration:
       task.local-recovery.ebs.enable: true
       kubernetes.taskmanager.local-recovery.persistentVolumeClaim.sizeLimit: 10Gi
       state.checkpoints.dir: s3://BUCKET-PATH/checkpoint
       state.backend.local-recovery: true
       state.backend: hasmap or rocksdb
       state.backend.incremental: "true"
       execution.checkpointing.interval: 15000
     serviceAccount: emr-containers-sa-flink
   ```

Amazon EBS CSI ドライバープラグインを削除する準備ができたら、以下のコマンドを使用します。

```
  # Detach Attached Policy
  aws iam detach-role-policy --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} --policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy
  # Delete the created Role
  aws iam delete-role --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
  # Delete the created service account
  eksctl delete iamserviceaccount --name ebs-csi-controller-sa --namespace kube-system --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  # Delete Addon
  eksctl delete addon --name aws-ebs-csi-driver --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  # Delete the EBS storage class
  kubectl delete -f storage-class.yaml
```

## 汎用ログベースのインクリメンタルチェックポイント
<a name="flink-restart-log-check"></a>

**注記**  
EKS 6.14.0 以降では、Amazon EMR 上の Flink による汎用ログベースのインクリメンタルチェックポイントをサポートしています。

チェックポイントの速度を向上させるため、汎用ログベースのインクリメンタルチェックポイントが Flink 1.16 に追加されました。チェックポイント間隔を短くすると、回復後に再処理する必要のあるイベントが少なくなるため、多くの場合回復作業が削減されます。詳細については、「*Apache Flink ブログ*」の「[Improving speed and stability of checkpointing with generic log-based incremental checkpoints](https://flink.apache.org/2022/05/30/improving-speed-and-stability-of-checkpointing-with-generic-log-based-incremental-checkpoints/)」を参照してください。

サンプルジョブを使用したベンチマークテストでは、汎用ログベースのインクリメンタルチェックポイントを使用すると、チェックポイントにかかる時間が数分から数秒に短縮されたことがわかりました。

汎用ログベースのインクリメンタルチェックポイントを有効にするには、`flink-conf.yaml` ファイルで次の設定を行います。チェックポイント間隔の値をミリ秒単位で指定します。

```
    state.backend.changelog.enabled: true 
    state.backend.changelog.storage: filesystem
    dstl.dfs.base-path: s3://bucket-path/changelog
    state.backend.local-recovery: true
    state.backend: rocksdb
    state.checkpoints.dir: s3://bucket-path/checkpoint
    execution.checkpointing.interval: 15000
```

## きめ細かなリカバリ
<a name="flink-restart-fine-grained"></a>

**注記**  
EKS 6.14.0 以降では、Amazon EMR 上の Flink を用いたデフォルトスケジューラーでのきめ細かなリカバリをサポートしています。EKS 6.15.0 以降では、Amazon EMR 上の Flink を用いたアダプティブスケジューラー内のきめ細かなリカバリをサポートしています。

実行中にタスクが失敗した場合、Flink は実行グラフ全体をリセットし、最後に完了したチェックポイントから完全な再実行をトリガーします。これは、失敗したタスクを単に再実行するよりもコストがかかります。きめ細かい復元では、失敗したタスクのパイプラインに接続されたコンポーネントのみを再起動します。次の例では、ジョブグラフには 5 つの頂点 (`A` から `E`) があります。頂点間のすべての接続はポイントごとの分散でパイプライン化され、ジョブの `parallelism.default` は `2` に設定されます。

```
A → B → C → D → E
```

この例では、合計 10 個のタスクが実行されています。最初のパイプライン (`a1` から `e1`) は TaskManager (`TM1`) で実行され、2 番目のパイプライン (`a2` から `e2`) は別の TaskManager (`TM2`) 上で実行されます。

```
a1 → b1 → c1 → d1 → e1
a2 → b2 → c2 → d2 → e2
```

パイプライン接続されたコンポーネントには、`a1 → e1` と `a2 → e2` の 2 つがあります。`TM1` または `TM2` のどちらか一方で障害が発生しても、影響を受けるのは、その TaskManager が実行されていたパイプライン内の 5 つのタスクのみです。再起動戦略では、影響を受けるパイプラインコンポーネントのみを起動します。

きめ細かいリカバリは、完全に並列した Flink ジョブでのみ機能します。`keyBy()` または `redistribute()` オペレーションではサポートされていません。詳細については、「*Flink 改善提案*」Jira プロジェクトの「[FLIP-1: Fine Grained Recovery from Task Failures](https://cwiki.apache.org/confluence/display/FLINK/FLIP-1%3A+Fine+Grained+Recovery+from+Task+Failures)」を参照してください。

きめ細かい復元を有効にするには、`flink-conf.yaml` ファイルで次の設定を行います。

```
jobmanager.execution.failover-strategy: region 
restart-strategy: exponential-delay or fixed-delay
```

## アダプティブスケジューラーに組み込まれた再起動メカニズム
<a name="flink-restart-combined"></a>

**注記**  
EKS 6.15.0 以降では、Amazon EMR 上の Flink を用いたアダプティブスケジューラー内の複合再起動メカニズムをサポートしています。

アダプティブスケジューラーは、使用可能なスロットに基づいてジョブの並列処理を調整できます。設定したジョブの並列処理を満たすだけの十分なスロットがない場合は、自動的に並列処理を減らします 新しいスロットが使用可能になると、ジョブは設定されたジョブの並列処理に合わせて再びスケールアップされます。適応型スケジューラーは、利用可能なリソースが十分になくなってもジョブのダウンタイムを回避します。これは Flink Autoscaler でサポートされているスケジューラーです。これらの理由から、Amazon EMR Flink を使用するアダプティブスケジューラーをお勧めします。ただし、アダプティブスケジューラーは、新しいリソースが追加されるたびに 1 回再起動するなど、短期間に複数の再起動を行う場合があります。これにより、ジョブのパフォーマンスが低下する可能性があります。

Amazon EMR 6.15.0 以降では、Flink のアダプティブスケジューラーに再起動メカニズムが組み合わされています。このメカニズムは、最初のリソースが追加されると再起動ウィンドウを開き、設定したウィンドウ間隔（デフォルトの 1 分）まで待機します。並列処理を設定してジョブを実行するのに十分なリソースがあるとき、または間隔がタイムアウトになったときに、1 回再起動します。

サンプルジョブを使用したベンチマークテストでは、アダプティブスケジューラーと Flink オートスケーラーを使用すると、この機能はデフォルトの動作よりも 10% 多くのレコードを処理することがわかりました。

複合再起動メカニズムを有効にするには、`flink-conf.yaml` ファイルで以下の設定を行います。

```
jobmanager.adaptive-scheduler.combined-restart.enabled: true 
jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m
```