


# Apache Flink
<a name="emr-flink"></a>

[Apache Flink](https://flink.apache.org/) is a streaming dataflow engine that you can use to run real-time stream processing on high-throughput data sources. Flink supports event-time semantics for out-of-order events, exactly-once semantics, backpressure control, and APIs optimized for writing both streaming and batch applications.

Additionally, Flink has connectors for third-party data sources, such as the following:
+ [Amazon Kinesis Data Streams](https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/kinesis.html)
+ [Apache Kafka](https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/kafka.html)
+ [Flink Elasticsearch Connector](https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/elasticsearch2.html)
+ [Twitter Streaming API](https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/twitter.html)
+ [Cassandra](https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/cassandra.html)

Amazon EMR supports Flink as a YARN application so that you can manage resources along with other applications within a cluster. Flink-on-YARN allows you to submit transient Flink jobs, or you can create a long-running cluster that accepts multiple jobs and allocates resources according to the overall YARN reservation.

Flink is included in Amazon EMR releases 5.1.0 and later.

**Note**  
Support for the `FlinkKinesisConsumer` class was added in Amazon EMR release 5.2.1.

The following table lists the version of Flink included in the latest release of the Amazon EMR 7.x series, along with the components that Amazon EMR installs with Flink.

For the version of components installed with Flink in this release, see [Release 7.12.0 component versions](emr-7120-release.md).


**Flink version information for emr-7.12.0**  

| Amazon EMR release label | Flink version | Components installed with Flink | 
| --- | --- | --- | 
| emr-7.12.0 | Flink 1.20.0-amzn-6 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-hdfs-zkfc, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta | 

The following table lists the version of Flink included in the latest release of the Amazon EMR 6.x series, along with the components that Amazon EMR installs with Flink.

For the version of components installed with Flink in this release, see [Release 6.15.0 component versions](emr-6150-release.md).


**Flink version information for emr-6.15.0**  

| Amazon EMR release label | Flink version | Components installed with Flink | 
| --- | --- | --- | 
| emr-6.15.0 | Flink 1.17.1-amzn-1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta-standalone-connectors | 

The following table lists the version of Flink included in the latest release of the Amazon EMR 5.x series, along with the components that Amazon EMR installs with Flink.

For the version of components installed with Flink in this release, see [Release 5.36.2 component versions](emr-5362-release.md).


**Flink version information for emr-5.36.2**  

| Amazon EMR release label | Flink version | Components installed with Flink | 
| --- | --- | --- | 
| emr-5.36.2 | Flink 1.14.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config | 

**Topics**
+ [Create a cluster with Flink](flink-create-cluster.md)
+ [Configure Flink in Amazon EMR](flink-configure.md)
+ [Work with Flink jobs in Amazon EMR](flink-jobs.md)
+ [Use the Scala shell](flink-scala.md)
+ [Find the Flink web interface](flink-web-interface.md)
+ [Flink Autoscaler](flink-autoscaler.md)
+ [Optimize job restart times for task recovery and scaling operations](flink-restart.md)
+ [Work with Flink jobs from Zeppelin in Amazon EMR](flink-zeppelin.md)
+ [Flink release history](Flink-release-history.md)

# Create a cluster with Flink
<a name="flink-create-cluster"></a>

You can launch a cluster with the AWS Management Console, the AWS CLI, or an AWS SDK.<a name="emr-flink-create-console"></a>

**To launch a cluster with Flink installed using the console**

1. Open the Amazon EMR console at [https://console.aws.amazon.com/emr/](https://console.aws.amazon.com/emr/).

1. Choose **Create cluster**, then **Go to advanced options**.

1. For **Software Configuration**, choose **EMR Release emr-5.1.0** or later.

1. Choose **Flink** as an application, along with any other applications to install.

1. Select other options as necessary, and then choose **Create cluster**.

**To launch a cluster with Flink from the AWS CLI**
+ Create the cluster with the following command:

  ```
  aws emr create-cluster --release-label emr-7.12.0 \
  --applications Name=Flink \
  --region us-east-1 \
  --log-uri s3://myLogUri \
  --instance-type m5.xlarge \
  --instance-count 2 \
  --service-role EMR_DefaultRole_V2 \
  --ec2-attributes KeyName=MyKeyName,InstanceProfile=EMR_EC2_DefaultRole \
  --steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Long_Running_Session,\
  Args=flink-yarn-session,-d
  ```
**Note**  
Linux line continuation characters (\\) are included for readability. They can be removed or used in Linux commands. For Windows, remove them or replace them with a caret (^).

# Configure Flink in Amazon EMR
<a name="flink-configure"></a>

## Configure Flink with Hive Metastore and Glue Catalog
<a name="flink-configure-hive"></a>

Amazon EMR releases 6.9.0 and higher support both Hive Metastore and AWS Glue Catalog with the Apache Flink connector to Hive. This section outlines the steps required to configure [AWS Glue Catalog](#flink-configure-hive-glue) and [Hive Metastore](#flink-configure-hive-metastore) with Flink.

**Topics**
+ [Use Hive Metastore](#flink-configure-hive-metastore)
+ [Use the AWS Glue Data Catalog](#flink-configure-hive-glue)

### Use Hive Metastore
<a name="flink-configure-hive-metastore"></a>

1. Create an EMR cluster with release 6.9.0 or higher and at least two applications: **Hive** and **Flink**.

1. Use the [script runner](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-commandrunner.html) to execute the following script as a step:

   `hive-metastore-setup.sh`

   ```
   sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib
   sudo cp /usr/lib/hive/lib/hive-exec-3.1.3*.jar /usr/lib/flink/lib
   sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /usr/lib/flink/lib
   sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /usr/lib/flink/lib
   sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar 
   sudo chmod 755 /usr/lib/flink/lib/hive-exec-3.1.3*.jar 
   sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar
   sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar
   ```  
![\[Form to add a Custom JAR step with fields for step type, name, JAR location, arguments, and failure action.\]](http://docs.aws.amazon.com/zh_cn/emr/latest/ReleaseGuide/images/hive.png)

### Use the AWS Glue Data Catalog
<a name="flink-configure-hive-glue"></a>

1. Create an EMR cluster with release 6.9.0 or higher and at least two applications: **Hive** and **Flink**.

1. Choose **Use for Hive table metadata** in the AWS Glue Data Catalog settings to enable the Data Catalog in the cluster.

1. Use the [script runner](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-commandrunner.html) to execute the following script as a step:

   `glue-catalog-setup.sh`

   ```
   sudo cp /usr/lib/hive/auxlib/aws-glue-datacatalog-hive3-client.jar /usr/lib/flink/lib 
   sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib 
   sudo cp /usr/lib/hive/lib/hive-exec-3.1.3*.jar /usr/lib/flink/lib
   sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /usr/lib/flink/lib
   sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /usr/lib/flink/lib
   sudo chmod 755 /usr/lib/flink/lib/aws-glue-datacatalog-hive3-client.jar 
   sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar 
   sudo chmod 755 /usr/lib/flink/lib/hive-exec-3.1.3*.jar 
   sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar
   sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar
   ```  
![\[Form to add a Custom JAR step with fields for step type, name, JAR location, arguments, and failure action.\]](http://docs.aws.amazon.com/zh_cn/emr/latest/ReleaseGuide/images/hive.png)

## Configure Flink with a configuration file
<a name="flink-configure-config"></a>

You can use the Amazon EMR configuration API to configure Flink with a configuration file. The files that are configurable within the API are:
+ `flink-conf.yaml`
+ `log4j.properties`
+ `flink-log4j-session`
+ `log4j-cli.properties`

The main configuration file for Flink is `flink-conf.yaml`.

**To configure the number of task slots used for Flink from the AWS CLI**

1. Create a file, `configurations.json`, with the following content:

   ```
   [
       {
         "Classification": "flink-conf",
         "Properties": {
           "taskmanager.numberOfTaskSlots":"2"
         }
       }
   ]
   ```

1. Next, create a cluster with the following configuration:

   ```
   aws emr create-cluster --release-label emr-7.12.0 \
   --applications Name=Flink \
   --configurations file://./configurations.json \
   --region us-east-1 \
   --log-uri s3://myLogUri \
   --instance-type m5.xlarge \
   --instance-count 2 \
   --service-role EMR_DefaultRole_V2 \
   --ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole
   ```

**Note**  
You can also change some configurations with the Flink API. For more information, see [Concepts](https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/index.html) in the Flink documentation.  
With Amazon EMR release 5.21.0 and later, you can override cluster configurations and specify additional configuration classifications for each instance group in a running cluster. You do this by using the Amazon EMR console, the AWS Command Line Interface (AWS CLI), or the AWS SDK. For more information, see [Supplying a Configuration for an Instance Group in a Running Cluster](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps-running-cluster.html).

### Parallelism options
<a name="flink-parallelism"></a>

As the owner of your application, you know best what resources should be allocated to tasks in Flink. For the purposes of the examples in this documentation, use the same number of tasks as the task instances that you use for the application. We generally recommend this for the initial level of parallelism, but you can also increase the granularity of parallelism with task slots, which should generally not exceed the number of [virtual cores](https://aws.amazon.com/ec2/virtualcores/) per instance. For more information about Flink's architecture, see [Concepts](https://ci.apache.org/projects/flink/flink-docs-master/concepts/index.html) in the Flink documentation.
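As a sketch, these initial settings could be expressed as a configuration classification like the following. The specific values are illustrative assumptions, not recommendations; 4 task slots would match, for example, the 4 vCPUs of an m5.xlarge instance:

```
[
  {
    "Classification": "flink-conf",
    "Properties": {
      "parallelism.default": "8",
      "taskmanager.numberOfTaskSlots": "4"
    }
  }
]
```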

## Configure Flink on an EMR cluster with multiple primary nodes
<a name="flink-multi-master"></a>

The JobManager of Flink remains available during the primary node failover process in an Amazon EMR cluster with multiple primary nodes. Beginning with Amazon EMR 5.28.0, JobManager high availability is also enabled automatically. No manual configuration is needed.

With Amazon EMR releases 5.27.0 or earlier, the JobManager is a single point of failure. When the JobManager fails, it loses all job states and won't resume the running jobs. You can enable JobManager high availability by configuring application attempts, checkpointing, and enabling ZooKeeper as state storage for Flink, as the following example demonstrates:

```
[
  {
    "Classification": "yarn-site",
    "Properties": {
      "yarn.resourcemanager.am.max-attempts": "10"
    }
  },
  {
    "Classification": "flink-conf",
    "Properties": {
        "yarn.application-attempts": "10",
        "high-availability": "zookeeper",
        "high-availability.zookeeper.quorum": "%{hiera('hadoop::zk')}",
        "high-availability.storageDir": "hdfs:///user/flink/recovery",
        "high-availability.zookeeper.path.root": "/flink"
    }
  }
]
```

You must configure the maximum application master attempts for both YARN and Flink. For more information, see [Configuration of YARN cluster high availability](https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/jobmanager_high_availability.html#maximum-application-master-attempts-yarn-sitexml). You might also want to configure Flink checkpointing so that a restarted JobManager recovers running jobs from previously completed checkpoints. For more information, see [Flink checkpointing](https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/checkpointing.html).
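One hedged sketch of a checkpointing classification to pair with the high-availability settings shown earlier might look like the following. The interval value and S3 path are illustrative assumptions; the keys themselves are standard Flink configuration options:

```
[
  {
    "Classification": "flink-conf",
    "Properties": {
      "execution.checkpointing.interval": "180000",
      "state.checkpoints.dir": "s3://amzn-s3-demo-bucket/flink/checkpoints"
    }
  }
]
```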

## Configure memory process size
<a name="flink-process-memory"></a>

With Amazon EMR releases that use Flink 1.11.x, you must configure the total memory process size for both JobManager (`jobmanager.memory.process.size`) and TaskManager (`taskmanager.memory.process.size`) in `flink-conf.yaml`. You can set these values either by configuring the cluster with the configuration API or by manually uncommenting these fields via SSH. Flink provides the following default values:
+ `jobmanager.memory.process.size`: 1600m
+ `taskmanager.memory.process.size`: 1728m

To exclude the JVM metaspace and overhead, use the total Flink memory size (`taskmanager.memory.flink.size`) instead of `taskmanager.memory.process.size`. The default value for `taskmanager.memory.flink.size` is 1280m. It's not recommended to set both `taskmanager.memory.process.size` and `taskmanager.memory.flink.size`.
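For example, a configuration classification that sets only the total Flink memory size might look like this sketch (the 1536m value is an illustrative assumption, not a recommendation):

```
[
  {
    "Classification": "flink-conf",
    "Properties": {
      "taskmanager.memory.flink.size": "1536m"
    }
  }
]
```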

All Amazon EMR releases that use Flink 1.12.0 and higher have the default values listed in the open-source set for Flink as the defaults on Amazon EMR, so you don't need to configure them yourself.

## Configure log output file size
<a name="flink-log-output"></a>

Flink application containers create and write to three types of log files: `.out` files, `.log` files, and `.err` files. Only the `.err` files are compressed and removed from the file system, while `.log` and `.out` log files remain in the file system. To ensure these output files remain manageable and the cluster remains stable, you can configure log rotation in `log4j.properties` to set a maximum number of files and limit their size.

**Amazon EMR releases 5.30.0 and higher**

Starting with Amazon EMR 5.30.0, Flink uses the log4j2 logging framework with the configuration classification name `flink-log4j`. The following example configuration demonstrates the log4j2 format.

```
[
  {
    "Classification": "flink-log4j",
    "Properties": {
      "appender.main.name": "MainAppender",
      "appender.main.type": "RollingFile",
      "appender.main.append" : "false",
      "appender.main.fileName" : "${sys:log.file}",
      "appender.main.filePattern" : "${sys:log.file}.%i",
      "appender.main.layout.type" : "PatternLayout",
      "appender.main.layout.pattern" : "%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n",
      "appender.main.policies.type" : "Policies",
      "appender.main.policies.size.type" : "SizeBasedTriggeringPolicy",
      "appender.main.policies.size.size" : "100MB",
      "appender.main.strategy.type" : "DefaultRolloverStrategy",
      "appender.main.strategy.max" : "10"
    }
  }
]
```

**Amazon EMR releases 5.29.0 and lower**

With Amazon EMR releases 5.29.0 and lower, Flink uses the log4j logging framework. The following example configuration demonstrates the log4j format, keeping up to four log files of at most 100 MB each.

```
[
  {
    "Classification": "flink-log4j",
    "Properties": {
      "log4j.appender.file": "org.apache.log4j.RollingFileAppender",
      "log4j.appender.file.append":"true",
      "log4j.appender.file.MaxFileSize":"100MB",
      "log4j.appender.file.MaxBackupIndex":4,
      "log4j.appender.file.layout":"org.apache.log4j.PatternLayout",
      "log4j.appender.file.layout.ConversionPattern":"%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n"
    }
  }
]
```

## Configure Flink to run with Java 11
<a name="flink-configure-java11"></a>

Amazon EMR releases 6.12.0 and higher provide Java 11 runtime support for Flink. The following sections describe how to configure the cluster to provide Java 11 runtime support for Flink.

**Topics**
+ [Configure Flink for Java 11 when you create a cluster](#flink-configure-java11-create)
+ [Configure Flink for Java 11 on a running cluster](#flink-configure-java11-update)
+ [Confirm the Java runtime for Flink on a running cluster](#flink-configure-java11-confirm)

### Configure Flink for Java 11 when you create a cluster
<a name="flink-configure-java11-create"></a>

Use the following steps to create an EMR cluster with Flink and the Java 11 runtime. The configuration file where you add Java 11 runtime support is `flink-conf.yaml`.

------
#### [ Console ]

**To create a cluster with Flink and the Java 11 runtime in the console**

1. Sign in to the AWS Management Console, and open the Amazon EMR console at [https://console.aws.amazon.com/emr](https://console.aws.amazon.com/emr).

1. In the navigation pane, under **EMR on EC2**, choose **Clusters**, and then choose **Create cluster**.

1. Select Amazon EMR release 6.12.0 or higher, and choose to install the Flink application. Select any other applications that you want to install on your cluster.

1. Continue to set up your cluster. In the optional **Software settings** section, use the default **Enter configuration** option and enter the following configuration:

   ```
   [
       {
         "Classification": "flink-conf",
         "Properties": {
           "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11",
           "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11",
           "env.java.home":"/usr/lib/jvm/jre-11"
         }
       }
   ]
   ```

1. Continue to set up and launch your cluster.

------
#### [ AWS CLI ]

**To create a cluster with Flink and the Java 11 runtime from the CLI**

1. Create a configuration file, `configurations.json`, that configures Flink to use Java 11.

   ```
   [
       {
         "Classification": "flink-conf",
         "Properties": {
           "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11",
           "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11",
           "env.java.home":"/usr/lib/jvm/jre-11"
         }
       }
   ]
   ```

1. From the AWS CLI, create a new EMR cluster with Amazon EMR release 6.12.0 or higher, and install the Flink application, as in the following example:

   ```
   aws emr create-cluster --release-label emr-6.12.0 \
   --applications Name=Flink \
   --configurations file://./configurations.json \
   --region us-east-1 \
   --log-uri s3://myLogUri \
   --instance-type m5.xlarge \
   --instance-count 2 \
   --service-role EMR_DefaultRole_V2 \
   --ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole
   ```

------

### Configure Flink for Java 11 on a running cluster
<a name="flink-configure-java11-update"></a>

Use the following steps to update a running EMR cluster with Flink to use the Java 11 runtime. The configuration file where you add Java 11 runtime support is `flink-conf.yaml`.

------
#### [ Console ]

**To update a running cluster with Flink and the Java 11 runtime in the console**

1. Sign in to the AWS Management Console, and open the Amazon EMR console at [https://console.aws.amazon.com/emr](https://console.aws.amazon.com/emr).

1. In the navigation pane, under **EMR on EC2**, choose **Clusters**, and then select the cluster that you want to update.
**Note**  
The cluster must use Amazon EMR release 6.12.0 or higher to support Java 11.

1. Choose the **Configurations** tab.

1. In the **Instance group configurations** section, select the **Running** instance group that you want to update, and then choose **Reconfigure** from the list actions menu.

1. Reconfigure the instance group with the **Edit properties** option as follows. Choose **Add new configuration** after each one.    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/emr/latest/ReleaseGuide/flink-configure.html)

1. Choose **Save changes** to add the configurations.

------
#### [ AWS CLI ]

**To update a running cluster with Flink and the Java 11 runtime from the CLI**

Use the `modify-instance-groups` command to specify a new configuration for an instance group in a running cluster.

1. First, create a configuration file, `configurations.json`, that configures Flink to use Java 11. In the following example, replace *ig-1xxxxxxx9* with the ID of the instance group that you want to reconfigure. Save the file in the same directory where you will run the `modify-instance-groups` command.

   ```
   [
      {
         "InstanceGroupId":"ig-1xxxxxxx9",
         "Configurations":[
            {
               "Classification":"flink-conf",
               "Properties":{
                 "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11",
                 "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11",
                 "env.java.home":"/usr/lib/jvm/jre-11"
               },
               "Configurations":[]
            }
         ]
      }
   ]
   ```

1. From the AWS CLI, run the following command. Replace the ID of the instance group that you want to reconfigure:

   ```
   aws emr modify-instance-groups --cluster-id j-2AL4XXXXXX5T9 \
   --instance-groups file://configurations.json
   ```

------

### Confirm the Java runtime for Flink on a running cluster
<a name="flink-configure-java11-confirm"></a>

To determine the Java runtime for a running cluster, log in to the primary node with SSH as described in [Connect to the primary node with SSH](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html). Then run the following command:

```
ps -ef | grep flink
```

The `ps` command with the `-ef` option lists all running processes on the system. You can filter that output with `grep` to find mentions of the string `flink`. Review the output for the Java Runtime Environment (JRE) value, `jre-XX`. In the following output, `jre-11` indicates that Java 11 is picked up at runtime for Flink.

```
flink    19130     1  0 09:17 ?        00:00:15 /usr/lib/jvm/jre-11/bin/java -Djava.io.tmpdir=/mnt/tmp -Dlog.file=/usr/lib/flink/log/flink-flink-historyserver-0-ip-172-31-32-127.log -Dlog4j.configuration=file:/usr/lib/flink/conf/log4j.properties -Dlog4j.configurationFile=file:/usr/lib/flink/conf/log4j.properties -Dlogback.configurationFile=file:/usr/lib/flink/conf/logback.xml -classpath /usr/lib/flink/lib/flink-cep-1.17.0.jar:/usr/lib/flink/lib/flink-connector-files-1.17.0.jar:/usr/lib/flink/lib/flink-csv-1.17.0.jar:/usr/lib/flink/lib/flink-json-1.17.0.jar:/usr/lib/flink/lib/flink-scala_2.12-1.17.0.jar:/usr/lib/flink/lib/flink-table-api-java-uber-1.17.0.jar:/usr/lib/flink/lib/flink-table-api-scala-bridge_2.12-1.17.0.
```
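To pull just the JRE token out of a line like the one above, you can filter further in a script. The following sketch works against a stored sample line, which is an assumption standing in for live `ps` output:

```python
import re

# Sample line standing in for live `ps -ef | grep flink` output (assumption)
line = ("flink 19130 1 0 09:17 ? 00:00:15 "
        "/usr/lib/jvm/jre-11/bin/java -Dlog.file=/usr/lib/flink/log/flink.log")

# Extract the jre-XX token from the java binary path
match = re.search(r"jre-(\d+)", line)
print(match.group(0))  # jre-11
```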

Alternatively, [connect to the primary node with SSH](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html) and start a Flink YARN session with the command `flink-yarn-session -d`. The output shows the Java Virtual Machine (JVM) for Flink, `java-11-amazon-corretto`, as in the following example:

```
2023-05-29 10:38:14,129 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: containerized.master.env.JAVA_HOME, /usr/lib/jvm/java-11-amazon-corretto.x86_64
```

# Work with Flink jobs in Amazon EMR
<a name="flink-jobs"></a>

There are several ways to interact with Flink on Amazon EMR: through the console, the Flink interface found on the ResourceManager Tracking UI, and at the command line. You can submit a JAR file to a Flink application with any of these. Once submitted, a JAR file becomes a job that is managed by the Flink JobManager, which is located on the YARN node that hosts the Flink session Application Master daemon.

You can run a Flink application as a YARN job on a long-running cluster or on a transient cluster. On a long-running cluster, you can submit multiple Flink jobs to one Flink cluster running on Amazon EMR. If you run a Flink job on a transient cluster, your Amazon EMR cluster exists only for the time it takes to run the Flink application, so you only pay for the resources and time used. You can submit a Flink job with the Amazon EMR `AddSteps` API operation, as a step argument to the `RunJobFlow` operation, and through the AWS CLI `add-steps` or `create-cluster` commands.

## Start a Flink YARN application as a step on a long-running cluster
<a name="flink-add-step"></a>

To start a Flink application that multiple clients can submit work to through YARN API operations, you need to either create a cluster or add the Flink application to an existing cluster. For instructions on how to create a new cluster, see [Create a cluster with Flink](flink-create-cluster.md). To start a YARN session on an existing cluster, use the following steps from the console, the AWS CLI, or the Java SDK.

**Note**  
The `flink-yarn-session` command was added in Amazon EMR release 5.5.0 as a wrapper for the `yarn-session.sh` script to simplify execution. If you use an earlier version of Amazon EMR, substitute `bash -c "/usr/lib/flink/bin/yarn-session.sh -d"` for **Arguments** in the console or `Args` in the AWS CLI command.

**To submit a Flink job on an existing cluster with the console**

Submit the Flink session with the `flink-yarn-session` command in an existing cluster.

1. Open the Amazon EMR console at [https://console.aws.amazon.com/emr/](https://console.aws.amazon.com/emr/).

1. In the cluster list, select the cluster that you previously launched.

1. On the cluster details page, choose **Steps**, then **Add Step**.

1. Use the guidelines that follow to enter the parameters, and then choose **Add**.  
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/emr/latest/ReleaseGuide/flink-jobs.html)

**To submit a Flink job on an existing cluster with the AWS CLI**
+ Use the `add-steps` command to add a Flink job to a long-running cluster. The following example command specifies `Args="flink-yarn-session", "-d"` to start a Flink session within your YARN cluster in a detached state (`-d`). See [YARN setup](https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/yarn_setup.html#flink-yarn-session) in the Flink documentation for argument details.

  ```
  aws emr add-steps --cluster-id <j-XXXXXXXX> --steps Type=CUSTOM_JAR,Name=<example-flink-step-name>,Jar=command-runner.jar,Args="flink-yarn-session","-d"
  ```

## Submit work to an existing Flink application on a long-running cluster
<a name="flink-submit-work"></a>

If you already have a Flink application on a long-running cluster, you can specify the cluster's Flink application ID in order to submit work to it. To get the application ID, run `yarn application -list` on the AWS CLI or through the [YarnClient](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/yarn/client/api/YarnClient.html) API operation:

```
$ yarn application -list
16/09/07 19:32:13 INFO client.RMProxy: Connecting to ResourceManager at ip-10-181-83-19.ec2.internal/10.181.83.19:8032
Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1
Application-Id    Application-Name    Application-Type    User    Queue    State    Final-State    Progress    Tracking-URL
application_1473169569237_0002    Flink session with 14 TaskManagers (detached)	        Apache Flink	    hadoop	   default	           RUNNING	         UNDEFINED	           100%	http://ip-10-136-154-194.ec2.internal:33089
```

The application ID for this Flink session is `application_1473169569237_0002`, which you can use to submit work to the application from the AWS CLI or an SDK.

**Example SDK for Java**  

```
List<StepConfig> stepConfigs = new ArrayList<StepConfig>();
  
HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig()
    .withJar("command-runner.jar")
    .withArgs("flink", "run", "-m", "yarn-cluster", "-yid", "application_1473169569237_0002", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", 
      "--input", "s3://amzn-s3-demo-bucket/pg11.txt", "--output", "s3://amzn-s3-demo-bucket/alice2/");
  
StepConfig flinkRunWordCount = new StepConfig()
  .withName("Flink add a wordcount step")
  .withActionOnFailure("CONTINUE")
  .withHadoopJarStep(flinkWordCountConf);
  
stepConfigs.add(flinkRunWordCount); 
  
AddJobFlowStepsResult res = emr.addJobFlowSteps(new AddJobFlowStepsRequest()
   .withJobFlowId("myClusterId")
   .withSteps(stepConfigs));
```

**Example AWS CLI**  

```
aws emr add-steps --cluster-id <j-XXXXXXXX> \
--steps Type=CUSTOM_JAR,Name=Flink_Submit_To_Long_Running,Jar=command-runner.jar,\
Args="flink","run","-m","yarn-cluster","-yid","application_1473169569237_0002",\
"/usr/lib/flink/examples/streaming/WordCount.jar",\
"--input","s3://amzn-s3-demo-bucket/pg11.txt","--output","s3://amzn-s3-demo-bucket/alice2/" \
--region <region-code>
```

## Submit a transient Flink job
<a name="flink-transient-job"></a>

The following example launches a transient cluster that runs a Flink job and then terminates on completion.

**Example SDK for Java**  

```
import java.util.ArrayList;
import java.util.List;
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.amazonaws.services.elasticmapreduce.model.*;

public class Main_test {

	public static void main(String[] args) {
		AWSCredentials credentials_profile = null;
		try {
			credentials_profile = new ProfileCredentialsProvider("default").getCredentials();
		} catch (Exception e) {
			throw new AmazonClientException(
					"Cannot load credentials from .aws/credentials file. " +
							"Make sure that the credentials file exists and the profile name is specified within it.",
					e);
		}

		AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard()
				.withCredentials(new AWSStaticCredentialsProvider(credentials_profile))
				.withRegion(Regions.US_WEST_1)
				.build();

		List<StepConfig> stepConfigs = new ArrayList<StepConfig>();
		HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig()
				.withJar("command-runner.jar")
				// bash -c expects the entire Flink command as a single string
				.withArgs("bash", "-c", "flink run -m yarn-cluster -yn 2"
						+ " /usr/lib/flink/examples/streaming/WordCount.jar"
						+ " --input s3://path/to/input-file.txt --output s3://path/to/output/");

		StepConfig flinkRunWordCountStep = new StepConfig()
				.withName("Flink add a wordcount step and terminate")
				.withActionOnFailure("CONTINUE")
				.withHadoopJarStep(flinkWordCountConf);

		stepConfigs.add(flinkRunWordCountStep);

		Application flink = new Application().withName("Flink");

		RunJobFlowRequest request = new RunJobFlowRequest()
				.withName("flink-transient")
				.withReleaseLabel("emr-5.20.0")
				.withApplications(flink)
				.withServiceRole("EMR_DefaultRole")
				.withJobFlowRole("EMR_EC2_DefaultRole")
				.withLogUri("s3://path/to/my/logfiles")
				.withInstances(new JobFlowInstancesConfig()
						.withEc2KeyName("myEc2Key")
						.withEc2SubnetId("subnet-12ab3c45")
						.withInstanceCount(3)
						.withKeepJobFlowAliveWhenNoSteps(false)
						.withMasterInstanceType("m4.large")
						.withSlaveInstanceType("m4.large"))
				.withSteps(stepConfigs);

		RunJobFlowResult result = emr.runJobFlow(request);
		System.out.println("The cluster ID is " + result.getJobFlowId());

	}

}
```

**Example AWS CLI**  
Use the `create-cluster` subcommand to create a transient cluster that terminates when the Flink job completes:  

```
aws emr create-cluster --release-label emr-5.2.1 \
--name "Flink_Transient" \
--applications Name=Flink \
--configurations file://./configurations.json \
--region us-east-1 \
--log-uri s3://myLogUri \
--auto-terminate \
--instance-type m5.xlarge \
--instance-count 2 \
--service-role EMR_DefaultRole_V2 \
--ec2-attributes KeyName=<YourKeyName>,InstanceProfile=EMR_EC2_DefaultRole \
--steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Transient_WordCount,\
Args="bash","-c","\"flink run -m yarn-cluster /usr/lib/flink/examples/streaming/WordCount.jar --input s3://amzn-s3-demo-bucket/pg11.txt --output s3://amzn-s3-demo-bucket/alice/\""
```

# Use the Scala shell
<a name="flink-scala"></a>

The Flink Scala shell for EMR clusters is only configured to start new YARN sessions. You can use the Scala shell with the following procedure.

**To use the Flink Scala shell on the primary node**

1. Log in to the primary node with SSH as described in [Connect to the primary node with SSH](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html).

1. Start the shell with the following command:

   With Amazon EMR version 5.5.0 and higher, you can use the following command to start a YARN cluster for the Scala shell with one TaskManager:

   ```
   % flink-scala-shell yarn 1
   ```

   With earlier versions of Amazon EMR, use:

   ```
   % /usr/lib/flink/bin/start-scala-shell.sh yarn 1
   ```

   This starts the Flink Scala shell so you can interactively work with Flink. Just as with other interfaces and options, you can scale the `-n` option value used in the example based on the number of tasks you want to run from the shell.

   For more information, see [Scala REPL](https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/scala_shell.html) in the official Apache Flink documentation.

# Find the Flink web interface
<a name="flink-web-interface"></a>

The Application Master that belongs to the Flink application hosts the Flink web interface. It's an alternative way to submit a JAR as a job or to view the current status of other jobs. The Flink web interface is active as long as you have a Flink session running. If you have a long-running YARN job already active, you can follow the instructions in the [Connect to the primary node with SSH](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html) topic in the *Amazon EMR Management Guide* to connect to the YARN ResourceManager. For example, if you've set up an SSH tunnel and have activated a proxy in your browser, you choose the ResourceManager connection under **Connections** on your EMR cluster details page.

![\[Resource Manager link under Connections section in 集群 details page.\]](http://docs.aws.amazon.com/zh_cn/emr/latest/ReleaseGuide/images/resourcemanager.png)


Once you find the ResourceManager, select the YARN application that's hosting a Flink session. Choose the link under the **Tracking UI** column.

![\[Application details table showing a running Apache Flink session with ApplicationMaster link.\]](http://docs.aws.amazon.com/zh_cn/emr/latest/ReleaseGuide/images/resourcemanager2.png)


In the Flink web interface, you can view the configuration, submit your own custom JAR as a job, or monitor jobs in progress.

![\[Apache Flink Dashboard overview showing task managers, slots, and job statistics.\]](http://docs.aws.amazon.com/zh_cn/emr/latest/ReleaseGuide/images/flink.png)


# Flink Autoscaler
<a name="flink-autoscaler"></a>

## Overview
<a name="flink-autoscaler-overview"></a>

Amazon EMR releases 6.15.0 and higher support the *Flink Autoscaler*. The job autoscaler functionality collects metrics from running Flink streaming jobs, and automatically scales the individual job vertices. This reduces backpressure and satisfies the utilization target that you set.

For more information, see the [Autoscaler](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/) section of the *Apache Flink Kubernetes Operator* documentation.

## Considerations
<a name="flink-autoscaler-considerations"></a>
+ Amazon EMR 6.15.0 and higher support the Flink Autoscaler.
+ The Flink Autoscaler supports streaming jobs only.
+ Only the adaptive scheduler is supported. The default scheduler isn't supported.
+ We recommend that you enable cluster scaling to allow dynamic resource provisioning. Amazon EMR managed scaling is preferred, since metrics evaluation occurs every 5 to 10 seconds. At this interval, your cluster can more readily adapt to changes in required cluster resources.

## Enable the Autoscaler
<a name="flink-autoscaler-start"></a>

Use the following steps to enable the Flink Autoscaler when you create an Amazon EMR on EC2 cluster.

1. In the Amazon EMR console, create a new EMR cluster:

   1. Select Amazon EMR release `emr-6.15.0` or higher. Choose the **Flink** application bundle, and select any other applications that you might want to include on your cluster.  
![\[Application bundle options for Amazon EMR集群, with Flink highlighted and selected.\]](http://docs.aws.amazon.com/zh_cn/emr/latest/ReleaseGuide/images/emr-flink-cluster-create.png)

   1. Under **Cluster scaling and provisioning option**, select **Use EMR-managed scaling**.  
![\[集群 scaling options: manual, EMR-managed (selected), or custom automatic scaling.\]](http://docs.aws.amazon.com/zh_cn/emr/latest/ReleaseGuide/images/emr-flink-cluster-managedscaling.png)

1. In the **Software settings** section, enter the following configuration to enable the Flink Autoscaler. For testing scenarios, set the decision interval, metrics window interval, and stabilization interval to lower values so that the job immediately makes scaling decisions, making it easier to verify.

   ```
   [
     {
       "Classification": "flink-conf",
       "Properties": {
         "job.autoscaler.enabled": "true",
         "jobmanager.scheduler": "adaptive",
         "job.autoscaler.stabilization.interval": "60s",
         "job.autoscaler.metrics.window": "60s",
         "job.autoscaler.decision.interval": "10s",
         "job.autoscaler.debug.logs.interval": "60s"
       }
     }
   ]
   ```

1. Select or configure any other settings that you prefer, and create the Autoscaler-enabled Flink cluster.

## Autoscaler configurations
<a name="flink-autoscaler-config"></a>

This section covers most of the configurations that you can change to meet your specific needs.

**Note**  
For time-based configurations such as the `time`, `interval`, and `window` settings, the default unit is milliseconds when no unit is specified. So a value of `30` without a suffix equals 30 milliseconds. For other time units, include the appropriate suffix: `s` for *seconds*, `m` for *minutes*, or `h` for *hours*.
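The unit rules above can be sketched in a few lines of code (a hypothetical helper for illustration, not part of Flink):

```python
def parse_duration_ms(value: str) -> float:
    """Parse a Flink-style duration string into milliseconds.

    A bare number defaults to milliseconds; the suffixes s, m, and h
    denote seconds, minutes, and hours.
    """
    units = {"ms": 1, "s": 1_000, "m": 60_000, "h": 3_600_000}
    # Check "ms" before "s" so "250ms" isn't misread as 250 minutes
    for suffix in ("ms", "h", "m", "s"):
        if value.endswith(suffix):
            return float(value[: -len(suffix)]) * units[suffix]
    return float(value)  # no suffix: already milliseconds

print(parse_duration_ms("30"))  # 30.0
print(parse_duration_ms("5m"))  # 300000.0
```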

**Topics**
+ [Loop configurations](#flink-autoscaler-config-loop)
+ [Metrics and history configurations](#flink-autoscaler-config-metrics)
+ [Vertex configurations](#flink-autoscaler-config-vertex)
+ [Backlog processing configurations](#flink-autoscaler-config-backlog)
+ [Scaling operation configurations](#flink-autoscaler-config-scale)

### Autoscaler loop configurations
<a name="flink-autoscaler-config-loop"></a>

The Autoscaler fetches metrics at the job-vertex level at each configurable interval, converts them into scaling action items, estimates the new job vertex parallelism, and recommends it to the job scheduler. Metrics are collected only after the job restart time and the cluster stabilization interval have passed.


| Configuration key | Default value | Description | Example values | 
| --- | --- | --- | --- | 
| job.autoscaler.enabled | false | Enables autoscaling on the Flink cluster. | true, false | 
| job.autoscaler.decision.interval | 60s | The Autoscaler decision interval. | 30 (default unit is ms), 5m, 1h | 
| job.autoscaler.restart.time | 3m | The expected restart time to be used until the operator can reliably determine the actual restart time from history. | 30 (default unit is ms), 5m, 1h | 
| job.autoscaler.stabilization.interval | 300s | The stabilization period in which no new scaling is executed. | 30 (default unit is ms), 5m, 1h | 
| job.autoscaler.debug.logs.interval | 300s | The Autoscaler debug log interval. | 30 (default unit is ms), 5m, 1h | 

### Metrics aggregation and history configurations
<a name="flink-autoscaler-config-metrics"></a>

The Autoscaler fetches metrics and aggregates them within a time-based sliding window. These metrics are then evaluated to make scaling decisions. The history of scaling decisions for each job vertex is used to estimate the new parallelism. These decisions have both an age-based expiration and a history size limit (at least 1).


| Configuration key | Default value | Description | Example values | 
| --- | --- | --- | --- | 
| job.autoscaler.metrics.window | 600s | The scaling metrics aggregation window size. | 30 (default unit is ms), 5m, 1h | 
| job.autoscaler.history.max.count | 3 | The maximum number of past scaling decisions to retain per vertex. | 1 to Integer.MAX_VALUE | 
| job.autoscaler.history.max.age | 24h | The maximum age of past scaling decisions to retain per vertex. | 30 (default unit is ms), 5m, 1h | 

### Job vertex-level configurations
<a name="flink-autoscaler-config-vertex"></a>

The parallelism of each job vertex is modified based on the target utilization, bounded by min-max parallelism limits. It's not recommended to set the target utilization close to 100% (that is, a value of 1); the utilization boundary serves as a buffer to handle intermediate load fluctuations.
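The interplay of target utilization and the min-max bounds can be illustrated with a simplified sketch. This is not the operator's actual algorithm; the proportional formula below is an assumption for illustration only:

```python
import math

def suggest_parallelism(current, processing_rate, target_rate,
                        utilization=0.7, min_p=1, max_p=200):
    """Scale parallelism so capacity covers target_rate at the target utilization.

    Dividing by the utilization target leaves headroom; the result is
    clamped to the configured min-max parallelism bounds.
    """
    needed = current * (target_rate / processing_rate) / utilization
    return max(min_p, min(max_p, math.ceil(needed)))

print(suggest_parallelism(4, 100.0, 140.0))  # 8
```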


| Configuration key | Default value | Description | Example values | 
| --- | --- | --- | --- | 
| job.autoscaler.target.utilization | 0.7 | The target vertex utilization. | 0 - 1 | 
| job.autoscaler.target.utilization.boundary | 0.4 | The target vertex utilization boundary. Scaling is not performed if the current processing rate is within the range [target_rate / (target_utilization - boundary), target_rate / (target_utilization + boundary)]. | 0 - 1 | 
| job.autoscaler.vertex.min-parallelism | 1 | The minimum parallelism that the Autoscaler can use. | 0 - 200 | 
| job.autoscaler.vertex.max-parallelism | 200 | The maximum parallelism that the Autoscaler can use. Note that this limit is ignored if it's higher than the max parallelism configured in the Flink config or set directly on each operator. | 0 - 200 | 

### Backlog processing configurations
<a name="flink-autoscaler-config-backlog"></a>

The job vertex needs additional resources to process the pending events, or backlog, that accumulate during a scaling operation. This is also referred to as the `catch-up` duration. If the time to process the backlog exceeds the configured `lag-threshold` value, the job vertex target utilization is increased to the maximum level. This helps prevent unnecessary scaling operations while the backlog is being processed.
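As a rough sketch of the catch-up arithmetic (an illustrative model, not the Autoscaler's exact computation): to drain a backlog within the catch-up window, a vertex must process the incoming stream and the backlog at the same time.

```python
def required_rate(current_rate, backlog_events, catch_up_duration_s):
    """Rate needed to keep up with input and drain the backlog in the window."""
    if catch_up_duration_s <= 0:
        return current_rate  # a duration of 0 disables backlog-based scaling
    return current_rate + backlog_events / catch_up_duration_s

# Draining 900,000 pending events in 15 minutes on top of 1,000 events/s
print(required_rate(1_000.0, 900_000, 900))  # 2000.0
```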


| Configuration key | Default value | Description | Example values | 
| --- | --- | --- | --- | 
| job.autoscaler.backlog-processing.lag-threshold | 5m | The lag threshold that prevents unnecessary scaling while the pending messages that cause the lag are drained. | 30 (milliseconds by default), 5m, 1h | 
| job.autoscaler.catch-up.duration | 15m | The target duration for fully processing any backlog after a scaling operation. Set to 0 to disable backlog-based scaling. | 30 (milliseconds by default), 5m, 1h | 
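
The backlog arithmetic can be sketched as follows (illustrative helpers; the function names are not part of Flink):

```python
def backlog_seconds(backlog_events, processing_rate_per_s):
    # Time needed to drain the pending events at the current rate.
    return backlog_events / processing_rate_per_s

def should_raise_target_utilization(backlog_events, processing_rate_per_s,
                                    lag_threshold_s=300):
    # If draining takes longer than the 5m lag threshold, the vertex target
    # utilization is raised to its maximum, per the table above.
    return backlog_seconds(backlog_events, processing_rate_per_s) > lag_threshold_s

def required_catch_up_rate(backlog_events, catch_up_duration_s=900):
    # Processing rate needed to clear the backlog within the 15m default
    # job.autoscaler.catch-up.duration.
    return backlog_events / catch_up_duration_s
```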

### Scaling operation configurations
<a name="flink-autoscaler-config-scale"></a>

The autoscaler doesn't perform a scale-down operation immediately after a scale-up operation, within the grace period. This prevents unnecessary up-down-up-down scaling cycles caused by temporary load fluctuations.

We can use the scale-down ratio to gradually reduce parallelism and release resources in response to temporary load spikes. It also helps prevent unnecessary small scale-up operations after a large scale-down operation.

We can detect ineffective scale-up operations based on the history of past job vertex scaling decisions, and prevent further parallelism changes.


| Configuration key | Default value | Description | Example values | 
| --- | --- | --- | --- | 
| job.autoscaler.scale-up.grace-period | 1h | The duration after a vertex is scaled up during which scaling it down is not allowed. | 30 (milliseconds by default), 5m, 1h | 
| job.autoscaler.scale-down.max-factor | 0.6 | The maximum scale-down factor. A value of 1 means there is no limit on scale-down; 0.6 means the job can only be scaled down by 60% of the original parallelism. | 0 - 1 | 
| job.autoscaler.scale-up.max-factor | 100000. | The maximum scale-up ratio. A value of 2.0 means the job can only be scaled up by 200% of the current parallelism. | 0 - Integer.MAX_VALUE | 
| job.autoscaler.scaling.effectiveness.detection.enabled | false | Whether to enable detection of ineffective scaling operations and let the autoscaler block further scale-ups. | true, false | 
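
The grace period and the max-factor limits can be modeled roughly like this (the interpretation of the factors as bounds on the relative change is an assumption for illustration):

```python
import math

def clamp_proposed_parallelism(current, proposed,
                               scale_down_max_factor=0.6,
                               scale_up_max_factor=2.0):
    # Assumed semantics: the factors bound how far a single scaling
    # decision may move parallelism relative to the current value.
    floor = math.ceil(current * (1 - scale_down_max_factor))
    ceiling = math.floor(current * (1 + scale_up_max_factor))
    return max(floor, min(ceiling, proposed))

def scale_down_allowed(now_s, last_scale_up_s, grace_period_s=3600):
    # job.autoscaler.scale-up.grace-period: no scale-down for this long
    # after the last scale-up.
    return now_s - last_scale_up_s >= grace_period_s
```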

# Optimizing job restart times for task recovery and scaling operations
<a name="flink-restart"></a>

When a task fails or a scaling operation occurs, Flink attempts to re-execute the task from the last completed checkpoint. The restart process can take a minute or longer, depending on the size of the checkpoint state and the number of parallel tasks. During the restart, a backlog can accumulate for the job. However, Flink offers some ways to optimize the recovery and restart speed of the execution graph to improve job stability.

This page describes some of the ways that Amazon EMR Flink can shorten job restart times during task recovery or scaling operations.

**Topics**
+ [Task-local recovery](#flink-restart-task-local)
+ [Generic log-based incremental checkpoints](#flink-restart-log-check)
+ [Fine-grained recovery](#flink-restart-fine-grained)
+ [Combined restart mechanism in the adaptive scheduler](#flink-restart-combined)

## Task-local recovery
<a name="flink-restart-task-local"></a>

**Note**  
Task-local recovery is supported with Amazon EMR 6.0.0 and higher.

With Flink checkpoints, each task produces a snapshot of its state that Flink writes to distributed storage, such as Amazon S3. In a recovery, tasks restore their state from the distributed storage. Distributed storage provides fault tolerance, and can redistribute state during rescaling because it is accessible to all nodes.

However, remote distributed storage also has a drawback: all tasks must read their state from a remote location over the network. This can result in long recovery times for large states during task recovery or scaling operations.

*Task-local recovery* solves this problem of long recovery times. Tasks write their state on checkpoints to a secondary storage that is local to the task, such as a local disk. They also store their state in the primary storage, which is Amazon S3 in this case. During recovery, the scheduler schedules the tasks on the same task manager where the tasks ran previously, so that they can recover from the local state store instead of reading from the remote state store. For more information, see [Task-Local Recovery](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/large_state_tuning/#task-local-recovery) in the *Apache Flink Documentation*.

Our benchmark tests with sample jobs have shown that the recovery time dropped from minutes to seconds with task-local recovery enabled.

To enable task-local recovery, set the following configurations in your `flink-conf.yaml` file. Specify the checkpointing interval value in milliseconds.

```
    state.backend.local-recovery: true
    state.backend: hashmap or rocksdb
    state.checkpoints.dir: s3://storage-location-bucket-path/checkpoint
    execution.checkpointing.interval: 15000
```

## Generic log-based incremental checkpoints
<a name="flink-restart-log-check"></a>

**Note**  
Generic log-based incremental checkpoints are supported with Amazon EMR 6.10.0 and higher.

Generic log-based incremental checkpointing was added in Flink 1.16 to improve the speed of checkpoints. A faster checkpoint interval often results in a reduction of recovery work, because fewer events need to be reprocessed after recovery. For more information, see [Improving speed and stability of checkpointing with generic log-based incremental checkpoints](https://flink.apache.org/2022/05/30/improving-speed-and-stability-of-checkpointing-with-generic-log-based-incremental-checkpoints/) on the *Apache Flink Blog*.

With sample jobs, our benchmark tests have shown that the checkpoint time dropped from minutes to seconds with generic log-based incremental checkpoints.

To enable generic log-based incremental checkpoints, set the following configurations in your `flink-conf.yaml` file. Specify the checkpointing interval value in milliseconds.

```
    state.backend.changelog.enabled: true 
    state.backend.changelog.storage: filesystem
    dstl.dfs.base-path: s3://bucket-path/changelog
    state.backend.local-recovery: true
    state.backend: rocksdb
    state.checkpoints.dir: s3://bucket-path/checkpoint
    execution.checkpointing.interval: 15000
```

## Fine-grained recovery
<a name="flink-restart-fine-grained"></a>

**Note**  
Fine-grained recovery support in the default scheduler is available with Amazon EMR 6.0.0 and higher. Fine-grained recovery support in the adaptive scheduler is available with Amazon EMR 6.15.0 and higher.

When a task fails during execution, Flink resets the entire execution graph and triggers a complete re-execution from the last completed checkpoint. This is more expensive than just re-executing the failed tasks. Fine-grained recovery restarts only the pipeline-connected component of the failed task. In the following example, the job graph has 5 vertices (`A` to `E`). All connections between the vertices are pipelined with pointwise distribution, and the `parallelism.default` for the job is set to `2`.

```
A → B → C → D → E
```

In this example, a total of 10 tasks are running. The first pipeline (`a1` to `e1`) runs on one TaskManager (`TM1`), and the second pipeline (`a2` to `e2`) runs on another TaskManager (`TM2`).

```
a1 → b1 → c1 → d1 → e1
a2 → b2 → c2 → d2 → e2
```

There are two pipeline-connected components: `a1 → e1` and `a2 → e2`. If either `TM1` or `TM2` fails, the failure impacts only the 5 tasks in the pipeline where that TaskManager was running. The restart strategy starts only the affected pipelined component.

Fine-grained recovery works only with perfectly parallel Flink jobs. It's not supported with `keyBy()` or `redistribute()` operations. For more information, see [FLIP-1: Fine Grained Recovery from Task Failures](https://cwiki.apache.org/confluence/display/FLINK/FLIP-1%3A+Fine+Grained+Recovery+from+Task+Failures) in the *Flink Improvement Proposal* Jira project.
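
The pipelined-region idea above amounts to finding connected components over pipelined edges. A small sketch of that computation (illustrative, not Flink's scheduler code):

```python
def pipelined_regions(edges, vertices):
    """Group vertices into pipeline-connected components using union-find."""
    parent = {v: v for v in vertices}

    def find(v):
        while parent[v] != v:
            parent[v] = parent[parent[v]]  # path halving
            v = parent[v]
        return v

    for a, b in edges:
        parent[find(a)] = find(b)  # union the two endpoints

    regions = {}
    for v in vertices:
        regions.setdefault(find(v), set()).add(v)
    return list(regions.values())

# The two parallel pipelines from the example form two independent regions,
# so failing any task in pipeline 1 restarts only its 5 tasks.
verts = ["a1", "b1", "c1", "d1", "e1", "a2", "b2", "c2", "d2", "e2"]
edges = [("a1", "b1"), ("b1", "c1"), ("c1", "d1"), ("d1", "e1"),
         ("a2", "b2"), ("b2", "c2"), ("c2", "d2"), ("d2", "e2")]
```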

To enable fine-grained recovery, set the following configurations in your `flink-conf.yaml` file.

```
jobmanager.execution.failover-strategy: region 
restart-strategy: exponential-delay or fixed-delay
```

## Combined restart mechanism in the adaptive scheduler
<a name="flink-restart-combined"></a>

**Note**  
The combined restart mechanism in the adaptive scheduler is supported with Amazon EMR 6.15.0 and higher.

The adaptive scheduler can adjust the parallelism of a job based on available slots. It automatically reduces the parallelism if not enough slots are available to fit the configured job parallelism. If new slots become available, the job is scaled up again to the configured job parallelism. The adaptive scheduler avoids downtime for the job when there are not enough resources available. This is the scheduler that the Flink autoscaler supports. For these reasons, we recommend the adaptive scheduler with Amazon EMR Flink. However, the adaptive scheduler might perform multiple restarts within a short period of time, one restart for every new resource added. This could lead to a performance drop in the job.

With Amazon EMR 6.15.0 and higher, Flink has a combined restart mechanism in the adaptive scheduler that opens a restart window when the first resource is added, and then waits until the configured window interval, which defaults to 1 minute. It performs a single restart when there are sufficient resources available to run the job with the configured parallelism, or when the interval times out.
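
The window logic described above can be modeled as a small state machine (an illustrative sketch, not Flink's implementation):

```python
class CombinedRestartWindow:
    """Open a restart window on the first new resource; restart once when
    either the target parallelism is reachable or the window expires."""

    def __init__(self, target_slots, window_interval_s=60):
        self.target = target_slots
        self.interval = window_interval_s
        self.window_open_at = None

    def on_slots_available(self, now_s, slots):
        if self.window_open_at is None:
            self.window_open_at = now_s  # first new resource opens the window
        if slots >= self.target or now_s - self.window_open_at >= self.interval:
            self.window_open_at = None  # close the window, restart once
            return "restart"
        return "wait"
```

Without the window, each of the intermediate slot arrivals would have triggered its own restart.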

With sample jobs, our benchmark tests have shown that this feature processes 10% more records than the default behavior when you use the adaptive scheduler and the Flink autoscaler.

To enable the combined restart mechanism, set the following configurations in your `flink-conf.yaml` file.

```
jobmanager.adaptive-scheduler.combined-restart.enabled: true 
jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m
```

# Working with Flink jobs from Zeppelin in Amazon EMR
<a name="flink-zeppelin"></a>

## Introduction
<a name="flink-zeppelin-intro"></a>

Amazon EMR releases 6.10.0 and higher support [Apache Zeppelin](emr-zeppelin.md) integration with Apache Flink. You can interactively submit Flink jobs through Zeppelin notebooks. With the Flink interpreter, you can execute Flink queries, define Flink streaming and batch jobs, and visualize the output within Zeppelin notebooks. The Flink interpreter is built on top of the Flink REST API. This lets you access and manipulate Flink jobs from within the Zeppelin environment to perform real-time data processing and analytics.

There are four sub-interpreters in the Flink interpreter. They serve different purposes, but they all run in the JVM and share the same pre-configured entry points to Flink (`ExecutionEnvironment`, `StreamExecutionEnvironment`, `BatchTableEnvironment`, `StreamTableEnvironment`). The interpreters are as follows:
+ `%flink`: Creates `ExecutionEnvironment`, `StreamExecutionEnvironment`, `BatchTableEnvironment`, `StreamTableEnvironment`, and provides a Scala environment
+ `%flink.pyflink`: Provides a Python environment
+ `%flink.ssql`: Provides a streaming SQL environment
+ `%flink.bsql`: Provides a batch SQL environment
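
Because the interpreter is built on the Flink REST API, you can also query job state directly. Here is a minimal sketch using the standard `/jobs/overview` endpoint; the host and port are assumptions that you'd adjust for your cluster's JobManager:

```python
import json
from urllib.request import urlopen

def jobs_overview_url(host="localhost", port=8081):
    # The Flink REST API serves a summary of running and finished jobs
    # at /jobs/overview; 8081 is the conventional JobManager REST port.
    return f"http://{host}:{port}/jobs/overview"

def list_jobs(host="localhost", port=8081):
    # Returns the "jobs" array of {jid, name, state, ...} entries.
    with urlopen(jobs_overview_url(host, port)) as resp:
        return json.load(resp)["jobs"]
```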

## Prerequisites
<a name="flink-zeppelin-prerequisites"></a>
+ Zeppelin integration with Flink is supported on clusters created with Amazon EMR 6.10.0 and higher.
+ To view the web interfaces hosted on EMR clusters as required for these steps, you must configure an SSH tunnel to allow inbound access. For more information, see [Configure proxy settings to view websites hosted on the primary node](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-proxy.html).

## Configure Zeppelin-Flink on an EMR cluster
<a name="flink-zeppelin-configure"></a>

Use the following steps to configure Apache Flink on Apache Zeppelin to run on an EMR cluster:

1. Create a new cluster from the Amazon EMR console. Select emr-6.10.0 or higher for the Amazon EMR release. Then, choose to customize your application bundle with the **Custom** option. Include at least Flink, Hadoop, and Zeppelin in your bundle.  
![\[In the Amazon EMR console, customize your application bundle with the Custom option. Include at least Flink, Hadoop, and Zeppelin in your bundle.\]](http://docs.aws.amazon.com/zh_cn/emr/latest/ReleaseGuide/images/emr-flink-zeppelin-console.png)

1. Create the rest of your cluster with your preferred settings.

1. Once your cluster is running, select the cluster in the console to view its details and open the **Applications** tab. Choose **Zeppelin** from the **Application user interfaces** section to open the Zeppelin web interface. Make sure that you've set up access to the Zeppelin web interface with an SSH tunnel to the primary node and a proxy connection, as described in the [Prerequisites](#flink-zeppelin-prerequisites).  
![\[On the Zeppelin web interface, you can import and create new notebooks.\]](http://docs.aws.amazon.com/zh_cn/emr/latest/ReleaseGuide/images/welcome-to-zeppelin.png)

1. Now, you can create a new note in a Zeppelin notebook with Flink as the default interpreter.  
![\[You can create a new note in a Zeppelin notebook with Flink as the default interpreter.\]](http://docs.aws.amazon.com/zh_cn/emr/latest/ReleaseGuide/images/emr-flink-zeppelin-create-notebook.png)

1. Refer to the following code examples that demonstrate how to run Flink jobs from a Zeppelin notebook.

## Run Flink jobs with Zeppelin-Flink on an EMR cluster
<a name="flink-zeppelin-run-jobs"></a>
+ Example 1, Flink Scala

  a) Batch WordCount example (Scala)

  ```
  %flink
  
  val data = benv.fromElements("hello world", "hello flink", "hello hadoop")
  data.flatMap(line => line.split("\\s"))
               .map(w => (w, 1))
               .groupBy(0)
               .sum(1)
               .print()
  ```

  b) Streaming WordCount example (Scala)

  ```
  %flink
  
  val data = senv.fromElements("hello world", "hello flink", "hello hadoop")
  data.flatMap(line => line.split("\\s"))
    .map(w => (w, 1))
    .keyBy(0)
    .sum(1)
    .print
  
  senv.execute()
  ```  
![\[For example, you can run batch WordCount and streaming WordCount jobs from a Zeppelin notebook.\]](http://docs.aws.amazon.com/zh_cn/emr/latest/ReleaseGuide/images/streaming-wordcount-example.png)
+ Example 2, Flink Streaming SQL

  ```
  %flink.ssql
  SET 'sql-client.execution.result-mode' = 'tableau';
  SET 'table.dml-sync' = 'true';
  SET 'execution.runtime-mode' = 'streaming';
  
  create table dummy_table (
    id int,
    data string
  ) with (
    'connector' = 'filesystem',
    'path' = 's3://s3-bucket/dummy_table',
    'format' = 'csv'
  );
  
  INSERT INTO dummy_table SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
  
  SELECT * FROM dummy_table;
  ```  
![\[This example demonstrates how to run a Flink streaming SQL job.\]](http://docs.aws.amazon.com/zh_cn/emr/latest/ReleaseGuide/images/flink-streaming-sql.png)
+ Example 3, PyFlink. Note that you must upload a sample text file named `word.txt` to your S3 bucket.

  ```
  %flink.pyflink
  
  import argparse
  import logging
  import sys
  
  from pyflink.common import Row
  from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor, Schema,
                             DataTypes, FormatDescriptor)
  from pyflink.table.expressions import lit, col
  from pyflink.table.udf import udtf
  
  # Default input data, used when word_count is called without an input path
  word_count_data = ["To be, or not to be, that is the question"]
  
  def word_count(input_path, output_path):
      t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
      # write all the data to one file
      t_env.get_config().set("parallelism.default", "1")
  
      # define the source
      if input_path is not None:
          t_env.create_temporary_table(
              'source',
              TableDescriptor.for_connector('filesystem')
                             .schema(Schema.new_builder()
                                     .column('word', DataTypes.STRING())
                                     .build())
                             .option('path', input_path)
                             .format('csv')
                             .build())
          tab = t_env.from_path('source')
      else:
          print("Executing word_count example with default input data set.")
          print("Use --input to specify file input.")
          tab = t_env.from_elements(map(lambda i: (i,), word_count_data),
                                    DataTypes.ROW([DataTypes.FIELD('line', DataTypes.STRING())]))
  
      # define the sink
      if output_path is not None:
          t_env.create_temporary_table(
              'sink',
              TableDescriptor.for_connector('filesystem')
                             .schema(Schema.new_builder()
                                     .column('word', DataTypes.STRING())
                                     .column('count', DataTypes.BIGINT())
                                     .build())
                             .option('path', output_path)
                             .format(FormatDescriptor.for_format('canal-json')
                                     .build())
                             .build())
      else:
          print("Printing result to stdout. Use --output to specify output path.")
          t_env.create_temporary_table(
              'sink',
              TableDescriptor.for_connector('print')
                             .schema(Schema.new_builder()
                                     .column('word', DataTypes.STRING())
                                     .column('count', DataTypes.BIGINT())
                                     .build())
                             .build())
  
      @udtf(result_types=[DataTypes.STRING()])
      def split(line: Row):
          for s in line[0].split():
              yield Row(s)
  
      # compute word count
      tab.flat_map(split).alias('word') \
         .group_by(col('word')) \
         .select(col('word'), lit(1).count) \
         .execute_insert('sink') \
         .wait()
  
  
  logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
  
  
  word_count("s3://s3_bucket/word.txt", "s3://s3_bucket/demo_output.txt")
  ```

1. To access and view the Flink web user interface, choose **FLINK JOB** in the Zeppelin UI.  
![\[Flink code snippet for word count with output showing counts for "hello", "flink", "hadoop", and "world".\]](http://docs.aws.amazon.com/zh_cn/emr/latest/ReleaseGuide/images/batch-wordcount-example.png)

1. Choosing **FLINK JOB** routes you to the Flink web console in another tab of your browser.  
![\[Choosing FLINK JOB in another tab of your browser opens the Flink web console.\]](http://docs.aws.amazon.com/zh_cn/emr/latest/ReleaseGuide/images/flink-web-console.png)

# Flink release history
<a name="Flink-release-history"></a>

The following table lists the version of Flink included in each release version of Amazon EMR, along with the components installed with the application. For component versions in each release, see the Component Version section for your release in [Amazon EMR 7.x release versions](emr-release-7x.md), [Amazon EMR 6.x release versions](emr-release-6x.md), or [Amazon EMR 5.x release versions](emr-release-5x.md).


**Flink version information**  

| Amazon EMR release label | Flink version | Components installed with Flink | 
| --- | --- | --- | 
| emr-7.12.0 | 1.20.0-amzn-6 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-hdfs-zkfc, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta | 
| emr-7.11.0 | 1.20.0-amzn-5 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-hdfs-zkfc, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta | 
| emr-7.10.0 | 1.20.0-amzn-4 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta | 
| emr-7.9.0 | 1.20.0-amzn-3 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta | 
| emr-7.8.0 | 1.20.0-amzn-2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta | 
| emr-7.7.0 | 1.20.0-amzn-1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta | 
| emr-7.6.0 | 1.20.0-amzn-0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta | 
| emr-7.5.0 | 1.19.1-amzn-1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta | 
| emr-7.4.0 | 1.19.1-amzn-0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta | 
| emr-7.3.0 | 1.18.1-amzn-2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta | 
| emr-7.2.0 | 1.18.1-amzn-1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta | 
| emr-5.36.2 | 1.14.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config | 
| emr-7.1.0 | 1.18.1-amzn-0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta | 
| emr-7.0.0 | 1.18.0-amzn-0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta-standalone-connectors | 
| emr-6.15.0 | 1.17.1-amzn-1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta-standalone-connectors | 
| emr-6.14.0 | 1.17.1-amzn-0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta-standalone-connectors | 
| emr-6.13.0 | 1.17.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta-standalone-connectors | 
| emr-6.12.0 | 1.17.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta-standalone-connectors | 
| emr-6.11.1 | 1.16.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta-standalone-connectors | 
| emr-6.11.0 | 1.16.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta-standalone-connectors | 
| emr-6.10.1 | 1.16.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi | 
| emr-6.10.0 | 1.16.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi | 
| emr-6.9.1 | 1.15.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi | 
| emr-6.9.0 | 1.15.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi | 
| emr-6.8.1 | 1.15.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi | 
| emr-6.8.0 | 1.15.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi | 
| emr-6.7.0 | 1.14.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi | 
| emr-5.36.1 | 1.14.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config | 
| emr-5.36.0 | 1.14.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config | 
| emr-6.6.0 | 1.14.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi | 
| emr-5.35.0 | 1.14.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config | 
| emr-6.5.0 | 1.14.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi | 
| emr-6.4.0 | 1.13.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi | 
| emr-6.3.1 | 1.12.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config | 
| emr-6.3.0 | 1.12.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config | 
| emr-6.2.1 | 1.11.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config | 
| emr-6.2.0 | 1.11.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config | 
| emr-6.1.1 | 1.11.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-6.1.0 | 1.11.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.34.0 | 1.13.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config | 
| emr-5.33.1 | 1.12.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config | 
| emr-5.33.0 | 1.12.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config | 
| emr-5.32.1 | 1.11.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config | 
| emr-5.32.0 | 1.11.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config | 
| emr-5.31.1 | 1.11.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config | 
| emr-5.31.0 | 1.11.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config | 
| emr-5.30.2 | 1.10.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.30.1 | 1.10.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.30.0 | 1.10.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.29.0 | 1.9.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.28.1 | 1.9.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.28.0 | 1.9.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.27.1 | 1.8.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.27.0 | 1.8.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.26.0 | 1.8.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.25.0 | 1.8.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.24.1 | 1.8.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.24.0 | 1.8.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.23.1 | 1.7.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.23.0 | 1.7.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.22.0 | 1.7.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.21.2 | 1.7.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.21.1 | 1.7.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.21.0 | 1.7.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.20.1 | 1.6.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.20.0 | 1.6.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.19.1 | 1.6.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.19.0 | 1.6.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.18.1 | 1.6.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.18.0 | 1.6.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.17.2 | 1.5.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.17.1 | 1.5.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.17.0 | 1.5.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.16.1 | 1.5.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.16.0 | 1.5.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.15.1 | 1.4.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.15.0 | 1.4.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.14.2 | 1.4.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.14.1 | 1.4.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.14.0 | 1.4.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.13.1 | 1.4.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.13.0 | 1.4.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.12.3 | 1.4.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.12.2 | 1.4.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.12.1 | 1.4.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.12.0 | 1.4.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.11.4 | 1.3.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.11.3 | 1.3.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.11.2 | 1.3.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.11.1 | 1.3.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.11.0 | 1.3.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.10.1 | 1.3.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.10.0 | 1.3.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.9.1 | 1.3.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.9.0 | 1.3.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.8.3 | 1.3.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.8.2 | 1.3.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.8.1 | 1.3.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.8.0 | 1.3.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.7.1 | 1.3.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.7.0 | 1.3.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.6.1 | 1.2.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.6.0 | 1.2.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.5.4 | 1.2.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, flink-client | 
| emr-5.5.3 | 1.2.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, flink-client | 
| emr-5.5.2 | 1.2.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, flink-client | 
| emr-5.5.1 | 1.2.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, flink-client | 
| emr-5.5.0 | 1.2.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, flink-client | 
| emr-5.4.1 | 1.2.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, flink-client | 
| emr-5.4.0 | 1.2.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, flink-client | 
| emr-5.3.2 | 1.1.4 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, flink-client | 
| emr-5.3.1 | 1.1.4 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, flink-client | 
| emr-5.3.0 | 1.1.4 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, flink-client | 
| emr-5.2.3 | 1.1.3 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, flink-client | 
| emr-5.2.2 | 1.1.3 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, flink-client | 
| emr-5.2.1 | 1.1.3 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, flink-client | 
| emr-5.2.0 | 1.1.3 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, flink-client | 
| emr-5.1.1 | 1.1.3 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, flink-client | 
| emr-5.1.0 | 1.1.3 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, flink-client | 

# Flink release notes by version
<a name="Flink-release-history-versions"></a>

For complete release notes, see the following sections.

# Amazon EMR 7.10.0 - Flink release notes
<a name="Flink-release-history-7100"></a>

**Amazon EMR 7.10.0 - Flink changes**


| Type | Description | 
| --- | --- | 
|  New feature  |  Starting with Amazon EMR release 7.10.0, you can use configuration settings to more easily enable the Kafka and Kinesis Flink connectors. Add `kafka.enabled: true` or `kinesis.enabled: true` to the `flink-conf` classification during cluster creation to automatically configure the corresponding connector. This simplified approach eliminates the manual configuration steps that were previously required.  | 
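As a sketch of what this looks like in practice, the cluster configuration below enables both connectors through the `flink-conf` classification; it uses the standard Amazon EMR configuration JSON format, and enabling only one of the two properties is equally valid:

```json
[
  {
    "Classification": "flink-conf",
    "Properties": {
      "kafka.enabled": "true",
      "kinesis.enabled": "true"
    }
  }
]
```

You can supply this JSON when creating the cluster, for example with `aws emr create-cluster --configurations file://./flink-configuration.json`.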

# Amazon EMR 7.9.0 - Flink release notes
<a name="Flink-release-history-790"></a>

**Amazon EMR 7.9.0 - Flink changes**


| Type | Description | 
| --- | --- | 
|  New feature  |  Starting with Amazon EMR 7.9.0, Apache Flink supports the Avro, Parquet, and ORC file formats out of the box. You can use these formats directly with any Flink API (DataStream, Table, or SQL) without any additional configuration.  | 
|  New feature  |  Starting with Amazon EMR release 7.9.0, you can use configuration settings to more easily enable the Hive metastore or the AWS Glue Data Catalog. Add `hive.enabled: true` or `glue.enabled: true` to the `flink-conf` classification during cluster creation to automatically configure the corresponding data catalog. This simplified approach eliminates the manual configuration steps that were previously required.  | 
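As a sketch, enabling the AWS Glue Data Catalog through this mechanism could look like the following; it uses the standard Amazon EMR configuration JSON format, and you would use `hive.enabled` instead to configure a Hive metastore:

```json
[
  {
    "Classification": "flink-conf",
    "Properties": {
      "glue.enabled": "true"
    }
  }
]
```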

# Amazon EMR 7.8.0 - Flink release notes
<a name="Flink-release-history-780"></a>

**Configuration**: EMR Flink works with S3A out of the box in all AWS Regions and partitions.

# Amazon EMR 7.7.0 - Flink release notes
<a name="Flink-release-history-770"></a>
+ The Flink SQL shell can be invoked with the `flink-sql-client` command, which is a symbolic link to `/usr/lib/flink/bin/sql-client.sh`.
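On a cluster's primary node, the two invocations below should therefore be equivalent (a sketch that assumes an Amazon EMR 7.7.0 or later cluster with Flink installed; these commands cannot run outside such a cluster):

```
# Start the Flink SQL shell via the short command ...
flink-sql-client

# ... or via the full path the symlink points to.
/usr/lib/flink/bin/sql-client.sh
```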

# Amazon EMR 7.6.0 - Flink release notes
<a name="Flink-release-history-760"></a>

## Amazon EMR 7.6.0 - Flink features
<a name="Flink-release-history-760-features"></a>
+ There are no changes in this release.

# Amazon EMR 7.5.0 - Flink release notes
<a name="Flink-release-history-750"></a>


| Type | Description | 
| --- | --- | 
|  Feature  |  Added support for running Flink jobs using remote JARs.  | 
|  Improvement  |  Made vertex exclusion and inclusion thread-safe.  | 

## Amazon EMR 7.5.0 - Flink features
<a name="Flink-release-history-750-features"></a>
+ Starting with Amazon EMR 7.5.0, you can specify an Amazon S3 location as the JAR path when using the `run` and `run-application` Apache Flink CLI commands. When you provide an S3 path, EMR automatically downloads the JAR file from Amazon S3 to the cluster's EBS storage. Each time you specify the same JAR file, EMR downloads the latest version from Amazon S3 instead of reusing the existing JAR file on the cluster.
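As a sketch, a submission using an S3 JAR path could look like the following; the bucket name and object key are hypothetical, and the commands assume an Amazon EMR 7.5.0 or later cluster with Flink installed:

```
# Per-job submission; EMR pulls the JAR from S3 to the cluster's storage first.
flink run s3://amzn-s3-demo-bucket/jobs/my-flink-job.jar

# Application mode on YARN with the same remote JAR path.
flink run-application -t yarn-application s3://amzn-s3-demo-bucket/jobs/my-flink-job.jar
```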

# Amazon EMR 7.4.0 - Flink release notes
<a name="Flink-release-history-740"></a>


| Type | Description | 
| --- | --- | 
|  Upgrade  |  Upgraded Flink to version 1.19.1.  | 

# Amazon EMR 7.3.0 - Flink release notes
<a name="Flink-release-history-730"></a>
+ By default, clusters with in-transit encryption enabled through a security configuration use TLS 1.3 for internal communication between Flink processes, the Job Manager REST endpoint, and the Flink Job History Server.

# Amazon EMR 7.2.0 - Flink release notes
<a name="Flink-release-history-720"></a>


| Type | Description | 
| --- | --- | 
|  Improvement  |  Added support for attaching custom labels to each Flink job's Kubernetes service through the `kubernetes.service.labels` configuration.  | 
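As a hedged sketch, the snippet below follows the comma-separated `key:value` convention used by Flink's other `kubernetes.*.labels` options; the label keys and values are hypothetical examples:

```yaml
# flink-conf.yaml (Flink on Kubernetes): attach custom labels to each
# job's Kubernetes service. The labels below are examples only.
kubernetes.service.labels: team:streaming,env:production
```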