

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Exécution de tâches Flink à l'aide d'Amazon EMR on EKS
<a name="run-flink-jobs"></a>

Les versions 6.13.0 et ultérieures d'Amazon EMR prennent en charge Amazon EMR sur EKS avec Apache Flink, ou l'opérateur Kubernetes pour Flink, en tant que modèle de soumission de tâches pour Amazon EMR sur EKS. Avec Amazon EMR on EKS associé à Apache Flink, vous pouvez déployer et gérer des applications Flink en utilisant le moteur d'exécution de la version Amazon EMR au sein de vos clusters Amazon EKS personnels. Une fois que vous avez déployé l'opérateur Kubernetes pour Flink dans votre cluster Amazon EKS, vous pouvez directement soumettre des applications Flink à l'aide de cet opérateur. L'opérateur gère le cycle de vie des applications Flink.

**Topics**
+ [Configuration et utilisation de l'opérateur Flink Kubernetes](jobruns-flink-kubernetes-operator.md)
+ [Utilisation de Flink Native Kubernetes](jobruns-flink-native-kubernetes.md)
+ [Personnalisation des images Docker pour Flink et FluentD](jobruns-flink-docker-flink-fluentd.md)
+ [Surveillance de l'opérateur Kubernetes pour Flink et des tâche Flink](jobruns-flink-monitoring.md)
+ [Comment Flink favorise la haute disponibilité et la résilience au travail](jobruns-flink-resiliency.md)
+ [Utilisation d'Autoscaler pour les applications Flink](jobruns-flink-autoscaler.md)
+ [Maintenance et résolution des problèmes liés aux tâches Flink sur Amazon EMR sur EKS](jobruns-flink-troubleshooting.md)
+ [Versions prises en charge pour Amazon EMR sur EKS avec Apache Flink](jobruns-flink-security-release-versions.md)

# Configuration et utilisation de l'opérateur Flink Kubernetes
<a name="jobruns-flink-kubernetes-operator"></a>

Les pages suivantes décrivent comment configurer et utiliser l'opérateur Kubernetes pour Flink pour exécuter des tâches Flink avec Amazon EMR on EKS. Les sujets disponibles incluent les prérequis requis, la configuration de votre environnement et l'exécution d'une application Flink sur Amazon EMR sur EKS.

**Topics**
+ [Configuration de l'opérateur Kubernetes pour Flink sur Amazon EMR on EKS](jobruns-flink-kubernetes-operator-setup.md)
+ [Installation de l'opérateur Flink Kubernetes pour Amazon EMR sur EKS](jobruns-flink-kubernetes-operator-getting-started.md)
+ [Exécution d'une application Flink](jobruns-flink-kubernetes-operator-run-application.md)
+ [Autorisations relatives aux rôles de sécurité pour exécuter une application Flink](jobruns-flink-kubernetes-security.md)
+ [Désinstallation de l'opérateur Kubernetes pour Flink sur Amazon EMR on EKS](jobruns-flink-kubernetes-operator-uninstall.md)

# Configuration de l'opérateur Kubernetes pour Flink sur Amazon EMR on EKS
<a name="jobruns-flink-kubernetes-operator-setup"></a>

Effectuez les tâches suivantes pour vous préparer avant d'installer l'opérateur Kubernetes pour Flink sur Amazon EKS. Si vous êtes déjà inscrit à Amazon Web Services (AWS) et que vous avez utilisé Amazon EKS, vous êtes presque prêt à utiliser Amazon EMR on EKS. Effectuez les tâches suivantes pour vous préparer pour l'utilisation de l'opérateur Flink sur Amazon EKS. Si vous avez déjà rempli l'une des conditions préalables, vous pouvez l'ignorer et passer à la suivante.
+ **[Installation ou mise à jour vers la dernière version du AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html)** — Si vous avez déjà installé le AWS CLI, vérifiez que vous disposez de la dernière version.
+ **[Configurer kubectl et eksctl — eksctl](https://docs.aws.amazon.com/eks/latest/userguide/install-kubectl.html)** est un outil de ligne de commande que vous utilisez pour communiquer avec Amazon EKS.
+ **[Installer Helm](https://docs.aws.amazon.com/eks/latest/userguide/helm.html)** – Le gestionnaire de packages Helm pour Kubernetes vous aide à installer et à gérer des applications sur votre cluster Kubernetes. 
+ **[Commencez avec Amazon EKS — eksctl](https://docs.aws.amazon.com/eks/latest/userguide/getting-started-eksctl.html)** — Suivez les étapes pour créer un nouveau cluster Kubernetes avec des nœuds dans Amazon EKS.
+ **[Choisissez une étiquette de version Amazon EMR (version](jobruns-flink-security-release-versions.md) 6.13.0 ou supérieure)** : l'opérateur Flink Kubernetes est pris en charge par les versions 6.13.0 et supérieures d'Amazon EMR.
+ **[Activez les rôles IAM pour les comptes de service (IRSA) sur le cluster Amazon EKS](setting-up-enable-IAM.md)**.
+ **[Créez un rôle d'exécution des tâches](creating-job-execution-role.md)**.
+ **[Mettez à jour la politique d'approbation du rôle d'exécution des tâches.](setting-up-trust-policy.md)**
+ Créez un rôle d'exécution d'opérateur. Cette étape est facultative. Vous pouvez utiliser le même rôle pour les tâches et l'opérateur Flink. Si vous souhaitez attribuer un rôle IAM différent à votre opérateur, vous pouvez créer un rôle distinct.
+ Mettez à jour la politique d'approbation du rôle d'exécution de l'opérateur. Vous devez ajouter explicitement une entrée de politique d'approbation pour les rôles que vous souhaitez utiliser pour le compte de service de l'opérateur  Kubernetes pour Flink sur Amazon EMR. Vous pouvez suivre cet exemple de format :

------
#### [ JSON ]

****  

  ```
  {
    "Version":"2012-10-17",		 	 	 
    "Statement": [
      {
        "Effect": "Allow",
        "Action": [
          "sts:AssumeRoleWithWebIdentity"
        ],
        "Resource": [
          "*"
        ],
        "Condition": {
          "StringLike": {
            "aws:userid": "system:serviceaccount:emr:emr-containers-sa-flink-operator"
          }
        },
        "Sid": "AllowSTSAssumerolewithwebidentity"
      }
    ]
  }
  ```

------

# Installation de l'opérateur Flink Kubernetes pour Amazon EMR sur EKS
<a name="jobruns-flink-kubernetes-operator-getting-started"></a>

Cette rubrique vous aide à commencer à utiliser l'opérateur Flink Kubernetes sur Amazon EKS en préparant un déploiement Flink.

## Installez l'opérateur Kubernetes
<a name="jobruns-flink-kubernetes-operator-getting-started-install-operator"></a>

Procédez comme suit pour installer l'opérateur Kubernetes pour Apache Flink.

1. Si vous ne l'avez pas déjà fait, suivez les étapes de [Configuration de l'opérateur Kubernetes pour Flink sur Amazon EMR on EKS](jobruns-flink-kubernetes-operator-setup.md).

1. Installez le *cert-manager* (une fois par cluster Amazon EKS) pour permettre l'ajout du composant webhook.

   ```
   kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.12.0/cert-manager.yaml
   ```

1. Installez les Charts de Helm.

   ```
   export VERSION=7.12.0 # The Amazon EMR release version
   export NAMESPACE=The Kubernetes namespace to deploy the operator
   
   helm install flink-kubernetes-operator \
   oci://public.ecr.aws/emr-on-eks/flink-kubernetes-operator \
   --version $VERSION \
   --namespace $NAMESPACE
   ```

   Exemple de sortie :

   ```
   NAME: flink-kubernetes-operator
   LAST DEPLOYED: Tue May 31 17:38:56 2022
   NAMESPACE: $NAMESPACE
   STATUS: deployed
   REVISION: 1
   TEST SUITE: None
   ```

1. Attendez que le déploiement soit terminé et vérifiez l'installation des charts.

   ```
   kubectl wait deployment flink-kubernetes-operator --namespace $NAMESPACE --for condition=Available=True --timeout=30s
   ```

1. Le message suivant devrait s'afficher lorsque le déploiement est terminé.

   ```
   deployment.apps/flink-kubernetes-operator condition met
   ```

1. Utilisez la commande suivante pour voir l'opérateur déployé.

   ```
   helm list --namespace $NAMESPACE
   ```

   Voici un exemple de sortie, où la version de l'application `x.y.z-amzn-n` correspondrait à la version de l'opérateur Flink pour votre version Amazon EMR sur EKS. Pour de plus amples informations, veuillez consulter [Versions prises en charge pour Amazon EMR sur EKS avec Apache Flink](jobruns-flink-security-release-versions.md).

   ```
   NAME                              NAMESPACE    REVISION    UPDATED                                STATUS      CHART                                   APP VERSION          
   flink-kubernetes-operator    $NAMESPACE   1           2023-02-22 16:43:45.24148 -0500 EST    deployed    flink-kubernetes-operator-emr-7.12.0    x.y.z-amzn-n
   ```

### Mettre à niveau l'opérateur Kubernetes
<a name="jobruns-flink-kubernetes-operator-upgrade"></a>

Pour mettre à jour la version de l'opérateur Flink, procédez comme suit :

1. Désinstallez l'ancien `flink-kubernetes-operator` :`helm uninstall flink-kubernetes-operator -n <NAMESPACE>`.

1. Supprimer le CRD (puisque helm ne supprimera pas automatiquement l'ancien CRD) :. `kubectl delete crd flinkdeployments.flink.apache.org flinksessionjobs.flink.apache.org`

1. Réinstallez `flink-kubernetes-operator` avec la version la plus récente.

# Exécution d'une application Flink
<a name="jobruns-flink-kubernetes-operator-run-application"></a>

Avec Amazon EMR 6.13.0 et versions ultérieures, vous pouvez exécuter une application Flink avec l’opérateur Kubernetes pour Flink en mode application sur Amazon EMR sur EKS. Avec Amazon EMR 6.15.0 et versions ultérieures, vous pouvez également exécuter une application Flink en mode session. Cette section présente plusieurs manières d’exécuter une application Flink avec Amazon EMR sur EKS.

**Topics**

**Note**  
Il est nécessaire de disposer d'un compartiment Amazon S3 préalablement créé pour conserver les métadonnées de haute disponibilité lorsque vous soumettez votre tâche Flink. Si vous ne souhaitez pas utiliser cette fonctionnalité, vous pouvez la désactiver. Elle est activée par défaut.

**Prérequis** : pour pouvoir exécuter une application Flink avec l’opérateur Kubernetes pour Flink, procédez comme suit dans [Configuration de l'opérateur Kubernetes pour Flink sur Amazon EMR on EKS](jobruns-flink-kubernetes-operator-setup.md) et [Installez l'opérateur Kubernetes](jobruns-flink-kubernetes-operator-getting-started.md#jobruns-flink-kubernetes-operator-getting-started-install-operator).

------
#### [ Application mode ]

Avec Amazon EMR 6.13.0 et versions ultérieures, vous pouvez exécuter une application Flink avec l’opérateur Kubernetes pour Flink en mode application sur Amazon EMR sur EKS.

1. Créez un fichier de `FlinkDeployment` définition `basic-example-app-cluster.yaml` comme dans l'exemple suivant. Si vous avez activé et utilisé l'un des [opt-in Régions AWS](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html), assurez-vous de décommenter et de configurer la configuration. `fs.s3a.endpoint.region`

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example-app-cluster
   spec:
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
       #fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
       state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
       state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH 
     flinkVersion: v1_17
     executionRoleArn: JOB_EXECUTION_ROLE_ARN
     emrReleaseLabel: "emr-6.13.0-flink-latest" # 6.13 or higher
     jobManager:
       storageDir: HIGH_AVAILABILITY_STORAGE_PATH
       resource:
         memory: "2048m"
         cpu: 1
     taskManager:
       resource:
         memory: "2048m"
         cpu: 1
     job:
       # if you have your job jar in S3 bucket you can use that path as well
       jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
       parallelism: 2
       upgradeMode: savepoint
       savepointTriggerNonce: 0
     monitoringConfiguration:    
       cloudWatchMonitoringConfiguration:
          logGroupName: LOG_GROUP_NAME
   ```

1. Soumettez le déploiement Flink à l'aide de la commande ci-dessous. Cela créera également un objet `FlinkDeployment` nommé `basic-example-app-cluster`.

   ```
   kubectl create -f basic-example-app-cluster.yaml -n <NAMESPACE>
   ```

1. Accédez à l'interface utilisateur de Flink.

   ```
   kubectl port-forward deployments/basic-example-app-cluster 8081 -n NAMESPACE
   ```

1. Ouvrez `localhost:8081` pour consulter vos tâches Flink localement.

1. Nettoyez la tâche. N'oubliez pas de nettoyer les artefacts S3 créés pour cette tâche, tels que le pointage de contrôle, la haute disponibilité, les métadonnées de pointage de sauvegarde et les journaux. CloudWatch

Pour plus d'informations sur la soumission de candidatures à Flink via l'opérateur Flink Kubernetes, consultez les exemples d'opérateurs [Flink Kubernetes](https://github.com/apache/flink-kubernetes-operator/tree/main/examples) dans le dossier sur. `apache/flink-kubernetes-operator` GitHub

------
#### [ Session mode ]

Avec Amazon EMR 6.15.0 et versions ultérieures, vous pouvez exécuter une application Flink avec l’opérateur Kubernetes pour Flink en mode session sur Amazon EMR sur EKS.

1. Créez un fichier de `FlinkDeployment` définition nommé `basic-example-app-cluster.yaml` comme dans l'exemple suivant. Si vous avez activé et utilisé l'un des [opt-in Régions AWS](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html), assurez-vous de décommenter et de configurer la configuration. `fs.s3a.endpoint.region`

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example-session-cluster
   spec:
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
       #fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
       state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
       state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH
     flinkVersion: v1_17
     executionRoleArn: JOB_EXECUTION_ROLE_ARN
     emrReleaseLabel: "emr-6.15.0-flink-latest"
     jobManager:
       storageDir: HIGH_AVAILABILITY_S3_STORAGE_PATH
       resource:
         memory: "2048m"
         cpu: 1
     taskManager:
       resource:
         memory: "2048m"
         cpu: 1
     monitoringConfiguration:    
       s3MonitoringConfiguration:
          logUri: 
       cloudWatchMonitoringConfiguration:
          logGroupName: LOG_GROUP_NAME
   ```

1. Soumettez le déploiement Flink à l'aide de la commande ci-dessous. Cela créera également un objet `FlinkDeployment` nommé `basic-example-session-cluster`.

   ```
   kubectl create -f basic-example-app-cluster.yaml -n NAMESPACE
   ```

1. Utilisez la commande suivante pour vérifier que le cluster de session `LIFECYCLE` est défini sur `STABLE` :

   ```
   kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
   ```

   Voici un exemple de sortie :

   ```
   NAME                              JOB STATUS   LIFECYCLE STATE
   basic-example-session-cluster                          STABLE
   ```

1. Créez un fichier de définition de ressources personnalisé `FlinkSessionJob` `basic-session-job.yaml` avec l’exemple de contenu suivant :

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkSessionJob
   metadata:
     name: basic-session-job
   spec:
     deploymentName: basic-session-deployment
     job:
       # If you have your job jar in an S3 bucket you can use that path.
       # To use jar in S3 bucket, set 
       # OPERATOR_EXECUTION_ROLE_ARN (--set emrContainers.operatorExecutionRoleArn=$OPERATOR_EXECUTION_ROLE_ARN)
       # when you install Spark operator
       jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar
       parallelism: 2
       upgradeMode: stateless
   ```

1. Soumettez la tâche de session avec la commande ci-dessous. Cela créera également un objet `FlinkSessionJob` `basic-session-job`.

   ```
   kubectl apply -f basic-session-job.yaml -n $NAMESPACE
   ```

1. Utilisez la commande suivante pour vérifier que le cluster de session `LIFECYCLE` est défini sur `STABLE`, et que `JOB STATUS` indique `RUNNING` :

   ```
   kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
   ```

   Voici un exemple de sortie :

   ```
   NAME                              JOB STATUS   LIFECYCLE STATE
   basic-example-session-cluster     RUNNING      STABLE
   ```

1. Accédez à l'interface utilisateur de Flink.

   ```
   kubectl port-forward deployments/basic-example-session-cluster 8081 -n NAMESPACE
   ```

1. Ouvrez `localhost:8081` pour consulter vos tâches Flink localement.

1. Nettoyez la tâche. N'oubliez pas de nettoyer les artefacts S3 créés pour cette tâche, tels que le pointage de contrôle, la haute disponibilité, les métadonnées de pointage de sauvegarde et les journaux. CloudWatch

------

# Autorisations relatives aux rôles de sécurité pour exécuter une application Flink
<a name="jobruns-flink-kubernetes-security"></a>

Cette rubrique décrit les rôles de sécurité pour le déploiement et l'exécution d'une application Flink. Deux rôles sont nécessaires pour gérer un déploiement et pour créer et gérer des tâches, le rôle d'opérateur et le rôle de poste. Cette rubrique les présente et répertorie leurs autorisations.

## Contrôle d'accès basé sur les rôles
<a name="jobruns-flink-kubernetes-security-rbac"></a>

Pour déployer l'opérateur et exécuter des tâches Flink, nous devons créer deux rôles Kubernetes : un rôle d'opérateur et un rôle de tâche. Amazon EMR crée les deux rôles par défaut lorsque vous installez l'opérateur.

## Rôle d'opérateur
<a name="jobruns-flink-kubernetes-security-operator-role"></a>

Nous utilisons le rôle d'opérateur `flinkdeployments` pour gérer la création et la JobManager gestion de chaque tâche Flink et d'autres ressources, telles que les services.

Le nom par défaut du rôle d'opérateur est `emr-containers-sa-flink-operator` et nécessite les autorisations ci-dessous.

```
rules:
- apiGroups:
  - ""
  resources:
  - pods
  - services
  - events
  - configmaps
  - secrets
  - serviceaccounts
  verbs:
  - '*'
- apiGroups:
  - rbac.authorization.k8s.io
  resources:
  - roles
  - rolebindings
  verbs:
  - '*'
- apiGroups:
  - apps
  resources:
  - deployments
  - deployments/finalizers
  - replicasets
  verbs:
  - '*'
- apiGroups:
  - extensions
  resources:
  - deployments
  - ingresses
  verbs:
  - '*'
- apiGroups:
  - flink.apache.org
  resources:
  - flinkdeployments
  - flinkdeployments/status
  - flinksessionjobs
  - flinksessionjobs/status
  verbs:
  - '*'
- apiGroups:
  - networking.k8s.io
  resources:
  - ingresses
  verbs:
  - '*'
- apiGroups:
  - coordination.k8s.io
  resources:
  - leases
  verbs:
  - '*'
```

## Rôle de tâche
<a name="jobruns-flink-security-job-role"></a>

 JobManager Utilise le rôle de travail pour créer et gérer TaskManagers et ConfigMaps pour chaque tâche.

```
rules:
- apiGroups:
  - ""
  resources:
  - pods
  - configmaps
  verbs:
  - '*'
- apiGroups:
  - apps
  resources:
  - deployments
  - deployments/finalizers
  verbs:
  - '*'
```

# Désinstallation de l'opérateur Kubernetes pour Flink sur Amazon EMR on EKS
<a name="jobruns-flink-kubernetes-operator-uninstall"></a>

Suivez ces étapes pour désinstaller l'opérateur Kubernetes pour Flink.

1. Supprimez l'opérateur.

   ```
   helm uninstall flink-kubernetes-operator -n <NAMESPACE>
   ```

1. Supprimez les ressources Kubernetes que Helm ne désinstalle pas.

   ```
   kubectl delete serviceaccounts, roles, rolebindings -l emr-containers.amazonaws.com/component=flink.operator --namespace <namespace>
   kubectl delete crd flinkdeployments.flink.apache.org flinksessionjobs.flink.apache.org
   ```

1. (Facultatif) Supprimez le gestionnaire de certificats.

   ```
   kubectl delete -f https://github.com/jetstack/cert-manager/releases/download/v1.12.0/cert-manager.yaml
   ```

# Utilisation de Flink Native Kubernetes
<a name="jobruns-flink-native-kubernetes"></a>

Les versions 6.13.0 et supérieures d'Amazon EMR prennent en charge Flink Native Kubernetes en tant qu'outil de ligne de commande que vous pouvez utiliser pour soumettre et exécuter des applications Flink sur un cluster Amazon EMR on EKS.

**Topics**
+ [Configuration de Flink Native Kubernetes pour Amazon EMR on EKS](jobruns-flink-native-kubernetes-setup.md)
+ [Les premiers pas avec Flink Native Kubernetes sur Amazon EMR on EKS](jobruns-flink-native-kubernetes-getting-started.md)
+ [Exigences de sécurité du compte JobManager de service Flink pour Native Kubernetes](jobruns-flink-native-kubernetes-security-requirements.md)

# Configuration de Flink Native Kubernetes pour Amazon EMR on EKS
<a name="jobruns-flink-native-kubernetes-setup"></a>

Effectuez les tâches suivantes pour vous préparer à exécuter une application en utilisant la CLI Flink sur Amazon EMR on EKS. Si vous êtes déjà inscrit à Amazon Web Services (AWS) et que vous avez utilisé Amazon EKS, vous êtes presque prêt à utiliser Amazon EMR on EKS. Si vous avez déjà rempli l'une des conditions préalables, vous pouvez l'ignorer et passer à la suivante.
+ **[Installation ou mise à jour vers la dernière version du AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html)** — Si vous avez déjà installé le AWS CLI, vérifiez que vous disposez de la dernière version.
+ **[Commencez avec Amazon EKS — eksctl](https://docs.aws.amazon.com/eks/latest/userguide/getting-started-eksctl.html)** — Suivez les étapes pour créer un nouveau cluster Kubernetes avec des nœuds dans Amazon EKS.
+ **[Sélectionnez l'URI d'une image de base Amazon EMR](docker-custom-images-tag.md) (version 6.13.0 ou supérieure)**. La commande Kubernetes pour Flink est prise en charge par les versions 6.13.0 et supérieures d'Amazon EMR.
+ Vérifiez que le compte JobManager de service dispose des autorisations appropriées pour créer et regarder TaskManager des pods. Pour plus d'informations, consultez les [exigences de sécurité du compte JobManager de service Flink pour Native Kubernetes](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/jobruns-flink-native-kubernetes-security-requirements.html).
+ Configurez votre [profil d'informations d'identification AWS](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html) local.
+ [Créez ou mettez à jour un fichier kubeconfig pour le cluster Amazon EKS](https://docs.aws.amazon.com/eks/latest/userguide/create-kubeconfig.html) sur lequel vous souhaitez exécuter les applications Flink.

# Les premiers pas avec Flink Native Kubernetes sur Amazon EMR on EKS
<a name="jobruns-flink-native-kubernetes-getting-started"></a>

Ces étapes vous montrent comment configurer, configurer un compte de service et exécuter une application Flink. Flink Native Kubernetes est utilisé pour déployer Flink sur un cluster Kubernetes en cours d'exécution.

## Configuration et exécution d'une application Flink
<a name="jobruns-flink-native-kubernetes-getting-started-run-application"></a>

Amazon EMR en version 6.13.0 et supérieure prend en charge Flink Native Kubernetes pour l'exécution d'applications Flink sur un cluster Amazon EKS. Pour exécuter une application Spark, procédez comme suit :

1. Pour pouvoir exécuter une application Flink à l'aide de la commande Flink Native Kubernetes, suivez les étapes indiquées dans [Configuration de Flink Native Kubernetes pour Amazon EMR on EKS](jobruns-flink-native-kubernetes-setup.md).

1. [Téléchargez et installez Flink](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/try-flink/local_installation).

1. Définissez les valeurs des variables d'environnement suivantes.

   ```
   #Export the FLINK_HOME environment variable to your local installation of Flink
   export FLINK_HOME=/usr/local/bin/flink #Will vary depending on your installation
   export NAMESPACE=flink
   export CLUSTER_ID=flink-application-cluster
   export IMAGE=<123456789012.dkr.ecr.sample-Région AWS-.amazonaws.com/flink/emr-6.13.0-flink:latest>
   export FLINK_SERVICE_ACCOUNT=emr-containers-sa-flink
   export FLINK_CLUSTER_ROLE_BINDING=emr-containers-crb-flink
   ```

1. Créez un compte de service pour gérer les ressources Kubernetes.

   ```
   kubectl create serviceaccount $FLINK_SERVICE_ACCOUNT -n $NAMESPACE
   kubectl create clusterrolebinding $FLINK_CLUSTER_ROLE_BINDING --clusterrole=edit --serviceaccount=$NAMESPACE:$FLINK_SERVICE_ACCOUNT
   ```

1. Exécutez la commande CLI `run-application`.

   ```
   $FLINK_HOME/bin/flink run-application \
       --target kubernetes-application \
       -Dkubernetes.namespace=$NAMESPACE \
       -Dkubernetes.cluster-id=$CLUSTER_ID \
       -Dkubernetes.container.image.ref=$IMAGE \
       -Dkubernetes.service-account=$FLINK_SERVICE_ACCOUNT \
       local:///opt/flink/examples/streaming/Iteration.jar
   2022-12-29 21:13:06,947 INFO  org.apache.flink.kubernetes.utils.KubernetesUtils            [] - Kubernetes deployment requires a fixed port. Configuration blob.server.port will be set to 6124
   2022-12-29 21:13:06,948 INFO  org.apache.flink.kubernetes.utils.KubernetesUtils            [] - Kubernetes deployment requires a fixed port. Configuration taskmanager.rpc.port will be set to 6122
   2022-12-29 21:13:07,861 WARN  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Please note that Flink client operations(e.g. cancel, list, stop, savepoint, etc.) won't work from outside the Kubernetes cluster since 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
   2022-12-29 21:13:07,868 INFO  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Create flink application cluster flink-application-cluster successfully, JobManager Web Interface: http://flink-application-cluster-rest.flink:8081
   ```

1. Examinez les ressources Kubernetes créées.

   ```
   kubectl get all -n <namespace>
   NAME READY STATUS RESTARTS AGE
   pod/flink-application-cluster-546687cb47-w2p2z 1/1 Running 0 3m37s
   pod/flink-application-cluster-taskmanager-1-1 1/1 Running 0 3m24s
   
   NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
   service/flink-application-cluster ClusterIP None <none> 6123/TCP,6124/TCP 3m38s
   service/flink-application-cluster-rest ClusterIP 10.100.132.158 <none> 8081/TCP 3m38s
   
   NAME READY UP-TO-DATE AVAILABLE AGE
   deployment.apps/flink-application-cluster 1/1 1 1 3m38s
   
   NAME DESIRED CURRENT READY AGE
   replicaset.apps/flink-application-cluster-546687cb47 1 1 1 3m38s
   ```

1. Transfert de port vers 8081.

   ```
   kubectl port-forward service/flink-application-cluster-rest 8081 -n <namespace>
   Forwarding from 127.0.0.1:8081 -> 8081
   ```

1. Accédez localement à l'interface utilisateur de Flink.  
![\[Accédez à l'interface utilisateur de Flink.\]](http://docs.aws.amazon.com/fr_fr/emr/latest/EMR-on-EKS-DevelopmentGuide/images/jobruns-flink-native-kubernetes-ui.png)

1. Supprimez l'application Flink.

   ```
   kubectl delete deployment.apps/flink-application-cluster -n <namespace>
   deployment.apps "flink-application-cluster" deleted
   ```

Pour plus d'informations sur la soumission d'applications à Flink, consultez [Native Kubernetes](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/) dans la documentation d'Apache Flink.

# Exigences de sécurité du compte JobManager de service Flink pour Native Kubernetes
<a name="jobruns-flink-native-kubernetes-security-requirements"></a>

Le JobManager module Flink utilise un compte de service Kubernetes pour accéder au serveur d'API Kubernetes afin de créer et de regarder des pods. TaskManager Le compte de JobManager service doit disposer des autorisations appropriées sur les create/delete TaskManager pods et TaskManager autoriser le responsable de surveillance ConfigMaps à récupérer l'adresse de JobManager et ResourceManager dans votre cluster.

Les règles suivantes s'appliquent à ce compte de service.

```
rules:
- apiGroups:
  - ""
  resources:
  - pods
  verbs:
  - "*"
- apiGroups:
  - ""
  resources:
  - services
  verbs:
  - "*"
- apiGroups:
  - ""
  resources:
  - configmaps
  verbs:
  - "*"
- apiGroups:
  - "apps"
  resources:
  - deployments
  verbs:
  - "*"
```

# Personnalisation des images Docker pour Flink et FluentD
<a name="jobruns-flink-docker-flink-fluentd"></a>

Procédez comme suit pour personnaliser les images Docker pour Amazon EMR sur EKS avec des images Apache Flink ou FluentD. Il s'agit notamment de conseils techniques pour obtenir une image de base, la personnaliser, la publier et soumettre une charge de travail.

**Topics**
+ [Conditions préalables](#jobruns-flink-docker-flink-fluentd-prereqs)
+ [Étape 1 : récupérer une image de base depuis Amazon Elastic Container Registry](#jobruns-flink-docker-flink-fluentd-retrieve-base)
+ [Étape 2 : Personnaliser une image de base](#jobruns-flink-docker-flink-fluentd-customize-image)
+ [Étape 3 : Publiez votre image personnalisée](#jobruns-flink-docker-flink-fluentd-publish-image)
+ [Étape 4 : Soumettre une charge de travail Flink dans Amazon EMR à l'aide d'une image personnalisée](#jobruns-flink-docker-flink-fluentd-submit-workload)

## Conditions préalables
<a name="jobruns-flink-docker-flink-fluentd-prereqs"></a>

Avant de personnaliser votre image Docker, assurez-vous de remplir les conditions préalables suivantes :
+ Vous avez terminé les [étapes de configuration de l'opérateur Flink Kubernetes pour Amazon EMR](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/jobruns-flink-kubernetes-operator-setup.html) sur EKS.
+ Docker installé dans votre environnement. Pour plus d'informations, consultez [Obtenir Docker](https://docs.docker.com/get-docker/).

## Étape 1 : récupérer une image de base depuis Amazon Elastic Container Registry
<a name="jobruns-flink-docker-flink-fluentd-retrieve-base"></a>

L'image de base contient le moteur d'exécution Amazon EMR et les connecteurs dont vous avez besoin pour accéder à d'autres. Services AWS Si vous utilisez Amazon EMR sur EKS avec la version 6.14.0 ou supérieure de Flink, vous pouvez obtenir les images de base depuis la galerie publique Amazon ECR. Parcourez la galerie pour trouver le lien de l'image et extrayez l'image dans votre espace de travail local. Par exemple, pour la version 6.14.0 d'Amazon EMR, la `docker pull` commande suivante renvoie la dernière image de base standard. `emr-6.14.0:latest`Remplacez-le par la version finale que vous souhaitez.

```
docker pull public.ecr.aws/emr-on-eks/flink/emr-6.14.0-flink:latest
```

Voici les liens vers l'image de la galerie Flink et l'image de la galerie Fluentd :
+ [emr-on-eks/flink/emr-6.14.0-flink](https://gallery.ecr.aws/emr-on-eks/flink/emr-6.14.0-flink)
+ [emr-on-eks/fluentd/emr-6.14.0 (](https://gallery.ecr.aws/emr-on-eks/fluentd/emr-6.14.0)

## Étape 2 : Personnaliser une image de base
<a name="jobruns-flink-docker-flink-fluentd-customize-image"></a>

Les étapes suivantes décrivent comment personnaliser l'image de base que vous avez extraite d'Amazon ECR.

1. Créez un nouveau espace de travail `Dockerfile` sur votre espace de travail local.

1. Modifiez le contenu `Dockerfile` et ajoutez-y le contenu suivant. Cela `Dockerfile` utilise l'image du conteneur que vous avez extraite`public.ecr.aws/emr-on-eks/flink/emr-7.12.0-flink:latest`.

   ```
   FROM public.ecr.aws/emr-on-eks/flink/emr-7.12.0-flink:latest
   USER root
   ### Add customization commands here ####
   USER hadoop:hadoop
   ```

   Utilisez la configuration suivante si vous utilisez`Fluentd`.

   ```
   FROM public.ecr.aws/emr-on-eks/fluentd/emr-7.12.0:latest
   USER root
   ### Add customization commands here ####
   USER hadoop:hadoop
   ```

1. Ajoutez des commandes dans le `Dockerfile` pour personnaliser l'image de base. La commande suivante montre comment installer des bibliothèques Python.

   ```
   FROM public.ecr.aws/emr-on-eks/flink/emr-7.12.0-flink:latest
   USER root
   RUN pip3 install --upgrade boto3 pandas numpy // For python 3
   USER hadoop:hadoop
   ```

1. Dans le répertoire où vous l'avez créée`DockerFile`, exécutez la commande suivante pour créer l'image Docker. Le champ que vous fournissez après le `-t` drapeau est le nom personnalisé de l'image.

   ```
   docker build -t <YOUR_ACCOUNT_ID>.dkr.ecr.<YOUR_ECR_REGION>.amazonaws.com/<ECR_REPO>:<ECR_TAG>
   ```

## Étape 3 : Publiez votre image personnalisée
<a name="jobruns-flink-docker-flink-fluentd-publish-image"></a>

Vous pouvez désormais publier la nouvelle image Docker dans votre registre Amazon ECR.

1. Exécutez la commande suivante pour créer un référentiel Amazon ECR afin de stocker votre image Docker. Donnez un nom à votre référentiel, par exemple `emr_custom_repo.` Pour plus d'informations, consultez [Créer un référentiel](https://docs.aws.amazon.com/AmazonECR/latest/userguide/getting-started-cli.html#cli-create-repository) dans le guide de l'utilisateur d'Amazon Elastic Container Registry.

   ```
   aws ecr create-repository \
          --repository-name emr_custom_repo \
          --image-scanning-configuration scanOnPush=true \
          --region <AWS_REGION>
   ```

1. Exécutez la commande suivante pour vous authentifier dans votre registre par défaut. Pour plus d'informations, consultez [Authentifier auprès de votre registre par défaut](https://docs.aws.amazon.com/AmazonECR/latest/userguide/getting-started-cli.html#cli-authenticate-registry) dans le guide de l'utilisateur d'Amazon Elastic Container Registry.

   ```
   aws ecr get-login-password --region <AWS_REGION> | docker login --username AWS --password-stdin <AWS_ACCOUNT_ID>.dkr.ecr.<YOUR_ECR_REGION>.amazonaws.com
   ```

1. Transmettez l'image. Pour plus d'informations, consultez la section [Envoyer une image vers Amazon ECR](https://docs.aws.amazon.com/AmazonECR/latest/userguide/getting-started-cli.html#cli-push-image) dans le guide de l'utilisateur d'Amazon Elastic Container Registry.

   ```
   docker push <YOUR_ACCOUNT_ID>.dkr.ecr.<YOUR_ECR_REGION>.amazonaws.com/<ECR_REPO>:<ECR_TAG>
   ```

## Étape 4 : Soumettre une charge de travail Flink dans Amazon EMR à l'aide d'une image personnalisée
<a name="jobruns-flink-docker-flink-fluentd-submit-workload"></a>

Apportez les modifications suivantes à vos `FlinkDeployment` spécifications pour utiliser une image personnalisée. Pour ce faire, entrez votre propre image dans la `spec.image` ligne de votre spécification de déploiement.

```
apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example
   spec:
     flinkVersion: v1_18
     image: <YOUR_ACCOUNT_ID>.dkr.ecr.<YOUR_ECR_REGION>.amazonaws.com/<ECR_REPO>:<ECR_TAG>
     imagePullPolicy: Always
     flinkConfiguration:
           taskmanager.numberOfTaskSlots: "1"
```

Pour utiliser une image personnalisée pour votre tâche Fluentd, entrez votre propre image dans la `monitoringConfiguration.image` ligne de votre spécification de déploiement.

```
  monitoringConfiguration:
       image: <YOUR_ACCOUNT_ID>.dkr.ecr.<YOUR_ECR_REGION>.amazonaws.com/<ECR_REPO>:<ECR_TAG>
       cloudWatchMonitoringConfiguration:
         logGroupName: flink-log-group
         logStreamNamePrefix: custom-fluentd
```

# Surveillance de l'opérateur Kubernetes pour Flink et des tâche Flink
<a name="jobruns-flink-monitoring"></a>

Cette section décrit plusieurs manières de surveiller vos tâches Flink à l'aide d'Amazon EMR on EKS. Il s'agit notamment de l'intégration de Flink au service géré Amazon pour Prometheus, de l'utilisation *du tableau de bord Web Flink, qui fournit l'*état des tâches et des métriques, ou de l'utilisation d'une configuration de surveillance pour envoyer les données de journal à Amazon S3 et. Amazon CloudWatch

**Topics**
+ [Utilisez Amazon Managed Service pour Prometheus pour surveiller les jobs Flink](jobruns-flink-monitoring-prometheus.md)
+ [Utiliser l'interface utilisateur de Flink pour surveiller les tâches Flink](jobruns-flink-monitoring-ui.md)
+ [Utiliser la configuration de surveillance pour surveiller l'opérateur Flink Kubernetes et les tâches Flink](jobruns-flink-monitoring-configuration.md)

# Utilisez Amazon Managed Service pour Prometheus pour surveiller les jobs Flink
<a name="jobruns-flink-monitoring-prometheus"></a>

Vous pouvez intégrer Apache Flink à Amazon Managed Service for Prometheus (portail de gestion). Amazon Managed Service for Prometheus prend en charge l'ingestion de métriques à partir de serveurs Amazon Managed Service for Prometheus dans des clusters exécutés sur Amazon EKS. Amazon Managed Service for Prometheus fonctionne avec un serveur Prometheus déjà en cours d'exécution sur votre cluster Amazon EKS. L'exécution de l'intégration d'Amazon Managed Service for Prometheus à l'opérateur Flink pour Amazon EMR déploiera et configurera automatiquement un serveur Prometheus pour l'intégrer à Amazon Managed Service for Prometheus.

1. [Créez un espace de travail Amazon Managed Service for Prometheus](https://docs.aws.amazon.com/prometheus/latest/userguide/AMP-onboard-create-workspace.html). Cet espace de travail sert de point de terminaison pour l'ingestion. Vous aurez besoin de l'URL d'écriture à distance plus tard.

1. Configurez les rôles IAM pour les comptes de service.

   Pour cette méthode d'intégration, utilisez des rôles IAM pour les comptes de service du cluster Amazon EKS où le serveur Prometheus est exécuté. Ces rôles sont également appelés *fonctions du service*.

   Si vous ne disposez pas encore de rôles, [configurez des rôles de service pour l'ingestion de métriques à partir des clusters Amazon EKS.](https://docs.aws.amazon.com/prometheus/latest/userguide/set-up-irsa.html)

   Avant de continuer, créez un rôle IAM appelé `amp-iamproxy-ingest-role`.

1. Installez l'opérateur Flink pour Amazon EMR avec Amazon Managed Service for Prometheus.

Maintenant que vous disposez d'un espace de travail Amazon Managed Service for Prometheus, d'un rôle IAM dédié à Amazon Managed Service for Prometheus et des autorisations nécessaires, vous pouvez installer l'opérateur Flink pour Amazon EMR.

Créez un fichier `enable-amp.yaml`. Ce fichier vous permet d'utiliser une configuration personnalisée pour remplacer les paramètres d'Amazon Managed Service for Prometheus. Assurez-vous d'utiliser vos propres rôles.

```
kube-prometheus-stack:
    prometheus:
    serviceAccount:
        create: true
        name: "amp-iamproxy-ingest-service-account"
        annotations:
            eks.amazonaws.com/role-arn: "arn:aws:iam::<AWS_ACCOUNT_ID>:role/amp-iamproxy-ingest-role"
    remoteWrite:
        - url: <AMAZON_MANAGED_PROMETHEUS_REMOTE_WRITE_URL>
        sigv4:
            region: <AWS_REGION>
        queueConfig:
            maxSamplesPerSend: 1000
            maxShards: 200
            capacity: 2500
```

Utilisez la commande [https://helm.sh/docs/helm/helm_install/](https://helm.sh/docs/helm/helm_install/) pour transmettre les remplacements au chart `flink-kubernetes-operator`.

```
helm upgrade -n <namespace> flink-kubernetes-operator \
   oci://public.ecr.aws/emr-on-eks/flink-kubernetes-operator \
   --set prometheus.enabled=true
   -f enable-amp.yaml
```

Cette commande installe automatiquement un reporter Prometheus dans l'opérateur sur le port 9999. Tout `FlinkDeployment` futur expose également un port `metrics` sur 9249.
+ Les métriques de l'opérateur Flink apparaissent dans Prometheus sous l'étiquette `flink_k8soperator_`.
+ Les métriques du gestionnaire de tâches Flink apparaissent dans Prometheus sous l'étiquette `flink_taskmanager_`.
+ Les métriques du gestionnaire de tâches Flink apparaissent dans Prometheus sous l'étiquette `flink_jobmanager_`.

# Utiliser l'interface utilisateur de Flink pour surveiller les tâches Flink
<a name="jobruns-flink-monitoring-ui"></a>

Pour surveiller l'état et les performances d'une application Flink en cours d'exécution, utilisez le *tableau de bord web de Flink*. Ce tableau de bord fournit des informations sur le statut de la tâche, le nombre de tâches TaskManagers, les métriques et les journaux associés à la tâche. Il vous permet également de consulter et de modifier la configuration de la tâche Flink, et d'interagir avec le cluster Flink pour soumettre ou annuler des tâches.

Pour accéder au tableau de bord Web de Flink pour une application Flink en cours d'exécution sur Kubernetes :

1. Utilisez la `kubectl port-forward` commande pour transférer un port local vers le port sur lequel le tableau de bord Web Flink s'exécute dans les modules de TaskManager l'application Flink. Par défaut, ce port est 8081. Remplacez *deployment-name* par le nom du déploiement de l'application Flink indiqué ci-dessus.

   ```
   kubectl get deployments -n namespace
   ```

   Exemple de sortie :

   ```
   kubectl get deployments -n flink-namespace
   NAME                        READY   UP-TO-DATE   AVAILABLE  AGE
   basic-example               1/1       1            1           11m
   flink-kubernetes-operator   1/1       1            1           21h
   ```

   ```
   kubectl port-forward deployments/deployment-name 8081 -n namespace
   ```

1. Si vous souhaitez utiliser un autre port localement, utilisez le paramètre:8081*local-port*.

   ```
   kubectl port-forward -n flink deployments/basic-example 8080:8081
   ```

1. Dans un navigateur web, naviguez vers `http://localhost:8081` (ou `http://localhost:local-port` si vous avez utilisé un port local personnalisé) pour accéder au tableau de bord web de Flink. Ce tableau de bord affiche des informations sur l'application Flink en cours d'exécution, telles que le statut de la tâche, le nombre de tâches TaskManagers, ainsi que les métriques et les journaux associés à la tâche.  
![\[Exemple d'interface utilisateur du tableau de bord Flink\]](http://docs.aws.amazon.com/fr_fr/emr/latest/EMR-on-EKS-DevelopmentGuide/images/sample-flink-dashboard-ui.png)

# Utiliser la configuration de surveillance pour surveiller l'opérateur Flink Kubernetes et les tâches Flink
<a name="jobruns-flink-monitoring-configuration"></a>

La configuration de surveillance vous permet de configurer facilement l'archivage des journaux de votre application Flink et des journaux des opérateurs dans S3 and/or CloudWatch (vous pouvez choisir l'un ou les deux). Cela ajoute un sidecar FluentD à vos pods TaskManager et transmet ensuite les JobManager journaux de ces composants à vos récepteurs configurés.

**Note**  
Vous devez configurer les rôles IAM pour le compte de service de votre opérateur Flink et de votre tâche Flink (comptes de service) afin de pouvoir utiliser cette fonctionnalité, car elle nécessite des interactions avec d'autres Services AWS. Vous devez le configurer en utilisant IRSA dans [Configuration de l'opérateur Kubernetes pour Flink sur Amazon EMR on EKS](jobruns-flink-kubernetes-operator-setup.md).

## Journaux des applications Flink
<a name="jobruns-flink-monitoring-configuration-application-logs"></a>

Vous pouvez définir cette configuration de la manière suivante.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: FLINK IMAGE TAG
  imagePullPolicy: Always
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  executionRoleArn: JOB EXECUTION ROLE
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
  monitoringConfiguration:
    s3MonitoringConfiguration:
      logUri: S3 BUCKET
    cloudWatchMonitoringConfiguration:
      logGroupName: LOG GROUP NAME
      logStreamNamePrefix: LOG GROUP STREAM PREFIX
    sideCarResources:
      limits:
        cpuLimit: 500m
        memoryLimit: 250Mi
    containerLogRotationConfiguration:
        rotationSize: 2GB
        maxFilesToKeep: 10
```

Les options de configuration sont les suivantes.
+ `s3MonitoringConfiguration` : clé de configuration permettant de configurer le transfert vers S3
  + `logUri` (obligatoire) : le chemin du compartiment S3 dans lequel vous souhaitez stocker vos journaux.
  + Le chemin sur S3 une fois les journaux chargés ressemblera à ce qui suit.
    + Aucune rotation des journaux n'est activée :

      ```
      s3://${logUri}/${POD NAME}/STDOUT or STDERR.gz
      ```
    + La rotation des journaux est activée. Vous pouvez utiliser à la fois un fichier en rotation et un fichier actuel (sans horodatage).

      ```
      s3://${logUri}/${POD NAME}/STDOUT or STDERR.gz
      ```

      Le format suivant est un nombre incrémentiel.

      ```
      s3://${logUri}/${POD NAME}/stdout_YYYYMMDD_index.gz
      ```
  + Les autorisations IAM suivantes sont nécessaires pour utiliser ce transféreur.

    ```
    {
        "Effect": "Allow",
        "Action": [
            "s3:PutObject"
        ],
        "Resource": [
           "S3_BUCKET_URI/*",
           "S3_BUCKET_URI"
        ]
    }
    ```
+ `cloudWatchMonitoringConfiguration`— clé de configuration vers laquelle configurer le transfert CloudWatch.
  + `logGroupName`(obligatoire) : nom du groupe de CloudWatch journaux auquel vous souhaitez envoyer des journaux (crée automatiquement le groupe s'il n'existe pas).
  + `logStreamNamePrefix` (facultatif) : nom du flux de journaux auquel vous souhaitez envoyer des journaux. La valeur par défaut est une chaîne vide. Le format est le suivant :

    ```
    ${logStreamNamePrefix}/${POD NAME}/STDOUT or STDERR
    ```
  + Les autorisations IAM suivantes sont nécessaires pour utiliser ce transféreur.

    ```
    {
        "Effect": "Allow",
        "Action": [
            "logs:CreateLogStream",
            "logs:CreateLogGroup",
            "logs:PutLogEvents"
        ],
        "Resource": [
            "arn:aws:logs:REGION:ACCOUNT-ID:log-group:{YOUR_LOG_GROUP_NAME}:*",
            "arn:aws:logs:REGION:ACCOUNT-ID:log-group:{YOUR_LOG_GROUP_NAME}"
        ]
    }
    ```
+ `sideCarResources` (facultatif) : clé de configuration permettant de définir les limites de ressources sur le conteneur sidecar Fluentbit lancé.
  + `memoryLimit` (facultatif) : la valeur par défaut est 512Mi. Ajustez selon vos besoins.
  + `cpuLimit` (facultatif) : cette option n'a pas de valeur par défaut. Ajustez selon vos besoins.
+ `containerLogRotationConfiguration` (facultatif) : contrôle le comportement de rotation des journaux du conteneur. Il est activé par défaut.
  + `rotationSize` (obligatoire) : indique la taille du fichier pour la rotation des journaux. La plage de valeurs possibles est comprise entre 2 Ko et 2 Go. La partie unitaire numérique du paramètre rotationSize est transmise sous forme d'entier. Les valeurs décimales n'étant pas prises en charge, vous pouvez indiquer une taille de rotation de 1,5 Go, par exemple, avec la valeur 1500 Mo. La valeur par défaut est 2 Go.
  + `maxFilesToKeep` (obligatoire) : indique le nombre maximum de fichiers à retenir dans le conteneur après la rotation. La valeur minimale est 1 et la valeur maximale est 50. La valeur par défaut est 10.

## Journaux de l'opérateur Flink
<a name="jobruns-flink-monitoring-configuration-operator-logs"></a>

Il est également possible d'activer l'archivage des journaux de l'opérateur en utilisant les options ci-dessous dans le fichier `values.yaml` de l'installation de vos Charts de Helm. Vous pouvez activer S3 ou CloudWatch les deux.

```
monitoringConfiguration: 
  s3MonitoringConfiguration:
    logUri: "S3-BUCKET"
    totalFileSize: "1G"
    uploadTimeout: "1m"
  cloudWatchMonitoringConfiguration:
    logGroupName: "flink-log-group"
    logStreamNamePrefix: "example-job-prefix-test-2"
  sideCarResources:
    limits:
      cpuLimit: 1
      memoryLimit: 800Mi
  memoryBufferLimit: 700M
```

Les options de configuration disponibles sous `monitoringConfiguration` sont les suivantes.
+ `s3MonitoringConfiguration` : définissez cette option pour archiver dans S3.
+ `logUri` (obligatoire) : le chemin du compartiment S3 dans lequel vous souhaitez stocker vos journaux.
+ Les formats suivants indiquent à quoi peuvent ressembler les chemins des compartiments S3 une fois les journaux chargés.
  + Aucune rotation des journaux n'est activée.

    ```
    s3://${logUri}/${POD NAME}/OPERATOR or WEBHOOK/STDOUT or STDERR.gz
    ```
  + La rotation des journaux est activée. Vous pouvez utiliser à la fois un fichier en rotation et un fichier actuel (sans horodatage).

    ```
    s3://${logUri}/${POD NAME}/OPERATOR or WEBHOOK/STDOUT or STDERR.gz
    ```

    L'index du format suivant est un nombre incrémentiel.

    ```
    s3://${logUri}/${POD NAME}/OPERATOR or WEBHOOK/stdout_YYYYMMDD_index.gz
    ```
+ `cloudWatchMonitoringConfiguration`— la clé de configuration vers laquelle configurer le transfert CloudWatch.
  + `logGroupName`(obligatoire) : nom du groupe de CloudWatch journaux auquel vous souhaitez envoyer des journaux. Le groupe est automatiquement créé s'il n'existe pas.
  + `logStreamNamePrefix` (facultatif) : nom du flux de journaux auquel vous souhaitez envoyer des journaux. La valeur par défaut est une chaîne vide. Le format CloudWatch est le suivant :

    ```
    ${logStreamNamePrefix}/${POD NAME}/STDOUT or STDERR
    ```
+ `sideCarResources` (facultatif) : clé de configuration permettant de définir les limites de ressources sur le conteneur sidecar Fluentbit lancé.
  + `memoryLimit` (facultatif) : limite de mémoire. Ajustez selon vos besoins. La valeur par défaut est 512Mi.
  + `cpuLimit` : la limite de CPU. Ajustez selon vos besoins. Aucune valeur par défaut.
+ `containerLogRotationConfiguration` (facultatif) : contrôle le comportement de rotation des journaux du conteneur. Il est activé par défaut.
  + `rotationSize` (obligatoire) : indique la taille du fichier pour la rotation des journaux. La plage de valeurs possibles est comprise entre 2 Ko et 2 Go. La partie unitaire numérique du paramètre rotationSize est transmise sous forme d'entier. Les valeurs décimales n'étant pas prises en charge, vous pouvez indiquer une taille de rotation de 1,5 Go, par exemple, avec la valeur 1500 Mo. La valeur par défaut est 2 Go.
  + `maxFilesToKeep` (obligatoire) : indique le nombre maximum de fichiers à retenir dans le conteneur après la rotation. La valeur minimale est 1 et la valeur maximale est 50. La valeur par défaut est 10.

# Comment Flink favorise la haute disponibilité et la résilience au travail
<a name="jobruns-flink-resiliency"></a>

Les sections suivantes expliquent comment Flink améliore la fiabilité et la disponibilité des offres d'emploi. Pour ce faire, il utilise des fonctionnalités intégrées telles que la haute disponibilité de Flink et diverses fonctionnalités de restauration en cas de panne.

**Topics**
+ [Utilisation de la haute disponibilité (HA) pour les opérateurs et les applications Flink](jobruns-flink-using-ha.md)
+ [Optimisation des temps de redémarrage des tâches Flink pour la récupération des tâches et la mise à l’échelle des opérations avec Amazon EMR sur EKS](jobruns-flink-restart.md)
+ [Mise hors service progressive des instances Spot avec Flink sur Amazon EMR sur EKS](jobruns-flink-decommission.md)

# Utilisation de la haute disponibilité (HA) pour les opérateurs et les applications Flink
<a name="jobruns-flink-using-ha"></a>

Cette rubrique explique comment configurer la haute disponibilité et décrit son fonctionnement pour différents cas d'utilisation. Cela inclut lorsque vous utilisez le Job Manager et lorsque vous utilisez les kubernetes natifs de Flink.

## Haute disponibilité de l’opérateur Flink
<a name="jobruns-flink-ha-operator"></a>

Nous activons la *haute disponibilité* de l’opérateur Flink afin de pouvoir basculer vers un opérateur Flink de secours et réduire au minimum les temps d’arrêt dans la boucle de contrôle de l’opérateur en cas de défaillance. La haute disponibilité est activée par défaut et le nombre initial par défaut de répliques d’opérateur est de 2. Vous pouvez configurer le champ des répliques dans votre fichier `values.yaml` pour les Charts de Helm.

Les champs suivants sont personnalisables :
+ `replicas` (facultatif, la valeur par défaut est 2) : si vous définissez ce nombre sur une valeur supérieure à 1, d'autres opérateurs de secours seront créés et vous pourrez reprendre votre tâche plus rapidement.
+ `highAvailabilityEnabled` (facultatif, la valeur par défaut est « true ») : permet d'indiquer si vous souhaitez activer la haute disponibilité (HA). Le fait de définir ce paramètre comme « true » permet de prendre en charge le déploiement multi-AZ et de définir les paramètres `flink-conf.yaml` corrects.

Vous pouvez désactiver la haute disponibilité pour votre opérateur en paramétrant la configuration ci-dessous dans votre fichier `values.yaml`.

```
...
imagePullSecrets: []

replicas: 1

# set this to false if you don't want HA
highAvailabilityEnabled: false
...
```

**Déploiement multi-AZ**

Nous créons les pods des opérateurs dans plusieurs zones de disponibilité. Il s'agit d'une contrainte souple, et vos pods d'opérateur seront planifiés dans la même zone si vous ne disposez pas de suffisamment de ressources dans une autre zone.

**Détermination de la réplique leader**

 Si HA est activé, les répliques utilisent un bail pour déterminer lequel des deux JMs est le leader et utilisent un bail K8s pour l'élection du leader. Vous pouvez décrire le bail et consulter le champ .Spec.Holder Identity pour déterminer le leader actuel

```
kubectl describe lease <Helm Install Release Name>-<NAMESPACE>-lease -n <NAMESPACE> | grep "Holder Identity"
```

**Interaction Flink-S3**

**Configuration des informations d'identification d'accès**

Assurez-vous d'avoir configuré IRSA avec les autorisations IAM appropriées pour accéder au compartiment S3.

**Récupération des fichiers JAR des tâches à partir du mode d'application S3**

L'opérateur Flink prend également en charge la récupération des fichiers JAR des applications à partir de S3. Il vous suffit de fournir l'emplacement S3 du JArUri dans votre FlinkDeployment spécification.

Vous pouvez également utiliser cette fonctionnalité pour télécharger d'autres artefacts tels que PyFlink des scripts. Le script Python résultant est déposé sous le chemin `/opt/flink/usrlib/`.

L'exemple suivant montre comment utiliser cette fonctionnalité pour une PyFlink tâche. Notez les champs jarURI et args.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: python-example
spec:
  image: <YOUR CUSTOM PYFLINK IMAGE>
  emrReleaseLabel: "emr-6.12.0-flink-latest"
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    highAvailabilityEnabled: false
    replicas: 1
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: "s3://<S3-BUCKET>/scripts/pyflink.py" # Note, this will trigger the artifact download process
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pyflink.py"]
    parallelism: 1
    upgradeMode: stateless
```

**Connecteurs S3 pour Flink**

Flink est livré avec deux connecteurs S3 (indiqués ci-dessous). Les sections suivantes expliquent quand utiliser quel connecteur.

**Point de contrôle : connecteur S3 pour Presto**
+ Définissez le schéma S3 sur s3p://
+ Le connecteur recommandé à utiliser pour le point de contrôle vers S3. Pour plus d'informations, consultez la section [spécifique à S3](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#s3-specific) dans la documentation d'Apache Flink.

Exemple de FlinkDeployment spécification :

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.checkpoints.dir: s3p://<BUCKET-NAME>/flink-checkpoint/
```

**Lecture et écriture sur S3 : connecteur Hadoop S3**
+ Définissez le schéma S3 sur `s3://` ou (`s3a://`)
+ Le connecteur recommandé pour lire et écrire des fichiers à partir de S3 (uniquement le connecteur S3 qui implémente l'[interface Filesystem de Flinks](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/)).
+ Par défaut, nous avons défini `fs.s3a.aws.credentials.provider` le `flink-conf.yaml` fichier, qui est`com.amazonaws.auth.WebIdentityTokenCredentialsProvider`. Si vous remplacez complètement la `flink-conf` par défaut et que vous interagissez avec S3, assurez-vous d'utiliser ce fournisseur.

Exemple de FlinkDeployment spécification

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  job:
    jarURI: local:///opt/flink/examples/streaming/WordCount.jar
    args: [ "--input", "s3a://<INPUT BUCKET>/PATH", "--output", "s3a://<OUTPUT BUCKET>/PATH" ]
    parallelism: 2
    upgradeMode: stateless
```

## Gestionnaire de tâches Flink
<a name="jobruns-flink-ha-manager"></a>

La haute disponibilité (HA) pour les déploiements de Flink permet aux tâches de continuer à progresser même en cas d'erreur transitoire et de panne. JobManager Les tâches redémarreront, mais à partir du dernier point de contrôle validé lorsque la haute disponibilité est activée. Si la haute disponibilité n'est pas activée, Kubernetes redémarrera votre tâche JobManager, mais votre tâche redémarrera comme une nouvelle tâche et perdra sa progression. Après avoir configuré HA, nous pouvons demander à Kubernetes de stocker les métadonnées HA dans un stockage persistant afin de les référencer en cas de défaillance transitoire du, JobManager puis de reprendre nos tâches à partir du dernier point de contrôle réussi.

La haute disponibilité est activée par défaut pour vos tâches Flink (le nombre de répliques est défini sur 2, ce qui nécessite la mise à disposition d'un emplacement de stockage S3 pour l'enregistrement des métadonnées haute disponibilité).

**Configurations de haute disponibilité**

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  executionRoleArn: "<JOB EXECUTION ROLE ARN>"
  emrReleaseLabel: "emr-6.13.0-flink-latest"
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    replicas: 2
    highAvailabilityEnabled: true
    storageDir: "s3://<S3 PERSISTENT STORAGE DIR>"
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
```

Voici les descriptions des configurations de haute disponibilité ci-dessus dans le gestionnaire de tâches (définies sous .spec.JobManager) :
+ `highAvailabilityEnabled` (facultatif, la valeur par défaut est « true ») : définissez ce paramètre sur `false ` si vous ne voulez pas activer la haute disponibilité et si vous ne souhaitez pas utiliser les configurations de haute disponibilité fournies. Vous pouvez toujours manipuler le champ « replicas » pour configurer manuellement la haute disponibilité.
+ `replicas`(facultatif, la valeur par défaut est 2) : Si vous définissez ce nombre sur une valeur supérieure à 1, vous créez un autre mode de veille JobManagers et vous pouvez reprendre votre travail plus rapidement. Si vous désactivez la haute disponibilité, vous devez définir le nombre de répliques sur 1, sinon vous continuerez à recevoir des erreurs de validation (une seule réplique est prise en charge si la haute disponibilité n'est pas activée).
+ `storageDir` (obligatoire) : étant donné que le nombre de répliques est égal à 2 par défaut, nous devons fournir un StorageDir persistant. Actuellement, ce champ n'accepte que les chemins S3 comme emplacement de stockage.

**Placement des pods**

 Si vous activez la haute disponibilité, nous essayons également de colocaliser les pods dans la même zone, ce qui améliore les performances (réduction de la latence du réseau grâce à la présence de pods dans les mêmes zones AZs). Ceci est un processus réalisé au mieux des possibilités, signifiant que si vous ne disposez pas de ressources suffisantes dans la zone de disponibilité où la majorité de vos pods sont planifiés, les pods restants seront tout de même planifiés, mais pourraient se retrouver sur un nœud situé en dehors de cette zone de disponibilité.

**Détermination de la réplique leader**

Si HA est activé, les répliques utilisent un bail pour déterminer lequel d'entre eux JMs est le leader et utilisent un K8s Configmap comme banque de données pour stocker ces métadonnées. Si vous souhaitez identifier le leader, vous pouvez consulter le contenu de la Configmap et la clé `org.apache.flink.k8s.leader.restserver` sous les données pour trouver le pod K8s correspondant à l’adresse IP indiquée. Vous pouvez également utiliser les commandes bash ci-dessous.

```
ip=$(kubectl get configmap -n <NAMESPACE> <JOB-NAME>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')
kubectl get pods -n NAMESPACE  -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

## Tâche Flink – Native Kubernetes
<a name="jobruns-flink-ha-kubernetes"></a>

À partir de la version 6.13.0, Amazon EMR prend en charge Flink Native Kubernetes pour l’exécution d’applications Flink en mode haute disponibilité sur un cluster Amazon EKS. 

**Note**  
Il est nécessaire de disposer d'un compartiment Amazon S3 préalablement créé pour conserver les métadonnées de haute disponibilité lorsque vous soumettez votre tâche Flink. Si vous ne souhaitez pas utiliser cette fonctionnalité, vous pouvez la désactiver. Elle est activée par défaut.

Pour activer la fonctionnalité haute disponibilité de Flink, spécifiez les paramètres Flink suivants lorsque vous [exécutez la commande `run-application` dans la CLI](jobruns-flink-native-kubernetes-getting-started.md#jobruns-flink-native-kubernetes-getting-started-run-application). Les paramètres sont définis dans l’exemple ci-dessous.

```
-Dhigh-availability.type=kubernetes \
-Dhigh-availability.storageDir=S3://DOC-EXAMPLE-STORAGE-BUCKET \
-Dfs.s3a.aws.credentials.provider="com.amazonaws.auth.WebIdentityTokenCredentialsProvider" \
-Dkubernetes.jobmanager.replicas=3 \
-Dkubernetes.cluster-id=example-cluster
```
+ **`Dhigh-availability.storageDir`** : compartiment Amazon S3 où vous souhaitez stocker les métadonnées de haute disponibilité pour votre tâche

  **`Dkubernetes.jobmanager.replicas`** : nombre de pods Job Manager à créer sous la forme d’un entier supérieur à `1`

  **`Dkubernetes.cluster-id`** : ID unique qui identifie le cluster Flink

# Optimisation des temps de redémarrage des tâches Flink pour la récupération des tâches et la mise à l’échelle des opérations avec Amazon EMR sur EKS
<a name="jobruns-flink-restart"></a>

Lorsqu’une tâche échoue ou qu’une opération de mise à l’échelle a lieu, Flink tente de réexécuter la tâche à partir du dernier point de contrôle terminé. L’exécution du processus de redémarrage peut durer une minute ou plus, en fonction de la taille de l’état du point de contrôle et du nombre de tâches parallèles. Pendant la période de redémarrage, les tâches en attente peuvent s’accumuler pour la tâche. Flink peut cependant permettre d’optimiser la vitesse de récupération et de redémarrage des graphes d’exécution afin d’améliorer la stabilité des tâches.

Cette page décrit certaines des manières dont Amazon EMR Flink peut améliorer le temps de redémarrage des tâches lors de la reprise des tâches ou des opérations de dimensionnement sur des instances ponctuelles. Les instances Spot sont des capacités de calcul inutilisées disponibles à prix discount. Ses comportements sont uniques, notamment des interruptions occasionnelles. Il est donc important de comprendre comment Amazon EMR on EKS les gère, notamment comment Amazon EMR on EKS procède à la mise hors service et au redémarrage des tâches.

**Topics**
+ [Récupération locale des tâches](#flink-restart-task-local)
+ [Récupération locale des tâches par montage de volume Amazon EBS](#flink-restart-task-local-ebs)
+ [Point de contrôle incrémentiel générique basé sur les journaux](#flink-restart-log-check)
+ [Récupération précise](#flink-restart-fine-grained)
+ [Mécanisme de redémarrage combiné dans le planificateur adaptatif](#flink-restart-combined)

## Récupération locale des tâches
<a name="flink-restart-task-local"></a>

**Note**  
La récupération locale des tâches est prise en charge par Flink sur Amazon EMR sur EKS 6.14.0 et versions ultérieures.

Avec les points de contrôle Flink, chaque tâche produit un instantané de son état que Flink écrit sur un stockage distribué comme Amazon S3. En cas de récupération, les tâches restaurent leur état à partir du stockage distribué. Le stockage distribué offre une tolérance aux pannes et peut redistribuer l’état lors de la mise à l’échelle, car tous les nœuds peuvent y accéder.

Cependant, un magasin distribué à distance présente également un inconvénient : toutes les tâches doivent lire leur état depuis un emplacement distant sur le réseau, ce qui peut entraîner l’augmentation du temps de récupération pour les états importants lors des opérations de récupération ou de mise à l’échelle de tâches.

La *récupération locale des tâches* permet de résoudre ce problème. Les tâches enregistrent leur état au point de contrôle sur un stockage secondaire local à la tâche, par exemple sur un disque local. Elles stockent également leur état sur le stockage principal, à savoir Amazon S3 dans notre cas. Lors de la récupération, le planificateur planifie les tâches sur le même Task Manager que celui dans lequel les tâches ont été exécutées précédemment afin qu’elles puissent être récupérées depuis le magasin d’états local au lieu de lire depuis le magasin d’état distant. Pour plus d’informations, voir la rubrique [Récupération locale des tâches](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/large_state_tuning/#task-local-recovery) de la *documentation Apache Flink*.

Nos tests d’évaluation avec des exemples de tâches ont montré que le temps de récupération était passé de quelques minutes à quelques secondes grâce à l’activation de la récupération locale des tâches.

Pour activer la récupération locale des tâches, définissez les configurations suivantes dans votre fichier `flink-conf.yaml`. Spécifiez la valeur de l’intervalle de point de contrôle en millisecondes.

```
    state.backend.local-recovery: true
    state.backend: hasmap or rocksdb
    state.checkpoints.dir: s3://STORAGE-BUCKET-PATH/checkpoint
    execution.checkpointing.interval: 15000
```

## Récupération locale des tâches par montage de volume Amazon EBS
<a name="flink-restart-task-local-ebs"></a>

**Note**  
La récupération locale des tâches par Amazon EBS est prise en charge par Flink sur Amazon EMR sur EKS 6.15.0 et versions ultérieures.

Avec Flink sur Amazon EMR sur EKS, vous pouvez automatiquement provisionner des volumes Amazon EBS sur les pods TaskManager pour la restauration locale des tâches. Le montage de superposition par défaut comprend un volume de 10 Go, ce qui est suffisant pour les tâches dont l’état est moins important. Les tâches dont les états sont importants peuvent activer l’option de *montage automatique de volume EBS*. Les pods TaskManager sont automatiquement créés et montés lors de la création du pod et supprimés lors de sa suppression.

Procédez comme suit pour activer le montage automatique de volume EBS pour Flink dans Amazon EMR sur EKS :

1. Exportez les valeurs des variables suivantes que vous utiliserez lors des étapes suivantes.

   ```
   export AWS_REGION=aa-example-1 
   export FLINK_EKS_CLUSTER_NAME=my-cluster
   export AWS_ACCOUNT_ID=111122223333
   ```

1. Créez ou mettez à jour un fichier YAML `kubeconfig` pour votre cluster.

   ```
   aws eks update-kubeconfig --name $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
   ```

1. Créez un compte de service IAM pour le pilote CSI Amazon EBS sur votre cluster Amazon EKS. 

   ```
   eksctl create iamserviceaccount \
      --name ebs-csi-controller-sa \
      --namespace kube-system \
      --region $AWS_REGION \
      --cluster $FLINK_EKS_CLUSTER_NAME\
      --role-name TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} \
      --role-only \
      --attach-policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy \
      --approve
   ```

1. Créez le pilote CSI Amazon EBS à l’aide de la commande suivante :

   ```
   eksctl create addon \
      --name aws-ebs-csi-driver \
      --region $AWS_REGION \
      --cluster $FLINK_EKS_CLUSTER_NAME \
      --service-account-role-arn arn:aws:iam::${AWS_ACCOUNT_ID}:role/TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
   ```

1. Créez la classe de stockage Amazon EBS à l’aide de la commande suivante :

   ```
   cat ≪ EOF ≫ storage-class.yaml
   apiVersion: storage.k8s.io/v1
   kind: StorageClass
   metadata:
     name: ebs-sc
   provisioner: ebs.csi.aws.com
   volumeBindingMode: WaitForFirstConsumer
   EOF
   ```

   Appliquez ensuite la classe :

   ```
   kubectl apply -f storage-class.yaml
   ```

1. Helm installe l’opérateur Kubernetes pour Flink sur Amazon EMR avec des options permettant de créer un compte de service. Cela crée le `emr-containers-sa-flink` à utiliser dans le déploiement de Flink.

   ```
   helm install flink-kubernetes-operator flink-kubernetes-operator/ \
      --set jobServiceAccount.create=true \
      --set rbac.jobRole.create=true \
      --set rbac.jobRoleBinding.create=true
   ```

1. Pour soumettre la tâche Flink et activer le provisionnement automatique des volumes EBS pour la récupération locale des tâches, définissez les configurations suivantes dans votre fichier `flink-conf.yaml`. Ajustez la limite de taille d’état pour la tâche. Définissez `serviceAccount` sur `emr-containers-sa-flink`. Spécifiez la valeur de l’intervalle de point de contrôle en millisecondes. Et omettez le `executionRoleArn`.

   ```
   flinkConfiguration:
       task.local-recovery.ebs.enable: true
       kubernetes.taskmanager.local-recovery.persistentVolumeClaim.sizeLimit: 10Gi
       state.checkpoints.dir: s3://BUCKET-PATH/checkpoint
       state.backend.local-recovery: true
       state.backend: hasmap or rocksdb
       state.backend.incremental: "true"
       execution.checkpointing.interval: 15000
     serviceAccount: emr-containers-sa-flink
   ```

Lorsque vous êtes prêt à supprimer le plug-in de pilote CSI Amazon EBS, utilisez les commandes suivantes :

```
  # Detach Attached Policy
  aws iam detach-role-policy --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} --policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy
  # Delete the created Role
  aws iam delete-role --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
  # Delete the created service account
  eksctl delete iamserviceaccount --name ebs-csi-controller-sa --namespace kube-system --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  # Delete Addon
  eksctl delete addon --name aws-ebs-csi-driver --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  # Delete the EBS storage class
  kubectl delete -f storage-class.yaml
```

## Point de contrôle incrémentiel générique basé sur les journaux
<a name="flink-restart-log-check"></a>

**Note**  
Les points de contrôle incrémentiels génériques basés sur les journaux sont pris en charge avec Flink sur Amazon EMR sur EKS 6.14.0 et versions ultérieures.

Les points de contrôle incrémentiels génériques basés sur les journaux ont été ajoutés à Flink 1.16 pour rendre les points de contrôle plus fréquents. Un intervalle de point de contrôle plus court entraîne souvent une réduction du travail de récupération, car moins d’événements doivent être traités de nouveau après la récupération. Pour plus d’informations, accédez à la page [Improving speed and stability of checkpointing with generic log-based incremental checkpoints](https://flink.apache.org/2022/05/30/improving-speed-and-stability-of-checkpointing-with-generic-log-based-incremental-checkpoints/) sur le *blog Apache Flink*.

Sur base de quelques exemples de tâches, nos tests d’évaluation ont montré que le temps de contrôle était passé de quelques minutes à quelques secondes grâce au point de contrôle incrémentiel générique basé sur les journaux.

Pour activer les points de contrôle incrémentiels génériques basés sur les journaux, définissez les configurations suivantes dans votre fichier `flink-conf.yaml`. Spécifiez la valeur de l’intervalle de point de contrôle en millisecondes.

```
    state.backend.changelog.enabled: true 
    state.backend.changelog.storage: filesystem
    dstl.dfs.base-path: s3://bucket-path/changelog
    state.backend.local-recovery: true
    state.backend: rocksdb
    state.checkpoints.dir: s3://bucket-path/checkpoint
    execution.checkpointing.interval: 15000
```

## Récupération précise
<a name="flink-restart-fine-grained"></a>

**Note**  
La récupération précise du planificateur par défaut est prise en charge avec Flink sur Amazon EMR sur EKS 6.14.0 et versions ultérieures. La récupération précise du planificateur adaptatif est prise en charge avec Flink sur Amazon EMR sur EKS 6.15.0 et versions ultérieures.

Lorsqu’une tâche échoue pendant son exécution, Flink réinitialise l’intégralité du graphe d’exécution et déclenche une réexécution complète depuis le dernier point de contrôle terminé. Cette opération est plus chère qu’une simple réexécution des tâches qui ont échoué. La récupération précise redémarre uniquement le composant connecté au pipeline de la tâche ayant échoué. Dans l’exemple suivant, le graphe de tâches présente 5 sommets (`A` à `E`). Toutes les connexions entre les sommets sont en pipeline avec une distribution ponctuelle, et la valeur de `parallelism.default` pour la tâche est définie sur `2`. 

```
A → B → C → D → E
```

Dans cet exemple, 10 tâches sont en cours d’exécution au total. Le premier pipeline (`a1` à `e1`) s’exécute sur un TaskManager (`TM1`), et le deuxième pipeline (`a2` à `e2`) s’exécute sur un autre TaskManager (`TM2`).

```
a1 → b1 → c1 → d1 → e1
a2 → b2 → c2 → d2 → e2
```

Deux composants sont connectés en pipeline : `a1 → e1` et `a2 → e2`. En cas d’échec de `TM1` ou de `TM2`, l’échec affecte uniquement les 5 tâches du pipeline dans lequel TaskManager était en cours d’exécution. La stratégie de redémarrage démarre uniquement le composant en pipeline concerné. 

La récupération précise ne fonctionne qu’avec des tâches Flink parfaitement parallèles. Elle n’est pas prise en charge par `keyBy()` ou par les opérations `redistribute()`. Pour plus d’informations, accédez à la page [FLIP-1 : Fine Grained Recovery from Task Failures](https://cwiki.apache.org/confluence/display/FLINK/FLIP-1%3A+Fine+Grained+Recovery+from+Task+Failures) du projet Jira *Flink Improvement Proposal*.

Pour activer la récupération précise, définissez les configurations suivantes dans votre fichier `flink-conf.yaml`.

```
jobmanager.execution.failover-strategy: region 
restart-strategy: exponential-delay or fixed-delay
```

## Mécanisme de redémarrage combiné dans le planificateur adaptatif
<a name="flink-restart-combined"></a>

**Note**  
Le mécanisme de redémarrage combiné du planificateur adaptatif est pris en charge avec Flink sur Amazon EMR 6.15.0 et versions ultérieures.

Le planificateur adaptatif peut ajuster le parallélisme de la tâche en fonction des emplacements disponibles. Si le nombre d’emplacements disponibles est insuffisant, le planificateur réduit automatiquement le parallélisme pour s’adapter au parallélisme des tâches configuré. Si de nouveaux emplacements sont disponibles, la tâche fait l’objet d’une augmentation d’échelle selon le parallélisme des tâches configuré. Un planificateur adaptatif permet d’éviter les temps d’arrêt de la tâche lorsque les ressources disponibles sont insuffisantes. Il s’agit du planificateur pris en charge par l’outil de mise à l’échelle automatique Flink. Nous recommandons donc l’utilisation d’un planificateur adaptatif avec Amazon EMR Flink. Les planificateurs adaptatifs peuvent toutefois effectuer plusieurs redémarrages en peu de temps, à raison d’un redémarrage pour chaque nouvelle ressource ajoutée, ce qui peut entraîner une baisse des performances de la tâche.

Avec Amazon EMR 6.15.0 et versions ultérieures, Flink dispose d’un mécanisme de redémarrage combiné dans le planificateur adaptatif qui ouvre une fenêtre de redémarrage lorsque la première ressource est ajoutée, puis attend l’intervalle de fenêtre configuré de 1 minute par défaut. Un seul redémarrage est effectué lorsque les ressources disponibles sont suffisantes pour exécuter la tâche avec un parallélisme configuré ou lorsque l’intervalle expire.

Grâce à quelques exemples de tâches, nos tests d’évaluation ont montré que cette fonctionnalité traite 10 % d’enregistrements supplémentaires par rapport au comportement par défaut lorsque vous utilisez le planificateur adaptatif et l’outil de mise à l’échelle automatique Flink.

Pour activer le mécanisme de redémarrage combiné, définissez les configurations suivantes dans votre fichier `flink-conf.yaml`.

```
jobmanager.adaptive-scheduler.combined-restart.enabled: true 
jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m
```

# Mise hors service progressive des instances Spot avec Flink sur Amazon EMR sur EKS
<a name="jobruns-flink-decommission"></a>

L’utilisation de Flink avec Amazon EMR sur EKS peut améliorer le temps de redémarrage des tâches lors des opérations de récupération ou de mise à l’échelle des tâches.

## Présentation de
<a name="jobruns-flink-decommission-overview"></a>

La version 6.15.0 et les versions ultérieures d’Amazon EMR prennent en charge la mise hors service progressive des Task Managers sur les instances Spot dans Amazon EMR sur EKS avec Apache Flink. Dans le cadre de cette fonctionnalité, Amazon EMR sur EKS avec Flink fournit les fonctionnalités suivantes :
+ **Just-in-time pointage de contrôle** — Les tâches de streaming Flink peuvent répondre à une interruption d'une instance Spot, effectuer un point de contrôle just-in-time (JIT) des tâches en cours d'exécution et empêcher la planification de tâches supplémentaires sur ces instances Spot. Les points de contrôle juste à temps sont pris en charge avec le planificateur par défaut et le planificateur adaptatif.
+ **Mécanisme de redémarrage combiné** : un mécanisme de redémarrage combiné tente de redémarrer la tâche une fois que le parallélisme des ressources cibles est atteint ou que la fenêtre actuellement configurée est terminée. Cela permet également d’éviter des redémarrages successifs susceptibles d’être provoqués par l’arrêt de plusieurs instances Spot. Le mécanisme de redémarrage combiné est uniquement disponible avec le planificateur adaptatif.

Ces fonctionnalités offrent les avantages suivants :
+ Vous pouvez tirer parti des instances Spot pour exécuter des Task Managers et réduire les dépenses liées aux clusters.
+ L’amélioration de la réactivité du Task Manager sur les instances Spot se traduit par une résilience accrue et une planification des tâches plus efficace.
+ Le temps de fonctionnement de vos tâches Flink sera plus élevé, car les redémarrages après l’arrêt d’une instance Spot seront moins nombreux.

## Comment fonctionne une mise hors service progressive
<a name="jobruns-flink-decommission-howitworks"></a>

Prenons l’exemple suivant : vous provisionnez un cluster Amazon EMR sur EKS exécutant Apache Flink, et vous spécifiez des nœuds à la demande pour le Job Manager et des nœuds d’instance Spot pour le Task Manager. Deux minutes avant l’arrêt, le Task Manager reçoit un avis d’interruption.

Dans ce scénario, le Job Manager gère le signal d’interruption d’instance Spot, empêche la planification de tâches supplémentaires sur l’instance Spot et crée un point de contrôle juste à temps pour la tâche de streaming.

Le Job Manager ne redémarre le graphe des tâches que lorsque les nouvelles ressources disponibles sont suffisantes pour permettre le parallélisme des tâches actuelles dans la fenêtre d’intervalle de redémarrage actuelle. L’intervalle de fenêtre de redémarrage est déterminée en fonction du temps nécessaire au remplacement de l’instance Spot, à la création de nouveaux pods du Task Manager et à l’enregistrement auprès du Job Manager.

## Conditions préalables
<a name="jobruns-flink-decommission-prereqs"></a>

Pour utiliser la mise hors service progressive, créez et exécutez une tâche de streaming sur un cluster Amazon EMR on EKS exécutant Apache Flink. Activez le planificateur adaptatif et les Task Managers planifiés sur au moins une instance Spot, comme illustré dans l’exemple suivant. Vous devez utiliser des nœuds à la demande pour le Job Manager, et vous pouvez utiliser des nœuds à la demande pour les Task Managers à condition qu’il existe également au moins une instance Spot.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: deployment_name
spec:
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    cluster.taskmanager.graceful-decommission.enabled: "true"
    execution.checkpointing.interval: "240s"
    jobmanager.adaptive-scheduler.combined-restart.enabled: "true"
    jobmanager.adaptive-scheduler.combined-restart.window-interval : "1m"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    nodeSelector:
      'eks.amazonaws.com/capacityType': 'ON_DEMAND'
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
    nodeSelector:
      'eks.amazonaws.com/capacityType': 'SPOT'
  job:
    jarURI: flink_job_jar_path
```

## Configuration
<a name="jobruns-flink-decommission-config"></a>

Cette section couvre la plupart des configurations que vous pouvez spécifier pour vos besoins de mise hors service. 


| Clé | Description | Valeur par défaut | Valeurs acceptables | 
| --- | --- | --- | --- | 
|  cluster.taskmanager.graceful-decommission.enabled  |  Active la mise hors service progressive du Task Manager.  |  true  |  true, false  | 
|  jobmanager.adaptive-scheduler.combined-restart.enabled  |  Active le mécanisme de redémarrage combiné dans le planificateur adaptatif.  |  false  |  true, false  | 
|  jobmanager.adaptive-scheduler.combined-restart.window-interval  |  Intervalle de fenêtre de redémarrage combiné pour l’exécution de redémarrages combinés de la tâche. Un entier sans unité est interprété comme une milliseconde.  |  1m  |  Exemples : 30, 60s, 3m, 1h  | 

# Utilisation d'Autoscaler pour les applications Flink
<a name="jobruns-flink-autoscaler"></a>

L'outil de mise à l'échelle automatique de l'opérateur peut contribuer à réduire la surcharge en recueillant des métriques des tâches Flink et en modifiant automatiquement le parallélisme au niveau du sommet de la tâche. Voici un exemple de ce à quoi pourrait ressembler votre configuration :

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  ...
spec:
  ...
  flinkVersion: v1_18
  flinkConfiguration:
    job.autoscaler.enabled: "true"
    job.autoscaler.stabilization.interval: 1m
    job.autoscaler.metrics.window: 5m
    job.autoscaler.target.utilization: "0.6"
    job.autoscaler.target.utilization.boundary: "0.2"
    job.autoscaler.restart.time: 2m
    job.autoscaler.catch-up.duration: 5m
    pipeline.max-parallelism: "720"
  ...
```

Cette configuration utilise les valeurs par défaut de la dernière version d'Amazon EMR. Si vous utilisez d'autres versions, vous pouvez avoir des valeurs différentes.

**Note**  
Depuis Amazon EMR 7.2.0, il n'est pas nécessaire d'inclure le préfixe `kubernetes.operator` dans votre configuration. Si vous utilisez la version 7.1.0 ou une version antérieure, vous devez utiliser le préfixe avant chaque configuration. Par exemple, vous devez spécifier`kubernetes.operator.job.autoscaler.scaling.enabled`.

Les options de configuration de l'outil de mise à l'échelle automatique sont les suivantes.
+ `job.autoscaler.scaling.enabled`— indique s'il faut activer l'exécution de la mise à l'échelle des sommets par l'autoscaler. La valeur par défaut est `true`. Si vous désactivez cette configuration, l'autoscaler collecte uniquement les métriques et évalue le parallélisme suggéré pour chaque sommet, mais ne met pas à niveau les tâches.
+ `job.autoscaler.stabilization.interval` : la période de stabilisation au cours de laquelle aucune nouvelle mise à l'échelle ne sera exécutée. La valeur par défaut est de 5 minutes.
+ `job.autoscaler.metrics.window` : la taille de la fenêtre d'agrégation des métriques de mise à l'échelle. Une fenêtre de taille plus grande rend le processus plus fluide et stable, toutefois, cela peut ralentir la réactivité de l'outil de mise à l'échelle automatique face à des variations brusques de charge. La valeur par défaut est de 15 minutes. Nous vous recommandons d'expérimenter en utilisant une valeur comprise entre 3 et 60 minutes.
+ `job.autoscaler.target.utilization` : l'utilisation du sommet cible pour assurer des performances stables de la tâche et une marge pour les fluctuations de charge. Le `0.7` ciblage par défaut est de 70 % utilization/load pour les sommets des tâches.
+ `job.autoscaler.target.utilization.boundary` : la limite d'utilisation du sommet cible qui sert de tampon supplémentaire pour éviter une mise à l'échelle immédiate en cas de fluctuations de charge. La valeur par défaut est`0.3`, ce qui signifie qu'un écart de 30 % par rapport à l'utilisation cible est autorisé avant de déclencher une action de dimensionnement.
+ `ob.autoscaler.restart.time` : l'heure prévue pour redémarrer l'application. La valeur par défaut est de 5 minutes.
+ `job.autoscaler.catch-up.duration` : le temps estimé pour rattraper le retard, c'est-à-dire pour traiter entièrement tout retard accumulé après l'achèvement d'une opération de mise à l'échelle. La valeur par défaut est de 5 minutes. En réduisant la durée de rattrapage, l'outil de mise à l'échelle automatique doit réserver une plus grande capacité supplémentaire pour les actions de mise à l'échelle.
+ `pipeline.max-parallelism` : le parallélisme maximal que l'outil de mise à l'échelle automatique peut utiliser. L'outil de mise à l'échelle automatique ignore cette limite si elle est supérieure au parallélisme maximal configuré dans la configuration Flink ou directement sur chaque opérateur. La valeur par défaut est -1. Notez que l'outil de mise à l'échelle automatique calcule le parallélisme comme un diviseur du parallélisme maximum. Il est donc recommandé de sélectionner des paramètres de parallélisme maximum qui offrent une large gamme de diviseurs potentiels, plutôt que de se baser uniquement sur les valeurs par défaut proposées par Flink. Nous recommandons d'utiliser des multiples de 60 pour cette configuration, tels que 120, 180, 240, 360, 720, etc.

Pour une page de référence plus détaillée sur la configuration, consultez la rubrique [Configuration de l'outil de mise à l'échelle automatique](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/#autoscaler-configuration).

# Réglage automatique des paramètres de l'Autoscaler
<a name="jobruns-flink-autoscaler-parameter-autotuning"></a>

Cette section décrit le comportement de réglage automatique pour les différentes versions d'Amazon EMR. Il décrit également en détail les différentes configurations d'auto-scaling.

**Note**  
Amazon EMR 7.2.0 et versions ultérieures utilisent la configuration open source `job.autoscaler.restart.time-tracking.enabled` pour permettre l'estimation du temps de **redimensionnement**. L'estimation du temps de redimensionnement possède les mêmes fonctionnalités que le réglage automatique d'Amazon EMR. Vous n'avez donc pas à attribuer manuellement des valeurs empiriques à l'heure de redémarrage.  
Vous pouvez toujours utiliser le réglage automatique d'Amazon EMR si vous utilisez Amazon EMR 7.1.0 ou une version antérieure.

------
#### [ 7.2.0 and higher ]

Amazon EMR 7.2.0 et versions ultérieures mesurent le temps de redémarrage réel requis pour appliquer les décisions de dimensionnement automatique. Dans les versions 7.1.0 et antérieures, vous deviez utiliser la configuration `job.autoscaler.restart.time` pour configurer manuellement le temps de redémarrage maximal estimé. En utilisant la configuration`job.autoscaler.restart.time-tracking.enabled`, il vous suffit de saisir une heure de redémarrage pour la première mise à l'échelle. Ensuite, l'opérateur enregistre l'heure de redémarrage réelle et l'utilisera pour les redimensionnements ultérieurs.

Pour activer ce suivi, utilisez la commande suivante :

```
job.autoscaler.restart.time-tracking.enabled: true
```

Les configurations associées pour l'estimation du temps de redimensionnement sont les suivantes.


| Configuration | Obligatoire | Par défaut | Description | 
| --- | --- | --- | --- | 
| job.autoscaler.restart.time-tracking.enabled | Non | False | Indique si le Flink Autoscaler doit ajuster automatiquement les configurations au fil du temps afin d'optimiser les décisions de dimensionnement. Notez que l'Autoscaler peut uniquement régler automatiquement le paramètre Autoscaler. restart.time | 
| job.autoscaler.restart.time | Non | 5 min | Temps de redémarrage prévu utilisé par Amazon EMR on EKS jusqu'à ce que l'opérateur puisse déterminer le temps de redémarrage réel à partir des mises à l'échelle précédentes. | 
| job.autoscaler.restart.time-tracking.limit | Non | 15 min | Durée de redémarrage maximale observée lorsque le job.autoscaler.restart.time-tracking.enabled paramètre est réglé surtrue. | 

Voici un exemple de spécification de déploiement que vous pouvez utiliser pour essayer d'estimer le temps de redimensionnement :

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: autoscaling-example
spec:
  flinkVersion: v1_18
  flinkConfiguration:

    # Autoscaler parameters
    job.autoscaler.enabled: "true"
    job.autoscaler.scaling.enabled: "true"
    job.autoscaler.stabilization.interval: "5s"
    job.autoscaler.metrics.window: "1m"
    
    job.autoscaler.restart.time-tracking.enabled: "true"
    job.autoscaler.restart.time: "2m"
    job.autoscaler.restart.time-tracking.limit: "10m"
    
    jobmanager.scheduler: adaptive
    taskmanager.numberOfTaskSlots: "1"
    pipeline.max-parallelism: "12"

  executionRoleArn: <JOB ARN>
  emrReleaseLabel: emr-7.12.0-flink-latest
  jobManager:
    highAvailabilityEnabled: false
    storageDir: s3://<s3_bucket>/flink/autoscaling/ha/
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: s3://<s3_bucket>/some-job-with-back-pressure
    parallelism: 1
    upgradeMode: stateless
```

Pour simuler la contre-pression, utilisez les spécifications de déploiement suivantes.

```
job:
    jarURI: s3://<s3_bucket>/pyflink-script.py
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-py", "/opt/flink/usrlib/pyflink-script.py"]
    parallelism: 1
    upgradeMode: stateless
```

Téléchargez le script Python suivant dans votre compartiment S3.

```
import logging
import sys
import time
import random

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

TABLE_NAME="orders"
QUERY=f"""
CREATE TABLE {TABLE_NAME} (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);
"""

def create_backpressure(i):
    time.sleep(2)
    return i

def autoscaling_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    t_env.execute_sql(QUERY)
    res_table = t_env.from_path(TABLE_NAME)

    stream =  t_env.to_data_stream(res_table) \
      .shuffle().map(lambda x: create_backpressure(x))\
      .print()
    env.execute("Autoscaling demo")

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    autoscaling_demo()
```

Pour vérifier que l'estimation du temps de redimensionnement fonctionne, assurez-vous que l'enregistrement des `DEBUG` niveaux par l'opérateur Flink est activé. L'exemple ci-dessous montre comment mettre à jour le fichier de diagramme de barre`values.yaml`. Réinstallez ensuite le tableau de bord mis à jour et réexécutez votre tâche Flink.

```
log4j-operator.properties: |+
  # Flink Operator Logging Overrides
  rootLogger.level = DEBUG
```

Obtenez le nom de votre module leader.

```
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')

kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

Exécutez la commande suivante pour obtenir le temps de redémarrage réel utilisé dans les évaluations des métriques.

```
kubectl logs <FLINK-OPERATOR-POD-NAME> -c flink-kubernetes-operator -n <OPERATOR-NAMESPACE> -f | grep "Restart time used in scaling summary computation"
```

Vous devriez voir des journaux similaires aux suivants. Notez que seule la première mise à l'échelle l'utilise` job.autoscaler.restart.time`. Les mises à l'échelle suivantes utilisent le temps de redémarrage observé.

```
2024-05-16 17:17:32,590 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT2M
2024-05-16 17:19:03,787 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:19:18,976 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:20:50,283 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:22:21,691 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
```

------
#### [ 7.0.0 and 7.1.0 ]

Le logiciel open source intégré Flink Autoscaler utilise de nombreux indicateurs pour prendre les meilleures décisions en matière de dimensionnement. Cependant, les valeurs par défaut qu'il utilise pour ses calculs sont censées s'appliquer à la plupart des charges de travail et peuvent ne pas être optimales pour une tâche donnée. La fonction de réglage automatique ajoutée à la version Amazon EMR on EKS du Flink Operator examine les tendances historiques observées sur des métriques capturées spécifiques, puis tente de calculer la valeur la plus optimale adaptée à la tâche donnée.


| Configuration | Obligatoire | Par défaut | Description | 
| --- | --- | --- | --- | 
| kubernetes.operator.job.autoscaler.autotune.enable | Non | False | Indique si le Flink Autoscaler doit ajuster automatiquement les configurations au fil du temps afin d'optimiser les décisions des autoscalers. Actuellement, l'Autoscaler peut uniquement régler automatiquement le paramètre Autoscaler. restart.time | 
| kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count | Non | 3 | Indique le nombre de métriques historiques Amazon EMR sur EKS que l'Autoscaler conserve dans la carte de configuration des métriques Amazon EMR on EKS. | 
| kubernetes.operator.job.autoscaler.autotune.metrics.restart.count | Non | 3 | Indique le nombre de redémarrages effectués par Autoscaler avant de commencer à calculer le temps de redémarrage moyen pour une tâche donnée. | 

Pour activer le réglage automatique, vous devez avoir effectué les opérations suivantes :
+ Définissez `kubernetes.operator.job.autoscaler.autotune.enable:` sur `true`
+ Définissez `metrics.job.status.enable:` sur `TOTAL_TIME`
+ A suivi la configuration de l'[utilisation d'Autoscaler pour les applications Flink pour activer la](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/jobruns-flink-autoscaler.html) mise à l'échelle automatique.

Voici un exemple de spécification de déploiement que vous pouvez utiliser pour essayer le réglage automatique.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: autoscaling-example
spec:
  flinkVersion: v1_18
  flinkConfiguration:

    # Autotuning parameters
    kubernetes.operator.job.autoscaler.autotune.enable: "true"
    kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count: "2"
    kubernetes.operator.job.autoscaler.autotune.metrics.restart.count: "1"
    metrics.job.status.enable: TOTAL_TIME

    # Autoscaler parameters
    kubernetes.operator.job.autoscaler.enabled: "true"
    kubernetes.operator.job.autoscaler.scaling.enabled: "true"
    kubernetes.operator.job.autoscaler.stabilization.interval: "5s"
    kubernetes.operator.job.autoscaler.metrics.window: "1m"

    jobmanager.scheduler: adaptive

    taskmanager.numberOfTaskSlots: "1"
    state.savepoints.dir: s3://<S3_bucket>/autoscaling/savepoint/
    state.checkpoints.dir: s3://<S3_bucket>/flink/autoscaling/checkpoint/
    pipeline.max-parallelism: "4"

  executionRoleArn: <JOB ARN>
  emrReleaseLabel: emr-6.14.0-flink-latest
  jobManager:
    highAvailabilityEnabled: true
    storageDir: s3://<S3_bucket>/flink/autoscaling/ha/
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: s3://<S3_bucket>/some-job-with-back-pressure
    parallelism: 1
    upgradeMode: last-state
```

Pour simuler la contre-pression, utilisez les spécifications de déploiement suivantes.

```
  job:
    jarURI: s3://<S3_bucket>/pyflink-script.py
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-py", "/opt/flink/usrlib/pyflink-script.py"]
    parallelism: 1
    upgradeMode: last-state
```

Téléchargez le script Python suivant dans votre compartiment S3.

```
import logging
import sys
import time
import random

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

TABLE_NAME="orders"
QUERY=f"""
CREATE TABLE {TABLE_NAME} (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);
"""

def create_backpressure(i):
    time.sleep(2)
    return i

def autoscaling_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    t_env.execute_sql(QUERY)
    res_table = t_env.from_path(TABLE_NAME)

    stream =  t_env.to_data_stream(res_table) \
      .shuffle().map(lambda x: create_backpressure(x))\
      .print()
    env.execute("Autoscaling demo")

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    autoscaling_demo()
```

Pour vérifier que votre autotuner fonctionne, utilisez les commandes suivantes. Notez que vous devez utiliser les informations de votre propre module leader pour le Flink Operator.

Obtenez d'abord le nom de votre module leader.

```
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')

kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

Une fois que vous avez le nom de votre module leader, vous pouvez exécuter la commande suivante.

```
kubectl logs -n $NAMESPACE  -c flink-kubernetes-operator --follow <YOUR-FLINK-OPERATOR-POD-NAME>  | grep -E 'EmrEks|autotun|calculating|restart|autoscaler'
```

Vous devriez voir des journaux similaires aux suivants.

```
[m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [36m[DEBUG][flink/autoscaling-example] Using the latest Emr Eks Metric for calculating restart.time for autotuning: EmrEksMetrics(restartMetric=RestartMetric(restartingTime=65, numRestarts=1))

[m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [32m[INFO ][flink/autoscaling-example] Calculated average restart.time metric via autotuning to be: PT0.065S
```

------

# Maintenance et résolution des problèmes liés aux tâches Flink sur Amazon EMR sur EKS
<a name="jobruns-flink-troubleshooting"></a>

Les sections suivantes expliquent comment gérer vos tâches Flink de longue durée et fournissent des conseils sur la manière de résoudre certains problèmes courants liés aux tâches Flink.

# Maintenance des applications Flink
<a name="jobruns-flink-maintain"></a>

**Topics**
+ [Modes de mise à niveau](#jobruns-flink-upgrademode)

Les applications Flink sont généralement conçues pour fonctionner pendant de longues périodes (plusieurs semaines, mois, voire plusieurs années). Comme pour tous les services de longue durée, les applications de streaming Flink doivent faire l’objet d’une maintenance. Cela inclut des corrections de bogues, des améliorations et la migration vers un cluster Flink exécutant une version plus récente.

L’évolution des spécifications pour les ressources `FlinkDeployment` et `FlinkSessionJob` implique de mettre à niveau l’application en cours d’exécution. Pour ce faire, l’opérateur arrête la tâche en cours d’exécution (à moins qu’elle soit déjà suspendue) et la redéploie avec les dernières spécifications et, pour les applications avec état, l’état de l’exécution précédente.

Les utilisateurs choisissent comment gérer l’état lorsque les applications avec état s’arrêtent et sont restaurées avec le paramètre `upgradeMode` défini sur `JobSpec`.

## Modes de mise à niveau
<a name="jobruns-flink-upgrademode"></a>

Introduction optionnelle

**Applications sans état**  
Les applications sans état sont mises à niveau à partir de l’état vide.

**Dernier état**  
Les mises à niveau rapides, quel que soit l’état de l’application (même pour les tâches ayant échoué), ne nécessitent pas une tâche saine, car elles utilisent toujours le dernier point de contrôle réussi. Une récupération manuelle peut être nécessaire en cas de perte de métadonnées de haute disponibilité. Pour limiter l’ancienneté du dernier point de contrôle utilisé pour la récupération, configurez le paramètre `kubernetes.operator.job.upgrade.last-state.max.allowed.checkpoint.age`. Si le point de contrôle est plus ancien que la valeur configurée, un point de sauvegarde sera utilisé à la place pour les tâches saines. Cette fonctionnalité n’est pas prise en charge en mode session. 

**Points de sauvegarde**  
Utilisez le point de sauvegarde pour la mise à niveau, offrant ainsi une sécurité maximale et la possibilité de servir de backup/fork point. Le point de sauvegarde sera créé lors de la mise à niveau. Notez que la tâche Flink doit être en cours d’exécution pour que le point de sauvegarde puisse être créé. Si la tâche n'est pas fonctionnelle, le dernier point de contrôle sera utilisé (sauf kubernetes.operator.job.upgrade). last-state-fallback.enabled est défini sur false). Si le dernier point de contrôle n’est pas disponible, la mise à niveau de la tâche échouera.

# Résolution des problèmes
<a name="jobruns-flink-troubleshoot"></a>

Cette section explique comment résoudre les problèmes liés à Amazon EMR on EKS. Pour plus d'informations sur la manière de résoudre les problèmes généraux liés à Amazon EMR, consultez la rubrique [Résolution des problèmes liés à un cluster](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-troubleshoot.html) dans le *Guide de gestion d'Amazon EMR*.
+ [Résolution des problèmes liés à l'utilisation de PersistentVolumeClaims (PVC)](permissions-for-pvc.md)
+ [Résolution des problèmes de mise à l'échelle automatique verticale d'Amazon EMR on EKS](troubleshooting-vas.md)
+ [Résolution des problèmes liés à l'opérateur Spark d'Amazon EMR on EKS](troubleshooting-sparkop.md)

## Résoudre les problèmes liés à Apache Flink sur Amazon EMR sur EKS
<a name="jobruns-flink-troubleshooting-apache-flink"></a>

### Le mappage des ressources est introuvable lors de l'installation des Charts de Helm
<a name="w2aac21c21b7b7b3"></a>

Le message d'erreur suivant peut s'afficher lorsque vous installez les Charts de Helm.

```
Error: INSTALLATION FAILED: pulling from host 1234567890.dkr.ecr.us-west-2.amazonaws.com failed with status code [manifests 6.13.0]: 403 Forbidden Error: INSTALLATION FAILED: unable to build kubernetes objects from release manifest: [resource mapping not found for name: "flink-operator-serving-cert" namespace: "<the namespace to install your operator>" from "": no matches for kind "Certificate" in version "cert-manager.io/v1"

ensure CRDs are installed first, resource mapping not found for name: "flink-operator-selfsigned-issuer" namespace: "<the namespace to install your operator>" " from "": no matches for kind "Issuer" in version "cert-manager.io/v1"

ensure CRDs are installed first].
```

Pour résoudre cette erreur, installez cert-manager afin de pouvoir intégrer le composant webhook. Vous devez installer cert-manager sur chaque cluster Amazon EKS que vous utilisez.

```
kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.12.0
```

### Service AWS erreur d'accès refusé
<a name="jobruns-flink-troubleshooting-access-denied"></a>

Si un message d'erreur *access denied* s'affiche, vérifiez que le rôle IAM pour `operatorExecutionRoleArn` dans le fichier `values.yaml` de Charts de Helm dispose des autorisations appropriées. Vérifiez également que le rôle IAM sous `executionRoleArn` dans votre spécification `FlinkDeployment` dispose des autorisations appropriées.

### `FlinkDeployment` est bloqué
<a name="jobruns-flink-troubleshooting-stuck"></a>

Si votre `FlinkDeployment` est bloqué à l'état arrêté, suivez les étapes suivantes pour forcer la suppression du déploiement :

1. Modifiez l'exécution du déploiement.

   ```
   kubectl edit -n Flink Namespace flinkdeployments/App Name
   ```

1. Supprimez ce finalisateur.

   ```
   finalizers:
     - flinkdeployments.flink.apache.org/finalizer
   ```

1. Supprimez le déploiement.

   ```
   kubectl delete -n Flink Namespace flinkdeployments/App Name
   ```

### AWSBadRequestException problème s3a lors de l'exécution d'une application Flink dans un opt-in Région AWS
<a name="jobruns-flink-troubleshooting-optin-region"></a>

Si vous exécutez une application Flink dans le cadre d'un [opt-in Région AWS](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html), les erreurs suivantes peuvent s'afficher :

```
Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: getFileStatus on 
s3://flink.txt: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request 
(Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: ABCDEFGHIJKL; S3 Extended Request ID:
ABCDEFGHIJKLMNOP=; Proxy: null), S3 Extended Request ID: ABCDEFGHIJKLMNOP=:400 Bad Request: Bad Request 
(Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: ABCDEFGHIJKL; S3 Extended Request ID: ABCDEFGHIJKLMNOP=; Proxy: null)
```

```
Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: getS3Region on flink-application: software.amazon.awssdk.services.s3.model.S3Exception: null 
(Service: S3, Status Code: 400, Request ID: ABCDEFGHIJKLMNOP, Extended Request ID: ABCDEFGHIJKLMNOPQRST==):null: null 
(Service: S3, Status Code: 400, Request ID: ABCDEFGHIJKLMNOP, Extended Request ID: AHl42uDNaTUFOus/5IIVNvSakBcMjMCH7dd37ky0vE6jhABCDEFGHIJKLMNOPQRST==)
```

Pour corriger ces erreurs, utilisez la configuration suivante dans votre fichier de `FlinkDeployment` définition.

```
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
```

Nous vous recommandons également d'utiliser le fournisseur SDKv2 d'informations d'identification :

```
fs.s3a.aws.credentials.provider: software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider
```

Si vous souhaitez utiliser le fournisseur SDKv1 d'informations d'identification, assurez-vous que votre SDK prend en charge votre région d'adhésion. Pour plus d'informations, consultez le [aws-sdk-java GitHub référentiel](https://github.com/aws/aws-sdk-java).

`S3 AWSBadRequestException`Si vous exécutez des instructions SQL Flink dans une région optionnelle, assurez-vous de définir la configuration `fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME` dans vos spécifications de configuration Flink.

### S3A AWSBad RequestException lors de l'exécution d'une tâche de session Flink dans les régions CN
<a name="jobruns-flink-troubleshooting-optin-region"></a>

Pour les versions 6.15.0 à 7.2.0 d'Amazon EMR, les messages d'erreur suivants peuvent s'afficher lorsque vous exécutez une tâche de session Flink dans les régions CN. Il s'agit notamment de la Chine (Pékin) et de la Chine (Ningxia) :

```
Error:  {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.hadoop.fs.s3a.AWSBadRequestException: 
                    getFileStatus on s3://ABCDPath: software.amazon.awssdk.services.s3.model.S3Exception: null (Service: S3, Status Code: 400, Request ID: ABCDEFGH, Extended Request ID: 
                    ABCDEFGH:null: null (Service: S3, Status Code: 400, Request ID: ABCDEFGH, Extended Request ID: ABCDEFGH","additionalMetadata":{},"throwableList":
                    [{"type":"org.apache.hadoop.fs.s3a.AWSBadRequestException","message":"getFileStatus on s3://ABCDPath: software.amazon.awssdk.services.s3.model.S3Exception: 
                    null (Service: S3, Status Code: 400, Request ID: ABCDEFGH, Extended Request ID: ABCDEFGH:null: null (Service: S3, Status Code: 400, Request ID: ABCDEFGH, 
                    Extended Request ID: ABCDEFGH","additionalMetadata":{}},{"type":"software.amazon.awssdk.services.s3.model.S3Exception","message":"null (Service: S3, Status Code: 400, 
                    Request ID: ABCDEFGH, Extended Request ID: ABCDEFGH","additionalMetadata":{}}]}
```

Il existe une prise de conscience de ce problème. L'équipe travaille à la correction des opérateurs Flink pour toutes ces versions. Toutefois, avant de terminer le correctif, pour corriger cette erreur, vous devez télécharger le diagramme de barre de l'opérateur Flink, le décompresser (extraire le fichier compressé) et apporter des modifications de configuration dans le graphique de barre.

Les étapes spécifiques sont les suivantes :

1. Accédez, en particulier, aux répertoires dans votre dossier local pour le graphique de barre, et exécutez la ligne de commande suivante pour extraire le graphique de barre et le décompresser (extraire).

   ```
   helm pull oci://public.ecr.aws/emr-on-eks/flink-kubernetes-operator \
   --version $VERSION \
   --namespace $NAMESPACE
   ```

   ```
   tar -zxvf flink-kubernetes-operator-$VERSION.tgz
   ```

1. Accédez au dossier Helm Chart et trouvez le `templates/flink-operator.yaml` fichier.

1. Recherchez `flink-operator-config` ConfigMap et ajoutez la `fs.s3a.endpoint.region` configuration suivante dans le`flink-conf.yaml`. Par exemple :

   ```
   {{- if .Values.defaultConfiguration.create }}
   apiVersion: v1
   kind: ConfigMap
   metadata:
     name: flink-operator-config
     namespace: {{ .Release.Namespace }}
     labels:
       {{- include "flink-operator.labels" . | nindent 4 }}
   data:
     flink-conf.yaml: |+
   fs.s3a.endpoint.region: {{ .Values.emrContainers.awsRegion }}
   ```

1. Installez le tableau de bord local et exécutez votre tâche.

# Versions prises en charge pour Amazon EMR sur EKS avec Apache Flink
<a name="jobruns-flink-security-release-versions"></a>

Apache Flink est disponible avec les versions suivantes d'Amazon EMR sur EKS. Pour plus d'informations sur toutes les versions disponibles, consultez [Versions Amazon EMR on EKS](emr-eks-releases.md).


| Étiquette de version | Java | Flink | Opérateur Flink | 
| --- | --- | --- | --- | 
|  **emr-7.2.0-flink-latest**  |  17  |  1.18.1  |  -  | 
|  **emr-7.2.0-flink-k8s-operator-latest**  |  11  |  -  |  1.8.0  | 
|  **emr-7.1.0-flink-latest**  |  17  |  1.18.1  |  -  | 
|  **emr-7.1.0-flink-k8s-operator-latest**  |  11  |  -  |  1.6.1  | 
|  **emr-7.0.0-flink-latest**  |  11  |  1.18.0  |  -  | 
|  **emr-7.0.0-flink-k8s-operator-latest**  |  11  |  -  |  1.6.1  | 
|  **emr-6.15.0-flink-latest**  |  11  |  1,17.1  |  -  | 
|  **emr-6.15.0-flink-k8s-operator-latest**  |  11  |  -  |  1.6.0  | 
|  **emr-6.14.0-flink-latest**  |  11  |  1,17.1  |  -  | 
|  **emr-6.14.0-flink-k8s-operator-latest**  |  11  |  -  |  1.6.0  | 
|  **emr-6.13.0-flink-latest**  |  11  |  1.17.0  |  -  | 
|  **emr-6.13.0-flink-k8s-operator-latest**  |  11  |  -  |  1.5.0  | 