

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Flink が高可用性とジョブの耐障害性をサポートする方法
<a name="jobruns-flink-resiliency"></a>

以下のセクションでは、Flink ジョブの信頼性を向上し、高可用性を実現する方法について説明します。これは、Flink の高可用性や障害発生時のさまざまな復旧機能などの組み込み機能によって実現されます。

**Topics**
+ [Flink 演算子と Flink アプリケーションでの高可用性 (HA、High Availability) の使用](jobruns-flink-using-ha.md)
+ [EKS 上の Amazon EMR によるタスクリカバリとスケーリング操作のための Flink ジョブの再起動時間の最適化](jobruns-flink-restart.md)
+ [EKS 上での Amazon EMR の Flink を用いたスポットインスタンスの適切な廃止](jobruns-flink-decommission.md)

# Flink 演算子と Flink アプリケーションでの高可用性 (HA、High Availability) の使用
<a name="jobruns-flink-using-ha"></a>

このトピックでは、高可用性を設定する方法を示し、いくつかの異なるユースケースでどのように機能するかについて説明します。これには、ジョブマネージャーを使用している場合や Flink ネイティブ kubernetes を使用している場合が含まれます。

## Flink 演算子の高可用性
<a name="jobruns-flink-ha-operator"></a>

Flink 演算子の高可用性を有効にすることで、障害発生時にスタンバイの Flink 演算子にフェイルオーバーして、演算子制御ループのダウンタイムを最小限に抑えることができます。高可用性はデフォルトで有効になっており、デフォルトの開始演算子レプリカ数は 2 です。`values.yaml` ファイルのレプリカフィールドは Helm チャート用に設定できます。

以下のフィールドがカスタマイズ可能です。
+ `replicas` (オプション、デフォルトは 2): この数を 1 より大きく設定すると、他のスタンバイ演算子が作成され、ジョブの復旧が早くなります。
+ `highAvailabilityEnabled` (オプション、デフォルトは true): HA を有効にするかどうかを制御します。このパラメータを true に指定すると、マルチ AZ 配置サポートが有効になり、正しい `flink-conf.yaml` パラメータが設定されます。

`values.yaml` ファイルに以下の設定を行うことで、演算子の HA を無効にできます。

```
...
imagePullSecrets: []

replicas: 1

# set this to false if you don't want HA
highAvailabilityEnabled: false
...
```

**マルチ AZ 配置**

複数のアベイラビリティーゾーンに演算子ポッドを作成します。これはソフト制約であり、別の AZ に十分なリソースがない場合、演算子ポッドは同じ AZ にスケジュールされます。

**リーダーレプリカの決定**

 HA が有効になっている場合、レプリカはリースを使用してどの JM がリーダーかを判断し、リーダーの選定には K8s リースを使用します。リースを記述し、.Spec.Holder Identity フィールドを確認することで、現在のリーダーを判断できます

```
kubectl describe lease <Helm Install Release Name>-<NAMESPACE>-lease -n <NAMESPACE> | grep "Holder Identity"
```

**Flink-S3 インタラクション**

**アクセス認証情報の設定**

S3 バケットにアクセスするための適切な IAM 権限を IRSA に設定していることを確認してください。

**S3 アプリケーションモードからジョブジャーを取得**

Flink 演算子は S3 からのアプリケーションジャーの取得もサポートしています。FlinkDeployment 仕様で jarURI の S3 ロケーションを指定するだけです。

この機能は、PyFlink スクリプトのような他のアーティファクトのダウンロードにも使用できます。結果として生じる Python スクリプトはパス `/opt/flink/usrlib/` の下にドロップされます。

以下の例は、PyFlink ジョブにこの機能を使用する方法を示しています。jarURI フィールドと args フィールドに注意してください。

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: python-example
spec:
  image: <YOUR CUSTOM PYFLINK IMAGE>
  emrReleaseLabel: "emr-6.12.0-flink-latest"
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    highAvailabilityEnabled: false
    replicas: 1
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: "s3://<S3-BUCKET>/scripts/pyflink.py" # Note, this will trigger the artifact download process
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pyflink.py"]
    parallelism: 1
    upgradeMode: stateless
```

**Flink S3 コネクタ**

Flink には 2 つの S3 コネクタ (下記参照) が同梱されています。以下のセクションでは、どのコネクタをどのような場合に使用するかについて説明します。

**チェックポイント: Presto S3 コネクタ**
+ S3 スキームを s3p:// に設定します。
+ S3 へのチェックポイントに使用することをお勧めするコネクタ。詳細については、「Apache Flink ドキュメント」の「[S3 固有](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#s3-specific)」を参照してください。

FlinkDeployment 仕様の例:

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.checkpoints.dir: s3p://<BUCKET-NAME>/flink-checkpoint/
```

**S3 への読み取りと書き込み: Hadoop S3 コネクタ**
+ S3 スキームを `s3://` または (`s3a://`) に設定する
+ S3 からのファイルの読み書きに推奨されるコネクタ ([Flinks Filesystem インターフェイス](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/)を実装する S3 コネクタのみ)。
+ デフォルトでは、`fs.s3a.aws.credentials.provider` が `flink-conf.yaml` ファイルに設定されます。このファイルは `com.amazonaws.auth.WebIdentityTokenCredentialsProvider` です。デフォルト `flink-conf` を完全にオーバーライドして S3 とやり取りする場合は、必ずこのプロバイダーを使用してください。

FlinkDeployment 仕様の例

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  job:
    jarURI: local:///opt/flink/examples/streaming/WordCount.jar
    args: [ "--input", "s3a://<INPUT BUCKET>/PATH", "--output", "s3a://<OUTPUT BUCKET>/PATH" ]
    parallelism: 2
    upgradeMode: stateless
```

## Flink ジョブマネージャー
<a name="jobruns-flink-ha-manager"></a>

Flink デプロイの高可用性 (HA、High Availability) により、一時的なエラーが発生して JobManager がクラッシュした場合でも、ジョブを継続して進行させることができます。ジョブは HA が有効になっている正常な最後のチェックポイントから再開されます。HA が有効になっていないと、Kubernetes は JobManager を再起動しますが、ジョブは新しいジョブとして開始され、進行状況は失われます。HA を設定したら、JobManager で一時的な障害が発生した場合に参照できるように HA メタデータを永続ストレージに保存し、正常な最後のチェックポイントからジョブを再開するように Kubernetes に指示できます。

Flink ジョブでは HA がデフォルトで有効になっています (レプリカ数は 2 に設定されているため、HA メタデータを保持するための S3 ストレージロケーションを指定する必要があります)。

**HA 設定**

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  executionRoleArn: "<JOB EXECUTION ROLE ARN>"
  emrReleaseLabel: "emr-6.13.0-flink-latest"
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    replicas: 2
    highAvailabilityEnabled: true
    storageDir: "s3://<S3 PERSISTENT STORAGE DIR>"
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
```

以下は、Job Manager (.spec.jobManager で定義されている) における上記の HA 設定の説明です。
+ `highAvailabilityEnabled` (オプション、デフォルトは true): HA を有効にせず、提供されている HA 設定を使用しない場合は、これを `false ` に設定してください。引き続き、「replicas」フィールドを操作して HA を手動で設定することはできます。
+ `replicas` (オプション、デフォルトは 2): この数を 1 より大きく設定すると、他のスタンバイ JobManagers が作成され、ジョブのリカバリが速くなります。HA を無効にする場合は、レプリカ数を 1 に設定する必要があります。そうしないと、検証エラーが発生し続けます (HA が有効になっていない場合、サポートされるレプリカは 1 つのみです)。
+ `storageDir` (必須): デフォルトではレプリカ数を 2 に設定しているため、永続的な storageDir を用意する必要があります。現在、このフィールドはストレージロケーションとして S3 パスのみを受け付けます。

**ポッドのローカリティ**

 HA を有効にすると、同じ AZ 内のポッドのコロケーションも試みられるため、パフォーマンスが向上します (同じ AZ にポッドを配置することでネットワークレイテンシーが減少します)。これはベストエフォート型のプロセスです。つまり、ポッドの大部分がスケジュールされている AZ に十分なリソースがない場合でも、残りのポッドは引き続きスケジュールされますが、最終的にはこの AZ 外のノードに配置される可能性があります。

**リーダーレプリカの決定**

HA が有効になっている場合、レプリカはリースを使用してどの JM がリーダーかを判断し、このメタデータを保存するデータストアとして K8s Configmap を使用します。リーダーを特定する場合は、Configmap の内容とデータ内のキー `org.apache.flink.k8s.leader.restserver` を確認し、IP アドレスを持つ K8s ポッドを見つけてください。以下の bash コマンドを使用することもできます。

```
ip=$(kubectl get configmap -n <NAMESPACE> <JOB-NAME>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')
kubectl get pods -n NAMESPACE  -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

## Flink ジョブ – ネイティブ Kubernetes
<a name="jobruns-flink-ha-kubernetes"></a>

Amazon EMR 6.13.0 以降では、Amazon EKS クラスター上で高可用性モードで Flink アプリケーションを実行できるよう、Flink ネイティブの Kubernetes をサポートしています。

**注記**  
Flink ジョブを送信する際に高可用性メタデータを保存するには、Amazon S3 バケットを作成する必要があります。この機能を使用しない場合には無効にできます。これは、デフォルトでは有効になっています。

Flink の高可用性機能を有効にするには、[`run-application` CLI コマンドを実行する](jobruns-flink-native-kubernetes-getting-started.md#jobruns-flink-native-kubernetes-getting-started-run-application)ときに次の Flink パラメータを指定します。パラメータは例の下に定義されています。

```
-Dhigh-availability.type=kubernetes \
-Dhigh-availability.storageDir=S3://DOC-EXAMPLE-STORAGE-BUCKET \
-Dfs.s3a.aws.credentials.provider="com.amazonaws.auth.WebIdentityTokenCredentialsProvider" \
-Dkubernetes.jobmanager.replicas=3 \
-Dkubernetes.cluster-id=example-cluster
```
+ **`Dhigh-availability.storageDir`** – ジョブの高可用性メタデータを保存するための Amazon S3 バケット。

  **`Dkubernetes.jobmanager.replicas`** – 作成するジョブマネージャーポッドの数を、`1` より大きい整数で指定します。

  **`Dkubernetes.cluster-id`** – Flink クラスターを識別する固有の ID です。

# EKS 上の Amazon EMR によるタスクリカバリとスケーリング操作のための Flink ジョブの再起動時間の最適化
<a name="jobruns-flink-restart"></a>

タスクが失敗したり、スケーリング操作が発生したりすると、Flink は最後に完了したチェックポイントからタスクを再実行しようとします。チェックポイントの状態のサイズと並列タスクの数によっては、再起動プロセスの実行に 1 分以上かかる場合があります。再起動中は、ジョブのバックログタスクが蓄積されることがあります。ただし、Flink が実行グラフの回復と再開の速度を最適化してジョブの安定性を高める方法はいくつかあります。

このページでは、Amazon EMR Flink がスポットインスタンスでタスクリカバリまたはスケーリング操作中のジョブの再起動時間を改善できるいくつかの方法について説明します。スポットインスタンスとは、割引価格で利用可能な未使用のコンピューティング能力のことです。これには、不定期に中断するなど独自の動作があるため、EKS 上の Amazon EMR が廃止やジョブの再起動を実行する方法を含めて、Amazon EMR on EKS がこれらをどのように処理するかを理解することが重要です。

**Topics**
+ [タスクローカルリカバリ](#flink-restart-task-local)
+ [Amazon EBS ボリュームマウントによるタスクローカルリカバリ](#flink-restart-task-local-ebs)
+ [汎用ログベースのインクリメンタルチェックポイント](#flink-restart-log-check)
+ [きめ細かなリカバリ](#flink-restart-fine-grained)
+ [アダプティブスケジューラーに組み込まれた再起動メカニズム](#flink-restart-combined)

## タスクローカルリカバリ
<a name="flink-restart-task-local"></a>

**注記**  
EKS 6.14.0 以降では、Amazon EMR 上の Flink によるタスクローカルリカバリをサポートしています。

Flink チェックポイントでは、各タスクが Flink が Amazon S3 などの分散ストレージに書き込む状態のスナップショットを作成します。復旧時には、タスクは分散ストレージから状態を復元します。分散ストレージはすべてのノードからアクセスできるため、耐障害性があり、再スケーリング中に状態を再分散できます。

ただし、リモート分散ストアには欠点もあります。すべてのタスクはネットワーク経由でリモートロケーションから状態を読み取る必要があります。そのため、タスクの回復やスケーリング操作中に大きな状態の回復時間が長くなる可能性があります。

リカバリ時間が長いというこの問題は、*タスクローカルリカバリ*によって解決できます。タスクはチェックポイントの状態を、ローカルディスクなど、タスクのローカルにあるセカンダリストレージに書き込みます。また、プライマリストレージ（この場合は Amazon S3）に状態が保存されます。復元中、スケジューラーは、タスクが以前に実行されていたのと同じタスクマネージャー上でタスクをスケジュールし、リモート状態ストアから読み取るのではなく、ローカル状態ストアから復元できるようにします。詳細については、「*Apache Flink ドキュメント*」の「[Apache Flink Documentation](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/large_state_tuning/#task-local-recovery)」を参照してください。

サンプルジョブを使ったベンチマークテストでは、タスクローカルリカバリを有効にすると、リカバリ時間が数分から数秒に短縮されたことがわかりました。

タスクローカルリカバリを有効にするには、`flink-conf.yaml` ファイルで次の設定を行います。チェックポイント間隔の値をミリ秒単位で指定します。

```
    state.backend.local-recovery: true
    state.backend: hasmap or rocksdb
    state.checkpoints.dir: s3://STORAGE-BUCKET-PATH/checkpoint
    execution.checkpointing.interval: 15000
```

## Amazon EBS ボリュームマウントによるタスクローカルリカバリ
<a name="flink-restart-task-local-ebs"></a>

**注記**  
EKS 6.15.0 以降では、Amazon EMR 上の Flink を用いた Amazon EBS でのタスクローカルリカバリをサポートしています。

EKS 上の Amazon EMR で Flink を使用すると、Amazon EBS ボリュームを TaskManager ポッドに自動的にプロビジョニングして、タスクローカルリカバリを行うことができます。デフォルトのオーバーレイマウントには 10 GB のボリュームが付属しており、状態の低いジョブには十分です。状態が大きいジョブでは、*EBS ボリュームの自動マウント*オプションを有効にできます。TaskManager ポッドはポッド作成時に自動的に作成およびマウントされ、ポッドの削除時に削除されます。

以下のステップを使用して、EKS 上の Amazon EMR 内の Flink 用に自動 EBS ボリュームマウントを有効にします。

1. 次のステップで使用する以下の変数の値をエクスポートします。

   ```
   export AWS_REGION=aa-example-1 
   export FLINK_EKS_CLUSTER_NAME=my-cluster
   export AWS_ACCOUNT_ID=111122223333
   ```

1. クラスター用の `kubeconfig` YAML ファイルを作成または更新します。

   ```
   aws eks update-kubeconfig --name $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
   ```

1. お使いの Amazon EKS クラスター上の Amazon EBS Container Storage Interface (CSI) ドライバー 用 IAM サービスアカウントを作成します。

   ```
   eksctl create iamserviceaccount \
      --name ebs-csi-controller-sa \
      --namespace kube-system \
      --region $AWS_REGION \
      --cluster $FLINK_EKS_CLUSTER_NAME\
      --role-name TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} \
      --role-only \
      --attach-policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy \
      --approve
   ```

1. 以下のコマンドで、Amazon EBS CSI ドライバーを作成します。

   ```
   eksctl create addon \
      --name aws-ebs-csi-driver \
      --region $AWS_REGION \
      --cluster $FLINK_EKS_CLUSTER_NAME \
      --service-account-role-arn arn:aws:iam::${AWS_ACCOUNT_ID}:role/TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
   ```

1. 以下のコマンドで、Amazon EBS ストレージクラスを作成します。

   ```
   cat ≪ EOF ≫ storage-class.yaml
   apiVersion: storage.k8s.io/v1
   kind: StorageClass
   metadata:
     name: ebs-sc
   provisioner: ebs.csi.aws.com
   volumeBindingMode: WaitForFirstConsumer
   EOF
   ```

   次に、クラスを適用します。

   ```
   kubectl apply -f storage-class.yaml
   ```

1. サービスアカウントを作成するオプションを使用して、Amazon EMR Flink Kubernetes オペレーターを Helm インストールします。これにより、Flink デプロイで使用する `emr-containers-sa-flink` を作成します。

   ```
   helm install flink-kubernetes-operator flink-kubernetes-operator/ \
      --set jobServiceAccount.create=true \
      --set rbac.jobRole.create=true \
      --set rbac.jobRoleBinding.create=true
   ```

1. Flink ジョブを送信してタスクローカルリカバリ用 EBS ボリュームの自動プロビジョニングを有効にするには、お使いの `flink-conf.yaml` ファイルに以下の構成を設定します。ジョブの状態サイズに合わせてサイズ制限を調整します。`serviceAccount` を `emr-containers-sa-flink` に設定します。チェックポイント間隔の値をミリ秒単位で指定します。`executionRoleArn` は省略してください。

   ```
   flinkConfiguration:
       task.local-recovery.ebs.enable: true
       kubernetes.taskmanager.local-recovery.persistentVolumeClaim.sizeLimit: 10Gi
       state.checkpoints.dir: s3://BUCKET-PATH/checkpoint
       state.backend.local-recovery: true
       state.backend: hasmap or rocksdb
       state.backend.incremental: "true"
       execution.checkpointing.interval: 15000
     serviceAccount: emr-containers-sa-flink
   ```

Amazon EBS CSI ドライバープラグインを削除する準備ができたら、以下のコマンドを使用します。

```
  # Detach Attached Policy
  aws iam detach-role-policy --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} --policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy
  # Delete the created Role
  aws iam delete-role --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
  # Delete the created service account
  eksctl delete iamserviceaccount --name ebs-csi-controller-sa --namespace kube-system --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  # Delete Addon
  eksctl delete addon --name aws-ebs-csi-driver --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  # Delete the EBS storage class
  kubectl delete -f storage-class.yaml
```

## 汎用ログベースのインクリメンタルチェックポイント
<a name="flink-restart-log-check"></a>

**注記**  
EKS 6.14.0 以降では、Amazon EMR 上の Flink による汎用ログベースのインクリメンタルチェックポイントをサポートしています。

チェックポイントの速度を向上させるため、汎用ログベースのインクリメンタルチェックポイントが Flink 1.16 に追加されました。チェックポイント間隔を短くすると、回復後に再処理する必要のあるイベントが少なくなるため、多くの場合回復作業が削減されます。詳細については、「*Apache Flink ブログ*」の「[Improving speed and stability of checkpointing with generic log-based incremental checkpoints](https://flink.apache.org/2022/05/30/improving-speed-and-stability-of-checkpointing-with-generic-log-based-incremental-checkpoints/)」を参照してください。

サンプルジョブを使用したベンチマークテストでは、汎用ログベースのインクリメンタルチェックポイントを使用すると、チェックポイントにかかる時間が数分から数秒に短縮されたことがわかりました。

汎用ログベースのインクリメンタルチェックポイントを有効にするには、`flink-conf.yaml` ファイルで次の設定を行います。チェックポイント間隔の値をミリ秒単位で指定します。

```
    state.backend.changelog.enabled: true 
    state.backend.changelog.storage: filesystem
    dstl.dfs.base-path: s3://bucket-path/changelog
    state.backend.local-recovery: true
    state.backend: rocksdb
    state.checkpoints.dir: s3://bucket-path/checkpoint
    execution.checkpointing.interval: 15000
```

## きめ細かなリカバリ
<a name="flink-restart-fine-grained"></a>

**注記**  
EKS 6.14.0 以降では、Amazon EMR 上の Flink を用いたデフォルトスケジューラーでのきめ細かなリカバリをサポートしています。EKS 6.15.0 以降では、Amazon EMR 上の Flink を用いたアダプティブスケジューラー内のきめ細かなリカバリをサポートしています。

実行中にタスクが失敗した場合、Flink は実行グラフ全体をリセットし、最後に完了したチェックポイントから完全な再実行をトリガーします。これは、失敗したタスクを単に再実行するよりもコストがかかります。きめ細かい復元では、失敗したタスクのパイプラインに接続されたコンポーネントのみを再起動します。次の例では、ジョブグラフには 5 つの頂点 (`A` から `E`) があります。頂点間のすべての接続はポイントごとの分散でパイプライン化され、ジョブの `parallelism.default` は `2` に設定されます。

```
A → B → C → D → E
```

この例では、合計 10 個のタスクが実行されています。最初のパイプライン (`a1` から `e1`) は TaskManager (`TM1`) で実行され、2 番目のパイプライン (`a2` から `e2`) は別の TaskManager (`TM2`) 上で実行されます。

```
a1 → b1 → c1 → d1 → e1
a2 → b2 → c2 → d2 → e2
```

パイプライン接続されたコンポーネントには、`a1 → e1` と `a2 → e2` の 2 つがあります。`TM1` または `TM2` のどちらか一方で障害が発生しても、影響を受けるのは、その TaskManager が実行されていたパイプライン内の 5 つのタスクのみです。再起動戦略では、影響を受けるパイプラインコンポーネントのみを起動します。

きめ細かいリカバリは、完全に並列した Flink ジョブでのみ機能します。`keyBy()` または `redistribute()` オペレーションではサポートされていません。詳細については、「*Flink 改善提案*」Jira プロジェクトの「[FLIP-1: Fine Grained Recovery from Task Failures](https://cwiki.apache.org/confluence/display/FLINK/FLIP-1%3A+Fine+Grained+Recovery+from+Task+Failures)」を参照してください。

きめ細かい復元を有効にするには、`flink-conf.yaml` ファイルで次の設定を行います。

```
jobmanager.execution.failover-strategy: region 
restart-strategy: exponential-delay or fixed-delay
```

## アダプティブスケジューラーに組み込まれた再起動メカニズム
<a name="flink-restart-combined"></a>

**注記**  
EKS 6.15.0 以降では、Amazon EMR 上の Flink を用いたアダプティブスケジューラー内の複合再起動メカニズムをサポートしています。

アダプティブスケジューラーは、使用可能なスロットに基づいてジョブの並列処理を調整できます。設定したジョブの並列処理を満たすだけの十分なスロットがない場合は、自動的に並列処理を減らします 新しいスロットが使用可能になると、ジョブは設定されたジョブの並列処理に合わせて再びスケールアップされます。適応型スケジューラーは、利用可能なリソースが十分になくなってもジョブのダウンタイムを回避します。これは Flink Autoscaler でサポートされているスケジューラーです。これらの理由から、Amazon EMR Flink を使用するアダプティブスケジューラーをお勧めします。ただし、アダプティブスケジューラーは、新しいリソースが追加されるたびに 1 回再起動するなど、短期間に複数の再起動を行う場合があります。これにより、ジョブのパフォーマンスが低下する可能性があります。

Amazon EMR 6.15.0 以降では、Flink のアダプティブスケジューラーに再起動メカニズムが組み合わされています。このメカニズムは、最初のリソースが追加されると再起動ウィンドウを開き、設定したウィンドウ間隔（デフォルトの 1 分）まで待機します。並列処理を設定してジョブを実行するのに十分なリソースがあるとき、または間隔がタイムアウトになったときに、1 回再起動します。

サンプルジョブを使用したベンチマークテストでは、アダプティブスケジューラーと Flink オートスケーラーを使用すると、この機能はデフォルトの動作よりも 10% 多くのレコードを処理することがわかりました。

複合再起動メカニズムを有効にするには、`flink-conf.yaml` ファイルで以下の設定を行います。

```
jobmanager.adaptive-scheduler.combined-restart.enabled: true 
jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m
```

# EKS 上での Amazon EMR の Flink を用いたスポットインスタンスの適切な廃止
<a name="jobruns-flink-decommission"></a>

EKS 上の Amazon EMR で Flink を使用すると、タスクリカバリまたはスケーリング操作中のジョブの再起動時間を改善できます。

## 概要:
<a name="jobruns-flink-decommission-overview"></a>

EKS リリース 6.15.0 以降上の Amazon EMR では、EKS 上の Amazon EMR スポットインスタンスのタスクマネージャーの、Apache Flink を用いることによる適切な廃止をサポートしています。この機能の一部として、EKS 上の Flink を搭載した Amazon EMR には以下の機能が備わっています。
+ **ジャストインタイムチェックポイント** - Flink のストリーミングジョブは、スポットインスタンスの中断に対応したり、実行中のジョブのジャストインタイム (JIT) チェックポイントを実行したり、これらのスポットインスタンスで追加のタスクをスケジュールしないようにできます。JIT チェックポイントは、デフォルトおよびアダプティブスケジューラーでサポートされています。
+ **複合再起動メカニズム** - 複合再起動メカニズムは、ターゲットリソースの並列処理、または現在設定されているウィンドウの終わりに到達した場合に、ベストエフォートベースでジョブの再起動を試みます。これにより、複数のスポットインスタンス終了によりジョブが連続して再起動されるのを防ぐこともできます。複合再起動メカニズムはアダプティブスケジューラーでのみ使用できます。

これらの機能には次のメリットがあります。
+ スポットインスタンスを利用してタスクマネージャーを実行することで、クラスター費用を削減できます。
+ スポットインスタンスタスクマネージャーの活性が向上することで耐障害性が強化され、ジョブスケジューリング効率が向上します。
+ スポットインスタンス終了後の再起動回数が減るため、Flink ジョブのアップタイムが増えます。

## 適切な廃止の仕組み
<a name="jobruns-flink-decommission-howitworks"></a>

以下の例を考えてみましょう。Apache Flink を実行している EKS クラスターに Amazon EMR をプロビジョニングし、ジョブマネージャーにはオンデマンドノードを、タスクマネージャーにはスポットインスタンスノードを指定するとします。終了の 2 分前に、タスクマネージャーは中断通知を受け取ります。

このシナリオでは、ジョブマネージャーはスポットインスタンスの中断信号を処理し、スポットインスタンス上の追加タスクのスケジューリングをブロックし、ストリーミングジョブの JIT チェックポイントを開始します。

次にジョブマネージャーは、現在の再起動間隔ウィンドウで現在のジョブの並列処理を満たす新しいリソースが十分に利用可能になった場合にのみ、ジョブグラフを再起動します。再起動ウィンドウ間隔は、スポットインスタンスの交換期間、新しいタスクマネージャーポッドの作成、およびジョブマネージャーへの登録に基づいて決定されます。

## 前提条件
<a name="jobruns-flink-decommission-prereqs"></a>

適切な廃止を使用するには、Apache Flink を実行している EKS クラスター上の Amazon EMR にストリーミングジョブを作成して実行します。次の例のように、少なくとも 1 つのスポットインスタンスでスケジュールされたアダプティブスケジューラーとタスクマネージャーを有効にします。ジョブマネージャーにはオンデマンドノードを使用する必要があります。また、スポットインスタンスが少なくとも 1 つあれば、タスクマネージャーにもオンデマンドノードを使用できます。

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: deployment_name
spec:
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    cluster.taskmanager.graceful-decommission.enabled: "true"
    execution.checkpointing.interval: "240s"
    jobmanager.adaptive-scheduler.combined-restart.enabled: "true"
    jobmanager.adaptive-scheduler.combined-restart.window-interval : "1m"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    nodeSelector:
      'eks.amazonaws.com/capacityType': 'ON_DEMAND'
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
    nodeSelector:
      'eks.amazonaws.com/capacityType': 'SPOT'
  job:
    jarURI: flink_job_jar_path
```

## 設定
<a name="jobruns-flink-decommission-config"></a>

このセクションでは、廃止のニーズに合わせて指定できる大部分の構成について説明します。


| キー | 説明 | デフォルトの値 | 許容値 | 
| --- | --- | --- | --- | 
|  cluster.taskmanager.graceful-decommission.enabled  |  タスクマネージャーの適切な廃止を有効にします。  |  true  |  true, false  | 
|  jobmanager.adaptive-scheduler.combined-restart.enabled  |  アダプティブスケジューラー内の複合、再起動メカニズムを有効にします。  |  false  |  true, false  | 
|  jobmanager.adaptive-scheduler.combined-restart.window-interval  |  ジョブのマージされた再起動を実行するための複合再起動ウィンドウ間隔。単位のない整数はミリ秒として解釈されます。  |  1m  |  例: 30、 60s、 3m、 1h | 