

# Running Flink jobs with Amazon EMR on EKS

Amazon EMR releases 6.13.0 and higher support the Flink Kubernetes operator as a job submission model for Amazon EMR on EKS. With Amazon EMR on EKS with Apache Flink, you can deploy and manage Flink applications with the Amazon EMR release runtime on your own Amazon EKS clusters. After you deploy the Flink Kubernetes operator in your Amazon EKS cluster, you can submit Flink applications directly to the operator, which manages the lifecycle of those applications.

**Topics**
+ [Setting up and using the Flink Kubernetes operator](jobruns-flink-kubernetes-operator.md)
+ [Using Flink Native Kubernetes](jobruns-flink-native-kubernetes.md)
+ [Customizing Docker images for Flink and FluentD](jobruns-flink-docker-flink-fluentd.md)
+ [Monitoring Flink Kubernetes operator and Flink jobs](jobruns-flink-monitoring.md)
+ [How Flink supports high availability and job resiliency](jobruns-flink-resiliency.md)
+ [Using Autoscaler for Flink applications](jobruns-flink-autoscaler.md)
+ [Maintenance and troubleshooting for Flink jobs on Amazon EMR on EKS](jobruns-flink-troubleshooting.md)
+ [Supported releases for Amazon EMR on EKS with Apache Flink](jobruns-flink-security-release-versions.md)

# Setting up and using the Flink Kubernetes operator

The following pages describe how to set up and use the Flink Kubernetes operator to run Flink jobs with Amazon EMR on EKS. The topics cover required prerequisites, environment setup, and running a Flink application on Amazon EMR on EKS.

**Topics**
+ [Setting up the Flink Kubernetes operator for Amazon EMR on EKS](jobruns-flink-kubernetes-operator-setup.md)
+ [Installing the Flink Kubernetes operator for Amazon EMR on EKS](jobruns-flink-kubernetes-operator-getting-started.md)
+ [Run a Flink application](jobruns-flink-kubernetes-operator-run-application.md)
+ [Security role permissions for running a Flink application](jobruns-flink-kubernetes-security.md)
+ [Uninstalling the Flink Kubernetes operator for Amazon EMR on EKS](jobruns-flink-kubernetes-operator-uninstall.md)

# Setting up the Flink Kubernetes operator for Amazon EMR on EKS

Complete the following tasks before you install the Flink Kubernetes operator on Amazon EKS. If you've already signed up for Amazon Web Services (AWS) and have used Amazon EKS, you are almost ready to use Amazon EMR on EKS. If you've already completed any of the prerequisites, you can skip them and move on to the next one.
+ **[Install or update to the latest version of the AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html)** – If you've already installed the AWS CLI, confirm that you have the latest version.
+ **[Set up kubectl and eksctl](https://docs.aws.amazon.com/eks/latest/userguide/install-kubectl.html)** – kubectl is a command line tool that you use to communicate with the Kubernetes API server, and eksctl is a command line tool for creating and managing clusters on Amazon EKS.
+ **[Install Helm](https://docs.aws.amazon.com/eks/latest/userguide/helm.html)** – The Helm package manager for Kubernetes helps you install and manage applications on your Kubernetes cluster.
+ **[Get started with Amazon EKS – eksctl](https://docs.aws.amazon.com/eks/latest/userguide/getting-started-eksctl.html)** – Follow the steps to create a new Kubernetes cluster with nodes in Amazon EKS.
+ **[Choose an Amazon EMR release label](jobruns-flink-security-release-versions.md) (release 6.13.0 or higher)** – The Flink Kubernetes operator is supported with Amazon EMR releases 6.13.0 and higher.
+ **[Enable IAM Roles for Service Accounts (IRSA) on the Amazon EKS cluster](setting-up-enable-IAM.md)**.
+ **[Create a job execution role](creating-job-execution-role.md)**.
+ **[Update the trust policy of the job execution role](setting-up-trust-policy.md)**.
+ Create an operator execution role (optional). You can use the same IAM role for your Flink jobs and the operator, or create a separate role for the operator.
+ Update the trust policy of the operator execution role. You must explicitly add a trust policy entry for the roles that you want to use with the Amazon EMR Flink Kubernetes operator service account. You can follow this example format:

------
#### [ JSON ]


  ```
  {
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Action": [
          "sts:AssumeRoleWithWebIdentity"
        ],
        "Resource": [
          "*"
        ],
        "Condition": {
          "StringLike": {
            "aws:userid": "system:serviceaccount:emr:emr-containers-sa-flink-operator"
          }
        },
        "Sid": "AllowSTSAssumerolewithwebidentity"
      }
    ]
  }
  ```

------
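
The trust policy above uses the default `emr` namespace and operator service account name. The following sketch renders that entry for a given namespace and validates the JSON locally before you attach it with a command such as `aws iam update-assume-role-policy`; the shell variables and file path are illustrative.

```
# Render the trust-policy entry for the operator service account.
# The namespace and output path are illustrative; adjust them for your cluster.
NAMESPACE=emr
SERVICE_ACCOUNT=emr-containers-sa-flink-operator
POLICY_FILE=/tmp/flink-operator-trust-policy.json

cat > "$POLICY_FILE" <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AllowSTSAssumerolewithwebidentity",
      "Effect": "Allow",
      "Action": ["sts:AssumeRoleWithWebIdentity"],
      "Resource": ["*"],
      "Condition": {
        "StringLike": {
          "aws:userid": "system:serviceaccount:${NAMESPACE}:${SERVICE_ACCOUNT}"
        }
      }
    }
  ]
}
EOF

# Check the JSON locally before attaching it, for example with:
#   aws iam update-assume-role-policy --role-name OPERATOR_EXECUTION_ROLE --policy-document file://$POLICY_FILE
python3 -m json.tool "$POLICY_FILE" > /dev/null && echo "trust policy is valid JSON"
```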

# Installing the Flink Kubernetes operator for Amazon EMR on EKS

This topic helps you start to use the Flink Kubernetes operator on Amazon EKS by preparing a Flink deployment.

## Install the Kubernetes operator


Use the following steps to install the Kubernetes operator for Apache Flink.

1. If you haven't already, complete the steps in [Setting up the Flink Kubernetes operator for Amazon EMR on EKS](jobruns-flink-kubernetes-operator-setup.md).

1. Install the *cert-manager* (once per Amazon EKS cluster) to enable adding the webhook component.

   ```
   kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.12.0/cert-manager.yaml
   ```

1. Install the Helm chart.

   ```
   export VERSION=7.12.0 # The Amazon EMR release version
   export NAMESPACE=<NAMESPACE> # The Kubernetes namespace in which to deploy the operator
   
   helm install flink-kubernetes-operator \
   oci://public.ecr.aws/emr-on-eks/flink-kubernetes-operator \
   --version $VERSION \
   --namespace $NAMESPACE
   ```

   Example output:

   ```
   NAME: flink-kubernetes-operator
   LAST DEPLOYED: Tue May 31 17:38:56 2022
   NAMESPACE: $NAMESPACE
   STATUS: deployed
   REVISION: 1
   TEST SUITE: None
   ```

1. Wait for the deployment to complete and verify the chart installation.

   ```
   kubectl wait deployment flink-kubernetes-operator --namespace $NAMESPACE --for condition=Available=True --timeout=30s
   ```

   You should see the following message when the deployment is complete.

   ```
   deployment.apps/flink-kubernetes-operator condition met
   ```

1. Use the following command to see the deployed operator.

   ```
   helm list --namespace $NAMESPACE
   ```

   The following shows example output, where the app version `x.y.z-amzn-n` would correspond with the Flink operator version for your Amazon EMR on EKS release. For more information, see [Supported releases for Amazon EMR on EKS with Apache Flink](jobruns-flink-security-release-versions.md).

   ```
   NAME                              NAMESPACE    REVISION    UPDATED                                STATUS      CHART                                   APP VERSION          
   flink-kubernetes-operator    $NAMESPACE   1           2023-02-22 16:43:45.24148 -0500 EST    deployed    flink-kubernetes-operator-emr-7.12.0    x.y.z-amzn-n
   ```

### Upgrade the Kubernetes operator


To upgrade the version of the Flink operator, follow these steps:

1. Uninstall the old `flink-kubernetes-operator`: `helm uninstall flink-kubernetes-operator -n <NAMESPACE>`.

1. Delete the CRDs, because Helm doesn't automatically delete the old CRDs: `kubectl delete crd flinkdeployments.flink.apache.org flinksessionjobs.flink.apache.org`.

1. Re-install `flink-kubernetes-operator` with the newer version.

# Run a Flink application


With Amazon EMR 6.13.0 and higher, you can run a Flink application with the Flink Kubernetes operator in Application mode on Amazon EMR on EKS. With Amazon EMR 6.15.0 and higher, you can also run a Flink application in Session mode. This page describes both methods that you can use to run a Flink application with Amazon EMR on EKS.


**Note**  
You must have an Amazon S3 bucket created to store the high-availability metadata when you submit your Flink job. If you don’t want to use this feature, you can disable it. It's enabled by default.

**Prerequisite** – Before you can run a Flink application with the Flink Kubernetes operator, complete the steps in [Setting up the Flink Kubernetes operator for Amazon EMR on EKS](jobruns-flink-kubernetes-operator-setup.md) and [Install the Kubernetes operator](jobruns-flink-kubernetes-operator-getting-started.md#jobruns-flink-kubernetes-operator-getting-started-install-operator).

------
#### [ Application mode ]

With Amazon EMR 6.13.0 and higher, you can run a Flink application with the Flink Kubernetes operator in Application mode on Amazon EMR on EKS.

1. Create a `FlinkDeployment` definition file `basic-example-app-cluster.yaml`, as in the following example. If you activated and use one of the [opt-in AWS Regions](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html), make sure that you uncomment and set the `fs.s3a.endpoint.region` configuration.

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example-app-cluster
   spec:
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
       #fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
       state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
       state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH 
     flinkVersion: v1_17
     executionRoleArn: JOB_EXECUTION_ROLE_ARN
     emrReleaseLabel: "emr-6.13.0-flink-latest" # 6.13 or higher
     jobManager:
       storageDir: HIGH_AVAILABILITY_STORAGE_PATH
       resource:
         memory: "2048m"
         cpu: 1
     taskManager:
       resource:
         memory: "2048m"
         cpu: 1
     job:
       # if you have your job jar in S3 bucket you can use that path as well
       jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
       parallelism: 2
       upgradeMode: savepoint
       savepointTriggerNonce: 0
     monitoringConfiguration:    
       cloudWatchMonitoringConfiguration:
          logGroupName: LOG_GROUP_NAME
   ```

1. Submit the Flink deployment with the following command. This will also create a `FlinkDeployment` object named `basic-example-app-cluster`.

   ```
   kubectl create -f basic-example-app-cluster.yaml -n <NAMESPACE>
   ```

1. Access the Flink UI.

   ```
   kubectl port-forward deployments/basic-example-app-cluster 8081 -n NAMESPACE
   ```

1. Open `localhost:8081` to view your Flink jobs locally.

1. Clean up the job. Remember to clean up the S3 artifacts that were created for this job, such as checkpointing, high-availability, savepointing metadata, and CloudWatch logs.

For more information on submitting applications to Flink through the Flink Kubernetes operator, see [Flink Kubernetes operator examples](https://github.com/apache/flink-kubernetes-operator/tree/main/examples) in the `apache/flink-kubernetes-operator` repository on GitHub.

------
#### [ Session mode ]

With Amazon EMR 6.15.0 and higher, you can run a Flink application with the Flink Kubernetes operator in Session mode on Amazon EMR on EKS.

1. Create a `FlinkDeployment` definition file named `basic-example-session-cluster.yaml`, as in the following example. If you activated and use one of the [opt-in AWS Regions](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html), make sure that you uncomment and set the `fs.s3a.endpoint.region` configuration.

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example-session-cluster
   spec:
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
       #fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
       state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
       state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH
     flinkVersion: v1_17
     executionRoleArn: JOB_EXECUTION_ROLE_ARN
     emrReleaseLabel: "emr-6.15.0-flink-latest"
     jobManager:
       storageDir: HIGH_AVAILABILITY_S3_STORAGE_PATH
       resource:
         memory: "2048m"
         cpu: 1
     taskManager:
       resource:
         memory: "2048m"
         cpu: 1
     monitoringConfiguration:
       s3MonitoringConfiguration:
         logUri: 
       cloudWatchMonitoringConfiguration:
         logGroupName: LOG_GROUP_NAME
   ```

1. Submit the Flink deployment with the following command. This also creates a `FlinkDeployment` object named `basic-example-session-cluster`.

   ```
   kubectl create -f basic-example-session-cluster.yaml -n NAMESPACE
   ```

1. Use the following command to confirm that the session cluster `LIFECYCLE` is `STABLE`:

   ```
   kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
   ```

   The output should be similar to the following example:

   ```
   NAME                              JOB STATUS   LIFECYCLE STATE
   basic-example-session-cluster                          STABLE
   ```

1. Create a `FlinkSessionJob` custom resource file `basic-session-job.yaml` with the following example content:

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkSessionJob
   metadata:
     name: basic-session-job
   spec:
     deploymentName: basic-example-session-cluster
     job:
       # If you have your job jar in an S3 bucket, you can use that path.
       # To use a jar in an S3 bucket, set
       # OPERATOR_EXECUTION_ROLE_ARN (--set emrContainers.operatorExecutionRoleArn=$OPERATOR_EXECUTION_ROLE_ARN)
       # when you install the Flink operator
       jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar
       parallelism: 2
       upgradeMode: stateless
   ```
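
   If your job jar lives in an S3 bucket instead, only the `jarURI` in the `job` section changes. The following hedged fragment assumes a placeholder bucket and key; as the comments in the definition note, S3 access also requires passing `emrContainers.operatorExecutionRoleArn` when you install the operator.

   ```
   job:
     jarURI: s3://amzn-s3-demo-bucket/jars/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar
     parallelism: 2
     upgradeMode: stateless
   ```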

1. Submit the Flink session job with the following command. This will create a `FlinkSessionJob` object `basic-session-job`.

   ```
   kubectl apply -f basic-session-job.yaml -n $NAMESPACE
   ```

1. Use the following command to confirm that the session cluster `LIFECYCLE` is `STABLE`, and the `JOB STATUS` is `RUNNING`:

   ```
   kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
   ```

   The output should be similar to the following example:

   ```
   NAME                              JOB STATUS   LIFECYCLE STATE
   basic-example-session-cluster     RUNNING      STABLE
   ```

1. Access the Flink UI.

   ```
   kubectl port-forward deployments/basic-example-session-cluster 8081 -n NAMESPACE
   ```

1. Open `localhost:8081` to view your Flink jobs locally.

1. Clean up the job. Remember to clean up the S3 artifacts that were created for this job, such as checkpointing, high-availability, savepointing metadata, and CloudWatch logs.

------

# Security role permissions for running a Flink application


This topic describes the security roles for deploying and running a Flink application. Two roles are required: the operator role, which manages deployments, and the job role, which creates and manages jobs. This topic introduces both roles and lists their permissions.

## Role-based access control

To deploy the operator and run Flink jobs, you must create two Kubernetes roles: an operator role and a job role. Amazon EMR creates both roles by default when you install the operator.

## Operator role


The operator role manages `flinkdeployments` resources, creates and manages the JobManager for each Flink job, and manages other resources, such as services.

The operator role's default name is `emr-containers-sa-flink-operator`, and it requires the following permissions.

```
rules:
- apiGroups:
  - ""
  resources:
  - pods
  - services
  - events
  - configmaps
  - secrets
  - serviceaccounts
  verbs:
  - '*'
- apiGroups:
  - rbac.authorization.k8s.io
  resources:
  - roles
  - rolebindings
  verbs:
  - '*'
- apiGroups:
  - apps
  resources:
  - deployments
  - deployments/finalizers
  - replicasets
  verbs:
  - '*'
- apiGroups:
  - extensions
  resources:
  - deployments
  - ingresses
  verbs:
  - '*'
- apiGroups:
  - flink.apache.org
  resources:
  - flinkdeployments
  - flinkdeployments/status
  - flinksessionjobs
  - flinksessionjobs/status
  verbs:
  - '*'
- apiGroups:
  - networking.k8s.io
  resources:
  - ingresses
  verbs:
  - '*'
- apiGroups:
  - coordination.k8s.io
  resources:
  - leases
  verbs:
  - '*'
```

## Job role


The JobManager uses the job role to create and manage TaskManagers and ConfigMaps for each job.

```
rules:
- apiGroups:
  - ""
  resources:
  - pods
  - configmaps
  verbs:
  - '*'
- apiGroups:
  - apps
  resources:
  - deployments
  - deployments/finalizers
  verbs:
  - '*'
```

# Uninstalling the Flink Kubernetes operator for Amazon EMR on EKS

Follow these steps to uninstall the Flink Kubernetes operator.

1. Delete the operator.

   ```
   helm uninstall flink-kubernetes-operator -n <NAMESPACE>
   ```

1. Delete Kubernetes resources that Helm doesn’t uninstall.

   ```
   kubectl delete serviceaccounts,roles,rolebindings -l emr-containers.amazonaws.com/component=flink.operator --namespace <namespace>
   kubectl delete crd flinkdeployments.flink.apache.org flinksessionjobs.flink.apache.org
   ```

1. (Optional) Delete the cert-manager.

   ```
   kubectl delete -f https://github.com/cert-manager/cert-manager/releases/download/v1.12.0/cert-manager.yaml
   ```

# Using Flink Native Kubernetes

Amazon EMR releases 6.13.0 and higher support Flink Native Kubernetes, a command line submission model that you can use to submit and run Flink applications on an Amazon EKS cluster with Amazon EMR on EKS.

**Topics**
+ [Setting up Flink Native Kubernetes for Amazon EMR on EKS](jobruns-flink-native-kubernetes-setup.md)
+ [Getting started with Flink native Kubernetes for Amazon EMR on EKS](jobruns-flink-native-kubernetes-getting-started.md)
+ [Flink JobManager service account security requirements for Native Kubernetes](jobruns-flink-native-kubernetes-security-requirements.md)

# Setting up Flink Native Kubernetes for Amazon EMR on EKS

Complete the following tasks to get set up before you can run an application with the Flink CLI on Amazon EMR on EKS. If you've already signed up for Amazon Web Services (AWS) and have used Amazon EKS, you are almost ready to use Amazon EMR on EKS. If you've already completed any of the prerequisites, you can skip those and move on to the next one.
+ **[Install or update to the latest version of the AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html)** – If you've already installed the AWS CLI, confirm that you have the latest version.
+ **[Get started with Amazon EKS – eksctl](https://docs.aws.amazon.com/eks/latest/userguide/getting-started-eksctl.html)** – Follow the steps to create a new Kubernetes cluster with nodes in Amazon EKS.
+ **[Select an Amazon EMR base image URI](docker-custom-images-tag.md) (release 6.13.0 or higher)** – The Flink Kubernetes command is supported with Amazon EMR releases 6.13.0 and higher.
+ Confirm that the JobManager service account has appropriate permissions to create and watch TaskManager pods. For more information, see [Flink JobManager service account security requirements for Native Kubernetes](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/jobruns-flink-native-kubernetes-security-requirements.html).
+ Set up your local [AWS credentials profile](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html).
+ [Create or update a kubeconfig file for the Amazon EKS cluster](https://docs.aws.amazon.com/eks/latest/userguide/create-kubeconfig.html) on which you want to run Flink applications.

# Getting started with Flink native Kubernetes for Amazon EMR on EKS

The following steps show you how to configure a Flink application, set up a service account for it, and run it. Flink Native Kubernetes deploys Flink on a running Kubernetes cluster.

## Configure and run a Flink application


Amazon EMR 6.13.0 and higher supports Flink Native Kubernetes for running Flink applications on an Amazon EKS cluster. To run a Flink application, follow these steps:

1. Before you can run a Flink application with the Flink Native Kubernetes command, complete the steps in [Setting up Flink Native Kubernetes for Amazon EMR on EKS](jobruns-flink-native-kubernetes-setup.md).

1. [Download and install Flink](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/try-flink/local_installation).

1. Set the values for the following environment variables.

   ```
   #Export the FLINK_HOME environment variable to point to your local Flink installation directory
   export FLINK_HOME=/usr/local/flink #Will vary depending on your installation
   export NAMESPACE=flink
   export CLUSTER_ID=flink-application-cluster
   export IMAGE=123456789012.dkr.ecr.<AWS_REGION>.amazonaws.com/flink/emr-6.13.0-flink:latest
   export FLINK_SERVICE_ACCOUNT=emr-containers-sa-flink
   export FLINK_CLUSTER_ROLE_BINDING=emr-containers-crb-flink
   ```
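
   The `IMAGE` value must be a fully qualified Amazon ECR image URI. The following optional check, shown with a placeholder account ID and Region, verifies the URI shape locally before you run anything against the cluster.

   ```
   # Placeholder account ID and Region; substitute your own values.
   IMAGE=123456789012.dkr.ecr.us-west-2.amazonaws.com/flink/emr-6.13.0-flink:latest

   # ECR image URIs follow <account-id>.dkr.ecr.<region>.amazonaws.com/<repository>:<tag>
   if echo "$IMAGE" | grep -Eq '^[0-9]{12}\.dkr\.ecr\.[a-z0-9-]+\.amazonaws\.com/[^:]+:.+$'; then
     echo "IMAGE looks like a valid ECR image URI"
   else
     echo "IMAGE is malformed" >&2
   fi
   ```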

1. Create a service account to manage Kubernetes resources.

   ```
   kubectl create serviceaccount $FLINK_SERVICE_ACCOUNT -n $NAMESPACE
   kubectl create clusterrolebinding $FLINK_CLUSTER_ROLE_BINDING --clusterrole=edit --serviceaccount=$NAMESPACE:$FLINK_SERVICE_ACCOUNT
   ```

1. Run the `run-application` CLI command.

   ```
   $FLINK_HOME/bin/flink run-application \
       --target kubernetes-application \
       -Dkubernetes.namespace=$NAMESPACE \
       -Dkubernetes.cluster-id=$CLUSTER_ID \
       -Dkubernetes.container.image.ref=$IMAGE \
       -Dkubernetes.service-account=$FLINK_SERVICE_ACCOUNT \
       local:///opt/flink/examples/streaming/Iteration.jar
   2022-12-29 21:13:06,947 INFO  org.apache.flink.kubernetes.utils.KubernetesUtils            [] - Kubernetes deployment requires a fixed port. Configuration blob.server.port will be set to 6124
   2022-12-29 21:13:06,948 INFO  org.apache.flink.kubernetes.utils.KubernetesUtils            [] - Kubernetes deployment requires a fixed port. Configuration taskmanager.rpc.port will be set to 6122
   2022-12-29 21:13:07,861 WARN  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Please note that Flink client operations(e.g. cancel, list, stop, savepoint, etc.) won't work from outside the Kubernetes cluster since 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
   2022-12-29 21:13:07,868 INFO  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Create flink application cluster flink-application-cluster successfully, JobManager Web Interface: http://flink-application-cluster-rest.flink:8081
   ```

1. Examine the created Kubernetes resources.

   ```
   kubectl get all -n <namespace>
   NAME READY STATUS RESTARTS AGE
   pod/flink-application-cluster-546687cb47-w2p2z 1/1 Running 0 3m37s
   pod/flink-application-cluster-taskmanager-1-1 1/1 Running 0 3m24s
   
   NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
   service/flink-application-cluster ClusterIP None <none> 6123/TCP,6124/TCP 3m38s
   service/flink-application-cluster-rest ClusterIP 10.100.132.158 <none> 8081/TCP 3m38s
   
   NAME READY UP-TO-DATE AVAILABLE AGE
   deployment.apps/flink-application-cluster 1/1 1 1 3m38s
   
   NAME DESIRED CURRENT READY AGE
   replicaset.apps/flink-application-cluster-546687cb47 1 1 1 3m38s
   ```

1. Port forward to 8081.

   ```
   kubectl port-forward service/flink-application-cluster-rest 8081 -n <namespace>
   Forwarding from 127.0.0.1:8081 -> 8081
   ```

1. Locally access the Flink UI.  
![Access the Flink UI.](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/images/jobruns-flink-native-kubernetes-ui.png)

1. Delete the Flink application.

   ```
   kubectl delete deployment.apps/flink-application-cluster -n <namespace>
   deployment.apps "flink-application-cluster" deleted
   ```

For more information about submitting applications to Flink, see [Native Kubernetes](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/) in the Apache Flink documentation.

# Flink JobManager service account security requirements for Native Kubernetes

The Flink JobManager pod uses a Kubernetes service account to access the Kubernetes API server and create and watch TaskManager pods. The JobManager service account must have permissions to create and delete TaskManager pods, and must be able to watch the leader ConfigMaps so that the TaskManager can retrieve the addresses of the JobManager and ResourceManager in your cluster.

The following rules apply to this service account.

```
rules:
- apiGroups:
  - ""
  resources:
  - pods
  verbs:
  - "*"
- apiGroups:
  - ""
  resources:
  - services
  verbs:
  - "*"
- apiGroups:
  - ""
  resources:
  - configmaps
  verbs:
  - "*"
- apiGroups:
  - "apps"
  resources:
  - deployments
  verbs:
  - "*"
```
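
If you manage the JobManager service account yourself, you can package these rules as a namespaced `Role` and bind it to the service account. The following sketch assumes the `emr-containers-sa-flink` service account in a `flink` namespace from the earlier Flink Native Kubernetes steps; the role and binding names are illustrative.

```
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: flink-jobmanager-role          # illustrative name
  namespace: flink
rules:
- apiGroups: [""]
  resources: ["pods", "services", "configmaps"]
  verbs: ["*"]
- apiGroups: ["apps"]
  resources: ["deployments"]
  verbs: ["*"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flink-jobmanager-role-binding  # illustrative name
  namespace: flink
subjects:
- kind: ServiceAccount
  name: emr-containers-sa-flink
  namespace: flink
roleRef:
  kind: Role
  name: flink-jobmanager-role
  apiGroup: rbac.authorization.k8s.io
```

Apply this with `kubectl apply -f` before you run `flink run-application`. Note that the getting-started steps instead bind the service account to the built-in `edit` ClusterRole, which is broader than these minimum rules.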

# Customizing Docker images for Flink and FluentD


Take the following steps to customize Docker images for Amazon EMR on EKS with Apache Flink or FluentD images. The steps include technical guidance for getting a base image, customizing it, publishing it, and submitting a workload with it.

**Topics**
+ [Prerequisites](#jobruns-flink-docker-flink-fluentd-prereqs)
+ [Step 1: Retrieve a base image from Amazon Elastic Container Registry](#jobruns-flink-docker-flink-fluentd-retrieve-base)
+ [Step 2: Customize a base image](#jobruns-flink-docker-flink-fluentd-customize-image)
+ [Step 3: Publish your custom image](#jobruns-flink-docker-flink-fluentd-publish-image)
+ [Step 4: Submit a Flink workload in Amazon EMR using a custom image](#jobruns-flink-docker-flink-fluentd-submit-workload)

## Prerequisites


Before you customize your Docker image, make sure that you have completed the following prerequisites:
+ Completed the [Setting up the Flink Kubernetes operator for Amazon EMR on EKS](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/jobruns-flink-kubernetes-operator-setup.html) steps.
+ Installed Docker in your environment. For more information, see [Get Docker](https://docs.docker.com/get-docker/).

## Step 1: Retrieve a base image from Amazon Elastic Container Registry

The base image contains the Amazon EMR runtime and connectors that you need to access other AWS services. If you're using Amazon EMR on EKS with Flink version 6.14.0 or higher, you can get the base images from the Amazon ECR Public Gallery. Browse the gallery to find the image link and pull the image to your local workspace. For example, for the Amazon EMR 6.14.0 release, the following `docker pull` command returns the latest standard base image. Replace `emr-6.14.0-flink:latest` with the release version that you want.

```
docker pull public.ecr.aws/emr-on-eks/flink/emr-6.14.0-flink:latest
```

The following are links to the Flink gallery image and the Fluentd gallery image:
+ [emr-on-eks/flink/emr-6.14.0-flink](https://gallery.ecr.aws/emr-on-eks/flink/emr-6.14.0-flink)
+ [emr-on-eks/fluentd/emr-6.14.0](https://gallery.ecr.aws/emr-on-eks/fluentd/emr-6.14.0)

## Step 2: Customize a base image

The following steps describe how to customize the base image you pulled from Amazon ECR.

1. Create a new `Dockerfile` in your local workspace.

1. Edit the `Dockerfile` and add the following content. This `Dockerfile` uses the container image you pulled from `public.ecr.aws/emr-on-eks/flink/emr-7.12.0-flink:latest`.

   ```
   FROM public.ecr.aws/emr-on-eks/flink/emr-7.12.0-flink:latest
   USER root
   ### Add customization commands here ####
   USER hadoop:hadoop
   ```

   Use the following configuration if you're using `Fluentd`.

   ```
   FROM public.ecr.aws/emr-on-eks/fluentd/emr-7.12.0:latest
   USER root
   ### Add customization commands here ####
   USER hadoop:hadoop
   ```

1. Add commands in the `Dockerfile` to customize the base image. The following command demonstrates how to install Python libraries.

   ```
   FROM public.ecr.aws/emr-on-eks/flink/emr-7.12.0-flink:latest
   USER root
   RUN pip3 install --upgrade boto3 pandas numpy # For Python 3
   USER hadoop:hadoop
   ```

1. In the same directory where you created the `Dockerfile`, run the following command to build the Docker image. The value that you supply after the `-t` flag is your custom name for the image, and the trailing `.` tells Docker to use the current directory as the build context.

   ```
   docker build -t <YOUR_ACCOUNT_ID>.dkr.ecr.<YOUR_ECR_REGION>.amazonaws.com/<ECR_REPO>:<ECR_TAG> .
   ```
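
   The placeholders combine into a single image URI, which you reuse in Step 3 for `docker push` and in Step 4 for `spec.image`. As a concrete illustration with made-up values:

   ```
   # Made-up values for illustration only.
   YOUR_ACCOUNT_ID=123456789012
   YOUR_ECR_REGION=us-west-2
   ECR_REPO=emr_custom_repo
   ECR_TAG=custom-flink

   IMAGE_URI="${YOUR_ACCOUNT_ID}.dkr.ecr.${YOUR_ECR_REGION}.amazonaws.com/${ECR_REPO}:${ECR_TAG}"
   echo "$IMAGE_URI"
   # 123456789012.dkr.ecr.us-west-2.amazonaws.com/emr_custom_repo:custom-flink
   ```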

## Step 3: Publish your custom image

You can now publish the new Docker image to your Amazon ECR registry.

1. Run the following command to create an Amazon ECR repository to store your Docker image. Provide a name for your repository, such as `emr_custom_repo`. For more information, see [Create a repository](https://docs.aws.amazon.com/AmazonECR/latest/userguide/getting-started-cli.html#cli-create-repository) in the Amazon Elastic Container Registry User Guide.

   ```
   aws ecr create-repository \
          --repository-name emr_custom_repo \
          --image-scanning-configuration scanOnPush=true \
          --region <AWS_REGION>
   ```

1. Run the following command to authenticate to your default registry. For more information, see [Authenticate to your default registry](https://docs.aws.amazon.com/AmazonECR/latest/userguide/getting-started-cli.html#cli-authenticate-registry) in the Amazon Elastic Container Registry User Guide.

   ```
   aws ecr get-login-password --region <AWS_REGION> | docker login --username AWS --password-stdin <AWS_ACCOUNT_ID>.dkr.ecr.<YOUR_ECR_REGION>.amazonaws.com
   ```

1. Push the image. For more information, see [Push an image to Amazon ECR](https://docs.aws.amazon.com/AmazonECR/latest/userguide/getting-started-cli.html#cli-push-image) in the Amazon Elastic Container Registry User Guide.

   ```
   docker push <YOUR_ACCOUNT_ID>.dkr.ecr.<YOUR_ECR_REGION>.amazonaws.com/<ECR_REPO>:<ECR_TAG>
   ```

## Step 4: Submit a Flink workload in Amazon EMR using a custom image

Make the following changes to your `FlinkDeployment` spec to use a custom image. To do so, enter your own image in the `spec.image` line of your deployment spec.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkVersion: v1_18
  image: <YOUR_ACCOUNT_ID>.dkr.ecr.<YOUR_ECR_REGION>.amazonaws.com/<ECR_REPO>:<ECR_TAG>
  imagePullPolicy: Always
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
```

To use a custom image for your Fluentd job, enter your own image in the `monitoringConfiguration.image` line of your deployment spec.

```
  monitoringConfiguration:
    image: <YOUR_ACCOUNT_ID>.dkr.ecr.<YOUR_ECR_REGION>.amazonaws.com/<ECR_REPO>:<ECR_TAG>
    cloudWatchMonitoringConfiguration:
      logGroupName: flink-log-group
      logStreamNamePrefix: custom-fluentd
```
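
Putting the two fragments together, a deployment spec that uses custom images for both Flink and Fluentd might look like the following sketch. The image URIs are placeholders for images that you pushed in Step 3, and `<FLUENTD_ECR_REPO>` stands in for a separate Fluentd repository.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkVersion: v1_18
  image: <YOUR_ACCOUNT_ID>.dkr.ecr.<YOUR_ECR_REGION>.amazonaws.com/<ECR_REPO>:<ECR_TAG>
  imagePullPolicy: Always
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  monitoringConfiguration:
    image: <YOUR_ACCOUNT_ID>.dkr.ecr.<YOUR_ECR_REGION>.amazonaws.com/<FLUENTD_ECR_REPO>:<ECR_TAG>
    cloudWatchMonitoringConfiguration:
      logGroupName: flink-log-group
      logStreamNamePrefix: custom-fluentd
```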

# Monitoring Flink Kubernetes operator and Flink jobs

This section describes several ways that you can monitor your Flink jobs with Amazon EMR on EKS. These include integrating Flink with Amazon Managed Service for Prometheus, using the *Flink Web Dashboard*, which provides job status and metrics, and using a monitoring configuration to send log data to Amazon S3 and Amazon CloudWatch.

**Topics**
+ [

# Use Amazon Managed Service for Prometheus to monitor Flink jobs
](jobruns-flink-monitoring-prometheus.md)
+ [

# Use the Flink UI to monitor Flink jobs
](jobruns-flink-monitoring-ui.md)
+ [

# Use monitoring configuration to monitor Flink Kubernetes operator and Flink jobs
](jobruns-flink-monitoring-configuration.md)

# Use Amazon Managed Service for Prometheus to monitor Flink jobs
Using Amazon Managed Service for Prometheus

You can integrate Apache Flink with Amazon Managed Service for Prometheus. Amazon Managed Service for Prometheus supports ingesting metrics from Prometheus servers in clusters running on Amazon EKS, and works together with a Prometheus server that is already running on your Amazon EKS cluster. When you enable the integration with the Amazon EMR Flink operator, the operator installation automatically deploys and configures a Prometheus server that writes to Amazon Managed Service for Prometheus.

1. [Create an Amazon Managed Service for Prometheus workspace](https://docs.aws.amazon.com/prometheus/latest/userguide/AMP-onboard-create-workspace.html). This workspace serves as an ingestion endpoint. You will need the remote write URL later.

1. Set up IAM roles for service accounts.

   For this method of onboarding, use IAM roles for the service accounts in the Amazon EKS cluster where the Prometheus server is running. These roles are also called *service roles*.

   If you don't already have the roles, [set up service roles for the ingestion of metrics from Amazon EKS clusters](https://docs.aws.amazon.com/prometheus/latest/userguide/set-up-irsa.html).

   Before you continue, create an IAM role called `amp-iamproxy-ingest-role`.

1. Install the Amazon EMR Flink Operator with Amazon Managed Service for Prometheus.

Now that you have an Amazon Managed Service for Prometheus workspace, a dedicated IAM role for Amazon Managed Service for Prometheus, and the necessary permissions, you can install the Amazon EMR Flink operator.

Create an `enable-amp.yaml` file. This file lets you use a custom configuration to override Amazon Managed Service for Prometheus settings. Make sure to use your own roles.

```
kube-prometheus-stack:
  prometheus:
    serviceAccount:
      create: true
      name: "amp-iamproxy-ingest-service-account"
      annotations:
        eks.amazonaws.com/role-arn: "arn:aws:iam::<AWS_ACCOUNT_ID>:role/amp-iamproxy-ingest-role"
    remoteWrite:
      - url: <AMAZON_MANAGED_PROMETHEUS_REMOTE_WRITE_URL>
        sigv4:
          region: <AWS_REGION>
        queueConfig:
          maxSamplesPerSend: 1000
          maxShards: 200
          capacity: 2500
```

Use the [`helm upgrade`](https://helm.sh/docs/helm/helm_install/) command to pass the overrides to the `flink-kubernetes-operator` chart.

```
helm upgrade -n <namespace> flink-kubernetes-operator \
   oci://public.ecr.aws/emr-on-eks/flink-kubernetes-operator \
   --set prometheus.enabled=true \
   -f enable-amp.yaml
```

This command automatically installs a Prometheus metric reporter in the operator on port 9999. Any future `FlinkDeployment` also exposes a `metrics` port on 9249.
+ Flink operator metrics appear in Prometheus with the prefix `flink_k8soperator_`.
+ Flink TaskManager metrics appear in Prometheus with the prefix `flink_taskmanager_`.
+ Flink JobManager metrics appear in Prometheus with the prefix `flink_jobmanager_`.
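To illustrate how these prefixes partition the metric namespace, the following sketch classifies a few sample metric names by prefix. The concrete metric names are invented for the example, not names emitted by a real deployment.

```shell
# Classify a metric name by the Flink prefixes described above.
classify() {
  case "$1" in
    flink_k8soperator_*) echo "operator" ;;
    flink_taskmanager_*) echo "taskmanager" ;;
    flink_jobmanager_*)  echo "jobmanager" ;;
    *)                   echo "not-a-flink-metric" ;;
  esac
}

classify "flink_jobmanager_numRegisteredTaskManagers"   # jobmanager
classify "flink_taskmanager_Status_JVM_CPU_Load"        # taskmanager
classify "flink_k8soperator_JmDeploymentStatus_Count"   # operator
classify "node_cpu_seconds_total"                       # not-a-flink-metric
```

You can use the same prefixes in PromQL queries to scope dashboards to one component at a time.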

# Use the Flink UI to monitor Flink jobs
Using the Flink UI

To monitor the health and performance of a running Flink application, use the *Flink Web Dashboard*. This dashboard provides information about the status of the job, the number of TaskManagers, and the metrics and logs for the job. It also lets you view and modify the configuration of the Flink job, and interact with the Flink cluster to submit or cancel jobs.

To access the Flink Web Dashboard for a running Flink application on Kubernetes:

1. Use the `kubectl port-forward` command to forward a local port to the port on which the Flink Web Dashboard runs in the Flink application's JobManager pod. By default, this port is 8081. Replace *deployment-name* with the name of the Flink application deployment. To find the deployment name, list the deployments in your namespace:

   ```
   kubectl get deployments -n namespace
   ```

   Example output:

   ```
   kubectl get deployments -n flink-namespace
   NAME                        READY   UP-TO-DATE   AVAILABLE  AGE
   basic-example               1/1       1            1           11m
   flink-kubernetes-operator   1/1       1            1           21h
   ```

   ```
   kubectl port-forward deployments/deployment-name 8081 -n namespace
   ```

1. If you want to use a different port locally, specify it as *local-port*:8081 in the command.

   ```
   kubectl port-forward -n flink deployments/basic-example 8080:8081
   ```

1. In a web browser, navigate to `http://localhost:8081` (or `http://localhost:local-port` if you used a custom local port) to access the Flink Web Dashboard. This dashboard shows information about the running Flink application, such as the status of the job, the number of TaskManagers, and the metrics and logs for the job.  
![\[Sample Flink Dashboard UI\]](http://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/images/sample-flink-dashboard-ui.png)

# Use monitoring configuration to monitor Flink Kubernetes operator and Flink jobs
Using monitoring configuration

Monitoring configuration lets you set up archiving of your Flink application and operator logs to Amazon S3, Amazon CloudWatch, or both. It adds a Fluentd sidecar to your JobManager and TaskManager pods that forwards these components' logs to your configured sinks.

**Note**  
You must set up IAM Roles for Service Accounts (IRSA) for your Flink operator and your Flink jobs to use this feature, because it interacts with other AWS services. Set this up as described in [Setting up the Flink Kubernetes operator for Amazon EMR on EKS](jobruns-flink-kubernetes-operator-setup.md).

## Flink application logs


You can define the monitoring configuration for Flink application logs in your `FlinkDeployment` spec in the following way.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: FLINK IMAGE TAG
  imagePullPolicy: Always
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  executionRoleArn: JOB EXECUTION ROLE
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
  monitoringConfiguration:
    s3MonitoringConfiguration:
      logUri: S3 BUCKET
    cloudWatchMonitoringConfiguration:
      logGroupName: LOG GROUP NAME
      logStreamNamePrefix: LOG GROUP STREAM PREFIX
    sideCarResources:
      limits:
        cpuLimit: 500m
        memoryLimit: 250Mi
    containerLogRotationConfiguration:
        rotationSize: 2GB
        maxFilesToKeep: 10
```

The following are configuration options.
+ `s3MonitoringConfiguration` – configuration key to set up forwarding to S3
  + `logUri` (required) – the S3 bucket path of where you want to store your logs.
  + The path on S3, once the logs are uploaded, looks like the following.
    + No log rotation enabled:

      ```
      s3://${logUri}/${POD NAME}/STDOUT or STDERR.gz
      ```
    + Log rotation enabled. Both rotated files and the current file (the one without a date stamp) are uploaded. The current file uses the following format.

      ```
      s3://${logUri}/${POD NAME}/STDOUT or STDERR.gz
      ```

      Rotated files use the following format, where index is an incrementing number.

      ```
      s3://${logUri}/${POD NAME}/stdout_YYYYMMDD_index.gz
      ```
  + The following IAM permissions are required to use this forwarder.

    ```
    {
        "Effect": "Allow",
        "Action": [
            "s3:PutObject"
        ],
        "Resource": [
           "S3_BUCKET_URI/*",
           "S3_BUCKET_URI"
        ]
    }
    ```
+ `cloudWatchMonitoringConfiguration` – configuration key to set up forwarding to CloudWatch.
  + `logGroupName` (required) – name of the CloudWatch log group that you want to send logs to. The group is created automatically if it doesn't exist.
  + `logStreamNamePrefix` (optional) – name of the log stream that you want to send logs into. Default value is an empty string. The format is as follows:

    ```
    ${logStreamNamePrefix}/${POD NAME}/STDOUT or STDERR
    ```
  + The following IAM permissions are required to use this forwarder.

    ```
    {
        "Effect": "Allow",
        "Action": [
            "logs:CreateLogStream",
            "logs:CreateLogGroup",
            "logs:PutLogEvents"
        ],
        "Resource": [
            "arn:aws:logs:REGION:ACCOUNT-ID:log-group:{YOUR_LOG_GROUP_NAME}:*",
            "arn:aws:logs:REGION:ACCOUNT-ID:log-group:{YOUR_LOG_GROUP_NAME}"
        ]
    }
    ```
+ `sideCarResources` (optional) – the configuration key to set resource limits on the launched Fluentd sidecar container.
  + `memoryLimit` (optional) – the default value is 512Mi. Adjust according to your needs.
  + `cpuLimit` (optional) – this option doesn't have a default. Adjust according to your needs.
+ `containerLogRotationConfiguration` (optional) – controls the container log rotation behavior. It is enabled by default.
  + `rotationSize` (required) – specifies the file size for the log rotation. The range of possible values is from 2KB to 2GB. The numeric unit portion of the rotationSize parameter is passed as an integer. Since decimal values aren't supported, you can specify a rotation size of 1.5GB, for example, with the value 1500MB. The default is 2GB.
  + `maxFilesToKeep` (required) – specifies the maximum number of files to retain in container after rotation has taken place. The minimum value is 1, and the maximum value is 50. The default is 10.
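The S3 key layouts described above can be sketched as a couple of path templates. The bucket, pod name, date, and index values below are placeholders of my own choosing, not values produced by the forwarder.

```shell
# Sketch of the S3 key layouts for forwarded application logs.
# All concrete values here are illustrative placeholders.
log_uri="s3://my-bucket/logs"
pod_name="basic-example-taskmanager-1-1"

# Current (non-rotated) stdout file:
current_key="${log_uri}/${pod_name}/STDOUT.gz"

# Rotated stdout file, where the trailing index is an incrementing number:
rotated_key="${log_uri}/${pod_name}/stdout_20240101_1.gz"

echo "$current_key"
echo "$rotated_key"
```

Knowing the layout in advance is useful when you set up S3 lifecycle rules or Athena queries over the archived logs.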

## Flink operator logs
Flink operator logs

You can also enable log archiving for the operator by setting the following options in the `values.yaml` file of your Helm chart installation. You can enable S3, CloudWatch, or both.

```
monitoringConfiguration: 
  s3MonitoringConfiguration:
    logUri: "S3-BUCKET"
    totalFileSize: "1G"
    uploadTimeout: "1m"
  cloudWatchMonitoringConfiguration:
    logGroupName: "flink-log-group"
    logStreamNamePrefix: "example-job-prefix-test-2"
  sideCarResources:
    limits:
      cpuLimit: 1
      memoryLimit: 800Mi
  memoryBufferLimit: 700M
```

The following are the available configuration options under `monitoringConfiguration`.
+ `s3MonitoringConfiguration` – set this option to archive to S3.
  + `logUri` (required) – the S3 bucket path where you want to store your logs.
  + Once the logs are uploaded, the S3 bucket paths look like the following formats.
  + No log rotation enabled.

    ```
    s3://${logUri}/${POD NAME}/OPERATOR or WEBHOOK/STDOUT or STDERR.gz
    ```
  + Log rotation enabled. Both rotated files and the current file (the one without a date stamp) are uploaded. The current file uses the following format.

    ```
    s3://${logUri}/${POD NAME}/OPERATOR or WEBHOOK/STDOUT or STDERR.gz
    ```

    Rotated files use the following format, where index is an incrementing number.

    ```
    s3://${logUri}/${POD NAME}/OPERATOR or WEBHOOK/stdout_YYYYMMDD_index.gz
    ```
+ `cloudWatchMonitoringConfiguration` – the configuration key to set up forwarding to CloudWatch.
  + `logGroupName` (required) – name of the CloudWatch log group that you want to send logs to. The group automatically gets created if it doesn't exist.
  + `logStreamNamePrefix` (optional) – name of the log stream that you want to send logs into. The default value is an empty string. The format in CloudWatch is as follows:

    ```
    ${logStreamNamePrefix}/${POD NAME}/STDOUT or STDERR
    ```
+ `sideCarResources` (optional) – the configuration key to set resource limits on the launched Fluentd sidecar container.
  + `memoryLimit` (optional) – the memory limit. Adjust according to your needs. The default is 512Mi.
  + `cpuLimit` – the CPU limit. Adjust according to your needs. No default value.
+ `containerLogRotationConfiguration` (optional) – controls the container log rotation behavior. It is enabled by default.
  + `rotationSize` (required) – specifies file size for the log rotation. The range of possible values is from 2KB to 2GB. The numeric unit portion of the rotationSize parameter is passed as an integer. Since decimal values aren't supported, you can specify a rotation size of 1.5GB, for example, with the value 1500MB. The default is 2GB.
  + `maxFilesToKeep` (required) – specifies the maximum number of files to retain in container after rotation has taken place. The minimum value is 1, and the maximum value is 50. The default is 10.

# How Flink supports high availability and job resiliency


The following sections outline how Flink makes jobs more reliable and highly available. It does this through built-in capabilities like Flink high availability and various recovery mechanisms when failures occur.

**Topics**
+ [

# Using high availability (HA) for Flink Operators and Flink Applications
](jobruns-flink-using-ha.md)
+ [

# Optimizing Flink job restart times for task recovery and scaling operations with Amazon EMR on EKS
](jobruns-flink-restart.md)
+ [

# Graceful decommission of Spot Instances with Flink on Amazon EMR on EKS
](jobruns-flink-decommission.md)

# Using high availability (HA) for Flink Operators and Flink Applications
Using high availability

This topic shows how to configure high availability and describes how it works for a few different use cases. These include when you're using the JobManager and when you're using Flink native Kubernetes.

## Flink operator high-availability
Flink operator

We enable *high availability* for the Flink operator so that you can fail over to a standby operator and minimize downtime in the operator control loop if failures occur. High availability is enabled by default, and the default number of starting operator replicas is 2. You can configure the `replicas` field in the `values.yaml` file for the Helm chart.

The following fields are customizable:
+ `replicas` (optional, default is 2): Setting this number to greater than 1 creates additional standby operators and allows for faster recovery of your job.
+ `highAvailabilityEnabled` (optional, default is true): Controls whether you want to enable HA. Setting this parameter to true enables Multi-AZ deployment support and sets the correct `flink-conf.yaml` parameters.

You can disable HA for your operator by setting the following configuration in your `values.yaml` file.

```
...
imagePullSecrets: []

replicas: 1

# set this to false if you don't want HA
highAvailabilityEnabled: false
...
```

**Multi AZ deployment**

We create the operator pods in multiple Availability Zones. This is a soft constraint: if you don't have enough resources in a different Availability Zone, your operator pods are scheduled in the same one.
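In Kubernetes terms, a soft placement constraint of this kind is typically expressed as a topology spread constraint with `whenUnsatisfiable: ScheduleAnyway`. The following fragment illustrates the general shape of such a constraint; it is not the operator's exact manifest, and the label selector is an assumed example.

```
topologySpreadConstraints:
  - maxSkew: 1
    topologyKey: topology.kubernetes.io/zone
    whenUnsatisfiable: ScheduleAnyway   # soft: fall back to the same AZ if needed
    labelSelector:
      matchLabels:
        app.kubernetes.io/name: flink-kubernetes-operator
```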

**Determining the leader replica**

If HA is enabled, the operator replicas use a Kubernetes Lease for leader election. You can describe the Lease and check the `.Spec.HolderIdentity` field to determine the current leader.

```
kubectl describe lease <Helm Install Release Name>-<NAMESPACE>-lease -n <NAMESPACE> | grep "Holder Identity"
```

**Flink-S3 Interaction**

**Configuring access credentials**

Make sure that you have configured IRSA with the appropriate IAM permissions to access the S3 bucket.

**Fetching job jars from S3 (Application mode)**

The Flink operator also supports fetching application jars from S3. Provide the S3 location for the `jarURI` in your `FlinkDeployment` specification.

You can also use this feature to download other artifacts like PyFlink scripts. The resulting Python script is dropped under the path `/opt/flink/usrlib/`.

The following example demonstrates how to use this feature for a PyFlink job. Note the `jarURI` and `args` fields.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: python-example
spec:
  image: <YOUR CUSTOM PYFLINK IMAGE>
  emrReleaseLabel: "emr-6.12.0-flink-latest"
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    highAvailabilityEnabled: false
    replicas: 1
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: "s3://<S3-BUCKET>/scripts/pyflink.py" # Note, this will trigger the artifact download process
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pyflink.py"]
    parallelism: 1
    upgradeMode: stateless
```

**Flink S3 Connectors**

Flink comes packaged with two S3 connectors. The following sections describe when to use each connector.

**Checkpointing: Presto S3 connector**
+ Set the S3 scheme to `s3p://`.
+ This is the recommended connector for checkpointing to S3. For more information, see [S3-specific](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#s3-specific) in the Apache Flink documentation.

Example FlinkDeployment specification:

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.checkpoints.dir: s3p://<BUCKET-NAME>/flink-checkpoint/
```

**Reading and writing to S3: Hadoop S3 connector**
+ Set the S3 scheme to `s3://` or `s3a://`.
+ This is the recommended connector for reading and writing files from S3, and the only S3 connector that implements the [Flink Filesystem interface](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/).
+ By default, we set `fs.s3a.aws.credentials.provider` to `com.amazonaws.auth.WebIdentityTokenCredentialsProvider` in the `flink-conf.yaml` file. If you override the default `flink-conf` completely and you interact with S3, make sure to use this provider.

Example FlinkDeployment spec

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  job:
    jarURI: local:///opt/flink/examples/streaming/WordCount.jar
    args: [ "--input", "s3a://<INPUT BUCKET>/PATH", "--output", "s3a://<OUTPUT BUCKET>/PATH" ]
    parallelism: 2
    upgradeMode: stateless
```

## Flink Job Manager
Flink Job Manager

High availability (HA) for Flink deployments allows jobs to continue making progress even if a transient error occurs and your JobManager crashes. With HA enabled, jobs restart from the last successful checkpoint. Without HA enabled, Kubernetes restarts your JobManager, but your job starts as a fresh job and loses its progress. After you configure HA, Kubernetes stores the HA metadata in persistent storage, references it if a transient JobManager failure occurs, and then resumes your jobs from the last successful checkpoint.

HA is enabled by default for your Flink jobs, and the replica count is set to 2. This requires you to provide an S3 storage location where the HA metadata can persist.

**HA configs**

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  executionRoleArn: "<JOB EXECUTION ROLE ARN>"
  emrReleaseLabel: "emr-6.13.0-flink-latest"
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    replicas: 2
    highAvailabilityEnabled: true
    storageDir: "s3://<S3 PERSISTENT STORAGE DIR>"
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
```

The following are descriptions of the preceding HA configs in the JobManager (defined under `.spec.jobManager`):
+ `highAvailabilityEnabled` (optional, default is true): Set this to `false` if you don't want HA enabled and don't want to use the provided HA configurations. You can still set the `replicas` field to manually configure HA.
+ `replicas` (optional, default is 2): Setting this number to greater than 1 creates additional standby JobManagers and allows for faster recovery of your job. If you disable HA, you must set the replica count to 1, or you keep getting validation errors (only 1 replica is supported if HA is not enabled).
+ `storageDir` (required): Because the replica count is 2 by default, you must provide a persistent `storageDir`. Currently, this field only accepts S3 paths as the storage location.

**Pod locality**

If you enable HA, we also try to colocate pods in the same AZ, which improves performance by reducing network latency between pods. This is a best-effort process: if you don't have enough resources in the AZ where the majority of your pods are scheduled, the remaining pods are still scheduled, but might end up on a node outside of this AZ.

**Determining the leader replica**

If HA is enabled, the replicas use a lease to determine which of the JobManagers is the leader, and use a Kubernetes ConfigMap as the datastore for this metadata. To determine the leader, look at the value of the key `org.apache.flink.k8s.leader.restserver` under the ConfigMap data to find the Kubernetes pod with the matching IP address. You can also use the following bash commands.

```
ip=$(kubectl get configmap -n <NAMESPACE> <JOB-NAME>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')
kubectl get pods -n <NAMESPACE> -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```
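To see what the `awk` pipeline above extracts, you can run it on a sample restserver value locally. The address below is made up for the example, not output from a real cluster.

```shell
# Demonstrate the IP extraction from a sample restserver value.
# The address is a made-up example.
value="http://10.0.1.23:8081"

# Split on ":" to isolate "//10.0.1.23", then on "/" to isolate the IP.
ip=$(echo "$value" | awk -F: '{print $2}' | awk -F '/' '{print $3}')
echo "$ip"   # prints 10.0.1.23
```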

## Flink job - native Kubernetes
Flink job - native Kubernetes

Amazon EMR 6.13.0 and higher supports Flink native Kubernetes for running Flink applications in high-availability mode on an Amazon EKS cluster. 

**Note**  
You must have an Amazon S3 bucket created to store the high-availability metadata when you submit your Flink job. If you don’t want to use this feature, you can disable it. It's enabled by default.

To turn on the Flink high-availability feature, provide the following Flink parameters when you [run the `run-application` CLI command](jobruns-flink-native-kubernetes-getting-started.md#jobruns-flink-native-kubernetes-getting-started-run-application). The parameters are defined below the example.

```
-Dhigh-availability.type=kubernetes \
-Dhigh-availability.storageDir=s3://DOC-EXAMPLE-STORAGE-BUCKET \
-Dfs.s3a.aws.credentials.provider="com.amazonaws.auth.WebIdentityTokenCredentialsProvider" \
-Dkubernetes.jobmanager.replicas=3 \
-Dkubernetes.cluster-id=example-cluster
```
+ **`-Dhigh-availability.storageDir`** – The Amazon S3 bucket where you want to store the high-availability metadata for your job.
+ **`-Dkubernetes.jobmanager.replicas`** – The number of JobManager pods to create, as an integer greater than `1`.
+ **`-Dkubernetes.cluster-id`** – A unique ID that identifies the Flink cluster.

# Optimizing Flink job restart times for task recovery and scaling operations with Amazon EMR on EKS
Optimizing restart times

When a task fails or a scaling operation occurs, Flink attempts to re-execute the task from the last completed checkpoint. The restart process can take a minute or longer, depending on the size of the checkpoint state and the number of parallel tasks. During the restart period, backlog can accumulate for the job. Flink does, however, have ways to optimize the speed of recovery and restart of execution graphs to improve job stability.

This page describes some of the ways that Amazon EMR Flink can improve job restart time during task recovery or scaling operations on Spot Instances. Spot Instances are unused compute capacity that's available at a discount. They have unique behaviors, including occasional interruptions, so it's important to understand how Amazon EMR on EKS handles them, including how it carries out decommissioning and job restarts.

**Topics**
+ [

## Task-local recovery
](#flink-restart-task-local)
+ [

## Task-local recovery by Amazon EBS volume mount
](#flink-restart-task-local-ebs)
+ [

## Generic log-based incremental checkpoint
](#flink-restart-log-check)
+ [

## Fine-grained recovery
](#flink-restart-fine-grained)
+ [

## Combined restart mechanism in adaptive scheduler
](#flink-restart-combined)

## Task-local recovery
Task-local recovery

**Note**  
Task-local recovery is supported with Flink on Amazon EMR on EKS 6.14.0 and higher.

With Flink checkpoints, each task produces a snapshot of its state that Flink writes to distributed storage like Amazon S3. In cases of recovery, the tasks restore their state from the distributed storage. Distributed storage provides fault tolerance and can redistribute the state during rescaling because it's accessible to all nodes.

However, a remote distributed store also has a disadvantage: all tasks must read their state from a remote location over the network. This can result in long recovery times for large states during task recovery or scaling operations.

This problem of long recovery time is solved by *task-local recovery*. Tasks write their state on checkpoint into a secondary storage that is local to the task, such as on a local disk. They also store their state in the primary storage, or Amazon S3 in our case. During recovery, the scheduler schedules the tasks on the same Task Manager where the tasks ran earlier so that they can recover from the local state store instead of reading from the remote state store. For more information, see [Task-Local Recovery](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/large_state_tuning/#task-local-recovery) in the *Apache Flink Documentation*.

Our benchmark tests with sample jobs have shown that the recovery time has been reduced from minutes to a few seconds with task-local recovery enabled.

To enable task-local recovery, set the following configurations in your `flink-conf.yaml` file. Specify the checkpointing interval value in milliseconds.

```
    state.backend.local-recovery: true
    state.backend: hashmap or rocksdb
    state.checkpoints.dir: s3://STORAGE-BUCKET-PATH/checkpoint
    execution.checkpointing.interval: 15000
```

## Task-local recovery by Amazon EBS volume mount
Task-local recovery by EBS

**Note**  
Task-local recovery by Amazon EBS is supported with Flink on Amazon EMR on EKS 6.15.0 and higher.

With Flink on Amazon EMR on EKS, you can automatically provision Amazon EBS volumes to the TaskManager pods for task-local recovery. The default overlay mount comes with a 10 GB volume, which is sufficient for jobs with a smaller state. Jobs with large states can enable the *automatic EBS volume mount* option. The volumes are automatically created and mounted during TaskManager pod creation, and removed during pod deletion.

Use the following steps to enable automatic EBS volume mount for Flink in Amazon EMR on EKS:

1. Export the values for the following variables that you'll use in upcoming steps.

   ```
   export AWS_REGION=aa-example-1 
   export FLINK_EKS_CLUSTER_NAME=my-cluster
   export AWS_ACCOUNT_ID=111122223333
   ```

1. Create or update a `kubeconfig` YAML file for your cluster.

   ```
   aws eks update-kubeconfig --name $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
   ```

1. Create an IAM service account for the Amazon EBS Container Storage Interface (CSI) driver on your Amazon EKS cluster. 

   ```
   eksctl create iamserviceaccount \
      --name ebs-csi-controller-sa \
      --namespace kube-system \
      --region $AWS_REGION \
      --cluster $FLINK_EKS_CLUSTER_NAME \
      --role-name TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} \
      --role-only \
      --attach-policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy \
      --approve
   ```

1. Create the Amazon EBS CSI driver with the following command:

   ```
   eksctl create addon \
      --name aws-ebs-csi-driver \
      --region $AWS_REGION \
      --cluster $FLINK_EKS_CLUSTER_NAME \
      --service-account-role-arn arn:aws:iam::${AWS_ACCOUNT_ID}:role/TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
   ```

1. Create the Amazon EBS storage class with the following command:

   ```
   cat << EOF > storage-class.yaml
   apiVersion: storage.k8s.io/v1
   kind: StorageClass
   metadata:
     name: ebs-sc
   provisioner: ebs.csi.aws.com
   volumeBindingMode: WaitForFirstConsumer
   EOF
   ```

   And then apply the class:

   ```
   kubectl apply -f storage-class.yaml
   ```

1. Use Helm to install the Amazon EMR Flink Kubernetes operator with options to create a service account. This creates the `emr-containers-sa-flink` service account to use in the Flink deployment.

   ```
   helm install flink-kubernetes-operator flink-kubernetes-operator/ \
      --set jobServiceAccount.create=true \
      --set rbac.jobRole.create=true \
      --set rbac.jobRoleBinding.create=true
   ```

1. To submit the Flink job and enable automatic provisioning of EBS volumes for task-local recovery, set the following configurations in your `flink-conf.yaml` file. Adjust the size limit to the state size of the job. Set `serviceAccount` to `emr-containers-sa-flink`, specify the checkpointing interval value in milliseconds, and omit the `executionRoleArn`.

   ```
   flinkConfiguration:
       task.local-recovery.ebs.enable: true
       kubernetes.taskmanager.local-recovery.persistentVolumeClaim.sizeLimit: 10Gi
       state.checkpoints.dir: s3://BUCKET-PATH/checkpoint
       state.backend.local-recovery: true
       state.backend: hashmap or rocksdb
       state.backend.incremental: "true"
       execution.checkpointing.interval: 15000
     serviceAccount: emr-containers-sa-flink
   ```

When you're ready to delete the Amazon EBS CSI driver plugin, use the following commands:

```
  # Detach Attached Policy
  aws iam detach-role-policy --role-name TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} --policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy
  # Delete the created Role
  aws iam delete-role --role-name TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
  # Delete the created service account
  eksctl delete iamserviceaccount --name ebs-csi-controller-sa --namespace kube-system --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  # Delete Addon
  eksctl delete addon --name aws-ebs-csi-driver --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  # Delete the EBS storage class
  kubectl delete -f storage-class.yaml
```

## Generic log-based incremental checkpoint
Incremental checkpoints

**Note**  
Generic log-based incremental checkpointing is supported with Flink on Amazon EMR on EKS 6.14.0 and higher.

Generic log-based incremental checkpointing was added in Flink 1.16 to improve the speed of checkpoints. A faster checkpoint interval often results in a reduction of recovery work because fewer events need to be reprocessed after recovery. For more information, see [Improving speed and stability of checkpointing with generic log-based incremental checkpoints](https://flink.apache.org/2022/05/30/improving-speed-and-stability-of-checkpointing-with-generic-log-based-incremental-checkpoints/) on the *Apache Flink Blog*.

With sample jobs, our benchmark tests have shown that checkpoint time was reduced from minutes to a few seconds with generic log-based incremental checkpoints.

To enable generic log-based incremental checkpoints, set the following configurations in your `flink-conf.yaml` file. Specify the checkpointing interval value in milliseconds.

```
    state.backend.changelog.enabled: true 
    state.backend.changelog.storage: filesystem
    dstl.dfs.base-path: s3://bucket-path/changelog
    state.backend.local-recovery: true
    state.backend: rocksdb
    state.checkpoints.dir: s3://bucket-path/checkpoint
    execution.checkpointing.interval: 15000
```
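
The mechanism behind these settings can be illustrated with a short sketch (not Flink code): state changes are appended to a changelog, so a checkpoint only has to persist the small log tail plus a reference to the last materialized snapshot, and recovery replays the tail on top of the snapshot. The class and method names below are hypothetical.

```python
# Illustrative sketch of log-based incremental checkpointing (not Flink code).
# State changes are appended to a changelog; a checkpoint only persists the
# small log tail plus the last materialized snapshot.

class ChangelogBackend:
    def __init__(self):
        self.materialized = {}   # last full snapshot of the state
        self.changelog = []      # state changes since the last materialization

    def put(self, key, value):
        self.changelog.append((key, value))

    def checkpoint(self):
        # Fast: only the short log tail needs to be uploaded, not full state.
        return {"base": dict(self.materialized), "log": list(self.changelog)}

    def materialize(self):
        # Periodic background step: fold the log into a new snapshot.
        for key, value in self.changelog:
            self.materialized[key] = value
        self.changelog.clear()

    @staticmethod
    def restore(cp):
        # Recovery: replay the log tail on top of the materialized snapshot.
        state = dict(cp["base"])
        for key, value in cp["log"]:
            state[key] = value
        return state

backend = ChangelogBackend()
backend.put("a", 1)
backend.materialize()
backend.put("b", 2)
cp = backend.checkpoint()
print(ChangelogBackend.restore(cp))  # {'a': 1, 'b': 2}
```

Flink's actual changelog backend persists the log to the `dstl.dfs.base-path` location and materializes state in the background; the sketch only models the bookkeeping that makes each checkpoint small.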

## Fine-grained recovery
Fine-grained recovery

**Note**  
Fine-grained recovery support for the default scheduler is supported with Flink on Amazon EMR on EKS 6.14.0 and higher. Fine-grained recovery support in the adaptive scheduler is available with Flink on Amazon EMR on EKS 6.15.0 and higher.

When a task fails during execution, Flink resets the entire execution graph and triggers complete re-execution from the last completed checkpoint. This is more expensive than just re-executing the failed tasks. Fine-grained recovery restarts only the pipeline-connected component of the failed task. In the following example, the job graph has 5 vertices (`A` to `E`). All connections between the vertices are pipelined with pointwise distribution, and the `parallelism.default` for the job is set to `2`. 

```
A → B → C → D → E
```

For this example, there are a total of 10 tasks running. The first pipeline (`a1` to `e1`) runs on a TaskManager (`TM1`), and the second pipeline (`a2` to `e2`) runs on another TaskManager (`TM2`).

```
a1 → b1 → c1 → d1 → e1
a2 → b2 → c2 → d2 → e2
```

There are two pipelined connected components: `a1 → e1`, and `a2 → e2`. If either `TM1` or `TM2` fails, the failure impacts only the 5 tasks in the pipeline where the TaskManager was running. The restart strategy only starts the affected pipelined component. 
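
As a rough illustration of the region computation (an assumed simplification, not Flink's scheduler code), the following sketch derives the per-pipeline components for a pointwise-connected job and the set of tasks that a single failure restarts:

```python
# Sketch: with pointwise (one-to-one) connections, each parallel pipeline
# a_i -> b_i -> ... -> e_i forms its own connected component, so a failure
# restarts only the tasks in that component. Illustrative, not Flink code.

def pipelined_components(vertices, parallelism):
    # Each subtask index i forms one pipeline across all vertices.
    return [[f"{v}{i}" for v in vertices] for i in range(1, parallelism + 1)]

def tasks_to_restart(components, failed_task):
    # Only the component containing the failed task is restarted.
    for component in components:
        if failed_task in component:
            return component
    return []

components = pipelined_components(["a", "b", "c", "d", "e"], 2)
print(components)
# [['a1', 'b1', 'c1', 'd1', 'e1'], ['a2', 'b2', 'c2', 'd2', 'e2']]
print(tasks_to_restart(components, "c2"))
# ['a2', 'b2', 'c2', 'd2', 'e2']
```

With an all-to-all connection (for example, after `keyBy()`), every task would belong to one component, which is why fine-grained recovery offers no benefit there.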

Fine-grained recovery works only with perfectly parallel Flink jobs. It's not supported with `keyBy()` or `redistribute()` operations. For more information, see [FLIP-1: Fine Grained Recovery from Task Failures](https://cwiki.apache.org/confluence/display/FLINK/FLIP-1%3A+Fine+Grained+Recovery+from+Task+Failures) in the *Flink Improvement Proposal* Jira project.

To enable fine-grained recovery, set the following configurations in your `flink-conf.yaml` file.

```
jobmanager.execution.failover-strategy: region 
restart-strategy: exponential-delay   # or fixed-delay
```

## Combined restart mechanism in adaptive scheduler
Combined restart

**Note**  
The combined restart mechanism in adaptive scheduler is supported with Flink on Amazon EMR on EKS 6.15.0 and higher.

The adaptive scheduler can adjust the parallelism of the job based on available slots. It automatically reduces the parallelism if there are not enough slots to fit the configured job parallelism, and scales the job back up to the configured parallelism when new slots become available. The adaptive scheduler avoids job downtime when there are not enough resources available, and it is the scheduler that Flink Autoscaler supports, so we recommend the adaptive scheduler with Amazon EMR Flink. However, the adaptive scheduler might perform multiple restarts within a short period of time, one restart for every new resource added, which can degrade job performance.

With Amazon EMR 6.15.0 and higher, Flink's adaptive scheduler has a combined restart mechanism that opens a restart window when the first resource is added and then waits for the configured window interval (1 minute by default). It performs a single restart when sufficient resources are available to run the job with the configured parallelism, or when the interval times out.

In benchmark tests with sample jobs, this feature processed 10% more records than the default behavior when used with the adaptive scheduler and Flink Autoscaler.

To enable the combined restart mechanism, set the following configurations in your `flink-conf.yaml` file.

```
jobmanager.adaptive-scheduler.combined-restart.enabled: true 
jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m
```

# Graceful decommission of Spot Instances with Flink on Amazon EMR on EKS
Graceful decommission

Flink with Amazon EMR on EKS can improve the job restart time during task recovery or scaling operations.

## Overview


Amazon EMR on EKS releases 6.15.0 and higher support graceful decommission of Task Managers on Spot Instances in Amazon EMR on EKS with Apache Flink. As part of this feature, Amazon EMR on EKS with Flink provides the following capabilities:
+ **Just-in-time checkpointing** – Flink streaming jobs can respond to Spot Instance interruption, perform just-in-time (JIT) checkpoint of the running jobs, and prevent scheduling of additional tasks on these Spot Instances. JIT checkpoint is supported with default and adaptive scheduler.
+ **Combined restart mechanism** – A combined restart mechanism makes a best-effort attempt to restart the job after it reaches target resource parallelism or the end of the current configured window. This also prevents consecutive job restarts that might be caused by multiple Spot Instance terminations. Combined restart mechanism is available with adaptive scheduler only.

These capabilities provide the following benefits:
+ You can leverage Spot Instances to run Task Managers and reduce cluster expenditure.
+ Improved liveness for Spot Instance Task Managers results in higher resilience and more efficient job scheduling.
+ Your Flink jobs have more uptime because there are fewer restarts from Spot Instance terminations.

## How graceful decommissioning works


Consider the following example: you provision an Amazon EMR on EKS cluster running Apache Flink, with On-Demand nodes for the Job Manager and Spot Instance nodes for the Task Managers. Two minutes before termination, a Task Manager receives an interruption notice.

In this scenario, the Job Manager would handle the Spot Instance interruption signal, block scheduling of additional tasks on the Spot Instance, and initiate JIT checkpointing for the streaming job.

Then, the Job Manager restarts the job graph only after there is sufficient availability of new resources to satisfy the current job parallelism within the current restart interval window. The restart window interval is based on the Spot Instance replacement duration, the creation of new Task Manager pods, and their registration with the Job Manager.

## Prerequisites


To use graceful decommissioning, create and run a streaming job on an Amazon EMR on EKS cluster running Apache Flink. Enable the adaptive scheduler and schedule Task Managers on at least one Spot Instance, as shown in the following example. Use On-Demand nodes for the Job Manager; you can also use On-Demand nodes for Task Managers, as long as at least one Task Manager runs on a Spot Instance.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: deployment_name
spec:
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    cluster.taskmanager.graceful-decommission.enabled: "true"
    execution.checkpointing.interval: "240s"
    jobmanager.adaptive-scheduler.combined-restart.enabled: "true"
    jobmanager.adaptive-scheduler.combined-restart.window-interval : "1m"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    nodeSelector:
      'eks.amazonaws.com/capacityType': 'ON_DEMAND'
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
    nodeSelector:
      'eks.amazonaws.com/capacityType': 'SPOT'
  job:
    jarURI: flink_job_jar_path
```

## Configuration


This section covers most of the configurations that you can specify for your decommissioning needs. 


| Key | Description | Default value | Acceptable values | 
| --- | --- | --- | --- | 
|  cluster.taskmanager.graceful-decommission.enabled  |  Enable graceful decommission of Task Manager.  |  true  |  true, false  | 
|  jobmanager.adaptive-scheduler.combined-restart.enabled  |  Enable combined restart mechanism in Adaptive Scheduler.  |  false  |  true, false  | 
|  jobmanager.adaptive-scheduler.combined-restart.window-interval  |  The combined restart window interval to perform merged restarts for the job. An integer without a unit is interpreted as milliseconds.  |  1m  |  Examples: 30, 60s, 3m, 1h  | 
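
The window-interval value accepts either a bare integer (interpreted as milliseconds) or a unit suffix. A sketch of how such values might be parsed (a hypothetical helper, not Flink's configuration parser):

```python
# Hypothetical parser for duration values such as "30", "60s", "3m", "1h".
# A bare integer is milliseconds; ms/s/m/h suffixes are supported.

UNIT_MS = {"ms": 1, "s": 1_000, "m": 60_000, "h": 3_600_000}

def parse_duration_ms(value):
    value = value.strip()
    for unit in ("ms", "s", "m", "h"):       # check "ms" before "s"
        if value.endswith(unit) and value[:-len(unit)].isdigit():
            return int(value[:-len(unit)]) * UNIT_MS[unit]
    if value.isdigit():
        return int(value)                    # no unit: milliseconds
    raise ValueError(f"unparseable duration: {value!r}")

print(parse_duration_ms("30"))   # 30
print(parse_duration_ms("60s"))  # 60000
print(parse_duration_ms("3m"))   # 180000
print(parse_duration_ms("1h"))   # 3600000
```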

# Using Autoscaler for Flink applications
Using Autoscaler

The operator autoscaler can help ease backpressure by collecting metrics from Flink jobs and automatically adjusting parallelism on a job vertex level. The following is an example of what your configuration might look like:

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  ...
spec:
  ...
  flinkVersion: v1_18
  flinkConfiguration:
    job.autoscaler.enabled: "true"
    job.autoscaler.stabilization.interval: 1m
    job.autoscaler.metrics.window: 5m
    job.autoscaler.target.utilization: "0.6"
    job.autoscaler.target.utilization.boundary: "0.2"
    job.autoscaler.restart.time: 2m
    job.autoscaler.catch-up.duration: 5m
    pipeline.max-parallelism: "720"
  ...
```

This configuration uses default values for the latest release of Amazon EMR. If you use other versions, you might have different values.

**Note**  
As of Amazon EMR 7.2.0, you don't need to include the prefix `kubernetes.operator` in your configuration. If you use 7.1.0 or lower, you must use the prefix before each configuration. For example, you must specify `kubernetes.operator.job.autoscaler.scaling.enabled`.
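
If you maintain one set of autoscaler options across releases, a small helper like the following (hypothetical, not an Amazon EMR API) shows the prefixing rule:

```python
# Hypothetical helper: Amazon EMR 7.1.0 and lower require every autoscaler
# option to start with "kubernetes.operator."; 7.2.0 and higher do not.

def with_operator_prefix(config, emr_release):
    major, minor = (int(x) for x in emr_release.split(".")[:2])
    if (major, minor) >= (7, 2):
        return dict(config)          # 7.2.0 and higher: no prefix needed
    return {f"kubernetes.operator.{key}": value
            for key, value in config.items()}

config = {"job.autoscaler.scaling.enabled": "true"}
print(with_operator_prefix(config, "7.1.0"))
# {'kubernetes.operator.job.autoscaler.scaling.enabled': 'true'}
print(with_operator_prefix(config, "7.2.0"))
# {'job.autoscaler.scaling.enabled': 'true'}
```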

The following are configuration options for the autoscaler.
+ `job.autoscaler.scaling.enabled` – specifies whether to enable vertex scaling execution by the autoscaler. The default is `true`. If you disable this configuration, the autoscaler only collects metrics and evaluates the suggested parallelism for each vertex but doesn't upgrade the jobs.
+ `job.autoscaler.stabilization.interval` – the stabilization period in which no new scaling will be executed. Default is 5 minutes.
+ `job.autoscaler.metrics.window` – the scaling metrics aggregation window size. The larger the window, the smoother and more stable the scaling, but the autoscaler might be slower to react to sudden load changes. Default is 15 minutes. We recommend that you experiment with values between 3 and 60 minutes.
+ `job.autoscaler.target.utilization` – the target vertex utilization to provide stable job performance and some buffer for load fluctuations. The default is `0.7`, targeting 70% utilization/load for the job vertices.
+ `job.autoscaler.target.utilization.boundary` – the target vertex utilization boundary that serves as extra buffer to avoid immediate scaling on load fluctuations. Default is `0.3`, which means 30% deviation from the target utilization is allowed before triggering a scaling action.
+ `job.autoscaler.restart.time` – the expected time to restart the application. Default is 5 minutes.
+ `job.autoscaler.catch-up.duration` – the expected time to catch up, meaning to fully process any backlog after a scaling operation completes. Default is 5 minutes. Lowering the catch-up duration forces the autoscaler to reserve more extra capacity for scaling actions.
+ `pipeline.max-parallelism` – the maximum parallelism the autoscaler can use. The autoscaler ignores this limit if it is higher than the max parallelism configured in the Flink config or directly on each operator. Default is -1. Because the autoscaler computes parallelism as a divisor of the max parallelism, we recommend that you choose a max parallelism setting that has many divisors instead of relying on the Flink defaults. Multiples of 60 work well for this configuration, such as 120, 180, 240, 360, or 720.
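
To see how these options interact, the following simplified model estimates a vertex's new parallelism from its utilization and the target, then rounds up to a divisor of `pipeline.max-parallelism`, which is why values with many divisors (such as 720) work well. This is an illustrative sketch, not the Flink autoscaler algorithm.

```python
import math

# Simplified sketch: scale a vertex so utilization lands near the target,
# then round up to a divisor of max-parallelism so keyed state
# redistributes evenly across subtasks.

def suggest_parallelism(current, utilization, target, max_parallelism):
    needed = math.ceil(current * utilization / target)
    # Pick the smallest divisor of max_parallelism that covers the need.
    for p in range(needed, max_parallelism + 1):
        if max_parallelism % p == 0:
            return p
    return max_parallelism

# A vertex at 90% utilization with target 0.7 needs about 11 subtasks;
# with max-parallelism 720 the closest divisor is 12.
print(suggest_parallelism(8, 0.9, 0.7, 720))  # 12
```

With a max parallelism of, say, 128, the same vertex would jump to 16 subtasks, the nearest divisor, illustrating why divisor-rich settings waste less capacity.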

For a more detailed configuration reference page, see [ Autoscaler configuration](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/#autoscaler-configuration).

# Autoscaler parameter autotuning


This section describes auto-tuning behavior for various Amazon EMR versions. It also goes into detail regarding different auto-scaling configurations.

**Note**  
Amazon EMR 7.2.0 and higher uses the open source configuration `job.autoscaler.restart.time-tracking.enabled` to enable **rescale time estimation**. Rescale time estimation has the same functionality as Amazon EMR autotuning, so you don't have to manually assign empirical values to the restart time.  
You can still use Amazon EMR autotuning if you're using Amazon EMR 7.1.0 or lower.

------
#### [ 7.2.0 and higher ]

Amazon EMR 7.2.0 and higher measures the actual required restart time to apply autoscaling decisions. In releases 7.1.0 and lower, you had to use the configuration `job.autoscaler.restart.time` to manually configure estimated maximum restart time. By using the configuration `job.autoscaler.restart.time-tracking.enabled`, you only need to enter a restart time for the first scaling. Afterwards, the operator records the actual restart time and will use it for subsequent scalings.

To enable this tracking, use the following command:

```
job.autoscaler.restart.time-tracking.enabled: true
```

The following are the related configurations for rescale time estimation.


| Configuration | Required | Default | Description | 
| --- | --- | --- | --- | 
| job.autoscaler.restart.time-tracking.enabled | No | False | Indicates whether the Flink Autoscaler should automatically tune configurations over time to optimize scaling decisions. Note that the Autoscaler can only autotune the Autoscaler parameter restart.time. | 
| job.autoscaler.restart.time | No | 5m | The expected restart time that Amazon EMR on EKS uses until the operator can determine the actual restart time from previous scalings. | 
| job.autoscaler.restart.time-tracking.limit | No | 15m | The maximum observed restart time when job.autoscaler.restart.time-tracking.enabled is set to true. | 
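
The tracking behavior in the table can be summarized with a small model (illustrative only, times in seconds; not the operator's implementation): the first scaling uses the configured restart time, and later scalings use the most recently observed time, capped at the tracking limit.

```python
# Sketch of rescale time estimation. The first scaling falls back to the
# configured job.autoscaler.restart.time; afterwards the observed restart
# time is used, capped at job.autoscaler.restart.time-tracking.limit.

def restart_time_for_scaling(observed_times, configured, limit):
    if not observed_times:
        return configured            # first scaling: no observation yet
    return min(observed_times[-1], limit)

observations = []
print(restart_time_for_scaling(observations, configured=120, limit=600))  # 120
observations.append(14)              # actual restart took 14 seconds
print(restart_time_for_scaling(observations, configured=120, limit=600))  # 14
observations.append(900)             # outlier capped at the tracking limit
print(restart_time_for_scaling(observations, configured=120, limit=600))  # 600
```

This matches the log output shown later in this section: the first scaling summary reports the configured `PT2M`, and subsequent summaries report the observed `PT14S`.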

The following is an example deployment spec you can use to try out rescale time estimation:

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: autoscaling-example
spec:
  flinkVersion: v1_18
  flinkConfiguration:

    # Autoscaler parameters
    job.autoscaler.enabled: "true"
    job.autoscaler.scaling.enabled: "true"
    job.autoscaler.stabilization.interval: "5s"
    job.autoscaler.metrics.window: "1m"
    
    job.autoscaler.restart.time-tracking.enabled: "true"
    job.autoscaler.restart.time: "2m"
    job.autoscaler.restart.time-tracking.limit: "10m"
    
    jobmanager.scheduler: adaptive
    taskmanager.numberOfTaskSlots: "1"
    pipeline.max-parallelism: "12"

  executionRoleArn: <JOB ARN>
  emrReleaseLabel: emr-7.12.0-flink-latest
  jobManager:
    highAvailabilityEnabled: false
    storageDir: s3://<s3_bucket>/flink/autoscaling/ha/
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: s3://<s3_bucket>/some-job-with-back-pressure
    parallelism: 1
    upgradeMode: stateless
```

To simulate backpressure, use the following deployment spec.

```
job:
    jarURI: s3://<s3_bucket>/pyflink-script.py
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-py", "/opt/flink/usrlib/pyflink-script.py"]
    parallelism: 1
    upgradeMode: stateless
```

Upload the following Python script to your S3 bucket.

```
import logging
import sys
import time
import random

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

TABLE_NAME="orders"
QUERY=f"""
CREATE TABLE {TABLE_NAME} (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);
"""

def create_backpressure(i):
    time.sleep(2)
    return i

def autoscaling_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    t_env.execute_sql(QUERY)
    res_table = t_env.from_path(TABLE_NAME)

    stream =  t_env.to_data_stream(res_table) \
      .shuffle().map(lambda x: create_backpressure(x))\
      .print()
    env.execute("Autoscaling demo")

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    autoscaling_demo()
```

To verify that rescale time estimation is working, make sure that `DEBUG` level logging of the Flink operator is enabled. The example below demonstrates how to update the helm chart file `values.yaml`. Then reinstall the updated helm chart and run your Flink job again.

```
log4j-operator.properties: |+
  # Flink Operator Logging Overrides
  rootLogger.level = DEBUG
```

Get the name of your leader pod.

```
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')

kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

Run the following command to get the actual restart time used in metrics evaluations.

```
kubectl logs <FLINK-OPERATOR-POD-NAME> -c flink-kubernetes-operator -n <OPERATOR-NAMESPACE> -f | grep "Restart time used in scaling summary computation"
```

You should see logs similar to the following. Note that only the first scaling uses `job.autoscaler.restart.time`. Subsequent scalings use the observed restart time.

```
2024-05-16 17:17:32,590 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT2M
2024-05-16 17:19:03,787 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:19:18,976 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:20:50,283 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:22:21,691 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
```

------
#### [ 7.0.0 and 7.1.0 ]

The open source built-in Flink Autoscaler uses numerous metrics to make the best scaling decisions. However, the default values it uses for its calculations are meant to be applicable to most workloads and might not be optimal for a given job. The autotuning feature added to the Amazon EMR on EKS version of the Flink operator looks at historical trends in specific captured metrics and then tries to calculate the value best tailored to the given job.


| Configuration | Required | Default | Description | 
| --- | --- | --- | --- | 
| kubernetes.operator.job.autoscaler.autotune.enable | No | False | Indicates whether the Flink Autoscaler should automatically tune configurations over time to optimize the Autoscaler's scaling decisions. Currently, the Autoscaler can only autotune the Autoscaler parameter restart.time. | 
| kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count | No | 3 | Indicates how many historical Amazon EMR on EKS metrics the Autoscaler keeps in the Amazon EMR on EKS metrics config map. | 
| kubernetes.operator.job.autoscaler.autotune.metrics.restart.count | No | 3 | Indicates how many restarts the Autoscaler performs before it starts calculating the average restart time for a given job. | 

To enable autotuning, you must have completed the following:
+ Set `kubernetes.operator.job.autoscaler.autotune.enable` to `true`
+ Set `metrics.job.status.enable` to `TOTAL_TIME`
+ Followed the setup of [Using Autoscaler for Flink applications](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/jobruns-flink-autoscaler.html) to enable Autoscaling.

The following is an example deployment spec you can use to try out autotuning.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: autoscaling-example
spec:
  flinkVersion: v1_18
  flinkConfiguration:

    # Autotuning parameters
    kubernetes.operator.job.autoscaler.autotune.enable: "true"
    kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count: "2"
    kubernetes.operator.job.autoscaler.autotune.metrics.restart.count: "1"
    metrics.job.status.enable: TOTAL_TIME

    # Autoscaler parameters
    kubernetes.operator.job.autoscaler.enabled: "true"
    kubernetes.operator.job.autoscaler.scaling.enabled: "true"
    kubernetes.operator.job.autoscaler.stabilization.interval: "5s"
    kubernetes.operator.job.autoscaler.metrics.window: "1m"

    jobmanager.scheduler: adaptive

    taskmanager.numberOfTaskSlots: "1"
    state.savepoints.dir: s3://<S3_bucket>/autoscaling/savepoint/
    state.checkpoints.dir: s3://<S3_bucket>/flink/autoscaling/checkpoint/
    pipeline.max-parallelism: "4"

  executionRoleArn: <JOB ARN>
  emrReleaseLabel: emr-6.14.0-flink-latest
  jobManager:
    highAvailabilityEnabled: true
    storageDir: s3://<S3_bucket>/flink/autoscaling/ha/
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: s3://<S3_bucket>/some-job-with-back-pressure
    parallelism: 1
    upgradeMode: last-state
```

To simulate backpressure, use the following deployment spec.

```
  job:
    jarURI: s3://<S3_bucket>/pyflink-script.py
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-py", "/opt/flink/usrlib/pyflink-script.py"]
    parallelism: 1
    upgradeMode: last-state
```

Upload the following Python script to your S3 bucket.

```
import logging
import sys
import time
import random

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

TABLE_NAME="orders"
QUERY=f"""
CREATE TABLE {TABLE_NAME} (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);
"""

def create_backpressure(i):
    time.sleep(2)
    return i

def autoscaling_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    t_env.execute_sql(QUERY)
    res_table = t_env.from_path(TABLE_NAME)

    stream =  t_env.to_data_stream(res_table) \
      .shuffle().map(lambda x: create_backpressure(x))\
      .print()
    env.execute("Autoscaling demo")

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    autoscaling_demo()
```

To verify that your autotuner is working, use the following commands. Note that you must use your own leader pod information for the Flink Operator.

First get the name of your leader pod.

```
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')

kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

Once you have the name of your leader pod, you can run the following command.

```
kubectl logs -n $NAMESPACE  -c flink-kubernetes-operator --follow <YOUR-FLINK-OPERATOR-POD-NAME>  | grep -E 'EmrEks|autotun|calculating|restart|autoscaler'
```

You should see logs similar to the following.

```
[m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [36m[DEBUG][flink/autoscaling-example] Using the latest Emr Eks Metric for calculating restart.time for autotuning: EmrEksMetrics(restartMetric=RestartMetric(restartingTime=65, numRestarts=1))

[m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [32m[INFO ][flink/autoscaling-example] Calculated average restart.time metric via autotuning to be: PT0.065S
```

------

# Maintenance and troubleshooting for Flink jobs on Amazon EMR on EKS


The following sections outline how to maintain your long-running Flink jobs, and provide guidance on how to troubleshoot some common issues with Flink jobs.

# Maintaining Flink applications
Maintaining Flink applications

**Topics**
+ [

## Upgrade modes
](#jobruns-flink-upgrademode)

Flink applications are typically designed to run for long periods of time such as weeks, months, or even years. As with all long-running services, Flink streaming applications need to be maintained. This includes bug fixes, improvements, and migration to a Flink cluster of a later version.

When the spec changes for `FlinkDeployment` and `FlinkSessionJob` resources, you need to upgrade the running application. To do this, the operator stops the running job (unless already suspended) and redeploys it with the latest spec and, for stateful applications, the state from the previous run.

You control how state is managed when stateful applications stop and restore with the `upgradeMode` setting of the `JobSpec`.

## Upgrade modes
Upgrade modes

**Stateless**  
Stateless applications upgrade from an empty state.

**Last state**  
Provides quick upgrades in any application state (even for failing jobs) and doesn't require a healthy job because it always uses the latest successful checkpoint. Manual recovery may be necessary if HA metadata is lost. To limit how far the job may fall back when it picks up the latest checkpoint, you can configure `kubernetes.operator.job.upgrade.last-state.max.allowed.checkpoint.age`. If the checkpoint is older than the configured value, a savepoint is taken instead for healthy jobs. This mode is not supported in Session mode.

**Savepoint**  
Uses a savepoint for the upgrade, providing maximal safety and the possibility to serve as a backup or fork point. The savepoint is created during the upgrade process. Note that the Flink job must be running for the savepoint to be created. If the job is in an unhealthy state, the last checkpoint is used instead (unless `kubernetes.operator.job.upgrade.last-state-fallback.enabled` is set to `false`). If the last checkpoint is not available, the job upgrade fails.
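
The decision logic of the three modes can be sketched as follows. This is a simplified model of the behavior described above, with hypothetical function and value names, not the operator's code.

```python
# Simplified model of where each upgradeMode sources its state from.
# "last-state" works even for failing jobs; "savepoint" needs a running
# job, or falls back to the last checkpoint when fallback is enabled.

def upgrade_state_source(mode, job_healthy, has_checkpoint,
                         last_state_fallback=True):
    if mode == "stateless":
        return "empty-state"
    if mode == "last-state":
        return "latest-checkpoint"   # does not require a healthy job
    if mode == "savepoint":
        if job_healthy:
            return "new-savepoint"   # savepoint taken during the upgrade
        if last_state_fallback and has_checkpoint:
            return "latest-checkpoint"
        return "upgrade-failed"
    raise ValueError(mode)

print(upgrade_state_source("savepoint", job_healthy=False,
                           has_checkpoint=True))  # latest-checkpoint
```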

# Troubleshooting
Troubleshooting

This section describes how to troubleshoot problems with Amazon EMR on EKS. For information on how to troubleshoot general problems with Amazon EMR, see [Troubleshoot a cluster](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-troubleshoot.html) in the *Amazon EMR Management Guide*.
+ [Troubleshooting jobs that use PersistentVolumeClaims (PVC)](permissions-for-pvc.md)
+ [Troubleshooting Amazon EMR on EKS vertical autoscaling](troubleshooting-vas.md)
+ [Troubleshooting Amazon EMR on EKS Spark operator](troubleshooting-sparkop.md)

## Troubleshooting Apache Flink on Amazon EMR on EKS


### Resource mapping not found when installing the Helm chart


You might encounter the following error message when you install the Helm chart.

```
Error: INSTALLATION FAILED: pulling from host 1234567890.dkr.ecr.us-west-2.amazonaws.com failed with status code [manifests 6.13.0]: 403 Forbidden Error: INSTALLATION FAILED: unable to build kubernetes objects from release manifest: [resource mapping not found for name: "flink-operator-serving-cert" namespace: "<the namespace to install your operator>" from "": no matches for kind "Certificate" in version "cert-manager.io/v1"

ensure CRDs are installed first, resource mapping not found for name: "flink-operator-selfsigned-issuer" namespace: "<the namespace to install your operator>" " from "": no matches for kind "Issuer" in version "cert-manager.io/v1"

ensure CRDs are installed first].
```

To resolve this error, install cert-manager to enable adding the webhook component. You must install cert-manager on each Amazon EKS cluster that you use.

```
kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.12.0/cert-manager.yaml
```

### AWS service access denied error


If you see an *access denied* error, confirm that the IAM role for `operatorExecutionRoleArn` in the Helm chart `values.yaml` file has the correct permissions. Also ensure the IAM role under `executionRoleArn` in your `FlinkDeployment` specification has the correct permissions.

### `FlinkDeployment` is stuck


If your `FlinkDeployment` is stuck and won't progress or delete, use the following steps to force delete the deployment:

1. Edit the deployment run.

   ```
   kubectl edit -n <namespace> flinkdeployments/<app-name>
   ```

1. Remove this finalizer.

   ```
   finalizers:
     - flinkdeployments.flink.apache.org/finalizer
   ```

1. Delete the deployment.

   ```
   kubectl delete -n <namespace> flinkdeployments/<app-name>
   ```

### s3a AWSBadRequestException issue when running a Flink application in an opt-in AWS Region


If you run a Flink application in an [opt-in AWS Region](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html), you might see the following errors:

```
Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: getFileStatus on 
s3://flink.txt: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request 
(Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: ABCDEFGHIJKL; S3 Extended Request ID:
ABCDEFGHIJKLMNOP=; Proxy: null), S3 Extended Request ID: ABCDEFGHIJKLMNOP=:400 Bad Request: Bad Request 
(Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: ABCDEFGHIJKL; S3 Extended Request ID: ABCDEFGHIJKLMNOP=; Proxy: null)
```

```
Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: getS3Region on flink-application: software.amazon.awssdk.services.s3.model.S3Exception: null 
(Service: S3, Status Code: 400, Request ID: ABCDEFGHIJKLMNOP, Extended Request ID: ABCDEFGHIJKLMNOPQRST==):null: null 
(Service: S3, Status Code: 400, Request ID: ABCDEFGHIJKLMNOP, Extended Request ID: AHl42uDNaTUFOus/5IIVNvSakBcMjMCH7dd37ky0vE6jhABCDEFGHIJKLMNOPQRST==)
```

To fix these errors, use the following configuration in your `FlinkDeployment` definition file.

```
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
```

We also recommend that you use the SDKv2 credentials provider:

```
fs.s3a.aws.credentials.provider: software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider
```

If you want to use the SDKv1 credentials provider, make sure that your SDK supports your opt-in Region. For more information, see the [aws-sdk-java GitHub repository](https://github.com/aws/aws-sdk-java).

If you get `S3 AWSBadRequestException` when you run Flink SQL statements in an opt-in Region, make sure that you set the configuration `fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME` in your flink configuration spec.

### S3A AWSBadRequestException when running a Flink session job in CN regions


For Amazon EMR releases 6.15.0 through 7.2.0, you might encounter the following error messages when you run a Flink session job in the China Regions, China (Beijing) and China (Ningxia):

```
Error:  {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.hadoop.fs.s3a.AWSBadRequestException: 
                    getFileStatus on s3://ABCDPath: software.amazon.awssdk.services.s3.model.S3Exception: null (Service: S3, Status Code: 400, Request ID: ABCDEFGH, Extended Request ID: 
                    ABCDEFGH:null: null (Service: S3, Status Code: 400, Request ID: ABCDEFGH, Extended Request ID: ABCDEFGH","additionalMetadata":{},"throwableList":
                    [{"type":"org.apache.hadoop.fs.s3a.AWSBadRequestException","message":"getFileStatus on s3://ABCDPath: software.amazon.awssdk.services.s3.model.S3Exception: 
                    null (Service: S3, Status Code: 400, Request ID: ABCDEFGH, Extended Request ID: ABCDEFGH:null: null (Service: S3, Status Code: 400, Request ID: ABCDEFGH, 
                    Extended Request ID: ABCDEFGH","additionalMetadata":{}},{"type":"software.amazon.awssdk.services.s3.model.S3Exception","message":"null (Service: S3, Status Code: 400, 
                    Request ID: ABCDEFGH, Extended Request ID: ABCDEFGH","additionalMetadata":{}}]}
```

We are aware of this issue and are working to patch the Flink operator for these release versions. Until the patch is available, you can work around the error by downloading the Flink operator Helm chart, extracting it, and changing a configuration in the chart.

To apply the workaround, follow these steps:

1. Change directories to the local folder where you want to store the Helm chart, then run the following commands to pull the chart and extract it.

   ```
   helm pull oci://public.ecr.aws/emr-on-eks/flink-kubernetes-operator \
   --version $VERSION \
   --namespace $NAMESPACE
   ```

   ```
   tar -zxvf flink-kubernetes-operator-$VERSION.tgz
   ```

1. Go into the extracted Helm chart folder and find the `templates/flink-operator.yaml` file.

1. Find the `flink-operator-config` ConfigMap and add the following `fs.s3a.endpoint.region` configuration in `flink-conf.yaml`. The setting must be indented so that it is part of the `flink-conf.yaml` block scalar. For example:

   ```
   {{- if .Values.defaultConfiguration.create }}
   apiVersion: v1
   kind: ConfigMap
   metadata:
     name: flink-operator-config
     namespace: {{ .Release.Namespace }}
     labels:
       {{- include "flink-operator.labels" . | nindent 4 }}
   data:
     flink-conf.yaml: |+
       fs.s3a.endpoint.region: {{ .Values.emrContainers.awsRegion }}
   ```

1. Install the local helm chart and run your job.
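
The install step might look like the following. This is a sketch, not a required command: the release name and the `$NAMESPACE` and `$AWS_REGION` variables are placeholders, and it assumes the chart was extracted into a local `flink-kubernetes-operator` folder. The `--set` flag supplies the `emrContainers.awsRegion` value that the ConfigMap template from the previous step references:

```
helm install flink-kubernetes-operator ./flink-kubernetes-operator \
--namespace $NAMESPACE \
--set emrContainers.awsRegion=$AWS_REGION
```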

# Supported releases for Amazon EMR on EKS with Apache Flink
Supported releases

Apache Flink is available with the following Amazon EMR on EKS releases. For information about all available releases, see [Amazon EMR on EKS releases](emr-eks-releases.md).


| Release label | Java | Flink | Flink operator | 
| --- | --- | --- | --- | 
|  **emr-7.2.0-flink-latest**  |  17  |  1.18.1  |  -  | 
|  **emr-7.2.0-flink-k8s-operator-latest**  |  11  |  -  |  1.8.0  | 
|  **emr-7.1.0-flink-latest**  |  17  |  1.18.1  |  -  | 
|  **emr-7.1.0-flink-k8s-operator-latest**  |  11  |  -  |  1.6.1  | 
|  **emr-7.0.0-flink-latest**  |  11  |  1.18.0  |  -  | 
|  **emr-7.0.0-flink-k8s-operator-latest**  |  11  |  -  |  1.6.1  | 
|  **emr-6.15.0-flink-latest**  |  11  |  1.17.1  |  -  | 
|  **emr-6.15.0-flink-k8s-operator-latest**  |  11  |  -  |  1.6.0  | 
|  **emr-6.14.0-flink-latest**  |  11  |  1.17.1  |  -  | 
|  **emr-6.14.0-flink-k8s-operator-latest**  |  11  |  -  |  1.6.0  | 
|  **emr-6.13.0-flink-latest**  |  11  |  1.17.0  |  -  | 
|  **emr-6.13.0-flink-k8s-operator-latest**  |  11  |  -  |  1.5.0  | 