Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Guida alla compatibilità dello stato per gli aggiornamenti di Flink 2.2
Durante l'aggiornamento da Flink 1.x a Flink 2.2, problemi di compatibilità dello stato possono impedire il ripristino dell'applicazione dalle istantanee. Questa guida aiuta a identificare potenziali problemi di compatibilità e fornisce strategie di migrazione.
Comprensione delle modifiche alla compatibilità degli stati
Amazon Managed Service for Apache Flink 2.2 introduce diverse modifiche alla serializzazione che influiscono sulla compatibilità degli stati. Le seguenti sono le principali:
-
Aggiornamento della versione Kryo: Apache Flink 2.2 aggiorna il serializzatore Kryo in bundle dalla versione 2 alla versione 5. Poiché Kryo v5 utilizza un formato di codifica binaria diverso da Kryo v2, qualsiasi stato dell'operatore serializzato tramite Kryo in un savepoint Flink 1.x non può essere ripristinato in Flink 2.2.
-
Serializzazione delle raccolte Java: in Flink 1.x, le raccolte Java (come, and) all'interno venivano serializzate utilizzando Kryo.
HashMapArrayListHashSetPOJOs Flink 2.2 introduce serializzatori ottimizzati specifici per le raccolte che sono incompatibili con lo stato serializzato Kryo della versione 1.x. Le applicazioni che utilizzano raccolte Java con serializzatori POJO o Kryo in 1.x non possono ripristinare questo stato in Flink 2.2. Consultate la documentazionedi Flink per maggiori dettagli sui tipi di dati e sulla serializzazione. -
Compatibilità con Kinesis Connector: la versione del connettore Kinesis Data Streams (KDS) precedente alla 5.0 mantiene uno stato che non è compatibile con il connettore Kinesis Flink 2.2 versione 6.0. È necessario migrare alla versione 5.0 o successiva del connettore prima dell'aggiornamento.
Riferimento sulla compatibilità della serializzazione
Esamina tutte le dichiarazioni di stato nell'applicazione e abbina i tipi di serializzazione alla tabella seguente. Se un tipo di stato è incompatibile, consulta la Migrazione statale sezione prima di procedere con l'aggiornamento.
| Tipo di serializzazione | Compatibile? | Informazioni |
|---|---|---|
Avro (SpecificRecord,GenericRecord) |
Sì | Utilizza il proprio formato binario indipendente da Kryo. Assicurati di utilizzare le informazioni di tipo Avro native di Flink, non Avro registrato come serializzatore Kryo. |
| Protobug | Sì | Utilizza la propria codifica binaria indipendente da Kryo. Verifica che le modifiche allo schema seguano le regole di evoluzione retrocompatibili. |
| POJOs senza collezioni | Sì | Gestito dal serializzatore POJO di Flink, ma solo se la classe soddisfa tutti i criteri POJO: public class, public no-arg constructor, tutti i campi pubblici o accessibili tramite getters/setters e tutti i tipi di campo stessi serializzabili da Flink. Un POJO che viola uno di questi criteri ritorna silenziosamente a Kryo e diventa incompatibile. |
| Personalizzato TypeSerializers | Sì | Compatibile solo se il serializzatore non delega a Kryo internamente. |
| Stato delle API SQL e Table | Sì (con avvertenza) | Utilizza i serializzatori interni di Flink. Tuttavia, Apache Flink non garantisce la compatibilità dello stato tra le versioni principali delle applicazioni Table API. Esegui prima il test in un ambiente non di produzione. |
POJOs con collezioni Java (HashMap,ArrayList,HashSet) |
No | In Flink 1.x, le raccolte interne POJOs sono state serializzate tramite Kryo v2. Flink 2.2 introduce serializzatori di raccolte dedicati il cui formato binario è incompatibile con il formato Kryo v2. |
| Classi di casi Scala | No | Serializzato tramite Kryo in Flink 1.x. L'aggiornamento da Kryo v2 a v5 modifica il formato binario. |
| Record Java | No | In genere si ricorre alla serializzazione Kryo in Flink 1.x. Verifica testando con. disableGenericTypes() |
| Tipi di librerie di terze parti | No | I tipi senza un serializzatore personalizzato registrato ricadono su Kryo. La modifica del formato binario da Kryo v2 a v5 interrompe la compatibilità. |
| Qualsiasi tipo che utilizza Kryo fallback | No | Se Flink non è in grado di gestire un tipo con un serializzatore integrato o registrato, torna a Kryo. Tutti gli stati serializzati con Kryo a partire dalla versione 1.x sono incompatibili con 2.2. |
Metodi diagnostici
Identifica il fallback di Kryo nella tua applicazione
Puoi utilizzare il seguente schema regex nei tuoi log per identificare il fallback di Kryo nella tua applicazione:
Class class (?<className>[^\s]+) cannot be used as a POJO type
Registro di esempio:
Class class org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
Se l'aggiornamento non riesce utilizzando l' UpdateApplication API, le seguenti eccezioni potrebbero segnalare un'incompatibilità di stato basata sul serializzatore:
IndexOutOfBoundsException
Caused by: java.lang.IndexOutOfBoundsException: Index 116 out of bounds for length 1 at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source) at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source) at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source) at java.base/java.util.Objects.checkIndex(Unknown Source) at java.base/java.util.ArrayList.get(Unknown Source) at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:77) at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:923) ... 23 more
StateMigrationException (POJOSerializer)
Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.api.java.typeutils.runtime.PojoSerializer@8bf85b5d) must not be incompatible with the old state serializer (org.apache.flink.api.java.typeutils.runtime.PojoSerializer@3282ee3).
Lista di controllo prima dell'aggiornamento
Controlla tutte le dichiarazioni statali contenute nella tua domanda
Verifica POJOs con collections (
HashMap,ArrayList,HashSet)Verifica i metodi di serializzazione per ogni tipo di stato
Crea un'applicazione prod replica e verifica la compatibilità dello stato utilizzando l' UpdateApplication API su questa replica
Se lo stato è incompatibile, seleziona una strategia da Migrazione statale
Abilita il rollback automatico nella configurazione dell'applicazione Flink di produzione
Migrazione statale
Ricostruisci lo stato completo
Ideale per applicazioni in cui lo stato può essere ricostruito a partire dai dati di origine.
Se l'applicazione è in grado di ricostruire lo stato dai dati di origine:
Arresta l'applicazione Flink 1.x
Esegui l'aggiornamento a Flink 2.x con codice aggiornato
Inizia con
SKIP_RESTORE_FROM_SNAPSHOTConsenti all'applicazione di ricostruire lo stato
aws kinesisanalyticsv2 start-application \ --application-name MyApplication \ --run-configuration '{ "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "SKIP_RESTORE_FROM_SNAPSHOT" } }'
Best practice
Usa sempre Avro o Protobuf per stati complessi: forniscono l'evoluzione dello schema e sono indipendenti da Kryo
Evita le raccolte in POJOs: usa invece la versione nativa di Flink
ListStateMapStateProva il ripristino dello stato localmente: prima dell'aggiornamento della produzione, esegui il test con istantanee effettive
Scatta istantanee frequentemente, soprattutto prima degli aggiornamenti delle versioni principali
Abilita il rollback automatico: configura l'applicazione MSF per il rollback automatico in caso di errore
Documenta i tipi di stato: conserva la documentazione di tutti i tipi di stato e dei relativi metodi di serializzazione
Monitora le dimensioni dei checkpoint: l'aumento delle dimensioni dei checkpoint può indicare problemi di serializzazione
Fasi successive
Pianifica il tuo aggiornamento: Vedi. Aggiornamento a Flink 2.2: guida completa
Per domande o problemi durante la migrazione, consulta Risoluzione dei problemi relativi al servizio gestito per Apache Flink o contatta l' AWS assistenza.