

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# Flink 如何支持高可用性和作业弹性
<a name="jobruns-flink-resiliency"></a>

以下部分概述了 Flink 如何使作业更加可靠和高度可用。Flink 通过内置的高可用性功能和各种故障恢复功能来实现这一点。

**Topics**
+ [为 Flink Operator 和 Flink 应用程序使用高可用性（HA）](jobruns-flink-using-ha.md)
+ [使用 EKS 上的 Amazon EMR 优化 Flink 任务重启时间以进行任务恢复和扩展操作](jobruns-flink-restart.md)
+ [使用 EKS 上的 Amazon EMR 上的 Flink 正常停用竞价型实例](jobruns-flink-decommission.md)

# 为 Flink Operator 和 Flink 应用程序使用高可用性（HA）
<a name="jobruns-flink-using-ha"></a>

本主题介绍了如何配置高可用性以及如何在几种不同的用例中使用。包括在使用 Job Manager 和 Flink Native Kubernetes 时。

## Flink Operator 高可用性
<a name="jobruns-flink-ha-operator"></a>

我们为 Flink Operator 启用了*高可用性*，这样就可以使用备用 Flink Operator 进行故障转移，从而在发生故障时最大限度地减少 Operator 控制回路中的停机时间。默认会启用“高可用性”，启动 Operator 副本的默认数量为 2。您可以在 Helm 图表的 `values.yaml` 文件中配置副本字段。

以下字段支持自定义：
+ `replicas`（可选，默认值为 2）：将此数字设置为大于 1 会创建其他备用 Operator，从而更快地恢复任务。
+ `highAvailabilityEnabled`（可选，默认值为 true）：控制是否要启用 HA。将此参数指定为 true 可启用多可用区部署支持，并设置正确的 `flink-conf.yaml` 参数。

在 `values.yaml` 文件中设置以下配置可以为 Operator 禁用 HA。

```
...
imagePullSecrets: []

replicas: 1

# set this to false if you don't want HA
highAvailabilityEnabled: false
...
```

**多可用区部署**

我们在多个可用区中创 Operator Pod。这是一个软约束，如果不同可用区中没有足够的资源，您的 Operator Pod 将被调度到同一可用区中。

**确定主副本**

 如果启用了 HA，则副本使用租约来确定哪个 JMs 是领导者，并使用 K8s Lease 进行领导者选举。您可以描述租约并查看 .Spec.Holder Identity 字段来确定当前主副本

```
kubectl describe lease <Helm Install Release Name>-<NAMESPACE>-lease -n <NAMESPACE> | grep "Holder Identity"
```

**Flink-S3 交互**

**配置访问凭证**

请确保已为 IRSA 配置了相应的 IAM 权限来访问 S3 存储桶。

**从 S3 应用程序模式获取任务 jar**

Flink Operator 也支持从 S3 获取应用程序 jar。您只需在 FlinkDeployment 规范中提供 JarurI 的 S3 位置即可。

您也可以使用此功能下载其他工件，例如 PyFlink 脚本。生成的 Python 脚本放在路径 `/opt/flink/usrlib/` 下。

以下示例演示了如何将此功能用于作 PyFlink 业。注意 jarURI 和 args 字段。

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: python-example
spec:
  image: <YOUR CUSTOM PYFLINK IMAGE>
  emrReleaseLabel: "emr-6.12.0-flink-latest"
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    highAvailabilityEnabled: false
    replicas: 1
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: "s3://<S3-BUCKET>/scripts/pyflink.py" # Note, this will trigger the artifact download process
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pyflink.py"]
    parallelism: 1
    upgradeMode: stateless
```

**Flink S3 连接器**

Flink 随附两个 S3 连接器（如下所示）。以下各节旨在介绍何时使用哪个连接器。

**检查点：Presto S3 连接器**
+ 将 S3 方案设置为 s3p://
+ 用于检查点到 s3 的推荐连接器。有关更多信息，请参阅 Apache 文档中[特定于 S3 的内容](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#s3-specific)。

示例 FlinkDeployment 规范：

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.checkpoints.dir: s3p://<BUCKET-NAME>/flink-checkpoint/
```

**读取/写入 S3：Hadoop S3 连接器**
+ 将 S3 方案设置为 `s3://` 或 `s3a://`
+ 用于从 S3 读取和写入文件的推荐连接器（仅限实现 [Flinks Filesystem 接口](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/)的 S3 连接器）。
+ 默认在 `flink-conf.yaml` 文件中设置 `fs.s3a.aws.credentials.provider`，即 `com.amazonaws.auth.WebIdentityTokenCredentialsProvider`。如果完全覆盖默认值 `flink-conf`，并且正在与 S3 进行交互，请务必使用此提供程序。

示例 FlinkDeployment 规范

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  job:
    jarURI: local:///opt/flink/examples/streaming/WordCount.jar
    args: [ "--input", "s3a://<INPUT BUCKET>/PATH", "--output", "s3a://<OUTPUT BUCKET>/PATH" ]
    parallelism: 2
    upgradeMode: stateless
```

## Flink Job Manager
<a name="jobruns-flink-ha-manager"></a>

Flink Deployments 的高可用性 (HA) 允许任务继续取得进展，即使遇到暂时性错误并 JobManager 导致崩溃。任务将重新启动，但会从上次成功启用 HA 的检查点开始。如果未启用 HA，Kubernetes 将重启你的 JobManager，但你的作业将以全新的作业开始并失去其进度。配置 HA 后，我们可以让 Kubernetes 将 HA 元数据存储在永久存储中，以便在中出现暂时故障时参考， JobManager 然后从上次成功的检查点恢复我们的作业。

Flink 任务会默认启用 HA（副本计数设置为 2，这需要您提供 S3 存储位置来永久存储 HA 元数据）。

**HA 配置**

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  executionRoleArn: "<JOB EXECUTION ROLE ARN>"
  emrReleaseLabel: "emr-6.13.0-flink-latest"
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    replicas: 2
    highAvailabilityEnabled: true
    storageDir: "s3://<S3 PERSISTENT STORAGE DIR>"
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
```

以下是 Job Manager 中上述 HA 配置的描述（在 .spec.jobManager 下定义）：
+ `highAvailabilityEnabled`（可选，默认值为 true）：如果不想启用 HA，也不想使用提供的 HA 配置，请将其设置为 `false `。您仍然可以操作“replicas”字段来手动配置 HA。
+ `replicas`（可选，默认为 2）：将此数字设置为大于 1 会创建其他待机状态 JobManagers ，从而可以更快地恢复作业。如果禁用 HA，则必须将副本计数设置为 1，否则会不断收到验证错误（如果未启用 HA，则仅支持 1 个副本）。
+ `storageDir`（必需）：由于默认使用副本计数 2，我们必须提供永久 storageDir。目前，此字段仅接受 S3 路径作为存储位置。

**Pod 区域**

 如果您启用 HA，我们还会尝试将 Pod 放在同一个可用区中，这样可以提高性能（通过将 pod 放在同一个可用区中来减少网络延迟 AZs）。这是一个尽力而为的过程，即如果在调度了大多数 Pod 的可用区中没有足够的资源，那么剩余 Pod 仍会被调度，但最终可能会出现在该可用区之外的节点上。

**确定主副本**

如果启用了 HA，则副本使用租约来确定哪个 JMs 是领导者，并使用 K8s Configmap 作为数据存储来存储此元数据。如果要确定主副本，可以查看 Configmap 的内容，在数据下查看密钥 `org.apache.flink.k8s.leader.restserver`，找到带 IP 地址的 K8s Pod。您也可以使用以下 bash 命令。

```
ip=$(kubectl get configmap -n <NAMESPACE> <JOB-NAME>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')
kubectl get pods -n NAMESPACE  -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

## Flink 作业 - 本机 Kubernetes
<a name="jobruns-flink-ha-kubernetes"></a>

Amazon EMR 6.13.0 及更高版本都支持 Flink 本机 Kubernetes 在 Amazon EKS 集群上以高可用性模式运行 Flink 应用程序。

**注意**  
提交 Flink 作业时，必须使用一个 Amazon S3 存储桶来存储高可用性元数据。如果不想使用此功能，可以将其禁用。系统会默认启用该功能。

要开启 Flink 高可用性功能，请在[运行 `run-application` CLI 命令](jobruns-flink-native-kubernetes-getting-started.md#jobruns-flink-native-kubernetes-getting-started-run-application)时提供以下 Flink 参数。参数在示例的下方定义。

```
-Dhigh-availability.type=kubernetes \
-Dhigh-availability.storageDir=S3://DOC-EXAMPLE-STORAGE-BUCKET \
-Dfs.s3a.aws.credentials.provider="com.amazonaws.auth.WebIdentityTokenCredentialsProvider" \
-Dkubernetes.jobmanager.replicas=3 \
-Dkubernetes.cluster-id=example-cluster
```
+ **`Dhigh-availability.storageDir`**：您要存储作业的高可用性元数据的 Amazon S3 存储桶。

  **`Dkubernetes.jobmanager.replicas`**：要创建的 Job Manager 容器组（pod）的数量，以大于 `1` 的整数表示。

  **`Dkubernetes.cluster-id`**：标识 Flink 集群的唯一 ID。

# 使用 EKS 上的 Amazon EMR 优化 Flink 任务重启时间以进行任务恢复和扩展操作
<a name="jobruns-flink-restart"></a>

当任务失败或发生扩展操作时，Flink 会尝试从上一次完成的检查点重新执行任务。重启过程可能需要一分钟或更长时间才能执行，具体取决于检查点状态的大小以及并行任务的数量。重启期间，可以累积作业的积压任务。但是，Flink 可以通过一些方法来优化执行图的恢复和重启速度，从而提高作业稳定性。

本页介绍了 Amazon EMR Flink 对竞价型实例执行任务恢复或扩展操作过程中缩短作业重启时间的一些方法。竞价型实例是未使用的计算容量，以折扣价提供。该实例具有独特的行为，偶尔发生中断，因此了解 Amazon EMR on EKS 如何处理这些行为非常重要，包括 Amazon EMR on EKS 如何执行停用和作业重启。

**Topics**
+ [任务本地恢复](#flink-restart-task-local)
+ [通过 Amazon EBS 卷挂载实现的任务本地恢复](#flink-restart-task-local-ebs)
+ [基于日志的通用增量检查点](#flink-restart-log-check)
+ [精细恢复](#flink-restart-fine-grained)
+ [自适应计划程序中的组合重启机制](#flink-restart-combined)

## 任务本地恢复
<a name="flink-restart-task-local"></a>

**注意**  
EKS 上的 Amazon EMR 6.14.0 及更高版本上的 Flink 支持任务本地恢复。

使用 Flink 检查点，每个任务都会生成其状态的快照，Flink 会将该快照写入分布式存储（如 Amazon S3）。在恢复的情况下，任务会从分布式存储中恢复其状态。分布式存储提供容错能力，并且可以在重新扩展期间重新分配状态，因为它可供所有节点访问。

但远程分布式存储也有一个缺点：所有任务都必须通过网络从远程位置读取其状态。在任务恢复或扩展操作期间，这可能会导致大规模状态的恢复时间很长。

通过*任务本地恢复*可以解决恢复时间长这一问题。任务将其在检查点上的状态写入任务本地的辅助存储（例如本地磁盘）。它们还将状态存储在主存储中，或者存储在 Amazon S3 中（在本例中）。恢复期间，计划程序将任务计划在任务之前运行所在的同一个任务管理器上，这样它们就可以从本地状态存储中恢复，而不是从远程状态存储中读取。有关更多信息，请参阅 *Apache Flink 文档*中的[任务本地恢复](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/large_state_tuning/#task-local-recovery)。

我们对示例作业进行的基准测试表明，启用任务本地恢复后，恢复时间已从几分钟缩短到几秒。

要启用任务本地恢复，请在 `flink-conf.yaml` 文件中设置以下配置。指定检查点间隔值，以毫秒为单位。

```
    state.backend.local-recovery: true
    state.backend: hasmap or rocksdb
    state.checkpoints.dir: s3://STORAGE-BUCKET-PATH/checkpoint
    execution.checkpointing.interval: 15000
```

## 通过 Amazon EBS 卷挂载实现的任务本地恢复
<a name="flink-restart-task-local-ebs"></a>

**注意**  
EKS 上的 Amazon EMR 6.15.0 及更高版本上的 Flink 支持 Amazon EBS 的任务本地恢复。

使用 EKS 上的 Amazon EMR 上的 Flink，您可以将 Amazon EBS 卷自动预调配到 TaskManager 容器组（pod）中以进行任务本地恢复。默认的叠加挂载随附 10GB 的卷，足以满足状态较低的作业。状态较大的作业可以启用*自动 EBS 卷挂载*选项。TaskManager 容器组（pod）是在创建容器组（pod）时自动创建和挂载的，在删除容器组（pod）时会被移除。

按照以下步骤为 EKS 上的 Amazon EMR 中的 Flink 启用自动 EBS 卷挂载：

1. 导出您以下变量的值，您将在接下来的步骤中使用它们。

   ```
   export AWS_REGION=aa-example-1 
   export FLINK_EKS_CLUSTER_NAME=my-cluster
   export AWS_ACCOUNT_ID=111122223333
   ```

1. 为集群创建或更新 `kubeconfig` YAML 文件。

   ```
   aws eks update-kubeconfig --name $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
   ```

1. 在 Amazon EKS 集群上为 Amazon EBS Container Storage Interface（CSI）驱动程序创建 IAM 服务账户。

   ```
   eksctl create iamserviceaccount \
      --name ebs-csi-controller-sa \
      --namespace kube-system \
      --region $AWS_REGION \
      --cluster $FLINK_EKS_CLUSTER_NAME\
      --role-name TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} \
      --role-only \
      --attach-policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy \
      --approve
   ```

1. 使用以下命令创建 Amazon EBS CSI 驱动程序：

   ```
   eksctl create addon \
      --name aws-ebs-csi-driver \
      --region $AWS_REGION \
      --cluster $FLINK_EKS_CLUSTER_NAME \
      --service-account-role-arn arn:aws:iam::${AWS_ACCOUNT_ID}:role/TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
   ```

1. 使用以下命令创建 Amazon EBS 存储类：

   ```
   cat ≪ EOF ≫ storage-class.yaml
   apiVersion: storage.k8s.io/v1
   kind: StorageClass
   metadata:
     name: ebs-sc
   provisioner: ebs.csi.aws.com
   volumeBindingMode: WaitForFirstConsumer
   EOF
   ```

   然后应用该类：

   ```
   kubectl apply -f storage-class.yaml
   ```

1. Helm 使用创建服务账户的选项安装 Amazon EMR Flink Kubernetes Operator。这将创建在 Flink 部署中使用的 `emr-containers-sa-flink`。

   ```
   helm install flink-kubernetes-operator flink-kubernetes-operator/ \
      --set jobServiceAccount.create=true \
      --set rbac.jobRole.create=true \
      --set rbac.jobRoleBinding.create=true
   ```

1. 要提交 Flink 作业并启用 EBS 卷的自动预调配以进行任务本地恢复，请在 `flink-conf.yaml` 文件中设置以下配置。调整作业状态大小的大小限制。将 `serviceAccount` 设置为 `emr-containers-sa-flink`。指定检查点间隔值，以毫秒为单位。并省略 `executionRoleArn`。

   ```
   flinkConfiguration:
       task.local-recovery.ebs.enable: true
       kubernetes.taskmanager.local-recovery.persistentVolumeClaim.sizeLimit: 10Gi
       state.checkpoints.dir: s3://BUCKET-PATH/checkpoint
       state.backend.local-recovery: true
       state.backend: hasmap or rocksdb
       state.backend.incremental: "true"
       execution.checkpointing.interval: 15000
     serviceAccount: emr-containers-sa-flink
   ```

当您准备删除 Amazon EBS CSI 驱动程序插件时，请使用以下命令：

```
  # Detach Attached Policy
  aws iam detach-role-policy --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} --policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy
  # Delete the created Role
  aws iam delete-role --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
  # Delete the created service account
  eksctl delete iamserviceaccount --name ebs-csi-controller-sa --namespace kube-system --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  # Delete Addon
  eksctl delete addon --name aws-ebs-csi-driver --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  # Delete the EBS storage class
  kubectl delete -f storage-class.yaml
```

## 基于日志的通用增量检查点
<a name="flink-restart-log-check"></a>

**注意**  
EKS 上的 Amazon EMR 6.14.0 及更高版本上的 Flink 支持基于日志的通用增量检查点。

Flink 1.16 中添加了基于日志的通用增量检查点功能，以提高检查点的速度。较快的检查点间隔通常会导致恢复工作减少，因为恢复后需要重新处理的事件较少。有关更多信息，请参阅 *Apache Flink 博客*上的[使用基于日志的通用增量检查点提高检查点的速度和稳定性](https://flink.apache.org/2022/05/30/improving-speed-and-stability-of-checkpointing-with-generic-log-based-incremental-checkpoints/)。

对于示例作业，我们的基准测试表明，使用基于日志的通用增量检查点时，检查点时间从几分钟缩短到几秒。

要启用基于日志的通用增量检查点，请在 `flink-conf.yaml` 文件中设置以下配置。指定检查点间隔值，以毫秒为单位。

```
    state.backend.changelog.enabled: true 
    state.backend.changelog.storage: filesystem
    dstl.dfs.base-path: s3://bucket-path/changelog
    state.backend.local-recovery: true
    state.backend: rocksdb
    state.checkpoints.dir: s3://bucket-path/checkpoint
    execution.checkpointing.interval: 15000
```

## 精细恢复
<a name="flink-restart-fine-grained"></a>

**注意**  
EKS 上的 Amazon EMR 6.14.0 及更高版本上的 Flink 支持对默认计划程序的精细恢复支持。EKS 上的 Amazon EMR 6.15.0 及更高版本上的 Flink 提供自适应计划程序的精细恢复支持。

当任务在执行过程中失败时，Flink 会重置整个执行图，并从上次完成的检查点触发完整的重新执行。这比仅重新执行失败的任务更昂贵。精细恢复仅重新启动失败的任务与管道连接的组件。在以下示例中，作业图有 5 个顶点（`A` 到 `E`）。顶点之间的所有连接都使用逐点分布进行管道化处理，作业的 `parallelism.default` 设置为 `2`。

```
A → B → C → D → E
```

在本示例中，总共有 10 个任务在运行。第一个管道（`a1` 到 `e1`）在 TaskManager（`TM1`）上运行，第二个管道（`a2` 到 `e2`）在另一个 TaskManager（`TM2`）上运行。

```
a1 → b1 → c1 → d1 → e1
a2 → b2 → c2 → d2 → e2
```

有两个管道连接的组件：`a1 → e1` 和 `a2 → e2`。如果 `TM1` 或 `TM2` 其中一个失败，则故障仅影响 TaskManager 正在其中运行的管道中的 5 个任务。重启策略仅会启动受影响的管道化组件。

精细恢复仅适用于完全并行的 Flink 作业。`keyBy()` 或 `redistribute()` 操作不支持。有关更多信息，请参阅 *Flink 改进提案* Jira 项目中的 [FLIP-1：从任务失败中进行精细恢复](https://cwiki.apache.org/confluence/display/FLINK/FLIP-1%3A+Fine+Grained+Recovery+from+Task+Failures)。

要启用精细恢复，请在 `flink-conf.yaml` 文件中设置以下配置。

```
jobmanager.execution.failover-strategy: region 
restart-strategy: exponential-delay or fixed-delay
```

## 自适应计划程序中的组合重启机制
<a name="flink-restart-combined"></a>

**注意**  
EKS 上的 Amazon EMR 6.15.0 及更高版本上的 Flink 支持自适应计划程序中的组合重启机制。

自适应计划程序可以根据可用插槽调整作业的并行度。如果没有足够的插槽来适应配置的作业并行度，它将自动降低并行度。如果有新的插槽可用，则任务将再次纵向扩展到配置的作业并行度。当没有足够的可用资源时，自适应计划程序将避免作业停机。这是 Flink Autoscaler 支持的计划程序。出于这些原因，我们建议在 Amazon EMR Flink 中使用自适应计划程序。但是，自适应计划程序可能会在短时间内进行多次重启，每添加一个新资源就会重启一次。这可能导致作业性能下降。

在 Amazon EMR 6.15.0 及更高版本中，Flink 在自适应计划程序中具有组合重启机制，可在添加第一个资源时打开一个重启窗口，然后等到配置的默认 1 分钟窗口间隔时。当有足够的资源可用来运行具有配置并行性的作业时，或者当间隔超时时，它会执行一次重启。

对于示例作业，我们的基准测试表明，当您使用自适应计划程序和 Flink Autoscaler 时，此功能处理的记录比默认行为多 10%。

要启用组合重启机制，请在 `flink-conf.yaml` 文件中设置以下配置。

```
jobmanager.adaptive-scheduler.combined-restart.enabled: true 
jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m
```

# 使用 EKS 上的 Amazon EMR 上的 Flink 正常停用竞价型实例
<a name="jobruns-flink-decommission"></a>

使用 EKS 上的 Amazon EMR 的 Flink 可以在任务恢复或扩展操作期间缩短作业的重启时间。

## 概述
<a name="jobruns-flink-decommission-overview"></a>

EKS 上的 Amazon EMR 发行版 6.15.0 及更高版本支持在带有 Apache Flink 的 EKS 上的 Amazon EMR 中正常停用竞价型实例上的任务管理器。作为此功能的一部分，带有 Flink 的 EKS 上的 Amazon EMR 提供了以下功能：
+ **Just-in-time check** pointing — Flink 流式传输作业可以响应竞价型实例中断，对正在运行的作业执行 just-in-time (JIT) 检查点，并防止在这些竞价型实例上安排其他任务。默认和自适应计划程序支持 JIT 检查点。
+ **组合重启机制**：组合重启机制会在作业达到目标资源并行度或当前配置窗口结束后尽最大努力尝试重新启动作业。这样做还可以防止由于多次竞价型实例终止而导致作业连续重启。组合重启机制仅适用于自适应计划程序。

这些功能具有以下优势：
+ 您可以利用竞价型实例来运行任务管理器并降低集群开支。
+ 竞价型实例任务管理器的活动性提高使弹性得到提高，作业计划的效率提升。
+ 您的 Flink 作业将有更长的正常运行时间，因为竞价型实例终止后的重启次数将会减少。

## 正常停用的工作原理
<a name="jobruns-flink-decommission-howitworks"></a>

考虑以下示例：您预调配了运行 Apache Flink 的 EKS 上的 Amazon EMR，为作业管理器指定了按需节点，为任务管理器指定了竞价型实例节点。终止前两分钟，任务管理器收到中断通知。

在这种情况下，作业管理器将处理竞价型实例中断信号，阻止在竞价型实例上计划其他任务，并为流式传输作业启动 JIT 检查点。

然后，只有在当前重启间隔窗口内有足够的新资源可用来满足当前作业并行度之后，作业管理器才会重新启动任务图。重启窗口间隔是根据竞价型实例替换持续时间、新的任务管理器容器组（pod）的创建以及向作业管理器的注册来决定的。

## 先决条件
<a name="jobruns-flink-decommission-prereqs"></a>

要使用正常停用，请在运行 Apache Flink 的 Amazon EMR on EKS 集群上创建并运行流处理作业。启用在至少一个竞价型实例上计划的自适应计划程序和任务管理器，如以下示例所示。您应该为作业管理器使用按需节点，并且只要至少有一个竞价型实例，您就可以将按需节点用于任务管理器。

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: deployment_name
spec:
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    cluster.taskmanager.graceful-decommission.enabled: "true"
    execution.checkpointing.interval: "240s"
    jobmanager.adaptive-scheduler.combined-restart.enabled: "true"
    jobmanager.adaptive-scheduler.combined-restart.window-interval : "1m"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    nodeSelector:
      'eks.amazonaws.com/capacityType': 'ON_DEMAND'
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
    nodeSelector:
      'eks.amazonaws.com/capacityType': 'SPOT'
  job:
    jarURI: flink_job_jar_path
```

## 配置
<a name="jobruns-flink-decommission-config"></a>

本部分涵盖了您可以根据自己停用需求指定的大多数配置。


| Key | 说明 | 默认 值 | 可接受值 | 
| --- | --- | --- | --- | 
|  cluster.taskmanager.graceful-decommission.enabled  |  启用任务管理器的正常停用。  |  true  |  true, false  | 
|  jobmanager.adaptive-scheduler.combined-restart.enabled  |  在自适应计划程序中启用组合重启机制。  |  false  |  true, false  | 
|  jobmanager.adaptive-scheduler.combined-restart.window-interval  |  为作业执行合并重启的合并重启窗口间隔。不含单位的整数将被解释为毫秒。  |  1m  |  示例：30、60s、3m、1h。 | 