

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Arbeiten mit Flink-Jobs in Amazon EMR
<a name="flink-jobs"></a>

Es gibt mehrere Möglichkeiten, mit Flink in Amazon EMR zu interagieren: über die Konsole, die Flink-Oberfläche auf der ResourceManager Tracking-Benutzeroberfläche und über die Befehlszeile. Mit jedem dieser Programme können Sie eine JAR-Datei an eine Flink-Anwendung senden. Sobald Sie eine JAR-Datei eingereicht haben, wird sie zu einem Job, der vom Flink verwaltet wird. JobManager Der JobManager befindet sich auf dem YARN-Knoten, der den Application Master-Daemon für die Flink-Sitzung hostet.

Sie können eine Flink-Anwendung als YARN-Auftrag auf einem Cluster mit langer Laufzeit oder auf einem vorübergehenden Cluster ausführen. In einem Cluster mit langer Laufzeit können Sie mehrere Flink-Aufträge an einen Flink-Cluster senden, der auf Amazon EMR ausgeführt wird. Wenn Sie einen Flink-Auftrag auf einem vorübergehenden Cluster ausführen, existiert Ihr Amazon-EMR-Cluster nur für die Zeit, die zum Ausführen der Flink-Anwendung benötigt wird, sodass Ihnen nur die verbrauchten Ressourcen und die Zeit in Rechnung gestellt werden. Sie können einen Flink-Job mit der Amazon `AddSteps` EMR-API-Operation als Schrittargument für die `RunJobFlow` Operation und über die Befehle AWS CLI `add-steps` oder `create-cluster` einreichen.

## Eine Flink-YARN-Anwendung als Schritt auf einem Cluster mit langer Laufzeit starten
<a name="flink-add-step"></a>

Um eine Flink-Anwendung zu starten, an die mehrere Clients über YARN-API-Operationen Arbeit einreichen können, müssen Sie entweder einen Cluster erstellen oder eine Flink-Anwendung zu einem vorhandenen Cluster hinzufügen. Eine Anleitung zur Erstellung eines neuen Clusters finden Sie unter [Erstellen eines Clusters mit Flink](flink-create-cluster.md). Um eine YARN-Sitzung auf einem vorhandenen Cluster zu starten, führen Sie die folgenden Schritte über die Konsole, AWS CLI oder das Java-SDK aus.

**Anmerkung**  
Der Befehl `flink-yarn-session` wurde in Amazon-EMR-Version 5.5.0 als Wrapper für das Skript `yarn-session.sh` zur Vereinfachung der Ausführung hinzugefügt. Wenn Sie eine frühere Version von Amazon EMR verwenden, ersetzen Sie `bash -c "/usr/lib/flink/bin/yarn-session.sh -d"` mit **Argumente** in der Konsole oder `Args` im AWS CLI -Befehl.

**So senden Sie einen Flink-Auftrag auf einem vorhandenen Cluster von der Konsole aus**

Senden Sie die Flink-Sitzung mit dem Befehl `flink-yarn-session` in einem vorhandenen Cluster.

1. Öffnen Sie die Amazon EMR-Konsole unter [https://console.aws.amazon.com/emr](https://console.aws.amazon.com/emr/).

1. Wählen Sie in der Cluster-Liste den Cluster aus, den Sie zuvor gestartet haben.

1. Wählen Sie auf der Cluster-Detailseite **Steps (Schritte)** und **Add Step (Schritt hinzufügen)** aus.

1. Befolgen Sie die folgenden Richtlinien, um die Parameter einzugeben, und wählen Sie dann **Hinzufügen** aus.  
****    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/de_de/emr/latest/ReleaseGuide/flink-jobs.html)

**Um einen Flink-Job auf einem vorhandenen Cluster einzureichen mit AWS CLI**
+ Verwenden Sie den `add-steps`-Befehl, um einem Cluster mit langer Laufzeit einen Flink-Auftrag hinzuzufügen. Der folgende Beispielbefehl gibt `Args="flink-yarn-session", "-d"` an, sodass eine Flink-Sitzung innerhalb Ihres YARN-Clusters in einem getrennten Zustand (`-d`) gestartet werden soll. Weitere Informationen finden Sie unter [YARN-Einrichtung](https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/yarn_setup.html#flink-yarn-session) in der aktuellen Flink-Dokumentation für Argumentdetails.

  ```
  aws emr add-steps --cluster-id <j-XXXXXXXX> --steps Type=CUSTOM_JAR,Name=<example-flink-step-name>,Jar=command-runner.jar,Args="flink-yarn-session","-d"
  ```

## Senden Sie Ihre Arbeit an eine bestehende Flink-Anwendung auf einem Cluster mit langer Laufzeit
<a name="flink-submit-work"></a>

Wenn Sie bereits eine bestehende Flink-Anwendung auf einem Cluster mit langer Laufzeit haben, können Sie die Flink-Anwendungs-ID des Clusters angeben, um Arbeit an diesen zu senden. Um die Anwendungs-ID zu erhalten, führen Sie `yarn application -list` den Vorgang AWS CLI oder über den [YarnClient](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/yarn/client/api/YarnClient.html)API-Vorgang aus:

```
$ yarn application -list
16/09/07 19:32:13 INFO client.RMProxy: Connecting to ResourceManager at ip-10-181-83-19.ec2.internal/10.181.83.19:8032
Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1
Application-Id    Application-Name    Application-Type    User    Queue    State    Final-State    Progress    Tracking-URL
application_1473169569237_0002    Flink session with 14 TaskManagers (detached)	        Apache Flink	    hadoop	   default	           RUNNING	         UNDEFINED	           100%	http://ip-10-136-154-194.ec2.internal:33089
```

Die Anwendungs-ID für diese Flink-Sitzung lautet`application_1473169569237_0002`, mit der Sie Arbeiten aus dem AWS CLI oder einem SDK an die Anwendung einreichen können.

**Example SDK für Java**  

```
List<StepConfig> stepConfigs = new ArrayList<StepConfig>();
  
HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig()
    .withJar("command-runner.jar")
    .withArgs("flink", "run", "-m", "yarn-cluster", "-yid", "application_1473169569237_0002", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", 
      "--input", "s3://amzn-s3-demo-bucket/pg11.txt", "--output", "s3://amzn-s3-demo-bucket/alice2/");
  
StepConfig flinkRunWordCount = new StepConfig()
  .withName("Flink add a wordcount step")
  .withActionOnFailure("CONTINUE")
  .withHadoopJarStep(flinkWordCountConf);
  
stepConfigs.add(flinkRunWordCount); 
  
AddJobFlowStepsResult res = emr.addJobFlowSteps(new AddJobFlowStepsRequest()
   .withJobFlowId("myClusterId")
   .withSteps(stepConfigs));
```

**Example AWS CLI**  

```
aws emr add-steps --cluster-id <j-XXXXXXXX> \
--steps Type=CUSTOM_JAR,Name=Flink_Submit_To_Long_Running,Jar=command-runner.jar,\
Args="flink","run","-m","yarn-cluster","-yid","application_1473169569237_0002",\
"/usr/lib/flink/examples/streaming/WordCount.jar",\
"--input","s3://amzn-s3-demo-bucket/pg11.txt","--output","s3://amzn-s3-demo-bucket/alice2/" \
--region <region-code>
```

## Senden eines kurzlebigen Flink-Auftrags
<a name="flink-transient-job"></a>

Die folgenden Beispiele starten einen vorübergehenden Cluster, der einen Flink-Auftrag ausführt und dann nach Abschluss beendet wird.

**Example SDK für Java**  

```
import java.util.ArrayList;
import java.util.List;
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.amazonaws.services.elasticmapreduce.model.*;

public class Main_test {

	public static void main(String[] args) {
		AWSCredentials credentials_profile = null;
		try {
			credentials_profile = new ProfileCredentialsProvider("default").getCredentials();
		} catch (Exception e) {
			throw new AmazonClientException(
					"Cannot load credentials from .aws/credentials file. " +
							"Make sure that the credentials file exists and the profile name is specified within it.",
					e);
		}

		AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard()
				.withCredentials(new AWSStaticCredentialsProvider(credentials_profile))
				.withRegion(Regions.US_WEST_1)
				.build();

		List<StepConfig> stepConfigs = new ArrayList<StepConfig>();
		HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig()
				.withJar("command-runner.jar")
				.withArgs("bash", "-c", "flink", "run", "-m", "yarn-cluster", "-yn", "2",
						"/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://path/to/input-file.txt", "--output",
						"s3://path/to/output/");

		StepConfig flinkRunWordCountStep = new StepConfig()
				.withName("Flink add a wordcount step and terminate")
				.withActionOnFailure("CONTINUE")
				.withHadoopJarStep(flinkWordCountConf);

		stepConfigs.add(flinkRunWordCountStep);

		Application flink = new Application().withName("Flink");

		RunJobFlowRequest request = new RunJobFlowRequest()
				.withName("flink-transient")
				.withReleaseLabel("emr-5.20.0")
				.withApplications(flink)
				.withServiceRole("EMR_DefaultRole")
				.withJobFlowRole("EMR_EC2_DefaultRole")
				.withLogUri("s3://path/to/my/logfiles")
				.withInstances(new JobFlowInstancesConfig()
						.withEc2KeyName("myEc2Key")
						.withEc2SubnetId("subnet-12ab3c45")
						.withInstanceCount(3)
						.withKeepJobFlowAliveWhenNoSteps(false)
						.withMasterInstanceType("m4.large")
						.withSlaveInstanceType("m4.large"))
				.withSteps(stepConfigs);

		RunJobFlowResult result = emr.runJobFlow(request);
		System.out.println("The cluster ID is " + result.toString());

	}

}
```

**Example AWS CLI**  
Verwenden Sie den Unterbefehl `create-cluster`, um einen kurzlebigen Cluster zu erstellen, der beendet wird, wenn der Flink-Auftrag abgeschlossen ist:  

```
aws emr create-cluster --release-label emr-5.2.1 \
--name "Flink_Transient" \
--applications Name=Flink \
--configurations file://./configurations.json \
--region us-east-1 \
--log-uri s3://myLogUri \
--auto-terminate
--instance-type m5.xlarge \
--instance-count 2 \
--service-role EMR_DefaultRole_V2 \ 
--ec2-attributes KeyName=<YourKeyName>,InstanceProfile=EMR_EC2_DefaultRole \
--steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Long_Running_Session,\
Args="bash","-c","\"flink run -m yarn-cluster /usr/lib/flink/examples/streaming/WordCount.jar
--input s3://amzn-s3-demo-bucket/pg11.txt --output s3://amzn-s3-demo-bucket/alice/""
```