Guida alla compatibilità dello stato per gli aggiornamenti di Flink 2.2 - Servizio gestito per Apache Flink

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Guida alla compatibilità dello stato per gli aggiornamenti di Flink 2.2

Durante l'aggiornamento da Flink 1.x a Flink 2.2, problemi di compatibilità dello stato possono impedire il ripristino dell'applicazione dalle istantanee. Questa guida aiuta a identificare potenziali problemi di compatibilità e fornisce strategie di migrazione.

Comprensione delle modifiche alla compatibilità degli stati

Amazon Managed Service for Apache Flink 2.2 introduce diverse modifiche alla serializzazione che influiscono sulla compatibilità degli stati. Le seguenti sono le principali:

  • Aggiornamento della versione Kryo: Apache Flink 2.2 aggiorna il serializzatore Kryo in bundle dalla versione 2 alla versione 5. Poiché Kryo v5 utilizza un formato di codifica binaria diverso da Kryo v2, qualsiasi stato dell'operatore serializzato tramite Kryo in un savepoint Flink 1.x non può essere ripristinato in Flink 2.2.

  • Serializzazione delle raccolte Java: in Flink 1.x, le raccolte Java (come, and) all'interno venivano serializzate utilizzando Kryo. HashMap ArrayList HashSet POJOs Flink 2.2 introduce serializzatori ottimizzati specifici per le raccolte che sono incompatibili con lo stato serializzato Kryo della versione 1.x. Le applicazioni che utilizzano raccolte Java con serializzatori POJO o Kryo in 1.x non possono ripristinare questo stato in Flink 2.2. Consultate la documentazione di Flink per maggiori dettagli sui tipi di dati e sulla serializzazione.

  • Compatibilità con Kinesis Connector: la versione del connettore Kinesis Data Streams (KDS) precedente alla 5.0 mantiene uno stato che non è compatibile con il connettore Kinesis Flink 2.2 versione 6.0. È necessario migrare alla versione 5.0 o successiva del connettore prima dell'aggiornamento.

Riferimento sulla compatibilità della serializzazione

Esamina tutte le dichiarazioni di stato nell'applicazione e abbina i tipi di serializzazione alla tabella seguente. Se un tipo di stato è incompatibile, consulta la Migrazione statale sezione prima di procedere con l'aggiornamento.

Riferimento sulla compatibilità della serializzazione
Tipo di serializzazione Compatibile? Informazioni
Avro (SpecificRecord,GenericRecord) Utilizza il proprio formato binario indipendente da Kryo. Assicurati di utilizzare le informazioni di tipo Avro native di Flink, non Avro registrato come serializzatore Kryo.
Protobug Utilizza la propria codifica binaria indipendente da Kryo. Verifica che le modifiche allo schema seguano le regole di evoluzione retrocompatibili.
POJOs senza collezioni Gestito dal serializzatore POJO di Flink, ma solo se la classe soddisfa tutti i criteri POJO: public class, public no-arg constructor, tutti i campi pubblici o accessibili tramite getters/setters e tutti i tipi di campo stessi serializzabili da Flink. Un POJO che viola uno di questi criteri ritorna silenziosamente a Kryo e diventa incompatibile.
Personalizzato TypeSerializers Compatibile solo se il serializzatore non delega a Kryo internamente.
Stato delle API SQL e Table Sì (con avvertenza) Utilizza i serializzatori interni di Flink. Tuttavia, Apache Flink non garantisce la compatibilità dello stato tra le versioni principali delle applicazioni Table API. Esegui prima il test in un ambiente non di produzione.
POJOs con collezioni Java (HashMap,ArrayList,HashSet) No In Flink 1.x, le raccolte interne POJOs sono state serializzate tramite Kryo v2. Flink 2.2 introduce serializzatori di raccolte dedicati il cui formato binario è incompatibile con il formato Kryo v2.
Classi di casi Scala No Serializzato tramite Kryo in Flink 1.x. L'aggiornamento da Kryo v2 a v5 modifica il formato binario.
Record Java No In genere si ricorre alla serializzazione Kryo in Flink 1.x. Verifica testando con. disableGenericTypes()
Tipi di librerie di terze parti No I tipi senza un serializzatore personalizzato registrato ricadono su Kryo. La modifica del formato binario da Kryo v2 a v5 interrompe la compatibilità.
Qualsiasi tipo che utilizza Kryo fallback No Se Flink non è in grado di gestire un tipo con un serializzatore integrato o registrato, torna a Kryo. Tutti gli stati serializzati con Kryo a partire dalla versione 1.x sono incompatibili con 2.2.

Metodi diagnostici

È possibile identificare i problemi di compatibilità degli stati in modo proattivo esaminando i log delle applicazioni o ispezionando i log dopo l'operazione dell'API. UpdateApplication

Identifica il fallback di Kryo nella tua applicazione

Puoi utilizzare il seguente schema regex nei tuoi log per identificare il fallback di Kryo nella tua applicazione:

Class class (?<className>[^\s]+) cannot be used as a POJO type

Registro di esempio:

Class class org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.

Se l'aggiornamento non riesce utilizzando l' UpdateApplication API, le seguenti eccezioni potrebbero segnalare un'incompatibilità di stato basata sul serializzatore:

IndexOutOfBoundsException

Caused by: java.lang.IndexOutOfBoundsException: Index 116 out of bounds for length 1 at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source) at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source) at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source) at java.base/java.util.Objects.checkIndex(Unknown Source) at java.base/java.util.ArrayList.get(Unknown Source) at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:77) at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:923) ... 23 more

StateMigrationException (POJOSerializer)

Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.api.java.typeutils.runtime.PojoSerializer@8bf85b5d) must not be incompatible with the old state serializer (org.apache.flink.api.java.typeutils.runtime.PojoSerializer@3282ee3).

Lista di controllo prima dell'aggiornamento

  • Controlla tutte le dichiarazioni statali contenute nella tua domanda

  • Verifica POJOs con collections (HashMap,ArrayList,HashSet)

  • Verifica i metodi di serializzazione per ogni tipo di stato

  • Crea un'applicazione prod replica e verifica la compatibilità dello stato utilizzando l' UpdateApplication API su questa replica

  • Se lo stato è incompatibile, seleziona una strategia da Migrazione statale

  • Abilita il rollback automatico nella configurazione dell'applicazione Flink di produzione

Migrazione statale

Ricostruisci lo stato completo

Ideale per applicazioni in cui lo stato può essere ricostruito a partire dai dati di origine.

Se l'applicazione è in grado di ricostruire lo stato dai dati di origine:

  1. Arresta l'applicazione Flink 1.x

  2. Esegui l'aggiornamento a Flink 2.x con codice aggiornato

  3. Inizia con SKIP_RESTORE_FROM_SNAPSHOT

  4. Consenti all'applicazione di ricostruire lo stato

aws kinesisanalyticsv2 start-application \ --application-name MyApplication \ --run-configuration '{ "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "SKIP_RESTORE_FROM_SNAPSHOT" } }'

Best practice

  1. Usa sempre Avro o Protobuf per stati complessi: forniscono l'evoluzione dello schema e sono indipendenti da Kryo

  2. Evita le raccolte in POJOs: usa invece la versione nativa di Flink ListState MapState

  3. Prova il ripristino dello stato localmente: prima dell'aggiornamento della produzione, esegui il test con istantanee effettive

  4. Scatta istantanee frequentemente, soprattutto prima degli aggiornamenti delle versioni principali

  5. Abilita il rollback automatico: configura l'applicazione MSF per il rollback automatico in caso di errore

  6. Documenta i tipi di stato: conserva la documentazione di tutti i tipi di stato e dei relativi metodi di serializzazione

  7. Monitora le dimensioni dei checkpoint: l'aumento delle dimensioni dei checkpoint può indicare problemi di serializzazione

Fasi successive

Pianifica il tuo aggiornamento: Vedi. Aggiornamento a Flink 2.2: guida completa

Per domande o problemi durante la migrazione, consulta Risoluzione dei problemi relativi al servizio gestito per Apache Flink o contatta l' AWS assistenza.