

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 執行 Flink 應用程式
<a name="jobruns-flink-kubernetes-operator-run-application"></a>

使用 Amazon EMR 6.13.0 及更高版本時，您可以在 Amazon EMR on EKS 上使用 Flink Kubernetes Operator 在應用程式模式下執行 Flink 應用程式。使用 Amazon EMR 6.15.0 及更高版本時，您也可以在工作階段模式下執行 Flink 應用程式。本頁面介紹了可用於透過 Amazon EMR on EKS 執行 Flink 應用程式的兩種方法。

**Topics**

**注意**  
提交 Flink 作業時，必須建立 Amazon S3 儲存貯體來儲存高可用性中繼資料。如果不想使用此功能，可以停用它。依預設會啟用此功能。

**必要條件**：在使用 Flink Kubernetes Operator 執行 Flink 應用程式之前，請完成 [針對 Amazon EMR on EKS 設定 Flink Kubernetes Operator](jobruns-flink-kubernetes-operator-setup.md) 和 [安裝 Kubernetes 運算子](jobruns-flink-kubernetes-operator-getting-started.md#jobruns-flink-kubernetes-operator-getting-started-install-operator) 中的步驟。

------
#### [ Application mode ]

使用 Amazon EMR 6.13.0 及更高版本時，您可以在 Amazon EMR on EKS 上使用 Flink Kubernetes Operator 在應用程式模式下執行 Flink 應用程式。

1. 建立`FlinkDeployment`定義檔案，`basic-example-app-cluster.yaml`如下列範例所示。如果您啟用並使用其中一個[選擇加入 AWS 區域](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html)，請務必取消註解並設定組態 `fs.s3a.endpoint.region`。

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example-app-cluster
   spec:
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
       #fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
       state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
       state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH 
     flinkVersion: v1_17
     executionRoleArn: JOB_EXECUTION_ROLE_ARN
     emrReleaseLabel: "emr-6.13.0-flink-latest" # 6.13 or higher
     jobManager:
       storageDir: HIGH_AVAILABILITY_STORAGE_PATH
       resource:
         memory: "2048m"
         cpu: 1
     taskManager:
       resource:
         memory: "2048m"
         cpu: 1
     job:
       # if you have your job jar in S3 bucket you can use that path as well
       jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
       parallelism: 2
       upgradeMode: savepoint
       savepointTriggerNonce: 0
     monitoringConfiguration:    
       cloudWatchMonitoringConfiguration:
          logGroupName: LOG_GROUP_NAME
   ```

1. 使用下列命令提交 Flink 部署。這也將建立名為 `basic-example-app-cluster` 的 `FlinkDeployment` 物件。

   ```
   kubectl create -f basic-example-app-cluster.yaml -n <NAMESPACE>
   ```

1. 存取 Flink UI。

   ```
   kubectl port-forward deployments/basic-example-app-cluster 8081 -n NAMESPACE
   ```

1. 開啟 `localhost:8081` 以在本機檢視 Flink 作業。

1. 清除作業。請記得清除為此作業建立的 S3 成品，例如檢查點、高可用性、儲存點中繼資料和 CloudWatch 日誌。

如需有關透過 Flink Kubernetes Operator 提交應用程式至 Flink 的詳細資訊，請參閱 GitHub 上 `apache/flink-kubernetes-operator` 資料夾中的 [Flink Kubernetes Operator 範例](https://github.com/apache/flink-kubernetes-operator/tree/main/examples)。

------
#### [ Session mode ]

使用 Amazon EMR 6.15.0 及更高版本時，您可以在 Amazon EMR on EKS 上使用 Flink Kubernetes Operator 在工作階段模式下執行 Flink 應用程式。

1. 在下列範例中建立名為 `basic-example-app-cluster.yaml` `FlinkDeployment`的定義檔案。如果您啟用並使用其中一個[選擇加入 AWS 區域](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html)，請務必取消註解並設定組態 `fs.s3a.endpoint.region`。

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example-session-cluster
   spec:
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
       #fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
       state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
       state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH
     flinkVersion: v1_17
     executionRoleArn: JOB_EXECUTION_ROLE_ARN
     emrReleaseLabel: "emr-6.15.0-flink-latest"
     jobManager:
       storageDir: HIGH_AVAILABILITY_S3_STORAGE_PATH
       resource:
         memory: "2048m"
         cpu: 1
     taskManager:
       resource:
         memory: "2048m"
         cpu: 1
     monitoringConfiguration:    
       s3MonitoringConfiguration:
          logUri: 
       cloudWatchMonitoringConfiguration:
          logGroupName: LOG_GROUP_NAME
   ```

1. 使用下列命令提交 Flink 部署。這也將建立名為 `basic-example-session-cluster` 的 `FlinkDeployment` 物件。

   ```
   kubectl create -f basic-example-app-cluster.yaml -n NAMESPACE
   ```

1. 使用下列命令確認工作階段叢集 `LIFECYCLE` 為 `STABLE`：

   ```
   kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
   ```

   輸出應類似以下範例：

   ```
   NAME                              JOB STATUS   LIFECYCLE STATE
   basic-example-session-cluster                          STABLE
   ```

1. 使用以下範例內容，建立 `FlinkSessionJob` 自訂定義資源檔案 `basic-session-job.yaml`：

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkSessionJob
   metadata:
     name: basic-session-job
   spec:
     deploymentName: basic-session-deployment
     job:
       # If you have your job jar in an S3 bucket you can use that path.
       # To use jar in S3 bucket, set 
       # OPERATOR_EXECUTION_ROLE_ARN (--set emrContainers.operatorExecutionRoleArn=$OPERATOR_EXECUTION_ROLE_ARN)
       # when you install Spark operator
       jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar
       parallelism: 2
       upgradeMode: stateless
   ```

1. 使用下列命令提交 Flink 工作階段作業。這將建立 `FlinkSessionJob` 物件 `basic-session-job`。

   ```
   kubectl apply -f basic-session-job.yaml -n $NAMESPACE
   ```

1. 使用下列命令確認工作階段叢集 `LIFECYCLE` 為 `STABLE`，且 `JOB STATUS` 為 `RUNNING`：

   ```
   kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
   ```

   輸出應類似以下範例：

   ```
   NAME                              JOB STATUS   LIFECYCLE STATE
   basic-example-session-cluster     RUNNING      STABLE
   ```

1. 存取 Flink UI。

   ```
   kubectl port-forward deployments/basic-example-session-cluster 8081 -n NAMESPACE
   ```

1. 開啟 `localhost:8081` 以在本機檢視 Flink 作業。

1. 清除作業。請記得清除為此作業建立的 S3 成品，例如檢查點、高可用性、儲存點中繼資料和 CloudWatch 日誌。

------