

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

# Usar trabalhos do Flink no Amazon EMR
<a name="flink-jobs"></a>

Há várias maneiras de interagir com o Flink no Amazon EMR: por meio do console, da interface do Flink encontrada na interface de usuário de rastreamento e ResourceManager na linha de comando. Você pode enviar um arquivo JAR para uma aplicação Flink com qualquer uma destas opções. Depois de enviar um arquivo JAR, ele se torna um trabalho gerenciado pelo Flink JobManager. O JobManager está localizado no nó YARN que hospeda o daemon do Application Master da sessão Flink.

Você pode executar uma aplicação do Flink como um trabalho do YARN em um cluster de execução prolongada ou em um cluster transitório. Em um trabalho de execução prolongada, você pode enviar vários trabalhos do Flink para um cluster do Flink em execução no Amazon EMR. Se você executar um trabalho do Flink em um cluster transitório, seu cluster do Amazon EMR existirá somente pelo tempo necessário para executar a aplicação Flink; portanto, você pagará somente pelos recursos e pelo tempo usados. Você pode enviar um trabalho do Flink com a operação da API do Amazon `AddSteps` EMR, como um argumento de etapa para `RunJobFlow` a operação e por meio AWS CLI `add-steps` dos `create-cluster` comandos ou.

## Iniciar uma aplicação YARN do Flink como uma etapa em um cluster de execução prolongada
<a name="flink-add-step"></a>

Para iniciar uma aplicação Flink para a qual vários clientes possam enviar trabalhos por meio de operações de API do YARN, é necessário criar um cluster ou adicionar uma aplicação Flink a um cluster já existente. Para obter instruções sobre como criar um novo cluster, consulte [Criar um cluster com o Flink](flink-create-cluster.md). Para iniciar uma sessão do YARN em um cluster atual, use as etapas a seguir no console, na AWS CLI ou no SDK do Java.

**nota**  
O comando `flink-yarn-session` foi incluído no Amazon EMR versão 5.5.0 como um wrapper para o script `yarn-session.sh` para simplificar a execução. Se você usa uma versão anterior do Amazon EMR, substitua `bash -c "/usr/lib/flink/bin/yarn-session.sh -d"` por **Arguments** no console ou`Args` no comando AWS CLI .

**Enviar um trabalho do Flink que está em um cluster existente no console**

Envie a sessão do Flink usando o comando `flink-yarn-session` em um cluster já existente.

1. [Abra o console do Amazon EMR em https://console.aws.amazon.com /emr.](https://console.aws.amazon.com/emr/)

1. Na lista de clusters, selecione o cluster que você iniciou anteriormente.

1. Na página de detalhes do cluster, selecione **Steps (Etapas)**, **Add Step (Adicionar etapa)**.

1. Use as diretrizes a seguir para inserir os parâmetros e escolha **Adicionar**.  
****    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/pt_br/emr/latest/ReleaseGuide/flink-jobs.html)

**Para enviar uma tarefa do Flink em um cluster existente com o AWS CLI**
+ Use o comando `add-steps` para adicionar um trabalho do Flink a um cluster de execução prolongada. O comando de exemplo a seguir especifica `Args="flink-yarn-session", "-d"` para iniciar uma sessão do Flink em seu cluster do YARN em um estado desconectado (`-d`). Consulte a [configuração do YARN](https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/yarn_setup.html#flink-yarn-session) na documentação do Flink mais recente para obter mais detalhes sobre argumentos.

  ```
  aws emr add-steps --cluster-id <j-XXXXXXXX> --steps Type=CUSTOM_JAR,Name=<example-flink-step-name>,Jar=command-runner.jar,Args="flink-yarn-session","-d"
  ```

## Enviar o trabalho para uma aplicação Flink existente em um cluster de execução prolongada
<a name="flink-submit-work"></a>

Caso já tenha uma aplicação Flink em um cluster de execução prolongada, você poderá especificar o ID da aplicação Flink do cluster para enviar trabalhos para ele. Para obter o ID do aplicativo, execute `yarn application -list` na operação da [YarnClient](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/yarn/client/api/YarnClient.html)API AWS CLI ou por meio dela:

```
$ yarn application -list
16/09/07 19:32:13 INFO client.RMProxy: Connecting to ResourceManager at ip-10-181-83-19.ec2.internal/10.181.83.19:8032
Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1
Application-Id    Application-Name    Application-Type    User    Queue    State    Final-State    Progress    Tracking-URL
application_1473169569237_0002    Flink session with 14 TaskManagers (detached)	        Apache Flink	    hadoop	   default	           RUNNING	         UNDEFINED	           100%	http://ip-10-136-154-194.ec2.internal:33089
```

O ID do aplicativo para esta sessão do Flink é`application_1473169569237_0002`, que você pode usar para enviar trabalhos para o aplicativo a partir do AWS CLI ou de um SDK.

**Example SDK para Java**  

```
List<StepConfig> stepConfigs = new ArrayList<StepConfig>();
  
HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig()
    .withJar("command-runner.jar")
    .withArgs("flink", "run", "-m", "yarn-cluster", "-yid", "application_1473169569237_0002", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", 
      "--input", "s3://amzn-s3-demo-bucket/pg11.txt", "--output", "s3://amzn-s3-demo-bucket/alice2/");
  
StepConfig flinkRunWordCount = new StepConfig()
  .withName("Flink add a wordcount step")
  .withActionOnFailure("CONTINUE")
  .withHadoopJarStep(flinkWordCountConf);
  
stepConfigs.add(flinkRunWordCount); 
  
AddJobFlowStepsResult res = emr.addJobFlowSteps(new AddJobFlowStepsRequest()
   .withJobFlowId("myClusterId")
   .withSteps(stepConfigs));
```

**Example AWS CLI**  

```
aws emr add-steps --cluster-id <j-XXXXXXXX> \
--steps Type=CUSTOM_JAR,Name=Flink_Submit_To_Long_Running,Jar=command-runner.jar,\
Args="flink","run","-m","yarn-cluster","-yid","application_1473169569237_0002",\
"/usr/lib/flink/examples/streaming/WordCount.jar",\
"--input","s3://amzn-s3-demo-bucket/pg11.txt","--output","s3://amzn-s3-demo-bucket/alice2/" \
--region <region-code>
```

## Enviar um trabalho transitório do Flink
<a name="flink-transient-job"></a>

Os exemplos a seguir iniciam um cluster transitório que executa um trabalho do Flink e é terminado na conclusão.

**Example SDK para Java**  

```
import java.util.ArrayList;
import java.util.List;
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.amazonaws.services.elasticmapreduce.model.*;

public class Main_test {

	public static void main(String[] args) {
		AWSCredentials credentials_profile = null;
		try {
			credentials_profile = new ProfileCredentialsProvider("default").getCredentials();
		} catch (Exception e) {
			throw new AmazonClientException(
					"Cannot load credentials from .aws/credentials file. " +
							"Make sure that the credentials file exists and the profile name is specified within it.",
					e);
		}

		AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard()
				.withCredentials(new AWSStaticCredentialsProvider(credentials_profile))
				.withRegion(Regions.US_WEST_1)
				.build();

		List<StepConfig> stepConfigs = new ArrayList<StepConfig>();
		HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig()
				.withJar("command-runner.jar")
				.withArgs("bash", "-c", "flink", "run", "-m", "yarn-cluster", "-yn", "2",
						"/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://path/to/input-file.txt", "--output",
						"s3://path/to/output/");

		StepConfig flinkRunWordCountStep = new StepConfig()
				.withName("Flink add a wordcount step and terminate")
				.withActionOnFailure("CONTINUE")
				.withHadoopJarStep(flinkWordCountConf);

		stepConfigs.add(flinkRunWordCountStep);

		Application flink = new Application().withName("Flink");

		RunJobFlowRequest request = new RunJobFlowRequest()
				.withName("flink-transient")
				.withReleaseLabel("emr-5.20.0")
				.withApplications(flink)
				.withServiceRole("EMR_DefaultRole")
				.withJobFlowRole("EMR_EC2_DefaultRole")
				.withLogUri("s3://path/to/my/logfiles")
				.withInstances(new JobFlowInstancesConfig()
						.withEc2KeyName("myEc2Key")
						.withEc2SubnetId("subnet-12ab3c45")
						.withInstanceCount(3)
						.withKeepJobFlowAliveWhenNoSteps(false)
						.withMasterInstanceType("m4.large")
						.withSlaveInstanceType("m4.large"))
				.withSteps(stepConfigs);

		RunJobFlowResult result = emr.runJobFlow(request);
		System.out.println("The cluster ID is " + result.toString());

	}

}
```

**Example AWS CLI**  
Use o subcomando `create-cluster` para criar um cluster transitório que termina quando o trabalho do Flink é concluído:  

```
aws emr create-cluster --release-label emr-5.2.1 \
--name "Flink_Transient" \
--applications Name=Flink \
--configurations file://./configurations.json \
--region us-east-1 \
--log-uri s3://myLogUri \
--auto-terminate
--instance-type m5.xlarge \
--instance-count 2 \
--service-role EMR_DefaultRole_V2 \ 
--ec2-attributes KeyName=<YourKeyName>,InstanceProfile=EMR_EC2_DefaultRole \
--steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Long_Running_Session,\
Args="bash","-c","\"flink run -m yarn-cluster /usr/lib/flink/examples/streaming/WordCount.jar
--input s3://amzn-s3-demo-bucket/pg11.txt --output s3://amzn-s3-demo-bucket/alice/""
```