

# Apache Flink settings
<a name="reference-flink-settings.title"></a>

Managed Service for Apache Flink is an implementation of the Apache Flink framework and uses the default values described in this section. You can set some of these values in your application code; others cannot be changed.

Use the links in this section to learn more about Apache Flink settings and which ones are modifiable.

**Topics**
+ [Apache Flink configuration](#apache-flink-configuration)
+ [State backend](#reference-defaults-state-backend)
+ [Checkpointing](#reference-defaults-checkpoint)
+ [Savepointing](#reference-defaults-savepoint)
+ [Heap sizes](#reference-defaults-heap)
+ [Buffer debloating](#reference-defaults-buffer-debloating)
+ [Modifiable Flink configuration properties](reference-modifiable-settings.md)
+ [Programmatic Flink configuration properties](programmatic-configuration.md)
+ [View configured Flink properties](viewing-modifiable-settings.md)

## Apache Flink configuration
<a name="apache-flink-configuration"></a>

Managed Service for Apache Flink provides a default Flink configuration that uses Apache Flink-recommended values for most properties and a few values based on common application profiles. For more information about Flink configuration, see [Configuration](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/). The service-provided default configuration works for most applications. However, you can change certain properties by creating a support case, for example to improve performance for applications with high parallelism or high memory and state usage, or to enable new debugging features in Apache Flink. For more information, see [AWS Support Center](https://console.aws.amazon.com/support/home#/). You can check the current configuration for your application using the [Apache Flink Dashboard](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html).

## State backend
<a name="reference-defaults-state-backend"></a>

Managed Service for Apache Flink stores transient data in a state backend. Managed Service for Apache Flink uses the **RocksDBStateBackend**. Calling `setStateBackend` to set a different backend has no effect. 

We enable the following features on the state backend:
+ Incremental state backend snapshots
+ Asynchronous state backend snapshots
+ Local recovery of checkpoints

For more information about state backends, see [State Backends](https://nightlies.apache.org/flink/flink-docs-release-1.19/ops/state/state_backends.html) in the Apache Flink Documentation.

## Checkpointing
<a name="reference-defaults-checkpoint"></a>

Managed Service for Apache Flink uses a default checkpoint configuration with the following values. Some of these values can be changed using [CheckpointConfiguration](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CheckpointConfiguration.html). You must set `CheckpointConfiguration.ConfigurationType` to `CUSTOM` for Managed Service for Apache Flink to use modified checkpointing values.



| Setting | Can be modified? | How | Default Value | 
| --- | --- | --- | --- | 
| CheckpointingEnabled | Modifiable | [Create Application](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html) [Update Application](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) [CloudFormation](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/AWS_KinesisAnalyticsV2.html)  | True | 
| CheckpointInterval | Modifiable | [Create Application](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html) [Update Application](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) [CloudFormation](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/AWS_KinesisAnalyticsV2.html)  | 60000 ms | 
| MinPauseBetweenCheckpoints | Modifiable | [Create Application](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html) [Update Application](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) [CloudFormation](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/AWS_KinesisAnalyticsV2.html)  | 5000 ms | 
| Unaligned checkpoints | Modifiable | [Support case](https://console.aws.amazon.com/support/home#/) | False | 
| Number of Concurrent Checkpoints | Not Modifiable | N/A | 1 | 
| Checkpointing Mode | Not Modifiable | N/A | Exactly Once | 
| Checkpoint Retention Policy | Not Modifiable | N/A | On Failure | 
| Checkpoint Timeout | Not Modifiable | N/A | 60 minutes | 
| Max Checkpoints Retained | Not Modifiable | N/A | 1 | 
| Checkpoint and Savepoint Location | Not Modifiable | N/A | We store durable checkpoint and savepoint data to a service-owned S3 bucket. | 
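
The modifiable checkpoint settings can be illustrated with a minimal sketch that builds an `UpdateApplication` request body as a plain Python dictionary. The helper name `build_checkpoint_update`, the application name `my-flink-app`, and the version ID are assumptions for illustration only; check the field names against the UpdateApplication API reference before relying on them.

```python
import json


def build_checkpoint_update(application_name, current_version_id,
                            interval_ms=60000, min_pause_ms=5000,
                            enabled=True):
    """Build an UpdateApplication request that overrides checkpoint defaults.

    Hypothetical helper: it only assembles the request dictionary and does
    not call any AWS API.
    """
    return {
        "ApplicationName": application_name,
        "CurrentApplicationVersionId": current_version_id,
        "ApplicationConfigurationUpdate": {
            "FlinkApplicationConfigurationUpdate": {
                "CheckpointConfigurationUpdate": {
                    # CUSTOM is required for the values below to take effect.
                    "ConfigurationTypeUpdate": "CUSTOM",
                    "CheckpointingEnabledUpdate": enabled,
                    "CheckpointIntervalUpdate": interval_ms,
                    "MinPauseBetweenCheckpointsUpdate": min_pause_ms,
                }
            }
        },
    }


# Example: raise the checkpoint interval to two minutes (assumed values).
request = build_checkpoint_update("my-flink-app", 3, interval_ms=120000)
print(json.dumps(request, indent=2))
```

If you use the AWS SDK for Python, a dictionary of this shape could be passed to the `kinesisanalyticsv2` client as `update_application(**request)`.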

## Savepointing
<a name="reference-defaults-savepoint"></a>

By default, when you restore from a savepoint, the resume operation tries to map all state in the savepoint back to the program you are restoring with. If you dropped an operator, restoring from a savepoint that contains state for the missing operator fails by default. To allow the operation to succeed, set the *AllowNonRestoredState* parameter of the application's [FlinkRunConfiguration](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_FlinkRunConfiguration.html) to `true`. The resume operation then skips state that cannot be mapped to the new program.

For more information, see [Allowing Non-Restored State](https://nightlies.apache.org/flink/flink-docs-release-1.15/ops/state/savepoints.html#allowing-non-restored-state) in the [Apache Flink documentation](https://nightlies.apache.org/flink/flink-docs-release-1.15/).
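
As a sketch of where *AllowNonRestoredState* sits in a request, the following builds a `StartApplication` request body that restores from the latest snapshot and skips unmapped state. The helper name `build_start_request` and the application name are hypothetical; the dictionary is only assembled here, not sent.

```python
def build_start_request(application_name, allow_non_restored_state=True):
    """Build a StartApplication request that tolerates dropped operators.

    Hypothetical helper for illustration; it does not call any AWS API.
    """
    return {
        "ApplicationName": application_name,
        "RunConfiguration": {
            "FlinkRunConfiguration": {
                # Skip savepoint state that has no matching operator.
                "AllowNonRestoredState": allow_non_restored_state,
            },
            "ApplicationRestoreConfiguration": {
                "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT",
            },
        },
    }


start_request = build_start_request("my-flink-app")
print(start_request["RunConfiguration"]["FlinkRunConfiguration"])
```

Skipped state is not carried into the running application, so set the flag only when you intend to drop that operator's state.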

## Heap sizes
<a name="reference-defaults-heap"></a>

Managed Service for Apache Flink allocates each KPU 3 GiB of JVM heap, and reserves 1 GiB for native code allocations. For information about increasing your application capacity, see [Implement application scaling in Managed Service for Apache Flink](how-scaling.md). 

For more information about JVM heap sizes, see [Configuration](https://nightlies.apache.org/flink/flink-docs-release-1.15/ops/config.html) in the [Apache Flink documentation](https://nightlies.apache.org/flink/flink-docs-release-1.15/).
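
The per-KPU figures above translate into a simple calculation. The sketch below uses a hypothetical helper, `memory_for_kpus`, and assumes the totals are aggregates across all KPUs rather than a single contiguous JVM heap.

```python
HEAP_GIB_PER_KPU = 3    # JVM heap allocated to each KPU
NATIVE_GIB_PER_KPU = 1  # reserved for native code allocations


def memory_for_kpus(kpus):
    """Return aggregate heap and native memory (GiB) for a KPU count."""
    if kpus < 1:
        raise ValueError("kpus must be at least 1")
    return {
        "heap_gib": kpus * HEAP_GIB_PER_KPU,
        "native_gib": kpus * NATIVE_GIB_PER_KPU,
    }


print(memory_for_kpus(4))  # {'heap_gib': 12, 'native_gib': 4}
```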

## Buffer debloating
<a name="reference-defaults-buffer-debloating"></a>

Buffer debloating can help applications with high backpressure. If your application experiences failed checkpoints or savepoints, enabling this feature could be useful. To enable it, create a [support case](https://console.aws.amazon.com/support/home#/). 

For more information, see [The Buffer Debloating Mechanism](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/memory/network_mem_tuning/#the-buffer-debloating-mechanism) in the [Apache Flink documentation](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

# Modifiable Flink configuration properties
<a name="reference-modifiable-settings"></a>

The following Flink configuration settings can be modified through a [support case](https://console.aws.amazon.com/support/home#/). You can modify more than one property at a time, and for multiple applications at the same time by specifying the application prefix. If you want to modify a Flink configuration property that is not in this list, specify the exact property in your case. 

## Restart strategy
<a name="reference-modifiable-settings-fault-tolerance"></a>

In Flink 1.19 and later, we use the `exponential-delay` restart strategy by default. All previous versions use the `fixed-delay` restart strategy by default.

`restart-strategy:`

`restart-strategy.fixed-delay.delay:`

`restart-strategy.exponential-delay.backoff-multiplier:`

`restart-strategy.exponential-delay.initial-backoff:`

`restart-strategy.exponential-delay.jitter-factor:`

`restart-strategy.exponential-delay.reset-backoff-threshold:`
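
The version-dependent default described at the start of this section can be expressed as a small lookup. This is a sketch only; the function name `default_restart_strategy` is hypothetical and the version string format (`major.minor[.patch]`) is an assumption.

```python
def default_restart_strategy(flink_version):
    """Return the default restart strategy for a Flink version string."""
    major, minor = (int(part) for part in flink_version.split(".")[:2])
    # Flink 1.19 and later default to exponential-delay; earlier versions
    # default to fixed-delay.
    if (major, minor) >= (1, 19):
        return "exponential-delay"
    return "fixed-delay"


print(default_restart_strategy("1.18"))  # fixed-delay
print(default_restart_strategy("1.19"))  # exponential-delay
```

Knowing which strategy is active tells you which of the properties above are relevant to a support request: the `fixed-delay` keys for earlier versions, the `exponential-delay` keys for 1.19 and later.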

## Checkpoints and state backends
<a name="reference-modifiable-settings-checkpoints-state-backends"></a>

`state.backend.type:`

`state.backend.fs.memory-threshold:`

`state.backend.incremental:`

## Checkpointing
<a name="reference-modifiable-settings-checkpointing"></a>

`execution.checkpointing.unaligned:`

`execution.checkpointing.interval-during-backlog:`

## RocksDB native metrics
<a name="reference-modifiable-settings-rocksdb"></a>

RocksDB native metrics are not shipped to CloudWatch. Once enabled, you can access these metrics from the Flink dashboard or, with custom tooling, from the Flink REST API.

Managed Service for Apache Flink enables customers to access the latest Flink [REST API](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/rest_api/) (or the supported version you are using) in read-only mode using the [CreateApplicationPresignedUrl](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplicationPresignedUrl.html) API. This API is used by Flink’s own dashboard, but it can also be used by custom monitoring tools. 

`state.backend.rocksdb.metrics.actual-delayed-write-rate:`

`state.backend.rocksdb.metrics.background-errors:`

`state.backend.rocksdb.metrics.block-cache-capacity:`

`state.backend.rocksdb.metrics.block-cache-pinned-usage:`

`state.backend.rocksdb.metrics.block-cache-usage:`

`state.backend.rocksdb.metrics.column-family-as-variable:`

`state.backend.rocksdb.metrics.compaction-pending:`

`state.backend.rocksdb.metrics.cur-size-active-mem-table:`

`state.backend.rocksdb.metrics.cur-size-all-mem-tables:`

`state.backend.rocksdb.metrics.estimate-live-data-size:`

`state.backend.rocksdb.metrics.estimate-num-keys:`

`state.backend.rocksdb.metrics.estimate-pending-compaction-bytes:`

`state.backend.rocksdb.metrics.estimate-table-readers-mem:`

`state.backend.rocksdb.metrics.is-write-stopped:`

`state.backend.rocksdb.metrics.mem-table-flush-pending:`

`state.backend.rocksdb.metrics.num-deletes-active-mem-table:`

`state.backend.rocksdb.metrics.num-deletes-imm-mem-tables:`

`state.backend.rocksdb.metrics.num-entries-active-mem-table:`

`state.backend.rocksdb.metrics.num-entries-imm-mem-tables:`

`state.backend.rocksdb.metrics.num-immutable-mem-table:`

`state.backend.rocksdb.metrics.num-live-versions:`

`state.backend.rocksdb.metrics.num-running-compactions:`

`state.backend.rocksdb.metrics.num-running-flushes:`

`state.backend.rocksdb.metrics.num-snapshots:`

`state.backend.rocksdb.metrics.size-all-mem-tables:`
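
For custom tooling that reads these metrics, a first step is to obtain a presigned dashboard URL. The sketch below only assembles a `CreateApplicationPresignedUrl` request body; the helper name, application name, and the 1800-second expiry are assumptions for illustration.

```python
def build_dashboard_url_request(application_name, expires_in_seconds=1800):
    """Build a CreateApplicationPresignedUrl request for the Flink dashboard.

    Hypothetical helper: it returns the request dictionary and does not
    call any AWS API.
    """
    return {
        "ApplicationName": application_name,
        "UrlType": "FLINK_DASHBOARD_URL",
        "SessionExpirationDurationInSeconds": expires_in_seconds,
    }


url_request = build_dashboard_url_request("my-flink-app")
print(url_request)
```

With the AWS SDK for Python, a dictionary of this shape could be passed to the `kinesisanalyticsv2` client as `create_application_presigned_url(**url_request)`. The response is expected to carry the URL in its `AuthorizedUrl` field; confirm this against the API reference.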

## RocksDB options
<a name="reference-modifiable-settings-rocksdb-options"></a>

`state.backend.rocksdb.compaction.style:`

`state.backend.rocksdb.memory.partitioned-index-filters:`

`state.backend.rocksdb.thread.num:`

## Advanced state backends options
<a name="reference-modifiable-settings-advanced-state-backends-options"></a>

`state.storage.fs.memory-threshold:`

## Full TaskManager options
<a name="reference-modifiable-settings-full-task-manager-options"></a>

`task.cancellation.timeout:`

`taskmanager.jvm-exit-on-oom:`

`taskmanager.numberOfTaskSlots:`

`taskmanager.slot.timeout:`

`taskmanager.memory.network.fraction:`

`taskmanager.memory.network.max:`

`taskmanager.network.request-backoff.initial:`

`taskmanager.network.request-backoff.max:`

`taskmanager.network.memory.buffer-debloat.enabled:`

`taskmanager.network.memory.buffer-debloat.period:`

`taskmanager.network.memory.buffer-debloat.samples:`

`taskmanager.network.memory.buffer-debloat.threshold-percentages:`

## Memory configuration
<a name="reference-modifiable-settings-memory-configuration"></a>

`taskmanager.memory.jvm-metaspace.size:`

`taskmanager.memory.jvm-overhead.fraction:`

`taskmanager.memory.jvm-overhead.max:`

`taskmanager.memory.managed.consumer-weights:`

`taskmanager.memory.managed.fraction:`

`taskmanager.memory.network.fraction:`

`taskmanager.memory.network.max:`

`taskmanager.memory.segment-size:`

`taskmanager.memory.task.off-heap.size:`

## RPC / Akka
<a name="reference-modifiable-settings-RPC-Akka"></a>

`akka.ask.timeout:`

`akka.client.timeout:`

`akka.framesize:`

`akka.lookup.timeout:`

`akka.tcp.timeout:`

## Client
<a name="reference-modifiable-settings-client"></a>

`client.timeout:`

## Advanced cluster options
<a name="reference-modifiable-settings-advanced-cluster-options"></a>

`cluster.intercept-user-system-exit:`

`cluster.processes.halt-on-fatal-error:`

## Filesystem configurations
<a name="reference-modifiable-settings-advanced-filesystem-configurations"></a>

`fs.s3.connection.maximum:`

`fs.s3a.connection.maximum:`

`fs.s3a.threads.max:`

`s3.upload.max.concurrent.uploads:`

## Advanced fault tolerance options
<a name="reference-modifiable-settings-advanced-fault-tolerance-options"></a>

`heartbeat.timeout:`

`jobmanager.execution.failover-strategy:`

## Memory configuration
<a name="reference-modifiable-settings-memory-configuration"></a>

`jobmanager.memory.heap.size:`

## Metrics
<a name="reference-modifiable-settings-metrics"></a>

`metrics.latency.interval:`

## Advanced options for the REST endpoint and client
<a name="reference-modifiable-settings-rest"></a>

`rest.flamegraph.enabled:`

`rest.server.numThreads:`

## Advanced SSL security options
<a name="reference-modifiable-settings-ssl"></a>

`security.ssl.internal.handshake-timeout:`

## Advanced scheduling options
<a name="reference-modifiable-settings-scheduling"></a>

`slot.request.timeout:`

## Advanced options for Flink web UI
<a name="reference-modifiable-settings-webui"></a>

`web.timeout:`

# Programmatic Flink configuration properties
<a name="programmatic-configuration"></a>

The following Flink configuration properties can be modified directly in your application code. Starting with MSF 2.2, an exception is thrown if your application code tries to modify a property that is not listed on this page.

**Note**  
These properties are distinct from settings that are modified through an AWS Support case. For infrastructure-level settings such as TaskManager memory, state backend, RocksDB tuning, and restart strategies, see [Modifiable Flink configuration properties](reference-modifiable-settings.md).
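
Because unlisted properties cause an exception, it can help to check a planned configuration before deploying. The following is a minimal sketch of such a pre-flight check. `ALLOWED_SUBSET` is a small, hand-copied sample of the properties on this page, not the full list, and `find_disallowed` is a hypothetical helper; the service's own validation remains authoritative.

```python
# Sample of properties from this page (assumption: extend it with the
# full list before relying on the result).
ALLOWED_SUBSET = frozenset({
    "execution.runtime-mode",
    "pipeline.object-reuse",
    "pipeline.max-parallelism",
    "table.exec.mini-batch.enabled",
    "table.exec.state.ttl",
    "table.local-time-zone",
})


def find_disallowed(properties, allowed=ALLOWED_SUBSET):
    """Return the sorted property keys that are not in the allowed set."""
    return sorted(key for key in properties if key not in allowed)


planned = {
    "pipeline.object-reuse": "true",
    "table.exec.state.ttl": "1 h",
    "taskmanager.numberOfTaskSlots": "4",  # infrastructure-level setting
}
print(find_disallowed(planned))  # ['taskmanager.numberOfTaskSlots']
```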

## Pipeline configuration
<a name="programmatic-configuration-pipeline"></a>

`execution.runtime-mode`

`pipeline.auto-generate-uids`

`pipeline.auto-watermark-interval`

`pipeline.cached-files`

`pipeline.classpaths`

`pipeline.closure-cleaner-level`

`pipeline.force-avro`

`pipeline.force-kryo`

`pipeline.generic-types`

`pipeline.global-job-parameters`

`pipeline.jars`

`pipeline.jobvertex-parallelism-overrides`

`pipeline.max-parallelism`

`pipeline.name`

`pipeline.object-reuse`

`pipeline.operator-chaining.chain-operators-with-different-max-parallelism`

`pipeline.operator-chaining.enabled`

`pipeline.serialization-config`

`pipeline.vertex-description-mode`

`pipeline.vertex-name-include-index-prefix`

`pipeline.watermark-alignment.allow-unaligned-source-splits`

## Python API
<a name="programmatic-configuration-python"></a>

`python.execution-mode`

`python.operator-chaining.enabled`

`python.job-options`

`python.internal.archives-key-map`

`python.internal.files-key-map`

`python.internal.requirements-file-key`

## Table API / SQL
<a name="programmatic-configuration-table-api"></a>

`table.exec.async-lookup.buffer-capacity`

`table.exec.async-lookup.key-ordered-enabled`

`table.exec.async-lookup.output-mode`

`table.exec.async-lookup.timeout`

`table.exec.async-ml-predict.max-concurrent-operations`

`table.exec.async-ml-predict.output-mode`

`table.exec.async-ml-predict.timeout`

`table.exec.async-scalar.max-concurrent-operations`

`table.exec.async-scalar.max-attempts`

`table.exec.async-scalar.retry-delay`

`table.exec.async-scalar.retry-strategy`

`table.exec.async-scalar.timeout`

`table.exec.async-state.enabled`

`table.exec.async-table.max-concurrent-operations`

`table.exec.async-table.max-retries`

`table.exec.async-table.retry-delay`

`table.exec.async-table.retry-strategy`

`table.exec.async-table.timeout`

`table.exec.async-vector-search.max-concurrent-operations`

`table.exec.async-vector-search.output-mode`

`table.exec.async-vector-search.timeout`

`table.exec.deduplicate.insert-update-after-sensitive-enabled`

`table.exec.deduplicate.mini-batch.compact-changes-enabled`

`table.exec.delta-join.cache-enabled`

`table.exec.disabled-operators`

`table.exec.iceberg.use-v2-sink`

`table.exec.interval-join.min-cleanup-interval`

`table.exec.legacy-cast-behaviour`

`table.exec.local-hash-agg.adaptive.distinct-value-rate-threshold`

`table.exec.local-hash-agg.adaptive.enabled`

`table.exec.local-hash-agg.adaptive.sampling-threshold`

`table.exec.mini-batch.allow-latency`

`table.exec.mini-batch.enabled`

`table.exec.mini-batch.size`

`table.exec.operator-fusion-codegen.enabled`

`table.exec.simplify-operator-name-enabled`

`table.exec.sink.keyed-shuffle`

`table.exec.sink.nested-constraint-enforcer`

`table.exec.sink.not-null-enforcer`

`table.exec.sink.rowtime-inserter`

`table.exec.sink.type-length-enforcer`

`table.exec.sink.upsert-materialize`

`table.exec.sink.upsert-materialize-strategy.adaptive.threshold.high`

`table.exec.sink.upsert-materialize-strategy.adaptive.threshold.low`

`table.exec.sink.upsert-materialize-strategy.type`

`table.exec.sort.async-merge-enabled`

`table.exec.sort.default-limit`

`table.exec.sort.max-num-file-handles`

`table.exec.source.cdc-events-duplicate`

`table.exec.source.idle-timeout`

`table.exec.spill-compression.block-size`

`table.exec.spill-compression.enabled`

`table.exec.state.ttl`

`table.exec.uid.format`

`table.exec.uid.generation`

`table.exec.unbounded-over.version`

`table.exec.window-agg.buffer-size-limit`

`table.optimizer.agg-phase-strategy`

`table.optimizer.adaptive-broadcast-join.strategy`

`table.optimizer.bushy-join-reorder-threshold`

`table.optimizer.delta-join.strategy`

`table.optimizer.distinct-agg.split.bucket-num`

`table.optimizer.distinct-agg.split.enabled`

`table.optimizer.dynamic-filtering.enabled`

`table.optimizer.incremental-agg-enabled`

`table.optimizer.join-reorder-enabled`

`table.optimizer.multi-join.enabled`

`table.optimizer.multiple-input-enabled`

`table.optimizer.non-deterministic-update.strategy`

`table.optimizer.reuse-optimize-block-with-digest-enabled`

`table.optimizer.reuse-sink-enabled`

`table.optimizer.reuse-source-enabled`

`table.optimizer.reuse-sub-plan-enabled`

`table.optimizer.runtime-filter.enabled`

`table.optimizer.skewed-join-optimization.skewed-factor`

`table.optimizer.skewed-join-optimization.skewed-threshold`

`table.optimizer.skewed-join-optimization.strategy`

`table.optimizer.source.report-statistics-enabled`

`table.optimizer.union-all-as-breakpoint-enabled`

`table.builtin-catalog-name`

`table.builtin-database-name`

`table.catalog-modification.listeners`

`table.column-expansion-strategy`

`table.display.max-column-width`

`table.dml-sync`

`table.dynamic-table-options.enabled`

`table.generated-code.max-length`

`table.legacy-nested-row-nullability`

`table.local-time-zone`

`table.plan.compile.catalog-objects`

`table.plan.force-recompile`

`table.plan.restore.catalog-objects`

`table.rtas-ctas.atomicity-enabled`

`table.sql-dialect`

# View configured Flink properties
<a name="viewing-modifiable-settings"></a>

You can use the Apache Flink Dashboard to view the Apache Flink properties that you configured yourself or asked to have modified through a [support case](https://support.console.aws.amazon.com/support/home#/). Follow these steps:

1. Go to the Flink Dashboard.

1. Choose **Job Manager** in the left navigation pane.

1. Choose **Configuration** to view the list of Flink properties.