

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Comment Flink favorise la haute disponibilité et la résilience au travail
<a name="jobruns-flink-resiliency"></a>

Les sections suivantes expliquent comment Flink améliore la fiabilité et la disponibilité des offres d'emploi. Pour ce faire, il utilise des fonctionnalités intégrées telles que la haute disponibilité de Flink et diverses fonctionnalités de restauration en cas de panne.

**Topics**
+ [Utilisation de la haute disponibilité (HA) pour les opérateurs et les applications Flink](jobruns-flink-using-ha.md)
+ [Optimisation des temps de redémarrage des tâches Flink pour la récupération des tâches et la mise à l’échelle des opérations avec Amazon EMR sur EKS](jobruns-flink-restart.md)
+ [Mise hors service progressive des instances Spot avec Flink sur Amazon EMR sur EKS](jobruns-flink-decommission.md)

# Utilisation de la haute disponibilité (HA) pour les opérateurs et les applications Flink
<a name="jobruns-flink-using-ha"></a>

Cette rubrique explique comment configurer la haute disponibilité et décrit son fonctionnement pour différents cas d'utilisation. Cela inclut lorsque vous utilisez le Job Manager et lorsque vous utilisez les kubernetes natifs de Flink.

## Haute disponibilité de l’opérateur Flink
<a name="jobruns-flink-ha-operator"></a>

Nous activons la *haute disponibilité* de l’opérateur Flink afin de pouvoir basculer vers un opérateur Flink de secours et réduire au minimum les temps d’arrêt dans la boucle de contrôle de l’opérateur en cas de défaillance. La haute disponibilité est activée par défaut et le nombre initial par défaut de répliques d’opérateur est de 2. Vous pouvez configurer le champ des répliques dans votre fichier `values.yaml` pour les Charts de Helm.

Les champs suivants sont personnalisables :
+ `replicas` (facultatif, la valeur par défaut est 2) : si vous définissez ce nombre sur une valeur supérieure à 1, d'autres opérateurs de secours seront créés et vous pourrez reprendre votre tâche plus rapidement.
+ `highAvailabilityEnabled` (facultatif, la valeur par défaut est « true ») : permet d'indiquer si vous souhaitez activer la haute disponibilité (HA). Le fait de définir ce paramètre comme « true » permet de prendre en charge le déploiement multi-AZ et de définir les paramètres `flink-conf.yaml` corrects.

Vous pouvez désactiver la haute disponibilité pour votre opérateur en paramétrant la configuration ci-dessous dans votre fichier `values.yaml`.

```
...
imagePullSecrets: []

replicas: 1

# set this to false if you don't want HA
highAvailabilityEnabled: false
...
```

**Déploiement multi-AZ**

Nous créons les pods des opérateurs dans plusieurs zones de disponibilité. Il s'agit d'une contrainte souple, et vos pods d'opérateur seront planifiés dans la même zone si vous ne disposez pas de suffisamment de ressources dans une autre zone.

**Détermination de la réplique leader**

 Si HA est activé, les répliques utilisent un bail pour déterminer lequel des deux JMs est le leader et utilisent un bail K8s pour l'élection du leader. Vous pouvez décrire le bail et consulter le champ .Spec.Holder Identity pour déterminer le leader actuel

```
kubectl describe lease <Helm Install Release Name>-<NAMESPACE>-lease -n <NAMESPACE> | grep "Holder Identity"
```

**Interaction Flink-S3**

**Configuration des informations d'identification d'accès**

Assurez-vous d'avoir configuré IRSA avec les autorisations IAM appropriées pour accéder au compartiment S3.

**Récupération des fichiers JAR des tâches à partir du mode d'application S3**

L'opérateur Flink prend également en charge la récupération des fichiers JAR des applications à partir de S3. Il vous suffit de fournir l'emplacement S3 du JArUri dans votre FlinkDeployment spécification.

Vous pouvez également utiliser cette fonctionnalité pour télécharger d'autres artefacts tels que PyFlink des scripts. Le script Python résultant est déposé sous le chemin `/opt/flink/usrlib/`.

L'exemple suivant montre comment utiliser cette fonctionnalité pour une PyFlink tâche. Notez les champs jarURI et args.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: python-example
spec:
  image: <YOUR CUSTOM PYFLINK IMAGE>
  emrReleaseLabel: "emr-6.12.0-flink-latest"
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    highAvailabilityEnabled: false
    replicas: 1
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: "s3://<S3-BUCKET>/scripts/pyflink.py" # Note, this will trigger the artifact download process
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pyflink.py"]
    parallelism: 1
    upgradeMode: stateless
```

**Connecteurs S3 pour Flink**

Flink est livré avec deux connecteurs S3 (indiqués ci-dessous). Les sections suivantes expliquent quand utiliser quel connecteur.

**Point de contrôle : connecteur S3 pour Presto**
+ Définissez le schéma S3 sur s3p://
+ Le connecteur recommandé à utiliser pour le point de contrôle vers S3. Pour plus d'informations, consultez la section [spécifique à S3](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#s3-specific) dans la documentation d'Apache Flink.

Exemple de FlinkDeployment spécification :

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.checkpoints.dir: s3p://<BUCKET-NAME>/flink-checkpoint/
```

**Lecture et écriture sur S3 : connecteur Hadoop S3**
+ Définissez le schéma S3 sur `s3://` ou (`s3a://`)
+ Le connecteur recommandé pour lire et écrire des fichiers à partir de S3 (uniquement le connecteur S3 qui implémente l'[interface Filesystem de Flinks](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/)).
+ Par défaut, nous avons défini `fs.s3a.aws.credentials.provider` le `flink-conf.yaml` fichier, qui est`com.amazonaws.auth.WebIdentityTokenCredentialsProvider`. Si vous remplacez complètement la `flink-conf` par défaut et que vous interagissez avec S3, assurez-vous d'utiliser ce fournisseur.

Exemple de FlinkDeployment spécification

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  job:
    jarURI: local:///opt/flink/examples/streaming/WordCount.jar
    args: [ "--input", "s3a://<INPUT BUCKET>/PATH", "--output", "s3a://<OUTPUT BUCKET>/PATH" ]
    parallelism: 2
    upgradeMode: stateless
```

## Gestionnaire de tâches Flink
<a name="jobruns-flink-ha-manager"></a>

La haute disponibilité (HA) pour les déploiements de Flink permet aux tâches de continuer à progresser même en cas d'erreur transitoire et de panne. JobManager Les tâches redémarreront, mais à partir du dernier point de contrôle validé lorsque la haute disponibilité est activée. Si la haute disponibilité n'est pas activée, Kubernetes redémarrera votre tâche JobManager, mais votre tâche redémarrera comme une nouvelle tâche et perdra sa progression. Après avoir configuré HA, nous pouvons demander à Kubernetes de stocker les métadonnées HA dans un stockage persistant afin de les référencer en cas de défaillance transitoire du, JobManager puis de reprendre nos tâches à partir du dernier point de contrôle réussi.

La haute disponibilité est activée par défaut pour vos tâches Flink (le nombre de répliques est défini sur 2, ce qui nécessite la mise à disposition d'un emplacement de stockage S3 pour l'enregistrement des métadonnées haute disponibilité).

**Configurations de haute disponibilité**

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  executionRoleArn: "<JOB EXECUTION ROLE ARN>"
  emrReleaseLabel: "emr-6.13.0-flink-latest"
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    replicas: 2
    highAvailabilityEnabled: true
    storageDir: "s3://<S3 PERSISTENT STORAGE DIR>"
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
```

Voici les descriptions des configurations de haute disponibilité ci-dessus dans le gestionnaire de tâches (définies sous .spec.JobManager) :
+ `highAvailabilityEnabled` (facultatif, la valeur par défaut est « true ») : définissez ce paramètre sur `false ` si vous ne voulez pas activer la haute disponibilité et si vous ne souhaitez pas utiliser les configurations de haute disponibilité fournies. Vous pouvez toujours manipuler le champ « replicas » pour configurer manuellement la haute disponibilité.
+ `replicas`(facultatif, la valeur par défaut est 2) : Si vous définissez ce nombre sur une valeur supérieure à 1, vous créez un autre mode de veille JobManagers et vous pouvez reprendre votre travail plus rapidement. Si vous désactivez la haute disponibilité, vous devez définir le nombre de répliques sur 1, sinon vous continuerez à recevoir des erreurs de validation (une seule réplique est prise en charge si la haute disponibilité n'est pas activée).
+ `storageDir` (obligatoire) : étant donné que le nombre de répliques est égal à 2 par défaut, nous devons fournir un StorageDir persistant. Actuellement, ce champ n'accepte que les chemins S3 comme emplacement de stockage.

**Placement des pods**

 Si vous activez la haute disponibilité, nous essayons également de colocaliser les pods dans la même zone, ce qui améliore les performances (réduction de la latence du réseau grâce à la présence de pods dans les mêmes zones AZs). Ceci est un processus réalisé au mieux des possibilités, signifiant que si vous ne disposez pas de ressources suffisantes dans la zone de disponibilité où la majorité de vos pods sont planifiés, les pods restants seront tout de même planifiés, mais pourraient se retrouver sur un nœud situé en dehors de cette zone de disponibilité.

**Détermination de la réplique leader**

Si HA est activé, les répliques utilisent un bail pour déterminer lequel d'entre eux JMs est le leader et utilisent un K8s Configmap comme banque de données pour stocker ces métadonnées. Si vous souhaitez identifier le leader, vous pouvez consulter le contenu de la Configmap et la clé `org.apache.flink.k8s.leader.restserver` sous les données pour trouver le pod K8s correspondant à l’adresse IP indiquée. Vous pouvez également utiliser les commandes bash ci-dessous.

```
ip=$(kubectl get configmap -n <NAMESPACE> <JOB-NAME>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')
kubectl get pods -n NAMESPACE  -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

## Tâche Flink – Native Kubernetes
<a name="jobruns-flink-ha-kubernetes"></a>

À partir de la version 6.13.0, Amazon EMR prend en charge Flink Native Kubernetes pour l’exécution d’applications Flink en mode haute disponibilité sur un cluster Amazon EKS. 

**Note**  
Il est nécessaire de disposer d'un compartiment Amazon S3 préalablement créé pour conserver les métadonnées de haute disponibilité lorsque vous soumettez votre tâche Flink. Si vous ne souhaitez pas utiliser cette fonctionnalité, vous pouvez la désactiver. Elle est activée par défaut.

Pour activer la fonctionnalité haute disponibilité de Flink, spécifiez les paramètres Flink suivants lorsque vous [exécutez la commande `run-application` dans la CLI](jobruns-flink-native-kubernetes-getting-started.md#jobruns-flink-native-kubernetes-getting-started-run-application). Les paramètres sont définis dans l’exemple ci-dessous.

```
-Dhigh-availability.type=kubernetes \
-Dhigh-availability.storageDir=S3://DOC-EXAMPLE-STORAGE-BUCKET \
-Dfs.s3a.aws.credentials.provider="com.amazonaws.auth.WebIdentityTokenCredentialsProvider" \
-Dkubernetes.jobmanager.replicas=3 \
-Dkubernetes.cluster-id=example-cluster
```
+ **`Dhigh-availability.storageDir`** : compartiment Amazon S3 où vous souhaitez stocker les métadonnées de haute disponibilité pour votre tâche

  **`Dkubernetes.jobmanager.replicas`** : nombre de pods Job Manager à créer sous la forme d’un entier supérieur à `1`

  **`Dkubernetes.cluster-id`** : ID unique qui identifie le cluster Flink

# Optimisation des temps de redémarrage des tâches Flink pour la récupération des tâches et la mise à l’échelle des opérations avec Amazon EMR sur EKS
<a name="jobruns-flink-restart"></a>

Lorsqu’une tâche échoue ou qu’une opération de mise à l’échelle a lieu, Flink tente de réexécuter la tâche à partir du dernier point de contrôle terminé. L’exécution du processus de redémarrage peut durer une minute ou plus, en fonction de la taille de l’état du point de contrôle et du nombre de tâches parallèles. Pendant la période de redémarrage, les tâches en attente peuvent s’accumuler pour la tâche. Flink peut cependant permettre d’optimiser la vitesse de récupération et de redémarrage des graphes d’exécution afin d’améliorer la stabilité des tâches.

Cette page décrit certaines des manières dont Amazon EMR Flink peut améliorer le temps de redémarrage des tâches lors de la reprise des tâches ou des opérations de dimensionnement sur des instances ponctuelles. Les instances Spot sont des capacités de calcul inutilisées disponibles à prix discount. Ses comportements sont uniques, notamment des interruptions occasionnelles. Il est donc important de comprendre comment Amazon EMR on EKS les gère, notamment comment Amazon EMR on EKS procède à la mise hors service et au redémarrage des tâches.

**Topics**
+ [Récupération locale des tâches](#flink-restart-task-local)
+ [Récupération locale des tâches par montage de volume Amazon EBS](#flink-restart-task-local-ebs)
+ [Point de contrôle incrémentiel générique basé sur les journaux](#flink-restart-log-check)
+ [Récupération précise](#flink-restart-fine-grained)
+ [Mécanisme de redémarrage combiné dans le planificateur adaptatif](#flink-restart-combined)

## Récupération locale des tâches
<a name="flink-restart-task-local"></a>

**Note**  
La récupération locale des tâches est prise en charge par Flink sur Amazon EMR sur EKS 6.14.0 et versions ultérieures.

Avec les points de contrôle Flink, chaque tâche produit un instantané de son état que Flink écrit sur un stockage distribué comme Amazon S3. En cas de récupération, les tâches restaurent leur état à partir du stockage distribué. Le stockage distribué offre une tolérance aux pannes et peut redistribuer l’état lors de la mise à l’échelle, car tous les nœuds peuvent y accéder.

Cependant, un magasin distribué à distance présente également un inconvénient : toutes les tâches doivent lire leur état depuis un emplacement distant sur le réseau, ce qui peut entraîner l’augmentation du temps de récupération pour les états importants lors des opérations de récupération ou de mise à l’échelle de tâches.

La *récupération locale des tâches* permet de résoudre ce problème. Les tâches enregistrent leur état au point de contrôle sur un stockage secondaire local à la tâche, par exemple sur un disque local. Elles stockent également leur état sur le stockage principal, à savoir Amazon S3 dans notre cas. Lors de la récupération, le planificateur planifie les tâches sur le même Task Manager que celui dans lequel les tâches ont été exécutées précédemment afin qu’elles puissent être récupérées depuis le magasin d’états local au lieu de lire depuis le magasin d’état distant. Pour plus d’informations, voir la rubrique [Récupération locale des tâches](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/large_state_tuning/#task-local-recovery) de la *documentation Apache Flink*.

Nos tests d’évaluation avec des exemples de tâches ont montré que le temps de récupération était passé de quelques minutes à quelques secondes grâce à l’activation de la récupération locale des tâches.

Pour activer la récupération locale des tâches, définissez les configurations suivantes dans votre fichier `flink-conf.yaml`. Spécifiez la valeur de l’intervalle de point de contrôle en millisecondes.

```
    state.backend.local-recovery: true
    state.backend: hasmap or rocksdb
    state.checkpoints.dir: s3://STORAGE-BUCKET-PATH/checkpoint
    execution.checkpointing.interval: 15000
```

## Récupération locale des tâches par montage de volume Amazon EBS
<a name="flink-restart-task-local-ebs"></a>

**Note**  
La récupération locale des tâches par Amazon EBS est prise en charge par Flink sur Amazon EMR sur EKS 6.15.0 et versions ultérieures.

Avec Flink sur Amazon EMR sur EKS, vous pouvez automatiquement provisionner des volumes Amazon EBS sur les pods TaskManager pour la restauration locale des tâches. Le montage de superposition par défaut comprend un volume de 10 Go, ce qui est suffisant pour les tâches dont l’état est moins important. Les tâches dont les états sont importants peuvent activer l’option de *montage automatique de volume EBS*. Les pods TaskManager sont automatiquement créés et montés lors de la création du pod et supprimés lors de sa suppression.

Procédez comme suit pour activer le montage automatique de volume EBS pour Flink dans Amazon EMR sur EKS :

1. Exportez les valeurs des variables suivantes que vous utiliserez lors des étapes suivantes.

   ```
   export AWS_REGION=aa-example-1 
   export FLINK_EKS_CLUSTER_NAME=my-cluster
   export AWS_ACCOUNT_ID=111122223333
   ```

1. Créez ou mettez à jour un fichier YAML `kubeconfig` pour votre cluster.

   ```
   aws eks update-kubeconfig --name $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
   ```

1. Créez un compte de service IAM pour le pilote CSI Amazon EBS sur votre cluster Amazon EKS. 

   ```
   eksctl create iamserviceaccount \
      --name ebs-csi-controller-sa \
      --namespace kube-system \
      --region $AWS_REGION \
      --cluster $FLINK_EKS_CLUSTER_NAME\
      --role-name TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} \
      --role-only \
      --attach-policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy \
      --approve
   ```

1. Créez le pilote CSI Amazon EBS à l’aide de la commande suivante :

   ```
   eksctl create addon \
      --name aws-ebs-csi-driver \
      --region $AWS_REGION \
      --cluster $FLINK_EKS_CLUSTER_NAME \
      --service-account-role-arn arn:aws:iam::${AWS_ACCOUNT_ID}:role/TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
   ```

1. Créez la classe de stockage Amazon EBS à l’aide de la commande suivante :

   ```
   cat ≪ EOF ≫ storage-class.yaml
   apiVersion: storage.k8s.io/v1
   kind: StorageClass
   metadata:
     name: ebs-sc
   provisioner: ebs.csi.aws.com
   volumeBindingMode: WaitForFirstConsumer
   EOF
   ```

   Appliquez ensuite la classe :

   ```
   kubectl apply -f storage-class.yaml
   ```

1. Helm installe l’opérateur Kubernetes pour Flink sur Amazon EMR avec des options permettant de créer un compte de service. Cela crée le `emr-containers-sa-flink` à utiliser dans le déploiement de Flink.

   ```
   helm install flink-kubernetes-operator flink-kubernetes-operator/ \
      --set jobServiceAccount.create=true \
      --set rbac.jobRole.create=true \
      --set rbac.jobRoleBinding.create=true
   ```

1. Pour soumettre la tâche Flink et activer le provisionnement automatique des volumes EBS pour la récupération locale des tâches, définissez les configurations suivantes dans votre fichier `flink-conf.yaml`. Ajustez la limite de taille d’état pour la tâche. Définissez `serviceAccount` sur `emr-containers-sa-flink`. Spécifiez la valeur de l’intervalle de point de contrôle en millisecondes. Et omettez le `executionRoleArn`.

   ```
   flinkConfiguration:
       task.local-recovery.ebs.enable: true
       kubernetes.taskmanager.local-recovery.persistentVolumeClaim.sizeLimit: 10Gi
       state.checkpoints.dir: s3://BUCKET-PATH/checkpoint
       state.backend.local-recovery: true
       state.backend: hasmap or rocksdb
       state.backend.incremental: "true"
       execution.checkpointing.interval: 15000
     serviceAccount: emr-containers-sa-flink
   ```

Lorsque vous êtes prêt à supprimer le plug-in de pilote CSI Amazon EBS, utilisez les commandes suivantes :

```
  # Detach Attached Policy
  aws iam detach-role-policy --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} --policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy
  # Delete the created Role
  aws iam delete-role --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
  # Delete the created service account
  eksctl delete iamserviceaccount --name ebs-csi-controller-sa --namespace kube-system --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  # Delete Addon
  eksctl delete addon --name aws-ebs-csi-driver --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  # Delete the EBS storage class
  kubectl delete -f storage-class.yaml
```

## Point de contrôle incrémentiel générique basé sur les journaux
<a name="flink-restart-log-check"></a>

**Note**  
Les points de contrôle incrémentiels génériques basés sur les journaux sont pris en charge avec Flink sur Amazon EMR sur EKS 6.14.0 et versions ultérieures.

Les points de contrôle incrémentiels génériques basés sur les journaux ont été ajoutés à Flink 1.16 pour rendre les points de contrôle plus fréquents. Un intervalle de point de contrôle plus court entraîne souvent une réduction du travail de récupération, car moins d’événements doivent être traités de nouveau après la récupération. Pour plus d’informations, accédez à la page [Improving speed and stability of checkpointing with generic log-based incremental checkpoints](https://flink.apache.org/2022/05/30/improving-speed-and-stability-of-checkpointing-with-generic-log-based-incremental-checkpoints/) sur le *blog Apache Flink*.

Sur base de quelques exemples de tâches, nos tests d’évaluation ont montré que le temps de contrôle était passé de quelques minutes à quelques secondes grâce au point de contrôle incrémentiel générique basé sur les journaux.

Pour activer les points de contrôle incrémentiels génériques basés sur les journaux, définissez les configurations suivantes dans votre fichier `flink-conf.yaml`. Spécifiez la valeur de l’intervalle de point de contrôle en millisecondes.

```
    state.backend.changelog.enabled: true 
    state.backend.changelog.storage: filesystem
    dstl.dfs.base-path: s3://bucket-path/changelog
    state.backend.local-recovery: true
    state.backend: rocksdb
    state.checkpoints.dir: s3://bucket-path/checkpoint
    execution.checkpointing.interval: 15000
```

## Récupération précise
<a name="flink-restart-fine-grained"></a>

**Note**  
La récupération précise du planificateur par défaut est prise en charge avec Flink sur Amazon EMR sur EKS 6.14.0 et versions ultérieures. La récupération précise du planificateur adaptatif est prise en charge avec Flink sur Amazon EMR sur EKS 6.15.0 et versions ultérieures.

Lorsqu’une tâche échoue pendant son exécution, Flink réinitialise l’intégralité du graphe d’exécution et déclenche une réexécution complète depuis le dernier point de contrôle terminé. Cette opération est plus chère qu’une simple réexécution des tâches qui ont échoué. La récupération précise redémarre uniquement le composant connecté au pipeline de la tâche ayant échoué. Dans l’exemple suivant, le graphe de tâches présente 5 sommets (`A` à `E`). Toutes les connexions entre les sommets sont en pipeline avec une distribution ponctuelle, et la valeur de `parallelism.default` pour la tâche est définie sur `2`. 

```
A → B → C → D → E
```

Dans cet exemple, 10 tâches sont en cours d’exécution au total. Le premier pipeline (`a1` à `e1`) s’exécute sur un TaskManager (`TM1`), et le deuxième pipeline (`a2` à `e2`) s’exécute sur un autre TaskManager (`TM2`).

```
a1 → b1 → c1 → d1 → e1
a2 → b2 → c2 → d2 → e2
```

Deux composants sont connectés en pipeline : `a1 → e1` et `a2 → e2`. En cas d’échec de `TM1` ou de `TM2`, l’échec affecte uniquement les 5 tâches du pipeline dans lequel TaskManager était en cours d’exécution. La stratégie de redémarrage démarre uniquement le composant en pipeline concerné. 

La récupération précise ne fonctionne qu’avec des tâches Flink parfaitement parallèles. Elle n’est pas prise en charge par `keyBy()` ou par les opérations `redistribute()`. Pour plus d’informations, accédez à la page [FLIP-1 : Fine Grained Recovery from Task Failures](https://cwiki.apache.org/confluence/display/FLINK/FLIP-1%3A+Fine+Grained+Recovery+from+Task+Failures) du projet Jira *Flink Improvement Proposal*.

Pour activer la récupération précise, définissez les configurations suivantes dans votre fichier `flink-conf.yaml`.

```
jobmanager.execution.failover-strategy: region 
restart-strategy: exponential-delay or fixed-delay
```

## Mécanisme de redémarrage combiné dans le planificateur adaptatif
<a name="flink-restart-combined"></a>

**Note**  
Le mécanisme de redémarrage combiné du planificateur adaptatif est pris en charge avec Flink sur Amazon EMR 6.15.0 et versions ultérieures.

Le planificateur adaptatif peut ajuster le parallélisme de la tâche en fonction des emplacements disponibles. Si le nombre d’emplacements disponibles est insuffisant, le planificateur réduit automatiquement le parallélisme pour s’adapter au parallélisme des tâches configuré. Si de nouveaux emplacements sont disponibles, la tâche fait l’objet d’une augmentation d’échelle selon le parallélisme des tâches configuré. Un planificateur adaptatif permet d’éviter les temps d’arrêt de la tâche lorsque les ressources disponibles sont insuffisantes. Il s’agit du planificateur pris en charge par l’outil de mise à l’échelle automatique Flink. Nous recommandons donc l’utilisation d’un planificateur adaptatif avec Amazon EMR Flink. Les planificateurs adaptatifs peuvent toutefois effectuer plusieurs redémarrages en peu de temps, à raison d’un redémarrage pour chaque nouvelle ressource ajoutée, ce qui peut entraîner une baisse des performances de la tâche.

Avec Amazon EMR 6.15.0 et versions ultérieures, Flink dispose d’un mécanisme de redémarrage combiné dans le planificateur adaptatif qui ouvre une fenêtre de redémarrage lorsque la première ressource est ajoutée, puis attend l’intervalle de fenêtre configuré de 1 minute par défaut. Un seul redémarrage est effectué lorsque les ressources disponibles sont suffisantes pour exécuter la tâche avec un parallélisme configuré ou lorsque l’intervalle expire.

Grâce à quelques exemples de tâches, nos tests d’évaluation ont montré que cette fonctionnalité traite 10 % d’enregistrements supplémentaires par rapport au comportement par défaut lorsque vous utilisez le planificateur adaptatif et l’outil de mise à l’échelle automatique Flink.

Pour activer le mécanisme de redémarrage combiné, définissez les configurations suivantes dans votre fichier `flink-conf.yaml`.

```
jobmanager.adaptive-scheduler.combined-restart.enabled: true 
jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m
```

# Mise hors service progressive des instances Spot avec Flink sur Amazon EMR sur EKS
<a name="jobruns-flink-decommission"></a>

L’utilisation de Flink avec Amazon EMR sur EKS peut améliorer le temps de redémarrage des tâches lors des opérations de récupération ou de mise à l’échelle des tâches.

## Présentation de
<a name="jobruns-flink-decommission-overview"></a>

La version 6.15.0 et les versions ultérieures d’Amazon EMR prennent en charge la mise hors service progressive des Task Managers sur les instances Spot dans Amazon EMR sur EKS avec Apache Flink. Dans le cadre de cette fonctionnalité, Amazon EMR sur EKS avec Flink fournit les fonctionnalités suivantes :
+ **Just-in-time pointage de contrôle** — Les tâches de streaming Flink peuvent répondre à une interruption d'une instance Spot, effectuer un point de contrôle just-in-time (JIT) des tâches en cours d'exécution et empêcher la planification de tâches supplémentaires sur ces instances Spot. Les points de contrôle juste à temps sont pris en charge avec le planificateur par défaut et le planificateur adaptatif.
+ **Mécanisme de redémarrage combiné** : un mécanisme de redémarrage combiné tente de redémarrer la tâche une fois que le parallélisme des ressources cibles est atteint ou que la fenêtre actuellement configurée est terminée. Cela permet également d’éviter des redémarrages successifs susceptibles d’être provoqués par l’arrêt de plusieurs instances Spot. Le mécanisme de redémarrage combiné est uniquement disponible avec le planificateur adaptatif.

Ces fonctionnalités offrent les avantages suivants :
+ Vous pouvez tirer parti des instances Spot pour exécuter des Task Managers et réduire les dépenses liées aux clusters.
+ L’amélioration de la réactivité du Task Manager sur les instances Spot se traduit par une résilience accrue et une planification des tâches plus efficace.
+ Le temps de fonctionnement de vos tâches Flink sera plus élevé, car les redémarrages après l’arrêt d’une instance Spot seront moins nombreux.

## Comment fonctionne une mise hors service progressive
<a name="jobruns-flink-decommission-howitworks"></a>

Prenons l’exemple suivant : vous provisionnez un cluster Amazon EMR sur EKS exécutant Apache Flink, et vous spécifiez des nœuds à la demande pour le Job Manager et des nœuds d’instance Spot pour le Task Manager. Deux minutes avant l’arrêt, le Task Manager reçoit un avis d’interruption.

Dans ce scénario, le Job Manager gère le signal d’interruption d’instance Spot, empêche la planification de tâches supplémentaires sur l’instance Spot et crée un point de contrôle juste à temps pour la tâche de streaming.

Le Job Manager ne redémarre le graphe des tâches que lorsque les nouvelles ressources disponibles sont suffisantes pour permettre le parallélisme des tâches actuelles dans la fenêtre d’intervalle de redémarrage actuelle. L’intervalle de fenêtre de redémarrage est déterminée en fonction du temps nécessaire au remplacement de l’instance Spot, à la création de nouveaux pods du Task Manager et à l’enregistrement auprès du Job Manager.

## Conditions préalables
<a name="jobruns-flink-decommission-prereqs"></a>

Pour utiliser la mise hors service progressive, créez et exécutez une tâche de streaming sur un cluster Amazon EMR on EKS exécutant Apache Flink. Activez le planificateur adaptatif et les Task Managers planifiés sur au moins une instance Spot, comme illustré dans l’exemple suivant. Vous devez utiliser des nœuds à la demande pour le Job Manager, et vous pouvez utiliser des nœuds à la demande pour les Task Managers à condition qu’il existe également au moins une instance Spot.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: deployment_name
spec:
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    cluster.taskmanager.graceful-decommission.enabled: "true"
    execution.checkpointing.interval: "240s"
    jobmanager.adaptive-scheduler.combined-restart.enabled: "true"
    jobmanager.adaptive-scheduler.combined-restart.window-interval : "1m"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    nodeSelector:
      'eks.amazonaws.com/capacityType': 'ON_DEMAND'
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
    nodeSelector:
      'eks.amazonaws.com/capacityType': 'SPOT'
  job:
    jarURI: flink_job_jar_path
```

## Configuration
<a name="jobruns-flink-decommission-config"></a>

Cette section couvre la plupart des configurations que vous pouvez spécifier pour vos besoins de mise hors service. 


| Clé | Description | Valeur par défaut | Valeurs acceptables | 
| --- | --- | --- | --- | 
|  cluster.taskmanager.graceful-decommission.enabled  |  Active la mise hors service progressive du Task Manager.  |  true  |  true, false  | 
|  jobmanager.adaptive-scheduler.combined-restart.enabled  |  Active le mécanisme de redémarrage combiné dans le planificateur adaptatif.  |  false  |  true, false  | 
|  jobmanager.adaptive-scheduler.combined-restart.window-interval  |  Intervalle de fenêtre de redémarrage combiné pour l’exécution de redémarrages combinés de la tâche. Un entier sans unité est interprété comme une milliseconde.  |  1m  |  Exemples : 30, 60s, 3m, 1h  | 