

# How Flink supports high availability and job resiliency
<a name="jobruns-flink-resiliency"></a>

The following sections outline how Flink makes jobs more reliable and highly available. It does this through built-in capabilities like Flink high availability and various recovery capabilities if failures occur.

**Topics**
+ [Using high availability (HA) for Flink Operators and Flink Applications](jobruns-flink-using-ha.md)
+ [Optimizing Flink job restart times for task recovery and scaling operations with Amazon EMR on EKS](jobruns-flink-restart.md)
+ [Graceful decommission of Spot Instances with Flink on Amazon EMR on EKS](jobruns-flink-decommission.md)

# Using high availability (HA) for Flink Operators and Flink Applications
<a name="jobruns-flink-using-ha"></a>

This topic shows how to configure high availability and describes how it works for a few different use cases. These include when you're using the Job manager and when you're using Flink native kubernetes.

## Flink operator high-availability
<a name="jobruns-flink-ha-operator"></a>

We enable *high availability* for the Flink Operator so that we can fail-over to a standby Flink Operator to minimize downtime in the operator control loop if failures occur. High availability is enabled by default and the default number of starting operator replicas is 2. You can configure the replicas field in your `values.yaml` file for the helm chart.

The following fields are customizable:
+ `replicas` (optional, default is 2): Setting this number to greater than 1 creates other standby Operators and allows for faster recovery of your job.
+ `highAvailabilityEnabled` (optional, default is true): Controls whether you want to enable HA. Specifying this parameter as true enables multi AZ deployment support, as well as sets the correct `flink-conf.yaml` parameters.

You can disable HA for your operator by setting the following configuration in your `values.yaml` file.

```
...
imagePullSecrets: []

replicas: 1

# set this to false if you don't want HA
highAvailabilityEnabled: false
...
```

**Multi AZ deployment**

We create the operator pods in multiple Availability Zones. This is a soft constraint, and your operator pods will be scheduled in the same AZ if you don't have enough resources in a different AZ.

**Determining the leader replica**

 If HA is enabled, the replicas use a lease to determine which of the JMs is the leader and uses a K8s Lease for leader election. You can describe the Lease and look at the .Spec.Holder Identity field to determine the current leader

```
kubectl describe lease <Helm Install Release Name>-<NAMESPACE>-lease -n <NAMESPACE> | grep "Holder Identity"
```

**Flink-S3 Interaction**

**Configuring access credentials**

Please make sure that you have configured IRSA with appropriate IAM permissions to access the S3 bucket.

**Fetching job jars from S3 Application mode**

The Flink operator also supports fetching applications jars from S3. You just provide the S3 location for the jarURI in your FlinkDeployment specification.

You can also use this feature to download other artifacts like PyFlink scripts. The resulting Python script is dropped under the path `/opt/flink/usrlib/`.

The following example demonstrates how to use this feature for a PyFlink job. Note the jarURI and args fields.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: python-example
spec:
  image: <YOUR CUSTOM PYFLINK IMAGE>
  emrReleaseLabel: "emr-6.12.0-flink-latest"
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    highAvailabilityEnabled: false
    replicas: 1
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: "s3://<S3-BUCKET>/scripts/pyflink.py" # Note, this will trigger the artifact download process
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pyflink.py"]
    parallelism: 1
    upgradeMode: stateless
```

**Flink S3 Connectors**

Flink comes packaged with two S3 connectors (listed below). The following sections discuss when to use which connector.

**Checkpointing: Presto S3 connector**
+ Set S3 scheme to s3p://
+ The recommended connector to use to checkpoint to s3. For more information, see [S3-specific](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#s3-specific) in the Apache Flink documentation.

Example FlinkDeployment specification:

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.checkpoints.dir: s3p://<BUCKET-NAME>/flink-checkpoint/
```

**Reading and writing to S3: Hadoop S3 connector**
+ Set S3 scheme to `s3://` or ( `s3a://` )
+ The recommended connector for reading and writing files from S3 (only S3 connector that implements the [ Flinks Filesystem interface](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/)).
+ By default, we set `fs.s3a.aws.credentials.provider` in the `flink-conf.yaml` file, which is `com.amazonaws.auth.WebIdentityTokenCredentialsProvider`. If you override the d efault `flink-conf` completely and you are interacting with S3, make sure to use this provider.

Example FlinkDeployment spec

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  job:
    jarURI: local:///opt/flink/examples/streaming/WordCount.jar
    args: [ "--input", "s3a://<INPUT BUCKET>/PATH", "--output", "s3a://<OUTPUT BUCKET>/PATH" ]
    parallelism: 2
    upgradeMode: stateless
```

## Flink Job Manager
<a name="jobruns-flink-ha-manager"></a>

High Availability (HA) for Flink Deployments allow jobs to continue making progress even if a transient error is encountered and your JobManager crashes. The jobs will restart but from the last successful checkpoint with HA enabled. Without HA enabled, Kubernetes will restart your JobManager, but your job will start as a fresh job and will lose its progress. After configuring HA, we can tell Kubernetes to store the HA metadata in a persistent storage to reference in case of a transient failure in the JobManager and then resume our jobs from the last successful checkpoint.

HA is enabled by default for your Flink jobs (the replica count is set to 2, which will require you to provide an S3 storage location for HA metadata to persist).

**HA configs**

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  executionRoleArn: "<JOB EXECUTION ROLE ARN>"
  emrReleaseLabel: "emr-6.13.0-flink-latest"
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    replicas: 2
    highAvailabilityEnabled: true
    storageDir: "s3://<S3 PERSISTENT STORAGE DIR>"
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
```

The following are descriptions for the above HA configs in Job Manager (defined under .spec.jobManager):
+ `highAvailabilityEnabled` (optional, default is true): Set this to `false ` if you don't want HA enabled and don’t want to use the provided HA configurations. You can still manipulate the "replicas" field to manually configure HA.
+ `replicas` (optional, default is 2): Setting this number to greater than 1 creates other standby JobManagers and allows for faster recovery of your job. If you disable HA, you must set replica count to 1, or you will keep getting validation errors (only 1 replica is supported if HA is not enabled).
+ `storageDir` (required): Because we use replica count as 2 by default, we have to provide a persistent storageDir. Currently this field only accepts S3 paths as the storage location.

**Pod locality**

 If you enable HA, we also try to collocate pods in the same AZ, which leads to improved performance (reduced network latency by having pods in same AZs). This is a best-effort process, meaning if you don't have enough resources in the AZ where the majority of your Pods are scheduled, the remaining Pods will still be scheduled but might end up on a node outside of this AZ.

**Determining the leader replica**

If HA is enabled, the replicas use a lease to determine which of the JMs is the leader and uses a K8s Configmap as the datastore to store this metadata. If you want to determine the leader, you can look at the content of the Configmap and look at the key `org.apache.flink.k8s.leader.restserver` under data to find the K8s pod with the IP address. You can also use the following bash commands.

```
ip=$(kubectl get configmap -n <NAMESPACE> <JOB-NAME>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')
kubectl get pods -n NAMESPACE  -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

## Flink job - native Kubernetes
<a name="jobruns-flink-ha-kubernetes"></a>

Amazon EMR 6.13.0 and higher supports Flink native Kubernetes for running Flink applications in high-availability mode on an Amazon EKS cluster. 

**Note**  
You must have an Amazon S3 bucket created to store the high-availability metadata when you submit your Flink job. If you don’t want to use this feature, you can disable it. It's enabled by default.

To turn on the Flink high-availability feature, provide the following Flink parameters when you [run the `run-application` CLI command](jobruns-flink-native-kubernetes-getting-started.md#jobruns-flink-native-kubernetes-getting-started-run-application). The parameters are defined below the example.

```
-Dhigh-availability.type=kubernetes \
-Dhigh-availability.storageDir=S3://DOC-EXAMPLE-STORAGE-BUCKET \
-Dfs.s3a.aws.credentials.provider="com.amazonaws.auth.WebIdentityTokenCredentialsProvider" \
-Dkubernetes.jobmanager.replicas=3 \
-Dkubernetes.cluster-id=example-cluster
```
+ **`Dhigh-availability.storageDir`** – The Amazon S3 bucket where you want to store the high-availability metadata for your job.

  **`Dkubernetes.jobmanager.replicas`** – The number of Job Manager pods to create as an integer greater than `1`.

  **`Dkubernetes.cluster-id`** – A unique ID that identifies the Flink cluster.

# Optimizing Flink job restart times for task recovery and scaling operations with Amazon EMR on EKS
<a name="jobruns-flink-restart"></a>

When a task fails or when a scaling operation occurs, Flink attempts to re-execute the task from the last completed checkpoint. The restart process could take a minute or longer to execute, depending on the size of the checkpoint state and the number of parallel tasks. During the restart period, backlog tasks can accumulate for the job. There are some ways though, that Flink optimizes the speed of recovery and restart of execution graphs to improve job stability.

This page describes some of the ways that Amazon EMR Flink can improve the job restart time during task recovery or scaling operations on spot instances. Spot instances are unused compute capacity that's available at a discount. It has unique behaviors, including occasional interruptions, so it's important to understand how Amazon EMR on EKS handles these, including how Amazon EMR on EKS carries out decommissioning and job restarts.

**Topics**
+ [Task-local recovery](#flink-restart-task-local)
+ [Task-local recovery by Amazon EBS volume mount](#flink-restart-task-local-ebs)
+ [Generic log-based incremental checkpoint](#flink-restart-log-check)
+ [Fine-grained recovery](#flink-restart-fine-grained)
+ [Combined restart mechanism in adaptive scheduler](#flink-restart-combined)

## Task-local recovery
<a name="flink-restart-task-local"></a>

**Note**  
Task-local recovery is supported with Flink on Amazon EMR on EKS 6.14.0 and higher.

With Flink checkpoints, each task produces a snapshot of its state that Flink writes to distributed storage like Amazon S3. In cases of recovery, the tasks restore their state from the distributed storage. Distributed storage provides fault tolerance and can redistribute the state during rescaling because it's accessible to all nodes.

However, a remote distributed store also has a disadvantage: all tasks must read their state from a remote location over the network. This can result in long recovery times for large states during task recovery or scaling operations.

This problem of long recovery time is solved by *task-local recovery*. Tasks write their state on checkpoint into a secondary storage that is local to the task, such as on a local disk. They also store their state in the primary storage, or Amazon S3 in our case. During recovery, the scheduler schedules the tasks on the same Task Manager where the tasks ran earlier so that they can recover from the local state store instead of reading from the remote state store. For more information, see [Task-Local Recovery](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/large_state_tuning/#task-local-recovery) in the *Apache Flink Documentation*.

Our benchmark tests with sample jobs have shown that the recovery time has been reduced from minutes to a few seconds with task-local recovery enabled.

To enable task-local recovery, set the following configurations in your `flink-conf.yaml` file. Specify the checkpointing interval value in milliseconds.

```
    state.backend.local-recovery: true
    state.backend: hasmap or rocksdb
    state.checkpoints.dir: s3://STORAGE-BUCKET-PATH/checkpoint
    execution.checkpointing.interval: 15000
```

## Task-local recovery by Amazon EBS volume mount
<a name="flink-restart-task-local-ebs"></a>

**Note**  
Task-local recovery by Amazon EBS is supported with Flink on Amazon EMR on EKS 6.15.0 and higher.

With Flink on Amazon EMR on EKS, you can automatically provision Amazon EBS volumes to the TaskManager pods for task local recovery. The default overlay mount comes with 10 GB volume, which is sufficient for jobs with a lower state. Jobs with large states can enable the *automatic EBS volume mount* option. The TaskManager pods are automatically created and mounted during pod creation and removed during pod deletion.

Use the following steps to enable automatic EBS volume mount for Flink in Amazon EMR on EKS:

1. Export the values for the following variables that you'll use in upcoming steps.

   ```
   export AWS_REGION=aa-example-1 
   export FLINK_EKS_CLUSTER_NAME=my-cluster
   export AWS_ACCOUNT_ID=111122223333
   ```

1. Create or update a `kubeconfig` YAML file for your cluster.

   ```
   aws eks update-kubeconfig --name $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
   ```

1. Create an IAM service account for the Amazon EBS Container Storage Interface (CSI) driver on your Amazon EKS cluster. 

   ```
   eksctl create iamserviceaccount \
      --name ebs-csi-controller-sa \
      --namespace kube-system \
      --region $AWS_REGION \
      --cluster $FLINK_EKS_CLUSTER_NAME\
      --role-name TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} \
      --role-only \
      --attach-policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy \
      --approve
   ```

1. Create the Amazon EBS CSI driver with the following command:

   ```
   eksctl create addon \
      --name aws-ebs-csi-driver \
      --region $AWS_REGION \
      --cluster $FLINK_EKS_CLUSTER_NAME \
      --service-account-role-arn arn:aws:iam::${AWS_ACCOUNT_ID}:role/TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
   ```

1. Create the Amazon EBS storage class with the following command:

   ```
   cat ≪ EOF ≫ storage-class.yaml
   apiVersion: storage.k8s.io/v1
   kind: StorageClass
   metadata:
     name: ebs-sc
   provisioner: ebs.csi.aws.com
   volumeBindingMode: WaitForFirstConsumer
   EOF
   ```

   And then apply the class:

   ```
   kubectl apply -f storage-class.yaml
   ```

1. Helm install the Amazon EMR Flink Kubernetes operator with options to create a service account. This creates the `emr-containers-sa-flink` to use in the Flink deployment.

   ```
   helm install flink-kubernetes-operator flink-kubernetes-operator/ \
      --set jobServiceAccount.create=true \
      --set rbac.jobRole.create=true \
      --set rbac.jobRoleBinding.create=true
   ```

1. To submit the Flink job and enable the automatic provision of EBS volumes for task-local recovery, set the following configurations in your `flink-conf.yaml` file. Adjust the size limit for the state size of the job. Set `serviceAccount` to `emr-containers-sa-flink`. Specify the checkpointing interval value in milliseconds. And omit the `executionRoleArn`.

   ```
   flinkConfiguration:
       task.local-recovery.ebs.enable: true
       kubernetes.taskmanager.local-recovery.persistentVolumeClaim.sizeLimit: 10Gi
       state.checkpoints.dir: s3://BUCKET-PATH/checkpoint
       state.backend.local-recovery: true
       state.backend: hasmap or rocksdb
       state.backend.incremental: "true"
       execution.checkpointing.interval: 15000
     serviceAccount: emr-containers-sa-flink
   ```

When you're ready to delete the Amazon EBS CSI driver plugin, use the following commands:

```
  # Detach Attached Policy
  aws iam detach-role-policy --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} --policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy
  # Delete the created Role
  aws iam delete-role --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
  # Delete the created service account
  eksctl delete iamserviceaccount --name ebs-csi-controller-sa --namespace kube-system --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  # Delete Addon
  eksctl delete addon --name aws-ebs-csi-driver --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  # Delete the EBS storage class
  kubectl delete -f storage-class.yaml
```

## Generic log-based incremental checkpoint
<a name="flink-restart-log-check"></a>

**Note**  
Generic log-based incremental checkpointing is supported with Flink on Amazon EMR on EKS 6.14.0 and higher.

Generic log-based incremental checkpointing was added in Flink 1.16 to improve the speed of checkpoints. A faster checkpoint interval often results in a reduction of recovery work because fewer events need to be reprocessed after recovery. For more information, see [Improving speed and stability of checkpointing with generic log-based incremental checkpoints](https://flink.apache.org/2022/05/30/improving-speed-and-stability-of-checkpointing-with-generic-log-based-incremental-checkpoints/) on the *Apache Flink Blog*.

With sample jobs, our benchmark tests have shown that the checkpoint time reduced from minutes to a few seconds with the generic log-based incremental checkpoint.

To enable generic log-based incremental checkpoints, set the following configurations in your `flink-conf.yaml` file. Specify the checkpointing interval value in milliseconds.

```
    state.backend.changelog.enabled: true 
    state.backend.changelog.storage: filesystem
    dstl.dfs.base-path: s3://bucket-path/changelog
    state.backend.local-recovery: true
    state.backend: rocksdb
    state.checkpoints.dir: s3://bucket-path/checkpoint
    execution.checkpointing.interval: 15000
```

## Fine-grained recovery
<a name="flink-restart-fine-grained"></a>

**Note**  
Fine-grained recovery support for the default scheduler is supported with Flink on Amazon EMR on EKS 6.14.0 and higher. Fine-grained recovery support in the adaptive scheduler is available with Flink on Amazon EMR on EKS 6.15.0 and higher.

When a task fails during execution, Flink resets the entire execution graph and triggers complete re-execution from the last completed checkpoint. This is more expensive than just re-executing the failed tasks. Fine-grained recovery restarts only the pipeline-connected component of the failed task. In the following example, the job graph has 5 vertices (`A` to `E`). All connections between the vertices are pipelined with pointwise distribution, and the `parallelism.default` for the job is set to `2`. 

```
A → B → C → D → E
```

For this example, there are a total of 10 tasks running. The first pipeline (`a1` to `e1`) runs on a TaskManager (`TM1`), and the second pipeline (`a2` to `e2`) runs on another TaskManager (`TM2`).

```
a1 → b1 → c1 → d1 → e1
a2 → b2 → c2 → d2 → e2
```

There are two pipelined connected components: `a1 → e1`, and `a2 → e2`. If either `TM1` or `TM2` fails, the failure impacts only the 5 tasks in the pipeline where the TaskManager was running. The restart strategy only starts the affected pipelined component. 

Fine-grained recovery works only with perfectly parallel Flink jobs. It's not supported with `keyBy()` or `redistribute()` operations. For more information, see [FLIP-1: Fine Grained Recovery from Task Failures](https://cwiki.apache.org/confluence/display/FLINK/FLIP-1%3A+Fine+Grained+Recovery+from+Task+Failures) in the *Flink Improvement Proposal* Jira project.

To enable fine-grained recovery, set the following configurations in your `flink-conf.yaml` file.

```
jobmanager.execution.failover-strategy: region 
restart-strategy: exponential-delay or fixed-delay
```

## Combined restart mechanism in adaptive scheduler
<a name="flink-restart-combined"></a>

**Note**  
The combined restart mechanism in adaptive scheduler is supported with Flink on Amazon EMR on EKS 6.15.0 and higher.

Adaptive scheduler can adjust the parallelism of the job based on available slots. It automatically reduces the parallelism if not enough slots are available to fit the configured job parallelism. If new slots become available, the job is scaled up again to the configured job parallelism. An adaptive scheduler avoids downtime on the job when there are not enough resources available. This is the supported scheduler for Flink Autoscaler. We recommend adaptive scheduler with Amazon EMR Flink for these reasons. However, adaptive schedulers might do multiple restarts within a short period of time, one restart for every new resource added. This could lead to a performance drop in the job.

With Amazon EMR 6.15.0 and higher, Flink has a combined restart mechanism in adaptive scheduler that opens a restart window when the first resource is added, and then waits until the configured window interval of the default 1 minute. It performs a single restart when there are sufficient resources available to run the job with configured parallelism or when the interval times out.

With sample jobs, our benchmark tests have shown that this feature processes 10% of records more than the default behavior when you use adaptive scheduler and Flink autoscaler.

To enable the combined restart mechanism, set the following configurations in your `flink-conf.yaml` file.

```
jobmanager.adaptive-scheduler.combined-restart.enabled: true 
jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m
```

# Graceful decommission of Spot Instances with Flink on Amazon EMR on EKS
<a name="jobruns-flink-decommission"></a>

Flink with Amazon EMR on EKS can improve the job restart time during task recovery or scaling operations.

## Overview
<a name="jobruns-flink-decommission-overview"></a>

Amazon EMR on EKS releases 6.15.0 and higher support graceful decommission of Task Managers on Spot Instances in Amazon EMR on EKS with Apache Flink. As part of this feature, Amazon EMR on EKS with Flink provides the following capabilities:
+ **Just-in-time checkpointing** – Flink streaming jobs can respond to Spot Instance interruption, perform just-in-time (JIT) checkpoint of the running jobs, and prevent scheduling of additional tasks on these Spot Instances. JIT checkpoint is supported with default and adaptive scheduler.
+ **Combined restart mechanism** – A combined restart mechanism makes a best-effort attempt to restart the job after it reaches target resource parallelism or the end of the current configured window. This also prevents consecutive job restarts that might be caused by multiple Spot Instance terminations. Combined restart mechanism is available with adaptive scheduler only.

These capabilities provide the following benefits:
+ You can leverage Spot Instances to run Task Managers and reduce cluster expenditure.
+ Improved liveness for Spot Instance Task Manager results in higher resilience and more efficient job scheduling.
+ Your Flink jobs will have more uptime because there will be less restarts from Spot Instance termination.

## How graceful decommissioning works
<a name="jobruns-flink-decommission-howitworks"></a>

Consider the following example: you provision an Amazon EMR on EKS cluster running Apache Flink, and you specify On-Demand nodes for Job Manager, and Spot Instance nodes for Task Manager. Two minutes before termination, Task Manager receives an interruption notice.

In this scenario, the Job Manager would handle the Spot Instance interruption signal, block scheduling of additional tasks on the Spot Instance, and initiate JIT checkpointing for the streaming job.

Then, the Job Manager would restart the job graph only after there is sufficient availability of new resources to satisfy current job parallelism in the current restart interval window. The restart window interval is decided on the basis of Spot Instance replacement duration, creation of new Task Manager pods, and registration with Job Manager.

## Prerequisites
<a name="jobruns-flink-decommission-prereqs"></a>

To use graceful decommisioning, create and run a streaming job on an Amazon EMR on EKS cluster running Apache Flink. Enable Adaptive Scheduler and Task Managers scheduled on at least one Spot Instance, as shown in the following example. You should use On-Demand nodes for Job Manager, and you can use On-Demand nodes for Task Managers as long as there's at least one Spot Instance, too.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: deployment_name
spec:
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    cluster.taskmanager.graceful-decommission.enabled: "true"
    execution.checkpointing.interval: "240s"
    jobmanager.adaptive-scheduler.combined-restart.enabled: "true"
    jobmanager.adaptive-scheduler.combined-restart.window-interval : "1m"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    nodeSelector:
      'eks.amazonaws.com/capacityType': 'ON_DEMAND'
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
    nodeSelector:
      'eks.amazonaws.com/capacityType': 'SPOT'
  job:
    jarURI: flink_job_jar_path
```

## Configuration
<a name="jobruns-flink-decommission-config"></a>

This section covers most of the configurations that you can specify for your decommissioning needs. 


| Key | Description | Default value | Acceptable values | 
| --- | --- | --- | --- | 
|  cluster.taskmanager.graceful-decommission.enabled  |  Enable graceful decommission of Task Manager.  |  true  |  true, false  | 
|  jobmanager.adaptive-scheduler.combined-restart.enabled  |  Enable combined restart mechanism in Adaptive Scheduler.  |  false  |  true, false  | 
|  jobmanager.adaptive-scheduler.combined-restart.window-interval  |  The combined restart window interval to perfom merged restarts for the job. An integer without a unit is interpreted as milliseconds.  |  1m  |  Examples: 30, 60s, 3m, 1h  | 