

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Utilisation des tâches Flink dans Amazon EMR
<a name="flink-jobs"></a>

Il existe plusieurs manières d'interagir avec Flink sur Amazon EMR : via la console, via l'interface Flink située dans l'interface utilisateur de suivi et via ResourceManager la ligne de commande. Vous pouvez envoyer un fichier JAR à une application Flink avec n'importe lequel de ces outils. Une fois soumis un fichier JAR, il devient une tâche gérée par le Flink JobManager. JobManager Il se trouve sur le nœud YARN qui héberge le daemon Application Master de la session Flink.

Vous pouvez exécuter une application Flink en tant que tâche YARN sur un cluster à exécution longue ou sur un cluster transitoire. Sur un cluster à exécution longue, vous pouvez soumettre plusieurs tâches Flink à un cluster Flink s'exécutant sur Amazon EMR. Si vous exécutez une tâche Flink sur un cluster transitoire, votre cluster Amazon EMR n'existe que le temps de l'exécution de l'application Flink, et vous n'êtes donc facturé que pour les ressources et le temps utilisés. Vous pouvez soumettre une tâche Flink avec l'opération d'API Amazon `AddSteps` EMR, en tant qu'argument d'étape de `RunJobFlow` l'opération et via AWS CLI `add-steps` les `create-cluster` commandes or.

## Démarrez une application Flink YARN en tant qu'étape sur un cluster de longue durée
<a name="flink-add-step"></a>

Pour démarrer une application Flink à laquelle plusieurs clients peuvent soumettre du travail via les opérations de l'API YARN, vous devez soit créer un cluster, soit ajouter une application Flink à un cluster existant. Pour obtenir des instructions sur la création d'un cluster, veuillez consulter [Création d'un cluster avec Flink](flink-create-cluster.md). Pour démarrer une session YARN sur un cluster existant, procédez comme suit à partir de la console, de l' AWS CLI ou du kit SDK Java.

**Note**  
La commande `flink-yarn-session` a été ajoutée dans la version 5.5.0 d'Amazon EMR comme habillage pour le script `yarn-session.sh`, afin de simplifier l'exécution. Si vous utilisez une version antérieure d'Amazon EMR, remplacez `bash -c "/usr/lib/flink/bin/yarn-session.sh -d"` par **Arguments** dans la console ou par `Args`. dans la commande AWS CLI .

**Pour soumettre une tâche Flink sur un cluster existant depuis la console**

Soumettez la session Flink avec la commande `flink-yarn-session` dans un cluster existant.

1. [Ouvrez la console Amazon EMR à l'adresse /emr. https://console.aws.amazon.com](https://console.aws.amazon.com/emr/)

1. Dans la liste des clusters, sélectionnez celui que vous avez lancé précédemment.

1. Dans la page des détails de cluster, sélectionnez **Steps (Étapes)**, **Add Step (Ajouter une étape)**.

1. Utilisez les directives suivantes pour saisir les paramètres, puis sélectionnez **Ajouter**.  
****    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/fr_fr/emr/latest/ReleaseGuide/flink-jobs.html)

**Pour soumettre une tâche Flink sur un cluster existant à l'aide du AWS CLI**
+ Utilisez la commande `add-steps` pour ajouter une tâche Flink à un cluster de longue durée. L'exemple de commande suivant indique `Args="flink-yarn-session", "-d"` pour démarrer une session Flink au sein de votre cluster YARN dans un état détaché (`-d`). Consultez [Configuration de YARN](https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/yarn_setup.html#flink-yarn-session) dans la dernière documentation Flink pour plus de détails sur les arguments.

  ```
  aws emr add-steps --cluster-id <j-XXXXXXXX> --steps Type=CUSTOM_JAR,Name=<example-flink-step-name>,Jar=command-runner.jar,Args="flink-yarn-session","-d"
  ```

## Soumettre du travail à une application Flink existante sur un cluster de longue durée
<a name="flink-submit-work"></a>

Si vous possédez déjà une application Flink sur un cluster de longue durée, vous pouvez spécifier l'ID de l'application Flink du cluster afin d'y soumettre du travail. Pour obtenir l'ID de l'application, `yarn application -list` exécutez l'opération [YarnClient](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/yarn/client/api/YarnClient.html)API AWS CLI ou via l'API :

```
$ yarn application -list
16/09/07 19:32:13 INFO client.RMProxy: Connecting to ResourceManager at ip-10-181-83-19.ec2.internal/10.181.83.19:8032
Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1
Application-Id    Application-Name    Application-Type    User    Queue    State    Final-State    Progress    Tracking-URL
application_1473169569237_0002    Flink session with 14 TaskManagers (detached)	        Apache Flink	    hadoop	   default	           RUNNING	         UNDEFINED	           100%	http://ip-10-136-154-194.ec2.internal:33089
```

L'identifiant de l'application pour cette session Flink est`application_1473169569237_0002`, que vous pouvez utiliser pour soumettre du travail à l'application à partir du SDK AWS CLI ou d'un SDK.

**Example SDK pour Java**  

```
List<StepConfig> stepConfigs = new ArrayList<StepConfig>();
  
HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig()
    .withJar("command-runner.jar")
    .withArgs("flink", "run", "-m", "yarn-cluster", "-yid", "application_1473169569237_0002", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", 
      "--input", "s3://amzn-s3-demo-bucket/pg11.txt", "--output", "s3://amzn-s3-demo-bucket/alice2/");
  
StepConfig flinkRunWordCount = new StepConfig()
  .withName("Flink add a wordcount step")
  .withActionOnFailure("CONTINUE")
  .withHadoopJarStep(flinkWordCountConf);
  
stepConfigs.add(flinkRunWordCount); 
  
AddJobFlowStepsResult res = emr.addJobFlowSteps(new AddJobFlowStepsRequest()
   .withJobFlowId("myClusterId")
   .withSteps(stepConfigs));
```

**Example AWS CLI**  

```
aws emr add-steps --cluster-id <j-XXXXXXXX> \
--steps Type=CUSTOM_JAR,Name=Flink_Submit_To_Long_Running,Jar=command-runner.jar,\
Args="flink","run","-m","yarn-cluster","-yid","application_1473169569237_0002",\
"/usr/lib/flink/examples/streaming/WordCount.jar",\
"--input","s3://amzn-s3-demo-bucket/pg11.txt","--output","s3://amzn-s3-demo-bucket/alice2/" \
--region <region-code>
```

## Soumission d'une tâche Flink transitoire
<a name="flink-transient-job"></a>

Les exemples suivants lancent un cluster transitoire qui exécute une tâche Flink, puis la résilie à son terme.

**Example SDK pour Java**  

```
import java.util.ArrayList;
import java.util.List;
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.amazonaws.services.elasticmapreduce.model.*;

public class Main_test {

	public static void main(String[] args) {
		AWSCredentials credentials_profile = null;
		try {
			credentials_profile = new ProfileCredentialsProvider("default").getCredentials();
		} catch (Exception e) {
			throw new AmazonClientException(
					"Cannot load credentials from .aws/credentials file. " +
							"Make sure that the credentials file exists and the profile name is specified within it.",
					e);
		}

		AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard()
				.withCredentials(new AWSStaticCredentialsProvider(credentials_profile))
				.withRegion(Regions.US_WEST_1)
				.build();

		List<StepConfig> stepConfigs = new ArrayList<StepConfig>();
		HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig()
				.withJar("command-runner.jar")
				.withArgs("bash", "-c", "flink", "run", "-m", "yarn-cluster", "-yn", "2",
						"/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://path/to/input-file.txt", "--output",
						"s3://path/to/output/");

		StepConfig flinkRunWordCountStep = new StepConfig()
				.withName("Flink add a wordcount step and terminate")
				.withActionOnFailure("CONTINUE")
				.withHadoopJarStep(flinkWordCountConf);

		stepConfigs.add(flinkRunWordCountStep);

		Application flink = new Application().withName("Flink");

		RunJobFlowRequest request = new RunJobFlowRequest()
				.withName("flink-transient")
				.withReleaseLabel("emr-5.20.0")
				.withApplications(flink)
				.withServiceRole("EMR_DefaultRole")
				.withJobFlowRole("EMR_EC2_DefaultRole")
				.withLogUri("s3://path/to/my/logfiles")
				.withInstances(new JobFlowInstancesConfig()
						.withEc2KeyName("myEc2Key")
						.withEc2SubnetId("subnet-12ab3c45")
						.withInstanceCount(3)
						.withKeepJobFlowAliveWhenNoSteps(false)
						.withMasterInstanceType("m4.large")
						.withSlaveInstanceType("m4.large"))
				.withSteps(stepConfigs);

		RunJobFlowResult result = emr.runJobFlow(request);
		System.out.println("The cluster ID is " + result.toString());

	}

}
```

**Example AWS CLI**  
Utilisez la sous-commande `create-cluster` pour créer un cluster transitoire qui se termine quand la tâche Flink est terminée :  

```
aws emr create-cluster --release-label emr-5.2.1 \
--name "Flink_Transient" \
--applications Name=Flink \
--configurations file://./configurations.json \
--region us-east-1 \
--log-uri s3://myLogUri \
--auto-terminate
--instance-type m5.xlarge \
--instance-count 2 \
--service-role EMR_DefaultRole_V2 \ 
--ec2-attributes KeyName=<YourKeyName>,InstanceProfile=EMR_EC2_DefaultRole \
--steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Long_Running_Session,\
Args="bash","-c","\"flink run -m yarn-cluster /usr/lib/flink/examples/streaming/WordCount.jar
--input s3://amzn-s3-demo-bucket/pg11.txt --output s3://amzn-s3-demo-bucket/alice/""
```