

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 优化任务恢复和扩展操作的作业重启时间
<a name="flink-restart"></a>

当任务失败或发生扩展操作时，Flink 会尝试从上一次完成的检查点重新执行任务。重启过程可能需要一分钟或更长时间才能执行，具体取决于检查点状态的大小以及并行任务的数量。重启期间，可以累积作业的积压任务。但是，Flink 可以通过一些方法来优化执行图的恢复和重启速度，从而提高作业稳定性。

本页介绍了 Amazon EMR Flink 可以在任务恢复或扩展操作期间缩短作业的重启时间的一些方法。

**Topics**
+ [任务本地恢复](#flink-restart-task-local)
+ [基于日志的通用增量检查点](#flink-restart-log-check)
+ [精细恢复](#flink-restart-fine-grained)
+ [自适应计划程序中的组合重启机制](#flink-restart-combined)

## 任务本地恢复
<a name="flink-restart-task-local"></a>

**注意**  
Amazon EMR 6.0.0 及更高版本都支持任务的本地恢复。

使用 Flink 检查点，每个任务都会生成其状态的快照，Flink 会将该快照写入分布式存储（如 Amazon S3）。在恢复的情况下，任务会从分布式存储中恢复其状态。分布式存储提供容错能力，并且可以在重新扩展期间重新分配状态，因为它可供所有节点访问。

但远程分布式存储也有一个缺点：所有任务都必须通过网络从远程位置读取其状态。在任务恢复或扩展操作期间，这可能会导致大规模状态的恢复时间很长。

通过*任务本地恢复*可以解决恢复时间长这一问题。任务将其在检查点上的状态写入任务本地的辅助存储（例如本地磁盘）。它们还将状态存储在主存储中，或者存储在 Amazon S3 中（在本例中）。恢复期间，计划程序将任务计划在任务之前运行所在的同一个任务管理器上，这样它们就可以从本地状态存储中恢复，而不是从远程状态存储中读取。有关更多信息，请参阅 *Apache Flink 文档*中的[任务本地恢复](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/large_state_tuning/#task-local-recovery)。

我们对示例作业进行的基准测试表明，启用任务本地恢复后，恢复时间已从几分钟缩短到几秒。

要启用任务本地恢复，请在 `flink-conf.yaml` 文件中设置以下配置。指定检查点间隔值，以毫秒为单位。

```
    state.backend.local-recovery: true
    state.backend: hasmap or rocksdb
    state.checkpoints.dir: s3://storage-location-bucket-path/checkpoint
    execution.checkpointing.interval: 15000
```

## 基于日志的通用增量检查点
<a name="flink-restart-log-check"></a>

**注意**  
Amazon EMR 6.10.0 及更高版本支持基于日志的通用增量检查点。

Flink 1.16 中添加了基于日志的通用增量检查点功能，以提高检查点的速度。较快的检查点间隔通常会导致恢复工作减少，因为恢复后需要重新处理的事件较少。有关更多信息，请参阅 *Apache Flink 博客*上的[使用基于日志的通用增量检查点提高检查点的速度和稳定性](https://flink.apache.org/2022/05/30/improving-speed-and-stability-of-checkpointing-with-generic-log-based-incremental-checkpoints/)。

对于示例作业，我们的基准测试表明，使用基于日志的通用增量检查点时，检查点时间从几分钟缩短到几秒。

要启用基于日志的通用增量检查点，请在 `flink-conf.yaml` 文件中设置以下配置。指定检查点间隔值，以毫秒为单位。

```
    state.backend.changelog.enabled: true 
    state.backend.changelog.storage: filesystem
    dstl.dfs.base-path: s3://bucket-path/changelog
    state.backend.local-recovery: true
    state.backend: rocksdb
    state.checkpoints.dir: s3://bucket-path/checkpoint
    execution.checkpointing.interval: 15000
```

## 精细恢复
<a name="flink-restart-fine-grained"></a>

**注意**  
Amazon EMR 6.0.0 及更高版本提供默认计划程序的精细恢复支持。Amazon EMR 6.15.0 及更高版本提供自适应计划程序的精细恢复支持。

当任务在执行过程中失败时，Flink 会重置整个执行图，并从上次完成的检查点触发完整的重新执行。这比仅重新执行失败的任务更昂贵。精细恢复仅重新启动失败的任务与管道连接的组件。在以下示例中，作业图有 5 个顶点（`A` 到 `E`）。顶点之间的所有连接都使用逐点分布进行管道化处理，作业的 `parallelism.default` 设置为 `2`。

```
A → B → C → D → E
```

在本示例中，总共有 10 个任务在运行。第一个管道（`a1` 到 `e1`）在 TaskManager（`TM1`）上运行，第二个管道（`a2` 到 `e2`）在另一个 TaskManager（`TM2`）上运行。

```
a1 → b1 → c1 → d1 → e1
a2 → b2 → c2 → d2 → e2
```

有两个管道连接的组件：`a1 → e1` 和 `a2 → e2`。如果 `TM1` 或 `TM2` 其中一个失败，则故障仅影响 TaskManager 正在其中运行的管道中的 5 个任务。重启策略仅会启动受影响的管道化组件。

精细恢复仅适用于完全并行的 Flink 作业。`keyBy()` 或 `redistribute()` 操作不支持。有关更多信息，请参阅 *Flink 改进提案* Jira 项目中的 [FLIP-1：从任务失败中进行精细恢复](https://cwiki.apache.org/confluence/display/FLINK/FLIP-1%3A+Fine+Grained+Recovery+from+Task+Failures)。

要启用精细恢复，请在 `flink-conf.yaml` 文件中设置以下配置。

```
jobmanager.execution.failover-strategy: region 
restart-strategy: exponential-delay or fixed-delay
```

## 自适应计划程序中的组合重启机制
<a name="flink-restart-combined"></a>

**注意**  
Amazon EMR 6.15.0 及更高版本支持自适应计划程序中的组合重启机制。

自适应计划程序可以根据可用插槽调整作业的并行度。如果没有足够的插槽来适应配置的作业并行度，它将自动降低并行度。如果有新的插槽可用，则任务将再次纵向扩展到配置的作业并行度。当没有足够的可用资源时，自适应计划程序将避免作业停机。这是 Flink Autoscaler 支持的计划程序。出于这些原因，我们建议在 Amazon EMR Flink 中使用自适应计划程序。但是，自适应计划程序可能会在短时间内进行多次重启，每添加一个新资源就会重启一次。这可能导致作业性能下降。

在 Amazon EMR 6.15.0 及更高版本中，Flink 在自适应计划程序中具有组合重启机制，可在添加第一个资源时打开一个重启窗口，然后等到配置的默认 1 分钟窗口间隔时。当有足够的资源可用来运行具有配置并行性的作业时，或者当间隔超时时，它会执行一次重启。

对于示例作业，我们的基准测试表明，当您使用自适应计划程序和 Flink Autoscaler 时，此功能处理的记录比默认行为多 10%。

要启用组合重启机制，请在 `flink-conf.yaml` 文件中设置以下配置。

```
jobmanager.adaptive-scheduler.combined-restart.enabled: true 
jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m
```