

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Exécution d'une application Flink
<a name="jobruns-flink-kubernetes-operator-run-application"></a>

Avec Amazon EMR 6.13.0 et versions ultérieures, vous pouvez exécuter une application Flink avec l’opérateur Kubernetes pour Flink en mode application sur Amazon EMR sur EKS. Avec Amazon EMR 6.15.0 et versions ultérieures, vous pouvez également exécuter une application Flink en mode session. Cette section présente plusieurs manières d’exécuter une application Flink avec Amazon EMR sur EKS.

**Topics**

**Note**  
Il est nécessaire de disposer d'un compartiment Amazon S3 préalablement créé pour conserver les métadonnées de haute disponibilité lorsque vous soumettez votre tâche Flink. Si vous ne souhaitez pas utiliser cette fonctionnalité, vous pouvez la désactiver. Elle est activée par défaut.

**Prérequis** : pour pouvoir exécuter une application Flink avec l’opérateur Kubernetes pour Flink, procédez comme suit dans [Configuration de l'opérateur Kubernetes pour Flink sur Amazon EMR on EKS](jobruns-flink-kubernetes-operator-setup.md) et [Installez l'opérateur Kubernetes](jobruns-flink-kubernetes-operator-getting-started.md#jobruns-flink-kubernetes-operator-getting-started-install-operator).

------
#### [ Application mode ]

Avec Amazon EMR 6.13.0 et versions ultérieures, vous pouvez exécuter une application Flink avec l’opérateur Kubernetes pour Flink en mode application sur Amazon EMR sur EKS.

1. Créez un fichier de `FlinkDeployment` définition `basic-example-app-cluster.yaml` comme dans l'exemple suivant. Si vous avez activé et utilisé l'un des [opt-in Régions AWS](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html), assurez-vous de décommenter et de configurer la configuration. `fs.s3a.endpoint.region`

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example-app-cluster
   spec:
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
       #fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
       state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
       state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH 
     flinkVersion: v1_17
     executionRoleArn: JOB_EXECUTION_ROLE_ARN
     emrReleaseLabel: "emr-6.13.0-flink-latest" # 6.13 or higher
     jobManager:
       storageDir: HIGH_AVAILABILITY_STORAGE_PATH
       resource:
         memory: "2048m"
         cpu: 1
     taskManager:
       resource:
         memory: "2048m"
         cpu: 1
     job:
       # if you have your job jar in S3 bucket you can use that path as well
       jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
       parallelism: 2
       upgradeMode: savepoint
       savepointTriggerNonce: 0
     monitoringConfiguration:    
       cloudWatchMonitoringConfiguration:
          logGroupName: LOG_GROUP_NAME
   ```

1. Soumettez le déploiement Flink à l'aide de la commande ci-dessous. Cela créera également un objet `FlinkDeployment` nommé `basic-example-app-cluster`.

   ```
   kubectl create -f basic-example-app-cluster.yaml -n <NAMESPACE>
   ```

1. Accédez à l'interface utilisateur de Flink.

   ```
   kubectl port-forward deployments/basic-example-app-cluster 8081 -n NAMESPACE
   ```

1. Ouvrez `localhost:8081` pour consulter vos tâches Flink localement.

1. Nettoyez la tâche. N'oubliez pas de nettoyer les artefacts S3 créés pour cette tâche, tels que le pointage de contrôle, la haute disponibilité, les métadonnées de pointage de sauvegarde et les journaux. CloudWatch

Pour plus d'informations sur la soumission de candidatures à Flink via l'opérateur Flink Kubernetes, consultez les exemples d'opérateurs [Flink Kubernetes](https://github.com/apache/flink-kubernetes-operator/tree/main/examples) dans le dossier sur. `apache/flink-kubernetes-operator` GitHub

------
#### [ Session mode ]

Avec Amazon EMR 6.15.0 et versions ultérieures, vous pouvez exécuter une application Flink avec l’opérateur Kubernetes pour Flink en mode session sur Amazon EMR sur EKS.

1. Créez un fichier de `FlinkDeployment` définition nommé `basic-example-app-cluster.yaml` comme dans l'exemple suivant. Si vous avez activé et utilisé l'un des [opt-in Régions AWS](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html), assurez-vous de décommenter et de configurer la configuration. `fs.s3a.endpoint.region`

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example-session-cluster
   spec:
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
       #fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
       state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
       state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH
     flinkVersion: v1_17
     executionRoleArn: JOB_EXECUTION_ROLE_ARN
     emrReleaseLabel: "emr-6.15.0-flink-latest"
     jobManager:
       storageDir: HIGH_AVAILABILITY_S3_STORAGE_PATH
       resource:
         memory: "2048m"
         cpu: 1
     taskManager:
       resource:
         memory: "2048m"
         cpu: 1
     monitoringConfiguration:    
       s3MonitoringConfiguration:
          logUri: 
       cloudWatchMonitoringConfiguration:
          logGroupName: LOG_GROUP_NAME
   ```

1. Soumettez le déploiement Flink à l'aide de la commande ci-dessous. Cela créera également un objet `FlinkDeployment` nommé `basic-example-session-cluster`.

   ```
   kubectl create -f basic-example-app-cluster.yaml -n NAMESPACE
   ```

1. Utilisez la commande suivante pour vérifier que le cluster de session `LIFECYCLE` est défini sur `STABLE` :

   ```
   kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
   ```

   Voici un exemple de sortie :

   ```
   NAME                              JOB STATUS   LIFECYCLE STATE
   basic-example-session-cluster                          STABLE
   ```

1. Créez un fichier de définition de ressources personnalisé `FlinkSessionJob` `basic-session-job.yaml` avec l’exemple de contenu suivant :

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkSessionJob
   metadata:
     name: basic-session-job
   spec:
     deploymentName: basic-session-deployment
     job:
       # If you have your job jar in an S3 bucket you can use that path.
       # To use jar in S3 bucket, set 
       # OPERATOR_EXECUTION_ROLE_ARN (--set emrContainers.operatorExecutionRoleArn=$OPERATOR_EXECUTION_ROLE_ARN)
       # when you install Spark operator
       jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar
       parallelism: 2
       upgradeMode: stateless
   ```

1. Soumettez la tâche de session avec la commande ci-dessous. Cela créera également un objet `FlinkSessionJob` `basic-session-job`.

   ```
   kubectl apply -f basic-session-job.yaml -n $NAMESPACE
   ```

1. Utilisez la commande suivante pour vérifier que le cluster de session `LIFECYCLE` est défini sur `STABLE`, et que `JOB STATUS` indique `RUNNING` :

   ```
   kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
   ```

   Voici un exemple de sortie :

   ```
   NAME                              JOB STATUS   LIFECYCLE STATE
   basic-example-session-cluster     RUNNING      STABLE
   ```

1. Accédez à l'interface utilisateur de Flink.

   ```
   kubectl port-forward deployments/basic-example-session-cluster 8081 -n NAMESPACE
   ```

1. Ouvrez `localhost:8081` pour consulter vos tâches Flink localement.

1. Nettoyez la tâche. N'oubliez pas de nettoyer les artefacts S3 créés pour cette tâche, tels que le pointage de contrôle, la haute disponibilité, les métadonnées de pointage de sauvegarde et les journaux. CloudWatch

------