

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 使用 Amazon EMR on EKS 執行 Flink 作業
<a name="run-flink-jobs"></a>

Amazon EMR 6.13.0 版及更高版本支援具有 Apache Flink 的 Amazon EMR on EKS 或 Flink Kubernetes Operator，作為 Amazon EMR on EKS 的作業提交模型。使用具有 Apache Flink 的 Amazon EMR on EKS，您可以在自己的 Amazon EKS 叢集上使用 Amazon EMR 發行執行期來部署和管理 Flink 應用程式。在 Amazon EKS 叢集中部署 Flink Kubernetes Operator 之後，即可直接向 Operator 提交 Flink 應用程式。Operator 可管理 Flink 應用程式的生命週期。

**Topics**
+ [設定和使用 Flink Kubernetes Operator](jobruns-flink-kubernetes-operator.md)
+ [使用 Flink 原生 Kubernetes](jobruns-flink-native-kubernetes.md)
+ [自訂 Flink 和 FluentD 的 Docker 映像](jobruns-flink-docker-flink-fluentd.md)
+ [監控 Flink Kubernetes Operator 和 Flink 作業](jobruns-flink-monitoring.md)
+ [Flink 如何支援高可用性和任務彈性](jobruns-flink-resiliency.md)
+ [針對 Flink 應用程式使用 Autoscaler](jobruns-flink-autoscaler.md)
+ [Amazon EMR on EKS 上 Flink 任務的維護和疑難排解](jobruns-flink-troubleshooting.md)
+ [具有 Apache Flink 的 Amazon EMR on EKS 的支援版本](jobruns-flink-security-release-versions.md)

# 設定和使用 Flink Kubernetes Operator
<a name="jobruns-flink-kubernetes-operator"></a>

以下頁面描述了如何設定和使用 Flink Kubernetes Operator，以使用 Amazon EMR on EKS 執行 Flink 作業。可用的主題包括必要的先決條件、如何設定您的環境，以及在 Amazon EMR on EKS 上執行 Flink 應用程式。

**Topics**
+ [針對 Amazon EMR on EKS 設定 Flink Kubernetes Operator](jobruns-flink-kubernetes-operator-setup.md)
+ [安裝適用於 Amazon EMR on EKS 的 Flink Kubernetes Operator](jobruns-flink-kubernetes-operator-getting-started.md)
+ [執行 Flink 應用程式](jobruns-flink-kubernetes-operator-run-application.md)
+ [執行 Flink 應用程式的安全角色許可](jobruns-flink-kubernetes-security.md)
+ [針對 Amazon EMR on EKS 解除安裝 Flink Kubernetes Operator](jobruns-flink-kubernetes-operator-uninstall.md)

# 針對 Amazon EMR on EKS 設定 Flink Kubernetes Operator
<a name="jobruns-flink-kubernetes-operator-setup"></a>

先完成下列任務，然後在 Amazon EKS 上安裝 Flink Kubernetes Operator。如果已經註冊 Amazon Web Services (AWS) 且已在使用 Amazon EKS，則您幾乎可使用 Amazon EMR on EKS。完成下列任務，即可在 Amazon EKS 上設定 Flink Operator。如果已經完成任何先決條件，則可以跳過這些先決條件，然後繼續進行下一個。
+ **[安裝或更新至最新版本的 AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html)**- 如果您已安裝 AWS CLI，請確認您擁有最新版本。
+ **[設定 kubectl 和 eksctl](https://docs.aws.amazon.com/eks/latest/userguide/install-kubectl.html)** – eksctl 是您用來與 Amazon EKS 通訊的命令列工具。
+ **[安裝 Helm](https://docs.aws.amazon.com/eks/latest/userguide/helm.html)** – Kubernetes 的 Helm 套件管理工具可協助您安裝和管理 Kubernetes 叢集上的應用程式。
+ **[開始使用 Amazon EKS – eksctl](https://docs.aws.amazon.com/eks/latest/userguide/getting-started-eksctl.html) ** – 請依照步驟在 Amazon EKS 中建立具有節點的新 Kubernetes 叢集。
+ **[選擇 Amazon EMR 發行標籤](jobruns-flink-security-release-versions.md) (6.13.0 版或更新版本）** – Amazon EMR 6.13.0 版及更高版本支援 Flink Kubernetes Operator。
+ **[在 Amazon EKS 叢集上啟用服務帳戶的 IAM 角色 (IRSA)](setting-up-enable-IAM.md)**。
+ **[建立作業執行角色](creating-job-execution-role.md)**。
+ **[更新作業執行角色的信任政策 ](setting-up-trust-policy.md)**。
+ 建立操作員執行角色。此為選擇性步驟。可以對 Flink 作業和操作員使用相同的角色。如果想要為操作員使用不同的 IAM 角色，可以建立單獨的角色。
+ 更新操作員執行角色的信任政策。必須針對想要用於 Amazon EMR Flink Kubernetes 操作員服務帳戶的角色明確新增一個信任政策項目。可以遵循以下範例格式：

------
#### [ JSON ]

****  

  ```
  {
    "Version":"2012-10-17",		 	 	 
    "Statement": [
      {
        "Effect": "Allow",
        "Action": [
          "sts:AssumeRoleWithWebIdentity"
        ],
        "Resource": [
          "*"
        ],
        "Condition": {
          "StringLike": {
            "aws:userid": "system:serviceaccount:emr:emr-containers-sa-flink-operator"
          }
        },
        "Sid": "AllowSTSAssumerolewithwebidentity"
      }
    ]
  }
  ```

------

# 安裝適用於 Amazon EMR on EKS 的 Flink Kubernetes Operator
<a name="jobruns-flink-kubernetes-operator-getting-started"></a>

本主題透過準備 Flink 部署，協助您開始在 Amazon EKS 上使用 Flink Kubernetes Operator。

## 安裝 Kubernetes 運算子
<a name="jobruns-flink-kubernetes-operator-getting-started-install-operator"></a>

請使用下列步驟來安裝 Kubernetes Operator for Apache Flink。

1. 如果您尚未這麼做，請完成 [針對 Amazon EMR on EKS 設定 Flink Kubernetes Operator](jobruns-flink-kubernetes-operator-setup.md) 中的步驟。

1. 安裝 *cert-manager* (每個 Amazon EKS 叢集一次) 以允許新增 Webhook 元件。

   ```
   kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.12.0/cert-manager.yaml
   ```

1. 安裝 Helm Chart。

   ```
   export VERSION=7.12.0 # The Amazon EMR release version
   export NAMESPACE=The Kubernetes namespace to deploy the operator
   
   helm install flink-kubernetes-operator \
   oci://public.ecr.aws/emr-on-eks/flink-kubernetes-operator \
   --version $VERSION \
   --namespace $NAMESPACE
   ```

   輸出範例：

   ```
   NAME: flink-kubernetes-operator
   LAST DEPLOYED: Tue May 31 17:38:56 2022
   NAMESPACE: $NAMESPACE
   STATUS: deployed
   REVISION: 1
   TEST SUITE: None
   ```

1. 等待部署完成，然後驗證 Chart 安裝。

   ```
   kubectl wait deployment flink-kubernetes-operator --namespace $NAMESPACE --for condition=Available=True --timeout=30s
   ```

1. 部署完成時，您應該會看到下列訊息。

   ```
   deployment.apps/flink-kubernetes-operator condition met
   ```

1. 使用以下命令查看已部署的 Operator。

   ```
   helm list --namespace $NAMESPACE
   ```

   以下顯示範例輸出，其中應用程式版本 `x.y.z-amzn-n` 與 Amazon EMR on EKS 版本的 Flink Operator 版本相對應。如需詳細資訊，請參閱[具有 Apache Flink 的 Amazon EMR on EKS 的支援版本](jobruns-flink-security-release-versions.md)。

   ```
   NAME                              NAMESPACE    REVISION    UPDATED                                STATUS      CHART                                   APP VERSION          
   flink-kubernetes-operator    $NAMESPACE   1           2023-02-22 16:43:45.24148 -0500 EST    deployed    flink-kubernetes-operator-emr-7.12.0    x.y.z-amzn-n
   ```

### 升級 Kubernetes 運算子
<a name="jobruns-flink-kubernetes-operator-upgrade"></a>

若要升級 Flink 運算子的版本，請遵循下列步驟：

1. 解除安裝舊的 `flink-kubernetes-operator`：`helm uninstall flink-kubernetes-operator -n <NAMESPACE>`。

1. 刪除 CRD （因為 helm 不會自動刪除舊 CRD)：`kubectl delete crd flinkdeployments.flink.apache.org flinksessionjobs.flink.apache.org`。

1. `flink-kubernetes-operator` 使用較新版本重新安裝 。

# 執行 Flink 應用程式
<a name="jobruns-flink-kubernetes-operator-run-application"></a>

使用 Amazon EMR 6.13.0 及更高版本時，您可以在 Amazon EMR on EKS 上使用 Flink Kubernetes Operator 在應用程式模式下執行 Flink 應用程式。使用 Amazon EMR 6.15.0 及更高版本時，您也可以在工作階段模式下執行 Flink 應用程式。本頁面介紹了可用於透過 Amazon EMR on EKS 執行 Flink 應用程式的兩種方法。

**Topics**

**注意**  
提交 Flink 作業時，必須建立 Amazon S3 儲存貯體來儲存高可用性中繼資料。如果不想使用此功能，可以停用它。依預設會啟用此功能。

**必要條件**：在使用 Flink Kubernetes Operator 執行 Flink 應用程式之前，請完成 [針對 Amazon EMR on EKS 設定 Flink Kubernetes Operator](jobruns-flink-kubernetes-operator-setup.md) 和 [安裝 Kubernetes 運算子](jobruns-flink-kubernetes-operator-getting-started.md#jobruns-flink-kubernetes-operator-getting-started-install-operator) 中的步驟。

------
#### [ Application mode ]

使用 Amazon EMR 6.13.0 及更高版本時，您可以在 Amazon EMR on EKS 上使用 Flink Kubernetes Operator 在應用程式模式下執行 Flink 應用程式。

1. 建立`FlinkDeployment`定義檔案，`basic-example-app-cluster.yaml`如下列範例所示。如果您啟用並使用其中一個[選擇加入 AWS 區域](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html)，請務必取消註解並設定組態 `fs.s3a.endpoint.region`。

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example-app-cluster
   spec:
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
       #fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
       state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
       state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH 
     flinkVersion: v1_17
     executionRoleArn: JOB_EXECUTION_ROLE_ARN
     emrReleaseLabel: "emr-6.13.0-flink-latest" # 6.13 or higher
     jobManager:
       storageDir: HIGH_AVAILABILITY_STORAGE_PATH
       resource:
         memory: "2048m"
         cpu: 1
     taskManager:
       resource:
         memory: "2048m"
         cpu: 1
     job:
       # if you have your job jar in S3 bucket you can use that path as well
       jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
       parallelism: 2
       upgradeMode: savepoint
       savepointTriggerNonce: 0
     monitoringConfiguration:    
       cloudWatchMonitoringConfiguration:
          logGroupName: LOG_GROUP_NAME
   ```

1. 使用下列命令提交 Flink 部署。這也將建立名為 `basic-example-app-cluster` 的 `FlinkDeployment` 物件。

   ```
   kubectl create -f basic-example-app-cluster.yaml -n <NAMESPACE>
   ```

1. 存取 Flink UI。

   ```
   kubectl port-forward deployments/basic-example-app-cluster 8081 -n NAMESPACE
   ```

1. 開啟 `localhost:8081` 以在本機檢視 Flink 作業。

1. 清除作業。請記得清除為此作業建立的 S3 成品，例如檢查點、高可用性、儲存點中繼資料和 CloudWatch 日誌。

如需有關透過 Flink Kubernetes Operator 提交應用程式至 Flink 的詳細資訊，請參閱 GitHub 上 `apache/flink-kubernetes-operator` 資料夾中的 [Flink Kubernetes Operator 範例](https://github.com/apache/flink-kubernetes-operator/tree/main/examples)。

------
#### [ Session mode ]

使用 Amazon EMR 6.15.0 及更高版本時，您可以在 Amazon EMR on EKS 上使用 Flink Kubernetes Operator 在工作階段模式下執行 Flink 應用程式。

1. 在下列範例中建立名為 `basic-example-app-cluster.yaml` `FlinkDeployment`的定義檔案。如果您啟用並使用其中一個[選擇加入 AWS 區域](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html)，請務必取消註解並設定組態 `fs.s3a.endpoint.region`。

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example-session-cluster
   spec:
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
       #fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
       state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
       state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH
     flinkVersion: v1_17
     executionRoleArn: JOB_EXECUTION_ROLE_ARN
     emrReleaseLabel: "emr-6.15.0-flink-latest"
     jobManager:
       storageDir: HIGH_AVAILABILITY_S3_STORAGE_PATH
       resource:
         memory: "2048m"
         cpu: 1
     taskManager:
       resource:
         memory: "2048m"
         cpu: 1
     monitoringConfiguration:    
       s3MonitoringConfiguration:
          logUri: 
       cloudWatchMonitoringConfiguration:
          logGroupName: LOG_GROUP_NAME
   ```

1. 使用下列命令提交 Flink 部署。這也將建立名為 `basic-example-session-cluster` 的 `FlinkDeployment` 物件。

   ```
   kubectl create -f basic-example-app-cluster.yaml -n NAMESPACE
   ```

1. 使用下列命令確認工作階段叢集 `LIFECYCLE` 為 `STABLE`：

   ```
   kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
   ```

   輸出應類似以下範例：

   ```
   NAME                              JOB STATUS   LIFECYCLE STATE
   basic-example-session-cluster                          STABLE
   ```

1. 使用以下範例內容，建立 `FlinkSessionJob` 自訂定義資源檔案 `basic-session-job.yaml`：

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkSessionJob
   metadata:
     name: basic-session-job
   spec:
     deploymentName: basic-session-deployment
     job:
       # If you have your job jar in an S3 bucket you can use that path.
       # To use jar in S3 bucket, set 
       # OPERATOR_EXECUTION_ROLE_ARN (--set emrContainers.operatorExecutionRoleArn=$OPERATOR_EXECUTION_ROLE_ARN)
       # when you install Spark operator
       jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar
       parallelism: 2
       upgradeMode: stateless
   ```

1. 使用下列命令提交 Flink 工作階段作業。這將建立 `FlinkSessionJob` 物件 `basic-session-job`。

   ```
   kubectl apply -f basic-session-job.yaml -n $NAMESPACE
   ```

1. 使用下列命令確認工作階段叢集 `LIFECYCLE` 為 `STABLE`，且 `JOB STATUS` 為 `RUNNING`：

   ```
   kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
   ```

   輸出應類似以下範例：

   ```
   NAME                              JOB STATUS   LIFECYCLE STATE
   basic-example-session-cluster     RUNNING      STABLE
   ```

1. 存取 Flink UI。

   ```
   kubectl port-forward deployments/basic-example-session-cluster 8081 -n NAMESPACE
   ```

1. 開啟 `localhost:8081` 以在本機檢視 Flink 作業。

1. 清除作業。請記得清除為此作業建立的 S3 成品，例如檢查點、高可用性、儲存點中繼資料和 CloudWatch 日誌。

------

# 執行 Flink 應用程式的安全角色許可
<a name="jobruns-flink-kubernetes-security"></a>

本主題說明部署和執行 Flink 應用程式的安全角色。管理部署以及建立和管理任務、操作員角色和任務角色需要兩個角色。本主題會介紹他們並列出他們的許可。

## 角色型存取控制
<a name="jobruns-flink-kubernetes-security-rbac"></a>

若要部署 Operator 並執行 Flink 作業，必須建立兩個 Kubernetes 角色：一個 Operator 和一個作業角色。Amazon EMR 會在您安裝 Operator 時預設建立兩個角色。

## Operator 角色
<a name="jobruns-flink-kubernetes-security-operator-role"></a>

我們使用 Operator 角色來管理 `flinkdeployments`，以便為每個 Flink 作業和其他資源 (如服務) 建立和管理 JobManager。

Operator 角色的預設名稱為 `emr-containers-sa-flink-operator` 且需要下列許可。

```
rules:
- apiGroups:
  - ""
  resources:
  - pods
  - services
  - events
  - configmaps
  - secrets
  - serviceaccounts
  verbs:
  - '*'
- apiGroups:
  - rbac.authorization.k8s.io
  resources:
  - roles
  - rolebindings
  verbs:
  - '*'
- apiGroups:
  - apps
  resources:
  - deployments
  - deployments/finalizers
  - replicasets
  verbs:
  - '*'
- apiGroups:
  - extensions
  resources:
  - deployments
  - ingresses
  verbs:
  - '*'
- apiGroups:
  - flink.apache.org
  resources:
  - flinkdeployments
  - flinkdeployments/status
  - flinksessionjobs
  - flinksessionjobs/status
  verbs:
  - '*'
- apiGroups:
  - networking.k8s.io
  resources:
  - ingresses
  verbs:
  - '*'
- apiGroups:
  - coordination.k8s.io
  resources:
  - leases
  verbs:
  - '*'
```

## 作業角色
<a name="jobruns-flink-security-job-role"></a>

JobManager 使用作業角色來建立和管理每個作業的 TaskManagers 和 ConfigMaps。

```
rules:
- apiGroups:
  - ""
  resources:
  - pods
  - configmaps
  verbs:
  - '*'
- apiGroups:
  - apps
  resources:
  - deployments
  - deployments/finalizers
  verbs:
  - '*'
```

# 針對 Amazon EMR on EKS 解除安裝 Flink Kubernetes Operator
<a name="jobruns-flink-kubernetes-operator-uninstall"></a>

請依照下列步驟解除安裝 Flink Kubernetes Operator。

1. 刪除 Operator。

   ```
   helm uninstall flink-kubernetes-operator -n <NAMESPACE>
   ```

1. 刪除 Helm 不會解除安裝的 Kubernetes 資源。

   ```
   kubectl delete serviceaccounts, roles, rolebindings -l emr-containers.amazonaws.com/component=flink.operator --namespace <namespace>
   kubectl delete crd flinkdeployments.flink.apache.org flinksessionjobs.flink.apache.org
   ```

1. (選用) 刪除 cert-manager。

   ```
   kubectl delete -f https://github.com/jetstack/cert-manager/releases/download/v1.12.0/cert-manager.yaml
   ```

# 使用 Flink 原生 Kubernetes
<a name="jobruns-flink-native-kubernetes"></a>

Amazon EMR 6.13.0 版及更高版本支援 Flink Native Kubernetes 作為命令列工具，可以使用它向 Amazon EMR on EKS 叢集提交 Flink 應用程式並執行。

**Topics**
+ [針對 Amazon EMR on EKS 設定 Flink Native Kubernetes](jobruns-flink-native-kubernetes-setup.md)
+ [開始針對 Amazon EMR on EKS 使用 Flink Native Kubernetes](jobruns-flink-native-kubernetes-getting-started.md)
+ [Native Kubernetes 的 Flink JobManager 服務帳戶安全要求](jobruns-flink-native-kubernetes-security-requirements.md)

# 針對 Amazon EMR on EKS 設定 Flink Native Kubernetes
<a name="jobruns-flink-native-kubernetes-setup"></a>

完成下列任務進行設定，然後才能在 Amazon EMR on EKS 上使用 Flink CLI 執行應用程式。如果已經註冊 Amazon Web Services (AWS) 且已在使用 Amazon EKS，則您幾乎可使用 Amazon EMR on EKS。如果已經完成任何先決條件，則可以跳過這些先決條件，然後繼續進行下一個。
+ **[安裝或更新至最新版本的 AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html)**- 如果您已安裝 AWS CLI，請確認您擁有最新版本。
+ **[開始使用 Amazon EKS – eksctl](https://docs.aws.amazon.com/eks/latest/userguide/getting-started-eksctl.html) ** – 請依照步驟在 Amazon EKS 中建立具有節點的新 Kubernetes 叢集。
+ **[選擇 Amazon EMR 基礎映像 URI](docker-custom-images-tag.md) (6.13.0 版或更高版本)** - Amazon EMR 6.13.0 版及更高版本支援 Flink Kubernetes 命令。
+ 確認 JobManager 服務帳戶具有建立和監控 TaskManager Pod 的適當許可。如需詳細資訊，請參閱[原生 Kubernetes 的 Flink JobManager 服務帳戶安全需求](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/jobruns-flink-native-kubernetes-security-requirements.html)。
+ 設定本機 [AWS 憑證設定檔](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html)。
+ 對於您要在其中執行 Flink 應用程式的 [Amazon EKS 叢集建立或更新 Kubeconfig 檔案](https://docs.aws.amazon.com/eks/latest/userguide/create-kubeconfig.html)。

# 開始針對 Amazon EMR on EKS 使用 Flink Native Kubernetes
<a name="jobruns-flink-native-kubernetes-getting-started"></a>

這些步驟說明如何設定、設定服務帳戶，以及執行 Flink 應用程式。Flink 原生 Kubernetes 用於在執行中的 Kubernetes 叢集上部署 Flink。

## 設定和執行 Flink 應用程式
<a name="jobruns-flink-native-kubernetes-getting-started-run-application"></a>

Amazon EMR 6.13.0 及更高版本支援 Flink Native Kubernetes，以便在 Amazon EKS 叢集上執行 Flink 應用程式。完成以下步驟，以執行 Flink 應用程式：

1. 在使用 Flink Native Kubernetes 命令執行 Flink 應用程式之前，請先完成 [針對 Amazon EMR on EKS 設定 Flink Native Kubernetes](jobruns-flink-native-kubernetes-setup.md) 中的步驟。

1. [下載並安裝 Flink](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/try-flink/local_installation)。

1. 設定以下環境變數的值。

   ```
   #Export the FLINK_HOME environment variable to your local installation of Flink
   export FLINK_HOME=/usr/local/bin/flink #Will vary depending on your installation
   export NAMESPACE=flink
   export CLUSTER_ID=flink-application-cluster
   export IMAGE=<123456789012.dkr.ecr.sample-AWS 區域-.amazonaws.com/flink/emr-6.13.0-flink:latest>
   export FLINK_SERVICE_ACCOUNT=emr-containers-sa-flink
   export FLINK_CLUSTER_ROLE_BINDING=emr-containers-crb-flink
   ```

1. 建立服務帳戶，以管理 Kubernetes 資源。

   ```
   kubectl create serviceaccount $FLINK_SERVICE_ACCOUNT -n $NAMESPACE
   kubectl create clusterrolebinding $FLINK_CLUSTER_ROLE_BINDING --clusterrole=edit --serviceaccount=$NAMESPACE:$FLINK_SERVICE_ACCOUNT
   ```

1. 執行 `run-application` CLI 命令 。

   ```
   $FLINK_HOME/bin/flink run-application \
       --target kubernetes-application \
       -Dkubernetes.namespace=$NAMESPACE \
       -Dkubernetes.cluster-id=$CLUSTER_ID \
       -Dkubernetes.container.image.ref=$IMAGE \
       -Dkubernetes.service-account=$FLINK_SERVICE_ACCOUNT \
       local:///opt/flink/examples/streaming/Iteration.jar
   2022-12-29 21:13:06,947 INFO  org.apache.flink.kubernetes.utils.KubernetesUtils            [] - Kubernetes deployment requires a fixed port. Configuration blob.server.port will be set to 6124
   2022-12-29 21:13:06,948 INFO  org.apache.flink.kubernetes.utils.KubernetesUtils            [] - Kubernetes deployment requires a fixed port. Configuration taskmanager.rpc.port will be set to 6122
   2022-12-29 21:13:07,861 WARN  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Please note that Flink client operations(e.g. cancel, list, stop, savepoint, etc.) won't work from outside the Kubernetes cluster since 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
   2022-12-29 21:13:07,868 INFO  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Create flink application cluster flink-application-cluster successfully, JobManager Web Interface: http://flink-application-cluster-rest.flink:8081
   ```

1. 檢查已建立的 Kubernetes 資源。

   ```
   kubectl get all -n <namespace>
   NAME READY STATUS RESTARTS AGE
   pod/flink-application-cluster-546687cb47-w2p2z 1/1 Running 0 3m37s
   pod/flink-application-cluster-taskmanager-1-1 1/1 Running 0 3m24s
   
   NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
   service/flink-application-cluster ClusterIP None <none> 6123/TCP,6124/TCP 3m38s
   service/flink-application-cluster-rest ClusterIP 10.100.132.158 <none> 8081/TCP 3m38s
   
   NAME READY UP-TO-DATE AVAILABLE AGE
   deployment.apps/flink-application-cluster 1/1 1 1 3m38s
   
   NAME DESIRED CURRENT READY AGE
   replicaset.apps/flink-application-cluster-546687cb47 1 1 1 3m38s
   ```

1. 連接埠轉送到 8081。

   ```
   kubectl port-forward service/flink-application-cluster-rest 8081 -n <namespace>
   Forwarding from 127.0.0.1:8081 -> 8081
   ```

1. 在本機存取 Flink 使用者介面。  
![\[存取 Flink UI。\]](http://docs.aws.amazon.com/zh_tw/emr/latest/EMR-on-EKS-DevelopmentGuide/images/jobruns-flink-native-kubernetes-ui.png)

1. 刪除 Flink 應用程式。

   ```
   kubectl delete deployment.apps/flink-application-cluster -n <namespace>
   deployment.apps "flink-application-cluster" deleted
   ```

如需有關提交應用程式至 Flink 的詳細資訊，請參閱 Apache Flink 文件中的 [Native Kubernetes](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/)。

# Native Kubernetes 的 Flink JobManager 服務帳戶安全要求
<a name="jobruns-flink-native-kubernetes-security-requirements"></a>

Flink JobManager Pod 使用 Kubernetes 服務帳戶來存取 Kubernetes API 伺服器，以建立和監控 TaskManager Pod。JobManager 服務帳戶必須具有適當的許可，才能建立/刪除 TaskManager Pod，並允許 TaskManager 監看領導者 ConfigMaps，以擷取叢集中 JobManager 和 ResourceManager 的地址。

下列規則適用於此服務帳戶。

```
rules:
- apiGroups:
  - ""
  resources:
  - pods
  verbs:
  - "*"
- apiGroups:
  - ""
  resources:
  - services
  verbs:
  - "*"
- apiGroups:
  - ""
  resources:
  - configmaps
  verbs:
  - "*"
- apiGroups:
  - "apps"
  resources:
  - deployments
  verbs:
  - "*"
```

# 自訂 Flink 和 FluentD 的 Docker 映像
<a name="jobruns-flink-docker-flink-fluentd"></a>

執行下列步驟，使用 Apache Flink 或 FluentD 映像自訂 Amazon EMR on EKS 的 Docker 映像。其中包括取得基礎映像、自訂映像、發佈映像和提交工作負載的技術指導。

**Topics**
+ [先決條件](#jobruns-flink-docker-flink-fluentd-prereqs)
+ [步驟 1：從 Amazon Elastic Container Registry 擷取基礎映像](#jobruns-flink-docker-flink-fluentd-retrieve-base)
+ [步驟 2：自訂基礎映像](#jobruns-flink-docker-flink-fluentd-customize-image)
+ [步驟 3：發佈您的自訂映像](#jobruns-flink-docker-flink-fluentd-publish-image)
+ [步驟 4：使用自訂映像在 Amazon EMR 中提交 Flink 工作負載](#jobruns-flink-docker-flink-fluentd-submit-workload)

## 先決條件
<a name="jobruns-flink-docker-flink-fluentd-prereqs"></a>

在自訂 Docker 映像之前，請確定您已完成下列先決條件：
+ 完成[設定 Amazon EMR on EKS 的 Flink Kubernetes Operator](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/jobruns-flink-kubernetes-operator-setup.html) 步驟。
+ 已在您的環境中安裝 Docker。如需詳細資訊，請參閱[獲取 Docker](https://docs.docker.com/get-docker/)。

## 步驟 1：從 Amazon Elastic Container Registry 擷取基礎映像
<a name="jobruns-flink-docker-flink-fluentd-retrieve-base"></a>

基礎映像包含您存取其他 所需的 Amazon EMR 執行期和連接器 AWS 服務。如果您使用 Amazon EMR on EKS 搭配 Flink 6.14.0 版或更新版本，您可以從 Amazon ECR Public Gallery 取得基礎映像。瀏覽圖庫以尋找映像連結，然後將映像拉到本地工作區。例如，對於 Amazon EMR 6.14.0 版本，下列`docker pull`命令會傳回最新的標準基礎映像。將 取代`emr-6.14.0:latest`為您想要的發行版本。

```
docker pull public.ecr.aws/emr-on-eks/flink/emr-6.14.0-flink:latest
```

以下是 Flink 圖庫影像和 Fluentd 圖庫影像的連結：
+ [emr-on-eks/flink/emr-6.14.0-flink](https://gallery.ecr.aws/emr-on-eks/flink/emr-6.14.0-flink)
+ [emr-on-eks/fluentd/emr-6.14.0(](https://gallery.ecr.aws/emr-on-eks/fluentd/emr-6.14.0)

## 步驟 2：自訂基礎映像
<a name="jobruns-flink-docker-flink-fluentd-customize-image"></a>

下列步驟說明如何自訂您從 Amazon ECR 提取的基礎映像。

1. 在您的本機工作區建立新的 `Dockerfile`。

1. 編輯 `Dockerfile`並新增下列內容。這會`Dockerfile`使用您從 提取的容器映像`public.ecr.aws/emr-on-eks/flink/emr-7.12.0-flink:latest`。

   ```
   FROM public.ecr.aws/emr-on-eks/flink/emr-7.12.0-flink:latest
   USER root
   ### Add customization commands here ####
   USER hadoop:hadoop
   ```

   如果您使用的是 ，請使用下列組態`Fluentd`。

   ```
   FROM public.ecr.aws/emr-on-eks/fluentd/emr-7.12.0:latest
   USER root
   ### Add customization commands here ####
   USER hadoop:hadoop
   ```

1. 在 `Dockerfile` 中新增命令以自訂基礎映像。下列命令示範如何安裝 Python 程式庫。

   ```
   FROM public.ecr.aws/emr-on-eks/flink/emr-7.12.0-flink:latest
   USER root
   RUN pip3 install --upgrade boto3 pandas numpy // For python 3
   USER hadoop:hadoop
   ```

1. 在您建立 的相同目錄中`DockerFile`，執行下列命令來建置 Docker 映像。您在 `-t`旗標之後提供的 欄位是影像的自訂名稱。

   ```
   docker build -t <YOUR_ACCOUNT_ID>.dkr.ecr.<YOUR_ECR_REGION>.amazonaws.com/<ECR_REPO>:<ECR_TAG>
   ```

## 步驟 3：發佈您的自訂映像
<a name="jobruns-flink-docker-flink-fluentd-publish-image"></a>

您現在可以將新的 Docker 映像發佈到您的 Amazon ECR 登錄檔。

1. 執行下列命令來建立 Amazon ECR 儲存庫以存放 Docker 映像。為您的儲存庫提供名稱，例如 `emr_custom_repo.` 如需詳細資訊，請參閱《Amazon Elastic Container Registry 使用者指南》中的[建立儲存](https://docs.aws.amazon.com/AmazonECR/latest/userguide/getting-started-cli.html#cli-create-repository)庫。

   ```
   aws ecr create-repository \
          --repository-name emr_custom_repo \
          --image-scanning-configuration scanOnPush=true \
          --region <AWS_REGION>
   ```

1. 執行下列命令以驗證預設登錄檔。如需詳細資訊，請參閱《Amazon Elastic Container Registry 使用者指南》中的[驗證您的預設登錄](https://docs.aws.amazon.com/AmazonECR/latest/userguide/getting-started-cli.html#cli-authenticate-registry)檔。

   ```
   aws ecr get-login-password --region <AWS_REGION> | docker login --username AWS --password-stdin <AWS_ACCOUNT_ID>.dkr.ecr.<YOUR_ECR_REGION>.amazonaws.com
   ```

1. 推送映像。如需詳細資訊，請參閱《[Amazon Elastic Container Registry 使用者指南》中的將映像推送至 Amazon ECR](https://docs.aws.amazon.com/AmazonECR/latest/userguide/getting-started-cli.html#cli-push-image)。

   ```
   docker push <YOUR_ACCOUNT_ID>.dkr.ecr.<YOUR_ECR_REGION>.amazonaws.com/<ECR_REPO>:<ECR_TAG>
   ```

## 步驟 4：使用自訂映像在 Amazon EMR 中提交 Flink 工作負載
<a name="jobruns-flink-docker-flink-fluentd-submit-workload"></a>

對`FlinkDeployment`規格進行下列變更，以使用自訂映像。若要這樣做，請在部署規格的`spec.image`行中輸入您自己的映像。

```
apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example
   spec:
     flinkVersion: v1_18
     image: <YOUR_ACCOUNT_ID>.dkr.ecr.<YOUR_ECR_REGION>.amazonaws.com/<ECR_REPO>:<ECR_TAG>
     imagePullPolicy: Always
     flinkConfiguration:
           taskmanager.numberOfTaskSlots: "1"
```

若要為您的 Fluentd 任務使用自訂映像，請在部署規格的 `monitoringConfiguration.image`行中輸入您自己的映像。

```
  monitoringConfiguration:
       image: <YOUR_ACCOUNT_ID>.dkr.ecr.<YOUR_ECR_REGION>.amazonaws.com/<ECR_REPO>:<ECR_TAG>
       cloudWatchMonitoringConfiguration:
         logGroupName: flink-log-group
         logStreamNamePrefix: custom-fluentd
```

# 監控 Flink Kubernetes Operator 和 Flink 作業
<a name="jobruns-flink-monitoring"></a>

本章節描述了您可以透過 Amazon EMR on EKS 來監控 Flink 作業的幾種方式。這包括使用 Flink *Web Dashboard* 將 Flink 與 Amazon Managed Service for Prometheus 整合，以提供任務狀態和指標，或使用監控組態將日誌資料傳送至 Amazon S3 和 Amazon CloudWatch。

**Topics**
+ [使用 Amazon Managed Service for Prometheus 監控 Flink 任務](jobruns-flink-monitoring-prometheus.md)
+ [使用 Flink UI 監控 Flink 任務](jobruns-flink-monitoring-ui.md)
+ [使用監控組態來監控 Flink Kubernetes Operator 和 Flink 任務](jobruns-flink-monitoring-configuration.md)

# 使用 Amazon Managed Service for Prometheus 監控 Flink 任務
<a name="jobruns-flink-monitoring-prometheus"></a>

可以整合 Apache Flink 與 Amazon Managed Service for Prometheus (管理入口網站)。Amazon Managed Service for Prometheus 支援從在 Amazon EKS 上執行的叢集的 Amazon Managed Service 中擷取指標。Amazon Managed Service for Prometheus 與 Amazon EKS 叢集上已經執行的 Prometheus 伺服器搭配使用。執行 Amazon Managed Service for Prometheus 與 Amazon EMR Flink Operator 的整合將自動部署和設定 Prometheus 伺服器，以便與 Amazon Managed Service for Prometheus 整合。

1. [ 建立 Amazon Managed Service for Prometheus 工作區](https://docs.aws.amazon.com/prometheus/latest/userguide/AMP-onboard-create-workspace.html)。此工作區用作擷取端點。您稍後將需要遠端寫入 URL。

1. 設定服務帳戶的 IAM 角色。

   對於這種加入方法，請針對執行 Prometheus 伺服器的 Amazon EKS 叢集中的服務帳戶使用 IAM 角色。這些角色也稱為*服務角色*。

   如果您還沒有這些角色，[請設定服務角色，以便從 Amazon EKS 叢集中擷取指標](https://docs.aws.amazon.com/prometheus/latest/userguide/set-up-irsa.html)。

   在繼續之前，請先建立名為 `amp-iamproxy-ingest-role` 的 IAM 角色。

1. 安裝 Amazon EMR Flink Operator 與 Amazon Managed Service for Prometheus。

現在您擁有 Amazon Managed Service for Prometheus 工作區、Amazon Managed Service for Prometheus 的專用 IAM 角色以及必要的許可，可以安裝 Amazon EMR Flink Operator。

建立 `enable-amp.yaml` 檔案。此檔案可讓您使用自訂組態來覆寫 Amazon Managed Service for Prometheus 設定。請務必使用您自己的角色。

```
kube-prometheus-stack:
    prometheus:
    serviceAccount:
        create: true
        name: "amp-iamproxy-ingest-service-account"
        annotations:
            eks.amazonaws.com/role-arn: "arn:aws:iam::<AWS_ACCOUNT_ID>:role/amp-iamproxy-ingest-role"
    remoteWrite:
        - url: <AMAZON_MANAGED_PROMETHEUS_REMOTE_WRITE_URL>
        sigv4:
            region: <AWS_REGION>
        queueConfig:
            maxSamplesPerSend: 1000
            maxShards: 200
            capacity: 2500
```

使用 [https://helm.sh/docs/helm/helm_install/](https://helm.sh/docs/helm/helm_install/) 命令將覆寫傳遞至 `flink-kubernetes-operator` 圖表。

```
helm upgrade -n <namespace> flink-kubernetes-operator \
   oci://public.ecr.aws/emr-on-eks/flink-kubernetes-operator \
   --set prometheus.enabled=true
   -f enable-amp.yaml
```

此命令會自動在連接埠 9999 的運算子中安裝 Prometheus 報告程式。任何未來的 `FlinkDeployment` 也會在 9249 上公開 `metrics` 連接埠。
+ Flink Operator 指標會顯示在標籤 `flink_k8soperator_` 下的 Prometheus 中。
+ Flink Task Manager 指標會顯示在標籤 `flink_taskmanager_` 下的 Prometheus 中。
+ Flink Job Manager 指標會顯示在標籤 `flink_jobmanager_` 下的 Prometheus 中。

# 使用 Flink UI 監控 Flink 任務
<a name="jobruns-flink-monitoring-ui"></a>

若要監控執行中 Flink 應用程式的運作狀態和效能，請使用 *Flink Web Dashboard*。此儀表板提供有關作業狀態、TaskManager 數目以及作業指標和日誌的資訊。它也可讓您檢視和修改 Flink 作業的組態，並與 Flink 叢集互動，以提交或取消作業。

若要存取正在 Kubernetes 上執行的 Flink 應用程式的 Flink Web Dashboard，請執行下列動作：

1. 使用 `kubectl port-forward` 命令將本機連接埠轉送至 Flink 應用程式的 TaskManager Pod 中執行 Flink Web Dashboard 的連接埠。此連接埠預設為 8081。將 *deployment-name* 取代為上述 Flink 應用程式部署的名稱。

   ```
   kubectl get deployments -n namespace
   ```

   輸出範例：

   ```
   kubectl get deployments -n flink-namespace
   NAME                        READY   UP-TO-DATE   AVAILABLE  AGE
   basic-example               1/1       1            1           11m
   flink-kubernetes-operator   1/1       1            1           21h
   ```

   ```
   kubectl port-forward deployments/deployment-name 8081 -n namespace
   ```

1. 如果您想要在本機使用不同的連接埠，請使用 *local-port*:8081 參數。

   ```
   kubectl port-forward -n flink deployments/basic-example 8080:8081
   ```

1. 在網頁瀏覽器中，導覽至 `http://localhost:8081` (或 `http://localhost:local-port`，如果您使用自訂本機連接埠) 以存取 Flink Web Dashboard。此儀表板會顯示有關執行中 Flink 應用程式的資訊，例如作業狀態、TaskManager 數目以及作業指標和日誌。  
![\[Flink 儀表板 UI 範例\]](http://docs.aws.amazon.com/zh_tw/emr/latest/EMR-on-EKS-DevelopmentGuide/images/sample-flink-dashboard-ui.png)

# 使用監控組態來監控 Flink Kubernetes Operator 和 Flink 任務
<a name="jobruns-flink-monitoring-configuration"></a>

監控組態可讓您輕鬆地將 Flink 應用程式和 Operator 日誌的日誌封存設定為 S3 和/或 CloudWatch (您可以選擇其中一個或兩者)。這樣做可將 FluentD 附屬項新增到 JobManager 和 TaskManager Pod，隨後將這些元件的日誌轉發到您設定的接收器。

**注意**  
必須為 Flink Oerator 和 Flink 作業 (服務帳戶) 的服務帳戶設定 IAM 角色，才能使用此功能，因為它需要與其他 AWS 服務互動。必須在 [針對 Amazon EMR on EKS 設定 Flink Kubernetes Operator](jobruns-flink-kubernetes-operator-setup.md) 中使用 IRSA 進行設定。

## Flink 應用程式日誌
<a name="jobruns-flink-monitoring-configuration-application-logs"></a>

可採用以下方法定義此設定。

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: FLINK IMAGE TAG
  imagePullPolicy: Always
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  executionRoleArn: JOB EXECUTION ROLE
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
  monitoringConfiguration:
    s3MonitoringConfiguration:
      logUri: S3 BUCKET
    cloudWatchMonitoringConfiguration:
      logGroupName: LOG GROUP NAME
      logStreamNamePrefix: LOG GROUP STREAM PREFIX
    sideCarResources:
      limits:
        cpuLimit: 500m
        memoryLimit: 250Mi
    containerLogRotationConfiguration:
        rotationSize: 2GB
        maxFilesToKeep: 10
```

以下是組態選項。
+ `s3MonitoringConfiguration` - 用於設定轉送至 S3 的組態金鑰
  + `logUri` (必要) - 想要在其中儲存日誌的 S3 儲存貯體路徑。
  + 上傳日誌後 S3 上的路徑將如下所示。
    + 未啟用日誌輪換：

      ```
      s3://${logUri}/${POD NAME}/STDOUT or STDERR.gz
      ```
    + 日誌輪換已啟用。可以同時使用輪換的檔案和目前檔案 (沒有日期戳記的檔案)。

      ```
      s3://${logUri}/${POD NAME}/STDOUT or STDERR.gz
      ```

      下面格式是遞增的數字。

      ```
      s3://${logUri}/${POD NAME}/stdout_YYYYMMDD_index.gz
      ```
  + 使用此轉寄站需要以下 IAM 許可。

    ```
    {
        "Effect": "Allow",
        "Action": [
            "s3:PutObject"
        ],
        "Resource": [
           "S3_BUCKET_URI/*",
           "S3_BUCKET_URI"
        ]
    }
    ```
+ `cloudWatchMonitoringConfiguration` - 用於設定轉送至 CloudWatch 的組態金鑰。
  + `logGroupName` (必要) - 您要向其傳送日誌的 CloudWatch 日誌群組名稱 (如果群組不存在，則自動建立群組)。
  + `logStreamNamePrefix` (選用) - 您想要向其傳送日誌的日誌串流名稱。預設值為空字串。格式如下所示：

    ```
    ${logStreamNamePrefix}/${POD NAME}/STDOUT or STDERR
    ```
  + 使用此轉寄站需要以下 IAM 許可。

    ```
    {
        "Effect": "Allow",
        "Action": [
            "logs:CreateLogStream",
            "logs:CreateLogGroup",
            "logs:PutLogEvents"
        ],
        "Resource": [
            "arn:aws:logs:REGION:ACCOUNT-ID:log-group:{YOUR_LOG_GROUP_NAME}:*",
            "arn:aws:logs:REGION:ACCOUNT-ID:log-group:{YOUR_LOG_GROUP_NAME}"
        ]
    }
    ```
+ `sideCarResources` (選用) - 用於在已啟動的 Fluentbit 附屬容器上設定資源限制的組態金鑰。
  + `memoryLimit` (選用) - 預設值為 512Mi。根據需要進行調整。
  + `cpuLimit` (選用) - 此選項沒有預設值。根據需要進行調整。
+ `containerLogRotationConfiguration` (選用) - 控制容器日誌輪換行為。依預設會啟用此功能。
  + `rotationSize` (必要) - 指定日誌輪換的檔案大小。可能的值範圍為 2KB 至 2GB。rotationSize 參數的數值單位部分會以整數形式傳遞。由於不支援小數值，因此可以使用值 1500MB 來指定 1.5GB 的輪換大小。預設值為 2GB。
  + `maxFilesToKeep` (必要) — 指定輪換發生後，要在容器中保留的檔案數上限。下限值是 1，上限值是 50。預設為 10。

## Flink Operator 日誌
<a name="jobruns-flink-monitoring-configuration-operator-logs"></a>

也可以使用 Helm Chart 安裝中 `values.yaml` 檔案的下列選項，為 Operator 啟用日誌封存。可以啟用 S3、CloudWatch 或兩者。

```
monitoringConfiguration: 
  s3MonitoringConfiguration:
    logUri: "S3-BUCKET"
    totalFileSize: "1G"
    uploadTimeout: "1m"
  cloudWatchMonitoringConfiguration:
    logGroupName: "flink-log-group"
    logStreamNamePrefix: "example-job-prefix-test-2"
  sideCarResources:
    limits:
      cpuLimit: 1
      memoryLimit: 800Mi
  memoryBufferLimit: 700M
```

以下是 `monitoringConfiguration` 下的可用組態選項。
+ `s3MonitoringConfiguration` - 將此選項設定為存檔至 S3。
+ `logUri` (必要) - 想要在其中儲存日誌的 S3 儲存貯體路徑。
+ 以下是上傳日誌後 S3 儲存貯體路徑的可能格式。
  + 未啟用日誌輪換。

    ```
    s3://${logUri}/${POD NAME}/OPERATOR or WEBHOOK/STDOUT or STDERR.gz
    ```
  + 日誌輪換已啟用。可以同時使用輪換的檔案和目前檔案 (沒有日期戳記的檔案)。

    ```
    s3://${logUri}/${POD NAME}/OPERATOR or WEBHOOK/STDOUT or STDERR.gz
    ```

    下面的格式索引是遞增的數字。

    ```
    s3://${logUri}/${POD NAME}/OPERATOR or WEBHOOK/stdout_YYYYMMDD_index.gz
    ```
+ `cloudWatchMonitoringConfiguration` - 用於設定轉送至 CloudWatch 的組態金鑰。
  + `logGroupName` (必要) - 您想要向其傳送日誌的 CloudWatch 日誌群組名稱。如果群組不存在，則會自動建立群組。
  + `logStreamNamePrefix` (選用) — 您想要向其傳送日誌的日誌串流名稱。預設值為空字串。CloudWatch 的格式如下所示：

    ```
    ${logStreamNamePrefix}/${POD NAME}/STDOUT or STDERR
    ```
+ `sideCarResources` (選用) — 用於在已啟動的 Fluentbit 附屬容器上設定資源限制的組態金鑰。
  + `memoryLimit` (選用) - 記憶體限制。根據需要進行調整。預設值為 512Mi。
  + `cpuLimit` - CPU 限制。根據需要進行調整。無預設值。
+ `containerLogRotationConfiguration` (選用) - 控制容器日誌輪換行為。依預設會啟用此功能。
  + `rotationSize` (必要) - 指定日誌輪換的檔案大小。可能的值範圍為 2KB 至 2GB。rotationSize 參數的數值單位部分會以整數形式傳遞。由於不支援小數值，因此可以使用值 1500MB 來指定 1.5GB 的輪換大小。預設值為 2GB。
  + `maxFilesToKeep` (必要) — 指定輪換發生後，要在容器中保留的檔案數上限。下限值是 1，上限值是 50。預設為 10。

# Flink 如何支援高可用性和任務彈性
<a name="jobruns-flink-resiliency"></a>

下列各節概述 Flink 如何讓任務更可靠且高可用性。它透過 Flink 高可用性等內建功能執行此操作，並在發生故障時提供各種復原功能。

**Topics**
+ [使用 Flink Operator 和 Flink 應用程式的高可用性 (HA)](jobruns-flink-using-ha.md)
+ [使用 Amazon EMR on EKS 優化 Flink 作業重新啟動時間，以進行任務復原和擴展操作](jobruns-flink-restart.md)
+ [使用 Amazon EMR on EKS 上的 Flink 正常解除委任 Spot 執行個體](jobruns-flink-decommission.md)

# 使用 Flink Operator 和 Flink 應用程式的高可用性 (HA)
<a name="jobruns-flink-using-ha"></a>

本主題說明如何設定高可用性，並說明它在幾個不同的使用案例中的運作方式。這包括當您使用 任務管理員時，以及當您使用 Flink 原生 kubernetes 時。

## Flink Operator 高可用性
<a name="jobruns-flink-ha-operator"></a>

我們啟用 Flink Operator 的*高可用性*，以便可以容錯移轉至待命 Flink Operator，在發生故障時將 Operator 控制迴圈中的停機時間降至最低。依預設會啟用「高可用性」，且起始 Operator 複本的預設數目為 2。可以在 `values.yaml` 檔案中設定 Helm Chart 的複本欄位。

下列欄位可自訂：
+ `replicas` (選用，預設值為 2)：將此數字設定為大於 1 可建立其他待命 Operator，並允許更快速地復原作業。
+ `highAvailabilityEnabled` (選用，預設值為 true)：控制是否要啟用 HA。將此參數指定為 true 可啟用多可用區部署支援，並設定正確的 `flink-conf.yaml` 參數。

透過在 `values.yaml` 檔案中設定下列組態，停用 operator 的高可用性。

```
...
imagePullSecrets: []

replicas: 1

# set this to false if you don't want HA
highAvailabilityEnabled: false
...
```

**多可用區部署**

我們會在多個可用區域建立 operator Pod。這是一個軟約束，如果您在不同的可用區域中沒有足夠的資源，將在相同的可用區域中排程您的 operator Pod 。

**確定領導者複本**

 如果啟用 HA，則複本使用 Lease 來確定哪些 JM 是領導者，並使用 K8s Lease 進行領導者選舉。您可以描述 Lease 並查看 .Spec.Holder Identity 欄位，以確定目前的領導者

```
kubectl describe lease <Helm Install Release Name>-<NAMESPACE>-lease -n <NAMESPACE> | grep "Holder Identity"
```

**Flink-S3 互動**

**設定存取憑證**

請確定您已設定 IRSA，具有可存取 S3 儲存貯體的適當 IAM 許可。

**從 S3 應用程式模式中擷取作業 jar**

Flink Operator 也支援從 S3 中擷取應用程式 jar。只需在 FlinkDeployment 規格中提供 jarURI 的 S3 位置即可。

也可以使用此功能來下載其他成品，例如 PyFLink 指令碼。生成的 Python 指令碼放在路徑 `/opt/flink/usrlib/` 下。

以下範例示範如何將此功能用於 PyFLink 作業。請注意 jarURI 和引數欄位。

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: python-example
spec:
  image: <YOUR CUSTOM PYFLINK IMAGE>
  emrReleaseLabel: "emr-6.12.0-flink-latest"
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    highAvailabilityEnabled: false
    replicas: 1
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: "s3://<S3-BUCKET>/scripts/pyflink.py" # Note, this will trigger the artifact download process
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pyflink.py"]
    parallelism: 1
    upgradeMode: stateless
```

**Flink S3 連接器**

Flink 隨附有兩個 S3 連接器 (如下所列)。以下各章節討論何時使用哪個連接器。

**檢查點：Presto S3 連接器**
+ 將 S3 結構描述設為 s3p://
+ 用於檢查點到 s3 的建議連接器。如需詳細資訊，請參閱 Apache Flink 文件中的 [S3-specific](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#s3-specific)。

FlinkDeployment 規格範例：

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.checkpoints.dir: s3p://<BUCKET-NAME>/flink-checkpoint/
```

**讀取和寫入 S3：Hadoop S3 連接器**
+ 將 S3 結構描述設定為 `s3://` 或 (`s3a://`)
+ 用於從 S3 讀取和寫入檔案的建議連接器 (只有 S3 連接器實作 [Flinks Filesystem 介面](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/))。
+ 根據預設，我們在 `flink-conf.yaml` 檔案`fs.s3a.aws.credentials.provider`中設定 ，也就是 `com.amazonaws.auth.WebIdentityTokenCredentialsProvider`。如果完全覆寫預設 `flink-conf` 並且正在與 S3 進行互動，請確保使用此提供程式。

FlinkDeployment 規格範例

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  job:
    jarURI: local:///opt/flink/examples/streaming/WordCount.jar
    args: [ "--input", "s3a://<INPUT BUCKET>/PATH", "--output", "s3a://<OUTPUT BUCKET>/PATH" ]
    parallelism: 2
    upgradeMode: stateless
```

## Flink Job Manager
<a name="jobruns-flink-ha-manager"></a>

Flink 部署的高可用性 (HA) 可讓作業繼續進行，即使遇到暫時性錯誤以及 JobManager 當機。作業將會重新啟動，但會從上次啟用 HA 的成功檢查點開始。如果沒有啟用 HA，Kubernetes 將重新啟動 JobManager，但您的作業將作為新作業開始，並將失去其進度。設定 HA 之後，我們可以告訴 Kubernetes 將 HA 中繼資料儲存在永久性儲存體中，以便在 JobManager 中發生暫時性失敗時進行參考，然後從上次成功的檢查點恢復我們的作業。

Flink 作業預設為啟用 HA (複本計數設為 2，這將要求您提供 S3 儲存位置，以便 HA 中繼資料持續存在)。

**HA 組態**

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  executionRoleArn: "<JOB EXECUTION ROLE ARN>"
  emrReleaseLabel: "emr-6.13.0-flink-latest"
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    replicas: 2
    highAvailabilityEnabled: true
    storageDir: "s3://<S3 PERSISTENT STORAGE DIR>"
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
```

以下是 Job Manager 中上述 HA 組態的描述 (在 .spec.jobManager 下定義)：
+ `highAvailabilityEnabled` (選用，預設值為 true)：如果您不想啟用 HA 且不想使用提供的 HA 組態，請將此設定為 `false `。您仍然可以操作「複本」欄位以手動設定 HA。
+ `replicas` (選用，預設值為 2)：將此數字設定為大於 1 可建立其他待命 JobManager，並允許更快速地復原作業。如果停用 HA，則必須將複本計數設定為 1，否則您將繼續收到驗證錯誤 (如果未啟用 HA，則僅支援 1 個複本)。
+ `storageDir` (必填)：因為我們預設使用的複本計數為 2，所以我們必須提供一個持久的 StorageDir。目前，此欄位僅接受 S3 路徑作為儲存位置。

**Pod 位置**

 如果您啟用 HA，我們也會嘗試在同一個可用區域中共置 Pod，進而提升效能 (藉由將 Pod 放在相同可用區域來減少網路延遲)。這是一個竭盡全力的過程，意味著如果您在對大部分 Pod 進行排程的可用區域中沒有足夠的資源，剩餘的 Pod 仍會排程，但最終可能會在此可用區域以外的節點上結束。

**確定領導者複本**

如果啟用 HA，複本會使用租用來判斷哪個 JM 是領導者，並使用 K8s Configmap 作為儲存此中繼資料的資料儲存。如果您想確定領導者，可以查看 Configmap 的內容，然後查看資料下的關鍵字 `org.apache.flink.k8s.leader.restserver`，以使用該 IP 地址尋找 K8s Pod。也可使用下列 bash 命令。

```
ip=$(kubectl get configmap -n <NAMESPACE> <JOB-NAME>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')
kubectl get pods -n NAMESPACE  -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

## Flink 作業：原生 Kubernetes
<a name="jobruns-flink-ha-kubernetes"></a>

Amazon EMR 6.13.0 及更高版本支援 Flink Native Kubernetes，以便在 Amazon EKS 叢集上以高可用性模式執行 Flink 應用程式。

**注意**  
提交 Flink 作業時，必須建立 Amazon S3 儲存貯體來儲存高可用性中繼資料。如果不想使用此功能，可以停用它。依預設會啟用此功能。

若要開啟 Flink 高可用性功能，請在[執行 `run-application` CLI 命令](jobruns-flink-native-kubernetes-getting-started.md#jobruns-flink-native-kubernetes-getting-started-run-application)時提供下列 Flink 參數。參數定義於範例下方。

```
-Dhigh-availability.type=kubernetes \
-Dhigh-availability.storageDir=S3://DOC-EXAMPLE-STORAGE-BUCKET \
-Dfs.s3a.aws.credentials.provider="com.amazonaws.auth.WebIdentityTokenCredentialsProvider" \
-Dkubernetes.jobmanager.replicas=3 \
-Dkubernetes.cluster-id=example-cluster
```
+ **`Dhigh-availability.storageDir`**：您要在其中存放作業的高可用性中繼資料的 Amazon S3 儲存貯體。

  **`Dkubernetes.jobmanager.replicas`**：要建立的 Job Manager Pod 數量，為大於 `1` 的整數。

  **`Dkubernetes.cluster-id`**：識別 Flink 叢集的唯一識別碼。

# 使用 Amazon EMR on EKS 優化 Flink 作業重新啟動時間，以進行任務復原和擴展操作
<a name="jobruns-flink-restart"></a>

當任務失敗或發生擴展操作時，Flink 會嘗試從最後一個完成的檢查點重新執行任務。根據檢查點狀態的大小和平行任務數量，重新啟動程序可能需要一分鐘或更長的時間才能執行。在重新啟動期間，作業的積壓任務可能會累積。不過，Flink 有一些方法可以優化執行圖表的復原和重新啟動速度，以提高作業穩定性。

此頁面說明 Amazon EMR Flink 在 Spot 執行個體上的任務復原或擴展操作期間，可以改善任務重新啟動時間的一些方式。Spot 執行個體是未使用的運算容量，以折扣價提供。它具有獨特的行為，包括偶爾中斷，因此請務必了解 Amazon EMR on EKS 如何處理這些行為，包括 Amazon EMR on EKS 如何執行除役和重新啟動任務。

**Topics**
+ [任務本機復原](#flink-restart-task-local)
+ [透過 Amazon EBS 磁碟區掛載進行任務本機復原](#flink-restart-task-local-ebs)
+ [一般日誌型增量檢查點](#flink-restart-log-check)
+ [精細復原](#flink-restart-fine-grained)
+ [調整式排程器中的合併重新啟動機制](#flink-restart-combined)

## 任務本機復原
<a name="flink-restart-task-local"></a>

**注意**  
Amazon EMR on EKS 6.14.0 及更高版本上的 Flink 支援任務本機復原。

透過 Flink 檢查點，每個任務皆會產生其狀態的快照，Flink 會將其寫入分散式儲存體 (如 Amazon S3)。在復原的情況下，任務會從分散式儲存體還原其狀態。分散式儲存提供容錯能力，並且由於所有節點皆可存取分散式儲存體，因此可以在重新擴展期間重新分配狀態。

但是，遠端分散式存放區也有一個缺點：所有任務均須透過網路從遠端位置讀取其狀態。這可能會導致任務復原或擴展操作期間大型狀態的復原時間較長。

您可透過*任務本機復原*來解決復原時間較長的問題。任務會將檢查點上的狀態寫入任務本機的次要儲存體，例如本機磁碟上。同時還會將其狀態存放在主要儲存體中 (在此例中為 Amazon S3)。在復原期間，排程器會在較早執行任務的相同 Task Manager 上排定任務，以便其可以從本機狀態存放區復原，而不是從遠端狀態存放區讀取。如需詳細資訊，請參閱《Apache Flink 文件》**中的[任務本機復原](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/large_state_tuning/#task-local-recovery)。

我們對範例作業的基準測試指出，啟用任務本機復原後，復原時間已從幾分鐘縮短至幾秒鐘。

若要啟用任務本機復原，請在 `flink-conf.yaml` 檔案中設定下列組態。指定檢查點間隔值 (以毫秒為單位)。

```
    state.backend.local-recovery: true
    state.backend: hasmap or rocksdb
    state.checkpoints.dir: s3://STORAGE-BUCKET-PATH/checkpoint
    execution.checkpointing.interval: 15000
```

## 透過 Amazon EBS 磁碟區掛載進行任務本機復原
<a name="flink-restart-task-local-ebs"></a>

**注意**  
Amazon EMR on EKS 6.15.0 及更高版本上的 Flink 支援 Amazon EBS 的任務本機復原。

透過 Amazon EMR on EKS 上的 Flink，您可以自動向 TaskManager Pod 佈建 Amazon EBS 磁碟區以進行任務本機復原。預設的覆蓋掛載隨附 10 GB 磁碟區，對於狀態較低的作業而言即足夠。具有大型狀態的作業可以啟用*自動 EBS 磁碟區掛載*選項。TaskManager Pod 會在 Pod 建立期間自動建立和掛載，並在 Pod 刪除期間移除。

使用下列步驟為 Amazon EMR on EKS 中的 Flink 啟用自動 EBS 磁碟區掛載：

1. 匯出您將在後續步驟中使用的下列變數值。

   ```
   export AWS_REGION=aa-example-1 
   export FLINK_EKS_CLUSTER_NAME=my-cluster
   export AWS_ACCOUNT_ID=111122223333
   ```

1. 為您的叢集建立或更新 `kubeconfig` YAML 檔案。

   ```
   aws eks update-kubeconfig --name $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
   ```

1. 為 Amazon EKS 叢集上的 Amazon EBS 容器儲存介面 (CSI) 驅動程式建立 IAM 服務帳戶。

   ```
   eksctl create iamserviceaccount \
      --name ebs-csi-controller-sa \
      --namespace kube-system \
      --region $AWS_REGION \
      --cluster $FLINK_EKS_CLUSTER_NAME\
      --role-name TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} \
      --role-only \
      --attach-policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy \
      --approve
   ```

1. 使用下列命令建立 Amazon EBS CSI 驅動程式：

   ```
   eksctl create addon \
      --name aws-ebs-csi-driver \
      --region $AWS_REGION \
      --cluster $FLINK_EKS_CLUSTER_NAME \
      --service-account-role-arn arn:aws:iam::${AWS_ACCOUNT_ID}:role/TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
   ```

1. 使用下列命令建立 Amazon EBS 儲存類別：

   ```
   cat ≪ EOF ≫ storage-class.yaml
   apiVersion: storage.k8s.io/v1
   kind: StorageClass
   metadata:
     name: ebs-sc
   provisioner: ebs.csi.aws.com
   volumeBindingMode: WaitForFirstConsumer
   EOF
   ```

   然後套用該類別：

   ```
   kubectl apply -f storage-class.yaml
   ```

1. Helm 安裝 Amazon EMR Flink Kubernetes Operator，並提供建立服務帳戶的選項。這會建立要在 Flink 部署中使用的 `emr-containers-sa-flink`。

   ```
   helm install flink-kubernetes-operator flink-kubernetes-operator/ \
      --set jobServiceAccount.create=true \
      --set rbac.jobRole.create=true \
      --set rbac.jobRoleBinding.create=true
   ```

1. 若要提交 Flink 作業並啟用 EBS 磁碟區的自動佈建以進行任務本機復原，請在 `flink-conf.yaml` 檔案中設定下列組態。調整作業狀態大小的大小限制。將 `serviceAccount` 設定為 `emr-containers-sa-flink`。指定檢查點間隔值 (以毫秒為單位)。並省略 `executionRoleArn`。

   ```
   flinkConfiguration:
       task.local-recovery.ebs.enable: true
       kubernetes.taskmanager.local-recovery.persistentVolumeClaim.sizeLimit: 10Gi
       state.checkpoints.dir: s3://BUCKET-PATH/checkpoint
       state.backend.local-recovery: true
       state.backend: hasmap or rocksdb
       state.backend.incremental: "true"
       execution.checkpointing.interval: 15000
     serviceAccount: emr-containers-sa-flink
   ```

當您準備好刪除 Amazon EBS CSI 驅動程式外掛程式時，請使用下列命令：

```
  # Detach Attached Policy
  aws iam detach-role-policy --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} --policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy
  # Delete the created Role
  aws iam delete-role --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
  # Delete the created service account
  eksctl delete iamserviceaccount --name ebs-csi-controller-sa --namespace kube-system --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  # Delete Addon
  eksctl delete addon --name aws-ebs-csi-driver --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  # Delete the EBS storage class
  kubectl delete -f storage-class.yaml
```

## 一般日誌型增量檢查點
<a name="flink-restart-log-check"></a>

**注意**  
Amazon EMR on EKS 6.14.0 及更高版本上的 Flink 支援一般日誌型增量檢查點。

Flink 1.16 中新增了一般日誌型增量檢查點，以提高檢查點的速度。較快的檢查點間隔通常可減少復原工作，因為復原後需要重新處理的事件較少。如需詳細資訊，請參閱 *Apache Flink 部落格*上的 [Improving speed and stability of checkpointing with generic log-based incremental checkpoints](https://flink.apache.org/2022/05/30/improving-speed-and-stability-of-checkpointing-with-generic-log-based-incremental-checkpoints/)。

我們對範例作業的基準測試指出，使用一般日誌型增量檢查點時，檢查點時間已從幾分鐘縮短至幾秒鐘。

若要啟用一般日誌型增量檢查點，請在 `flink-conf.yaml` 檔案中設定下列組態。指定檢查點間隔值 (以毫秒為單位)。

```
    state.backend.changelog.enabled: true 
    state.backend.changelog.storage: filesystem
    dstl.dfs.base-path: s3://bucket-path/changelog
    state.backend.local-recovery: true
    state.backend: rocksdb
    state.checkpoints.dir: s3://bucket-path/checkpoint
    execution.checkpointing.interval: 15000
```

## 精細復原
<a name="flink-restart-fine-grained"></a>

**注意**  
Amazon EMR on EKS 6.14.0 及更高版本上的 Flink 提供對預設排程器的精細復原支援。Amazon EMR on EKS 6.15.0 及更高版本上的 Flink 提供調整式排程器中的精細復原支援。

當任務在執行過程中失敗時，Flink 會重設整個執行圖表，並從最後一個完成的檢查點觸發完整的重新執行。相較於僅重新執行失敗的任務，此方式的成本更高。精細復原僅會重新啟動失敗任務的管道連接元件。在下列範例中，作業圖表有 5 個頂點 (`A` 至 `E`)。頂點之間的所有連接均採用逐點分佈的管道方式，且作業的 `parallelism.default` 設定為 `2`。

```
A → B → C → D → E
```

在此範例中，總共有 10 個任務正在執行。第一個管道 (`a1` 至 `e1`) 在 TaskManager (`TM1`) 上執行，而第二個管道 (`a2` 至 `e2`) 在另一個 TaskManager (`TM2`) 上執行。

```
a1 → b1 → c1 → d1 → e1
a2 → b2 → c2 → d2 → e2
```

有兩個管道連接的元件：`a1 → e1` 和 `a2 → e2`。如果 `TM1` 或 `TM2` 失敗，則失敗僅會影響正在執行 TaskManager 之管道中的 5 個任務。重新啟動策略只會啟動受影響的管道元件。

精細復原僅適用於完全平行的 Flink 作業。`keyBy()` 或 `redistribute()` 操作不支援。如需詳細資訊，請參閱 *Flink Improvement Proposal* Jira 專案中的 [FLIP-1: Fine Grained Recovery from Task Failures](https://cwiki.apache.org/confluence/display/FLINK/FLIP-1%3A+Fine+Grained+Recovery+from+Task+Failures)。

若要啟用精細復原，請在 `flink-conf.yaml` 檔案中設定下列組態。

```
jobmanager.execution.failover-strategy: region 
restart-strategy: exponential-delay or fixed-delay
```

## 調整式排程器中的合併重新啟動機制
<a name="flink-restart-combined"></a>

**注意**  
Amazon EMR on EKS 6.15.0 及更高版本上的 Flink 支援調整式排程器中的合併重新啟動機制。

調整式排程器可根據可用的插槽調整作業的並行度。如果沒有足夠的插槽以符合設定的作業並行度，則會自動降低並行度。如果有新的插槽可用，作業就會再次縱向擴展至設定的作業並行度。在沒有足夠的可用資源時，調整式排程器可以避免作業停機。這是 Flink Autoscaler 支援的排程器。基於這些原因，我們建議使用 Amazon EMR Flink 搭配調整式排程器。但是，調整式排程器可能會在短時間內進行多次重新啟動，每新增一個新資源就會重新啟動一次。這可能會導致作業效能下降。

在 Amazon EMR 6.15.0 及更高版本中，Flink 在調整式排程器中具有合併重新啟動機制，該機制會在新增第一個資源時開啟重新啟動時段，然後等到設定的預設 1 分鐘時段間隔為止。當有足夠的資源可使用設定的並行度執行作業時，或當間隔逾時，其會執行單次重新啟動。

我們對範例作業的基準測試指出，當您使用調整式排程器和 Flink 自動擴展器時，此功能比預設行為多處理 10% 的記錄。

若要啟用合併重新啟動機制，請在 `flink-conf.yaml` 檔案中設定下列組態。

```
jobmanager.adaptive-scheduler.combined-restart.enabled: true 
jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m
```

# 使用 Amazon EMR on EKS 上的 Flink 正常解除委任 Spot 執行個體
<a name="jobruns-flink-decommission"></a>

Flink 搭配 Amazon EMR on EKS 可以改善任務復原或擴展操作期間的作業重新啟動時間。

## 概觀
<a name="jobruns-flink-decommission-overview"></a>

Amazon EMR on EKS 版本 6.15.0 及更高版本支援使用 Apache Flink 在 Amazon EMR on EKS 中的 Spot 執行個體上正常解除委任 Task Manager。作為此功能的一部分，Amazon EMR on EKS 與 Flink 提供以下功能：
+ **即時檢查點**：Flink 串流作業可回應 Spot 執行個體中斷、對正在執行的作業執行即時 (JIT) 檢查點，並防止在這些 Spot 執行個體上排定其他任務。預設和調整式排程器支援 JIT 檢查點。
+ **合併重新啟動機制**：合併重新啟動機制會在作業達到目標資源並行度或目前設定的時段結束時，盡力嘗試重新啟動作業。這也可以防止因多個 Spot 執行個體終止而可能導致的連續作業重新啟動。組合重新啟動機制僅適用於調整式排程器。

這些功能具有以下優點：
+ 您可以利用 Spot 執行個體來執行 Task Manager 並減少叢集支出。
+ 改善 Spot 執行個體 Task Manager 的活性，可提高彈性並實現更有效率的作業排程。
+ 您的 Flink 作業將具有更長的正常執行時間，因為 Spot 執行個體終止後的重新啟動次數將減少。

## 正常解除委任的運作方式
<a name="jobruns-flink-decommission-howitworks"></a>

請考量下列範例：您佈建一個執行 Apache Flink 的 Amazon EMR on EKS 叢集，並為 Job Manager 指定隨需節點，以及為 Task Manager 指定 Spot 執行個體節點。終止前兩分鐘，Task Manager 收到中斷通知。

在此案例中，Job Manager 會處理 Spot 執行個體中斷訊號、封鎖 Spot 執行個體上其他任務的排程，以及為串流作業啟動 JIT 檢查點。

然後，只有在目前的重新啟動間隔時段中有足夠的新資源可用性以滿足目前的作業並行度之後，Job Manager 才會重新啟動作業圖表。重新啟動時段間隔是根據 Spot 執行個體更換持續時間、新 Job Manager Pod 的建立，以及向 Job Manager 註冊來決定。

## 先決條件
<a name="jobruns-flink-decommission-prereqs"></a>

若要使用正常的解除編譯，請在執行 Apache Flink 的 Amazon EMR on EKS 叢集上建立並執行串流任務。啟用在至少一個 Spot 執行個體上排定的調整式排程器和 Task Manager，如以下範例所示。您應針對 Job Manager 使用隨選節點，並且只要至少有一個 Spot 執行個體，就可以將隨選節點用於 Task Manager。

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: deployment_name
spec:
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    cluster.taskmanager.graceful-decommission.enabled: "true"
    execution.checkpointing.interval: "240s"
    jobmanager.adaptive-scheduler.combined-restart.enabled: "true"
    jobmanager.adaptive-scheduler.combined-restart.window-interval : "1m"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    nodeSelector:
      'eks.amazonaws.com/capacityType': 'ON_DEMAND'
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
    nodeSelector:
      'eks.amazonaws.com/capacityType': 'SPOT'
  job:
    jarURI: flink_job_jar_path
```

## Configuration
<a name="jobruns-flink-decommission-config"></a>

本節涵蓋了您可以根據解除委任需求指定的大部分組態。


| 金錀 | Description | 預設值 | 可接受值 | 
| --- | --- | --- | --- | 
|  cluster.taskmanager.graceful-decommission.enabled  |  啟用 Task Manager 的正常解除委任。  |  true  |  true, false  | 
|  jobmanager.adaptive-scheduler.combined-restart.enabled  |  在調整式排程器中啟用合併重新啟動機制。  |  false  |  true, false  | 
|  jobmanager.adaptive-scheduler.combined-restart.window-interval  |  為作業執行合併重新啟動的合併重新啟動時段間隔。未顯示單位的整數即視為以毫秒為單位。  |  1m  |  範例：30、60s、3m、1h | 

# 針對 Flink 應用程式使用 Autoscaler
<a name="jobruns-flink-autoscaler"></a>

運算子自動擴展器可透過從 Flink 作業中收集指標並在作業頂點層級自動調整平行程度來協助減輕背壓。以下為組態具體形式的範例：

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  ...
spec:
  ...
  flinkVersion: v1_18
  flinkConfiguration:
    job.autoscaler.enabled: "true"
    job.autoscaler.stabilization.interval: 1m
    job.autoscaler.metrics.window: 5m
    job.autoscaler.target.utilization: "0.6"
    job.autoscaler.target.utilization.boundary: "0.2"
    job.autoscaler.restart.time: 2m
    job.autoscaler.catch-up.duration: 5m
    pipeline.max-parallelism: "720"
  ...
```

此組態會使用 Amazon EMR 最新版本的預設值。如果您使用其他版本，則可能會有不同的值。

**注意**  
從 Amazon EMR 7.2.0 開始，您不需要在組態`kubernetes.operator`中包含 字首。如果您使用 7.1.0 或更低版本，您必須在每個組態之前使用 字首。例如，您必須指定 `kubernetes.operator.job.autoscaler.scaling.enabled`。

以下是自動擴展器的組態選項。
+ `job.autoscaler.scaling.enabled` – 指定是否啟用自動擴展器的頂點擴展執行。預設值為 `true`。如果您停用此組態，自動擴展器只會收集指標並評估每個頂點的建議平行處理，但不會升級任務。
+ `job.autoscaler.stabilization.interval` - 不會執行新擴展的穩定期。預設為 5 分鐘。
+ `job.autoscaler.metrics.window` - 擴展指標彙總視窗大小。視窗越大，越平滑和穩定，但自動擴展器可能會更慢，以應對突然的負載變化。預設為 15 分鐘。建議您使用 3 到 60 分鐘之間的值進行實驗。
+ `job.autoscaler.target.utilization` - 目標頂點利用率，以提供穩定的作業效能和一些負載波動緩衝。預設值為 `0.7`，目標是作業頂點 70% 的使用率/負載。
+ `job.autoscaler.target.utilization.boundary` - 作為額外緩衝的目標頂點利用率邊界，以避免負載波動的立即擴展。預設值為 `0.3`，這表示在觸發擴展動作之前，允許偏離目標使用率 30%。
+ `ob.autoscaler.restart.time` - 重新啟動應用程式的預期時間。預設為 5 分鐘。
+ `job.autoscaler.catch-up.duration` - 追趕的預期時間，這意味著在擴展操作完成後完全處理任何待處理項。預設為 5 分鐘。透過降低追趕時間，自動擴展器必須為擴展動作保留更多額外容量。
+ `pipeline.max-parallelism` - 自動擴展器可以使用的最大並行度。如果自動擴展器高於 Flink 組態中或直接在每個運算子上設定的最大並行度，則會忽略此限制。預設值為 -1。請注意，自動擴展器將並行度計算為最大並行數的除數，因此建議選擇具有大量除數的最大並行度設定，而不是依賴 Flink 提供的預設值。建議此組態使用 60 的倍數，例如 120、180、240、360、720 等。

如需更詳細的組態參考頁面，請參閱[自動擴展器組態](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/#autoscaler-configuration)。

# Autoscaler 參數自動調校
<a name="jobruns-flink-autoscaler-parameter-autotuning"></a>

本節說明各種 Amazon EMR 版本的自動調校行為。它也會詳細說明不同的自動調整規模組態。

**注意**  
Amazon EMR 7.2.0 及更高版本使用開放原始碼組態`job.autoscaler.restart.time-tracking.enabled`來啟用**重新擴展時間估算**。重新調整時間估算的功能與 Amazon EMR 自動調校功能相同，因此您不需要手動將經驗值指派給重新啟動時間。  
如果您使用的是 Amazon EMR 7.1.0 或更低版本，您仍然可以使用 Amazon EMR 自動調校。

------
#### [ 7.2.0 and higher ]

Amazon EMR 7.2.0 及更高版本會測量套用自動調整規模決策所需的實際重新啟動時間。在 7.1.0 及更低版本中，您必須使用 組態`job.autoscaler.restart.time`來手動設定預估的最長重新啟動時間。透過使用組態 `job.autoscaler.restart.time-tracking.enabled`，您只需輸入第一個擴展的重新啟動時間。之後，運算子會記錄實際的重新啟動時間，並將它用於後續的擴展。

若要啟用此追蹤，請使用下列命令：

```
job.autoscaler.restart.time-tracking.enabled: true
```

以下是重新調整規模時間估算的相關組態。


| Configuration | 必要 | 預設 | Description | 
| --- | --- | --- | --- | 
| job.autoscaler.restart.time-tracking.enabled | 否 | False | 指出 Flink Autoscaler 是否應隨時間自動調整組態，以最佳化擴展決策。請注意，Autoscaler 只能自動調整 Autoscaler 參數 restart.time。 | 
| job.autoscaler.restart.time | 否 | 5m | Amazon EMR on EKS 使用的預期重新啟動時間，直到運算子可以判斷先前擴展的實際重新啟動時間為止。 | 
| job.autoscaler.restart.time-tracking.limit | 否 | 15m | job.autoscaler.restart.time-tracking.enabled 將 設定為 時觀察到的重新啟動時間上限true。 | 

以下是您可以用來嘗試重新調整規模時間估算的範例部署規格：

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: autoscaling-example
spec:
  flinkVersion: v1_18
  flinkConfiguration:

    # Autoscaler parameters
    job.autoscaler.enabled: "true"
    job.autoscaler.scaling.enabled: "true"
    job.autoscaler.stabilization.interval: "5s"
    job.autoscaler.metrics.window: "1m"
    
    job.autoscaler.restart.time-tracking.enabled: "true"
    job.autoscaler.restart.time: "2m"
    job.autoscaler.restart.time-tracking.limit: "10m"
    
    jobmanager.scheduler: adaptive
    taskmanager.numberOfTaskSlots: "1"
    pipeline.max-parallelism: "12"

  executionRoleArn: <JOB ARN>
  emrReleaseLabel: emr-7.12.0-flink-latest
  jobManager:
    highAvailabilityEnabled: false
    storageDir: s3://<s3_bucket>/flink/autoscaling/ha/
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: s3://<s3_bucket>/some-job-with-back-pressure
    parallelism: 1
    upgradeMode: stateless
```

若要模擬背壓，請使用下列部署規格。

```
job:
    jarURI: s3://<s3_bucket>/pyflink-script.py
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-py", "/opt/flink/usrlib/pyflink-script.py"]
    parallelism: 1
    upgradeMode: stateless
```

將下列 Python 指令碼上傳至 S3 儲存貯體。

```
import logging
import sys
import time
import random

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

TABLE_NAME="orders"
QUERY=f"""
CREATE TABLE {TABLE_NAME} (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);
"""

def create_backpressure(i):
    time.sleep(2)
    return i

def autoscaling_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    t_env.execute_sql(QUERY)
    res_table = t_env.from_path(TABLE_NAME)

    stream =  t_env.to_data_stream(res_table) \
      .shuffle().map(lambda x: create_backpressure(x))\
      .print()
    env.execute("Autoscaling demo")

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    autoscaling_demo()
```

若要驗證重新調整規模時間估算是否正常運作，請確定 Flink 運算子的`DEBUG`層級記錄已啟用。以下範例示範如何更新 helm Chart 檔案 `values.yaml`。然後重新安裝更新的 Helm Chart，然後再次執行 Flink 任務。

```
log4j-operator.properties: |+
  # Flink Operator Logging Overrides
  rootLogger.level = DEBUG
```

取得領導 Pod 的名稱。

```
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')

kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

執行下列命令以取得指標評估中使用的實際重新啟動時間。

```
kubectl logs <FLINK-OPERATOR-POD-NAME> -c flink-kubernetes-operator -n <OPERATOR-NAMESPACE> -f | grep "Restart time used in scaling summary computation"
```

您應該會看到類似以下的日誌。請注意，只有第一個擴展使用 ` job.autoscaler.restart.time`。後續擴展會使用觀察到的重新啟動時間。

```
2024-05-16 17:17:32,590 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT2M
2024-05-16 17:19:03,787 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:19:18,976 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:20:50,283 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:22:21,691 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
```

------
#### [ 7.0.0 and 7.1.0 ]

開放原始碼內建的 Flink Autoscaler 使用許多指標來做出最佳的擴展決策。不過，它用於計算的預設值適用於大多數工作負載，可能不適用於指定的任務。新增至 Amazon EMR on EKS 版本的 Flink Operator 的自動調校功能會查看在特定擷取指標上觀察到的歷史趨勢，然後嘗試計算針對特定任務量身打造的最佳值。


| Configuration | 必要 | 預設 | Description | 
| --- | --- | --- | --- | 
| kubernetes.operator.job.autoscaler.autotune.enable | 否 | False | 指出 Flink Autoscaler 是否應該隨著時間自動調整組態，以最佳化自動擴展器擴展決策。目前，Autoscaler 只能自動調整 Autoscaler 參數 restart.time。 | 
| kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count | 否 | 3 | 指出 Autoscaler 在 Amazon EMR on EKS 指標組態映射中保留多少歷史 Amazon EMR on EKS 指標。 | 
| kubernetes.operator.job.autoscaler.autotune.metrics.restart.count | 否 | 3 | 指出 Autoscaler 開始計算指定任務的平均重新啟動時間之前執行的重新啟動次數。 | 

若要啟用自動調校，您必須完成下列操作：
+ 將 `kubernetes.operator.job.autoscaler.autotune.enable:` 設定為 `true`
+ 將 `metrics.job.status.enable:` 設定為 `TOTAL_TIME`
+ 遵循為 [Flink 應用程式使用 Autoscaler ](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/jobruns-flink-autoscaler.html)的設定，以啟用 Autoscaling。

以下是您可以用來嘗試自動調校的範例部署規格。

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: autoscaling-example
spec:
  flinkVersion: v1_18
  flinkConfiguration:

    # Autotuning parameters
    kubernetes.operator.job.autoscaler.autotune.enable: "true"
    kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count: "2"
    kubernetes.operator.job.autoscaler.autotune.metrics.restart.count: "1"
    metrics.job.status.enable: TOTAL_TIME

    # Autoscaler parameters
    kubernetes.operator.job.autoscaler.enabled: "true"
    kubernetes.operator.job.autoscaler.scaling.enabled: "true"
    kubernetes.operator.job.autoscaler.stabilization.interval: "5s"
    kubernetes.operator.job.autoscaler.metrics.window: "1m"

    jobmanager.scheduler: adaptive

    taskmanager.numberOfTaskSlots: "1"
    state.savepoints.dir: s3://<S3_bucket>/autoscaling/savepoint/
    state.checkpoints.dir: s3://<S3_bucket>/flink/autoscaling/checkpoint/
    pipeline.max-parallelism: "4"

  executionRoleArn: <JOB ARN>
  emrReleaseLabel: emr-6.14.0-flink-latest
  jobManager:
    highAvailabilityEnabled: true
    storageDir: s3://<S3_bucket>/flink/autoscaling/ha/
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: s3://<S3_bucket>/some-job-with-back-pressure
    parallelism: 1
    upgradeMode: last-state
```

若要模擬背壓，請使用下列部署規格。

```
  job:
    jarURI: s3://<S3_bucket>/pyflink-script.py
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-py", "/opt/flink/usrlib/pyflink-script.py"]
    parallelism: 1
    upgradeMode: last-state
```

將下列 Python 指令碼上傳至 S3 儲存貯體。

```
import logging
import sys
import time
import random

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

TABLE_NAME="orders"
QUERY=f"""
CREATE TABLE {TABLE_NAME} (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);
"""

def create_backpressure(i):
    time.sleep(2)
    return i

def autoscaling_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    t_env.execute_sql(QUERY)
    res_table = t_env.from_path(TABLE_NAME)

    stream =  t_env.to_data_stream(res_table) \
      .shuffle().map(lambda x: create_backpressure(x))\
      .print()
    env.execute("Autoscaling demo")

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    autoscaling_demo()
```

若要確認您的自動調校器是否正常運作，請使用下列命令。請注意，您必須使用自己的 Flink Operator 領導 Pod 資訊。

首先取得領導 Pod 的名稱。

```
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')

kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

取得領導 Pod 的名稱後，您可以執行下列命令。

```
kubectl logs -n $NAMESPACE  -c flink-kubernetes-operator --follow <YOUR-FLINK-OPERATOR-POD-NAME>  | grep -E 'EmrEks|autotun|calculating|restart|autoscaler'
```

您應該會看到類似以下的日誌。

```
[m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [36m[DEBUG][flink/autoscaling-example] Using the latest Emr Eks Metric for calculating restart.time for autotuning: EmrEksMetrics(restartMetric=RestartMetric(restartingTime=65, numRestarts=1))

[m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [32m[INFO ][flink/autoscaling-example] Calculated average restart.time metric via autotuning to be: PT0.065S
```

------

# Amazon EMR on EKS 上 Flink 任務的維護和疑難排解
<a name="jobruns-flink-troubleshooting"></a>

下列各節概述如何維護長時間執行的 Flink 任務，並提供有關如何對 Flink 任務的一些常見問題進行故障診斷的指引。

# 維護 Flink 應用程式
<a name="jobruns-flink-maintain"></a>

**Topics**
+ [升級模式](#jobruns-flink-upgrademode)

Flink 應用程式通常設計為執行很長時間，例如數週、數月甚至數年。與所有長時間執行的服務一樣，Flink 串流應用程式需要進行維護。這包括錯誤修正、改善項目以及遷移至更高版本的 Flink 叢集。

當 `FlinkDeployment` 和 `FlinkSessionJob` 資源的規格發生變化時，您需要升級正在執行的應用程式。為此，操作員會停止正在執行的作業 (除非已暫停)，並使用最新規格重新部署，對於有狀態的應用程式，則使用上次執行的狀態。

使用者可透過 `JobSpec` 的 `upgradeMode` 設定來控制有狀態的應用程式停止和還原時如何管理狀態。

## 升級模式
<a name="jobruns-flink-upgrademode"></a>

選用介紹

**無狀態**  
無狀態應用程式從空狀態升級。

**最後狀態**  
在任何應用程式狀態下快速升級 (即使是失敗的作業) 時不需要運作正常的作業，因為一律會使用最新的成功檢查點。如果遺失 HA 中繼資料，可能需要手動復原。若要限制作業在擷取最新檢查點時回退的時間，您可以設定 `kubernetes.operator.job.upgrade.last-state.max.allowed.checkpoint.age`。如果檢查點早於設定的值，則會改為針對運作正常的作業採用儲存點。工作階段模式不支援此功能。

**儲存點**  
使用儲存點進行升級，可提供最大的安全性和作為備份/分叉點的可能性。儲存點將在升級過程中建立。請注意，需執行 Flink 作業才能建立儲存點。如果作業處於運作狀態不佳的狀態，則會使用最後一個檢查點 (除非 kubernetes.operator.job.upgrade.last-state-fallback.enabled 設定為 false)。如果最後一個檢查點無法使用，作業升級將會失敗。

# 疑難排解
<a name="jobruns-flink-troubleshoot"></a>

本章節描述了如何疑難排解 Amazon EMR on EKS 的問題。如需有關如何疑難排解 Amazon EMR 一般問題的資訊，請參閱《Amazon EMR 管理指南》**中的[對叢集進行疑難排解](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-troubleshoot.html)。
+ [使用 PersistentVolumeClaims (PVC) 對作業進行疑難排解](permissions-for-pvc.md)
+ [對 Amazon EMR on EKS 垂直自動擴展進行疑難排解](troubleshooting-vas.md)
+ [對 Amazon EMR on EKS Spark Operator 進行疑難排解](troubleshooting-sparkop.md)

## 對 Amazon EMR on EKS 中的 Apache Flink 進行疑難排解
<a name="jobruns-flink-troubleshooting-apache-flink"></a>

### 安裝 Helm Chart 時找不到資源映射
<a name="w2aac21c21b7b7b3"></a>

安裝 Helm Chart 時，可能會遇到下列錯誤訊息。

```
Error: INSTALLATION FAILED: pulling from host 1234567890.dkr.ecr.us-west-2.amazonaws.com failed with status code [manifests 6.13.0]: 403 Forbidden Error: INSTALLATION FAILED: unable to build kubernetes objects from release manifest: [resource mapping not found for name: "flink-operator-serving-cert" namespace: "<the namespace to install your operator>" from "": no matches for kind "Certificate" in version "cert-manager.io/v1"

ensure CRDs are installed first, resource mapping not found for name: "flink-operator-selfsigned-issuer" namespace: "<the namespace to install your operator>" " from "": no matches for kind "Issuer" in version "cert-manager.io/v1"

ensure CRDs are installed first].
```

若要解決此錯誤，請安裝 cert-manager 以啟用新增 Webhook 元件。必須將 cert-manager 安裝到您使用的每個 Amazon EKS 叢集。

```
kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.12.0
```

### AWS 服務 存取遭拒錯誤
<a name="jobruns-flink-troubleshooting-access-denied"></a>

如果看到 *access denied* 錯誤，請確認 Helm Chart `values.yaml` 檔案中 `operatorExecutionRoleArn` 的 IAM 角色具有正確許可。此外，請確保 `FlinkDeployment` 規格中 `executionRoleArn` 下的 IAM 角色具有正確許可。

### `FlinkDeployment` 出現問題
<a name="jobruns-flink-troubleshooting-stuck"></a>

如果您的 `FlinkDeployment` 處於停止狀態，請使用下列步驟強制刪除部署：

1. 編輯部署執行。

   ```
   kubectl edit -n Flink Namespace flinkdeployments/App Name
   ```

1. 刪除此完成項。

   ```
   finalizers:
     - flinkdeployments.flink.apache.org/finalizer
   ```

1. 刪除部署。

   ```
   kubectl delete -n Flink Namespace flinkdeployments/App Name
   ```

### 在選擇加入中執行 Flink 應用程式時出現 s3a AWSBadRequestException 問題 AWS 區域
<a name="jobruns-flink-troubleshooting-optin-region"></a>

如果您在[選擇加入 AWS 區域](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html)中執行 Flink 應用程式，您可能會看到下列錯誤：

```
Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: getFileStatus on 
s3://flink.txt: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request 
(Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: ABCDEFGHIJKL; S3 Extended Request ID:
ABCDEFGHIJKLMNOP=; Proxy: null), S3 Extended Request ID: ABCDEFGHIJKLMNOP=:400 Bad Request: Bad Request 
(Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: ABCDEFGHIJKL; S3 Extended Request ID: ABCDEFGHIJKLMNOP=; Proxy: null)
```

```
Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: getS3Region on flink-application: software.amazon.awssdk.services.s3.model.S3Exception: null 
(Service: S3, Status Code: 400, Request ID: ABCDEFGHIJKLMNOP, Extended Request ID: ABCDEFGHIJKLMNOPQRST==):null: null 
(Service: S3, Status Code: 400, Request ID: ABCDEFGHIJKLMNOP, Extended Request ID: AHl42uDNaTUFOus/5IIVNvSakBcMjMCH7dd37ky0vE6jhABCDEFGHIJKLMNOPQRST==)
```

若要修正這些錯誤，請在您的`FlinkDeployment`定義檔案中使用以下組態。

```
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
```

我們也建議您使用 SDKv2 登入資料提供者：

```
fs.s3a.aws.credentials.provider: software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider
```

如果您想要使用 SDKv1 登入資料提供者，請確定您的 SDK 支援您的選擇加入區域。如需詳細資訊，請參閱 [aws-sdk-java GitHub 儲存庫](https://github.com/aws/aws-sdk-java)。

如果您在選擇加入區域中執行 Flink SQL 陳述式`S3 AWSBadRequestException`時收到 ，請確定您在 flink 組態規格`fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME`中設定組態。

### 在 CN 區域中執行 Flink 工作階段任務時的 S3A AWSBadRequestException
<a name="jobruns-flink-troubleshooting-optin-region"></a>

對於 Amazon EMR 6.15.0 - 7.2.0 版，當您在 CN 區域執行 Flink 工作階段任務時，可能會遇到下列錯誤訊息。這些包括中國 （北京） 和中國 （寧夏）：

```
Error:  {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.hadoop.fs.s3a.AWSBadRequestException: 
                    getFileStatus on s3://ABCDPath: software.amazon.awssdk.services.s3.model.S3Exception: null (Service: S3, Status Code: 400, Request ID: ABCDEFGH, Extended Request ID: 
                    ABCDEFGH:null: null (Service: S3, Status Code: 400, Request ID: ABCDEFGH, Extended Request ID: ABCDEFGH","additionalMetadata":{},"throwableList":
                    [{"type":"org.apache.hadoop.fs.s3a.AWSBadRequestException","message":"getFileStatus on s3://ABCDPath: software.amazon.awssdk.services.s3.model.S3Exception: 
                    null (Service: S3, Status Code: 400, Request ID: ABCDEFGH, Extended Request ID: ABCDEFGH:null: null (Service: S3, Status Code: 400, Request ID: ABCDEFGH, 
                    Extended Request ID: ABCDEFGH","additionalMetadata":{}},{"type":"software.amazon.awssdk.services.s3.model.S3Exception","message":"null (Service: S3, Status Code: 400, 
                    Request ID: ABCDEFGH, Extended Request ID: ABCDEFGH","additionalMetadata":{}}]}
```

知道此問題。團隊正在努力修補所有這些發行版本的 flink 運算子。不過，在我們完成修補程式之前，若要修正此錯誤，您需要下載 flink Operator helm Chart、將其解壓縮 （擷取壓縮檔案），並在 helm Chart 中進行組態變更。

特定步驟如下：

1. 將目錄變更為 Helm Chart 的本機資料夾，並執行下列命令列以提取 Helm Chart 並解壓縮 （擷取）。

   ```
   helm pull oci://public.ecr.aws/emr-on-eks/flink-kubernetes-operator \
   --version $VERSION \
   --namespace $NAMESPACE
   ```

   ```
   tar -zxvf flink-kubernetes-operator-$VERSION.tgz
   ```

1. 前往 helm Chart 資料夾並尋找 `templates/flink-operator.yaml` 檔案。

1. 尋找 `flink-operator-config` ConfigMap，並在 中新增下列`fs.s3a.endpoint.region`組態`flink-conf.yaml`。例如：

   ```
   {{- if .Values.defaultConfiguration.create }}
   apiVersion: v1
   kind: ConfigMap
   metadata:
     name: flink-operator-config
     namespace: {{ .Release.Namespace }}
     labels:
       {{- include "flink-operator.labels" . | nindent 4 }}
   data:
     flink-conf.yaml: |+
   fs.s3a.endpoint.region: {{ .Values.emrContainers.awsRegion }}
   ```

1. 安裝本機 Helm Chart 並執行您的任務。

# 具有 Apache Flink 的 Amazon EMR on EKS 的支援版本
<a name="jobruns-flink-security-release-versions"></a>

Apache Flink 可用於以下 Amazon EMR on EKS 版本。如需所有可用版本的資訊，請參閱 [Amazon EMR on EKS 發行版本](emr-eks-releases.md)。


| 版本標籤 | Java | Flink | Flink 運算子 | 
| --- | --- | --- | --- | 
|  **emr-7.2.0-flink-latest**  |  17  |  1.18.1  |  -  | 
|  **emr-7.2.0-flink-k8s-operator-latest**  |  11  |  -  |  1.8.0  | 
|  **emr-7.1.0-flink-latest**  |  17  |  1.18.1  |  -  | 
|  **emr-7.1.0-flink-k8s-operator-latest**  |  11  |  -  |  1.6.1  | 
|  **emr-7.0.0-flink-latest**  |  11  |  1.18.0  |  -  | 
|  **emr-7.0.0-flink-k8s-operator-latest**  |  11  |  -  |  1.6.1  | 
|  **emr-6.15.0-flink-latest**  |  11  |  1.17.1  |  -  | 
|  **emr-6.15.0-flink-k8s-operator-latest**  |  11  |  -  |  1.6.0  | 
|  **emr-6.14.0-flink-latest**  |  11  |  1.17.1  |  -  | 
|  **emr-6.14.0-flink-k8s-operator-latest**  |  11  |  -  |  1.6.0  | 
|  **emr-6.13.0-flink-latest**  |  11  |  1.17.0  |  -  | 
|  **emr-6.13.0-flink-k8s-operator-latest**  |  11  |  -  |  1.5.0  | 