

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# In che modo Flink supporta l'elevata disponibilità e la resilienza del lavoro
<a name="jobruns-flink-resiliency"></a>

Le sezioni seguenti descrivono come Flink rende i lavori più affidabili e altamente disponibili. Lo fa attraverso funzionalità integrate come l'elevata disponibilità di Flink e varie funzionalità di ripristino in caso di guasti.

**Topics**
+ [Uso dell'alta disponibilità (high availability, HA) per operatori Flink e applicazioni Flink](jobruns-flink-using-ha.md)
+ [Ottimizzazione dei tempi di riavvio dei processi Flink per le operazioni di ripristino delle attività e dimensionamento con Amazon EMR su EKS](jobruns-flink-restart.md)
+ [Disattivazione graduale delle istanze spot con Flink su Amazon EMR su EKS](jobruns-flink-decommission.md)

# Uso dell'alta disponibilità (high availability, HA) per operatori Flink e applicazioni Flink
<a name="jobruns-flink-using-ha"></a>

Questo argomento mostra come configurare l'alta disponibilità e descrive come funziona per alcuni casi d'uso diversi. Questi includono quando utilizzi il Job manager e quando utilizzi i kubernetes nativi di Flink.

## Alta disponibilità dell'operatore Flink
<a name="jobruns-flink-ha-operator"></a>

Abilitiamo l'*alta disponibilità* per l'operatore Flink in modo da poter effettuare il failover su un operatore Flink in standby per ridurre al minimo i tempi di inattività nel circuito di controllo dell'operatore in caso di errori. L'alta disponibilità è abilitata per impostazione predefinita e il numero predefinito di repliche dell'operatore di avvio è 2. È possibile configurare il campo delle repliche nel file `values.yaml` per il grafico Helm.

I seguenti campi sono personalizzabili:
+ `replicas` (facoltativo, il valore predefinito è 2): l'impostazione di questo numero su un valore maggiore di 1 crea altri operatori in standby e consente un ripristino più rapido del processo.
+ `highAvailabilityEnabled` (facoltativo, l'impostazione predefinita è true): controlla se desideri abilitare l'HA. La specificazione di questo parametro come true abilita il supporto per l'implementazione multi-AZ e imposta i parametri`flink-conf.yaml` corretti.

Puoi disabilitare l'HA per il tuo operatore impostando la seguente configurazione nel file `values.yaml`.

```
...
imagePullSecrets: []

replicas: 1

# set this to false if you don't want HA
highAvailabilityEnabled: false
...
```

**Implementazione multi-AZ**

Creiamo i pod dell'operatore in più zone di disponibilità. Si tratta di un vincolo leggero e i pod degli operatori verranno programmati nella stessa AZ se non disponi di risorse sufficienti in un'altra AZ.

**Determinazione della replica leader**

 Se HA è abilitato, le repliche utilizzano un lease per determinare quale dei due JMs è il leader e utilizzano un lease K8s per l'elezione del leader. È possibile descrivere il lease e consultare il campo .Spec.Holder Identity per determinare il leader attuale

```
kubectl describe lease <Helm Install Release Name>-<NAMESPACE>-lease -n <NAMESPACE> | grep "Holder Identity"
```

**Interazione Flink-S3**

**Configurazione delle credenziali di accesso**

Assicurati di aver configurato IRSA con le autorizzazioni IAM appropriate per accedere al bucket S3.

**Recupero dei jar dei processi dalla modalità di applicazione S3**

L'operatore Flink supporta anche il recupero dei jar delle applicazioni da S3. È sufficiente fornire la posizione S3 per il JARuri nelle specifiche. FlinkDeployment 

Puoi anche usare questa funzione per scaricare altri artefatti come gli script. PyFlink Lo script Python risultante viene inserito nel percorso `/opt/flink/usrlib/`.

L'esempio seguente mostra come utilizzare questa funzionalità per un lavoro. PyFlink Osserva i campi jarURI e args.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: python-example
spec:
  image: <YOUR CUSTOM PYFLINK IMAGE>
  emrReleaseLabel: "emr-6.12.0-flink-latest"
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    highAvailabilityEnabled: false
    replicas: 1
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: "s3://<S3-BUCKET>/scripts/pyflink.py" # Note, this will trigger the artifact download process
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pyflink.py"]
    parallelism: 1
    upgradeMode: stateless
```

**Connettori S3 di Flink**

Flink viene fornito con due connettori S3 (elencati di seguito). Nelle sezioni seguenti viene spiegato quando utilizzare un determinato connettore.

**Creazione di checkpoint: connettore Presto S3**
+ Imposta lo schema S3 su s3p://
+ Il connettore consigliato da utilizzare per la creazione di checkpoint su s3. Per ulteriori informazioni, consulta [S3-specific](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#s3-specific) nella documentazione di Apache Flink.

Specificazione di esempio: FlinkDeployment 

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.checkpoints.dir: s3p://<BUCKET-NAME>/flink-checkpoint/
```

**Lettura e scrittura su S3: connettore Hadoop S3**
+ Imposta lo schema S3 su `s3://` o (`s3a://`)
+ Il connettore consigliato per la lettura e la scrittura di file da S3 (unico connettore S3 a implementare l'[interfaccia FileSystem di Flink](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/)).
+ Per impostazione predefinita, abbiamo impostato `fs.s3a.aws.credentials.provider` il `flink-conf.yaml` file, che è. `com.amazonaws.auth.WebIdentityTokenCredentialsProvider` Se ignori completamente il valore `flink-conf` predefinito e stai interagendo con S3, assicurati di utilizzare questo provider.

 FlinkDeployment Specifiche di esempio

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  job:
    jarURI: local:///opt/flink/examples/streaming/WordCount.jar
    args: [ "--input", "s3a://<INPUT BUCKET>/PATH", "--output", "s3a://<OUTPUT BUCKET>/PATH" ]
    parallelism: 2
    upgradeMode: stateless
```

## Flink Job Manager
<a name="jobruns-flink-ha-manager"></a>

High Availability (HA) for Flink Deployments consente ai lavori di continuare a progredire anche se si verifica un errore temporaneo e si verificano arresti anomali. JobManager I processi verranno riavviati, ma dall'ultimo checkpoint riuscito, se l'HA è abilitata. Se l'HA non è abilitato, Kubernetes riavvierà il tuo lavoro JobManager, ma il tuo lavoro inizierà come un nuovo lavoro e perderà i progressi. Dopo aver configurato HA, possiamo dire a Kubernetes di archiviare i metadati HA in uno storage persistente a cui fare riferimento in caso di errore temporaneo JobManager e quindi riprendere i lavori dall'ultimo checkpoint riuscito.

L'HA è abilitata per impostazione predefinita per i processi Flink (il numero di repliche è impostato su 2, il che richiederà di fornire una posizione di archiviazione S3 per la persistenza dei metadati HA).

**Configurazioni HA**

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  executionRoleArn: "<JOB EXECUTION ROLE ARN>"
  emrReleaseLabel: "emr-6.13.0-flink-latest"
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    replicas: 2
    highAvailabilityEnabled: true
    storageDir: "s3://<S3 PERSISTENT STORAGE DIR>"
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
```

Di seguito sono riportate le descrizioni delle configurazioni HA di cui sopra in Job Manager (definite in .spec.jobManager):
+ `highAvailabilityEnabled` (facoltativa, l'impostazione predefinita è true): impostala su `false ` se non desideri abilitare l'HA e non desideri utilizzare le configurazioni HA fornite. Puoi comunque manipolare il campo "repliche" per configurare manualmente l'HA.
+ `replicas`(facoltativo, l'impostazione predefinita è 2): l'impostazione di questo numero su un valore maggiore di 1 crea altri standby JobManagers e consente un ripristino più rapido del lavoro. Se disabiliti l'HA, devi impostare il numero di repliche su 1, altrimenti continuerai a ricevere errori di convalida (è supportata solo 1 replica se l'HA non è abilitata).
+ `storageDir` (obbligatoria): poiché per impostazione predefinita si utilizza 2 come numero di repliche, è necessario fornire una storageDir persistente. Attualmente questo campo accetta solo percorsi S3 come posizione di archiviazione.

**Località dei pod**

 Se abiliti HA, cerchiamo anche di collocare i pod nella stessa AZ, il che comporta un miglioramento delle prestazioni (latenza di rete ridotta grazie alla presenza di pod nella stessa). AZs Si tratta di una procedura basata sul migliore tentativo, il che significa che se non disponi di risorse sufficienti nella zona in cui è programmata la maggior parte dei tuoi pod, i pod rimanenti verranno comunque programmati, ma potrebbero finire su un nodo esterno a tale AZ.

**Determinazione della replica leader**

Se HA è abilitato, le repliche utilizzano un lease per determinare quale delle due JMs è la leader e utilizzano una K8s Configmap come archivio dati per archiviare questi metadati. Se vuoi determinare il leader, puoi verificare il contenuto della Configmap e la chiave `org.apache.flink.k8s.leader.restserver` sotto i dati per trovare il pod K8s con l'indirizzo IP. Inoltre, puoi utilizzare i seguenti comandi bash.

```
ip=$(kubectl get configmap -n <NAMESPACE> <JOB-NAME>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')
kubectl get pods -n NAMESPACE  -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

## Processo Flink: Kubernetes nativo
<a name="jobruns-flink-ha-kubernetes"></a>

Amazon EMR 6.13.0 e versioni successive supportano Kubernetes nativo di Flink per l'esecuzione di applicazioni Flink in modalità ad alta disponibilità su un cluster Amazon EKS. 

**Nota**  
Devi disporre di un bucket Amazon S3 creato per archiviare i metadati ad alta disponibilità del processo quando invii il processo Flink. Se non desideri utilizzare questa funzionalità, puoi disattivarla. È abilitata per impostazione predefinita.

Per attivare la funzionalità di alta disponibilità di Flink, fornisci i seguenti parametri Flink quando [esegui il comando CLI `run-application`](jobruns-flink-native-kubernetes-getting-started.md#jobruns-flink-native-kubernetes-getting-started-run-application). I parametri sono definiti sotto l'esempio.

```
-Dhigh-availability.type=kubernetes \
-Dhigh-availability.storageDir=S3://DOC-EXAMPLE-STORAGE-BUCKET \
-Dfs.s3a.aws.credentials.provider="com.amazonaws.auth.WebIdentityTokenCredentialsProvider" \
-Dkubernetes.jobmanager.replicas=3 \
-Dkubernetes.cluster-id=example-cluster
```
+ **`Dhigh-availability.storageDir`**: il bucket Amazon S3 in cui desideri archiviare i metadati ad alta disponibilità per il tuo processo.

  **`Dkubernetes.jobmanager.replicas`**: Il numero di pod Job Manager da creare come numero intero maggiore di `1`.

  **`Dkubernetes.cluster-id`**: un ID univoco che identifica il cluster Flink.

# Ottimizzazione dei tempi di riavvio dei processi Flink per le operazioni di ripristino delle attività e dimensionamento con Amazon EMR su EKS
<a name="jobruns-flink-restart"></a>

Quando un'attività non riesce o quando si verifica un'operazione di dimensionamento, Flink tenta di rieseguire l'attività dall'ultimo checkpoint completato. L'esecuzione del processo di riavvio potrebbe richiedere un minuto o più, a seconda delle dimensioni dello stato del checkpoint e del numero di attività parallele. Durante il periodo di riavvio, le attività di backlog relative al processo possono accumularsi. Esistono tuttavia alcuni modi in cui Flink ottimizza la velocità di ripristino e riavvio dei grafici di esecuzione per migliorare la stabilità del processo.

Questa pagina descrive alcuni dei modi in cui Amazon EMR Flink può migliorare il tempo di riavvio del lavoro durante il ripristino delle attività o la scalabilità delle operazioni su istanze locali. Le istanze Spot sono capacità di elaborazione inutilizzata disponibile a un prezzo scontato. Ha comportamenti unici, tra cui interruzioni occasionali, quindi è importante capire come Amazon EMR su EKS li gestisce, incluso il modo in cui Amazon EMR on EKS esegue la disattivazione e il riavvio dei lavori.

**Topics**
+ [Ripristino locale delle attività](#flink-restart-task-local)
+ [Ripristino locale delle attività tramite montaggio su volumi Amazon EBS](#flink-restart-task-local-ebs)
+ [Checkpoint incrementale generico basato su log](#flink-restart-log-check)
+ [Ripristino granulare](#flink-restart-fine-grained)
+ [Meccanismo di riavvio combinato nel pianificatore adattivo](#flink-restart-combined)

## Ripristino locale delle attività
<a name="flink-restart-task-local"></a>

**Nota**  
Il ripristino locale delle attività è supportato con Flink su Amazon EMR su EKS 6.14.0 e versioni successive.

Con i checkpoint Flink, ogni attività produce un'istantanea del suo stato che Flink scrive in uno storage distribuito come Amazon S3. In caso di ripristino, le attività recuperano il loro stato dall'archiviazione distribuita. L'archiviazione distribuita offre tolleranza ai guasti e può ridistribuire lo stato durante il dimensionamento perché è accessibile a tutti i nodi.

Tuttavia, un archivio distribuito remoto presenta anche uno svantaggio: tutte le attività devono leggere il proprio stato da una posizione remota della rete. Ciò può comportare lunghi tempi di ripristino per stati di grandi dimensioni durante le operazioni di ripristino delle attività o di dimensionamento.

Il problema dei lunghi tempi di ripristino viene risolto mediante il *ripristino locale delle attività*. Le attività scrivono il loro stato su checkpoint in una memoria secondaria locale all'attività, ad esempio su un disco locale. Inoltre, memorizzano il loro stato nell'archiviazione principale, o su Amazon S3, come nel nostro caso. Durante il ripristino, lo scheduler pianifica le attività sullo stesso Task Manager in cui le attività erano state eseguite in precedenza, in modo che possano essere ripristinate dall'archivio di stato locale anziché essere lette dall'archivio di stato remoto. Per ulteriori informazioni, consulta l'argomento relativo al [ripristino locale delle attività](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/large_state_tuning/#task-local-recovery) nella *documentazione di Apache Flink*.

I nostri test di benchmark con processi di esempio hanno dimostrato che il tempo di ripristino è stato ridotto da pochi minuti a pochi secondi con il ripristino locale delle attività abilitato.

Per abilitare il ripristino locale delle attività, imposta le seguenti configurazioni nel file `flink-conf.yaml`. Specifica il valore dell'intervallo di checkpoint in millisecondi.

```
    state.backend.local-recovery: true
    state.backend: hasmap or rocksdb
    state.checkpoints.dir: s3://STORAGE-BUCKET-PATH/checkpoint
    execution.checkpointing.interval: 15000
```

## Ripristino locale delle attività tramite montaggio su volumi Amazon EBS
<a name="flink-restart-task-local-ebs"></a>

**Nota**  
Il ripristino locale delle attività di Amazon EBS è supportato con Flink su Amazon EMR su EKS 6.15.0 e versioni successive.

Con Flink su Amazon EMR su EKS, puoi fornire automaticamente i volumi Amazon EBS ai pod TaskManager per il ripristino locale delle attività. Il montaggio di un overlay predefinito include un volume di 10 GB, sufficiente per processi con uno stato inferiore. I lavori con stati di grandi dimensioni possono abilitare l'opzione *montaggio automatico del volume EBS*. I pod TaskManager vengono creati e montati automaticamente durante la creazione dei pod e rimossi durante l'eliminazione dei pod.

Utilizza i seguenti passaggi per abilitare il montaggio automatico dei volumi EBS per Flink in Amazon EMR su EKS:

1. Esporta i valori per le seguenti variabili che utilizzerai nei passaggi successivi.

   ```
   export AWS_REGION=aa-example-1 
   export FLINK_EKS_CLUSTER_NAME=my-cluster
   export AWS_ACCOUNT_ID=111122223333
   ```

1. Crea o aggiorna un file YAML `kubeconfig` per il cluster.

   ```
   aws eks update-kubeconfig --name $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
   ```

1. Crea un account di servizio IAM per il driver CSI (Container Storage Interface) di Amazon EBS sul cluster Amazon EKS. 

   ```
   eksctl create iamserviceaccount \
      --name ebs-csi-controller-sa \
      --namespace kube-system \
      --region $AWS_REGION \
      --cluster $FLINK_EKS_CLUSTER_NAME\
      --role-name TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} \
      --role-only \
      --attach-policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy \
      --approve
   ```

1. Crea il driver CSI di Amazon EBS con il seguente comando:

   ```
   eksctl create addon \
      --name aws-ebs-csi-driver \
      --region $AWS_REGION \
      --cluster $FLINK_EKS_CLUSTER_NAME \
      --service-account-role-arn arn:aws:iam::${AWS_ACCOUNT_ID}:role/TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
   ```

1. Crea la classe di storage Amazon EBS con il seguente comando:

   ```
   cat ≪ EOF ≫ storage-class.yaml
   apiVersion: storage.k8s.io/v1
   kind: StorageClass
   metadata:
     name: ebs-sc
   provisioner: ebs.csi.aws.com
   volumeBindingMode: WaitForFirstConsumer
   EOF
   ```

   Quindi applica la classe:

   ```
   kubectl apply -f storage-class.yaml
   ```

1. Helm installa l'operatore Kubernetes di Amazon EMR Flink con opzioni per creare un account di servizio. Questa operazione crea il `emr-containers-sa-flink` da utilizzare nell'implementazione Flink.

   ```
   helm install flink-kubernetes-operator flink-kubernetes-operator/ \
      --set jobServiceAccount.create=true \
      --set rbac.jobRole.create=true \
      --set rbac.jobRoleBinding.create=true
   ```

1. Per inviare il processo Flink e abilitare la fornitura automatica di volumi EBS per il ripristino locale delle attività, imposta le seguenti configurazioni nel file `flink-conf.yaml`. Regola il limite di dimensione in base alla dimensione dello stato del processo. Imposta `serviceAccount` su `emr-containers-sa-flink`. Specifica il valore dell'intervallo di checkpoint in millisecondi. Quindi, ometti `executionRoleArn`.

   ```
   flinkConfiguration:
       task.local-recovery.ebs.enable: true
       kubernetes.taskmanager.local-recovery.persistentVolumeClaim.sizeLimit: 10Gi
       state.checkpoints.dir: s3://BUCKET-PATH/checkpoint
       state.backend.local-recovery: true
       state.backend: hasmap or rocksdb
       state.backend.incremental: "true"
       execution.checkpointing.interval: 15000
     serviceAccount: emr-containers-sa-flink
   ```

Quando sei pronto per eliminare il plug-in del driver CSI di Amazon EBS, usa i seguenti comandi:

```
  # Detach Attached Policy
  aws iam detach-role-policy --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} --policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy
  # Delete the created Role
  aws iam delete-role --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
  # Delete the created service account
  eksctl delete iamserviceaccount --name ebs-csi-controller-sa --namespace kube-system --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  # Delete Addon
  eksctl delete addon --name aws-ebs-csi-driver --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  # Delete the EBS storage class
  kubectl delete -f storage-class.yaml
```

## Checkpoint incrementale generico basato su log
<a name="flink-restart-log-check"></a>

**Nota**  
Il checkpoint incrementale generico basato su log è supportato con Flink su Amazon EMR su EKS 6.14.0 e versioni successive.

Il checkpoint incrementale generico basato su log è stato aggiunto in Flink 1.16 per migliorare la velocità dei checkpoint. Un intervallo di checkpoint più rapido spesso comporta una riduzione del lavoro di ripristino perché è necessario rielaborare un minor numero di eventi dopo il ripristino. Per ulteriori informazioni, consulta [Improving speed and stability of checkpointing with generic log-based incremental checkpoints](https://flink.apache.org/2022/05/30/improving-speed-and-stability-of-checkpointing-with-generic-log-based-incremental-checkpoints/) sul *blog di Apache Flink*.

Con processi di esempio, i nostri test di benchmark hanno dimostrato che il tempo di checkpoint si è ridotto da pochi minuti a pochi secondi con il checkpoint incrementale generico basato su log.

Per abilitare i checkpoint incrementali generici basati su log, imposta le seguenti configurazioni nel tuo file `flink-conf.yaml`. Specifica il valore dell'intervallo di checkpoint in millisecondi.

```
    state.backend.changelog.enabled: true 
    state.backend.changelog.storage: filesystem
    dstl.dfs.base-path: s3://bucket-path/changelog
    state.backend.local-recovery: true
    state.backend: rocksdb
    state.checkpoints.dir: s3://bucket-path/checkpoint
    execution.checkpointing.interval: 15000
```

## Ripristino granulare
<a name="flink-restart-fine-grained"></a>

**Nota**  
Il supporto per il ripristino granulare nel pianificatore predefinito è supportato con Flink su Amazon EMR su EKS 6.14.0 e versioni successive. Il supporto per il ripristino granulare nel pianificatore adattivo è disponibile con Flink su Amazon EMR su EKS 6.15.0 e versioni successive.

Quando un'attività riporta un errore durante l'esecuzione, Flink reimposta l'intero grafico di esecuzione e attiva una riesecuzione completa dall'ultimo checkpoint completato. Questa procedura è più costosa della semplice riesecuzione delle attività non riuscite. Il ripristino granulare riavvia solo il componente connesso alla pipeline dell'attività non riuscita. Nell'esempio seguente, il grafico del processo ha 5 vertici (da `A` a `E`). Tutte le connessioni tra i vertici avvengono tramite pipeline con distribuzione uniforme e il comando `parallelism.default` per il processo è impostato su `2`. 

```
A → B → C → D → E
```

Per questo esempio, le attività totali in esecuzione sono 10. La prima pipeline (da `a1` a `e1`) viene eseguita su un TaskManager (`TM1`) e la seconda pipeline (da `a2` a `e2`) viene eseguita su un altro TaskManager (`TM2`).

```
a1 → b1 → c1 → d1 → e1
a2 → b2 → c2 → d2 → e2
```

Esistono due componenti collegati tramite pipeline: `a1 → e1` e `a2 → e2`. Se `TM1` o `TM2` restituiscono un errore, l'errore influisce solo sulle 5 attività della pipeline in cui TaskManager era in esecuzione. La strategia di riavvio avvia solo il componente della pipeline interessato. 

Il ripristino granulare funziona solo con processi Flink perfettamente paralleli. Non è supportato con le operazioni `keyBy()` o `redistribute()`. Per ulteriori informazioni, consulta [FLIP-1: Fine Grained Recovery from Task Failures](https://cwiki.apache.org/confluence/display/FLINK/FLIP-1%3A+Fine+Grained+Recovery+from+Task+Failures) nel progetto Jira *Flink Improvement Proposal*.

Per abilitare il ripristino granulare, imposta le seguenti configurazioni nel file `flink-conf.yaml`.

```
jobmanager.execution.failover-strategy: region 
restart-strategy: exponential-delay or fixed-delay
```

## Meccanismo di riavvio combinato nel pianificatore adattivo
<a name="flink-restart-combined"></a>

**Nota**  
Il meccanismo di riavvio combinato nel pianificatore adattivo è supportato con Flink su Amazon EMR su EKS 6.15.0 e versioni successive.

Il pianificatore adattivo può regolare il parallelismo del processo in base agli slot disponibili. Riduce automaticamente il parallelismo se non sono disponibili abbastanza slot per soddisfare il parallelismo configurato del processo. Se diventano disponibili nuovi slot, il processo viene nuovamente dimensionato in base al parallelismo configurato del processo. Un pianificatore adattivo evita i tempi di inattività del processo quando le risorse disponibili non sono sufficienti. Questo è il pianificatore supportato per Autoscaler di Flink. Per questi motivi, con Flink di Amazon EMR consigliamo il pianificatore adattivo. Tuttavia, i pianificatori adattivi potrebbero eseguire più riavvii in un breve periodo di tempo, un riavvio per ogni nuova risorsa aggiunta. Questo potrebbe comportare un calo delle prestazioni nel processo.

Con Amazon EMR 6.15.0 e versioni successive, Flink dispone di un meccanismo di riavvio combinato nel pianificatore adattivo che apre una finestra di riavvio quando viene aggiunta la prima risorsa e quindi attende fino all'intervallo di finestra configurato di 1 minuto predefinito. Esegue un singolo riavvio quando sono disponibili risorse sufficienti per eseguire il processo con il parallelismo configurato o quando scade l'intervallo.

Con processi di esempio, i nostri test di benchmark hanno dimostrato che questa funzionalità elabora il 10% dei record in più rispetto al comportamento predefinito quando si utilizzano pianificatori adattivi e autoscaler di Flink.

Per abilitare il meccanismo di riavvio combinato, imposta le seguenti configurazioni nel file `flink-conf.yaml`.

```
jobmanager.adaptive-scheduler.combined-restart.enabled: true 
jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m
```

# Disattivazione graduale delle istanze spot con Flink su Amazon EMR su EKS
<a name="jobruns-flink-decommission"></a>

Flink con Amazon EMR su EKS può migliorare il tempo di riavvio del processo durante le operazioni di ripristino delle attività o dimensionamento.

## Panoramica di
<a name="jobruns-flink-decommission-overview"></a>

I rilasci 6.15.0 e successivi di Amazon EMR su EKS supportano la disattivazione graduale dei Task Manager sulle istanze spot in Amazon EMR su EKS con Apache Flink. Come parte di questa funzionalità, Amazon EMR su EKS con Flink offre le seguenti funzionalità:
+ **Just-in-time checkpoint: i job di** streaming Flink possono rispondere all'interruzione dell'istanza Spot, eseguire il checkpoint just-in-time (JIT) dei processi in esecuzione e impedire la pianificazione di attività aggiuntive su queste istanze Spot. Il checkpoint JIT è supportato con un pianificatore predefinito e adattivo.
+ **Meccanismo di riavvio combinato**: un meccanismo di riavvio combinato tenta di riavviare il processo dopo aver raggiunto il parallelismo delle risorse previsto o la fine della finestra configurata corrente. Ciò impedisce anche il riavvio consecutivo del processo che potrebbe essere causato da più terminazioni di istanze spot. Il meccanismo di riavvio combinato è disponibile solo con il pianificatore adattivo.

Queste funzionalità offrono i seguenti vantaggi:
+ Puoi sfruttare le istanze spot per eseguire i Task Manager e ridurre le spese del cluster.
+ Il miglioramento del riconoscimento di Task Manager per le istanze spot si traduce in una maggiore resilienza e in una pianificazione più efficiente dei processi.
+ I tuoi lavori Flink avranno più tempo di attività perché ci saranno meno riavvii dopo la terminazione dell'istanza spot.

## Come funziona lo smantellamento elegante
<a name="jobruns-flink-decommission-howitworks"></a>

Considera il seguente esempio: esegui il provisioning di un cluster Amazon EMR su EKS che esegue Apache Flink e specifichi nodi On-Demand per Job Manager e nodi di istanza spot per Task Manager. Due minuti prima della terminazione, Task Manager riceve un avviso di interruzione.

In questo scenario, il Job Manager gestisce il segnale di interruzione dell'istanza spot, blocca la programmazione di attività aggiuntive sull'istanza spot e avvia il checkpoint JIT per il processo in streaming.

Quindi, il Job Manager riavvia il grafico dei processi solo dopo che la disponibilità di nuove risorse è sufficiente a soddisfare il parallelismo del processo corrente nella finestra dell'intervallo di riavvio. L'intervallo della finestra di riavvio viene deciso in base alla durata della sostituzione dell'istanza spot, alla creazione di nuovi pod di Task Manager e alla registrazione con Job Manager.

## Prerequisiti
<a name="jobruns-flink-decommission-prereqs"></a>

Per utilizzare il decommissioning grazioso, crea ed esegui un processo di streaming su un cluster Amazon EMR su EKS che esegue Apache Flink. Abilita il pianificatore adattivo e Task Manager pianificati su almeno un'istanza spot, come mostrato nell'esempio seguente. Devi utilizzare i nodi On-Demand per Job Manager e puoi utilizzare i nodi On-Demand per i Task Manager purché sia presente almeno un'istanza spot.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: deployment_name
spec:
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    cluster.taskmanager.graceful-decommission.enabled: "true"
    execution.checkpointing.interval: "240s"
    jobmanager.adaptive-scheduler.combined-restart.enabled: "true"
    jobmanager.adaptive-scheduler.combined-restart.window-interval : "1m"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    nodeSelector:
      'eks.amazonaws.com/capacityType': 'ON_DEMAND'
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
    nodeSelector:
      'eks.amazonaws.com/capacityType': 'SPOT'
  job:
    jarURI: flink_job_jar_path
```

## Configurazione
<a name="jobruns-flink-decommission-config"></a>

Questa sezione copre la maggior parte delle configurazioni che puoi specificare per le tue esigenze di disattivazione. 


| Chiave | Description | Valore predefinito | Valori accettabili | 
| --- | --- | --- | --- | 
|  cluster.taskmanager.graceful-decommission.enabled  |  Abilita la disattivazione graduale di Task Manager.  |  true  |  true, false  | 
|  jobmanager.adaptive-scheduler.combined-restart.enabled  |  Abilita il meccanismo di riavvio combinato nel pianificatore adattivo.  |  false  |  true, false  | 
|  jobmanager.adaptive-scheduler.combined-restart.window-interval  |  L'intervallo combinato della finestra di riavvio per eseguire riavvii combinati per il processo. Un numero intero senza unità viene interpretato come millisecondi.  |  1m  |  Esempi: 30, 60s, 3m, 1h  | 