

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# Flink 애플리케이션 실행
<a name="jobruns-flink-kubernetes-operator-run-application"></a>

Amazon EMR 6.13.0 이상을 사용할 경우 Amazon EMR on EKS의 애플리케이션 모드에서 Flink Kubernetes 연산자로 Flink 애플리케이션을 실행할 수 있습니다. Amazon EMR 6.15.0 이상을 사용할 경우 세션 모드에서 Flink 애플리케이션을 실행할 수도 있습니다. 이 페이지는 Amazon EMR on EKS로 Flink 애플리케이션을 실행하기 위해 활용할 수 있는 두 가지 방법을 모두 설명합니다.

**Topics**

**참고**  
Flink 작업을 제출할 때 고가용성 메타데이터를 저장할 Amazon S3 버킷을 생성해야 합니다. 이 기능을 사용하고 싶지 않은 경우 비활성화할 수 있습니다. 기본적으로 활성화됩니다.

**전제 조건** – Flink Kubernetes 운영자로 Flink 애플리케이션을 실행하기 전에 [Amazon EMR on EKS에 대한 Flink Kubernetes 운영자 설정](jobruns-flink-kubernetes-operator-setup.md) 및 [Kubernetes 연산자 설치](jobruns-flink-kubernetes-operator-getting-started.md#jobruns-flink-kubernetes-operator-getting-started-install-operator)의 단계를 완료합니다.

------
#### [ Application mode ]

Amazon EMR 6.13.0 이상을 사용할 경우 Amazon EMR on EKS의 애플리케이션 모드에서 Flink Kubernetes 연산자로 Flink 애플리케이션을 실행할 수 있습니다.

1. 다음 예제에서 `FlinkDeployment` 정의 파일 `basic-example-app-cluster.yaml`을 생성합니다. [옵트인 AWS 리전](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html) 중 하나를 활성화하고 사용하는 경우 구성의 주석 처리를 해제하고 구성해야 합니다`fs.s3a.endpoint.region`.

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example-app-cluster
   spec:
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
       #fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
       state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
       state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH 
     flinkVersion: v1_17
     executionRoleArn: JOB_EXECUTION_ROLE_ARN
     emrReleaseLabel: "emr-6.13.0-flink-latest" # 6.13 or higher
     jobManager:
       storageDir: HIGH_AVAILABILITY_STORAGE_PATH
       resource:
         memory: "2048m"
         cpu: 1
     taskManager:
       resource:
         memory: "2048m"
         cpu: 1
     job:
       # if you have your job jar in S3 bucket you can use that path as well
       jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
       parallelism: 2
       upgradeMode: savepoint
       savepointTriggerNonce: 0
     monitoringConfiguration:    
       cloudWatchMonitoringConfiguration:
          logGroupName: LOG_GROUP_NAME
   ```

1. 다음 명령으로 Flink 배포를 제출합니다. 이렇게 하면 `FlinkDeployment` 객체(`basic-example-app-cluster`)도 생성됩니다.

   ```
   kubectl create -f basic-example-app-cluster.yaml -n <NAMESPACE>
   ```

1. Flink UI에 액세스합니다.

   ```
   kubectl port-forward deployments/basic-example-app-cluster 8081 -n NAMESPACE
   ```

1. `localhost:8081`을 열어서 Flink 작업을 로컬에서 확인합니다.

1. 작업을 정리합니다. 이 작업을 위해 생성된 S3 아티팩트(예: 검사, 고가용성, 메타데이터 세이브포인트 및 CloudWatch 로그)를 정리해야 합니다.

Flink Kubernetes 운영자를 통해 Flink에 애플리케이션을 제출하는 방법에 대한 자세한 내용은 GitHub의 `apache/flink-kubernetes-operator` 폴더에 있는 [Flink Kubernetes operator examples](https://github.com/apache/flink-kubernetes-operator/tree/main/examples)를 참조하세요.

------
#### [ Session mode ]

Amazon EMR 6.15.0 이상을 사용할 경우 Amazon EMR on EKS의 세션 모드에서 Flink Kubernetes 연산자로 Flink 애플리케이션을 실행할 수 있습니다.

1. 다음 예제에서 이름이 `basic-example-app-cluster.yaml`인 `FlinkDeployment` 정의 파일을 생성합니다. [옵트인 AWS 리전](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html) 중 하나를 활성화하고 사용하는 경우 구성의 주석 처리를 해제하고 구성해야 합니다`fs.s3a.endpoint.region`.

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example-session-cluster
   spec:
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
       #fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
       state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
       state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH
     flinkVersion: v1_17
     executionRoleArn: JOB_EXECUTION_ROLE_ARN
     emrReleaseLabel: "emr-6.15.0-flink-latest"
     jobManager:
       storageDir: HIGH_AVAILABILITY_S3_STORAGE_PATH
       resource:
         memory: "2048m"
         cpu: 1
     taskManager:
       resource:
         memory: "2048m"
         cpu: 1
     monitoringConfiguration:    
       s3MonitoringConfiguration:
          logUri: 
       cloudWatchMonitoringConfiguration:
          logGroupName: LOG_GROUP_NAME
   ```

1. 다음 명령으로 Flink 배포를 제출합니다. 이렇게 하면 `FlinkDeployment` 객체(`basic-example-session-cluster`)도 생성됩니다.

   ```
   kubectl create -f basic-example-app-cluster.yaml -n NAMESPACE
   ```

1. 다음과 같은 명령을 사용하여 세션 클러스터 `LIFECYCLE`이 `STABLE`인지 확인하세요.

   ```
   kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
   ```

   출력은 다음 예시와 비슷해야 합니다.

   ```
   NAME                              JOB STATUS   LIFECYCLE STATE
   basic-example-session-cluster                          STABLE
   ```

1. 다음 콘텐츠 예제가 포함된 `FlinkSessionJob` 사용자 지정 정의 리소스 파일 `basic-session-job.yaml`을 생성합니다.

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkSessionJob
   metadata:
     name: basic-session-job
   spec:
     deploymentName: basic-session-deployment
     job:
       # If you have your job jar in an S3 bucket you can use that path.
       # To use jar in S3 bucket, set 
       # OPERATOR_EXECUTION_ROLE_ARN (--set emrContainers.operatorExecutionRoleArn=$OPERATOR_EXECUTION_ROLE_ARN)
       # when you install Spark operator
       jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar
       parallelism: 2
       upgradeMode: stateless
   ```

1. 다음 명령으로 Flink 세션 작업을 제출합니다. 이렇게 하면 `FlinkSessionJob` 객체 `basic-session-job`이 생성됩니다.

   ```
   kubectl apply -f basic-session-job.yaml -n $NAMESPACE
   ```

1. 다음과 같은 명령을 사용하여 세션 클러스터 `LIFECYCLE`이 `STABLE`이고 `JOB STATUS`가 `RUNNING`인지 확인하세요.

   ```
   kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
   ```

   출력은 다음 예시와 비슷해야 합니다.

   ```
   NAME                              JOB STATUS   LIFECYCLE STATE
   basic-example-session-cluster     RUNNING      STABLE
   ```

1. Flink UI에 액세스합니다.

   ```
   kubectl port-forward deployments/basic-example-session-cluster 8081 -n NAMESPACE
   ```

1. `localhost:8081`을 열어서 Flink 작업을 로컬에서 확인합니다.

1. 작업을 정리합니다. 이 작업을 위해 생성된 S3 아티팩트(예: 검사, 고가용성, 메타데이터 세이브포인트 및 CloudWatch 로그)를 정리해야 합니다.

------