

# Setting up and using the Flink Kubernetes operator
<a name="jobruns-flink-kubernetes-operator"></a>

The following pages describe how to set up and use the Flink Kubernetes operator to run Flink jobs with Amazon EMR on EKS. The topics available include required prerequisites, how to set up your environment, and running a Flink application on Amazon EMR on EKS.

**Topics**
+ [Setting up the Flink Kubernetes operator for Amazon EMR on EKS](jobruns-flink-kubernetes-operator-setup.md)
+ [Installing the Flink Kubernetes operator for Amazon EMR on EKS](jobruns-flink-kubernetes-operator-getting-started.md)
+ [Run a Flink application](jobruns-flink-kubernetes-operator-run-application.md)
+ [Security role permissions for running a Flink application](jobruns-flink-kubernetes-security.md)
+ [Uninstalling the Flink Kubernetes operator for Amazon EMR on EKS](jobruns-flink-kubernetes-operator-uninstall.md)

# Setting up the Flink Kubernetes operator for Amazon EMR on EKS
<a name="jobruns-flink-kubernetes-operator-setup"></a>

Complete the following tasks before you install the Flink Kubernetes operator on Amazon EKS. If you've already signed up for Amazon Web Services (AWS) and have used Amazon EKS, you're almost ready to use Amazon EMR on EKS. If you've already completed any of the prerequisites, you can skip it and move on to the next one.
+ **[Install or update to the latest version of the AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html)** – If you've already installed the AWS CLI, confirm that you have the latest version.
+ **[Set up kubectl and eksctl](https://docs.aws.amazon.com/eks/latest/userguide/install-kubectl.html)** – kubectl is a command line tool that you use to communicate with the Kubernetes API server, and eksctl is a command line tool that you use to create and manage clusters on Amazon EKS.
+ **[Install Helm](https://docs.aws.amazon.com/eks/latest/userguide/helm.html)** – The Helm package manager for Kubernetes helps you install and manage applications on your Kubernetes cluster.
+ **[Get started with Amazon EKS – eksctl](https://docs.aws.amazon.com/eks/latest/userguide/getting-started-eksctl.html)** – Follow the steps to create a new Kubernetes cluster with nodes in Amazon EKS.
+ **[Choose an Amazon EMR release label](jobruns-flink-security-release-versions.md) (release 6.13.0 or higher)** – The Flink Kubernetes operator is supported with Amazon EMR releases 6.13.0 and higher.
+ **[Enable IAM Roles for Service Accounts (IRSA) on the Amazon EKS cluster](setting-up-enable-IAM.md)**.
+ **[Create a job execution role](creating-job-execution-role.md)**.
+ **[Update the trust policy of the job execution role ](setting-up-trust-policy.md)**.
+ (Optional) Create an operator execution role. You can use the same IAM role for both your Flink jobs and the operator. If you want a different IAM role for the operator, create a separate role.
+ Update the trust policy of the operator execution role. You must explicitly add a trust policy entry for each role that you want to use with the Amazon EMR Flink Kubernetes operator service account. You can follow this example format:

------
#### [ JSON ]

  ```
  {
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Action": [
          "sts:AssumeRoleWithWebIdentity"
        ],
        "Resource": [
          "*"
        ],
        "Condition": {
          "StringLike": {
            "aws:userid": "system:serviceaccount:emr:emr-containers-sa-flink-operator"
          }
        },
        "Sid": "AllowSTSAssumerolewithwebidentity"
      }
    ]
  }
  ```

------
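As a rough sketch, you could save that entry to a local file and attach it with the AWS CLI. Note that `aws iam update-assume-role-policy` replaces the role's entire trust policy, so merge any existing statements into the file first. The role name shown is a placeholder.

```
# Write the example trust policy entry to a local file. Merge in any
# statements your role already has, because update-assume-role-policy
# replaces the whole document.
cat > flink-operator-trust-policy.json <<'EOF'
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AllowSTSAssumerolewithwebidentity",
      "Effect": "Allow",
      "Action": ["sts:AssumeRoleWithWebIdentity"],
      "Resource": ["*"],
      "Condition": {
        "StringLike": {
          "aws:userid": "system:serviceaccount:emr:emr-containers-sa-flink-operator"
        }
      }
    }
  ]
}
EOF

# Then attach it to your operator execution role (the role name is a placeholder):
#   aws iam update-assume-role-policy \
#     --role-name MY_OPERATOR_EXECUTION_ROLE \
#     --policy-document file://flink-operator-trust-policy.json
```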

# Installing the Flink Kubernetes operator for Amazon EMR on EKS
<a name="jobruns-flink-kubernetes-operator-getting-started"></a>

This topic helps you get started with the Flink Kubernetes operator on Amazon EKS by preparing a Flink deployment.

## Install the Kubernetes operator
<a name="jobruns-flink-kubernetes-operator-getting-started-install-operator"></a>

Use the following steps to install the Kubernetes operator for Apache Flink.

1. If you haven't already, complete the steps in [Setting up the Flink Kubernetes operator for Amazon EMR on EKS](jobruns-flink-kubernetes-operator-setup.md).

1. Install the *cert-manager* (once per Amazon EKS cluster) to enable adding the webhook component.

   ```
   kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.12.0/cert-manager.yaml
   ```

1. Install the Helm chart.

   ```
   export VERSION=7.12.0 # the Amazon EMR release version
   export NAMESPACE=NAMESPACE # replace NAMESPACE with the Kubernetes namespace to deploy the operator into
   
   helm install flink-kubernetes-operator \
   oci://public.ecr.aws/emr-on-eks/flink-kubernetes-operator \
   --version $VERSION \
   --namespace $NAMESPACE
   ```

   Example output:

   ```
   NAME: flink-kubernetes-operator
   LAST DEPLOYED: Tue May 31 17:38:56 2022
   NAMESPACE: $NAMESPACE
   STATUS: deployed
   REVISION: 1
   TEST SUITE: None
   ```

1. Wait for the deployment to complete, and verify the chart installation.

   ```
   kubectl wait deployment flink-kubernetes-operator --namespace $NAMESPACE --for condition=Available=True --timeout=30s
   ```

   You should see the following message when the deployment is complete.

   ```
   deployment.apps/flink-kubernetes-operator condition met
   ```

1. Use the following command to see the deployed operator.

   ```
   helm list --namespace $NAMESPACE
   ```

   The following shows example output, where the app version `x.y.z-amzn-n` would correspond with the Flink operator version for your Amazon EMR on EKS release. For more information, see [Supported releases for Amazon EMR on EKS with Apache Flink](jobruns-flink-security-release-versions.md).

   ```
   NAME                              NAMESPACE    REVISION    UPDATED                                STATUS      CHART                                   APP VERSION          
   flink-kubernetes-operator    $NAMESPACE   1           2023-02-22 16:43:45.24148 -0500 EST    deployed    flink-kubernetes-operator-emr-7.12.0    x.y.z-amzn-n
   ```

### Upgrade the Kubernetes operator
<a name="jobruns-flink-kubernetes-operator-upgrade"></a>

To upgrade the version of the Flink operator, follow these steps:

1. Uninstall the old `flink-kubernetes-operator`: `helm uninstall flink-kubernetes-operator -n <NAMESPACE>`.

1. Delete CRD (since helm will not automatically delete the old CRD): `kubectl delete crd flinkdeployments.flink.apache.org flinksessionjobs.flink.apache.org`.

1. Re-install `flink-kubernetes-operator` with the newer version.
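The steps above can be sketched as a small script. This is a minimal sketch, assuming `helm` and `kubectl` are already configured for your cluster; the script name and arguments are illustrative.

```
# Sketch of the upgrade sequence; assumes helm and kubectl point at your cluster.
cat > upgrade-flink-operator.sh <<'EOF'
#!/usr/bin/env bash
set -euo pipefail
NAMESPACE="${1:?usage: upgrade-flink-operator.sh NAMESPACE NEW_VERSION}"
VERSION="${2:?usage: upgrade-flink-operator.sh NAMESPACE NEW_VERSION}"

# 1. Uninstall the old release.
helm uninstall flink-kubernetes-operator -n "$NAMESPACE"

# 2. Delete the CRDs, because Helm doesn't remove them automatically.
kubectl delete crd flinkdeployments.flink.apache.org flinksessionjobs.flink.apache.org

# 3. Reinstall at the newer Amazon EMR release version.
helm install flink-kubernetes-operator \
  oci://public.ecr.aws/emr-on-eks/flink-kubernetes-operator \
  --version "$VERSION" \
  --namespace "$NAMESPACE"
EOF
chmod +x upgrade-flink-operator.sh
```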

# Run a Flink application
<a name="jobruns-flink-kubernetes-operator-run-application"></a>

With Amazon EMR 6.13.0 and higher, you can run a Flink application with the Flink Kubernetes operator in Application mode on Amazon EMR on EKS. With Amazon EMR 6.15.0 and higher, you can also run a Flink application in Session mode. This page describes both methods that you can use to run a Flink application with Amazon EMR on EKS.

**Note**  
You must have created an Amazon S3 bucket to store the high-availability metadata before you submit your Flink job. This feature is enabled by default; if you don't want to use it, you can disable it.

**Prerequisite** – Before you can run a Flink application with the Flink Kubernetes operator, complete the steps in [Setting up the Flink Kubernetes operator for Amazon EMR on EKS](jobruns-flink-kubernetes-operator-setup.md) and [Install the Kubernetes operator](jobruns-flink-kubernetes-operator-getting-started.md#jobruns-flink-kubernetes-operator-getting-started-install-operator).

------
#### [ Application mode ]

With Amazon EMR 6.13.0 and higher, you can run a Flink application with the Flink Kubernetes operator in Application mode on Amazon EMR on EKS.

1. Create a `FlinkDeployment` definition file named `basic-example-app-cluster.yaml`, similar to the following example. If you activated and use one of the [opt-in AWS Regions](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html), make sure that you uncomment and set the `fs.s3a.endpoint.region` configuration.

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example-app-cluster
   spec:
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
       #fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
       state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
       state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH 
     flinkVersion: v1_17
     executionRoleArn: JOB_EXECUTION_ROLE_ARN
     emrReleaseLabel: "emr-6.13.0-flink-latest" # 6.13 or higher
     jobManager:
       storageDir: HIGH_AVAILABILITY_STORAGE_PATH
       resource:
         memory: "2048m"
         cpu: 1
     taskManager:
       resource:
         memory: "2048m"
         cpu: 1
     job:
       # If your job jar is in an S3 bucket, you can use that path instead.
       jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
       parallelism: 2
       upgradeMode: savepoint
       savepointTriggerNonce: 0
     monitoringConfiguration:    
       cloudWatchMonitoringConfiguration:
          logGroupName: LOG_GROUP_NAME
   ```

1. Submit the Flink deployment with the following command. This also creates a `FlinkDeployment` object named `basic-example-app-cluster`.

   ```
   kubectl create -f basic-example-app-cluster.yaml -n NAMESPACE
   ```

1. Access the Flink UI.

   ```
   kubectl port-forward deployments/basic-example-app-cluster 8081 -n NAMESPACE
   ```

1. Open `localhost:8081` to view your Flink jobs locally.

1. Clean up the job. Remember to also clean up the S3 artifacts that the job created, such as checkpoint, savepoint, and high-availability metadata, as well as the CloudWatch logs.
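That cleanup can be sketched as a small script, assuming the resource names and placeholder S3 paths from the example `FlinkDeployment` above, and that `kubectl` and the AWS CLI are configured.

```
# Cleanup sketch; the S3 paths and log group name are the placeholders from
# the example FlinkDeployment above.
cat > cleanup-flink-app.sh <<'EOF'
#!/usr/bin/env bash
set -euo pipefail
NAMESPACE="${1:?usage: cleanup-flink-app.sh NAMESPACE}"

# Deleting the FlinkDeployment tears down the JobManager and TaskManagers.
kubectl delete flinkdeployment basic-example-app-cluster -n "$NAMESPACE"

# Remove the S3 state the job left behind (placeholder paths).
aws s3 rm --recursive CHECKPOINT_S3_STORAGE_PATH
aws s3 rm --recursive SAVEPOINT_S3_STORAGE_PATH
aws s3 rm --recursive HIGH_AVAILABILITY_STORAGE_PATH

# Delete the CloudWatch log group (placeholder name).
aws logs delete-log-group --log-group-name LOG_GROUP_NAME
EOF
```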

For more information on submitting applications to Flink through the Flink Kubernetes operator, see [Flink Kubernetes operator examples](https://github.com/apache/flink-kubernetes-operator/tree/main/examples) in the `apache/flink-kubernetes-operator` repository on GitHub.

------
#### [ Session mode ]

With Amazon EMR 6.15.0 and higher, you can run a Flink application with the Flink Kubernetes operator in Session mode on Amazon EMR on EKS.

1. Create a `FlinkDeployment` definition file named `basic-example-app-cluster.yaml`, similar to the following example. If you activated and use one of the [opt-in AWS Regions](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html), make sure that you uncomment and set the `fs.s3a.endpoint.region` configuration.

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example-session-cluster
   spec:
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
       #fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
       state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
       state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH
     flinkVersion: v1_17
     executionRoleArn: JOB_EXECUTION_ROLE_ARN
     emrReleaseLabel: "emr-6.15.0-flink-latest"
     jobManager:
       storageDir: HIGH_AVAILABILITY_S3_STORAGE_PATH
       resource:
         memory: "2048m"
         cpu: 1
     taskManager:
       resource:
         memory: "2048m"
         cpu: 1
     monitoringConfiguration:    
       s3MonitoringConfiguration:
          logUri: 
       cloudWatchMonitoringConfiguration:
          logGroupName: LOG_GROUP_NAME
   ```

1. Submit the Flink deployment with the following command. This also creates a `FlinkDeployment` object named `basic-example-session-cluster`.

   ```
   kubectl create -f basic-example-app-cluster.yaml -n NAMESPACE
   ```

1. Use the following command to confirm that the session cluster `LIFECYCLE` is `STABLE`:

   ```
   kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
   ```

   The output should be similar to the following example:

   ```
   NAME                              JOB STATUS   LIFECYCLE STATE
   basic-example-session-cluster                          STABLE
   ```

1. Create a `FlinkSessionJob` custom resource file named `basic-session-job.yaml` with the following example content:

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkSessionJob
   metadata:
     name: basic-session-job
   spec:
     deploymentName: basic-example-session-cluster
     job:
       # If your job jar is in an S3 bucket, you can use that path instead.
       # To use a jar in an S3 bucket, set
       # OPERATOR_EXECUTION_ROLE_ARN (--set emrContainers.operatorExecutionRoleArn=$OPERATOR_EXECUTION_ROLE_ARN)
       # when you install the Flink operator.
       jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar
       parallelism: 2
       upgradeMode: stateless
   ```

1. Submit the Flink session job with the following command. This creates a `FlinkSessionJob` object named `basic-session-job`.

   ```
   kubectl apply -f basic-session-job.yaml -n NAMESPACE
   ```

1. Use the following command to confirm that the session cluster `LIFECYCLE` is `STABLE`, and the `JOB STATUS` is `RUNNING`:

   ```
   kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
   ```

   The output should be similar to the following example:

   ```
   NAME                              JOB STATUS   LIFECYCLE STATE
   basic-example-session-cluster     RUNNING      STABLE
   ```

1. Access the Flink UI.

   ```
   kubectl port-forward deployments/basic-example-session-cluster 8081 -n NAMESPACE
   ```

1. Open `localhost:8081` to view your Flink jobs locally.

1. Clean up the job. Remember to also clean up the S3 artifacts that the job created, such as checkpoint, savepoint, and high-availability metadata, as well as the CloudWatch logs.
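For Session mode, a reasonable order is to delete the session job before the session cluster it runs on. A minimal cleanup sketch, assuming the resource names from the examples above (the S3 path is a placeholder):

```
cat > cleanup-flink-session.sh <<'EOF'
#!/usr/bin/env bash
set -euo pipefail
NAMESPACE="${1:?usage: cleanup-flink-session.sh NAMESPACE}"

# Delete the job first, then the session cluster it ran on.
kubectl delete flinksessionjob basic-session-job -n "$NAMESPACE"
kubectl delete flinkdeployment basic-example-session-cluster -n "$NAMESPACE"

# Then remove the S3 state and CloudWatch logs, as in Application mode
# (placeholder path shown).
aws s3 rm --recursive CHECKPOINT_S3_STORAGE_PATH
EOF
```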

------

# Security role permissions for running a Flink application
<a name="jobruns-flink-kubernetes-security"></a>

This topic describes the security roles for deploying and running a Flink application, and lists the permissions for each. Two roles are required: the operator role, which manages deployments, and the job role, which creates and manages jobs.

## Role based access control
<a name="jobruns-flink-kubernetes-security-rbac"></a>

To deploy the operator and run Flink jobs, you need two Kubernetes roles: an operator role and a job role. Amazon EMR creates both roles by default when you install the operator.

## Operator role
<a name="jobruns-flink-kubernetes-security-operator-role"></a>

The operator role manages `flinkdeployments`, creates and manages the JobManager for each Flink job, and manages other resources, such as services.

The operator role's default name is `emr-containers-sa-flink-operator`, and it requires the following permissions.

```
rules:
- apiGroups:
  - ""
  resources:
  - pods
  - services
  - events
  - configmaps
  - secrets
  - serviceaccounts
  verbs:
  - '*'
- apiGroups:
  - rbac.authorization.k8s.io
  resources:
  - roles
  - rolebindings
  verbs:
  - '*'
- apiGroups:
  - apps
  resources:
  - deployments
  - deployments/finalizers
  - replicasets
  verbs:
  - '*'
- apiGroups:
  - extensions
  resources:
  - deployments
  - ingresses
  verbs:
  - '*'
- apiGroups:
  - flink.apache.org
  resources:
  - flinkdeployments
  - flinkdeployments/status
  - flinksessionjobs
  - flinksessionjobs/status
  verbs:
  - '*'
- apiGroups:
  - networking.k8s.io
  resources:
  - ingresses
  verbs:
  - '*'
- apiGroups:
  - coordination.k8s.io
  resources:
  - leases
  verbs:
  - '*'
```

## Job role
<a name="jobruns-flink-security-job-role"></a>

The JobManager uses the job role to create and manage TaskManagers and ConfigMaps for each job.

```
rules:
- apiGroups:
  - ""
  resources:
  - pods
  - configmaps
  verbs:
  - '*'
- apiGroups:
  - apps
  resources:
  - deployments
  - deployments/finalizers
  verbs:
  - '*'
```

# Uninstalling the Flink Kubernetes operator for Amazon EMR on EKS
<a name="jobruns-flink-kubernetes-operator-uninstall"></a>

Follow these steps to uninstall the Flink Kubernetes operator.

1. Delete the operator.

   ```
   helm uninstall flink-kubernetes-operator -n <NAMESPACE>
   ```

1. Delete Kubernetes resources that Helm doesn’t uninstall.

   ```
   kubectl delete serviceaccounts,roles,rolebindings -l emr-containers.amazonaws.com/component=flink.operator --namespace <NAMESPACE>
   kubectl delete crd flinkdeployments.flink.apache.org flinksessionjobs.flink.apache.org
   ```

1. (Optional) Delete the cert-manager.

   ```
   kubectl delete -f https://github.com/cert-manager/cert-manager/releases/download/v1.12.0/cert-manager.yaml
   ```