

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用 Amazon EMR on EKS 运行 Flink 任务
<a name="run-flink-jobs"></a>

Amazon EMR 版本 6.13.0 及更高版本支持带 Apache Flink 的 Amazon EMR on EKS 或 Flink Kubernetes 运算符作为 Amazon EMR on EKS 的任务提交模型。借助带 Apache Flink 的 Amazon EMR on EKS，您可以在自己的 Amazon EKS 集群上使用 Amazon EMR 发行版运行时系统来部署和管理 Flink 应用程序。在 Amazon EKS 集群中部署 Flink Kubernetes 运算符后，您可以直接向此运算符提交 Flink 应用程序。运算符管理 Flink 应用程序的生命周期。

**Topics**
+ [设置和使用 Flink Kubernetes Operator](jobruns-flink-kubernetes-operator.md)
+ [使用 Flink Native Kubernetes](jobruns-flink-native-kubernetes.md)
+ [为 Flink 和 FluentD 自定义 Docker 映像](jobruns-flink-docker-flink-fluentd.md)
+ [监视 Flink Kubernetes Operator 和 Flink 任务](jobruns-flink-monitoring.md)
+ [Flink 如何支持高可用性和作业弹性](jobruns-flink-resiliency.md)
+ [在 Flink 应用程序中使用 Autoscaler](jobruns-flink-autoscaler.md)
+ [Amazon EMR on EKS 上 Flink 作业的维护和故障排除](jobruns-flink-troubleshooting.md)
+ [支持将 Amazon EMR on EKS 与 Apache Flink 结合使用的发行版](jobruns-flink-security-release-versions.md)

# 设置和使用 Flink Kubernetes Operator
<a name="jobruns-flink-kubernetes-operator"></a>

以下页面旨在介绍如何在 Amazon EMR on EKS 上设置并使用 Flink Kubernetes Operator 来运行 Flink 任务。提供的主题包括所需的先决条件、如何设置环境以及在 Amazon EMR on EKS 上运行 Flink 应用程序。

**Topics**
+ [设置 Amazon EMR on EKS 的 Flink Kubernetes Operator](jobruns-flink-kubernetes-operator-setup.md)
+ [安装 Amazon EMR on EKS 的 Flink Kubernetes Operator](jobruns-flink-kubernetes-operator-getting-started.md)
+ [运行 Flink 应用程序](jobruns-flink-kubernetes-operator-run-application.md)
+ [运行 Flink 应用程序的安全角色权限](jobruns-flink-kubernetes-security.md)
+ [卸载 Amazon EMR on EKS 的 Flink Kubernetes Operator](jobruns-flink-kubernetes-operator-uninstall.md)

# 设置 Amazon EMR on EKS 的 Flink Kubernetes Operator
<a name="jobruns-flink-kubernetes-operator-setup"></a>

在 Amazon EKS 上安装 Flink Kubernetes Operator 之前，请执行下述任务来完成设置。如果已注册 Amazon Web Services（AWS）并且一直在使用 Amazon EKS，您基本上就准备好使用 Amazon EMR on EKS 了。完成以下任务，在 Amazon EKS 上设置好 Flink Operator。跳过已完成的先决条件，转到下一个先决条件。
+ **[安装或更新到最新版本的 AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html)** — 如果您已经安装了 AWS CLI，请确认您安装的是最新版本。
+ **[设置 kubectl 和 eksctl](https://docs.aws.amazon.com/eks/latest/userguide/install-kubectl.html)**：eksctl 是用来与 Amazon EKS 通信的命令行工具。
+ **[Install Helm](https://docs.aws.amazon.com/eks/latest/userguide/helm.html)**：Kubernetes 的 Helm 包管理器可帮助您在 Kubernetes 集群上安装和管理应用程序。
+ **[开始使用 Amazon EKS - eksctl](https://docs.aws.amazon.com/eks/latest/userguide/getting-started-eksctl.html)**：按照以下步骤在 Amazon EKS 中创建带有节点的新 Kubernetes 集群。
+ **[选择 Amazon EMR 发行版标签](jobruns-flink-security-release-versions.md)（6.13.0 或更高版本）**：Amazon EMR 6.13.0 及更高版本支持 Flink Kubernetes Operator。
+ **[在 Amazon EKS 集群上启用服务账户的 IAM 角色（IRSA）。](setting-up-enable-IAM.md)**
+ **[创建任务执行角色](creating-job-execution-role.md)**。
+ **[更新任务执行角色的信任策略。](setting-up-trust-policy.md)**
+ 创建 Operator 执行角色。此为可选步骤。Flink 任务和 Operator 可以使用相同的角色。如果想为 Operator 设置不同的 IAM 角色，可以单独创建一个角色。
+ 更新 Operator 执行角色的信任策略。必须为要用于 Amazon EMR Flink Kubernetes Operator 服务账户的角色显式添加一个信任策略条目。可以按照如下示例格式进行操作：

------
#### [ JSON ]

****  

  ```
  {
    "Version":"2012-10-17",		 	 	 
    "Statement": [
      {
        "Effect": "Allow",
        "Action": [
          "sts:AssumeRoleWithWebIdentity"
        ],
        "Resource": [
          "*"
        ],
        "Condition": {
          "StringLike": {
            "aws:userid": "system:serviceaccount:emr:emr-containers-sa-flink-operator"
          }
        },
        "Sid": "AllowSTSAssumerolewithwebidentity"
      }
    ]
  }
  ```

------

# 安装 Amazon EMR on EKS 的 Flink Kubernetes Operator
<a name="jobruns-flink-kubernetes-operator-getting-started"></a>

本主题通过准备 Flink 部署，帮助您开始在 Amazon EKS 上使用 Flink Kubernetes Operator。

## 安装 Kubernetes Operator
<a name="jobruns-flink-kubernetes-operator-getting-started-install-operator"></a>

按照以下步骤安装 Apache Flink 版 Kubernetes Operator。

1. 如果尚未执行此操作，请完成 [设置 Amazon EMR on EKS 的 Flink Kubernetes Operator](jobruns-flink-kubernetes-operator-setup.md) 中的步骤。

1. 安装*cert-manager*（每个 Amazon EKS 集群安装一次）以启用添加 webhook 组件。

   ```
   kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.12.0/cert-manager.yaml
   ```

1. 安装 Helm 图表。

   ```
   export VERSION=7.12.0 # The Amazon EMR release version
   export NAMESPACE=The Kubernetes namespace to deploy the operator
   
   helm install flink-kubernetes-operator \
   oci://public.ecr.aws/emr-on-eks/flink-kubernetes-operator \
   --version $VERSION \
   --namespace $NAMESPACE
   ```

   输出示例：

   ```
   NAME: flink-kubernetes-operator
   LAST DEPLOYED: Tue May 31 17:38:56 2022
   NAMESPACE: $NAMESPACE
   STATUS: deployed
   REVISION: 1
   TEST SUITE: None
   ```

1. 等待部署完成并验证图表安装情况。

   ```
   kubectl wait deployment flink-kubernetes-operator --namespace $NAMESPACE --for condition=Available=True --timeout=30s
   ```

1. 部署完成后，您应该会看到如下消息。

   ```
   deployment.apps/flink-kubernetes-operator condition met
   ```

1. 使用以下命令查看部署的 Operator。

   ```
   helm list --namespace $NAMESPACE
   ```

   示例输出如下，其中应用程序版本 `x.y.z-amzn-n` 将与您的 Amazon EMR on EKS 发行版的 Flink Operator 版本对应。有关更多信息，请参阅 [支持将 Amazon EMR on EKS 与 Apache Flink 结合使用的发行版](jobruns-flink-security-release-versions.md)。

   ```
   NAME                              NAMESPACE    REVISION    UPDATED                                STATUS      CHART                                   APP VERSION          
   flink-kubernetes-operator    $NAMESPACE   1           2023-02-22 16:43:45.24148 -0500 EST    deployed    flink-kubernetes-operator-emr-7.12.0    x.y.z-amzn-n
   ```

### 升级 Kubernetes Operator
<a name="jobruns-flink-kubernetes-operator-upgrade"></a>

要升级 Kubernetes Operator 的版本，请按照以下步骤进行操作：

1. 卸载旧的 `flink-kubernetes-operator`：`helm uninstall flink-kubernetes-operator -n <NAMESPACE>`。

1. 删除 CRD（因为 Helm 不会自动删除旧的 CRD）：`kubectl delete crd flinkdeployments.flink.apache.org flinksessionjobs.flink.apache.org`。

1. 使用较新的版本重新安装 `flink-kubernetes-operator`。

# 运行 Flink 应用程序
<a name="jobruns-flink-kubernetes-operator-run-application"></a>

在 Amazon EMR 6.13.0 及更高版本中，您可以在应用程序模式下在 EKS 上的 Amazon EMR 上使用 Flink Kubernetes Operator 来运行 Flink 应用程序。在 Amazon EMR 6.15.0 及更高版本中，您还可以在会话模式中运行 Flink 应用程序。本页介绍您使用 EKS 上的 Amazon EMR 运行 Flink 应用程序时可用的两种方法。

**Topics**

**注意**  
提交 Flink 作业时，必须使用一个 Amazon S3 存储桶来存储高可用性元数据。如果不想使用此功能，可以将其禁用。系统会默认启用该功能。

**先决条件**：在使用 Flink Native Kubernetes Operator 运行 Flink 应用程序之前，请先完成 [设置 Amazon EMR on EKS 的 Flink Kubernetes Operator](jobruns-flink-kubernetes-operator-setup.md) 和 [安装 Kubernetes Operator](jobruns-flink-kubernetes-operator-getting-started.md#jobruns-flink-kubernetes-operator-getting-started-install-operator) 中的步骤。

------
#### [ Application mode ]

在 Amazon EMR 6.13.0 及更高版本中，您可以在应用程序模式下在 EKS 上的 Amazon EMR 上使用 Flink Kubernetes Operator 来运行 Flink 应用程序。

1. 创建一个名为 `basic-example-app-cluster.yaml` 的 `FlinkDeployment` 定义文件，如下例所示。如果您已激活并使用其中一个[选项 AWS 区域，请](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html)务必取消注释并配置配置。`fs.s3a.endpoint.region`

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example-app-cluster
   spec:
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
       #fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
       state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
       state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH 
     flinkVersion: v1_17
     executionRoleArn: JOB_EXECUTION_ROLE_ARN
     emrReleaseLabel: "emr-6.13.0-flink-latest" # 6.13 or higher
     jobManager:
       storageDir: HIGH_AVAILABILITY_STORAGE_PATH
       resource:
         memory: "2048m"
         cpu: 1
     taskManager:
       resource:
         memory: "2048m"
         cpu: 1
     job:
       # if you have your job jar in S3 bucket you can use that path as well
       jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
       parallelism: 2
       upgradeMode: savepoint
       savepointTriggerNonce: 0
     monitoringConfiguration:    
       cloudWatchMonitoringConfiguration:
          logGroupName: LOG_GROUP_NAME
   ```

1. 使用如下命令提交 Flink 部署。此操作还会创建一个名为 `basic-example-app-cluster` 的 `FlinkDeployment` 对象。

   ```
   kubectl create -f basic-example-app-cluster.yaml -n <NAMESPACE>
   ```

1. 访问 Flink UI。

   ```
   kubectl port-forward deployments/basic-example-app-cluster 8081 -n NAMESPACE
   ```

1. 打开 `localhost:8081` 即可在本地查看 Flink 任务。

1. 清理任务。记得清理为此任务创建的 S3 项目，例如检查点、高可用性、保存点元数据和日志。 CloudWatch

有关通过 Flink Kubernetes 运算符向 Flink 提交应用程序的更多信息，请参阅文件夹中的 Flink K [uber](https://github.com/apache/flink-kubernetes-operator/tree/main/examples) netes 运算符示例。`apache/flink-kubernetes-operator` GitHub

------
#### [ Session mode ]

在 Amazon EMR 6.15.0 及更高版本中，您可以在会话模式下在 EKS 上的 Amazon EMR 上使用 Flink Kubernetes Operator 来运行 Flink 应用程序。

1. 创建一个名为 `basic-example-app-cluster.yaml` 的 `FlinkDeployment` 定义文件，如下例所示。如果您已激活并使用其中一个[选项 AWS 区域，请](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html)务必取消注释并配置配置。`fs.s3a.endpoint.region`

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example-session-cluster
   spec:
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
       #fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
       state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
       state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH
     flinkVersion: v1_17
     executionRoleArn: JOB_EXECUTION_ROLE_ARN
     emrReleaseLabel: "emr-6.15.0-flink-latest"
     jobManager:
       storageDir: HIGH_AVAILABILITY_S3_STORAGE_PATH
       resource:
         memory: "2048m"
         cpu: 1
     taskManager:
       resource:
         memory: "2048m"
         cpu: 1
     monitoringConfiguration:    
       s3MonitoringConfiguration:
          logUri: 
       cloudWatchMonitoringConfiguration:
          logGroupName: LOG_GROUP_NAME
   ```

1. 使用如下命令提交 Flink 部署。此操作还会创建一个名为 `basic-example-session-cluster` 的 `FlinkDeployment` 对象。

   ```
   kubectl create -f basic-example-app-cluster.yaml -n NAMESPACE
   ```

1. 使用以下命令确认会话集群 `LIFECYCLE` 是 `STABLE`：

   ```
   kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
   ```

   该输出应该类似于以下示例：

   ```
   NAME                              JOB STATUS   LIFECYCLE STATE
   basic-example-session-cluster                          STABLE
   ```

1. 使用以下示例内容创建 `FlinkSessionJob` 自定义定义资源文件 `basic-session-job.yaml`：

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkSessionJob
   metadata:
     name: basic-session-job
   spec:
     deploymentName: basic-session-deployment
     job:
       # If you have your job jar in an S3 bucket you can use that path.
       # To use jar in S3 bucket, set 
       # OPERATOR_EXECUTION_ROLE_ARN (--set emrContainers.operatorExecutionRoleArn=$OPERATOR_EXECUTION_ROLE_ARN)
       # when you install Spark operator
       jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar
       parallelism: 2
       upgradeMode: stateless
   ```

1. 使用如下命令提交 Flink 会话作业。此操作将会创建一个 `FlinkSessionJob` 对象 `basic-session-job`。

   ```
   kubectl apply -f basic-session-job.yaml -n $NAMESPACE
   ```

1. 使用以下命令确认会话集群 `LIFECYCLE` 是 `STABLE`，`JOB STATUS` 是 `RUNNING`：

   ```
   kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
   ```

   该输出应该类似于以下示例：

   ```
   NAME                              JOB STATUS   LIFECYCLE STATE
   basic-example-session-cluster     RUNNING      STABLE
   ```

1. 访问 Flink UI。

   ```
   kubectl port-forward deployments/basic-example-session-cluster 8081 -n NAMESPACE
   ```

1. 打开 `localhost:8081` 即可在本地查看 Flink 任务。

1. 清理任务。记得清理为此任务创建的 S3 项目，例如检查点、高可用性、保存点元数据和日志。 CloudWatch

------

# 运行 Flink 应用程序的安全角色权限
<a name="jobruns-flink-kubernetes-security"></a>

本主题介绍了部署和运行 Flink 应用程序的安全角色。管理部署以及创建和管理作业需要两个角色：Operator 和 Job。本主题介绍了两个角色并列出了其权限。

## 基于角色的访问控制
<a name="jobruns-flink-kubernetes-security-rbac"></a>

要部署 Operator 并运行 Flink 任务，必须创建两个 Kubernetes 角色：一个 Operator 角色和一个任务角色。安装 Operator 时 Amazon EMR 会默认创建这两个角色。

## Operator 角色
<a name="jobruns-flink-kubernetes-security-operator-role"></a>

我们使用操作员角色`flinkdeployments`来管理每个 Flink 作业和其他资源（例如服务）的创建和管理。 JobManager 

Operator 角色的默认名称为 `emr-containers-sa-flink-operator` 并且需要以下权限。

```
rules:
- apiGroups:
  - ""
  resources:
  - pods
  - services
  - events
  - configmaps
  - secrets
  - serviceaccounts
  verbs:
  - '*'
- apiGroups:
  - rbac.authorization.k8s.io
  resources:
  - roles
  - rolebindings
  verbs:
  - '*'
- apiGroups:
  - apps
  resources:
  - deployments
  - deployments/finalizers
  - replicasets
  verbs:
  - '*'
- apiGroups:
  - extensions
  resources:
  - deployments
  - ingresses
  verbs:
  - '*'
- apiGroups:
  - flink.apache.org
  resources:
  - flinkdeployments
  - flinkdeployments/status
  - flinksessionjobs
  - flinksessionjobs/status
  verbs:
  - '*'
- apiGroups:
  - networking.k8s.io
  resources:
  - ingresses
  verbs:
  - '*'
- apiGroups:
  - coordination.k8s.io
  resources:
  - leases
  verbs:
  - '*'
```

## 任务角色
<a name="jobruns-flink-security-job-role"></a>

 JobManager 使用作业角色 ConfigMaps 为每个作业创建和管理 TaskManagers 和。

```
rules:
- apiGroups:
  - ""
  resources:
  - pods
  - configmaps
  verbs:
  - '*'
- apiGroups:
  - apps
  resources:
  - deployments
  - deployments/finalizers
  verbs:
  - '*'
```

# 卸载 Amazon EMR on EKS 的 Flink Kubernetes Operator
<a name="jobruns-flink-kubernetes-operator-uninstall"></a>

按照如下步骤卸载 Flink Kubernetes Operator。

1. 删除 Operator。

   ```
   helm uninstall flink-kubernetes-operator -n <NAMESPACE>
   ```

1. 删除 Helm 未卸载的 Kubernetes 资源。

   ```
   kubectl delete serviceaccounts, roles, rolebindings -l emr-containers.amazonaws.com/component=flink.operator --namespace <namespace>
   kubectl delete crd flinkdeployments.flink.apache.org flinksessionjobs.flink.apache.org
   ```

1. （可选）删除 cert-manager。

   ```
   kubectl delete -f https://github.com/jetstack/cert-manager/releases/download/v1.12.0/cert-manager.yaml
   ```

# 使用 Flink Native Kubernetes
<a name="jobruns-flink-native-kubernetes"></a>

Amazon EMR 6.13.0 及更高版本都支持 Flink Native Kubernetes 作为命令行工具，而您可以使用该工具向 Amazon EMR on EKS 集群提交和执行 Flink 应用程序。

**Topics**
+ [设置 Amazon EMR on EKS 的 Flink Native Kubernetes](jobruns-flink-native-kubernetes-setup.md)
+ [Amazon EMR on EKS 的 Flink Native Kubernetes 入门](jobruns-flink-native-kubernetes-getting-started.md)
+ [原生 Kubernetes 的 Flink JobManager 服务账号安全要求](jobruns-flink-native-kubernetes-security-requirements.md)

# 设置 Amazon EMR on EKS 的 Flink Native Kubernetes
<a name="jobruns-flink-native-kubernetes-setup"></a>

先执行以下任务来完成设置，才能在 Amazon EMR on EKS 上使用 Flink CLI 运行应用程序。如果已注册 Amazon Web Services（AWS）并且一直在使用 Amazon EKS，您基本上就准备好使用 Amazon EMR on EKS 了。跳过已完成的先决条件，转到下一个先决条件。
+ **[安装或更新到最新版本的 AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html)** — 如果您已经安装了 AWS CLI，请确认您安装的是最新版本。
+ **[开始使用 Amazon EKS - eksctl](https://docs.aws.amazon.com/eks/latest/userguide/getting-started-eksctl.html)**：按照以下步骤在 Amazon EKS 中创建带有节点的新 Kubernetes 集群。
+ **[选择 Amazon EMR 基础映像 URI](docker-custom-images-tag.md)（6.13.0 或更高版本）**：Amazon EMR 6.13.0 及更高版本都支持 Flink Kubernetes 命令。
+ 确认 JobManager 服务帐号具有创建和监视 TaskManager pod 的相应权限。有关更多信息，请参阅[原生 Kubernetes 的 Flink JobManager 服务账号安全要求](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/jobruns-flink-native-kubernetes-security-requirements.html)。
+ 设置本地 [AWS 凭证配置文件](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html)。
+ [为 Amazon EKS 集群创建或更新 kubeconfig 文件](https://docs.aws.amazon.com/eks/latest/userguide/create-kubeconfig.html)，如果要在该集群上运行 Flink 应用程序。

# Amazon EMR on EKS 的 Flink Native Kubernetes 入门
<a name="jobruns-flink-native-kubernetes-getting-started"></a>

这些步骤展示了如何配置和运行 Flink 应用程序并为其设置服务账户。Flink Native Kubernetes 用于在运行的 Kubernetes 集群上部署 Flink。

## 配置和运行 Flink 应用程序
<a name="jobruns-flink-native-kubernetes-getting-started-run-application"></a>

Amazon EMR 6.13.0 及更高版本都支持 Flink Native Kubernetes 在 Amazon EKS 集群上运行 Flink 应用程序。要运行 Flink 应用程序，请按照下述步骤操作：

1. 在使用 Flink Native Kubernetes 命令运行 Flink 应用程序之前，请先完成 [设置 Amazon EMR on EKS 的 Flink Native Kubernetes](jobruns-flink-native-kubernetes-setup.md) 中的步骤。

1. [下载并安装 Flink](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/try-flink/local_installation)。

1. 设置以下环境变量的值。

   ```
   #Export the FLINK_HOME environment variable to your local installation of Flink
   export FLINK_HOME=/usr/local/bin/flink #Will vary depending on your installation
   export NAMESPACE=flink
   export CLUSTER_ID=flink-application-cluster
   export IMAGE=<123456789012.dkr.ecr.sample-AWS 区域-.amazonaws.com/flink/emr-6.13.0-flink:latest>
   export FLINK_SERVICE_ACCOUNT=emr-containers-sa-flink
   export FLINK_CLUSTER_ROLE_BINDING=emr-containers-crb-flink
   ```

1. 创建服务账户来管理 Kubernetes 资源。

   ```
   kubectl create serviceaccount $FLINK_SERVICE_ACCOUNT -n $NAMESPACE
   kubectl create clusterrolebinding $FLINK_CLUSTER_ROLE_BINDING --clusterrole=edit --serviceaccount=$NAMESPACE:$FLINK_SERVICE_ACCOUNT
   ```

1. 运行 `run-application` CLI 命令。

   ```
   $FLINK_HOME/bin/flink run-application \
       --target kubernetes-application \
       -Dkubernetes.namespace=$NAMESPACE \
       -Dkubernetes.cluster-id=$CLUSTER_ID \
       -Dkubernetes.container.image.ref=$IMAGE \
       -Dkubernetes.service-account=$FLINK_SERVICE_ACCOUNT \
       local:///opt/flink/examples/streaming/Iteration.jar
   2022-12-29 21:13:06,947 INFO  org.apache.flink.kubernetes.utils.KubernetesUtils            [] - Kubernetes deployment requires a fixed port. Configuration blob.server.port will be set to 6124
   2022-12-29 21:13:06,948 INFO  org.apache.flink.kubernetes.utils.KubernetesUtils            [] - Kubernetes deployment requires a fixed port. Configuration taskmanager.rpc.port will be set to 6122
   2022-12-29 21:13:07,861 WARN  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Please note that Flink client operations(e.g. cancel, list, stop, savepoint, etc.) won't work from outside the Kubernetes cluster since 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
   2022-12-29 21:13:07,868 INFO  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Create flink application cluster flink-application-cluster successfully, JobManager Web Interface: http://flink-application-cluster-rest.flink:8081
   ```

1. 检查创建好的 Kubernetes 资源。

   ```
   kubectl get all -n <namespace>
   NAME READY STATUS RESTARTS AGE
   pod/flink-application-cluster-546687cb47-w2p2z 1/1 Running 0 3m37s
   pod/flink-application-cluster-taskmanager-1-1 1/1 Running 0 3m24s
   
   NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
   service/flink-application-cluster ClusterIP None <none> 6123/TCP,6124/TCP 3m38s
   service/flink-application-cluster-rest ClusterIP 10.100.132.158 <none> 8081/TCP 3m38s
   
   NAME READY UP-TO-DATE AVAILABLE AGE
   deployment.apps/flink-application-cluster 1/1 1 1 3m38s
   
   NAME DESIRED CURRENT READY AGE
   replicaset.apps/flink-application-cluster-546687cb47 1 1 1 3m38s
   ```

1. 端口转发到 8081。

   ```
   kubectl port-forward service/flink-application-cluster-rest 8081 -n <namespace>
   Forwarding from 127.0.0.1:8081 -> 8081
   ```

1. 在本地访问 Flink UI。  
![\[访问 Flink UI。\]](http://docs.aws.amazon.com/zh_cn/emr/latest/EMR-on-EKS-DevelopmentGuide/images/jobruns-flink-native-kubernetes-ui.png)

1. 删除 Flink 应用程序。

   ```
   kubectl delete deployment.apps/flink-application-cluster -n <namespace>
   deployment.apps "flink-application-cluster" deleted
   ```

有关向 Flink 提交应用程序的更多信息，请参阅 Apache Flink 文档中的 [Native Kubernetes](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/)。

# 原生 Kubernetes 的 Flink JobManager 服务账号安全要求
<a name="jobruns-flink-native-kubernetes-security-requirements"></a>

Flink JobManager 容器使用 Kubernetes 服务账号访问 Kubernetes API 服务器来创建和监视 Pod。 TaskManager JobManager 服务帐号必须拥有对 create/delete TaskManager pod 的相应权限，并允许 to watch 领导者 ConfigMaps 检索集群 ResourceManager 中 JobManager 和的地址。 TaskManager 

以下规则适用于此服务账户。

```
rules:
- apiGroups:
  - ""
  resources:
  - pods
  verbs:
  - "*"
- apiGroups:
  - ""
  resources:
  - services
  verbs:
  - "*"
- apiGroups:
  - ""
  resources:
  - configmaps
  verbs:
  - "*"
- apiGroups:
  - "apps"
  resources:
  - deployments
  verbs:
  - "*"
```

# 为 Flink 和 FluentD 自定义 Docker 映像
<a name="jobruns-flink-docker-flink-fluentd"></a>

按照以下步骤，使用 Apache Flink 或 FluentD 映像为 Amazon EMR on EKS 自定义 Docker 映像。其中包括获取基础映像、自定义基础映像、发布基础映像和提交工作负载的技术指导。

**Topics**
+ [先决条件](#jobruns-flink-docker-flink-fluentd-prereqs)
+ [步骤 1：从 Amazon Elastic Container Registry 中检索基础映像](#jobruns-flink-docker-flink-fluentd-retrieve-base)
+ [步骤 2：自定义基础镜像](#jobruns-flink-docker-flink-fluentd-customize-image)
+ [步骤 3：发布自定义映像](#jobruns-flink-docker-flink-fluentd-publish-image)
+ [步骤 4：使用自定义映像在 Amazon EMR 中提交 Flink 工作负载](#jobruns-flink-docker-flink-fluentd-submit-workload)

## 先决条件
<a name="jobruns-flink-docker-flink-fluentd-prereqs"></a>

在自定义 Docker 映像之前，确保您已完成以下先决条件：
+ 已完成[设置 Amazon EMR on EKS 的 Flink Kubernetes Operator](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/jobruns-flink-kubernetes-operator-setup.html) 步骤。
+ 已在环境中安装 Docker。有关更多信息，请参阅[获取 Docker](https://docs.docker.com/get-docker/)。

## 步骤 1：从 Amazon Elastic Container Registry 中检索基础映像
<a name="jobruns-flink-docker-flink-fluentd-retrieve-base"></a>

基础映像包含访问其他 AWS 服务所需的 Amazon EMR 运行时和连接器。如果在 Flink 6.14.0 或更高版本中使用 Amazon EMR on EKS，可从 Amazon ECR 公开映像浏览馆获取基础映像。浏览图库以找到映像链接，然后将映像拉到本地 Workspace。例如，对于 Amazon EMR 6.14.0 发行版，以下 `docker pull` 命令将返回最新的标准基础映像。将 `emr-6.14.0:latest` 替换为所需的发行版。

```
docker pull public.ecr.aws/emr-on-eks/flink/emr-6.14.0-flink:latest
```

以下是 Flink 库映像和 Fluentd 库映像的链接：
+ [emr-on-eks/flink/emr-6.14.0-flink](https://gallery.ecr.aws/emr-on-eks/flink/emr-6.14.0-flink)
+ [emr-on-eks/fluentd/emr-6.14.0 (](https://gallery.ecr.aws/emr-on-eks/fluentd/emr-6.14.0)

## 步骤 2：自定义基础镜像
<a name="jobruns-flink-docker-flink-fluentd-customize-image"></a>

下列步骤描述了如何自定义从 Amazon ECR 中拉取的基础映像。

1. 在您的本地 Workspace 上创建新的 `Dockerfile`。

1. 编辑 `Dockerfile` 并添加以下内容。该 `Dockerfile` 使用您从 `public.ecr.aws/emr-on-eks/flink/emr-7.12.0-flink:latest` 中拉取的容器映像。

   ```
   FROM public.ecr.aws/emr-on-eks/flink/emr-7.12.0-flink:latest
   USER root
   ### Add customization commands here ####
   USER hadoop:hadoop
   ```

   如果您使用的是 `Fluentd`，请使用以下配置。

   ```
   FROM public.ecr.aws/emr-on-eks/fluentd/emr-7.12.0:latest
   USER root
   ### Add customization commands here ####
   USER hadoop:hadoop
   ```

1. 将命令添加到 `Dockerfile` 以自定义基础镜像。以下命令演示了如何安装 Python 库。

   ```
   FROM public.ecr.aws/emr-on-eks/flink/emr-7.12.0-flink:latest
   USER root
   RUN pip3 install --upgrade boto3 pandas numpy // For python 3
   USER hadoop:hadoop
   ```

1. 在创建 `DockerFile` 的同一目录中，运行以下命令构建 Docker 映像。您在 `-t` 标志后提供的字段是您自定义的映像名称。

   ```
   docker build -t <YOUR_ACCOUNT_ID>.dkr.ecr.<YOUR_ECR_REGION>.amazonaws.com/<ECR_REPO>:<ECR_TAG>
   ```

## 步骤 3：发布自定义映像
<a name="jobruns-flink-docker-flink-fluentd-publish-image"></a>

现在，将新的 Docker 映像发布到您的 Amazon ECR 注册表。

1. 运行以下命令创建一个 Amazon ECR 存储库，来存储您的 Docker 映像。为存储库命名，例如 `emr_custom_repo.`。有关更多信息，请参阅《Amazon Elastic Container Registry 用户指南》中的[创建存储库](https://docs.aws.amazon.com/AmazonECR/latest/userguide/getting-started-cli.html#cli-create-repository)。

   ```
   aws ecr create-repository \
          --repository-name emr_custom_repo \
          --image-scanning-configuration scanOnPush=true \
          --region <AWS_REGION>
   ```

1. 运行以下命令对您的默认注册表进行身份验证。有关更多信息，请参阅《Amazon Elastic Container Registry 用户指南》中的[对您的默认注册表进行身份验证](https://docs.aws.amazon.com/AmazonECR/latest/userguide/getting-started-cli.html#cli-authenticate-registry)。

   ```
   aws ecr get-login-password --region <AWS_REGION> | docker login --username AWS --password-stdin <AWS_ACCOUNT_ID>.dkr.ecr.<YOUR_ECR_REGION>.amazonaws.com
   ```

1. 推送镜像。有关更多信息，请参阅《Amazon Elastic Container Registry 用户指南》中的[将映像推送到 Amazon ECR](https://docs.aws.amazon.com/AmazonECR/latest/userguide/getting-started-cli.html#cli-push-image)。

   ```
   docker push <YOUR_ACCOUNT_ID>.dkr.ecr.<YOUR_ECR_REGION>.amazonaws.com/<ECR_REPO>:<ECR_TAG>
   ```

## 步骤 4：使用自定义映像在 Amazon EMR 中提交 Flink 工作负载
<a name="jobruns-flink-docker-flink-fluentd-submit-workload"></a>

要使用自定映像，请对 `FlinkDeployment` 规范进行以下更改。为此，请在部署规范的 `spec.image` 行中输入您的映像。

```
apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example
   spec:
     flinkVersion: v1_18
     image: <YOUR_ACCOUNT_ID>.dkr.ecr.<YOUR_ECR_REGION>.amazonaws.com/<ECR_REPO>:<ECR_TAG>
     imagePullPolicy: Always
     flinkConfiguration:
           taskmanager.numberOfTaskSlots: "1"
```

要在 Fluentd 作业中使用自定义映像，请在部署规范的 `monitoringConfiguration.image` 行中输入您的映像。

```
  monitoringConfiguration:
       image: <YOUR_ACCOUNT_ID>.dkr.ecr.<YOUR_ECR_REGION>.amazonaws.com/<ECR_REPO>:<ECR_TAG>
       cloudWatchMonitoringConfiguration:
         logGroupName: flink-log-group
         logStreamNamePrefix: custom-fluentd
```

# 监视 Flink Kubernetes Operator 和 Flink 任务
<a name="jobruns-flink-monitoring"></a>

本节旨在介绍在 Amazon EMR on EKS 上监控 Flink 任务的几种方法。其中包括将 Flink 与 Amazon Managed Service for Prometheus 集成，使用提供作业状态和指标的 *Flink Web 控制面板*，或者使用监控配置将日志数据发送到 Amazon S3 和 Amazon CloudWatch。

**Topics**
+ [使用 Amazon Managed Service for Prometheus 监控 Flink 作业](jobruns-flink-monitoring-prometheus.md)
+ [使用 Flink UI 监控 Flink 作业](jobruns-flink-monitoring-ui.md)
+ [使用监控配置监控 Flink Kubernetes Operator 和 Flink 作业](jobruns-flink-monitoring-configuration.md)

# 使用 Amazon Managed Service for Prometheus 监控 Flink 作业
<a name="jobruns-flink-monitoring-prometheus"></a>

您可以将 Apache Flink 与 Amazon Managed Service for Prometheus（管理门户）集成。Amazon Managed Service for Prometheus 支持从在 Amazon EKS 上运行的集群中摄取 Amazon Managed Service for Prometheus 服务器的指标。Amazon Managed Service for Prometheus 可与已 Amazon EKS 集群上运行的 Prometheus 服务器配合使用。运行集成了 Amazon EMR Flink Operator 的 Amazon Managed Service for Prometheus 会自动部署并配置 Prometheus 服务器，使其与 Amazon Managed Service for Prometheus 集成。

1. [创建 Amazon Managed Service for Prometheus Workspace](https://docs.aws.amazon.com/prometheus/latest/userguide/AMP-onboard-create-workspace.html)。此工作空间用作摄取端点。稍后需要远程写入 URL。

1. 设置服务账户的 IAM 角色。

   要使用这种入门方法，请使用运行 Prometheus 服务器的 Amazon EKS 集群中服务账户的 IAM 角色。此类角色又称为*服务角色*。

   如果尚无此类角色，请[设置服务角色从 Amazon EKS 集群中摄取指标](https://docs.aws.amazon.com/prometheus/latest/userguide/set-up-irsa.html)。

   请创建一个名为 `amp-iamproxy-ingest-role` 的 IAM 角色，然后再继续操作。

1. 使用 Amazon Managed Service for Prometheus 安装 Amazon EMR Flink Operator。

您已经拥有了一个 Amazon Managed Service for Prometheus Workspace，一个专用于 Amazon Managed Service for Prometheus 的 IAM 角色，以及所需的权限，现在可以安装 Amazon EMR Flink 运算符。

创建 `enable-amp.yaml` 文件。此文件允许您使用自定义配置来覆盖 Amazon Managed Service for Prometheus 的设置。务必使用您自己的角色。

```
kube-prometheus-stack:
    prometheus:
    serviceAccount:
        create: true
        name: "amp-iamproxy-ingest-service-account"
        annotations:
            eks.amazonaws.com/role-arn: "arn:aws:iam::<AWS_ACCOUNT_ID>:role/amp-iamproxy-ingest-role"
    remoteWrite:
        - url: <AMAZON_MANAGED_PROMETHEUS_REMOTE_WRITE_URL>
        sigv4:
            region: <AWS_REGION>
        queueConfig:
            maxSamplesPerSend: 1000
            maxShards: 200
            capacity: 2500
```

使用 [https://helm.sh/docs/helm/helm_install/](https://helm.sh/docs/helm/helm_install/) 命令将覆盖传递给 `flink-kubernetes-operator` 图表。

```
helm upgrade -n <namespace> flink-kubernetes-operator \
   oci://public.ecr.aws/emr-on-eks/flink-kubernetes-operator \
   --set prometheus.enabled=true
   -f enable-amp.yaml
```

此命令会自动在端口 9999 的 Operator 上安装 Prometheus 报告程序。未来的任何 `FlinkDeployment` 也会在 9249 上公开一个 `metrics` 端口。
+ Flink Operator 指标会出现在 Prometheus 中的 `flink_k8soperator_` 标签下。
+ Flink Task Manager 指标会出现在 Prometheus 的 `flink_taskmanager_` 标签下。
+ Flink Job Manager 指标会出现在 Prometheus 的 `flink_jobmanager_` 标签下。

# 使用 Flink UI 监控 Flink 作业
<a name="jobruns-flink-monitoring-ui"></a>

要监控正在运行的 Flink 应用程序的运行状况和性能，请使用 *Flink Web 控制面板*。此仪表板提供有关任务状态 TaskManagers、数量以及该任务的指标和日志的信息。借助它还可以查看并修改 Flink 作业配置，以及通过提交或取消作业与 Flink 集群进行交互。

要访问 Kubernetes 上正在运行的 Flink 应用程序的 Flink Web 控制面板，请按照以下步骤操作：

1. 使用`kubectl port-forward`命令将本地端口转发到 Flink 应用程序容器中运行 Flink Web 控制面板的 TaskManager 端口。默认情况下，此端口为 8081。*deployment-name*替换为上面的 Flink 应用程序部署名称。

   ```
   kubectl get deployments -n namespace
   ```

   输出示例：

   ```
   kubectl get deployments -n flink-namespace
   NAME                        READY   UP-TO-DATE   AVAILABLE  AGE
   basic-example               1/1       1            1           11m
   flink-kubernetes-operator   1/1       1            1           21h
   ```

   ```
   kubectl port-forward deployments/deployment-name 8081 -n namespace
   ```

1. 如果要在本地使用其他端口，请使用:8081 *local-port* 参数。

   ```
   kubectl port-forward -n flink deployments/basic-example 8080:8081
   ```

1. 在 Web 浏览器中导航到 `http://localhost:8081`（如果使用的是自定义本地端口，则导航到 `http://localhost:local-port`）来访问 Flink Web 控制面板。此仪表板显示有关正在运行的 Flink 应用程序的信息，例如作业的状态 TaskManagers、数量以及该作业的指标和日志。  
![\[Flink 控制面板用户界面示例\]](http://docs.aws.amazon.com/zh_cn/emr/latest/EMR-on-EKS-DevelopmentGuide/images/sample-flink-dashboard-ui.png)

# 使用监控配置监控 Flink Kubernetes Operator 和 Flink 作业
<a name="jobruns-flink-monitoring-configuration"></a>

监控配置使您可以轻松地将 Flink 应用程序和操作员日志的日志存档设置为 S3 and/or CloudWatch （您可以选择其中一个或两个）。这样做会向 JobManager 你的 TaskManager 和容器添加一个 FluentD 边车，然后将这些组件的日志转发到你配置的接收器。

**注意**  
必须为 Flink Operator 和 Flink 任务（服务账户）设置服务账户的 IAM 角色，才能使用此功能，因为它需要与其他 AWS 服务进行交互。您必须按照 [设置 Amazon EMR on EKS 的 Flink Kubernetes Operator](jobruns-flink-kubernetes-operator-setup.md)，使用 IRSA 进行设置。

## Flink 应用程序日志
<a name="jobruns-flink-monitoring-configuration-application-logs"></a>

您可以通过下列方式来定义此配置。

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: FLINK IMAGE TAG
  imagePullPolicy: Always
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  executionRoleArn: JOB EXECUTION ROLE
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
  monitoringConfiguration:
    s3MonitoringConfiguration:
      logUri: S3 BUCKET
    cloudWatchMonitoringConfiguration:
      logGroupName: LOG GROUP NAME
      logStreamNamePrefix: LOG GROUP STREAM PREFIX
    sideCarResources:
      limits:
        cpuLimit: 500m
        memoryLimit: 250Mi
    containerLogRotationConfiguration:
        rotationSize: 2GB
        maxFilesToKeep: 10
```

配置选项如下。
+ `s3MonitoringConfiguration`：用来设置转发到 S3 的配置密钥
  + `logUri`（必需）：用来存储日志的 S3 存储桶路径。
  + 上传日志后，S3 中的路径如下所示。
    + 未启用日志轮换：

      ```
      s3://${logUri}/${POD NAME}/STDOUT or STDERR.gz
      ```
    + 已启用日志轮换。您可以同时使用轮换文件和当前文件（不带日期戳的文件）。

      ```
      s3://${logUri}/${POD NAME}/STDOUT or STDERR.gz
      ```

      以下格式是递增数字。

      ```
      s3://${logUri}/${POD NAME}/stdout_YYYYMMDD_index.gz
      ```
  + 使用此转发器需要以下 IAM 权限。

    ```
    {
        "Effect": "Allow",
        "Action": [
            "s3:PutObject"
        ],
        "Resource": [
           "S3_BUCKET_URI/*",
           "S3_BUCKET_URI"
        ]
    }
    ```
+ `cloudWatchMonitoringConfiguration`— 用于设置转发的配置密钥 CloudWatch。
  + `logGroupName`（必填）— 要向其发送 CloudWatch 日志的日志组的名称（如果该组不存在，则自动创建该组）。
  + `logStreamNamePrefix`（可选）– 要向其发送日志的日志流的名称。默认值是空字符串。格式如下所示：

    ```
    ${logStreamNamePrefix}/${POD NAME}/STDOUT or STDERR
    ```
  + 使用此转发器需要以下 IAM 权限。

    ```
    {
        "Effect": "Allow",
        "Action": [
            "logs:CreateLogStream",
            "logs:CreateLogGroup",
            "logs:PutLogEvents"
        ],
        "Resource": [
            "arn:aws:logs:REGION:ACCOUNT-ID:log-group:{YOUR_LOG_GROUP_NAME}:*",
            "arn:aws:logs:REGION:ACCOUNT-ID:log-group:{YOUR_LOG_GROUP_NAME}"
        ]
    }
    ```
+ `sideCarResources`（可选）：用于在启动的 Fluentbit Sidecar 容器上设置资源限制的配置密钥。
  + `memoryLimit`（可选）：默认值为 512Mi。根据自身需求进行调整。
  + `cpuLimit`（可选）：此选项没有默认值。根据自身需求进行调整。
+ `containerLogRotationConfiguration`（可选）：控制容器日志的轮换行为。该功能默认已启用。
  + `rotationSize`（必需）：指定日志轮换的文件大小。可行值的范围从 2KB 到 2GB 不等。rotationSize 参数的数字单位部分以整数形式传递。由于不支持十进制值，您可以指定 1.5GB 的轮换大小，例如值 1500MB。默认值为 2GB。
  + `maxFilesToKeep`（必需）：指定轮换后要在容器中保留的最大文件数。最小值为 1，最大值为 50。默认值为 10。

## Flink Operator 日志
<a name="jobruns-flink-monitoring-configuration-operator-logs"></a>

我们还可以使用 Helm 图表安装 `values.yaml` 文件中的以下选项，为 Operator 启用日志存档。您可以启用 S3 CloudWatch、或两者兼而有之。

```
monitoringConfiguration: 
  s3MonitoringConfiguration:
    logUri: "S3-BUCKET"
    totalFileSize: "1G"
    uploadTimeout: "1m"
  cloudWatchMonitoringConfiguration:
    logGroupName: "flink-log-group"
    logStreamNamePrefix: "example-job-prefix-test-2"
  sideCarResources:
    limits:
      cpuLimit: 1
      memoryLimit: 800Mi
  memoryBufferLimit: 700M
```

`monitoringConfiguration` 下的可用配置选项如下。
+ `s3MonitoringConfiguration`：设置此选项以存档到 S3。
+ `logUri`（必填项）：用来存储日志的 S3 存储桶路径。
+ 以下是上传日志后 S3 存储桶路径的示例格式。
  + 未启用日志轮换。

    ```
    s3://${logUri}/${POD NAME}/OPERATOR or WEBHOOK/STDOUT or STDERR.gz
    ```
  + 已启用日志轮换。您可以同时使用轮换文件和当前文件（不带日期戳的文件）。

    ```
    s3://${logUri}/${POD NAME}/OPERATOR or WEBHOOK/STDOUT or STDERR.gz
    ```

    以下格式索引是递增数字。

    ```
    s3://${logUri}/${POD NAME}/OPERATOR or WEBHOOK/stdout_YYYYMMDD_index.gz
    ```
+ `cloudWatchMonitoringConfiguration`— 用于设置转发的配置密钥 CloudWatch。
  + `logGroupName`（必填）— 您要向其发送 CloudWatch 日志的日志组的名称。如果日志组不存在，则会自动创建。
  + `logStreamNamePrefix`（可选）：要向其发送日志的日志流的名称。默认值是空字符串。中的格式 CloudWatch 如下：

    ```
    ${logStreamNamePrefix}/${POD NAME}/STDOUT or STDERR
    ```
+ `sideCarResources`（可选）：用于在启动的 Fluentbit Sidecar 容器上设置资源限制的配置密钥。
  + `memoryLimit`（可选）：内存限制。根据自身需求进行调整。默认值为 512Mi。
  + `cpuLimit`：CPU 限制。根据自身需求进行调整。无默认值。
+ `containerLogRotationConfiguration`（可选）：控制容器日志的轮换行为。该功能默认已启用。
  + `rotationSize`（必需）：指定日志轮换的文件大小。可行值的范围从 2KB 到 2GB 不等。rotationSize 参数的数字单位部分以整数形式传递。由于不支持十进制值，您可以指定 1.5GB 的轮换大小，例如值 1500MB。默认值为 2GB。
  + `maxFilesToKeep`（必需）：指定轮换后要在容器中保留的最大文件数。最小值为 1，最大值为 50。默认值为 10。

# Flink 如何支持高可用性和作业弹性
<a name="jobruns-flink-resiliency"></a>

以下部分概述了 Flink 如何使作业更加可靠和高度可用。Flink 通过内置的高可用性功能和各种故障恢复功能来实现这一点。

**Topics**
+ [为 Flink Operator 和 Flink 应用程序使用高可用性（HA）](jobruns-flink-using-ha.md)
+ [使用 EKS 上的 Amazon EMR 优化 Flink 任务重启时间以进行任务恢复和扩展操作](jobruns-flink-restart.md)
+ [使用 EKS 上的 Amazon EMR 上的 Flink 正常停用竞价型实例](jobruns-flink-decommission.md)

# 为 Flink Operator 和 Flink 应用程序使用高可用性（HA）
<a name="jobruns-flink-using-ha"></a>

本主题介绍了如何配置高可用性以及如何在几种不同的用例中使用。包括在使用 Job Manager 和 Flink Native Kubernetes 时。

## Flink Operator 高可用性
<a name="jobruns-flink-ha-operator"></a>

我们为 Flink Operator 启用了*高可用性*，这样就可以使用备用 Flink Operator 进行故障转移，从而在发生故障时最大限度地减少 Operator 控制回路中的停机时间。默认会启用“高可用性”，启动 Operator 副本的默认数量为 2。您可以在 Helm 图表的 `values.yaml` 文件中配置副本字段。

以下字段支持自定义：
+ `replicas`（可选，默认值为 2）：将此数字设置为大于 1 会创建其他备用 Operator，从而更快地恢复任务。
+ `highAvailabilityEnabled`（可选，默认值为 true）：控制是否要启用 HA。将此参数指定为 true 可启用多可用区部署支持，并设置正确的 `flink-conf.yaml` 参数。

在 `values.yaml` 文件中设置以下配置可以为 Operator 禁用 HA。

```
...
imagePullSecrets: []

replicas: 1

# set this to false if you don't want HA
highAvailabilityEnabled: false
...
```

**多可用区部署**

我们在多个可用区中创 Operator Pod。这是一个软约束，如果不同可用区中没有足够的资源，您的 Operator Pod 将被调度到同一可用区中。

**确定主副本**

 如果启用了 HA，则副本使用租约来确定哪个 JMs 是领导者，并使用 K8s Lease 进行领导者选举。您可以描述租约并查看 .Spec.Holder Identity 字段来确定当前主副本

```
kubectl describe lease <Helm Install Release Name>-<NAMESPACE>-lease -n <NAMESPACE> | grep "Holder Identity"
```

**Flink-S3 交互**

**配置访问凭证**

请确保已为 IRSA 配置了相应的 IAM 权限来访问 S3 存储桶。

**从 S3 应用程序模式获取任务 jar**

Flink Operator 也支持从 S3 获取应用程序 jar。您只需在 FlinkDeployment 规范中提供 JarurI 的 S3 位置即可。

您也可以使用此功能下载其他工件，例如 PyFlink 脚本。生成的 Python 脚本放在路径 `/opt/flink/usrlib/` 下。

以下示例演示了如何将此功能用于作 PyFlink 业。注意 jarURI 和 args 字段。

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: python-example
spec:
  image: <YOUR CUSTOM PYFLINK IMAGE>
  emrReleaseLabel: "emr-6.12.0-flink-latest"
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    highAvailabilityEnabled: false
    replicas: 1
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: "s3://<S3-BUCKET>/scripts/pyflink.py" # Note, this will trigger the artifact download process
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pyflink.py"]
    parallelism: 1
    upgradeMode: stateless
```

**Flink S3 连接器**

Flink 随附两个 S3 连接器（如下所示）。以下各节旨在介绍何时使用哪个连接器。

**检查点：Presto S3 连接器**
+ 将 S3 方案设置为 s3p://
+ 用于检查点到 s3 的推荐连接器。有关更多信息，请参阅 Apache 文档中[特定于 S3 的内容](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#s3-specific)。

示例 FlinkDeployment 规范：

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.checkpoints.dir: s3p://<BUCKET-NAME>/flink-checkpoint/
```

**读取/写入 S3：Hadoop S3 连接器**
+ 将 S3 方案设置为 `s3://` 或 `s3a://`
+ 用于从 S3 读取和写入文件的推荐连接器（仅限实现 [Flinks Filesystem 接口](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/)的 S3 连接器）。
+ 默认在 `flink-conf.yaml` 文件中设置 `fs.s3a.aws.credentials.provider`，即 `com.amazonaws.auth.WebIdentityTokenCredentialsProvider`。如果完全覆盖默认值 `flink-conf`，并且正在与 S3 进行交互，请务必使用此提供程序。

示例 FlinkDeployment 规范

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  job:
    jarURI: local:///opt/flink/examples/streaming/WordCount.jar
    args: [ "--input", "s3a://<INPUT BUCKET>/PATH", "--output", "s3a://<OUTPUT BUCKET>/PATH" ]
    parallelism: 2
    upgradeMode: stateless
```

## Flink Job Manager
<a name="jobruns-flink-ha-manager"></a>

Flink Deployments 的高可用性 (HA) 允许任务继续取得进展，即使遇到暂时性错误并 JobManager 导致崩溃。任务将重新启动，但会从上次成功启用 HA 的检查点开始。如果未启用 HA，Kubernetes 将重启你的 JobManager，但你的作业将以全新的作业开始并失去其进度。配置 HA 后，我们可以让 Kubernetes 将 HA 元数据存储在永久存储中，以便在中出现暂时故障时参考， JobManager 然后从上次成功的检查点恢复我们的作业。

Flink 任务会默认启用 HA（副本计数设置为 2，这需要您提供 S3 存储位置来永久存储 HA 元数据）。

**HA 配置**

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  executionRoleArn: "<JOB EXECUTION ROLE ARN>"
  emrReleaseLabel: "emr-6.13.0-flink-latest"
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    replicas: 2
    highAvailabilityEnabled: true
    storageDir: "s3://<S3 PERSISTENT STORAGE DIR>"
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
```

以下是 Job Manager 中上述 HA 配置的描述（在 .spec.jobManager 下定义）：
+ `highAvailabilityEnabled`（可选，默认值为 true）：如果不想启用 HA，也不想使用提供的 HA 配置，请将其设置为 `false `。您仍然可以操作“replicas”字段来手动配置 HA。
+ `replicas`（可选，默认为 2）：将此数字设置为大于 1 会创建其他待机状态 JobManagers ，从而可以更快地恢复作业。如果禁用 HA，则必须将副本计数设置为 1，否则会不断收到验证错误（如果未启用 HA，则仅支持 1 个副本）。
+ `storageDir`（必需）：由于默认使用副本计数 2，我们必须提供永久 storageDir。目前，此字段仅接受 S3 路径作为存储位置。

**Pod 区域**

 如果您启用 HA，我们还会尝试将 Pod 放在同一个可用区中，这样可以提高性能（通过将 pod 放在同一个可用区中来减少网络延迟 AZs）。这是一个尽力而为的过程，即如果在调度了大多数 Pod 的可用区中没有足够的资源，那么剩余 Pod 仍会被调度，但最终可能会出现在该可用区之外的节点上。

**确定主副本**

如果启用了 HA，则副本使用租约来确定哪个 JMs 是领导者，并使用 K8s Configmap 作为数据存储来存储此元数据。如果要确定主副本，可以查看 Configmap 的内容，在数据下查看密钥 `org.apache.flink.k8s.leader.restserver`，找到带 IP 地址的 K8s Pod。您也可以使用以下 bash 命令。

```
ip=$(kubectl get configmap -n <NAMESPACE> <JOB-NAME>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')
kubectl get pods -n NAMESPACE  -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

## Flink 作业 - 本机 Kubernetes
<a name="jobruns-flink-ha-kubernetes"></a>

Amazon EMR 6.13.0 及更高版本都支持 Flink 本机 Kubernetes 在 Amazon EKS 集群上以高可用性模式运行 Flink 应用程序。

**注意**  
提交 Flink 作业时，必须使用一个 Amazon S3 存储桶来存储高可用性元数据。如果不想使用此功能，可以将其禁用。系统会默认启用该功能。

要开启 Flink 高可用性功能，请在[运行 `run-application` CLI 命令](jobruns-flink-native-kubernetes-getting-started.md#jobruns-flink-native-kubernetes-getting-started-run-application)时提供以下 Flink 参数。参数在示例的下方定义。

```
-Dhigh-availability.type=kubernetes \
-Dhigh-availability.storageDir=S3://DOC-EXAMPLE-STORAGE-BUCKET \
-Dfs.s3a.aws.credentials.provider="com.amazonaws.auth.WebIdentityTokenCredentialsProvider" \
-Dkubernetes.jobmanager.replicas=3 \
-Dkubernetes.cluster-id=example-cluster
```
+ **`Dhigh-availability.storageDir`**：您要存储作业的高可用性元数据的 Amazon S3 存储桶。

  **`Dkubernetes.jobmanager.replicas`**：要创建的 Job Manager 容器组（pod）的数量，以大于 `1` 的整数表示。

  **`Dkubernetes.cluster-id`**：标识 Flink 集群的唯一 ID。

# 使用 EKS 上的 Amazon EMR 优化 Flink 任务重启时间以进行任务恢复和扩展操作
<a name="jobruns-flink-restart"></a>

当任务失败或发生扩展操作时，Flink 会尝试从上一次完成的检查点重新执行任务。重启过程可能需要一分钟或更长时间才能执行，具体取决于检查点状态的大小以及并行任务的数量。重启期间，可以累积作业的积压任务。但是，Flink 可以通过一些方法来优化执行图的恢复和重启速度，从而提高作业稳定性。

本页介绍了 Amazon EMR Flink 对竞价型实例执行任务恢复或扩展操作过程中缩短作业重启时间的一些方法。竞价型实例是未使用的计算容量，以折扣价提供。该实例具有独特的行为，偶尔发生中断，因此了解 Amazon EMR on EKS 如何处理这些行为非常重要，包括 Amazon EMR on EKS 如何执行停用和作业重启。

**Topics**
+ [任务本地恢复](#flink-restart-task-local)
+ [通过 Amazon EBS 卷挂载实现的任务本地恢复](#flink-restart-task-local-ebs)
+ [基于日志的通用增量检查点](#flink-restart-log-check)
+ [精细恢复](#flink-restart-fine-grained)
+ [自适应计划程序中的组合重启机制](#flink-restart-combined)

## 任务本地恢复
<a name="flink-restart-task-local"></a>

**注意**  
EKS 上的 Amazon EMR 6.14.0 及更高版本上的 Flink 支持任务本地恢复。

使用 Flink 检查点，每个任务都会生成其状态的快照，Flink 会将该快照写入分布式存储（如 Amazon S3）。在恢复的情况下，任务会从分布式存储中恢复其状态。分布式存储提供容错能力，并且可以在重新扩展期间重新分配状态，因为它可供所有节点访问。

但远程分布式存储也有一个缺点：所有任务都必须通过网络从远程位置读取其状态。在任务恢复或扩展操作期间，这可能会导致大规模状态的恢复时间很长。

通过*任务本地恢复*可以解决恢复时间长这一问题。任务将其在检查点上的状态写入任务本地的辅助存储（例如本地磁盘）。它们还将状态存储在主存储中，或者存储在 Amazon S3 中（在本例中）。恢复期间，计划程序将任务计划在任务之前运行所在的同一个任务管理器上，这样它们就可以从本地状态存储中恢复，而不是从远程状态存储中读取。有关更多信息，请参阅 *Apache Flink 文档*中的[任务本地恢复](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/large_state_tuning/#task-local-recovery)。

我们对示例作业进行的基准测试表明，启用任务本地恢复后，恢复时间已从几分钟缩短到几秒。

要启用任务本地恢复，请在 `flink-conf.yaml` 文件中设置以下配置。指定检查点间隔值，以毫秒为单位。

```
    state.backend.local-recovery: true
    state.backend: hasmap or rocksdb
    state.checkpoints.dir: s3://STORAGE-BUCKET-PATH/checkpoint
    execution.checkpointing.interval: 15000
```

## 通过 Amazon EBS 卷挂载实现的任务本地恢复
<a name="flink-restart-task-local-ebs"></a>

**注意**  
EKS 上的 Amazon EMR 6.15.0 及更高版本上的 Flink 支持 Amazon EBS 的任务本地恢复。

使用 EKS 上的 Amazon EMR 上的 Flink，您可以将 Amazon EBS 卷自动预调配到 TaskManager 容器组（pod）中以进行任务本地恢复。默认的叠加挂载随附 10GB 的卷，足以满足状态较低的作业。状态较大的作业可以启用*自动 EBS 卷挂载*选项。TaskManager 容器组（pod）是在创建容器组（pod）时自动创建和挂载的，在删除容器组（pod）时会被移除。

按照以下步骤为 EKS 上的 Amazon EMR 中的 Flink 启用自动 EBS 卷挂载：

1. 导出您以下变量的值，您将在接下来的步骤中使用它们。

   ```
   export AWS_REGION=aa-example-1 
   export FLINK_EKS_CLUSTER_NAME=my-cluster
   export AWS_ACCOUNT_ID=111122223333
   ```

1. 为集群创建或更新 `kubeconfig` YAML 文件。

   ```
   aws eks update-kubeconfig --name $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
   ```

1. 在 Amazon EKS 集群上为 Amazon EBS Container Storage Interface（CSI）驱动程序创建 IAM 服务账户。

   ```
   eksctl create iamserviceaccount \
      --name ebs-csi-controller-sa \
      --namespace kube-system \
      --region $AWS_REGION \
      --cluster $FLINK_EKS_CLUSTER_NAME\
      --role-name TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} \
      --role-only \
      --attach-policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy \
      --approve
   ```

1. 使用以下命令创建 Amazon EBS CSI 驱动程序：

   ```
   eksctl create addon \
      --name aws-ebs-csi-driver \
      --region $AWS_REGION \
      --cluster $FLINK_EKS_CLUSTER_NAME \
      --service-account-role-arn arn:aws:iam::${AWS_ACCOUNT_ID}:role/TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
   ```

1. 使用以下命令创建 Amazon EBS 存储类：

   ```
   cat ≪ EOF ≫ storage-class.yaml
   apiVersion: storage.k8s.io/v1
   kind: StorageClass
   metadata:
     name: ebs-sc
   provisioner: ebs.csi.aws.com
   volumeBindingMode: WaitForFirstConsumer
   EOF
   ```

   然后应用该类：

   ```
   kubectl apply -f storage-class.yaml
   ```

1. Helm 使用创建服务账户的选项安装 Amazon EMR Flink Kubernetes Operator。这将创建在 Flink 部署中使用的 `emr-containers-sa-flink`。

   ```
   helm install flink-kubernetes-operator flink-kubernetes-operator/ \
      --set jobServiceAccount.create=true \
      --set rbac.jobRole.create=true \
      --set rbac.jobRoleBinding.create=true
   ```

1. 要提交 Flink 作业并启用 EBS 卷的自动预调配以进行任务本地恢复，请在 `flink-conf.yaml` 文件中设置以下配置。调整作业状态大小的大小限制。将 `serviceAccount` 设置为 `emr-containers-sa-flink`。指定检查点间隔值，以毫秒为单位。并省略 `executionRoleArn`。

   ```
   flinkConfiguration:
       task.local-recovery.ebs.enable: true
       kubernetes.taskmanager.local-recovery.persistentVolumeClaim.sizeLimit: 10Gi
       state.checkpoints.dir: s3://BUCKET-PATH/checkpoint
       state.backend.local-recovery: true
       state.backend: hasmap or rocksdb
       state.backend.incremental: "true"
       execution.checkpointing.interval: 15000
     serviceAccount: emr-containers-sa-flink
   ```

当您准备删除 Amazon EBS CSI 驱动程序插件时，请使用以下命令：

```
  # Detach Attached Policy
  aws iam detach-role-policy --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} --policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy
  # Delete the created Role
  aws iam delete-role --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
  # Delete the created service account
  eksctl delete iamserviceaccount --name ebs-csi-controller-sa --namespace kube-system --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  # Delete Addon
  eksctl delete addon --name aws-ebs-csi-driver --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  # Delete the EBS storage class
  kubectl delete -f storage-class.yaml
```

## 基于日志的通用增量检查点
<a name="flink-restart-log-check"></a>

**注意**  
EKS 上的 Amazon EMR 6.14.0 及更高版本上的 Flink 支持基于日志的通用增量检查点。

Flink 1.16 中添加了基于日志的通用增量检查点功能，以提高检查点的速度。较快的检查点间隔通常会导致恢复工作减少，因为恢复后需要重新处理的事件较少。有关更多信息，请参阅 *Apache Flink 博客*上的[使用基于日志的通用增量检查点提高检查点的速度和稳定性](https://flink.apache.org/2022/05/30/improving-speed-and-stability-of-checkpointing-with-generic-log-based-incremental-checkpoints/)。

对于示例作业，我们的基准测试表明，使用基于日志的通用增量检查点时，检查点时间从几分钟缩短到几秒。

要启用基于日志的通用增量检查点，请在 `flink-conf.yaml` 文件中设置以下配置。指定检查点间隔值，以毫秒为单位。

```
    state.backend.changelog.enabled: true 
    state.backend.changelog.storage: filesystem
    dstl.dfs.base-path: s3://bucket-path/changelog
    state.backend.local-recovery: true
    state.backend: rocksdb
    state.checkpoints.dir: s3://bucket-path/checkpoint
    execution.checkpointing.interval: 15000
```

## 精细恢复
<a name="flink-restart-fine-grained"></a>

**注意**  
EKS 上的 Amazon EMR 6.14.0 及更高版本上的 Flink 支持对默认计划程序的精细恢复支持。EKS 上的 Amazon EMR 6.15.0 及更高版本上的 Flink 提供自适应计划程序的精细恢复支持。

当任务在执行过程中失败时，Flink 会重置整个执行图，并从上次完成的检查点触发完整的重新执行。这比仅重新执行失败的任务更昂贵。精细恢复仅重新启动失败的任务与管道连接的组件。在以下示例中，作业图有 5 个顶点（`A` 到 `E`）。顶点之间的所有连接都使用逐点分布进行管道化处理，作业的 `parallelism.default` 设置为 `2`。

```
A → B → C → D → E
```

在本示例中，总共有 10 个任务在运行。第一个管道（`a1` 到 `e1`）在 TaskManager（`TM1`）上运行，第二个管道（`a2` 到 `e2`）在另一个 TaskManager（`TM2`）上运行。

```
a1 → b1 → c1 → d1 → e1
a2 → b2 → c2 → d2 → e2
```

有两个管道连接的组件：`a1 → e1` 和 `a2 → e2`。如果 `TM1` 或 `TM2` 其中一个失败，则故障仅影响 TaskManager 正在其中运行的管道中的 5 个任务。重启策略仅会启动受影响的管道化组件。

精细恢复仅适用于完全并行的 Flink 作业。`keyBy()` 或 `redistribute()` 操作不支持。有关更多信息，请参阅 *Flink 改进提案* Jira 项目中的 [FLIP-1：从任务失败中进行精细恢复](https://cwiki.apache.org/confluence/display/FLINK/FLIP-1%3A+Fine+Grained+Recovery+from+Task+Failures)。

要启用精细恢复，请在 `flink-conf.yaml` 文件中设置以下配置。

```
jobmanager.execution.failover-strategy: region 
restart-strategy: exponential-delay or fixed-delay
```

## 自适应计划程序中的组合重启机制
<a name="flink-restart-combined"></a>

**注意**  
EKS 上的 Amazon EMR 6.15.0 及更高版本上的 Flink 支持自适应计划程序中的组合重启机制。

自适应计划程序可以根据可用插槽调整作业的并行度。如果没有足够的插槽来适应配置的作业并行度，它将自动降低并行度。如果有新的插槽可用，则任务将再次纵向扩展到配置的作业并行度。当没有足够的可用资源时，自适应计划程序将避免作业停机。这是 Flink Autoscaler 支持的计划程序。出于这些原因，我们建议在 Amazon EMR Flink 中使用自适应计划程序。但是，自适应计划程序可能会在短时间内进行多次重启，每添加一个新资源就会重启一次。这可能导致作业性能下降。

在 Amazon EMR 6.15.0 及更高版本中，Flink 在自适应计划程序中具有组合重启机制，可在添加第一个资源时打开一个重启窗口，然后等到配置的默认 1 分钟窗口间隔时。当有足够的资源可用来运行具有配置并行性的作业时，或者当间隔超时时，它会执行一次重启。

对于示例作业，我们的基准测试表明，当您使用自适应计划程序和 Flink Autoscaler 时，此功能处理的记录比默认行为多 10%。

要启用组合重启机制，请在 `flink-conf.yaml` 文件中设置以下配置。

```
jobmanager.adaptive-scheduler.combined-restart.enabled: true 
jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m
```

# 使用 EKS 上的 Amazon EMR 上的 Flink 正常停用竞价型实例
<a name="jobruns-flink-decommission"></a>

使用 EKS 上的 Amazon EMR 的 Flink 可以在任务恢复或扩展操作期间缩短作业的重启时间。

## 概述
<a name="jobruns-flink-decommission-overview"></a>

EKS 上的 Amazon EMR 发行版 6.15.0 及更高版本支持在带有 Apache Flink 的 EKS 上的 Amazon EMR 中正常停用竞价型实例上的任务管理器。作为此功能的一部分，带有 Flink 的 EKS 上的 Amazon EMR 提供了以下功能：
+ **Just-in-time check** pointing — Flink 流式传输作业可以响应竞价型实例中断，对正在运行的作业执行 just-in-time (JIT) 检查点，并防止在这些竞价型实例上安排其他任务。默认和自适应计划程序支持 JIT 检查点。
+ **组合重启机制**：组合重启机制会在作业达到目标资源并行度或当前配置窗口结束后尽最大努力尝试重新启动作业。这样做还可以防止由于多次竞价型实例终止而导致作业连续重启。组合重启机制仅适用于自适应计划程序。

这些功能具有以下优势：
+ 您可以利用竞价型实例来运行任务管理器并降低集群开支。
+ 竞价型实例任务管理器的活动性提高使弹性得到提高，作业计划的效率提升。
+ 您的 Flink 作业将有更长的正常运行时间，因为竞价型实例终止后的重启次数将会减少。

## 正常停用的工作原理
<a name="jobruns-flink-decommission-howitworks"></a>

考虑以下示例：您预调配了运行 Apache Flink 的 EKS 上的 Amazon EMR，为作业管理器指定了按需节点，为任务管理器指定了竞价型实例节点。终止前两分钟，任务管理器收到中断通知。

在这种情况下，作业管理器将处理竞价型实例中断信号，阻止在竞价型实例上计划其他任务，并为流式传输作业启动 JIT 检查点。

然后，只有在当前重启间隔窗口内有足够的新资源可用来满足当前作业并行度之后，作业管理器才会重新启动任务图。重启窗口间隔是根据竞价型实例替换持续时间、新的任务管理器容器组（pod）的创建以及向作业管理器的注册来决定的。

## 先决条件
<a name="jobruns-flink-decommission-prereqs"></a>

要使用正常停用，请在运行 Apache Flink 的 Amazon EMR on EKS 集群上创建并运行流处理作业。启用在至少一个竞价型实例上计划的自适应计划程序和任务管理器，如以下示例所示。您应该为作业管理器使用按需节点，并且只要至少有一个竞价型实例，您就可以将按需节点用于任务管理器。

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: deployment_name
spec:
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    cluster.taskmanager.graceful-decommission.enabled: "true"
    execution.checkpointing.interval: "240s"
    jobmanager.adaptive-scheduler.combined-restart.enabled: "true"
    jobmanager.adaptive-scheduler.combined-restart.window-interval : "1m"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    nodeSelector:
      'eks.amazonaws.com/capacityType': 'ON_DEMAND'
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
    nodeSelector:
      'eks.amazonaws.com/capacityType': 'SPOT'
  job:
    jarURI: flink_job_jar_path
```

## 配置
<a name="jobruns-flink-decommission-config"></a>

本部分涵盖了您可以根据自己停用需求指定的大多数配置。


| Key | 说明 | 默认 值 | 可接受值 | 
| --- | --- | --- | --- | 
|  cluster.taskmanager.graceful-decommission.enabled  |  启用任务管理器的正常停用。  |  true  |  true, false  | 
|  jobmanager.adaptive-scheduler.combined-restart.enabled  |  在自适应计划程序中启用组合重启机制。  |  false  |  true, false  | 
|  jobmanager.adaptive-scheduler.combined-restart.window-interval  |  为作业执行合并重启的合并重启窗口间隔。不含单位的整数将被解释为毫秒。  |  1m  |  示例：30、60s、3m、1h。 | 

# 在 Flink 应用程序中使用 Autoscaler
<a name="jobruns-flink-autoscaler"></a>

Operator Autoscaler 可以从 Flink 任务中收集指标，并自动调整任务顶点级别的并行度，从而帮助缓解反向压力。配置可能类似于如下示例：

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  ...
spec:
  ...
  flinkVersion: v1_18
  flinkConfiguration:
    job.autoscaler.enabled: "true"
    job.autoscaler.stabilization.interval: 1m
    job.autoscaler.metrics.window: 5m
    job.autoscaler.target.utilization: "0.6"
    job.autoscaler.target.utilization.boundary: "0.2"
    job.autoscaler.restart.time: 2m
    job.autoscaler.catch-up.duration: 5m
    pipeline.max-parallelism: "720"
  ...
```

此配置使用最新版本的 Amazon EMR 的默认值。如果使用其他版本，值可能会有所不同。

**注意**  
从 Amazon EMR 7.2.0 开始，无需在配置中包含前缀 `kubernetes.operator`。如果使用 7.1.0 或更低版本，则必须在每次配置前使用前缀。例如，必须指定 `kubernetes.operator.job.autoscaler.scaling.enabled`。

以下是 Autoscaler 的配置选项。
+ `job.autoscaler.scaling.enabled`：指定是否允许 Autoscaler 执行顶点缩放。默认值为 `true`。如果禁用此配置，Autoscaler 只会收集指标并评估每个顶点的建议并行度，但不会升级作业。
+ `job.autoscaler.stabilization.interval`：不会执行新扩展的稳定期。默认值为 5 分钟。
+ `job.autoscaler.metrics.window`：扩展指标聚合窗口大小。窗口越大，就越流畅、越稳定，但 Autoscaler 对负载突变做出反应的速度可能会变慢。默认值为 15 分钟。建议使用 3 到 60 分钟之间的值进行实验。
+ `job.autoscaler.target.utilization`：提供稳定任务性能和一定负载波动缓冲能力的目标顶点利用率。默认情况下，作业顶点 utilization/load 的`0.7`目标为 70%。
+ `job.autoscaler.target.utilization.boundary`：目标顶点利用率边界，用作额外的缓冲，避免在负载波动时立即扩展。默认值为 `0.3`，即在触发缩放操作之前，允许与目标利用率有 30% 的偏差。
+ `ob.autoscaler.restart.time`：重新启动应用程序的预计时间。默认值为 5 分钟。
+ `job.autoscaler.catch-up.duration`：赶上进度的预计时间，即在扩展操作完成后完全处理积压工作的时间。默认值为 5 分钟。通过缩短赶上进度的持续时间，Autoscaler 必须为扩展操作预留更多额外容量。
+ `pipeline.max-parallelism`：Autoscaler 可以使用的最大并行度。如果该限值高于 Flink 配置中设定的最大并行度或直接在每个 Operator 上设定的最大并行度，则 Autoscaler 会忽略此限值。默认值为 -1。请注意，Autoscaler 将并行度计算为最大并行度数的除数，因此建议选择具有大量除数的最大并行度设置，而非依赖 Flink 提供的默认设置。建议对此配置使用 60 的倍数，例如 120、180、240、360、720。

有关详细配置的参考页面，请参阅 [Autoscaler configuration](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/#autoscaler-configuration)。

# Autoscaler 参数自动调整
<a name="jobruns-flink-autoscaler-parameter-autotuning"></a>

本节介绍了各个 Amazon EMR 版本的自动调整行为。还详细介绍了不同的自动扩缩配置。

**注意**  
Amazon EMR 7.2.0 及更高版本使用开源配置 `job.autoscaler.restart.time-tracking.enabled` 来实现**重新缩放时间估计**。重新缩放时间估计与 Amazon EMR 自动调整的功能相同，因此无需为重启时间手动分配经验值。  
如果运行的是 Amazon EMR 7.1.0 或更低版本，您仍然可以使用 Amazon EMR 自动调整。

------
#### [ 7.2.0 and higher ]

Amazon EMR 7.2.0 及更高版本测量会应用自动扩缩决策所需的实际重启时间。在 7.1.0 及更低版本中，必须使用配置 `job.autoscaler.restart.time` 来手动配置估计的最长重启时间。通过使用配置 `job.autoscaler.restart.time-tracking.enabled`，您只需输入第一次缩放的重启时间。之后，Operator 会记录实际的重启时间，并将其用于后续缩放。

要启用此跟踪，请使用以下命令：

```
job.autoscaler.restart.time-tracking.enabled: true
```

以下是重新缩放时间估计的相关配置。


| 配置 | 必需 | 默认值 | 说明 | 
| --- | --- | --- | --- | 
| job.autoscaler.restart.time-tracking.enabled | 否 | False | 指示 Flink Autoscaler 是否应随着时间的推移自动调整配置，以优化缩放决策。请注意，Autoscaler 只能自动调整 Autoscaler 参数 restart.time。 | 
| job.autoscaler.restart.time | 否 | 5m | Amazon EMR on EKS 使用的预期重启时间，直到 Operator 可根据之前的缩放确定实际重启时间。 | 
| job.autoscaler.restart.time-tracking.limit | 否 | 15m | 当 job.autoscaler.restart.time-tracking.enabled 设置为 true 时观察到的最长重启时间。 | 

下面是一个示例部署规范，可用来尝试重新缩放时间估计：

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: autoscaling-example
spec:
  flinkVersion: v1_18
  flinkConfiguration:

    # Autoscaler parameters
    job.autoscaler.enabled: "true"
    job.autoscaler.scaling.enabled: "true"
    job.autoscaler.stabilization.interval: "5s"
    job.autoscaler.metrics.window: "1m"
    
    job.autoscaler.restart.time-tracking.enabled: "true"
    job.autoscaler.restart.time: "2m"
    job.autoscaler.restart.time-tracking.limit: "10m"
    
    jobmanager.scheduler: adaptive
    taskmanager.numberOfTaskSlots: "1"
    pipeline.max-parallelism: "12"

  executionRoleArn: <JOB ARN>
  emrReleaseLabel: emr-7.12.0-flink-latest
  jobManager:
    highAvailabilityEnabled: false
    storageDir: s3://<s3_bucket>/flink/autoscaling/ha/
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: s3://<s3_bucket>/some-job-with-back-pressure
    parallelism: 1
    upgradeMode: stateless
```

要模拟背压，请使用以下部署规范。

```
job:
    jarURI: s3://<s3_bucket>/pyflink-script.py
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-py", "/opt/flink/usrlib/pyflink-script.py"]
    parallelism: 1
    upgradeMode: stateless
```

将以下 Python 脚本上传到 S3 存储桶。

```
import logging
import sys
import time
import random

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

TABLE_NAME="orders"
QUERY=f"""
CREATE TABLE {TABLE_NAME} (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);
"""

def create_backpressure(i):
    time.sleep(2)
    return i

def autoscaling_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    t_env.execute_sql(QUERY)
    res_table = t_env.from_path(TABLE_NAME)

    stream =  t_env.to_data_stream(res_table) \
      .shuffle().map(lambda x: create_backpressure(x))\
      .print()
    env.execute("Autoscaling demo")

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    autoscaling_demo()
```

要验证重缩时间估计是否有效，请确保已启用 Flink Operator 的 `DEBUG` 级别日志记录。下面的示例演示了如何更新 Helm 图表文件 `values.yaml`。然后重新安装更新后的 Helm 图表并再次运行 Flink 作业。

```
log4j-operator.properties: |+
  # Flink Operator Logging Overrides
  rootLogger.level = DEBUG
```

获取主容器组（pod）的名称。

```
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')

kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

运行以下命令，获取指标评估中使用的实际重启时间。

```
kubectl logs <FLINK-OPERATOR-POD-NAME> -c flink-kubernetes-operator -n <OPERATOR-NAMESPACE> -f | grep "Restart time used in scaling summary computation"
```

您应该会看到类似下面的日志。请注意，只有第一次缩放才会使用 ` job.autoscaler.restart.time`。后续缩放使用观察到的重启时间。

```
2024-05-16 17:17:32,590 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT2M
2024-05-16 17:19:03,787 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:19:18,976 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:20:50,283 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:22:21,691 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
```

------
#### [ 7.0.0 and 7.1.0 ]

开源内置 Flink Autoscaler 使用大量指标来做出最佳缩放决策。但在计算时使用的默认值适用于大多数工作负载，对于给定作业来说可能不是最佳值。Amazon EMR on EKS 版本的 Flink Operator 中添加的自动调整功能会查看针对特定捕获指标观察到的历史趋势，然后相应地尝试计算为给定作业定制的最佳值。


| 配置 | 必需 | 默认值 | 说明 | 
| --- | --- | --- | --- | 
| kubernetes.operator.job.autoscaler.autotune.enable | 否 | False | 指示 Flink Autoscaler 是否应随着时间的推移自动调整配置，以优化 Autoscaler 缩放决策。目前，Autoscaler 只能自动调整 Autoscaler 参数 restart.time。 | 
| kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count | 否 | 3 | 指示 Autoscaler 在 Amazon EMR on EKS 指标配置映射中保留的 Amazon EMR on EKS 历史指标数。 | 
| kubernetes.operator.job.autoscaler.autotune.metrics.restart.count | 否 | 3 | 指示 Autoscaler 在开始计算给定作业的平均重启时间之前执行的重启次数。 | 

要启用自动调整，必须完成以下操作：
+ 将 `kubernetes.operator.job.autoscaler.autotune.enable:` 设置为 `true`
+ 将 `metrics.job.status.enable:` 设置为 `TOTAL_TIME`
+ 按照[在 Flink 应用程序中使用 Autoscaler](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/jobruns-flink-autoscaler.html) 的设置启用自动扩缩功能。

下面是一个示例部署规范，可用来尝试自动调整。

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: autoscaling-example
spec:
  flinkVersion: v1_18
  flinkConfiguration:

    # Autotuning parameters
    kubernetes.operator.job.autoscaler.autotune.enable: "true"
    kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count: "2"
    kubernetes.operator.job.autoscaler.autotune.metrics.restart.count: "1"
    metrics.job.status.enable: TOTAL_TIME

    # Autoscaler parameters
    kubernetes.operator.job.autoscaler.enabled: "true"
    kubernetes.operator.job.autoscaler.scaling.enabled: "true"
    kubernetes.operator.job.autoscaler.stabilization.interval: "5s"
    kubernetes.operator.job.autoscaler.metrics.window: "1m"

    jobmanager.scheduler: adaptive

    taskmanager.numberOfTaskSlots: "1"
    state.savepoints.dir: s3://<S3_bucket>/autoscaling/savepoint/
    state.checkpoints.dir: s3://<S3_bucket>/flink/autoscaling/checkpoint/
    pipeline.max-parallelism: "4"

  executionRoleArn: <JOB ARN>
  emrReleaseLabel: emr-6.14.0-flink-latest
  jobManager:
    highAvailabilityEnabled: true
    storageDir: s3://<S3_bucket>/flink/autoscaling/ha/
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: s3://<S3_bucket>/some-job-with-back-pressure
    parallelism: 1
    upgradeMode: last-state
```

要模拟背压，请使用以下部署规范。

```
  job:
    jarURI: s3://<S3_bucket>/pyflink-script.py
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-py", "/opt/flink/usrlib/pyflink-script.py"]
    parallelism: 1
    upgradeMode: last-state
```

将以下 Python 脚本上传到 S3 存储桶。

```
import logging
import sys
import time
import random

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

TABLE_NAME="orders"
QUERY=f"""
CREATE TABLE {TABLE_NAME} (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);
"""

def create_backpressure(i):
    time.sleep(2)
    return i

def autoscaling_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    t_env.execute_sql(QUERY)
    res_table = t_env.from_path(TABLE_NAME)

    stream =  t_env.to_data_stream(res_table) \
      .shuffle().map(lambda x: create_backpressure(x))\
      .print()
    env.execute("Autoscaling demo")

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    autoscaling_demo()
```

要验证 Autosutiner 是否正在运行，请使用以下命令。请注意，必须对 Flink Operator 使用您自己的主容器组（pod）信息。

首先，获取主容器组（pod）的名称。

```
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')

kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

获取主容器组（pod）的名称后，您可以运行以下命令。

```
kubectl logs -n $NAMESPACE  -c flink-kubernetes-operator --follow <YOUR-FLINK-OPERATOR-POD-NAME>  | grep -E 'EmrEks|autotun|calculating|restart|autoscaler'
```

您应该会看到类似下面的日志。

```
[m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [36m[DEBUG][flink/autoscaling-example] Using the latest Emr Eks Metric for calculating restart.time for autotuning: EmrEksMetrics(restartMetric=RestartMetric(restartingTime=65, numRestarts=1))

[m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [32m[INFO ][flink/autoscaling-example] Calculated average restart.time metric via autotuning to be: PT0.065S
```

------

# Amazon EMR on EKS 上 Flink 作业的维护和故障排除
<a name="jobruns-flink-troubleshooting"></a>

以下部分概述了如何维护长时间运行的 Flink 作业，并就如何排除 Flink 作业中的一些常见问题提供了指导。

# 维护 Flink 应用程序
<a name="jobruns-flink-maintain"></a>

**Topics**
+ [升级模式](#jobruns-flink-upgrademode)

Flink 应用程序通常设计为长时间运行，例如运行数周、数月甚至数年。与所有长期运行的服务一样，Flink 流式处理应用程序需要维护。这包括错误修复、改进与迁移到更高版本的 Flink 集群。

当 `FlinkDeployment` 和 `FlinkSessionJob` 资源的规范发生变化时，您需要升级正在运行的应用程序。为此，操作员停止正在运行的作业（除非已暂停），并使用最新的规范重新部署该作业，而对于有状态的应用程序，使用上次运行的状态。

用户可以通过 `JobSpec` 的 `upgradeMode` 设置来控制如何在有状态的应用程序停止和恢复时管理状态。

## 升级模式
<a name="jobruns-flink-upgrademode"></a>

可选介绍

**无状态**  
无状态应用程序将从空白状态升级。

**上一状态**  
在任一应用程序状态（即使是失败的作业）下快速升级都不需要正常运行的作业，因为它总是使用最后一次成功的检查点。如果 HA 元数据丢失，可能需要进行手动恢复。要限制在选取最新检查点时作业可能回退的时间，您可以配置 `kubernetes.operator.job.upgrade.last-state.max.allowed.checkpoint.age`。如果检查点早于配置的值，则对于运行正常的作业，系统将获取其保存点。在会话模式下，不支持此功能。

**保存点**  
使用 savepoint 进行升级，提供最大的安全性和充当积分的可能性。 backup/fork 升级过程中，系统将创建保存点。请注意，Flink 作业必须正在运行，才能创建保存点。如果作业处于不健康状态，则将使用最后一个检查点（除非 kubernetes.operator.job.upgrade）。 last-state-fallback.enabled 设置为 false）。如果最后一个检查点不可用，则作业升级将失败。

# 问题排查
<a name="jobruns-flink-troubleshoot"></a>

本部分介绍如何对 Amazon EMR on EKS 的问题进行故障排除。有关如何排查一般性 Amazon EMR 问题的信息，请参阅《Amazon EMR 管理指南》中的 [集群问题排查](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-troubleshoot.html)**。
+ [对使用 PersistentVolumeClaims (PVC) 的作业进行故障排除](permissions-for-pvc.md)
+ [对 Amazon EMR on EKS 垂直自动扩展进行问题排查](troubleshooting-vas.md)
+ [对 Amazon EMR on EKS Spark 运算符进行故障排除](troubleshooting-sparkop.md)

## Amazon EMR on EKS 上的 Apache Flink 问题排查
<a name="jobruns-flink-troubleshooting-apache-flink"></a>

### 安装 Helm 图表时未找到资源映射
<a name="w2aac21c21b7b7b3"></a>

安装 Helm 图表时可能会遇到以下错误消息。

```
Error: INSTALLATION FAILED: pulling from host 1234567890.dkr.ecr.us-west-2.amazonaws.com failed with status code [manifests 6.13.0]: 403 Forbidden Error: INSTALLATION FAILED: unable to build kubernetes objects from release manifest: [resource mapping not found for name: "flink-operator-serving-cert" namespace: "<the namespace to install your operator>" from "": no matches for kind "Certificate" in version "cert-manager.io/v1"

ensure CRDs are installed first, resource mapping not found for name: "flink-operator-selfsigned-issuer" namespace: "<the namespace to install your operator>" " from "": no matches for kind "Issuer" in version "cert-manager.io/v1"

ensure CRDs are installed first].
```

要解决此错误，请安装 cert-manager 以允许添加 Webhook 组件。您必须在您使用的每个 Amazon EKS 集群中分别安装 cert-manager。

```
kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.12.0
```

### AWS 服务 访问被拒绝错误
<a name="jobruns-flink-troubleshooting-access-denied"></a>

如果遇到 *access denied* 错误，请确认 Helm 图表文件 `values.yaml` 中 `operatorExecutionRoleArn` 的 IAM 角色是否具有正确的权限。此外，请确保 `FlinkDeployment` 规范中 `executionRoleArn` 下的 IAM 角色具有正确的权限。

### `FlinkDeployment` 卡滞
<a name="jobruns-flink-troubleshooting-stuck"></a>

如果您的 `FlinkDeployment` 卡在“被囚禁”状态，请按照以下步骤强制删除部署：

1. 编辑部署运行。

   ```
   kubectl edit -n Flink Namespace flinkdeployments/App Name
   ```

1. 移除此终结器。

   ```
   finalizers:
     - flinkdeployments.flink.apache.org/finalizer
   ```

1. 删除部署。

   ```
   kubectl delete -n Flink Namespace flinkdeployments/App Name
   ```

### 在选择加入模式下运行 Flink 应用程序时出现了 s3a AWSBad RequestException 问题 AWS 区域
<a name="jobruns-flink-troubleshooting-optin-region"></a>

如果您以可[选方式运行 Flink 应用程序 AWS 区域，](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html)则可能会看到以下错误：

```
Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: getFileStatus on 
s3://flink.txt: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request 
(Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: ABCDEFGHIJKL; S3 Extended Request ID:
ABCDEFGHIJKLMNOP=; Proxy: null), S3 Extended Request ID: ABCDEFGHIJKLMNOP=:400 Bad Request: Bad Request 
(Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: ABCDEFGHIJKL; S3 Extended Request ID: ABCDEFGHIJKLMNOP=; Proxy: null)
```

```
Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: getS3Region on flink-application: software.amazon.awssdk.services.s3.model.S3Exception: null 
(Service: S3, Status Code: 400, Request ID: ABCDEFGHIJKLMNOP, Extended Request ID: ABCDEFGHIJKLMNOPQRST==):null: null 
(Service: S3, Status Code: 400, Request ID: ABCDEFGHIJKLMNOP, Extended Request ID: AHl42uDNaTUFOus/5IIVNvSakBcMjMCH7dd37ky0vE6jhABCDEFGHIJKLMNOPQRST==)
```

要修复这些错误，请在 `FlinkDeployment` 定义文件中使用以下配置。

```
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
```

我们还建议您使用 SDKv2 凭证提供商：

```
fs.s3a.aws.credentials.provider: software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider
```

如果您想使用 SDKv1 凭证提供商，请确保您的 SDK 支持您的选择加入区域。有关更多信息，请参阅[aws-sdk-java GitHub 存储库](https://github.com/aws/aws-sdk-java)。

如果您在选择加入的区域运行 Flink SQL 语句时收到 `S3 AWSBadRequestException`，请务必在 Flink 配置规范中设置配置 `fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME`。

### 在 CN 区域运行 Flink 会话作业 AWSBadRequestException 时为 S3A
<a name="jobruns-flink-troubleshooting-optin-region"></a>

对于 Amazon EMR 发行版 6.15.0-7.2.0 版本，在中国区域运行 Flink 会话作业时可能会收到以下错误消息。包括中国（北京）和中国（宁夏）：

```
Error:  {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.hadoop.fs.s3a.AWSBadRequestException: 
                    getFileStatus on s3://ABCDPath: software.amazon.awssdk.services.s3.model.S3Exception: null (Service: S3, Status Code: 400, Request ID: ABCDEFGH, Extended Request ID: 
                    ABCDEFGH:null: null (Service: S3, Status Code: 400, Request ID: ABCDEFGH, Extended Request ID: ABCDEFGH","additionalMetadata":{},"throwableList":
                    [{"type":"org.apache.hadoop.fs.s3a.AWSBadRequestException","message":"getFileStatus on s3://ABCDPath: software.amazon.awssdk.services.s3.model.S3Exception: 
                    null (Service: S3, Status Code: 400, Request ID: ABCDEFGH, Extended Request ID: ABCDEFGH:null: null (Service: S3, Status Code: 400, Request ID: ABCDEFGH, 
                    Extended Request ID: ABCDEFGH","additionalMetadata":{}},{"type":"software.amazon.awssdk.services.s3.model.S3Exception","message":"null (Service: S3, Status Code: 400, 
                    Request ID: ABCDEFGH, Extended Request ID: ABCDEFGH","additionalMetadata":{}}]}
```

人们已经意识到这个问题。团队正在努力修补这些发行版的 Flink Operator。但在我们完成补丁之前，要修复这个错误，需要下载 Flink Operator Helm 图表，将其解压（提取压缩文件），然后在 Helm 图表中更改配置。

具体步骤如下：

1. 更改为（特别是将目录更改为）Helm 图表的本地文件夹，然后运行以下命令行，拉取 Helm 图表并将其解压。

   ```
   helm pull oci://public.ecr.aws/emr-on-eks/flink-kubernetes-operator \
   --version $VERSION \
   --namespace $NAMESPACE
   ```

   ```
   tar -zxvf flink-kubernetes-operator-$VERSION.tgz
   ```

1. 进入 Helm 图表文件夹，找到 `templates/flink-operator.yaml` 文件。

1. 在中找到`flink-operator-config` ConfigMap 并添加以下`fs.s3a.endpoint.region`配置`flink-conf.yaml`。例如：

   ```
   {{- if .Values.defaultConfiguration.create }}
   apiVersion: v1
   kind: ConfigMap
   metadata:
     name: flink-operator-config
     namespace: {{ .Release.Namespace }}
     labels:
       {{- include "flink-operator.labels" . | nindent 4 }}
   data:
     flink-conf.yaml: |+
   fs.s3a.endpoint.region: {{ .Values.emrContainers.awsRegion }}
   ```

1. 安装本地 Helm 图表 并运行作业。

# 支持将 Amazon EMR on EKS 与 Apache Flink 结合使用的发行版
<a name="jobruns-flink-security-release-versions"></a>

以下 Amazon EMR on EKS 发行版支持使用 Apache Flink。有关所有可用发行版的信息，请参阅 [Amazon EMR on EKS 版本](emr-eks-releases.md)。


| 发行版标签 | Java | Flink | Flink Operator | 
| --- | --- | --- | --- | 
|  **emr-7.2.0-flink-latest**  |  17  |  1.18.1  |  -  | 
|  **emr-7.2.0-flink-k8s-operator-latest**  |  11  |  -  |  1.8.0  | 
|  **emr-7.1.0-flink-latest**  |  17  |  1.18.1  |  -  | 
|  **emr-7.1.0-flink-k8s-operator-latest**  |  11  |  -  |  1.6.1  | 
|  **emr-7.0.0-flink-latest**  |  11  |  1.18.0  |  -  | 
|  **emr-7.0.0-flink-k8s-operator-latest**  |  11  |  -  |  1.6.1  | 
|  **emr-6.15.0-flink-latest**  |  11  |  1.17.1  |  -  | 
|  **emr-6.15.0-flink-k8s-operator-latest**  |  11  |  -  |  1.6.0  | 
|  **emr-6.14.0-flink-latest**  |  11  |  1.17.1  |  -  | 
|  **emr-6.14.0-flink-k8s-operator-latest**  |  11  |  -  |  1.6.0  | 
|  **emr-6.13.0-flink-latest**  |  11  |  1.17.0  |  -  | 
|  **emr-6.13.0-flink-k8s-operator-latest**  |  11  |  -  |  1.5.0  | 