本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
Flink 2.2 升級的狀態相容性指南
從 Flink 1.x 升級到 Flink 2.2 時,狀態相容性問題可能會使您的應用程式無法從快照還原。本指南可協助您識別潛在的相容性問題,並提供遷移策略。
了解狀態相容性變更
Amazon Managed Service for Apache Flink 2.2 引進數個會影響狀態相容性的序列化變更。以下是主要項目:
-
Kryo 版本升級:Apache Flink 2.2 會將綁定的 Kryo 序列化程式從第 2 版升級到第 5 版。由於 Kryo v5 使用與 Kryo v2 不同的二進位編碼格式,因此無法在 Flink 2.2 中還原透過 Kryo 在 Flink 1.x 儲存點中序列化的任何運算子狀態。
-
Java 集合序列化:在 Flink 1.x 中,POJOs 內的 Java 集合 (例如
ArrayList、HashMap和HashSet) 使用 Kryo 序列化。Flink 2.2 推出與來自 1.x 的 Kryo 序列化狀態不相容的集合特定最佳化序列化程式。在 1.x 中使用 Java 集合搭配 POJO 或 Kryo 序列化程式的應用程式無法在 Flink 2.2 中還原此狀態。如需資料類型和序列化的詳細資訊,請參閱 Flink 文件。 -
Kinesis 連接器相容性:低於 5.0 的 Kinesis Data Streams (KDS) 連接器版本會維持與 Flink 2.2 Kinesis 連接器 6.0 版不相容的狀態。您必須先遷移至連接器 5.0 版或更新版本,才能升級。
序列化相容性參考
檢閱應用程式中的所有狀態宣告,並將序列化類型與下表比對。如果有任何狀態類型不相容,請先參閱狀態遷移一節,再繼續升級。
| 序列化類型 | 相容? | 詳細資訊 |
|---|---|---|
Avro (SpecificRecord、GenericRecord) |
是 | 使用自己的二進位格式,獨立於 Kryo。確保您使用的是 Flink 的原生 Avro 類型資訊,而不是註冊為 Kryo 序列化程式的 Avro。 |
| Protobuf | 是 | 使用自己的二進位編碼,獨立於 Kryo。驗證結構描述變更遵循回溯相容的演變規則。 |
| 沒有集合POJOs | 是 | 由 Flink 的 POJO 序列化程式處理 — 但僅限於類別符合所有 POJO 條件時:公有類別、公有無參數建構函數、所有公開或可透過 getter/setters 存取的欄位,以及所有欄位類型本身可透過 Flink 序列化。違反上述任何一項的 POJO 無提示地落回 Kryo,並變得不相容。 |
| 自訂 TypeSerializers | 是 | 只有在您的序列化程式未在內部委派給 Kryo 時才相容。 |
| SQL 和資料表 API 狀態 | 是 (具有警告) | 使用 Flink 的內部序列化程式。不過,Apache Flink 不保證資料表 API 應用程式的主要版本之間的狀態相容性。首先在非生產環境中進行測試。 |
具有 Java 集合的 POJOs (HashMap、ArrayList、HashSet) |
否 | 在 Flink 1.x 中,POJOs 內的集合是透過 Kryo v2 序列化。Flink 2.2 推出專用集合序列化程式,其二進位格式與 Kryo v2 格式不相容。 |
| Scala 案例類別 | 否 | 在 Flink 1.x 中透過 Kryo 序列化。Kryo v2 至 v5 升級會變更二進位格式。 |
| Java 記錄 | 否 | 通常回到 Flink 1.x 中的 Kryo 序列化。使用 進行測試來驗證 disableGenericTypes()。 |
| 第三方程式庫類型 | 否 | 沒有已註冊自訂序列化程式的類型會回到 Kryo。Kryo v2 到 v5 二進位格式變更會中斷相容性。 |
| 使用 Kryo 備用的任何類型 | 否 | 如果 Flink 無法處理具有內建或已註冊序列化程式的類型,則會回到 Kryo。來自 1.x 的所有 Kryo 序列化狀態與 2.2 不相容。 |
診斷方法
您可以在 UpdateApplication API 操作之後查看應用程式日誌或檢查日誌,以主動識別狀態相容性問題。
識別應用程式中的 Kryo 備用
您可以在日誌中使用下列 regex 模式來識別應用程式中的 Kryo 備用:
Class class (?<className>[^\s]+) cannot be used as a POJO type
範例日誌:
Class class org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
如果使用 UpdateApplication API 升級失敗,下列例外狀況可能表示您遇到序列化程式型狀態不相容:
IndexOutOfBoundsException
Caused by: java.lang.IndexOutOfBoundsException: Index 116 out of bounds for length 1 at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source) at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source) at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source) at java.base/java.util.Objects.checkIndex(Unknown Source) at java.base/java.util.ArrayList.get(Unknown Source) at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:77) at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:923) ... 23 more
StateMigrationException (POJOSerializer)
Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.api.java.typeutils.runtime.PojoSerializer@8bf85b5d) must not be incompatible with the old state serializer (org.apache.flink.api.java.typeutils.runtime.PojoSerializer@3282ee3).
升級前檢查清單
檢閱應用程式中的所有狀態宣告
使用集合 (
HashMap、ArrayList、HashSet) 檢查 POJOs驗證每種狀態類型的序列化方法
在此複本上使用 UpdateApplication API 建立生產複本應用程式並測試狀態相容性
如果狀態不相容,請從 選取策略 狀態遷移
在生產 Flink 應用程式組態中啟用自動轉返
狀態遷移
重建完成狀態
最適合可以從來源資料重建狀態的應用程式。
如果您的應用程式可以從來源資料重建狀態:
停止 Flink 1.x 應用程式
使用更新的程式碼升級至 Flink 2.x
從 開始
SKIP_RESTORE_FROM_SNAPSHOT允許應用程式重建狀態
aws kinesisanalyticsv2 start-application \ --application-name MyApplication \ --run-configuration '{ "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "SKIP_RESTORE_FROM_SNAPSHOT" } }'
最佳實務
一律將 Avro 或 Protobuf 用於複雜狀態 — 這些可提供結構描述演變,且與 Kryo 無關
避免 POJOs 中的集合 — 請
MapState改用 Flink 的原生ListState和 。在本機測試狀態還原 - 在生產升級之前,使用實際快照進行測試
經常拍攝快照 - 特別是在主要版本升級之前
啟用自動轉返 — 將您的 MSF 應用程式設定為在失敗時自動轉返
記錄您的狀態類型 — 維護所有狀態類型及其序列化方法的文件
監控檢查點大小 — 不斷增長的檢查點大小可能表示序列化問題
後續步驟
規劃升級:請參閱升級到 Flink 2.2:完成指南。
如需遷移期間的問題,請參閱 Managed Service for Apache Flink 故障診斷或聯絡 AWS Support。