

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Utilizzo dei processi Flink in Amazon EMR
<a name="flink-jobs"></a>

Esistono diversi modi per interagire con Flink su Amazon EMR: tramite la console, l'interfaccia Flink disponibile nell'interfaccia utente di tracciamento e ResourceManager la riga di comando. Puoi inviare un file JAR a un'applicazione Flink in uno di questi modi. Una volta inviato, un file JAR diventa un lavoro gestito da Flink. JobManager JobManager Si trova sul nodo YARN che ospita il demone Application Master della sessione Flink.

Puoi eseguire un'applicazione Flink come processo YARN su un cluster di lunga durata o su un cluster transitorio. Su un cluster di lunga durata, è possibile inviare più processi Flink a un cluster Flink in esecuzione su Amazon EMR. Se si esegue un processo Flink su un cluster transitorio, il cluster Amazon EMR esiste solo per il tempo necessario all'esecuzione dell'applicazione Flink, quindi verranno addebitati solo le risorse e il tempo utilizzati. Puoi inviare un job Flink con l'operazione API Amazon `AddSteps` EMR, come argomento del passaggio `RunJobFlow` dell'operazione e tramite AWS CLI `add-steps` i `create-cluster` comandi or.

## Avvio di un'applicazione YARN di Flink come fase su un cluster di lunga durata
<a name="flink-add-step"></a>

Per avviare un'applicazione Flink alla quale più client possono inviare lavoro tramite operazioni API YARN, è necessario creare un cluster o aggiungere a un'applicazione Flink un cluster esistente. Per istruzioni su come creare un nuovo cluster, consulta [Creazione di un cluster con Flink](flink-create-cluster.md). Per avviare una sessione YARN su un cluster esistente, segui le seguenti fasi dalla console, dalla AWS CLI o dall'SDK Java.

**Nota**  
Il comando `flink-yarn-session` è stato aggiunto alla versione 5.5.0 di Amazon EMR come wrapper per lo script `yarn-session.sh` per semplificare l'esecuzione. Se usi una versione precedente di Amazon EMR, sostituisci `bash -c "/usr/lib/flink/bin/yarn-session.sh -d"` per **Arguments (Argomenti)** nella console o `Args` nel comando AWS CLI .

**Per inviare un processo Flink su un cluster esistente dalla console**

Invia la sessione Flink usando il comando `flink-yarn-session` in un cluster esistente.

1. [Apri la console Amazon EMR in /emr. https://console.aws.amazon.com](https://console.aws.amazon.com/emr/)

1. Nell'elenco dei cluster, selezionare il cluster precedentemente avviato.

1. Nella pagina dei dettagli del cluster, scegliere **Steps (Fasi)**, **Add Step (Aggiungi fase)**.

1. Utilizza le linee guida che seguono per immettere i parametri, quindi scegli **Aggiungi**.  
****    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/emr/latest/ReleaseGuide/flink-jobs.html)

**Per inviare un job Flink su un cluster esistente con AWS CLI**
+ Utilizza il comando `add-steps` per avviare un processo Flink all'interno di un cluster di lunga durata. Il seguente comando di esempio specifica `Args="flink-yarn-session", "-d"` per avviare una sessione Flink all'interno del cluster YARN in uno stato scollegato (`-d`). Consulta [Impostazione YARN](https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/yarn_setup.html#flink-yarn-session) nella documentazione più aggiornata di Flink per i dettagli sugli argomenti.

  ```
  aws emr add-steps --cluster-id <j-XXXXXXXX> --steps Type=CUSTOM_JAR,Name=<example-flink-step-name>,Jar=command-runner.jar,Args="flink-yarn-session","-d"
  ```

## Invio di lavoro a un'applicazione Flink esistente su un cluster di lunga durata
<a name="flink-submit-work"></a>

Se si dispone già di un'applicazione Flink esistente su un cluster di lunga durata, è possibile specificare l'ID applicazione Flink del cluster per inviare il lavoro. Per ottenere l'ID dell'applicazione, `yarn application -list` esegui l'operazione [YarnClient](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/yarn/client/api/YarnClient.html)API AWS CLI o tramite:

```
$ yarn application -list
16/09/07 19:32:13 INFO client.RMProxy: Connecting to ResourceManager at ip-10-181-83-19.ec2.internal/10.181.83.19:8032
Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1
Application-Id    Application-Name    Application-Type    User    Queue    State    Final-State    Progress    Tracking-URL
application_1473169569237_0002    Flink session with 14 TaskManagers (detached)	        Apache Flink	    hadoop	   default	           RUNNING	         UNDEFINED	           100%	http://ip-10-136-154-194.ec2.internal:33089
```

L'ID dell'applicazione per questa sessione Flink è`application_1473169569237_0002`, che puoi utilizzare per inviare lavori all'applicazione da AWS CLI o da un SDK.

**Example SDK per Java**  

```
List<StepConfig> stepConfigs = new ArrayList<StepConfig>();
  
HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig()
    .withJar("command-runner.jar")
    .withArgs("flink", "run", "-m", "yarn-cluster", "-yid", "application_1473169569237_0002", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", 
      "--input", "s3://amzn-s3-demo-bucket/pg11.txt", "--output", "s3://amzn-s3-demo-bucket/alice2/");
  
StepConfig flinkRunWordCount = new StepConfig()
  .withName("Flink add a wordcount step")
  .withActionOnFailure("CONTINUE")
  .withHadoopJarStep(flinkWordCountConf);
  
stepConfigs.add(flinkRunWordCount); 
  
AddJobFlowStepsResult res = emr.addJobFlowSteps(new AddJobFlowStepsRequest()
   .withJobFlowId("myClusterId")
   .withSteps(stepConfigs));
```

**Example AWS CLI**  

```
aws emr add-steps --cluster-id <j-XXXXXXXX> \
--steps Type=CUSTOM_JAR,Name=Flink_Submit_To_Long_Running,Jar=command-runner.jar,\
Args="flink","run","-m","yarn-cluster","-yid","application_1473169569237_0002",\
"/usr/lib/flink/examples/streaming/WordCount.jar",\
"--input","s3://amzn-s3-demo-bucket/pg11.txt","--output","s3://amzn-s3-demo-bucket/alice2/" \
--region <region-code>
```

## Invio di un processo transitorio Flink
<a name="flink-transient-job"></a>

I seguenti esempi avviano un cluster transitorio che esegue un processo Flink e poi termina al completamento.

**Example SDK per Java**  

```
import java.util.ArrayList;
import java.util.List;
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.amazonaws.services.elasticmapreduce.model.*;

public class Main_test {

	public static void main(String[] args) {
		AWSCredentials credentials_profile = null;
		try {
			credentials_profile = new ProfileCredentialsProvider("default").getCredentials();
		} catch (Exception e) {
			throw new AmazonClientException(
					"Cannot load credentials from .aws/credentials file. " +
							"Make sure that the credentials file exists and the profile name is specified within it.",
					e);
		}

		AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard()
				.withCredentials(new AWSStaticCredentialsProvider(credentials_profile))
				.withRegion(Regions.US_WEST_1)
				.build();

		List<StepConfig> stepConfigs = new ArrayList<StepConfig>();
		HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig()
				.withJar("command-runner.jar")
				.withArgs("bash", "-c", "flink", "run", "-m", "yarn-cluster", "-yn", "2",
						"/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://path/to/input-file.txt", "--output",
						"s3://path/to/output/");

		StepConfig flinkRunWordCountStep = new StepConfig()
				.withName("Flink add a wordcount step and terminate")
				.withActionOnFailure("CONTINUE")
				.withHadoopJarStep(flinkWordCountConf);

		stepConfigs.add(flinkRunWordCountStep);

		Application flink = new Application().withName("Flink");

		RunJobFlowRequest request = new RunJobFlowRequest()
				.withName("flink-transient")
				.withReleaseLabel("emr-5.20.0")
				.withApplications(flink)
				.withServiceRole("EMR_DefaultRole")
				.withJobFlowRole("EMR_EC2_DefaultRole")
				.withLogUri("s3://path/to/my/logfiles")
				.withInstances(new JobFlowInstancesConfig()
						.withEc2KeyName("myEc2Key")
						.withEc2SubnetId("subnet-12ab3c45")
						.withInstanceCount(3)
						.withKeepJobFlowAliveWhenNoSteps(false)
						.withMasterInstanceType("m4.large")
						.withSlaveInstanceType("m4.large"))
				.withSteps(stepConfigs);

		RunJobFlowResult result = emr.runJobFlow(request);
		System.out.println("The cluster ID is " + result.toString());

	}

}
```

**Example AWS CLI**  
Utilizza il sottocomando `create-cluster` per creare un cluster transitorio che termina al termine del processo Flink:  

```
aws emr create-cluster --release-label emr-5.2.1 \
--name "Flink_Transient" \
--applications Name=Flink \
--configurations file://./configurations.json \
--region us-east-1 \
--log-uri s3://myLogUri \
--auto-terminate
--instance-type m5.xlarge \
--instance-count 2 \
--service-role EMR_DefaultRole_V2 \ 
--ec2-attributes KeyName=<YourKeyName>,InstanceProfile=EMR_EC2_DefaultRole \
--steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Long_Running_Session,\
Args="bash","-c","\"flink run -m yarn-cluster /usr/lib/flink/examples/streaming/WordCount.jar
--input s3://amzn-s3-demo-bucket/pg11.txt --output s3://amzn-s3-demo-bucket/alice/""
```