

# Using Autoscaler for Flink applications

The operator autoscaler can help ease backpressure by collecting metrics from Flink jobs and automatically adjusting parallelism at the job vertex level. The following is an example of what your configuration might look like:

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  ...
spec:
  ...
  flinkVersion: v1_18
  flinkConfiguration:
    job.autoscaler.enabled: "true"
    job.autoscaler.stabilization.interval: 1m
    job.autoscaler.metrics.window: 5m
    job.autoscaler.target.utilization: "0.6"
    job.autoscaler.target.utilization.boundary: "0.2"
    job.autoscaler.restart.time: 2m
    job.autoscaler.catch-up.duration: 5m
    pipeline.max-parallelism: "720"
  ...
```

This configuration is based on the default values for the latest Amazon EMR release. If you use a different version, the values might differ.

**Note**  
As of Amazon EMR 7.2.0, you don't need to include the prefix `kubernetes.operator` in your configuration. If you use 7.1.0 or lower, you must use the prefix before each configuration. For example, you must specify `kubernetes.operator.job.autoscaler.scaling.enabled`.
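For example, on Amazon EMR 7.1.0 or lower, each autoscaler key in the configuration above would carry the prefix. The following fragment is illustrative only; it shows a few of the same settings in prefixed form:

```
flinkConfiguration:
  kubernetes.operator.job.autoscaler.enabled: "true"
  kubernetes.operator.job.autoscaler.stabilization.interval: 1m
  kubernetes.operator.job.autoscaler.metrics.window: 5m
```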

The following are configuration options for the autoscaler.
+ `job.autoscaler.scaling.enabled` – specifies whether to enable vertex scaling execution by the autoscaler. The default is `true`. If you disable this configuration, the autoscaler only collects metrics and evaluates the suggested parallelism for each vertex but doesn't upgrade the jobs.
+ `job.autoscaler.stabilization.interval` – the stabilization period during which no new scaling is executed. Default is 5 minutes.
+ `job.autoscaler.metrics.window` – the scaling metrics aggregation window size. The larger the window, the smoother and more stable the scaling decisions, but the autoscaler might be slower to react to sudden load changes. Default is 15 minutes. We recommend that you experiment with values between 3 and 60 minutes.
+ `job.autoscaler.target.utilization` – the target vertex utilization to provide stable job performance and some buffer for load fluctuations. The default is `0.7` targeting 70% utilization/load for the job vertexes.
+ `job.autoscaler.target.utilization.boundary` – the target vertex utilization boundary that serves as extra buffer to avoid immediate scaling on load fluctuations. Default is `0.3`, which means 30% deviation from the target utilization is allowed before triggering a scaling action.
+ `job.autoscaler.restart.time` – the expected time to restart the application. Default is 5 minutes.
+ `job.autoscaler.catch-up.duration` – the expected time to catch up, meaning fully processing any backlog after a scaling operation completes. Default is 5 minutes. If you lower the catch-up duration, the autoscaler has to reserve more extra capacity for scaling actions.
+ `pipeline.max-parallelism` – the maximum parallelism the autoscaler can use. The autoscaler ignores this limit if it is higher than the max parallelism configured in the Flink config or directly on each operator. Default is -1. Because the autoscaler computes the parallelism as a divisor of the max parallelism, we recommend choosing a max parallelism setting that has many divisors rather than relying on the Flink-provided defaults. We recommend multiples of 60 for this configuration, such as 120, 180, 240, 360, or 720.
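To make the interaction of these settings concrete, the following is a minimal Python sketch, not the operator's actual implementation: the target utilization and its boundary define a band inside which no scaling is triggered, and the chosen max parallelism determines how many evenly dividing parallelism values the autoscaler can pick from. The function names here are illustrative.

```
# Illustrative sketch only; the real logic lives in the Flink Kubernetes
# operator's autoscaler and is more sophisticated.

def within_band(utilization, target=0.7, boundary=0.3):
    """No scaling action is triggered while utilization stays in the band."""
    return target - boundary <= utilization <= target + boundary

def candidate_parallelisms(max_parallelism):
    """The autoscaler picks a parallelism that evenly divides max parallelism."""
    return [p for p in range(1, max_parallelism + 1) if max_parallelism % p == 0]

print(within_band(0.65))           # True: inside the [0.4, 1.0] band
print(within_band(0.25))           # False: below the band, scaling candidate
print(candidate_parallelisms(12))  # [1, 2, 3, 4, 6, 12]
print(len(candidate_parallelisms(720)))  # 30: many divisors, many choices
```

This is why a max parallelism such as 720 (30 divisors) gives the autoscaler far more usable parallelism values than, say, a prime number of the same magnitude.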

For a more detailed configuration reference, see [Autoscaler configuration](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/#autoscaler-configuration).

# Autoscaler parameter autotuning


This section describes auto-tuning behavior for various Amazon EMR versions. It also goes into detail regarding different auto-scaling configurations.

**Note**  
Amazon EMR 7.2.0 and higher uses the open source configuration `job.autoscaler.restart.time-tracking.enabled` to enable **rescale time estimation**. Rescale time estimation has the same functionality as Amazon EMR autotuning, so you don't have to manually assign empirical values to the restart time.  
You can still use Amazon EMR autotuning if you're using Amazon EMR 7.1.0 or lower.

------
#### [ 7.2.0 and higher ]

Amazon EMR 7.2.0 and higher measures the actual required restart time to apply autoscaling decisions. In releases 7.1.0 and lower, you had to use the configuration `job.autoscaler.restart.time` to manually configure the estimated maximum restart time. With the configuration `job.autoscaler.restart.time-tracking.enabled`, you only need to enter a restart time for the first scaling. Afterwards, the operator records the actual restart time and uses it for subsequent scalings.

To enable this tracking, set the following configuration:

```
job.autoscaler.restart.time-tracking.enabled: true
```

The following are the related configurations for rescale time estimation.


| Configuration | Required | Default | Description | 
| --- | --- | --- | --- | 
| job.autoscaler.restart.time-tracking.enabled | No | False | Indicates whether the Flink Autoscaler should automatically tune configurations over time to optimize scaling decisions. Note that the Autoscaler can only autotune the Autoscaler parameter restart.time. | 
| job.autoscaler.restart.time | No | 5m | The expected restart time that Amazon EMR on EKS uses until the operator can determine the actual restart time from previous scalings. | 
| job.autoscaler.restart.time-tracking.limit | No | 15m | The maximum observed restart time when job.autoscaler.restart.time-tracking.enabled is set to true. | 
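Taken together, these settings mean: use the configured `job.autoscaler.restart.time` until an actual restart has been observed, then use the observed restart time, capped at the tracking limit. The following Python sketch of that relationship is illustrative only, not the operator's code:

```
# Illustrative only: how the configured, observed, and limit values relate.

DEFAULT_RESTART_TIME_MIN = 5    # job.autoscaler.restart.time
TRACKING_LIMIT_MIN = 15         # job.autoscaler.restart.time-tracking.limit

def effective_restart_time(observed_minutes=None):
    """Return the restart time used in the scaling computation, in minutes."""
    if observed_minutes is None:
        # No rescale observed yet: fall back to the configured estimate.
        return DEFAULT_RESTART_TIME_MIN
    # Observed restart times are capped at the tracking limit.
    return min(observed_minutes, TRACKING_LIMIT_MIN)

print(effective_restart_time())      # 5: first scaling uses the configured value
print(effective_restart_time(0.25))  # 0.25: a subsequent observed ~15s restart
print(effective_restart_time(40))    # 15: capped at the tracking limit
```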

The following is an example deployment spec you can use to try out rescale time estimation:

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: autoscaling-example
spec:
  flinkVersion: v1_18
  flinkConfiguration:

    # Autoscaler parameters
    job.autoscaler.enabled: "true"
    job.autoscaler.scaling.enabled: "true"
    job.autoscaler.stabilization.interval: "5s"
    job.autoscaler.metrics.window: "1m"
    
    job.autoscaler.restart.time-tracking.enabled: "true"
    job.autoscaler.restart.time: "2m"
    job.autoscaler.restart.time-tracking.limit: "10m"
    
    jobmanager.scheduler: adaptive
    taskmanager.numberOfTaskSlots: "1"
    pipeline.max-parallelism: "12"

  executionRoleArn: <JOB ARN>
  emrReleaseLabel: emr-7.2.0-flink-latest
  jobManager:
    highAvailabilityEnabled: false
    storageDir: s3://<s3_bucket>/flink/autoscaling/ha/
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: s3://<s3_bucket>/some-job-with-back-pressure
    parallelism: 1
    upgradeMode: stateless
```

To simulate backpressure, use the following deployment spec.

```
job:
    jarURI: s3://<s3_bucket>/pyflink-script.py
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-py", "/opt/flink/usrlib/pyflink-script.py"]
    parallelism: 1
    upgradeMode: stateless
```

Upload the following Python script to your S3 bucket.

```
import logging
import sys
import time
import random

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

TABLE_NAME="orders"
QUERY=f"""
CREATE TABLE {TABLE_NAME} (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);
"""

def create_backpressure(i):
    time.sleep(2)
    return i

def autoscaling_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    t_env.execute_sql(QUERY)
    res_table = t_env.from_path(TABLE_NAME)

    t_env.to_data_stream(res_table) \
      .shuffle().map(lambda x: create_backpressure(x)) \
      .print()
    env.execute("Autoscaling demo")

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    autoscaling_demo()
```

To verify that rescale time estimation is working, make sure that `DEBUG`-level logging is enabled for the Flink operator. The following example demonstrates how to update the Helm chart's `values.yaml` file. Then reinstall the updated Helm chart and run your Flink job again.

```
log4j-operator.properties: |+
  # Flink Operator Logging Overrides
  rootLogger.level = DEBUG
```

Get the name of your leader pod.

```
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')

kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

Run the following command to get the actual restart time used in metrics evaluations.

```
kubectl logs <FLINK-OPERATOR-POD-NAME> -c flink-kubernetes-operator -n <OPERATOR-NAMESPACE> -f | grep "Restart time used in scaling summary computation"
```

You should see logs similar to the following. Note that only the first scaling uses `job.autoscaler.restart.time`. Subsequent scalings use the observed restart time.

```
2024-05-16 17:17:32,590 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT2M
2024-05-16 17:19:03,787 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:19:18,976 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:20:50,283 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:22:21,691 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
```
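If you want to post-process these log lines, note that the values are ISO-8601 durations (`PT2M` is 2 minutes, `PT14S` is 14 seconds). The following Python sketch, which assumes the exact log message shown above, extracts them as seconds:

```
import re

# Matches the message logged by ScalingExecutor; adjust the pattern if your
# operator version logs a different format.
LINE_RE = re.compile(r"Restart time used in scaling summary computation: PT(\S+)")

def parse_duration_seconds(token):
    """Convert a simple ISO-8601 duration suffix such as '2M' or '14S' to seconds."""
    m = re.fullmatch(r"(?:(\d+)M)?(?:(\d+)S)?", token)
    minutes = int(m.group(1) or 0)
    seconds = int(m.group(2) or 0)
    return minutes * 60 + seconds

log = """2024-05-16 17:17:32,590 ... Restart time used in scaling summary computation: PT2M
2024-05-16 17:19:03,787 ... Restart time used in scaling summary computation: PT14S"""

durations = [parse_duration_seconds(m.group(1)) for m in LINE_RE.finditer(log)]
print(durations)  # [120, 14]
```

This simple parser only handles minute and second components, which covers the values the operator logs in this example.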

------
#### [ 7.0.0 and 7.1.0 ]

The open source built-in Flink Autoscaler uses numerous metrics to make the best scaling decisions. However, the default values it uses for its calculations are meant to be applicable to most workloads and might not be optimal for a given job. The autotuning feature added to the Amazon EMR on EKS version of the Flink Operator looks at historical trends observed in specific captured metrics and then tries to calculate the value best tailored to the given job.


| Configuration | Required | Default | Description | 
| --- | --- | --- | --- | 
| kubernetes.operator.job.autoscaler.autotune.enable | No | False | Indicates whether the Flink Autoscaler should automatically tune configurations over time to optimize the autoscaler's scaling decisions. Currently, the Autoscaler can only autotune the Autoscaler parameter restart.time. | 
| kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count | No | 3 | Indicates how many historical Amazon EMR on EKS metrics the Autoscaler keeps in the Amazon EMR on EKS metrics config map. | 
| kubernetes.operator.job.autoscaler.autotune.metrics.restart.count | No | 3 | Indicates how many restarts the Autoscaler performs before it starts calculating the average restart time for a given job. | 

To enable autotuning, you must have completed the following:
+ Set `kubernetes.operator.job.autoscaler.autotune.enable` to `true`
+ Set `metrics.job.status.enable` to `TOTAL_TIME`
+ Followed the setup of [Using Autoscaler for Flink applications](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/jobruns-flink-autoscaler.html) to enable Autoscaling.

The following is an example deployment spec you can use to try out autotuning.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: autoscaling-example
spec:
  flinkVersion: v1_18
  flinkConfiguration:

    # Autotuning parameters
    kubernetes.operator.job.autoscaler.autotune.enable: "true"
    kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count: "2"
    kubernetes.operator.job.autoscaler.autotune.metrics.restart.count: "1"
    metrics.job.status.enable: TOTAL_TIME

    # Autoscaler parameters
    kubernetes.operator.job.autoscaler.enabled: "true"
    kubernetes.operator.job.autoscaler.scaling.enabled: "true"
    kubernetes.operator.job.autoscaler.stabilization.interval: "5s"
    kubernetes.operator.job.autoscaler.metrics.window: "1m"

    jobmanager.scheduler: adaptive

    taskmanager.numberOfTaskSlots: "1"
    state.savepoints.dir: s3://<S3_bucket>/autoscaling/savepoint/
    state.checkpoints.dir: s3://<S3_bucket>/flink/autoscaling/checkpoint/
    pipeline.max-parallelism: "4"

  executionRoleArn: <JOB ARN>
  emrReleaseLabel: emr-6.14.0-flink-latest
  jobManager:
    highAvailabilityEnabled: true
    storageDir: s3://<S3_bucket>/flink/autoscaling/ha/
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: s3://<S3_bucket>/some-job-with-back-pressure
    parallelism: 1
    upgradeMode: last-state
```

To simulate backpressure, use the following deployment spec.

```
  job:
    jarURI: s3://<S3_bucket>/pyflink-script.py
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-py", "/opt/flink/usrlib/pyflink-script.py"]
    parallelism: 1
    upgradeMode: last-state
```

Upload the following Python script to your S3 bucket.

```
import logging
import sys
import time
import random

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

TABLE_NAME="orders"
QUERY=f"""
CREATE TABLE {TABLE_NAME} (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);
"""

def create_backpressure(i):
    time.sleep(2)
    return i

def autoscaling_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    t_env.execute_sql(QUERY)
    res_table = t_env.from_path(TABLE_NAME)

    t_env.to_data_stream(res_table) \
      .shuffle().map(lambda x: create_backpressure(x)) \
      .print()
    env.execute("Autoscaling demo")

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    autoscaling_demo()
```

To verify that your autotuner is working, use the following commands. Note that you must use your own leader pod information for the Flink Operator.

First, get the name of your leader pod.

```
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')

kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

Once you have the name of your leader pod, you can run the following command.

```
kubectl logs -n $NAMESPACE -c flink-kubernetes-operator --follow <YOUR-FLINK-OPERATOR-POD-NAME> | grep -E 'EmrEks|autotun|calculating|restart|autoscaler'
```

You should see logs similar to the following.

```
2023-09-13 20:10:35,941 c.a.c.f.k.o.a.EmrEksMetricsAutotuner [DEBUG][flink/autoscaling-example] Using the latest Emr Eks Metric for calculating restart.time for autotuning: EmrEksMetrics(restartMetric=RestartMetric(restartingTime=65, numRestarts=1))

2023-09-13 20:10:35,941 c.a.c.f.k.o.a.EmrEksMetricsAutotuner [INFO ][flink/autoscaling-example] Calculated average restart.time metric via autotuning to be: PT0.065S

------