Flink 2.2 升级的状态兼容性指南 - Managed Service for Apache Flink

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Flink 2.2 升级的状态兼容性指南

从 Flink 1.x 升级到 Flink 2.2 时,状态兼容性问题可能会使您的应用程序无法从快照中恢复。本指南可帮助您识别潜在的兼容性问题并提供迁移策略。

了解状态兼容性变化

适用于 Apache Flink 2.2 的亚马逊托管服务引入了几项影响状态兼容性的序列化更改。以下是主要的:

  • Kryo 版本升级:Apache Flink 2.2 将捆绑的 Kryo 序列化程序从版本 2 升级到版本 5。由于 Kryo v5 使用的二进制编码格式与 Kryo v2 不同,因此在 Flink 1.x 保存点中通过 Kryo 序列化的任何运算符状态都无法在 Flink 2.2 中恢复。

  • Java 集合序列化:在 Flink 1.x 中,其中的 Java 集合(例如HashMapArrayList、和HashSet)是使用 Kryo 进行序列化 POJOs 的。Flink 2.2 引入了特定于集合的优化序列化器,这些序列化器与 1.x 中的 Kryo 序列化状态不兼容。在 1.x 中使用带有 POJO 或 Kryo 序列化器的 Java 集合的应用程序无法在 Flink 2.2 中恢复此状态。有关数据类型和序列化的更多详细信息,请参阅 Flink 文档

  • Kinesis 连接器兼容性:低于 5.0 的 Kinesis Data Streams (KDS) 连接器版本保持与 Flink 2.2 Kinesis 连接器版本 6.0 不兼容的状态。升级之前,必须迁移到连接器版本 5.0 或更高版本。

序列化兼容性参考

查看应用程序中的所有状态声明,并将序列化类型与下表进行匹配。如果有任何状态类型不兼容,请在继续升级之前参阅州移民部分。

序列化兼容性参考
序列化类型 兼容? Details
Avro (SpecificRecord,GenericRecord) 使用自己独立于 Kryo 的二进制格式。确保你使用的是 Flink 的原生 Avro 类型信息,而不是注册为 Kryo 序列化器的 Avro。
Protobuf 使用自己独立于 Kryo 的二进制编码。验证架构更改是否遵循向后兼容的演变规则。
POJOs 没有收藏 由 Flink 的 POJO 序列化器处理——但前提是该类符合所有 POJO 标准:公共类、公共无参数构造函数、所有公共字段或可通过 getter/setter 访问的字段,以及所有字段类型本身均可由 Flink 序列化。违反其中任何一项的 POJO 都会默默地退回 Kryo 并变得不兼容。
自定义 TypeSerializers 仅当你的序列化器不在内部委托给 Kryo 时才兼容。
SQL 和表 API 状态 是(需要注意的是) 使用 Flink 的内部序列化器。但是,Apache Flink 不保证 Table API 应用程序的主要版本之间的状态兼容性。请先在非生产环境中测试。
POJOs 使用 Java 集合 (HashMapArrayListHashSet) 在 Flink 1.x 中,其中的集合通过 Kry POJOs o v2 进行序列化。Flink 2.2 引入了专用的集合序列化器,其二进制格式与 Kryo v2 格式不兼容。
Scala 案例课 在 Flink 1.x 中通过 Kryo 进行序列化。Kryo v2 到 v5 的升级更改了二进制格式。
Java 记录 通常在 Flink 1.x 中回退到 Kryo 序列化。通过测试进行验证disableGenericTypes()
第三方库类型 没有注册的自定义序列化程序的类型可以追溯到 Kryo。Kryo v2 到 v5 二进制格式的更改破坏了兼容性。
任何使用 Kryo 回退的类型 如果 Flink 无法使用内置或注册的序列化器处理类型,则会回退到 Kryo。1.x 中的所有 Kryo 序列化状态都与 2.2 不兼容。

诊断方法

您可以通过查看应用程序日志或在 UpdateApplication API 操作之后检查日志来主动识别状态兼容性问题。

在你的应用程序中识别 Kryo 的后备方案

你可以在日志中使用以下正则表达式模式来识别应用程序中的 Kryo fallback:

Class class (?<className>[^\s]+) cannot be used as a POJO type

日志示例:

Class class org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.

如果使用 UpdateApplication API 升级失败,则以下异常可能表明您遇到了基于序列化程序的状态不兼容问题:

IndexOutOfBoundsException

Caused by: java.lang.IndexOutOfBoundsException: Index 116 out of bounds for length 1 at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source) at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source) at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source) at java.base/java.util.Objects.checkIndex(Unknown Source) at java.base/java.util.ArrayList.get(Unknown Source) at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:77) at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:923) ... 23 more

StateMigrationException (POJOSerializer)

Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.api.java.typeutils.runtime.PojoSerializer@8bf85b5d) must not be incompatible with the old state serializer (org.apache.flink.api.java.typeutils.runtime.PojoSerializer@3282ee3).

升级前清单

  • 查看申请中的所有州声明

  • 查看馆藏 (HashMap,ArrayList,HashSet) POJOs

  • 验证每种状态类型的序列化方法

  • 创建生产副本应用程序并在此副本上使用 UpdateApplication API 测试状态兼容性

  • 如果状态不兼容,请从中选择策略 州移民

  • 在生产 Flink 应用程序配置中启用自动回滚

州移民

重建完成状态

最适合可以从源数据重建状态的应用程序。

如果您的应用程序可以从源数据重建状态:

  1. 停止 Flink 1.x 应用程序

  2. 使用更新的代码升级到 Flink 2.x

  3. 从... 开始 SKIP_RESTORE_FROM_SNAPSHOT

  4. 允许应用程序重建状态

aws kinesisanalyticsv2 start-application \ --application-name MyApplication \ --run-configuration '{ "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "SKIP_RESTORE_FROM_SNAPSHOT" } }'

最佳实践

  1. 始终使用 Avro 或 Protobuf 处理复杂状态 — 它们提供架构演变并且与 Kryo 无关

  2. 避免在里面收藏 POJOs — 改用 Flink 的原ListState生版本 MapState

  3. 在@@ 本地测试状态恢复 — 在生产升级之前,使用实际快照进行测试

  4. 经常拍摄快照 — 尤其是在主要版本升级之前

  5. 启用自动回滚-将 MSF 应用程序配置为在出现故障时自动回滚

  6. 记录您的状态类型 — 维护所有状态类型及其序列化方法的文档

  7. 监视检查点大小 — 检查点大小增加可能表明存在序列化问题

后续步骤

计划升级:请参阅升级到 Flink 2.2:完整指南

有关迁移过程中的疑问或问题,请参阅Managed Service for Apache Flink 的故障排除或联系 Supp AWS ort。