Flink 2.2 アップグレードのステート互換性ガイド - Managed Service for Apache Flink

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Flink 2.2 アップグレードのステート互換性ガイド

Flink 1.x から Flink 2.2 にアップグレードする場合、状態互換性の問題により、アプリケーションがスナップショットから復元できなくなる可能性があります。このガイドは、潜在的な互換性の問題を特定し、移行戦略を提供するのに役立ちます。

状態の互換性の変更について

Amazon Managed Service for Apache Flink 2.2 では、状態の互換性に影響するいくつかのシリアル化の変更が導入されています。主なものは次のとおりです。

  • Kryo バージョンアップグレード: Apache Flink 2.2 は、バンドルされた Kryo シリアライザーをバージョン 2 からバージョン 5 にアップグレードします。Kryo v5 は Kryo v2 とは異なるバイナリエンコード形式を使用するため、Flink 1.x セーブポイントで Kryo を介してシリアル化された演算子の状態を Flink 2.2 で復元することはできません。

  • Java コレクションのシリアル化: Flink 1.x では、POJOs 内の Java コレクション (HashMapArrayList、 などHashSet) は Kryo を使用してシリアル化されました。Flink 2.2 では、1.x の Kryo シリアル化された状態と互換性のないコレクション固有の最適化されたシリアライザーが導入されています。1.x の POJO または Kryo シリアライザーで Java コレクションを使用するアプリケーションは、Flink 2.2 でこの状態を復元できません。データ型とシリアル化の詳細については、「Flink ドキュメント」を参照してください。

  • Kinesis Connector 互換性: 5.0 より前の Kinesis Data Streams (KDS) コネクタバージョンは、Flink 2.2 Kinesis コネクタバージョン 6.0 と互換性のない状態を維持します。アップグレードする前に、コネクタバージョン 5.0 以降に移行する必要があります。

シリアル化互換性リファレンス

アプリケーション内のすべての状態宣言を確認し、シリアル化タイプを以下の表に一致させます。いずれかの状態タイプに互換性がない場合は、アップグレードに進む前に状態移行「」セクションを参照してください。

シリアル化互換性リファレンス
シリアル化タイプ 互換性はありますか? 詳細
Avro (SpecificRecordGenericRecord) はい Kryo とは無関係に独自のバイナリ形式を使用します。Kryo シリアライザーとして登録されている Avro ではなく、Flink のネイティブ Avro タイプ情報を使用していることを確認します。
Protobuf はい Kryo とは無関係に独自のバイナリエンコーディングを使用します。スキーマの変更が下位互換性のある進化ルールに従っていることを確認します。
コレクションのない POJOs はい Flink の POJO シリアライザーによって処理されます。ただし、クラスがパブリッククラス、パブリック no-arg コンストラクタ、すべてのフィールドがパブリックであるか、getters/setters を介してアクセス可能であり、すべてのフィールドタイプが Flink によってシリアル化可能であるすべての POJO 基準を満たしている場合に限ります。これらのいずれかに違反する POJO は、無音で Kryo にフォールバックし、互換性がなくなります。
カスタム TypeSerializers はい シリアライザーが内部的に Kryo に委任されていない場合にのみ互換性があります。
SQL およびテーブル API の状態 はい (注意付き) Flink の内部シリアライザーを使用します。ただし、Apache Flink は Table API アプリケーションのメジャーバージョン間の状態互換性を保証しません。最初に本番稼働用環境以外でテストします。
Java コレクションを含む POJOs (HashMapArrayListHashSet) いいえ Flink 1.x では、POJOsは Kryo v2 を介してシリアル化されました。Flink 2.2 では、バイナリ形式が Kryo v2 形式と互換性のない専用のコレクションシリアライザーが導入されています。
Scala ケースクラス いいえ Flink 1.x の Kryo を介してシリアル化されます。Kryo v2 から v5 へのアップグレードにより、バイナリ形式が変更されます。
Java レコード いいえ 通常、Flink 1.x の Kryo シリアル化にフォールバックします。でテストして検証しますdisableGenericTypes()
サードパーティーのライブラリタイプ いいえ カスタムシリアライザーが登録されていないタイプは Kryo にフォールバックします。Kryo v2 から v5 へのバイナリ形式の変更により、互換性が損なわれます。
Kryo フォールバックを使用する任意のタイプ いいえ Flink が組み込みまたは登録されたシリアライザーを使用してタイプを処理できない場合、Flink は Kryo にフォールバックします。1.x からのすべての Kryo シリアル化状態は 2.2 と互換性がありません。

診断方法

UpdateApplication API オペレーション後にアプリケーションログを確認するか、ログを調べることで、状態互換性の問題を積極的に特定できます。

アプリケーションで Kryo フォールバックを特定する

ログで次の正規表現パターンを使用して、アプリケーションの Kryo フォールバックを特定できます。

Class class (?<className>[^\s]+) cannot be used as a POJO type

サンプルログ:

Class class org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.

UpdateApplication API を使用してアップグレードが失敗した場合、シリアライザーベースの状態の非互換性が発生していることを次の例外が通知することがあります。

IndexOutOfBoundsException

Caused by: java.lang.IndexOutOfBoundsException: Index 116 out of bounds for length 1 at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source) at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source) at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source) at java.base/java.util.Objects.checkIndex(Unknown Source) at java.base/java.util.ArrayList.get(Unknown Source) at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:77) at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:923) ... 23 more

StateMigrationException (POJOSerializer)

Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.api.java.typeutils.runtime.PojoSerializer@8bf85b5d) must not be incompatible with the old state serializer (org.apache.flink.api.java.typeutils.runtime.PojoSerializer@3282ee3).

アップグレード前のチェックリスト

  • アプリケーション内のすべての状態宣言を確認する

  • コレクションを含む POJOs を確認する (HashMapArrayListHashSet)

  • 各状態タイプのシリアル化方法を検証する

  • このレプリカで UpdateApplication API を使用して、Prod レプリカアプリケーションを作成し、状態の互換性をテストする

  • 状態が互換性がない場合は、 から戦略を選択します。 状態移行

  • 本番稼働用 Flink アプリケーション設定で自動ロールバックを有効にする

状態移行

再構築完了状態

ソースデータから状態を再構築できるアプリケーションに最適です。

アプリケーションがソースデータから状態を再構築できる場合:

  1. Flink 1.x アプリケーションの停止

  2. 更新されたコードを使用して Flink 2.x にアップグレードする

  3. で開始する SKIP_RESTORE_FROM_SNAPSHOT

  4. アプリケーションに 状態の再構築を許可する

aws kinesisanalyticsv2 start-application \ --application-name MyApplication \ --run-configuration '{ "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "SKIP_RESTORE_FROM_SNAPSHOT" } }'

ベストプラクティス

  1. 複雑な状態では常に Avro または Protobuf を使用する — これらはスキーマの進化を提供し、Kryo に依存しません。

  2. POJOs — 代わりに Flink のネイティブ ListStateMapStateを使用します

  3. ローカルで状態復元をテストする — 本番稼働用アップグレードの前に、実際のスナップショットでテストします。

  4. スナップショットを頻繁に作成する — 特にメジャーバージョンのアップグレード前

  5. 自動ロールバックを有効にする — 障害時に自動的にロールバックするように MSF アプリケーションを設定する

  6. 状態タイプを文書化する — すべての状態タイプとそのシリアル化方法のドキュメントを維持します。

  7. チェックポイントサイズのモニタリング — チェックポイントサイズの増加はシリアル化の問題を示している可能性があります

次の手順

アップグレードを計画する: 「」を参照してくださいFlink 2.2 へのアップグレード: 完全なガイド

移行中の質問や問題については、「」を参照Managed Service for Apache Flink をトラブルシューティングするするか、 AWS サポートにお問い合わせください。