

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 运行 Flink 应用程序
<a name="jobruns-flink-kubernetes-operator-run-application"></a>

在 Amazon EMR 6.13.0 及更高版本中，您可以在应用程序模式下在 EKS 上的 Amazon EMR 上使用 Flink Kubernetes Operator 来运行 Flink 应用程序。在 Amazon EMR 6.15.0 及更高版本中，您还可以在会话模式中运行 Flink 应用程序。本页介绍您使用 EKS 上的 Amazon EMR 运行 Flink 应用程序时可用的两种方法。

**Topics**

**注意**  
提交 Flink 作业时，必须使用一个 Amazon S3 存储桶来存储高可用性元数据。如果不想使用此功能，可以将其禁用。系统会默认启用该功能。

**先决条件**：在使用 Flink Native Kubernetes Operator 运行 Flink 应用程序之前，请先完成 [设置 Amazon EMR on EKS 的 Flink Kubernetes Operator](jobruns-flink-kubernetes-operator-setup.md) 和 [安装 Kubernetes Operator](jobruns-flink-kubernetes-operator-getting-started.md#jobruns-flink-kubernetes-operator-getting-started-install-operator) 中的步骤。

------
#### [ Application mode ]

在 Amazon EMR 6.13.0 及更高版本中，您可以在应用程序模式下在 EKS 上的 Amazon EMR 上使用 Flink Kubernetes Operator 来运行 Flink 应用程序。

1. 创建一个名为 `basic-example-app-cluster.yaml` 的 `FlinkDeployment` 定义文件，如下例所示。如果您已激活并使用其中一个[选项 AWS 区域，请](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html)务必取消注释并配置配置。`fs.s3a.endpoint.region`

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example-app-cluster
   spec:
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
       #fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
       state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
       state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH 
     flinkVersion: v1_17
     executionRoleArn: JOB_EXECUTION_ROLE_ARN
     emrReleaseLabel: "emr-6.13.0-flink-latest" # 6.13 or higher
     jobManager:
       storageDir: HIGH_AVAILABILITY_STORAGE_PATH
       resource:
         memory: "2048m"
         cpu: 1
     taskManager:
       resource:
         memory: "2048m"
         cpu: 1
     job:
       # if you have your job jar in S3 bucket you can use that path as well
       jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
       parallelism: 2
       upgradeMode: savepoint
       savepointTriggerNonce: 0
     monitoringConfiguration:    
       cloudWatchMonitoringConfiguration:
          logGroupName: LOG_GROUP_NAME
   ```

1. 使用如下命令提交 Flink 部署。此操作还会创建一个名为 `basic-example-app-cluster` 的 `FlinkDeployment` 对象。

   ```
   kubectl create -f basic-example-app-cluster.yaml -n <NAMESPACE>
   ```

1. 访问 Flink UI。

   ```
   kubectl port-forward deployments/basic-example-app-cluster 8081 -n NAMESPACE
   ```

1. 打开 `localhost:8081` 即可在本地查看 Flink 任务。

1. 清理任务。记得清理为此任务创建的 S3 项目，例如检查点、高可用性、保存点元数据和日志。 CloudWatch

有关通过 Flink Kubernetes 运算符向 Flink 提交应用程序的更多信息，请参阅文件夹中的 Flink K [uber](https://github.com/apache/flink-kubernetes-operator/tree/main/examples) netes 运算符示例。`apache/flink-kubernetes-operator` GitHub

------
#### [ Session mode ]

在 Amazon EMR 6.15.0 及更高版本中，您可以在会话模式下在 EKS 上的 Amazon EMR 上使用 Flink Kubernetes Operator 来运行 Flink 应用程序。

1. 创建一个名为 `basic-example-app-cluster.yaml` 的 `FlinkDeployment` 定义文件，如下例所示。如果您已激活并使用其中一个[选项 AWS 区域，请](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html)务必取消注释并配置配置。`fs.s3a.endpoint.region`

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example-session-cluster
   spec:
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
       #fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
       state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
       state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH
     flinkVersion: v1_17
     executionRoleArn: JOB_EXECUTION_ROLE_ARN
     emrReleaseLabel: "emr-6.15.0-flink-latest"
     jobManager:
       storageDir: HIGH_AVAILABILITY_S3_STORAGE_PATH
       resource:
         memory: "2048m"
         cpu: 1
     taskManager:
       resource:
         memory: "2048m"
         cpu: 1
     monitoringConfiguration:    
       s3MonitoringConfiguration:
          logUri: 
       cloudWatchMonitoringConfiguration:
          logGroupName: LOG_GROUP_NAME
   ```

1. 使用如下命令提交 Flink 部署。此操作还会创建一个名为 `basic-example-session-cluster` 的 `FlinkDeployment` 对象。

   ```
   kubectl create -f basic-example-app-cluster.yaml -n NAMESPACE
   ```

1. 使用以下命令确认会话集群 `LIFECYCLE` 是 `STABLE`：

   ```
   kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
   ```

   该输出应该类似于以下示例：

   ```
   NAME                              JOB STATUS   LIFECYCLE STATE
   basic-example-session-cluster                          STABLE
   ```

1. 使用以下示例内容创建 `FlinkSessionJob` 自定义定义资源文件 `basic-session-job.yaml`：

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkSessionJob
   metadata:
     name: basic-session-job
   spec:
     deploymentName: basic-session-deployment
     job:
       # If you have your job jar in an S3 bucket you can use that path.
       # To use jar in S3 bucket, set 
       # OPERATOR_EXECUTION_ROLE_ARN (--set emrContainers.operatorExecutionRoleArn=$OPERATOR_EXECUTION_ROLE_ARN)
       # when you install Spark operator
       jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar
       parallelism: 2
       upgradeMode: stateless
   ```

1. 使用如下命令提交 Flink 会话作业。此操作将会创建一个 `FlinkSessionJob` 对象 `basic-session-job`。

   ```
   kubectl apply -f basic-session-job.yaml -n $NAMESPACE
   ```

1. 使用以下命令确认会话集群 `LIFECYCLE` 是 `STABLE`，`JOB STATUS` 是 `RUNNING`：

   ```
   kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
   ```

   该输出应该类似于以下示例：

   ```
   NAME                              JOB STATUS   LIFECYCLE STATE
   basic-example-session-cluster     RUNNING      STABLE
   ```

1. 访问 Flink UI。

   ```
   kubectl port-forward deployments/basic-example-session-cluster 8081 -n NAMESPACE
   ```

1. 打开 `localhost:8081` 即可在本地查看 Flink 任务。

1. 清理任务。记得清理为此任务创建的 S3 项目，例如检查点、高可用性、保存点元数据和日志。 CloudWatch

------