

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# Flink 如何支援高可用性和任務彈性
<a name="jobruns-flink-resiliency"></a>

下列各節概述 Flink 如何讓任務更可靠且高可用性。它透過 Flink 高可用性等內建功能執行此操作，並在發生故障時提供各種復原功能。

**Topics**
+ [使用 Flink Operator 和 Flink 應用程式的高可用性 (HA)](jobruns-flink-using-ha.md)
+ [使用 Amazon EMR on EKS 優化 Flink 作業重新啟動時間，以進行任務復原和擴展操作](jobruns-flink-restart.md)
+ [使用 Amazon EMR on EKS 上的 Flink 正常解除委任 Spot 執行個體](jobruns-flink-decommission.md)

# 使用 Flink Operator 和 Flink 應用程式的高可用性 (HA)
<a name="jobruns-flink-using-ha"></a>

本主題說明如何設定高可用性，並說明它在幾個不同的使用案例中的運作方式。這包括當您使用 任務管理員時，以及當您使用 Flink 原生 kubernetes 時。

## Flink Operator 高可用性
<a name="jobruns-flink-ha-operator"></a>

我們啟用 Flink Operator 的*高可用性*，以便可以容錯移轉至待命 Flink Operator，在發生故障時將 Operator 控制迴圈中的停機時間降至最低。依預設會啟用「高可用性」，且起始 Operator 複本的預設數目為 2。可以在 `values.yaml` 檔案中設定 Helm Chart 的複本欄位。

下列欄位可自訂：
+ `replicas` (選用，預設值為 2)：將此數字設定為大於 1 可建立其他待命 Operator，並允許更快速地復原作業。
+ `highAvailabilityEnabled` (選用，預設值為 true)：控制是否要啟用 HA。將此參數指定為 true 可啟用多可用區部署支援，並設定正確的 `flink-conf.yaml` 參數。

透過在 `values.yaml` 檔案中設定下列組態，停用 operator 的高可用性。

```
...
imagePullSecrets: []

replicas: 1

# set this to false if you don't want HA
highAvailabilityEnabled: false
...
```

**多可用區部署**

我們會在多個可用區域建立 operator Pod。這是一個軟約束，如果您在不同的可用區域中沒有足夠的資源，將在相同的可用區域中排程您的 operator Pod 。

**確定領導者複本**

 如果啟用 HA，則複本使用 Lease 來確定哪些 JM 是領導者，並使用 K8s Lease 進行領導者選舉。您可以描述 Lease 並查看 .Spec.Holder Identity 欄位，以確定目前的領導者

```
kubectl describe lease <Helm Install Release Name>-<NAMESPACE>-lease -n <NAMESPACE> | grep "Holder Identity"
```

**Flink-S3 互動**

**設定存取憑證**

請確定您已設定 IRSA，具有可存取 S3 儲存貯體的適當 IAM 許可。

**從 S3 應用程式模式中擷取作業 jar**

Flink Operator 也支援從 S3 中擷取應用程式 jar。只需在 FlinkDeployment 規格中提供 jarURI 的 S3 位置即可。

也可以使用此功能來下載其他成品，例如 PyFLink 指令碼。生成的 Python 指令碼放在路徑 `/opt/flink/usrlib/` 下。

以下範例示範如何將此功能用於 PyFLink 作業。請注意 jarURI 和引數欄位。

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: python-example
spec:
  image: <YOUR CUSTOM PYFLINK IMAGE>
  emrReleaseLabel: "emr-6.12.0-flink-latest"
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    highAvailabilityEnabled: false
    replicas: 1
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: "s3://<S3-BUCKET>/scripts/pyflink.py" # Note, this will trigger the artifact download process
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pyflink.py"]
    parallelism: 1
    upgradeMode: stateless
```

**Flink S3 連接器**

Flink 隨附有兩個 S3 連接器 (如下所列)。以下各章節討論何時使用哪個連接器。

**檢查點：Presto S3 連接器**
+ 將 S3 結構描述設為 s3p://
+ 用於檢查點到 s3 的建議連接器。如需詳細資訊，請參閱 Apache Flink 文件中的 [S3-specific](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#s3-specific)。

FlinkDeployment 規格範例：

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.checkpoints.dir: s3p://<BUCKET-NAME>/flink-checkpoint/
```

**讀取和寫入 S3：Hadoop S3 連接器**
+ 將 S3 結構描述設定為 `s3://` 或 (`s3a://`)
+ 用於從 S3 讀取和寫入檔案的建議連接器 (只有 S3 連接器實作 [Flinks Filesystem 介面](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/))。
+ 根據預設，我們在 `flink-conf.yaml` 檔案`fs.s3a.aws.credentials.provider`中設定 ，也就是 `com.amazonaws.auth.WebIdentityTokenCredentialsProvider`。如果完全覆寫預設 `flink-conf` 並且正在與 S3 進行互動，請確保使用此提供程式。

FlinkDeployment 規格範例

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  job:
    jarURI: local:///opt/flink/examples/streaming/WordCount.jar
    args: [ "--input", "s3a://<INPUT BUCKET>/PATH", "--output", "s3a://<OUTPUT BUCKET>/PATH" ]
    parallelism: 2
    upgradeMode: stateless
```

## Flink Job Manager
<a name="jobruns-flink-ha-manager"></a>

Flink 部署的高可用性 (HA) 可讓作業繼續進行，即使遇到暫時性錯誤以及 JobManager 當機。作業將會重新啟動，但會從上次啟用 HA 的成功檢查點開始。如果沒有啟用 HA，Kubernetes 將重新啟動 JobManager，但您的作業將作為新作業開始，並將失去其進度。設定 HA 之後，我們可以告訴 Kubernetes 將 HA 中繼資料儲存在永久性儲存體中，以便在 JobManager 中發生暫時性失敗時進行參考，然後從上次成功的檢查點恢復我們的作業。

Flink 作業預設為啟用 HA (複本計數設為 2，這將要求您提供 S3 儲存位置，以便 HA 中繼資料持續存在)。

**HA 組態**

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  executionRoleArn: "<JOB EXECUTION ROLE ARN>"
  emrReleaseLabel: "emr-6.13.0-flink-latest"
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    replicas: 2
    highAvailabilityEnabled: true
    storageDir: "s3://<S3 PERSISTENT STORAGE DIR>"
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
```

以下是 Job Manager 中上述 HA 組態的描述 (在 .spec.jobManager 下定義)：
+ `highAvailabilityEnabled` (選用，預設值為 true)：如果您不想啟用 HA 且不想使用提供的 HA 組態，請將此設定為 `false `。您仍然可以操作「複本」欄位以手動設定 HA。
+ `replicas` (選用，預設值為 2)：將此數字設定為大於 1 可建立其他待命 JobManager，並允許更快速地復原作業。如果停用 HA，則必須將複本計數設定為 1，否則您將繼續收到驗證錯誤 (如果未啟用 HA，則僅支援 1 個複本)。
+ `storageDir` (必填)：因為我們預設使用的複本計數為 2，所以我們必須提供一個持久的 StorageDir。目前，此欄位僅接受 S3 路徑作為儲存位置。

**Pod 位置**

 如果您啟用 HA，我們也會嘗試在同一個可用區域中共置 Pod，進而提升效能 (藉由將 Pod 放在相同可用區域來減少網路延遲)。這是一個竭盡全力的過程，意味著如果您在對大部分 Pod 進行排程的可用區域中沒有足夠的資源，剩餘的 Pod 仍會排程，但最終可能會在此可用區域以外的節點上結束。

**確定領導者複本**

如果啟用 HA，複本會使用租用來判斷哪個 JM 是領導者，並使用 K8s Configmap 作為儲存此中繼資料的資料儲存。如果您想確定領導者，可以查看 Configmap 的內容，然後查看資料下的關鍵字 `org.apache.flink.k8s.leader.restserver`，以使用該 IP 地址尋找 K8s Pod。也可使用下列 bash 命令。

```
ip=$(kubectl get configmap -n <NAMESPACE> <JOB-NAME>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')
kubectl get pods -n NAMESPACE  -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

## Flink 作業：原生 Kubernetes
<a name="jobruns-flink-ha-kubernetes"></a>

Amazon EMR 6.13.0 及更高版本支援 Flink Native Kubernetes，以便在 Amazon EKS 叢集上以高可用性模式執行 Flink 應用程式。

**注意**  
提交 Flink 作業時，必須建立 Amazon S3 儲存貯體來儲存高可用性中繼資料。如果不想使用此功能，可以停用它。依預設會啟用此功能。

若要開啟 Flink 高可用性功能，請在[執行 `run-application` CLI 命令](jobruns-flink-native-kubernetes-getting-started.md#jobruns-flink-native-kubernetes-getting-started-run-application)時提供下列 Flink 參數。參數定義於範例下方。

```
-Dhigh-availability.type=kubernetes \
-Dhigh-availability.storageDir=S3://DOC-EXAMPLE-STORAGE-BUCKET \
-Dfs.s3a.aws.credentials.provider="com.amazonaws.auth.WebIdentityTokenCredentialsProvider" \
-Dkubernetes.jobmanager.replicas=3 \
-Dkubernetes.cluster-id=example-cluster
```
+ **`Dhigh-availability.storageDir`**：您要在其中存放作業的高可用性中繼資料的 Amazon S3 儲存貯體。

  **`Dkubernetes.jobmanager.replicas`**：要建立的 Job Manager Pod 數量，為大於 `1` 的整數。

  **`Dkubernetes.cluster-id`**：識別 Flink 叢集的唯一識別碼。

# 使用 Amazon EMR on EKS 優化 Flink 作業重新啟動時間，以進行任務復原和擴展操作
<a name="jobruns-flink-restart"></a>

當任務失敗或發生擴展操作時，Flink 會嘗試從最後一個完成的檢查點重新執行任務。根據檢查點狀態的大小和平行任務數量，重新啟動程序可能需要一分鐘或更長的時間才能執行。在重新啟動期間，作業的積壓任務可能會累積。不過，Flink 有一些方法可以優化執行圖表的復原和重新啟動速度，以提高作業穩定性。

此頁面說明 Amazon EMR Flink 在 Spot 執行個體上的任務復原或擴展操作期間，可以改善任務重新啟動時間的一些方式。Spot 執行個體是未使用的運算容量，以折扣價提供。它具有獨特的行為，包括偶爾中斷，因此請務必了解 Amazon EMR on EKS 如何處理這些行為，包括 Amazon EMR on EKS 如何執行除役和重新啟動任務。

**Topics**
+ [任務本機復原](#flink-restart-task-local)
+ [透過 Amazon EBS 磁碟區掛載進行任務本機復原](#flink-restart-task-local-ebs)
+ [一般日誌型增量檢查點](#flink-restart-log-check)
+ [精細復原](#flink-restart-fine-grained)
+ [調整式排程器中的合併重新啟動機制](#flink-restart-combined)

## 任務本機復原
<a name="flink-restart-task-local"></a>

**注意**  
Amazon EMR on EKS 6.14.0 及更高版本上的 Flink 支援任務本機復原。

透過 Flink 檢查點，每個任務皆會產生其狀態的快照，Flink 會將其寫入分散式儲存體 (如 Amazon S3)。在復原的情況下，任務會從分散式儲存體還原其狀態。分散式儲存提供容錯能力，並且由於所有節點皆可存取分散式儲存體，因此可以在重新擴展期間重新分配狀態。

但是，遠端分散式存放區也有一個缺點：所有任務均須透過網路從遠端位置讀取其狀態。這可能會導致任務復原或擴展操作期間大型狀態的復原時間較長。

您可透過*任務本機復原*來解決復原時間較長的問題。任務會將檢查點上的狀態寫入任務本機的次要儲存體，例如本機磁碟上。同時還會將其狀態存放在主要儲存體中 (在此例中為 Amazon S3)。在復原期間，排程器會在較早執行任務的相同 Task Manager 上排定任務，以便其可以從本機狀態存放區復原，而不是從遠端狀態存放區讀取。如需詳細資訊，請參閱《Apache Flink 文件》**中的[任務本機復原](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/large_state_tuning/#task-local-recovery)。

我們對範例作業的基準測試指出，啟用任務本機復原後，復原時間已從幾分鐘縮短至幾秒鐘。

若要啟用任務本機復原，請在 `flink-conf.yaml` 檔案中設定下列組態。指定檢查點間隔值 (以毫秒為單位)。

```
    state.backend.local-recovery: true
    state.backend: hasmap or rocksdb
    state.checkpoints.dir: s3://STORAGE-BUCKET-PATH/checkpoint
    execution.checkpointing.interval: 15000
```

## 透過 Amazon EBS 磁碟區掛載進行任務本機復原
<a name="flink-restart-task-local-ebs"></a>

**注意**  
Amazon EMR on EKS 6.15.0 及更高版本上的 Flink 支援 Amazon EBS 的任務本機復原。

透過 Amazon EMR on EKS 上的 Flink，您可以自動向 TaskManager Pod 佈建 Amazon EBS 磁碟區以進行任務本機復原。預設的覆蓋掛載隨附 10 GB 磁碟區，對於狀態較低的作業而言即足夠。具有大型狀態的作業可以啟用*自動 EBS 磁碟區掛載*選項。TaskManager Pod 會在 Pod 建立期間自動建立和掛載，並在 Pod 刪除期間移除。

使用下列步驟為 Amazon EMR on EKS 中的 Flink 啟用自動 EBS 磁碟區掛載：

1. 匯出您將在後續步驟中使用的下列變數值。

   ```
   export AWS_REGION=aa-example-1 
   export FLINK_EKS_CLUSTER_NAME=my-cluster
   export AWS_ACCOUNT_ID=111122223333
   ```

1. 為您的叢集建立或更新 `kubeconfig` YAML 檔案。

   ```
   aws eks update-kubeconfig --name $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
   ```

1. 為 Amazon EKS 叢集上的 Amazon EBS 容器儲存介面 (CSI) 驅動程式建立 IAM 服務帳戶。

   ```
   eksctl create iamserviceaccount \
      --name ebs-csi-controller-sa \
      --namespace kube-system \
      --region $AWS_REGION \
      --cluster $FLINK_EKS_CLUSTER_NAME\
      --role-name TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} \
      --role-only \
      --attach-policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy \
      --approve
   ```

1. 使用下列命令建立 Amazon EBS CSI 驅動程式：

   ```
   eksctl create addon \
      --name aws-ebs-csi-driver \
      --region $AWS_REGION \
      --cluster $FLINK_EKS_CLUSTER_NAME \
      --service-account-role-arn arn:aws:iam::${AWS_ACCOUNT_ID}:role/TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
   ```

1. 使用下列命令建立 Amazon EBS 儲存類別：

   ```
   cat ≪ EOF ≫ storage-class.yaml
   apiVersion: storage.k8s.io/v1
   kind: StorageClass
   metadata:
     name: ebs-sc
   provisioner: ebs.csi.aws.com
   volumeBindingMode: WaitForFirstConsumer
   EOF
   ```

   然後套用該類別：

   ```
   kubectl apply -f storage-class.yaml
   ```

1. Helm 安裝 Amazon EMR Flink Kubernetes Operator，並提供建立服務帳戶的選項。這會建立要在 Flink 部署中使用的 `emr-containers-sa-flink`。

   ```
   helm install flink-kubernetes-operator flink-kubernetes-operator/ \
      --set jobServiceAccount.create=true \
      --set rbac.jobRole.create=true \
      --set rbac.jobRoleBinding.create=true
   ```

1. 若要提交 Flink 作業並啟用 EBS 磁碟區的自動佈建以進行任務本機復原，請在 `flink-conf.yaml` 檔案中設定下列組態。調整作業狀態大小的大小限制。將 `serviceAccount` 設定為 `emr-containers-sa-flink`。指定檢查點間隔值 (以毫秒為單位)。並省略 `executionRoleArn`。

   ```
   flinkConfiguration:
       task.local-recovery.ebs.enable: true
       kubernetes.taskmanager.local-recovery.persistentVolumeClaim.sizeLimit: 10Gi
       state.checkpoints.dir: s3://BUCKET-PATH/checkpoint
       state.backend.local-recovery: true
       state.backend: hasmap or rocksdb
       state.backend.incremental: "true"
       execution.checkpointing.interval: 15000
     serviceAccount: emr-containers-sa-flink
   ```

當您準備好刪除 Amazon EBS CSI 驅動程式外掛程式時，請使用下列命令：

```
  # Detach Attached Policy
  aws iam detach-role-policy --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} --policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy
  # Delete the created Role
  aws iam delete-role --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
  # Delete the created service account
  eksctl delete iamserviceaccount --name ebs-csi-controller-sa --namespace kube-system --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  # Delete Addon
  eksctl delete addon --name aws-ebs-csi-driver --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  # Delete the EBS storage class
  kubectl delete -f storage-class.yaml
```

## 一般日誌型增量檢查點
<a name="flink-restart-log-check"></a>

**注意**  
Amazon EMR on EKS 6.14.0 及更高版本上的 Flink 支援一般日誌型增量檢查點。

Flink 1.16 中新增了一般日誌型增量檢查點，以提高檢查點的速度。較快的檢查點間隔通常可減少復原工作，因為復原後需要重新處理的事件較少。如需詳細資訊，請參閱 *Apache Flink 部落格*上的 [Improving speed and stability of checkpointing with generic log-based incremental checkpoints](https://flink.apache.org/2022/05/30/improving-speed-and-stability-of-checkpointing-with-generic-log-based-incremental-checkpoints/)。

我們對範例作業的基準測試指出，使用一般日誌型增量檢查點時，檢查點時間已從幾分鐘縮短至幾秒鐘。

若要啟用一般日誌型增量檢查點，請在 `flink-conf.yaml` 檔案中設定下列組態。指定檢查點間隔值 (以毫秒為單位)。

```
    state.backend.changelog.enabled: true 
    state.backend.changelog.storage: filesystem
    dstl.dfs.base-path: s3://bucket-path/changelog
    state.backend.local-recovery: true
    state.backend: rocksdb
    state.checkpoints.dir: s3://bucket-path/checkpoint
    execution.checkpointing.interval: 15000
```

## 精細復原
<a name="flink-restart-fine-grained"></a>

**注意**  
Amazon EMR on EKS 6.14.0 及更高版本上的 Flink 提供對預設排程器的精細復原支援。Amazon EMR on EKS 6.15.0 及更高版本上的 Flink 提供調整式排程器中的精細復原支援。

當任務在執行過程中失敗時，Flink 會重設整個執行圖表，並從最後一個完成的檢查點觸發完整的重新執行。相較於僅重新執行失敗的任務，此方式的成本更高。精細復原僅會重新啟動失敗任務的管道連接元件。在下列範例中，作業圖表有 5 個頂點 (`A` 至 `E`)。頂點之間的所有連接均採用逐點分佈的管道方式，且作業的 `parallelism.default` 設定為 `2`。

```
A → B → C → D → E
```

在此範例中，總共有 10 個任務正在執行。第一個管道 (`a1` 至 `e1`) 在 TaskManager (`TM1`) 上執行，而第二個管道 (`a2` 至 `e2`) 在另一個 TaskManager (`TM2`) 上執行。

```
a1 → b1 → c1 → d1 → e1
a2 → b2 → c2 → d2 → e2
```

有兩個管道連接的元件：`a1 → e1` 和 `a2 → e2`。如果 `TM1` 或 `TM2` 失敗，則失敗僅會影響正在執行 TaskManager 之管道中的 5 個任務。重新啟動策略只會啟動受影響的管道元件。

精細復原僅適用於完全平行的 Flink 作業。`keyBy()` 或 `redistribute()` 操作不支援。如需詳細資訊，請參閱 *Flink Improvement Proposal* Jira 專案中的 [FLIP-1: Fine Grained Recovery from Task Failures](https://cwiki.apache.org/confluence/display/FLINK/FLIP-1%3A+Fine+Grained+Recovery+from+Task+Failures)。

若要啟用精細復原，請在 `flink-conf.yaml` 檔案中設定下列組態。

```
jobmanager.execution.failover-strategy: region 
restart-strategy: exponential-delay or fixed-delay
```

## 調整式排程器中的合併重新啟動機制
<a name="flink-restart-combined"></a>

**注意**  
Amazon EMR on EKS 6.15.0 及更高版本上的 Flink 支援調整式排程器中的合併重新啟動機制。

調整式排程器可根據可用的插槽調整作業的並行度。如果沒有足夠的插槽以符合設定的作業並行度，則會自動降低並行度。如果有新的插槽可用，作業就會再次縱向擴展至設定的作業並行度。在沒有足夠的可用資源時，調整式排程器可以避免作業停機。這是 Flink Autoscaler 支援的排程器。基於這些原因，我們建議使用 Amazon EMR Flink 搭配調整式排程器。但是，調整式排程器可能會在短時間內進行多次重新啟動，每新增一個新資源就會重新啟動一次。這可能會導致作業效能下降。

在 Amazon EMR 6.15.0 及更高版本中，Flink 在調整式排程器中具有合併重新啟動機制，該機制會在新增第一個資源時開啟重新啟動時段，然後等到設定的預設 1 分鐘時段間隔為止。當有足夠的資源可使用設定的並行度執行作業時，或當間隔逾時，其會執行單次重新啟動。

我們對範例作業的基準測試指出，當您使用調整式排程器和 Flink 自動擴展器時，此功能比預設行為多處理 10% 的記錄。

若要啟用合併重新啟動機制，請在 `flink-conf.yaml` 檔案中設定下列組態。

```
jobmanager.adaptive-scheduler.combined-restart.enabled: true 
jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m
```

# 使用 Amazon EMR on EKS 上的 Flink 正常解除委任 Spot 執行個體
<a name="jobruns-flink-decommission"></a>

Flink 搭配 Amazon EMR on EKS 可以改善任務復原或擴展操作期間的作業重新啟動時間。

## 概觀
<a name="jobruns-flink-decommission-overview"></a>

Amazon EMR on EKS 版本 6.15.0 及更高版本支援使用 Apache Flink 在 Amazon EMR on EKS 中的 Spot 執行個體上正常解除委任 Task Manager。作為此功能的一部分，Amazon EMR on EKS 與 Flink 提供以下功能：
+ **即時檢查點**：Flink 串流作業可回應 Spot 執行個體中斷、對正在執行的作業執行即時 (JIT) 檢查點，並防止在這些 Spot 執行個體上排定其他任務。預設和調整式排程器支援 JIT 檢查點。
+ **合併重新啟動機制**：合併重新啟動機制會在作業達到目標資源並行度或目前設定的時段結束時，盡力嘗試重新啟動作業。這也可以防止因多個 Spot 執行個體終止而可能導致的連續作業重新啟動。組合重新啟動機制僅適用於調整式排程器。

這些功能具有以下優點：
+ 您可以利用 Spot 執行個體來執行 Task Manager 並減少叢集支出。
+ 改善 Spot 執行個體 Task Manager 的活性，可提高彈性並實現更有效率的作業排程。
+ 您的 Flink 作業將具有更長的正常執行時間，因為 Spot 執行個體終止後的重新啟動次數將減少。

## 正常解除委任的運作方式
<a name="jobruns-flink-decommission-howitworks"></a>

請考量下列範例：您佈建一個執行 Apache Flink 的 Amazon EMR on EKS 叢集，並為 Job Manager 指定隨需節點，以及為 Task Manager 指定 Spot 執行個體節點。終止前兩分鐘，Task Manager 收到中斷通知。

在此案例中，Job Manager 會處理 Spot 執行個體中斷訊號、封鎖 Spot 執行個體上其他任務的排程，以及為串流作業啟動 JIT 檢查點。

然後，只有在目前的重新啟動間隔時段中有足夠的新資源可用性以滿足目前的作業並行度之後，Job Manager 才會重新啟動作業圖表。重新啟動時段間隔是根據 Spot 執行個體更換持續時間、新 Job Manager Pod 的建立，以及向 Job Manager 註冊來決定。

## 先決條件
<a name="jobruns-flink-decommission-prereqs"></a>

若要使用正常的解除編譯，請在執行 Apache Flink 的 Amazon EMR on EKS 叢集上建立並執行串流任務。啟用在至少一個 Spot 執行個體上排定的調整式排程器和 Task Manager，如以下範例所示。您應針對 Job Manager 使用隨選節點，並且只要至少有一個 Spot 執行個體，就可以將隨選節點用於 Task Manager。

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: deployment_name
spec:
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    cluster.taskmanager.graceful-decommission.enabled: "true"
    execution.checkpointing.interval: "240s"
    jobmanager.adaptive-scheduler.combined-restart.enabled: "true"
    jobmanager.adaptive-scheduler.combined-restart.window-interval : "1m"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    nodeSelector:
      'eks.amazonaws.com/capacityType': 'ON_DEMAND'
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
    nodeSelector:
      'eks.amazonaws.com/capacityType': 'SPOT'
  job:
    jarURI: flink_job_jar_path
```

## Configuration
<a name="jobruns-flink-decommission-config"></a>

本節涵蓋了您可以根據解除委任需求指定的大部分組態。


| 金錀 | Description | 預設值 | 可接受值 | 
| --- | --- | --- | --- | 
|  cluster.taskmanager.graceful-decommission.enabled  |  啟用 Task Manager 的正常解除委任。  |  true  |  true, false  | 
|  jobmanager.adaptive-scheduler.combined-restart.enabled  |  在調整式排程器中啟用合併重新啟動機制。  |  false  |  true, false  | 
|  jobmanager.adaptive-scheduler.combined-restart.window-interval  |  為作業執行合併重新啟動的合併重新啟動時段間隔。未顯示單位的整數即視為以毫秒為單位。  |  1m  |  範例：30、60s、3m、1h | 