

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 優化任務復原和擴展操作的作業重新啟動時間
<a name="flink-restart"></a>

當任務失敗或發生擴展操作時，Flink 會嘗試從最後一個完成的檢查點重新執行任務。根據檢查點狀態的大小和平行任務數量，重新啟動程序可能需要一分鐘或更長的時間才能執行。在重新啟動期間，作業的積壓任務可能會累積。不過，Flink 有一些方法可以優化執行圖表的復原和重新啟動速度，以提高作業穩定性。

本頁說明 Amazon EMR Flink 在任務復原或擴展操作期間縮短作業重新啟動時間的一些方法。

**Topics**
+ [任務本機復原](#flink-restart-task-local)
+ [一般日誌型增量檢查點](#flink-restart-log-check)
+ [精細復原](#flink-restart-fine-grained)
+ [調整式排程器中的合併重新啟動機制](#flink-restart-combined)

## 任務本機復原
<a name="flink-restart-task-local"></a>

**注意**  
Amazon EMR 6.0.0 及更高版本支援任務本機復原。

透過 Flink 檢查點，每個任務皆會產生其狀態的快照，Flink 會將其寫入分散式儲存體 (如 Amazon S3)。在復原的情況下，任務會從分散式儲存體還原其狀態。分散式儲存提供容錯能力，並且由於所有節點皆可存取分散式儲存體，因此可以在重新擴展期間重新分配狀態。

但是，遠端分散式存放區也有一個缺點：所有任務均須透過網路從遠端位置讀取其狀態。這可能會導致任務復原或擴展操作期間大型狀態的復原時間較長。

您可透過*任務本機復原*來解決復原時間較長的問題。任務會將檢查點上的狀態寫入任務本機的次要儲存體，例如本機磁碟上。同時還會將其狀態存放在主要儲存體中 (在此例中為 Amazon S3)。在復原期間，排程器會在較早執行任務的相同 Task Manager 上排定任務，以便其可以從本機狀態存放區復原，而不是從遠端狀態存放區讀取。如需詳細資訊，請參閱《Apache Flink 文件》**中的[任務本機復原](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/large_state_tuning/#task-local-recovery)。

我們對範例作業的基準測試指出，啟用任務本機復原後，復原時間已從幾分鐘縮短至幾秒鐘。

若要啟用任務本機復原，請在 `flink-conf.yaml` 檔案中設定下列組態。指定檢查點間隔值 (以毫秒為單位)。

```
    state.backend.local-recovery: true
    state.backend: hasmap or rocksdb
    state.checkpoints.dir: s3://storage-location-bucket-path/checkpoint
    execution.checkpointing.interval: 15000
```

## 一般日誌型增量檢查點
<a name="flink-restart-log-check"></a>

**注意**  
Amazon EMR 6.10.0 及更高版本支援一般日誌型增量檢查點。

Flink 1.16 中新增了一般日誌型增量檢查點，以提高檢查點的速度。較快的檢查點間隔通常可減少復原工作，因為復原後需要重新處理的事件較少。如需詳細資訊，請參閱 *Apache Flink 部落格*上的 [Improving speed and stability of checkpointing with generic log-based incremental checkpoints](https://flink.apache.org/2022/05/30/improving-speed-and-stability-of-checkpointing-with-generic-log-based-incremental-checkpoints/)。

我們對範例作業的基準測試指出，使用一般日誌型增量檢查點時，檢查點時間已從幾分鐘縮短至幾秒鐘。

若要啟用一般日誌型增量檢查點，請在 `flink-conf.yaml` 檔案中設定下列組態。指定檢查點間隔值 (以毫秒為單位)。

```
    state.backend.changelog.enabled: true 
    state.backend.changelog.storage: filesystem
    dstl.dfs.base-path: s3://bucket-path/changelog
    state.backend.local-recovery: true
    state.backend: rocksdb
    state.checkpoints.dir: s3://bucket-path/checkpoint
    execution.checkpointing.interval: 15000
```

## 精細復原
<a name="flink-restart-fine-grained"></a>

**注意**  
Amazon EMR 6.0.0 及更高版本提供對預設排程器的精細復原支援。Amazon EMR 6.15.0 及更高版本提供調整式排程器中的精細復原支援。

當任務在執行過程中失敗時，Flink 會重設整個執行圖表，並從最後一個完成的檢查點觸發完整的重新執行。相較於僅重新執行失敗的任務，此方式的成本更高。精細復原僅會重新啟動失敗任務的管道連接元件。在下列範例中，作業圖表有 5 個頂點 (`A` 至 `E`)。頂點之間的所有連接均採用逐點分佈的管道方式，且作業的 `parallelism.default` 設定為 `2`。

```
A → B → C → D → E
```

在此範例中，總共有 10 個任務正在執行。第一個管道 (`a1` 至 `e1`) 在 TaskManager (`TM1`) 上執行，而第二個管道 (`a2` 至 `e2`) 在另一個 TaskManager (`TM2`) 上執行。

```
a1 → b1 → c1 → d1 → e1
a2 → b2 → c2 → d2 → e2
```

有兩個管道連接的元件：`a1 → e1` 和 `a2 → e2`。如果 `TM1` 或 `TM2` 失敗，則失敗僅會影響正在執行 TaskManager 之管道中的 5 個任務。重新啟動策略只會啟動受影響的管道元件。

精細復原僅適用於完全平行的 Flink 作業。`keyBy()` 或 `redistribute()` 操作不支援。如需詳細資訊，請參閱 *Flink Improvement Proposal* Jira 專案中的 [FLIP-1: Fine Grained Recovery from Task Failures](https://cwiki.apache.org/confluence/display/FLINK/FLIP-1%3A+Fine+Grained+Recovery+from+Task+Failures)。

若要啟用精細復原，請在 `flink-conf.yaml` 檔案中設定下列組態。

```
jobmanager.execution.failover-strategy: region 
restart-strategy: exponential-delay or fixed-delay
```

## 調整式排程器中的合併重新啟動機制
<a name="flink-restart-combined"></a>

**注意**  
Amazon EMR 6.15.0 及更高版本支援調整式排程器中的合併重新啟動機制。

調整式排程器可根據可用的插槽調整作業的並行度。如果沒有足夠的插槽以符合設定的作業並行度，則會自動降低並行度。如果有新的插槽可用，作業就會再次縱向擴展至設定的作業並行度。在沒有足夠的可用資源時，調整式排程器可以避免作業停機。這是 Flink Autoscaler 支援的排程器。基於這些原因，我們建議使用 Amazon EMR Flink 搭配調整式排程器。但是，調整式排程器可能會在短時間內進行多次重新啟動，每新增一個新資源就會重新啟動一次。這可能會導致作業效能下降。

在 Amazon EMR 6.15.0 及更高版本中，Flink 在調整式排程器中具有合併重新啟動機制，該機制會在新增第一個資源時開啟重新啟動時段，然後等到設定的預設 1 分鐘時段間隔為止。當有足夠的資源可使用設定的並行度執行作業時，或當間隔逾時，其會執行單次重新啟動。

我們對範例作業的基準測試指出，當您使用調整式排程器和 Flink 自動擴展器時，此功能比預設行為多處理 10% 的記錄。

若要啟用合併重新啟動機制，請在 `flink-conf.yaml` 檔案中設定下列組態。

```
jobmanager.adaptive-scheduler.combined-restart.enabled: true 
jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m
```