

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Amazon EMR 内で Flink ジョブを操作する
<a name="flink-jobs"></a>

Amazon EMR 上でコンソールを通じて Flink とやりとりするにはいくつかの方法があります。ResourceManager の UI 追跡にある Flink インターフェイスと、コマンドライン上です。このいずれの場合にも、JAR ファイルを Flink アプリケーションに送信できます。送信した JAR ファイルは Flink JobManager によって管理されるジョブになります。JobManager は、Flink セッションのアプリケーションマスターデーモンをホストする YARN ノードで動作しています。

長期実行のクラスターまたは一時的なクラスターで、YARN ジョブとして Flink アプリケーションを実行できます。長期実行のジョブでは、複数の Flink ジョブを Amazon EMR で実行する 1 つの Flink クラスターに送信できます。一時的なクラスターで Flink ジョブを稼働する場合、Amazon EMR クラスターは Flink アプリケーションを実行するために必要な時間のためだけに存在します。そのため、使用したリソースと費やした時間に対してのみ課金されます。Amazon EMR `AddSteps` API オペレーション、オペレーションへのステップ引数、 または `create-cluster` コマンドを使用して `RunJobFlow` AWS CLI `add-steps` Flink ジョブを送信できます。

## 長期実行のクラスターのステップとして、Flink YARN アプリケーションを起動します
<a name="flink-add-step"></a>

複数のクライアントが YARN API オペレーションを介して作業を送信できる Flink アプリケーションを起動するには、クラスターを作成するか、既存のクラスターに Flink アプリケーションを追加する必要があります。新しいクラスターを作成する手順については、「[Flink を使用してクラスターを作成する](flink-create-cluster.md)」を参照してください。既存のクラスターで YARN セッションを開始するには、コンソール、 AWS CLI、Java SDK から次のステップに従います。

**注記**  
Amazon EMR バージョン 5.5.0 では、実行を簡素化するための `yarn-session.sh` スクリプトのラッパーとして、`flink-yarn-session` コマンドが追加されました。以前のバージョンの Amazon EMR を使用している場合は、コンソールの**引数**、または AWS CLI コマンドの `Args` を `bash -c "/usr/lib/flink/bin/yarn-session.sh -d"` で置き換えます。

**コンソールを使用して Flink ジョブを既存のクラスターに送信するには**

`flink-yarn-session` コマンドを使用して、Flink セッションを既存のクラスターに送信します。

1. [https://console.aws.amazon.com/emr](https://console.aws.amazon.com/emr/) で Amazon EMR コンソールを開きます。

1. クラスターリストで、以前に起動したクラスターを選択します。

1. クラスターの詳細ページで、[**Steps (ステップ)**]、[**Add Step (ステップの追加)**] の順に選択します。

1. 次のガイドラインに従ってパラメータを入力し、**[追加]** を選択します。  
****    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/emr/latest/ReleaseGuide/flink-jobs.html)

**を使用して既存のクラスターで Flink ジョブを送信するには AWS CLI**
+ Flink ジョブを長期実行クラスターに追加するには、`add-steps` コマンドを使用します。次のコマンド例では、YARN クラスター内において Flink セッションが切り離された状態 (`-d`) で開始されるよう、`Args="flink-yarn-session", "-d"` を指定しています。引数の詳細については、最新の Flink ドキュメントの「[YARN Setup (YARN の設定)](https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/yarn_setup.html#flink-yarn-session)」を参照してください。

  ```
  aws emr add-steps --cluster-id <j-XXXXXXXX> --steps Type=CUSTOM_JAR,Name=<example-flink-step-name>,Jar=command-runner.jar,Args="flink-yarn-session","-d"
  ```

## 長期実行クラスター上の既存の Flink アプリケーションに作業を送信する
<a name="flink-submit-work"></a>

長期実行クラスターに既存の Flink アプリケーションがある場合は、作業を送信するために、クラスターの Flink アプリケーション ID を指定できます。アプリケーション ID を取得するには、 AWS CLI `yarn application -list`または [YarnClient](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/yarn/client/api/YarnClient.html) API オペレーションを使用して を実行します。

```
$ yarn application -list
16/09/07 19:32:13 INFO client.RMProxy: Connecting to ResourceManager at ip-10-181-83-19.ec2.internal/10.181.83.19:8032
Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1
Application-Id    Application-Name    Application-Type    User    Queue    State    Final-State    Progress    Tracking-URL
application_1473169569237_0002    Flink session with 14 TaskManagers (detached)	        Apache Flink	    hadoop	   default	           RUNNING	         UNDEFINED	           100%	http://ip-10-136-154-194.ec2.internal:33089
```

この Flink セッションのアプリケーション ID は です。これを使用して`application_1473169569237_0002`、 AWS CLI または SDK からアプリケーションに作業を送信できます。

**Example SDK for Java**  

```
List<StepConfig> stepConfigs = new ArrayList<StepConfig>();
  
HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig()
    .withJar("command-runner.jar")
    .withArgs("flink", "run", "-m", "yarn-cluster", "-yid", "application_1473169569237_0002", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", 
      "--input", "s3://amzn-s3-demo-bucket/pg11.txt", "--output", "s3://amzn-s3-demo-bucket/alice2/");
  
StepConfig flinkRunWordCount = new StepConfig()
  .withName("Flink add a wordcount step")
  .withActionOnFailure("CONTINUE")
  .withHadoopJarStep(flinkWordCountConf);
  
stepConfigs.add(flinkRunWordCount); 
  
AddJobFlowStepsResult res = emr.addJobFlowSteps(new AddJobFlowStepsRequest()
   .withJobFlowId("myClusterId")
   .withSteps(stepConfigs));
```

**Example AWS CLI**  

```
aws emr add-steps --cluster-id <j-XXXXXXXX> \
--steps Type=CUSTOM_JAR,Name=Flink_Submit_To_Long_Running,Jar=command-runner.jar,\
Args="flink","run","-m","yarn-cluster","-yid","application_1473169569237_0002",\
"/usr/lib/flink/examples/streaming/WordCount.jar",\
"--input","s3://amzn-s3-demo-bucket/pg11.txt","--output","s3://amzn-s3-demo-bucket/alice2/" \
--region <region-code>
```

## 一時的な Flink ジョブを送信する
<a name="flink-transient-job"></a>

次の例では Flink ジョブを実行する一時的なクラスターを起動し、完了時に終了します。

**Example SDK for Java**  

```
import java.util.ArrayList;
import java.util.List;
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.amazonaws.services.elasticmapreduce.model.*;

public class Main_test {

	public static void main(String[] args) {
		AWSCredentials credentials_profile = null;
		try {
			credentials_profile = new ProfileCredentialsProvider("default").getCredentials();
		} catch (Exception e) {
			throw new AmazonClientException(
					"Cannot load credentials from .aws/credentials file. " +
							"Make sure that the credentials file exists and the profile name is specified within it.",
					e);
		}

		AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard()
				.withCredentials(new AWSStaticCredentialsProvider(credentials_profile))
				.withRegion(Regions.US_WEST_1)
				.build();

		List<StepConfig> stepConfigs = new ArrayList<StepConfig>();
		HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig()
				.withJar("command-runner.jar")
				.withArgs("bash", "-c", "flink", "run", "-m", "yarn-cluster", "-yn", "2",
						"/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://path/to/input-file.txt", "--output",
						"s3://path/to/output/");

		StepConfig flinkRunWordCountStep = new StepConfig()
				.withName("Flink add a wordcount step and terminate")
				.withActionOnFailure("CONTINUE")
				.withHadoopJarStep(flinkWordCountConf);

		stepConfigs.add(flinkRunWordCountStep);

		Application flink = new Application().withName("Flink");

		RunJobFlowRequest request = new RunJobFlowRequest()
				.withName("flink-transient")
				.withReleaseLabel("emr-5.20.0")
				.withApplications(flink)
				.withServiceRole("EMR_DefaultRole")
				.withJobFlowRole("EMR_EC2_DefaultRole")
				.withLogUri("s3://path/to/my/logfiles")
				.withInstances(new JobFlowInstancesConfig()
						.withEc2KeyName("myEc2Key")
						.withEc2SubnetId("subnet-12ab3c45")
						.withInstanceCount(3)
						.withKeepJobFlowAliveWhenNoSteps(false)
						.withMasterInstanceType("m4.large")
						.withSlaveInstanceType("m4.large"))
				.withSteps(stepConfigs);

		RunJobFlowResult result = emr.runJobFlow(request);
		System.out.println("The cluster ID is " + result.toString());

	}

}
```

**Example AWS CLI**  
Flink ジョブの完了時に終了する一時的なクラスターを作成するには、`create-cluster` サブコマンドを使用します。  

```
aws emr create-cluster --release-label emr-5.2.1 \
--name "Flink_Transient" \
--applications Name=Flink \
--configurations file://./configurations.json \
--region us-east-1 \
--log-uri s3://myLogUri \
--auto-terminate
--instance-type m5.xlarge \
--instance-count 2 \
--service-role EMR_DefaultRole_V2 \ 
--ec2-attributes KeyName=<YourKeyName>,InstanceProfile=EMR_EC2_DefaultRole \
--steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Long_Running_Session,\
Args="bash","-c","\"flink run -m yarn-cluster /usr/lib/flink/examples/streaming/WordCount.jar
--input s3://amzn-s3-demo-bucket/pg11.txt --output s3://amzn-s3-demo-bucket/alice/""
```