

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# Amazon EMR에서 Flink 작업 사용
<a name="flink-jobs"></a>

콘솔을 통해 Amazon EMR 기반 Flink와 상호 작용하는 몇 가지 방법이 있으며, Flink 인터페이스는 ResourceManager Tracking UI와 명령줄에 있습니다. 이들 중 하나를 사용하여 JAR 파일을 Flink 애플리케이션에 제출할 수 있습니다. JAR 파일을 제출하면 Flink JobManager에서 관리하는 작업이 됩니다. JobManager는 Flink 세션 애플리케이션 마스터 대몬(daemon)을 호스팅하는 YARN 노드에 있습니다.

또한 Flink 애플리케이션을 장기 실행 YARN 작업이나 임시 클러스터로 실행할 수 있습니다. 장기 실행 클러스터에서는 Amazon EMR 클러스터에서 실행 중인 Flink 클러스터 하나로 여러 Flink 애플리케이션을 제출할 수 있습니다. Flink 작업을 임시 클러스터에서 실행할 경우 Amazon EMR 클러스터는 Flink 애플리케이션을 실행하는 데 걸리는 시간 동안만 존재하므로 사용한 리소스와 시간에 대해서만 요금이 청구됩니다. Amazon EMR `AddSteps` API 작업, 작업에 대한 단계 인수, 또는 `create-cluster` 명령을 `RunJobFlow` 통해 Flink 작업을 제출할 수 있습니다 AWS CLI `add-steps`.

## 장기 실행 클러스터에서 한 단계로 Flink YARN 애플리케이션 시작
<a name="flink-add-step"></a>

여러 클라이언트가 YARN API 작업을 통해 작업을 제출할 수 있는 Flink 애플리케이션을 시작하려면 클러스터를 생성하거나 Flink 애플리케이션을 기존 클러스터에 추가해야 합니다. 새 클러스터를 생성하는 방법에 대한 지침은 [Flink를 포함하는 클러스터 생성](flink-create-cluster.md) 부분을 참조하세요. 기존 클러스터에서 YARN 세션을 시작하려면 콘솔, AWS CLI또는 Java SDK에서 다음 단계를 수행합니다.

**참고**  
`flink-yarn-session` 명령은 실행을 간소화하기 위한 `yarn-session.sh` 스크립트 래퍼로 Amazon EMR 버전 5.5.0에 추가되었습니다. 이전 버전의 Amazon EMR을 사용하는 경우 콘솔에서 **인수**에 대해 `bash -c "/usr/lib/flink/bin/yarn-session.sh -d"`를 대체하거나 AWS CLI 명령에서 `Args`를 대체합니다.

**콘솔에서 기존 클러스터의 Flink 작업을 제출하는 방법**

기존 클러스터에서 `flink-yarn-session` 명령을 사용하여 Flink 세션을 제출합니다.

1. [https://console.aws.amazon.com/emr](https://console.aws.amazon.com/emr/) Amazon EMR 콘솔을 엽니다.

1. 클러스터 목록에서 이전에 시작한 클러스터를 선택합니다.

1. 클러스터 세부 정보 페이지에서 **단계**, **Add Step(단계 추가)**을 선택합니다.

1. 아래 지침에 따라 파라미터를 입력하고 **추가**를 선택합니다.  
****    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ko_kr/emr/latest/ReleaseGuide/flink-jobs.html)

**를 사용하여 기존 클러스터에서 Flink 작업을 제출하려면 AWS CLI**
+ `add-steps` 명령을 사용하여 장기 실행 클러스터에 Flink 작업을 추가합니다. 다음 예제 명령은 YARN 클러스터 내에서 분리된 상태(`-d`)로 Flink 세션을 시작하도록 `Args="flink-yarn-session", "-d"`를 지정합니다. 인수에 대한 자세한 내용은 최신 Flink 설명서에서 [YARN Setup](https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/yarn_setup.html#flink-yarn-session)을 참조하세요.

  ```
  aws emr add-steps --cluster-id <j-XXXXXXXX> --steps Type=CUSTOM_JAR,Name=<example-flink-step-name>,Jar=command-runner.jar,Args="flink-yarn-session","-d"
  ```

## 장기 실행 클러스터에서 기존 Flink 애플리케이션에 작업 제출
<a name="flink-submit-work"></a>

장기 실행 클러스터에 기존 Flink 애플리케이션이 이미 있는 경우 클러스터의 Flink 애플리케이션 ID를 지정하여 클러스터에 작업을 제출할 수 있습니다. 애플리케이션 ID를 얻으려면 `yarn application -list`에서 AWS CLI 또는 [YarnClient](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/yarn/client/api/YarnClient.html) API 작업을 통해를 실행합니다.

```
$ yarn application -list
16/09/07 19:32:13 INFO client.RMProxy: Connecting to ResourceManager at ip-10-181-83-19.ec2.internal/10.181.83.19:8032
Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1
Application-Id    Application-Name    Application-Type    User    Queue    State    Final-State    Progress    Tracking-URL
application_1473169569237_0002    Flink session with 14 TaskManagers (detached)	        Apache Flink	    hadoop	   default	           RUNNING	         UNDEFINED	           100%	http://ip-10-136-154-194.ec2.internal:33089
```

이 Flink 세션의 애플리케이션 ID는 AWS CLI 또는 SDK에서 애플리케이션에 작업을 제출하는 데 사용할 수 `application_1473169569237_0002`있는 입니다.

**Example SDK for Java**  

```
List<StepConfig> stepConfigs = new ArrayList<StepConfig>();
  
HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig()
    .withJar("command-runner.jar")
    .withArgs("flink", "run", "-m", "yarn-cluster", "-yid", "application_1473169569237_0002", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", 
      "--input", "s3://amzn-s3-demo-bucket/pg11.txt", "--output", "s3://amzn-s3-demo-bucket/alice2/");
  
StepConfig flinkRunWordCount = new StepConfig()
  .withName("Flink add a wordcount step")
  .withActionOnFailure("CONTINUE")
  .withHadoopJarStep(flinkWordCountConf);
  
stepConfigs.add(flinkRunWordCount); 
  
AddJobFlowStepsResult res = emr.addJobFlowSteps(new AddJobFlowStepsRequest()
   .withJobFlowId("myClusterId")
   .withSteps(stepConfigs));
```

**Example AWS CLI**  

```
aws emr add-steps --cluster-id <j-XXXXXXXX> \
--steps Type=CUSTOM_JAR,Name=Flink_Submit_To_Long_Running,Jar=command-runner.jar,\
Args="flink","run","-m","yarn-cluster","-yid","application_1473169569237_0002",\
"/usr/lib/flink/examples/streaming/WordCount.jar",\
"--input","s3://amzn-s3-demo-bucket/pg11.txt","--output","s3://amzn-s3-demo-bucket/alice2/" \
--region <region-code>
```

## 임시 Flink 작업 제출
<a name="flink-transient-job"></a>

다음 예제에서는 Flink 작업을 실행한 다음 완료 시 종료하는 임시 클러스터를 시작합니다.

**Example SDK for Java**  

```
import java.util.ArrayList;
import java.util.List;
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.amazonaws.services.elasticmapreduce.model.*;

public class Main_test {

	public static void main(String[] args) {
		AWSCredentials credentials_profile = null;
		try {
			credentials_profile = new ProfileCredentialsProvider("default").getCredentials();
		} catch (Exception e) {
			throw new AmazonClientException(
					"Cannot load credentials from .aws/credentials file. " +
							"Make sure that the credentials file exists and the profile name is specified within it.",
					e);
		}

		AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard()
				.withCredentials(new AWSStaticCredentialsProvider(credentials_profile))
				.withRegion(Regions.US_WEST_1)
				.build();

		List<StepConfig> stepConfigs = new ArrayList<StepConfig>();
		HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig()
				.withJar("command-runner.jar")
				.withArgs("bash", "-c", "flink", "run", "-m", "yarn-cluster", "-yn", "2",
						"/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://path/to/input-file.txt", "--output",
						"s3://path/to/output/");

		StepConfig flinkRunWordCountStep = new StepConfig()
				.withName("Flink add a wordcount step and terminate")
				.withActionOnFailure("CONTINUE")
				.withHadoopJarStep(flinkWordCountConf);

		stepConfigs.add(flinkRunWordCountStep);

		Application flink = new Application().withName("Flink");

		RunJobFlowRequest request = new RunJobFlowRequest()
				.withName("flink-transient")
				.withReleaseLabel("emr-5.20.0")
				.withApplications(flink)
				.withServiceRole("EMR_DefaultRole")
				.withJobFlowRole("EMR_EC2_DefaultRole")
				.withLogUri("s3://path/to/my/logfiles")
				.withInstances(new JobFlowInstancesConfig()
						.withEc2KeyName("myEc2Key")
						.withEc2SubnetId("subnet-12ab3c45")
						.withInstanceCount(3)
						.withKeepJobFlowAliveWhenNoSteps(false)
						.withMasterInstanceType("m4.large")
						.withSlaveInstanceType("m4.large"))
				.withSteps(stepConfigs);

		RunJobFlowResult result = emr.runJobFlow(request);
		System.out.println("The cluster ID is " + result.toString());

	}

}
```

**Example AWS CLI**  
`create-cluster` 하위 명령을 사용하여 Flink 작업 완료 시 종료되는 임시 클러스터를 생성합니다.  

```
aws emr create-cluster --release-label emr-5.2.1 \
--name "Flink_Transient" \
--applications Name=Flink \
--configurations file://./configurations.json \
--region us-east-1 \
--log-uri s3://myLogUri \
--auto-terminate
--instance-type m5.xlarge \
--instance-count 2 \
--service-role EMR_DefaultRole_V2 \ 
--ec2-attributes KeyName=<YourKeyName>,InstanceProfile=EMR_EC2_DefaultRole \
--steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Long_Running_Session,\
Args="bash","-c","\"flink run -m yarn-cluster /usr/lib/flink/examples/streaming/WordCount.jar
--input s3://amzn-s3-demo-bucket/pg11.txt --output s3://amzn-s3-demo-bucket/alice/""
```