

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 設定和使用 Flink Kubernetes Operator
<a name="jobruns-flink-kubernetes-operator"></a>

以下頁面描述了如何設定和使用 Flink Kubernetes Operator，以使用 Amazon EMR on EKS 執行 Flink 作業。可用的主題包括必要的先決條件、如何設定您的環境，以及在 Amazon EMR on EKS 上執行 Flink 應用程式。

**Topics**
+ [針對 Amazon EMR on EKS 設定 Flink Kubernetes Operator](jobruns-flink-kubernetes-operator-setup.md)
+ [安裝適用於 Amazon EMR on EKS 的 Flink Kubernetes Operator](jobruns-flink-kubernetes-operator-getting-started.md)
+ [執行 Flink 應用程式](jobruns-flink-kubernetes-operator-run-application.md)
+ [執行 Flink 應用程式的安全角色許可](jobruns-flink-kubernetes-security.md)
+ [針對 Amazon EMR on EKS 解除安裝 Flink Kubernetes Operator](jobruns-flink-kubernetes-operator-uninstall.md)

# 針對 Amazon EMR on EKS 設定 Flink Kubernetes Operator
<a name="jobruns-flink-kubernetes-operator-setup"></a>

先完成下列任務，然後在 Amazon EKS 上安裝 Flink Kubernetes Operator。如果已經註冊 Amazon Web Services (AWS) 且已在使用 Amazon EKS，則您幾乎可使用 Amazon EMR on EKS。完成下列任務，即可在 Amazon EKS 上設定 Flink Operator。如果已經完成任何先決條件，則可以跳過這些先決條件，然後繼續進行下一個。
+ **[安裝或更新至最新版本的 AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html)**- 如果您已安裝 AWS CLI，請確認您擁有最新版本。
+ **[設定 kubectl 和 eksctl](https://docs.aws.amazon.com/eks/latest/userguide/install-kubectl.html)** – eksctl 是您用來與 Amazon EKS 通訊的命令列工具。
+ **[安裝 Helm](https://docs.aws.amazon.com/eks/latest/userguide/helm.html)** – Kubernetes 的 Helm 套件管理工具可協助您安裝和管理 Kubernetes 叢集上的應用程式。
+ **[開始使用 Amazon EKS – eksctl](https://docs.aws.amazon.com/eks/latest/userguide/getting-started-eksctl.html) ** – 請依照步驟在 Amazon EKS 中建立具有節點的新 Kubernetes 叢集。
+ **[選擇 Amazon EMR 發行標籤](jobruns-flink-security-release-versions.md) (6.13.0 版或更新版本）** – Amazon EMR 6.13.0 版及更高版本支援 Flink Kubernetes Operator。
+ **[在 Amazon EKS 叢集上啟用服務帳戶的 IAM 角色 (IRSA)](setting-up-enable-IAM.md)**。
+ **[建立作業執行角色](creating-job-execution-role.md)**。
+ **[更新作業執行角色的信任政策 ](setting-up-trust-policy.md)**。
+ 建立操作員執行角色。此為選擇性步驟。可以對 Flink 作業和操作員使用相同的角色。如果想要為操作員使用不同的 IAM 角色，可以建立單獨的角色。
+ 更新操作員執行角色的信任政策。必須針對想要用於 Amazon EMR Flink Kubernetes 操作員服務帳戶的角色明確新增一個信任政策項目。可以遵循以下範例格式：

------
#### [ JSON ]

****  

  ```
  {
    "Version":"2012-10-17",		 	 	 
    "Statement": [
      {
        "Effect": "Allow",
        "Action": [
          "sts:AssumeRoleWithWebIdentity"
        ],
        "Resource": [
          "*"
        ],
        "Condition": {
          "StringLike": {
            "aws:userid": "system:serviceaccount:emr:emr-containers-sa-flink-operator"
          }
        },
        "Sid": "AllowSTSAssumerolewithwebidentity"
      }
    ]
  }
  ```

------

# 安裝適用於 Amazon EMR on EKS 的 Flink Kubernetes Operator
<a name="jobruns-flink-kubernetes-operator-getting-started"></a>

本主題透過準備 Flink 部署，協助您開始在 Amazon EKS 上使用 Flink Kubernetes Operator。

## 安裝 Kubernetes 運算子
<a name="jobruns-flink-kubernetes-operator-getting-started-install-operator"></a>

請使用下列步驟來安裝 Kubernetes Operator for Apache Flink。

1. 如果您尚未這麼做，請完成 [針對 Amazon EMR on EKS 設定 Flink Kubernetes Operator](jobruns-flink-kubernetes-operator-setup.md) 中的步驟。

1. 安裝 *cert-manager* (每個 Amazon EKS 叢集一次) 以允許新增 Webhook 元件。

   ```
   kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.12.0/cert-manager.yaml
   ```

1. 安裝 Helm Chart。

   ```
   export VERSION=7.12.0 # The Amazon EMR release version
   export NAMESPACE=The Kubernetes namespace to deploy the operator
   
   helm install flink-kubernetes-operator \
   oci://public.ecr.aws/emr-on-eks/flink-kubernetes-operator \
   --version $VERSION \
   --namespace $NAMESPACE
   ```

   輸出範例：

   ```
   NAME: flink-kubernetes-operator
   LAST DEPLOYED: Tue May 31 17:38:56 2022
   NAMESPACE: $NAMESPACE
   STATUS: deployed
   REVISION: 1
   TEST SUITE: None
   ```

1. 等待部署完成，然後驗證 Chart 安裝。

   ```
   kubectl wait deployment flink-kubernetes-operator --namespace $NAMESPACE --for condition=Available=True --timeout=30s
   ```

1. 部署完成時，您應該會看到下列訊息。

   ```
   deployment.apps/flink-kubernetes-operator condition met
   ```

1. 使用以下命令查看已部署的 Operator。

   ```
   helm list --namespace $NAMESPACE
   ```

   以下顯示範例輸出，其中應用程式版本 `x.y.z-amzn-n` 與 Amazon EMR on EKS 版本的 Flink Operator 版本相對應。如需詳細資訊，請參閱[具有 Apache Flink 的 Amazon EMR on EKS 的支援版本](jobruns-flink-security-release-versions.md)。

   ```
   NAME                              NAMESPACE    REVISION    UPDATED                                STATUS      CHART                                   APP VERSION          
   flink-kubernetes-operator    $NAMESPACE   1           2023-02-22 16:43:45.24148 -0500 EST    deployed    flink-kubernetes-operator-emr-7.12.0    x.y.z-amzn-n
   ```

### 升級 Kubernetes 運算子
<a name="jobruns-flink-kubernetes-operator-upgrade"></a>

若要升級 Flink 運算子的版本，請遵循下列步驟：

1. 解除安裝舊的 `flink-kubernetes-operator`：`helm uninstall flink-kubernetes-operator -n <NAMESPACE>`。

1. 刪除 CRD （因為 helm 不會自動刪除舊 CRD)：`kubectl delete crd flinkdeployments.flink.apache.org flinksessionjobs.flink.apache.org`。

1. `flink-kubernetes-operator` 使用較新版本重新安裝 。

# 執行 Flink 應用程式
<a name="jobruns-flink-kubernetes-operator-run-application"></a>

使用 Amazon EMR 6.13.0 及更高版本時，您可以在 Amazon EMR on EKS 上使用 Flink Kubernetes Operator 在應用程式模式下執行 Flink 應用程式。使用 Amazon EMR 6.15.0 及更高版本時，您也可以在工作階段模式下執行 Flink 應用程式。本頁面介紹了可用於透過 Amazon EMR on EKS 執行 Flink 應用程式的兩種方法。

**Topics**

**注意**  
提交 Flink 作業時，必須建立 Amazon S3 儲存貯體來儲存高可用性中繼資料。如果不想使用此功能，可以停用它。依預設會啟用此功能。

**必要條件**：在使用 Flink Kubernetes Operator 執行 Flink 應用程式之前，請完成 [針對 Amazon EMR on EKS 設定 Flink Kubernetes Operator](jobruns-flink-kubernetes-operator-setup.md) 和 [安裝 Kubernetes 運算子](jobruns-flink-kubernetes-operator-getting-started.md#jobruns-flink-kubernetes-operator-getting-started-install-operator) 中的步驟。

------
#### [ Application mode ]

使用 Amazon EMR 6.13.0 及更高版本時，您可以在 Amazon EMR on EKS 上使用 Flink Kubernetes Operator 在應用程式模式下執行 Flink 應用程式。

1. 建立`FlinkDeployment`定義檔案，`basic-example-app-cluster.yaml`如下列範例所示。如果您啟用並使用其中一個[選擇加入 AWS 區域](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html)，請務必取消註解並設定組態 `fs.s3a.endpoint.region`。

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example-app-cluster
   spec:
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
       #fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
       state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
       state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH 
     flinkVersion: v1_17
     executionRoleArn: JOB_EXECUTION_ROLE_ARN
     emrReleaseLabel: "emr-6.13.0-flink-latest" # 6.13 or higher
     jobManager:
       storageDir: HIGH_AVAILABILITY_STORAGE_PATH
       resource:
         memory: "2048m"
         cpu: 1
     taskManager:
       resource:
         memory: "2048m"
         cpu: 1
     job:
       # if you have your job jar in S3 bucket you can use that path as well
       jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
       parallelism: 2
       upgradeMode: savepoint
       savepointTriggerNonce: 0
     monitoringConfiguration:    
       cloudWatchMonitoringConfiguration:
          logGroupName: LOG_GROUP_NAME
   ```

1. 使用下列命令提交 Flink 部署。這也將建立名為 `basic-example-app-cluster` 的 `FlinkDeployment` 物件。

   ```
   kubectl create -f basic-example-app-cluster.yaml -n <NAMESPACE>
   ```

1. 存取 Flink UI。

   ```
   kubectl port-forward deployments/basic-example-app-cluster 8081 -n NAMESPACE
   ```

1. 開啟 `localhost:8081` 以在本機檢視 Flink 作業。

1. 清除作業。請記得清除為此作業建立的 S3 成品，例如檢查點、高可用性、儲存點中繼資料和 CloudWatch 日誌。

如需有關透過 Flink Kubernetes Operator 提交應用程式至 Flink 的詳細資訊，請參閱 GitHub 上 `apache/flink-kubernetes-operator` 資料夾中的 [Flink Kubernetes Operator 範例](https://github.com/apache/flink-kubernetes-operator/tree/main/examples)。

------
#### [ Session mode ]

使用 Amazon EMR 6.15.0 及更高版本時，您可以在 Amazon EMR on EKS 上使用 Flink Kubernetes Operator 在工作階段模式下執行 Flink 應用程式。

1. 在下列範例中建立名為 `basic-example-app-cluster.yaml` `FlinkDeployment`的定義檔案。如果您啟用並使用其中一個[選擇加入 AWS 區域](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html)，請務必取消註解並設定組態 `fs.s3a.endpoint.region`。

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example-session-cluster
   spec:
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
       #fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
       state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
       state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH
     flinkVersion: v1_17
     executionRoleArn: JOB_EXECUTION_ROLE_ARN
     emrReleaseLabel: "emr-6.15.0-flink-latest"
     jobManager:
       storageDir: HIGH_AVAILABILITY_S3_STORAGE_PATH
       resource:
         memory: "2048m"
         cpu: 1
     taskManager:
       resource:
         memory: "2048m"
         cpu: 1
     monitoringConfiguration:    
       s3MonitoringConfiguration:
          logUri: 
       cloudWatchMonitoringConfiguration:
          logGroupName: LOG_GROUP_NAME
   ```

1. 使用下列命令提交 Flink 部署。這也將建立名為 `basic-example-session-cluster` 的 `FlinkDeployment` 物件。

   ```
   kubectl create -f basic-example-app-cluster.yaml -n NAMESPACE
   ```

1. 使用下列命令確認工作階段叢集 `LIFECYCLE` 為 `STABLE`：

   ```
   kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
   ```

   輸出應類似以下範例：

   ```
   NAME                              JOB STATUS   LIFECYCLE STATE
   basic-example-session-cluster                          STABLE
   ```

1. 使用以下範例內容，建立 `FlinkSessionJob` 自訂定義資源檔案 `basic-session-job.yaml`：

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkSessionJob
   metadata:
     name: basic-session-job
   spec:
     deploymentName: basic-session-deployment
     job:
       # If you have your job jar in an S3 bucket you can use that path.
       # To use jar in S3 bucket, set 
       # OPERATOR_EXECUTION_ROLE_ARN (--set emrContainers.operatorExecutionRoleArn=$OPERATOR_EXECUTION_ROLE_ARN)
       # when you install Spark operator
       jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar
       parallelism: 2
       upgradeMode: stateless
   ```

1. 使用下列命令提交 Flink 工作階段作業。這將建立 `FlinkSessionJob` 物件 `basic-session-job`。

   ```
   kubectl apply -f basic-session-job.yaml -n $NAMESPACE
   ```

1. 使用下列命令確認工作階段叢集 `LIFECYCLE` 為 `STABLE`，且 `JOB STATUS` 為 `RUNNING`：

   ```
   kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
   ```

   輸出應類似以下範例：

   ```
   NAME                              JOB STATUS   LIFECYCLE STATE
   basic-example-session-cluster     RUNNING      STABLE
   ```

1. 存取 Flink UI。

   ```
   kubectl port-forward deployments/basic-example-session-cluster 8081 -n NAMESPACE
   ```

1. 開啟 `localhost:8081` 以在本機檢視 Flink 作業。

1. 清除作業。請記得清除為此作業建立的 S3 成品，例如檢查點、高可用性、儲存點中繼資料和 CloudWatch 日誌。

------

# 執行 Flink 應用程式的安全角色許可
<a name="jobruns-flink-kubernetes-security"></a>

本主題說明部署和執行 Flink 應用程式的安全角色。管理部署以及建立和管理任務、操作員角色和任務角色需要兩個角色。本主題會介紹他們並列出他們的許可。

## 角色型存取控制
<a name="jobruns-flink-kubernetes-security-rbac"></a>

若要部署 Operator 並執行 Flink 作業，必須建立兩個 Kubernetes 角色：一個 Operator 和一個作業角色。Amazon EMR 會在您安裝 Operator 時預設建立兩個角色。

## Operator 角色
<a name="jobruns-flink-kubernetes-security-operator-role"></a>

我們使用 Operator 角色來管理 `flinkdeployments`，以便為每個 Flink 作業和其他資源 (如服務) 建立和管理 JobManager。

Operator 角色的預設名稱為 `emr-containers-sa-flink-operator` 且需要下列許可。

```
rules:
- apiGroups:
  - ""
  resources:
  - pods
  - services
  - events
  - configmaps
  - secrets
  - serviceaccounts
  verbs:
  - '*'
- apiGroups:
  - rbac.authorization.k8s.io
  resources:
  - roles
  - rolebindings
  verbs:
  - '*'
- apiGroups:
  - apps
  resources:
  - deployments
  - deployments/finalizers
  - replicasets
  verbs:
  - '*'
- apiGroups:
  - extensions
  resources:
  - deployments
  - ingresses
  verbs:
  - '*'
- apiGroups:
  - flink.apache.org
  resources:
  - flinkdeployments
  - flinkdeployments/status
  - flinksessionjobs
  - flinksessionjobs/status
  verbs:
  - '*'
- apiGroups:
  - networking.k8s.io
  resources:
  - ingresses
  verbs:
  - '*'
- apiGroups:
  - coordination.k8s.io
  resources:
  - leases
  verbs:
  - '*'
```

## 作業角色
<a name="jobruns-flink-security-job-role"></a>

JobManager 使用作業角色來建立和管理每個作業的 TaskManagers 和 ConfigMaps。

```
rules:
- apiGroups:
  - ""
  resources:
  - pods
  - configmaps
  verbs:
  - '*'
- apiGroups:
  - apps
  resources:
  - deployments
  - deployments/finalizers
  verbs:
  - '*'
```

# 針對 Amazon EMR on EKS 解除安裝 Flink Kubernetes Operator
<a name="jobruns-flink-kubernetes-operator-uninstall"></a>

請依照下列步驟解除安裝 Flink Kubernetes Operator。

1. 刪除 Operator。

   ```
   helm uninstall flink-kubernetes-operator -n <NAMESPACE>
   ```

1. 刪除 Helm 不會解除安裝的 Kubernetes 資源。

   ```
   kubectl delete serviceaccounts, roles, rolebindings -l emr-containers.amazonaws.com/component=flink.operator --namespace <namespace>
   kubectl delete crd flinkdeployments.flink.apache.org flinksessionjobs.flink.apache.org
   ```

1. (選用) 刪除 cert-manager。

   ```
   kubectl delete -f https://github.com/jetstack/cert-manager/releases/download/v1.12.0/cert-manager.yaml
   ```