

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Bekerja dengan tugas Flink di Amazon EMR
<a name="flink-jobs"></a>

Ada beberapa cara untuk berinteraksi dengan Flink di Amazon EMR: melalui konsol, antarmuka Flink yang ditemukan di ResourceManager UI Pelacakan, dan di baris perintah. Anda dapat mengirimkan file JAR ke aplikasi Flink dengan salah satu dari ini. Setelah mengirimkan file JAR, itu menjadi pekerjaan yang dikelola oleh Flink JobManager. JobManager Ini terletak di node YARN yang menghosting daemon Master Aplikasi sesi Flink.

Anda dapat menjalankan aplikasi Flink sebagai tugas YARN pada klaster yang berjalan lama atau pada klaster sementara. Pada klaster yang berjalan lama, Anda dapat mengirimkan beberapa tugas Flink untuk satu klaster Flink yang berjalan di Amazon EMR. Jika Anda menjalankan tugas Flink di klaster sementara, klaster Amazon EMR Anda hanya ada untuk waktu yang diperlukan saat menjalankan aplikasi Flink, sehingga Anda hanya dikenakan biaya untuk sumber daya dan waktu yang digunakan. Anda dapat mengirimkan pekerjaan Flink dengan operasi Amazon `AddSteps` EMR API, sebagai argumen langkah untuk `RunJobFlow` operasi, dan melalui AWS CLI `add-steps` perintah atau. `create-cluster`

## Mulai aplikasi Flink YARN sebagai langkah pada klaster berjalan lama
<a name="flink-add-step"></a>

Untuk memulai aplikasi Flink yang beberapa klien dapat mengirimkan pekerjaan melalui operasi YARN API, Anda perlu membuat klaster atau menambahkan aplikasi Flink klaster yang ada. Untuk petunjuk tentang cara membuat klaster baru, lihat [Membuat klaster dengan Flink](flink-create-cluster.md). Untuk memulai sesi YARN pada klaster yang ada, gunakan langkah-langkah berikut dari konsol, AWS CLI, atau Java SDK.

**catatan**  
Parameter `flink-yarn-session` perintah ditambahkan di Amazon EMR versi 5.5.0 sebagai pembungkus untuk `yarn-session.sh` skrip dalam menyederhanakan eksekusi. Jika Anda menggunakan Amazon EMR versi sebelumnya, ganti `bash -c "/usr/lib/flink/bin/yarn-session.sh -d"` untuk **Pendapat** di konsol tersebut atau `Args`. di AWS CLI perintah.

**Untuk mengirimkan pekerjaan Flink pada cluster yang ada dari konsol**

Kirim sesi Flink dengan `flink-yarn-session` perintah di cluster yang ada.

1. [Buka konsol EMR Amazon di https://console.aws.amazon.com /emr.](https://console.aws.amazon.com/emr/)

1. Dalam daftar klaster, pilih klaster Anda yang sebelumnya diluncurkan.

1. Di halaman rincian klaster, pilih **Langkah**, **Tambahkan Langkah**.

1. Gunakan pedoman yang mengikuti untuk memasukkan parameter, lalu pilih **Tambah**.  
****    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/id_id/emr/latest/ReleaseGuide/flink-jobs.html)

**Untuk mengirimkan pekerjaan Flink pada cluster yang ada dengan AWS CLI**
+ Gunakan `add-steps` perintah untuk menambahkan pekerjaan Flink ke cluster yang berjalan lama. Contoh perintah berikut menentukan `Args="flink-yarn-session", "-d"` untuk memulai sesi Flink dalam cluster YARN Anda dalam status terpisah (). `-d` Lihat [Penyiapan YARN](https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/yarn_setup.html#flink-yarn-session) dalam dokumentasi Flink terbaru untuk rincian argumen.

  ```
  aws emr add-steps --cluster-id <j-XXXXXXXX> --steps Type=CUSTOM_JAR,Name=<example-flink-step-name>,Jar=command-runner.jar,Args="flink-yarn-session","-d"
  ```

## Kirim tugas untuk aplikasi Flink yang ada pada klaster berjalan lama
<a name="flink-submit-work"></a>

Jika Anda sudah memiliki aplikasi Flink yang ada pada klaster yang berjalan lama, Anda dapat menentukan ID aplikasi Flink klaster untuk mengirimkan pekerjaan ke sana. Untuk mendapatkan ID aplikasi, jalankan `yarn application -list` pada AWS CLI atau melalui operasi [YarnClient](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/yarn/client/api/YarnClient.html)API:

```
$ yarn application -list
16/09/07 19:32:13 INFO client.RMProxy: Connecting to ResourceManager at ip-10-181-83-19.ec2.internal/10.181.83.19:8032
Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1
Application-Id    Application-Name    Application-Type    User    Queue    State    Final-State    Progress    Tracking-URL
application_1473169569237_0002    Flink session with 14 TaskManagers (detached)	        Apache Flink	    hadoop	   default	           RUNNING	         UNDEFINED	           100%	http://ip-10-136-154-194.ec2.internal:33089
```

ID aplikasi untuk sesi Flink ini adalah`application_1473169569237_0002`, yang dapat Anda gunakan untuk mengirimkan pekerjaan ke aplikasi dari AWS CLI atau SDK.

**Example SDK for Java**  

```
List<StepConfig> stepConfigs = new ArrayList<StepConfig>();
  
HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig()
    .withJar("command-runner.jar")
    .withArgs("flink", "run", "-m", "yarn-cluster", "-yid", "application_1473169569237_0002", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", 
      "--input", "s3://amzn-s3-demo-bucket/pg11.txt", "--output", "s3://amzn-s3-demo-bucket/alice2/");
  
StepConfig flinkRunWordCount = new StepConfig()
  .withName("Flink add a wordcount step")
  .withActionOnFailure("CONTINUE")
  .withHadoopJarStep(flinkWordCountConf);
  
stepConfigs.add(flinkRunWordCount); 
  
AddJobFlowStepsResult res = emr.addJobFlowSteps(new AddJobFlowStepsRequest()
   .withJobFlowId("myClusterId")
   .withSteps(stepConfigs));
```

**Example AWS CLI**  

```
aws emr add-steps --cluster-id <j-XXXXXXXX> \
--steps Type=CUSTOM_JAR,Name=Flink_Submit_To_Long_Running,Jar=command-runner.jar,\
Args="flink","run","-m","yarn-cluster","-yid","application_1473169569237_0002",\
"/usr/lib/flink/examples/streaming/WordCount.jar",\
"--input","s3://amzn-s3-demo-bucket/pg11.txt","--output","s3://amzn-s3-demo-bucket/alice2/" \
--region <region-code>
```

## Kirim tugas Flink sementara
<a name="flink-transient-job"></a>

Contoh berikut meluncurkan klaster sementara yang menjalankan tugas Flink dan kemudian berakhir pada penyelesaian.

**Example SDK for Java**  

```
import java.util.ArrayList;
import java.util.List;
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.amazonaws.services.elasticmapreduce.model.*;

public class Main_test {

	public static void main(String[] args) {
		AWSCredentials credentials_profile = null;
		try {
			credentials_profile = new ProfileCredentialsProvider("default").getCredentials();
		} catch (Exception e) {
			throw new AmazonClientException(
					"Cannot load credentials from .aws/credentials file. " +
							"Make sure that the credentials file exists and the profile name is specified within it.",
					e);
		}

		AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard()
				.withCredentials(new AWSStaticCredentialsProvider(credentials_profile))
				.withRegion(Regions.US_WEST_1)
				.build();

		List<StepConfig> stepConfigs = new ArrayList<StepConfig>();
		HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig()
				.withJar("command-runner.jar")
				.withArgs("bash", "-c", "flink", "run", "-m", "yarn-cluster", "-yn", "2",
						"/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://path/to/input-file.txt", "--output",
						"s3://path/to/output/");

		StepConfig flinkRunWordCountStep = new StepConfig()
				.withName("Flink add a wordcount step and terminate")
				.withActionOnFailure("CONTINUE")
				.withHadoopJarStep(flinkWordCountConf);

		stepConfigs.add(flinkRunWordCountStep);

		Application flink = new Application().withName("Flink");

		RunJobFlowRequest request = new RunJobFlowRequest()
				.withName("flink-transient")
				.withReleaseLabel("emr-5.20.0")
				.withApplications(flink)
				.withServiceRole("EMR_DefaultRole")
				.withJobFlowRole("EMR_EC2_DefaultRole")
				.withLogUri("s3://path/to/my/logfiles")
				.withInstances(new JobFlowInstancesConfig()
						.withEc2KeyName("myEc2Key")
						.withEc2SubnetId("subnet-12ab3c45")
						.withInstanceCount(3)
						.withKeepJobFlowAliveWhenNoSteps(false)
						.withMasterInstanceType("m4.large")
						.withSlaveInstanceType("m4.large"))
				.withSteps(stepConfigs);

		RunJobFlowResult result = emr.runJobFlow(request);
		System.out.println("The cluster ID is " + result.toString());

	}

}
```

**Example AWS CLI**  
Gunakan `create-cluster` subperintah untuk membuat sebuah klaster sementara yang berakhir ketika tugas Flink selesai:  

```
aws emr create-cluster --release-label emr-5.2.1 \
--name "Flink_Transient" \
--applications Name=Flink \
--configurations file://./configurations.json \
--region us-east-1 \
--log-uri s3://myLogUri \
--auto-terminate
--instance-type m5.xlarge \
--instance-count 2 \
--service-role EMR_DefaultRole_V2 \ 
--ec2-attributes KeyName=<YourKeyName>,InstanceProfile=EMR_EC2_DefaultRole \
--steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Long_Running_Session,\
Args="bash","-c","\"flink run -m yarn-cluster /usr/lib/flink/examples/streaming/WordCount.jar
--input s3://amzn-s3-demo-bucket/pg11.txt --output s3://amzn-s3-demo-bucket/alice/""
```