本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
Flink 2.2 升级的状态兼容性指南
从 Flink 1.x 升级到 Flink 2.2 时,状态兼容性问题可能会使您的应用程序无法从快照中恢复。本指南可帮助您识别潜在的兼容性问题并提供迁移策略。
了解状态兼容性变化
适用于 Apache Flink 2.2 的亚马逊托管服务引入了几项影响状态兼容性的序列化更改。以下是主要的:
-
Kryo 版本升级:Apache Flink 2.2 将捆绑的 Kryo 序列化程序从版本 2 升级到版本 5。由于 Kryo v5 使用的二进制编码格式与 Kryo v2 不同,因此在 Flink 1.x 保存点中通过 Kryo 序列化的任何运算符状态都无法在 Flink 2.2 中恢复。
-
Java 集合序列化:在 Flink 1.x 中,其中的 Java 集合(例如
HashMapArrayList、和HashSet)是使用 Kryo 进行序列化 POJOs 的。Flink 2.2 引入了特定于集合的优化序列化器,这些序列化器与 1.x 中的 Kryo 序列化状态不兼容。在 1.x 中使用带有 POJO 或 Kryo 序列化器的 Java 集合的应用程序无法在 Flink 2.2 中恢复此状态。有关数据类型和序列化的更多详细信息,请参阅 Flink 文档。 -
Kinesis 连接器兼容性:低于 5.0 的 Kinesis Data Streams (KDS) 连接器版本保持与 Flink 2.2 Kinesis 连接器版本 6.0 不兼容的状态。升级之前,必须迁移到连接器版本 5.0 或更高版本。
序列化兼容性参考
查看应用程序中的所有状态声明,并将序列化类型与下表进行匹配。如果有任何状态类型不兼容,请在继续升级之前参阅州移民部分。
| 序列化类型 | 兼容? | Details |
|---|---|---|
Avro (SpecificRecord,GenericRecord) |
是 | 使用自己独立于 Kryo 的二进制格式。确保你使用的是 Flink 的原生 Avro 类型信息,而不是注册为 Kryo 序列化器的 Avro。 |
| Protobuf | 是 | 使用自己独立于 Kryo 的二进制编码。验证架构更改是否遵循向后兼容的演变规则。 |
| POJOs 没有收藏 | 是 | 由 Flink 的 POJO 序列化器处理——但前提是该类符合所有 POJO 标准:公共类、公共无参数构造函数、所有公共字段或可通过 getter/setter 访问的字段,以及所有字段类型本身均可由 Flink 序列化。违反其中任何一项的 POJO 都会默默地退回 Kryo 并变得不兼容。 |
| 自定义 TypeSerializers | 是 | 仅当你的序列化器不在内部委托给 Kryo 时才兼容。 |
| SQL 和表 API 状态 | 是(需要注意的是) | 使用 Flink 的内部序列化器。但是,Apache Flink 不保证 Table API 应用程序的主要版本之间的状态兼容性。请先在非生产环境中测试。 |
POJOs 使用 Java 集合 (HashMap、ArrayList、HashSet) |
否 | 在 Flink 1.x 中,其中的集合通过 Kry POJOs o v2 进行序列化。Flink 2.2 引入了专用的集合序列化器,其二进制格式与 Kryo v2 格式不兼容。 |
| Scala 案例课 | 否 | 在 Flink 1.x 中通过 Kryo 进行序列化。Kryo v2 到 v5 的升级更改了二进制格式。 |
| Java 记录 | 否 | 通常在 Flink 1.x 中回退到 Kryo 序列化。通过测试进行验证disableGenericTypes()。 |
| 第三方库类型 | 否 | 没有注册的自定义序列化程序的类型可以追溯到 Kryo。Kryo v2 到 v5 二进制格式的更改破坏了兼容性。 |
| 任何使用 Kryo 回退的类型 | 否 | 如果 Flink 无法使用内置或注册的序列化器处理类型,则会回退到 Kryo。1.x 中的所有 Kryo 序列化状态都与 2.2 不兼容。 |
诊断方法
您可以通过查看应用程序日志或在 UpdateApplication API 操作之后检查日志来主动识别状态兼容性问题。
在你的应用程序中识别 Kryo 的后备方案
你可以在日志中使用以下正则表达式模式来识别应用程序中的 Kryo fallback:
Class class (?<className>[^\s]+) cannot be used as a POJO type
日志示例:
Class class org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
如果使用 UpdateApplication API 升级失败,则以下异常可能表明您遇到了基于序列化程序的状态不兼容问题:
IndexOutOfBoundsException
Caused by: java.lang.IndexOutOfBoundsException: Index 116 out of bounds for length 1 at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source) at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source) at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source) at java.base/java.util.Objects.checkIndex(Unknown Source) at java.base/java.util.ArrayList.get(Unknown Source) at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:77) at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:923) ... 23 more
StateMigrationException (POJOSerializer)
Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.api.java.typeutils.runtime.PojoSerializer@8bf85b5d) must not be incompatible with the old state serializer (org.apache.flink.api.java.typeutils.runtime.PojoSerializer@3282ee3).
升级前清单
查看申请中的所有州声明
查看馆藏 (
HashMap,ArrayList,HashSet) POJOs验证每种状态类型的序列化方法
创建生产副本应用程序并在此副本上使用 UpdateApplication API 测试状态兼容性
如果状态不兼容,请从中选择策略 州移民
在生产 Flink 应用程序配置中启用自动回滚
州移民
重建完成状态
最适合可以从源数据重建状态的应用程序。
如果您的应用程序可以从源数据重建状态:
停止 Flink 1.x 应用程序
使用更新的代码升级到 Flink 2.x
从... 开始
SKIP_RESTORE_FROM_SNAPSHOT允许应用程序重建状态
aws kinesisanalyticsv2 start-application \ --application-name MyApplication \ --run-configuration '{ "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "SKIP_RESTORE_FROM_SNAPSHOT" } }'
最佳实践
始终使用 Avro 或 Protobuf 处理复杂状态 — 它们提供架构演变并且与 Kryo 无关
避免在里面收藏 POJOs — 改用 Flink 的原
ListState生版本MapState在@@ 本地测试状态恢复 — 在生产升级之前,使用实际快照进行测试
经常拍摄快照 — 尤其是在主要版本升级之前
启用自动回滚-将 MSF 应用程序配置为在出现故障时自动回滚
记录您的状态类型 — 维护所有状态类型及其序列化方法的文档
监视检查点大小 — 检查点大小增加可能表明存在序列化问题
后续步骤
计划升级:请参阅升级到 Flink 2.2:完整指南。
有关迁移过程中的疑问或问题,请参阅Managed Service for Apache Flink 的故障排除或联系 Supp AWS ort。