

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 为 Flink Operator 和 Flink 应用程序使用高可用性（HA）
<a name="jobruns-flink-using-ha"></a>

本主题介绍了如何配置高可用性以及如何在几种不同的用例中使用。包括在使用 Job Manager 和 Flink Native Kubernetes 时。

## Flink Operator 高可用性
<a name="jobruns-flink-ha-operator"></a>

我们为 Flink Operator 启用了*高可用性*，这样就可以使用备用 Flink Operator 进行故障转移，从而在发生故障时最大限度地减少 Operator 控制回路中的停机时间。默认会启用“高可用性”，启动 Operator 副本的默认数量为 2。您可以在 Helm 图表的 `values.yaml` 文件中配置副本字段。

以下字段支持自定义：
+ `replicas`（可选，默认值为 2）：将此数字设置为大于 1 会创建其他备用 Operator，从而更快地恢复任务。
+ `highAvailabilityEnabled`（可选，默认值为 true）：控制是否要启用 HA。将此参数指定为 true 可启用多可用区部署支持，并设置正确的 `flink-conf.yaml` 参数。

在 `values.yaml` 文件中设置以下配置可以为 Operator 禁用 HA。

```
...
imagePullSecrets: []

replicas: 1

# set this to false if you don't want HA
highAvailabilityEnabled: false
...
```

**多可用区部署**

我们在多个可用区中创 Operator Pod。这是一个软约束，如果不同可用区中没有足够的资源，您的 Operator Pod 将被调度到同一可用区中。

**确定主副本**

 如果启用了 HA，则副本使用租约来确定哪个 JMs 是领导者，并使用 K8s Lease 进行领导者选举。您可以描述租约并查看 .Spec.Holder Identity 字段来确定当前主副本

```
kubectl describe lease <Helm Install Release Name>-<NAMESPACE>-lease -n <NAMESPACE> | grep "Holder Identity"
```

**Flink-S3 交互**

**配置访问凭证**

请确保已为 IRSA 配置了相应的 IAM 权限来访问 S3 存储桶。

**从 S3 应用程序模式获取任务 jar**

Flink Operator 也支持从 S3 获取应用程序 jar。您只需在 FlinkDeployment 规范中提供 JarurI 的 S3 位置即可。

您也可以使用此功能下载其他工件，例如 PyFlink 脚本。生成的 Python 脚本放在路径 `/opt/flink/usrlib/` 下。

以下示例演示了如何将此功能用于作 PyFlink 业。注意 jarURI 和 args 字段。

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: python-example
spec:
  image: <YOUR CUSTOM PYFLINK IMAGE>
  emrReleaseLabel: "emr-6.12.0-flink-latest"
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    highAvailabilityEnabled: false
    replicas: 1
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: "s3://<S3-BUCKET>/scripts/pyflink.py" # Note, this will trigger the artifact download process
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pyflink.py"]
    parallelism: 1
    upgradeMode: stateless
```

**Flink S3 连接器**

Flink 随附两个 S3 连接器（如下所示）。以下各节旨在介绍何时使用哪个连接器。

**检查点：Presto S3 连接器**
+ 将 S3 方案设置为 s3p://
+ 用于检查点到 s3 的推荐连接器。有关更多信息，请参阅 Apache 文档中[特定于 S3 的内容](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#s3-specific)。

示例 FlinkDeployment 规范：

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.checkpoints.dir: s3p://<BUCKET-NAME>/flink-checkpoint/
```

**读取/写入 S3：Hadoop S3 连接器**
+ 将 S3 方案设置为 `s3://` 或 `s3a://`
+ 用于从 S3 读取和写入文件的推荐连接器（仅限实现 [Flinks Filesystem 接口](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/)的 S3 连接器）。
+ 默认在 `flink-conf.yaml` 文件中设置 `fs.s3a.aws.credentials.provider`，即 `com.amazonaws.auth.WebIdentityTokenCredentialsProvider`。如果完全覆盖默认值 `flink-conf`，并且正在与 S3 进行交互，请务必使用此提供程序。

示例 FlinkDeployment 规范

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  job:
    jarURI: local:///opt/flink/examples/streaming/WordCount.jar
    args: [ "--input", "s3a://<INPUT BUCKET>/PATH", "--output", "s3a://<OUTPUT BUCKET>/PATH" ]
    parallelism: 2
    upgradeMode: stateless
```

## Flink Job Manager
<a name="jobruns-flink-ha-manager"></a>

Flink Deployments 的高可用性 (HA) 允许任务继续取得进展，即使遇到暂时性错误并 JobManager 导致崩溃。任务将重新启动，但会从上次成功启用 HA 的检查点开始。如果未启用 HA，Kubernetes 将重启你的 JobManager，但你的作业将以全新的作业开始并失去其进度。配置 HA 后，我们可以让 Kubernetes 将 HA 元数据存储在永久存储中，以便在中出现暂时故障时参考， JobManager 然后从上次成功的检查点恢复我们的作业。

Flink 任务会默认启用 HA（副本计数设置为 2，这需要您提供 S3 存储位置来永久存储 HA 元数据）。

**HA 配置**

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  executionRoleArn: "<JOB EXECUTION ROLE ARN>"
  emrReleaseLabel: "emr-6.13.0-flink-latest"
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    replicas: 2
    highAvailabilityEnabled: true
    storageDir: "s3://<S3 PERSISTENT STORAGE DIR>"
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
```

以下是 Job Manager 中上述 HA 配置的描述（在 .spec.jobManager 下定义）：
+ `highAvailabilityEnabled`（可选，默认值为 true）：如果不想启用 HA，也不想使用提供的 HA 配置，请将其设置为 `false `。您仍然可以操作“replicas”字段来手动配置 HA。
+ `replicas`（可选，默认为 2）：将此数字设置为大于 1 会创建其他待机状态 JobManagers ，从而可以更快地恢复作业。如果禁用 HA，则必须将副本计数设置为 1，否则会不断收到验证错误（如果未启用 HA，则仅支持 1 个副本）。
+ `storageDir`（必需）：由于默认使用副本计数 2，我们必须提供永久 storageDir。目前，此字段仅接受 S3 路径作为存储位置。

**Pod 区域**

 如果您启用 HA，我们还会尝试将 Pod 放在同一个可用区中，这样可以提高性能（通过将 pod 放在同一个可用区中来减少网络延迟 AZs）。这是一个尽力而为的过程，即如果在调度了大多数 Pod 的可用区中没有足够的资源，那么剩余 Pod 仍会被调度，但最终可能会出现在该可用区之外的节点上。

**确定主副本**

如果启用了 HA，则副本使用租约来确定哪个 JMs 是领导者，并使用 K8s Configmap 作为数据存储来存储此元数据。如果要确定主副本，可以查看 Configmap 的内容，在数据下查看密钥 `org.apache.flink.k8s.leader.restserver`，找到带 IP 地址的 K8s Pod。您也可以使用以下 bash 命令。

```
ip=$(kubectl get configmap -n <NAMESPACE> <JOB-NAME>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')
kubectl get pods -n NAMESPACE  -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

## Flink 作业 - 本机 Kubernetes
<a name="jobruns-flink-ha-kubernetes"></a>

Amazon EMR 6.13.0 及更高版本都支持 Flink 本机 Kubernetes 在 Amazon EKS 集群上以高可用性模式运行 Flink 应用程序。

**注意**  
提交 Flink 作业时，必须使用一个 Amazon S3 存储桶来存储高可用性元数据。如果不想使用此功能，可以将其禁用。系统会默认启用该功能。

要开启 Flink 高可用性功能，请在[运行 `run-application` CLI 命令](jobruns-flink-native-kubernetes-getting-started.md#jobruns-flink-native-kubernetes-getting-started-run-application)时提供以下 Flink 参数。参数在示例的下方定义。

```
-Dhigh-availability.type=kubernetes \
-Dhigh-availability.storageDir=S3://DOC-EXAMPLE-STORAGE-BUCKET \
-Dfs.s3a.aws.credentials.provider="com.amazonaws.auth.WebIdentityTokenCredentialsProvider" \
-Dkubernetes.jobmanager.replicas=3 \
-Dkubernetes.cluster-id=example-cluster
```
+ **`Dhigh-availability.storageDir`**：您要存储作业的高可用性元数据的 Amazon S3 存储桶。

  **`Dkubernetes.jobmanager.replicas`**：要创建的 Job Manager 容器组（pod）的数量，以大于 `1` 的整数表示。

  **`Dkubernetes.cluster-id`**：标识 Flink 集群的唯一 ID。