

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Flink 演算子と Flink アプリケーションでの高可用性 (HA、High Availability) の使用
<a name="jobruns-flink-using-ha"></a>

このトピックでは、高可用性を設定する方法を示し、いくつかの異なるユースケースでどのように機能するかについて説明します。これには、ジョブマネージャーを使用している場合や Flink ネイティブ kubernetes を使用している場合が含まれます。

## Flink 演算子の高可用性
<a name="jobruns-flink-ha-operator"></a>

Flink 演算子の高可用性を有効にすることで、障害発生時にスタンバイの Flink 演算子にフェイルオーバーして、演算子制御ループのダウンタイムを最小限に抑えることができます。高可用性はデフォルトで有効になっており、デフォルトの開始演算子レプリカ数は 2 です。`values.yaml` ファイルのレプリカフィールドは Helm チャート用に設定できます。

以下のフィールドがカスタマイズ可能です。
+ `replicas` (オプション、デフォルトは 2): この数を 1 より大きく設定すると、他のスタンバイ演算子が作成され、ジョブの復旧が早くなります。
+ `highAvailabilityEnabled` (オプション、デフォルトは true): HA を有効にするかどうかを制御します。このパラメータを true に指定すると、マルチ AZ 配置サポートが有効になり、正しい `flink-conf.yaml` パラメータが設定されます。

`values.yaml` ファイルに以下の設定を行うことで、演算子の HA を無効にできます。

```
...
imagePullSecrets: []

replicas: 1

# set this to false if you don't want HA
highAvailabilityEnabled: false
...
```

**マルチ AZ 配置**

複数のアベイラビリティーゾーンに演算子ポッドを作成します。これはソフト制約であり、別の AZ に十分なリソースがない場合、演算子ポッドは同じ AZ にスケジュールされます。

**リーダーレプリカの決定**

 HA が有効になっている場合、レプリカはリースを使用してどの JM がリーダーかを判断し、リーダーの選定には K8s リースを使用します。リースを記述し、.Spec.Holder Identity フィールドを確認することで、現在のリーダーを判断できます

```
kubectl describe lease <Helm Install Release Name>-<NAMESPACE>-lease -n <NAMESPACE> | grep "Holder Identity"
```

**Flink-S3 インタラクション**

**アクセス認証情報の設定**

S3 バケットにアクセスするための適切な IAM 権限を IRSA に設定していることを確認してください。

**S3 アプリケーションモードからジョブジャーを取得**

Flink 演算子は S3 からのアプリケーションジャーの取得もサポートしています。FlinkDeployment 仕様で jarURI の S3 ロケーションを指定するだけです。

この機能は、PyFlink スクリプトのような他のアーティファクトのダウンロードにも使用できます。結果として生じる Python スクリプトはパス `/opt/flink/usrlib/` の下にドロップされます。

以下の例は、PyFlink ジョブにこの機能を使用する方法を示しています。jarURI フィールドと args フィールドに注意してください。

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: python-example
spec:
  image: <YOUR CUSTOM PYFLINK IMAGE>
  emrReleaseLabel: "emr-6.12.0-flink-latest"
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    highAvailabilityEnabled: false
    replicas: 1
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: "s3://<S3-BUCKET>/scripts/pyflink.py" # Note, this will trigger the artifact download process
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pyflink.py"]
    parallelism: 1
    upgradeMode: stateless
```

**Flink S3 コネクタ**

Flink には 2 つの S3 コネクタ (下記参照) が同梱されています。以下のセクションでは、どのコネクタをどのような場合に使用するかについて説明します。

**チェックポイント: Presto S3 コネクタ**
+ S3 スキームを s3p:// に設定します。
+ S3 へのチェックポイントに使用することをお勧めするコネクタ。詳細については、「Apache Flink ドキュメント」の「[S3 固有](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#s3-specific)」を参照してください。

FlinkDeployment 仕様の例:

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.checkpoints.dir: s3p://<BUCKET-NAME>/flink-checkpoint/
```

**S3 への読み取りと書き込み: Hadoop S3 コネクタ**
+ S3 スキームを `s3://` または (`s3a://`) に設定する
+ S3 からのファイルの読み書きに推奨されるコネクタ ([Flinks Filesystem インターフェイス](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/)を実装する S3 コネクタのみ)。
+ デフォルトでは、`fs.s3a.aws.credentials.provider` が `flink-conf.yaml` ファイルに設定されます。このファイルは `com.amazonaws.auth.WebIdentityTokenCredentialsProvider` です。デフォルト `flink-conf` を完全にオーバーライドして S3 とやり取りする場合は、必ずこのプロバイダーを使用してください。

FlinkDeployment 仕様の例

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  job:
    jarURI: local:///opt/flink/examples/streaming/WordCount.jar
    args: [ "--input", "s3a://<INPUT BUCKET>/PATH", "--output", "s3a://<OUTPUT BUCKET>/PATH" ]
    parallelism: 2
    upgradeMode: stateless
```

## Flink ジョブマネージャー
<a name="jobruns-flink-ha-manager"></a>

Flink デプロイの高可用性 (HA、High Availability) により、一時的なエラーが発生して JobManager がクラッシュした場合でも、ジョブを継続して進行させることができます。ジョブは HA が有効になっている正常な最後のチェックポイントから再開されます。HA が有効になっていないと、Kubernetes は JobManager を再起動しますが、ジョブは新しいジョブとして開始され、進行状況は失われます。HA を設定したら、JobManager で一時的な障害が発生した場合に参照できるように HA メタデータを永続ストレージに保存し、正常な最後のチェックポイントからジョブを再開するように Kubernetes に指示できます。

Flink ジョブでは HA がデフォルトで有効になっています (レプリカ数は 2 に設定されているため、HA メタデータを保持するための S3 ストレージロケーションを指定する必要があります)。

**HA 設定**

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  executionRoleArn: "<JOB EXECUTION ROLE ARN>"
  emrReleaseLabel: "emr-6.13.0-flink-latest"
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    replicas: 2
    highAvailabilityEnabled: true
    storageDir: "s3://<S3 PERSISTENT STORAGE DIR>"
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
```

以下は、Job Manager (.spec.jobManager で定義されている) における上記の HA 設定の説明です。
+ `highAvailabilityEnabled` (オプション、デフォルトは true): HA を有効にせず、提供されている HA 設定を使用しない場合は、これを `false ` に設定してください。引き続き、「replicas」フィールドを操作して HA を手動で設定することはできます。
+ `replicas` (オプション、デフォルトは 2): この数を 1 より大きく設定すると、他のスタンバイ JobManagers が作成され、ジョブのリカバリが速くなります。HA を無効にする場合は、レプリカ数を 1 に設定する必要があります。そうしないと、検証エラーが発生し続けます (HA が有効になっていない場合、サポートされるレプリカは 1 つのみです)。
+ `storageDir` (必須): デフォルトではレプリカ数を 2 に設定しているため、永続的な storageDir を用意する必要があります。現在、このフィールドはストレージロケーションとして S3 パスのみを受け付けます。

**ポッドのローカリティ**

 HA を有効にすると、同じ AZ 内のポッドのコロケーションも試みられるため、パフォーマンスが向上します (同じ AZ にポッドを配置することでネットワークレイテンシーが減少します)。これはベストエフォート型のプロセスです。つまり、ポッドの大部分がスケジュールされている AZ に十分なリソースがない場合でも、残りのポッドは引き続きスケジュールされますが、最終的にはこの AZ 外のノードに配置される可能性があります。

**リーダーレプリカの決定**

HA が有効になっている場合、レプリカはリースを使用してどの JM がリーダーかを判断し、このメタデータを保存するデータストアとして K8s Configmap を使用します。リーダーを特定する場合は、Configmap の内容とデータ内のキー `org.apache.flink.k8s.leader.restserver` を確認し、IP アドレスを持つ K8s ポッドを見つけてください。以下の bash コマンドを使用することもできます。

```
ip=$(kubectl get configmap -n <NAMESPACE> <JOB-NAME>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')
kubectl get pods -n NAMESPACE  -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

## Flink ジョブ – ネイティブ Kubernetes
<a name="jobruns-flink-ha-kubernetes"></a>

Amazon EMR 6.13.0 以降では、Amazon EKS クラスター上で高可用性モードで Flink アプリケーションを実行できるよう、Flink ネイティブの Kubernetes をサポートしています。

**注記**  
Flink ジョブを送信する際に高可用性メタデータを保存するには、Amazon S3 バケットを作成する必要があります。この機能を使用しない場合には無効にできます。これは、デフォルトでは有効になっています。

Flink の高可用性機能を有効にするには、[`run-application` CLI コマンドを実行する](jobruns-flink-native-kubernetes-getting-started.md#jobruns-flink-native-kubernetes-getting-started-run-application)ときに次の Flink パラメータを指定します。パラメータは例の下に定義されています。

```
-Dhigh-availability.type=kubernetes \
-Dhigh-availability.storageDir=S3://DOC-EXAMPLE-STORAGE-BUCKET \
-Dfs.s3a.aws.credentials.provider="com.amazonaws.auth.WebIdentityTokenCredentialsProvider" \
-Dkubernetes.jobmanager.replicas=3 \
-Dkubernetes.cluster-id=example-cluster
```
+ **`Dhigh-availability.storageDir`** – ジョブの高可用性メタデータを保存するための Amazon S3 バケット。

  **`Dkubernetes.jobmanager.replicas`** – 作成するジョブマネージャーポッドの数を、`1` より大きい整数で指定します。

  **`Dkubernetes.cluster-id`** – Flink クラスターを識別する固有の ID です。