

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用 EKS 上的 Amazon EMR 优化 Flink 任务重启时间以进行任务恢复和扩展操作
<a name="jobruns-flink-restart"></a>

当任务失败或发生扩展操作时，Flink 会尝试从上一次完成的检查点重新执行任务。重启过程可能需要一分钟或更长时间才能执行，具体取决于检查点状态的大小以及并行任务的数量。重启期间，可以累积作业的积压任务。但是，Flink 可以通过一些方法来优化执行图的恢复和重启速度，从而提高作业稳定性。

本页介绍了 Amazon EMR Flink 对竞价型实例执行任务恢复或扩展操作过程中缩短作业重启时间的一些方法。竞价型实例是未使用的计算容量，以折扣价提供。该实例具有独特的行为，偶尔发生中断，因此了解 Amazon EMR on EKS 如何处理这些行为非常重要，包括 Amazon EMR on EKS 如何执行停用和作业重启。

**Topics**
+ [任务本地恢复](#flink-restart-task-local)
+ [通过 Amazon EBS 卷挂载实现的任务本地恢复](#flink-restart-task-local-ebs)
+ [基于日志的通用增量检查点](#flink-restart-log-check)
+ [精细恢复](#flink-restart-fine-grained)
+ [自适应计划程序中的组合重启机制](#flink-restart-combined)

## 任务本地恢复
<a name="flink-restart-task-local"></a>

**注意**  
EKS 上的 Amazon EMR 6.14.0 及更高版本上的 Flink 支持任务本地恢复。

使用 Flink 检查点，每个任务都会生成其状态的快照，Flink 会将该快照写入分布式存储（如 Amazon S3）。在恢复的情况下，任务会从分布式存储中恢复其状态。分布式存储提供容错能力，并且可以在重新扩展期间重新分配状态，因为它可供所有节点访问。

但远程分布式存储也有一个缺点：所有任务都必须通过网络从远程位置读取其状态。在任务恢复或扩展操作期间，这可能会导致大规模状态的恢复时间很长。

通过*任务本地恢复*可以解决恢复时间长这一问题。任务将其在检查点上的状态写入任务本地的辅助存储（例如本地磁盘）。它们还将状态存储在主存储中，或者存储在 Amazon S3 中（在本例中）。恢复期间，计划程序将任务计划在任务之前运行所在的同一个任务管理器上，这样它们就可以从本地状态存储中恢复，而不是从远程状态存储中读取。有关更多信息，请参阅 *Apache Flink 文档*中的[任务本地恢复](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/large_state_tuning/#task-local-recovery)。

我们对示例作业进行的基准测试表明，启用任务本地恢复后，恢复时间已从几分钟缩短到几秒。

要启用任务本地恢复，请在 `flink-conf.yaml` 文件中设置以下配置。指定检查点间隔值，以毫秒为单位。

```
    state.backend.local-recovery: true
    state.backend: hasmap or rocksdb
    state.checkpoints.dir: s3://STORAGE-BUCKET-PATH/checkpoint
    execution.checkpointing.interval: 15000
```

## 通过 Amazon EBS 卷挂载实现的任务本地恢复
<a name="flink-restart-task-local-ebs"></a>

**注意**  
EKS 上的 Amazon EMR 6.15.0 及更高版本上的 Flink 支持 Amazon EBS 的任务本地恢复。

使用 EKS 上的 Amazon EMR 上的 Flink，您可以将 Amazon EBS 卷自动预调配到 TaskManager 容器组（pod）中以进行任务本地恢复。默认的叠加挂载随附 10GB 的卷，足以满足状态较低的作业。状态较大的作业可以启用*自动 EBS 卷挂载*选项。TaskManager 容器组（pod）是在创建容器组（pod）时自动创建和挂载的，在删除容器组（pod）时会被移除。

按照以下步骤为 EKS 上的 Amazon EMR 中的 Flink 启用自动 EBS 卷挂载：

1. 导出您以下变量的值，您将在接下来的步骤中使用它们。

   ```
   export AWS_REGION=aa-example-1 
   export FLINK_EKS_CLUSTER_NAME=my-cluster
   export AWS_ACCOUNT_ID=111122223333
   ```

1. 为集群创建或更新 `kubeconfig` YAML 文件。

   ```
   aws eks update-kubeconfig --name $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
   ```

1. 在 Amazon EKS 集群上为 Amazon EBS Container Storage Interface（CSI）驱动程序创建 IAM 服务账户。

   ```
   eksctl create iamserviceaccount \
      --name ebs-csi-controller-sa \
      --namespace kube-system \
      --region $AWS_REGION \
      --cluster $FLINK_EKS_CLUSTER_NAME\
      --role-name TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} \
      --role-only \
      --attach-policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy \
      --approve
   ```

1. 使用以下命令创建 Amazon EBS CSI 驱动程序：

   ```
   eksctl create addon \
      --name aws-ebs-csi-driver \
      --region $AWS_REGION \
      --cluster $FLINK_EKS_CLUSTER_NAME \
      --service-account-role-arn arn:aws:iam::${AWS_ACCOUNT_ID}:role/TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
   ```

1. 使用以下命令创建 Amazon EBS 存储类：

   ```
   cat ≪ EOF ≫ storage-class.yaml
   apiVersion: storage.k8s.io/v1
   kind: StorageClass
   metadata:
     name: ebs-sc
   provisioner: ebs.csi.aws.com
   volumeBindingMode: WaitForFirstConsumer
   EOF
   ```

   然后应用该类：

   ```
   kubectl apply -f storage-class.yaml
   ```

1. Helm 使用创建服务账户的选项安装 Amazon EMR Flink Kubernetes Operator。这将创建在 Flink 部署中使用的 `emr-containers-sa-flink`。

   ```
   helm install flink-kubernetes-operator flink-kubernetes-operator/ \
      --set jobServiceAccount.create=true \
      --set rbac.jobRole.create=true \
      --set rbac.jobRoleBinding.create=true
   ```

1. 要提交 Flink 作业并启用 EBS 卷的自动预调配以进行任务本地恢复，请在 `flink-conf.yaml` 文件中设置以下配置。调整作业状态大小的大小限制。将 `serviceAccount` 设置为 `emr-containers-sa-flink`。指定检查点间隔值，以毫秒为单位。并省略 `executionRoleArn`。

   ```
   flinkConfiguration:
       task.local-recovery.ebs.enable: true
       kubernetes.taskmanager.local-recovery.persistentVolumeClaim.sizeLimit: 10Gi
       state.checkpoints.dir: s3://BUCKET-PATH/checkpoint
       state.backend.local-recovery: true
       state.backend: hasmap or rocksdb
       state.backend.incremental: "true"
       execution.checkpointing.interval: 15000
     serviceAccount: emr-containers-sa-flink
   ```

当您准备删除 Amazon EBS CSI 驱动程序插件时，请使用以下命令：

```
  # Detach Attached Policy
  aws iam detach-role-policy --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} --policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy
  # Delete the created Role
  aws iam delete-role --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
  # Delete the created service account
  eksctl delete iamserviceaccount --name ebs-csi-controller-sa --namespace kube-system --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  # Delete Addon
  eksctl delete addon --name aws-ebs-csi-driver --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  # Delete the EBS storage class
  kubectl delete -f storage-class.yaml
```

## 基于日志的通用增量检查点
<a name="flink-restart-log-check"></a>

**注意**  
EKS 上的 Amazon EMR 6.14.0 及更高版本上的 Flink 支持基于日志的通用增量检查点。

Flink 1.16 中添加了基于日志的通用增量检查点功能，以提高检查点的速度。较快的检查点间隔通常会导致恢复工作减少，因为恢复后需要重新处理的事件较少。有关更多信息，请参阅 *Apache Flink 博客*上的[使用基于日志的通用增量检查点提高检查点的速度和稳定性](https://flink.apache.org/2022/05/30/improving-speed-and-stability-of-checkpointing-with-generic-log-based-incremental-checkpoints/)。

对于示例作业，我们的基准测试表明，使用基于日志的通用增量检查点时，检查点时间从几分钟缩短到几秒。

要启用基于日志的通用增量检查点，请在 `flink-conf.yaml` 文件中设置以下配置。指定检查点间隔值，以毫秒为单位。

```
    state.backend.changelog.enabled: true 
    state.backend.changelog.storage: filesystem
    dstl.dfs.base-path: s3://bucket-path/changelog
    state.backend.local-recovery: true
    state.backend: rocksdb
    state.checkpoints.dir: s3://bucket-path/checkpoint
    execution.checkpointing.interval: 15000
```

## 精细恢复
<a name="flink-restart-fine-grained"></a>

**注意**  
EKS 上的 Amazon EMR 6.14.0 及更高版本上的 Flink 支持对默认计划程序的精细恢复支持。EKS 上的 Amazon EMR 6.15.0 及更高版本上的 Flink 提供自适应计划程序的精细恢复支持。

当任务在执行过程中失败时，Flink 会重置整个执行图，并从上次完成的检查点触发完整的重新执行。这比仅重新执行失败的任务更昂贵。精细恢复仅重新启动失败的任务与管道连接的组件。在以下示例中，作业图有 5 个顶点（`A` 到 `E`）。顶点之间的所有连接都使用逐点分布进行管道化处理，作业的 `parallelism.default` 设置为 `2`。

```
A → B → C → D → E
```

在本示例中，总共有 10 个任务在运行。第一个管道（`a1` 到 `e1`）在 TaskManager（`TM1`）上运行，第二个管道（`a2` 到 `e2`）在另一个 TaskManager（`TM2`）上运行。

```
a1 → b1 → c1 → d1 → e1
a2 → b2 → c2 → d2 → e2
```

有两个管道连接的组件：`a1 → e1` 和 `a2 → e2`。如果 `TM1` 或 `TM2` 其中一个失败，则故障仅影响 TaskManager 正在其中运行的管道中的 5 个任务。重启策略仅会启动受影响的管道化组件。

精细恢复仅适用于完全并行的 Flink 作业。`keyBy()` 或 `redistribute()` 操作不支持。有关更多信息，请参阅 *Flink 改进提案* Jira 项目中的 [FLIP-1：从任务失败中进行精细恢复](https://cwiki.apache.org/confluence/display/FLINK/FLIP-1%3A+Fine+Grained+Recovery+from+Task+Failures)。

要启用精细恢复，请在 `flink-conf.yaml` 文件中设置以下配置。

```
jobmanager.execution.failover-strategy: region 
restart-strategy: exponential-delay or fixed-delay
```

## 自适应计划程序中的组合重启机制
<a name="flink-restart-combined"></a>

**注意**  
EKS 上的 Amazon EMR 6.15.0 及更高版本上的 Flink 支持自适应计划程序中的组合重启机制。

自适应计划程序可以根据可用插槽调整作业的并行度。如果没有足够的插槽来适应配置的作业并行度，它将自动降低并行度。如果有新的插槽可用，则任务将再次纵向扩展到配置的作业并行度。当没有足够的可用资源时，自适应计划程序将避免作业停机。这是 Flink Autoscaler 支持的计划程序。出于这些原因，我们建议在 Amazon EMR Flink 中使用自适应计划程序。但是，自适应计划程序可能会在短时间内进行多次重启，每添加一个新资源就会重启一次。这可能导致作业性能下降。

在 Amazon EMR 6.15.0 及更高版本中，Flink 在自适应计划程序中具有组合重启机制，可在添加第一个资源时打开一个重启窗口，然后等到配置的默认 1 分钟窗口间隔时。当有足够的资源可用来运行具有配置并行性的作业时，或者当间隔超时时，它会执行一次重启。

对于示例作业，我们的基准测试表明，当您使用自适应计划程序和 Flink Autoscaler 时，此功能处理的记录比默认行为多 10%。

要启用组合重启机制，请在 `flink-conf.yaml` 文件中设置以下配置。

```
jobmanager.adaptive-scheduler.combined-restart.enabled: true 
jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m
```