

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 使用 Flink Operator 和 Flink 應用程式的高可用性 (HA)
<a name="jobruns-flink-using-ha"></a>

本主題說明如何設定高可用性，並說明它在幾個不同的使用案例中的運作方式。這包括當您使用 任務管理員時，以及當您使用 Flink 原生 kubernetes 時。

## Flink Operator 高可用性
<a name="jobruns-flink-ha-operator"></a>

我們啟用 Flink Operator 的*高可用性*，以便可以容錯移轉至待命 Flink Operator，在發生故障時將 Operator 控制迴圈中的停機時間降至最低。依預設會啟用「高可用性」，且起始 Operator 複本的預設數目為 2。可以在 `values.yaml` 檔案中設定 Helm Chart 的複本欄位。

下列欄位可自訂：
+ `replicas` (選用，預設值為 2)：將此數字設定為大於 1 可建立其他待命 Operator，並允許更快速地復原作業。
+ `highAvailabilityEnabled` (選用，預設值為 true)：控制是否要啟用 HA。將此參數指定為 true 可啟用多可用區部署支援，並設定正確的 `flink-conf.yaml` 參數。

透過在 `values.yaml` 檔案中設定下列組態，停用 operator 的高可用性。

```
...
imagePullSecrets: []

replicas: 1

# set this to false if you don't want HA
highAvailabilityEnabled: false
...
```

**多可用區部署**

我們會在多個可用區域建立 operator Pod。這是一個軟約束，如果您在不同的可用區域中沒有足夠的資源，將在相同的可用區域中排程您的 operator Pod 。

**確定領導者複本**

 如果啟用 HA，則複本使用 Lease 來確定哪些 JM 是領導者，並使用 K8s Lease 進行領導者選舉。您可以描述 Lease 並查看 .Spec.Holder Identity 欄位，以確定目前的領導者

```
kubectl describe lease <Helm Install Release Name>-<NAMESPACE>-lease -n <NAMESPACE> | grep "Holder Identity"
```

**Flink-S3 互動**

**設定存取憑證**

請確定您已設定 IRSA，具有可存取 S3 儲存貯體的適當 IAM 許可。

**從 S3 應用程式模式中擷取作業 jar**

Flink Operator 也支援從 S3 中擷取應用程式 jar。只需在 FlinkDeployment 規格中提供 jarURI 的 S3 位置即可。

也可以使用此功能來下載其他成品，例如 PyFLink 指令碼。生成的 Python 指令碼放在路徑 `/opt/flink/usrlib/` 下。

以下範例示範如何將此功能用於 PyFLink 作業。請注意 jarURI 和引數欄位。

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: python-example
spec:
  image: <YOUR CUSTOM PYFLINK IMAGE>
  emrReleaseLabel: "emr-6.12.0-flink-latest"
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    highAvailabilityEnabled: false
    replicas: 1
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: "s3://<S3-BUCKET>/scripts/pyflink.py" # Note, this will trigger the artifact download process
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pyflink.py"]
    parallelism: 1
    upgradeMode: stateless
```

**Flink S3 連接器**

Flink 隨附有兩個 S3 連接器 (如下所列)。以下各章節討論何時使用哪個連接器。

**檢查點：Presto S3 連接器**
+ 將 S3 結構描述設為 s3p://
+ 用於檢查點到 s3 的建議連接器。如需詳細資訊，請參閱 Apache Flink 文件中的 [S3-specific](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#s3-specific)。

FlinkDeployment 規格範例：

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.checkpoints.dir: s3p://<BUCKET-NAME>/flink-checkpoint/
```

**讀取和寫入 S3：Hadoop S3 連接器**
+ 將 S3 結構描述設定為 `s3://` 或 (`s3a://`)
+ 用於從 S3 讀取和寫入檔案的建議連接器 (只有 S3 連接器實作 [Flinks Filesystem 介面](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/))。
+ 根據預設，我們在 `flink-conf.yaml` 檔案`fs.s3a.aws.credentials.provider`中設定 ，也就是 `com.amazonaws.auth.WebIdentityTokenCredentialsProvider`。如果完全覆寫預設 `flink-conf` 並且正在與 S3 進行互動，請確保使用此提供程式。

FlinkDeployment 規格範例

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  job:
    jarURI: local:///opt/flink/examples/streaming/WordCount.jar
    args: [ "--input", "s3a://<INPUT BUCKET>/PATH", "--output", "s3a://<OUTPUT BUCKET>/PATH" ]
    parallelism: 2
    upgradeMode: stateless
```

## Flink Job Manager
<a name="jobruns-flink-ha-manager"></a>

Flink 部署的高可用性 (HA) 可讓作業繼續進行，即使遇到暫時性錯誤以及 JobManager 當機。作業將會重新啟動，但會從上次啟用 HA 的成功檢查點開始。如果沒有啟用 HA，Kubernetes 將重新啟動 JobManager，但您的作業將作為新作業開始，並將失去其進度。設定 HA 之後，我們可以告訴 Kubernetes 將 HA 中繼資料儲存在永久性儲存體中，以便在 JobManager 中發生暫時性失敗時進行參考，然後從上次成功的檢查點恢復我們的作業。

Flink 作業預設為啟用 HA (複本計數設為 2，這將要求您提供 S3 儲存位置，以便 HA 中繼資料持續存在)。

**HA 組態**

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  executionRoleArn: "<JOB EXECUTION ROLE ARN>"
  emrReleaseLabel: "emr-6.13.0-flink-latest"
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    replicas: 2
    highAvailabilityEnabled: true
    storageDir: "s3://<S3 PERSISTENT STORAGE DIR>"
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
```

以下是 Job Manager 中上述 HA 組態的描述 (在 .spec.jobManager 下定義)：
+ `highAvailabilityEnabled` (選用，預設值為 true)：如果您不想啟用 HA 且不想使用提供的 HA 組態，請將此設定為 `false `。您仍然可以操作「複本」欄位以手動設定 HA。
+ `replicas` (選用，預設值為 2)：將此數字設定為大於 1 可建立其他待命 JobManager，並允許更快速地復原作業。如果停用 HA，則必須將複本計數設定為 1，否則您將繼續收到驗證錯誤 (如果未啟用 HA，則僅支援 1 個複本)。
+ `storageDir` (必填)：因為我們預設使用的複本計數為 2，所以我們必須提供一個持久的 StorageDir。目前，此欄位僅接受 S3 路徑作為儲存位置。

**Pod 位置**

 如果您啟用 HA，我們也會嘗試在同一個可用區域中共置 Pod，進而提升效能 (藉由將 Pod 放在相同可用區域來減少網路延遲)。這是一個竭盡全力的過程，意味著如果您在對大部分 Pod 進行排程的可用區域中沒有足夠的資源，剩餘的 Pod 仍會排程，但最終可能會在此可用區域以外的節點上結束。

**確定領導者複本**

如果啟用 HA，複本會使用租用來判斷哪個 JM 是領導者，並使用 K8s Configmap 作為儲存此中繼資料的資料儲存。如果您想確定領導者，可以查看 Configmap 的內容，然後查看資料下的關鍵字 `org.apache.flink.k8s.leader.restserver`，以使用該 IP 地址尋找 K8s Pod。也可使用下列 bash 命令。

```
ip=$(kubectl get configmap -n <NAMESPACE> <JOB-NAME>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')
kubectl get pods -n NAMESPACE  -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

## Flink 作業：原生 Kubernetes
<a name="jobruns-flink-ha-kubernetes"></a>

Amazon EMR 6.13.0 及更高版本支援 Flink Native Kubernetes，以便在 Amazon EKS 叢集上以高可用性模式執行 Flink 應用程式。

**注意**  
提交 Flink 作業時，必須建立 Amazon S3 儲存貯體來儲存高可用性中繼資料。如果不想使用此功能，可以停用它。依預設會啟用此功能。

若要開啟 Flink 高可用性功能，請在[執行 `run-application` CLI 命令](jobruns-flink-native-kubernetes-getting-started.md#jobruns-flink-native-kubernetes-getting-started-run-application)時提供下列 Flink 參數。參數定義於範例下方。

```
-Dhigh-availability.type=kubernetes \
-Dhigh-availability.storageDir=S3://DOC-EXAMPLE-STORAGE-BUCKET \
-Dfs.s3a.aws.credentials.provider="com.amazonaws.auth.WebIdentityTokenCredentialsProvider" \
-Dkubernetes.jobmanager.replicas=3 \
-Dkubernetes.cluster-id=example-cluster
```
+ **`Dhigh-availability.storageDir`**：您要在其中存放作業的高可用性中繼資料的 Amazon S3 儲存貯體。

  **`Dkubernetes.jobmanager.replicas`**：要建立的 Job Manager Pod 數量，為大於 `1` 的整數。

  **`Dkubernetes.cluster-id`**：識別 Flink 叢集的唯一識別碼。