

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# Amazon EMR on EKS를 사용하여 Flink 작업 실행
<a name="run-flink-jobs"></a>

Amazon EMR 릴리스 6.13.0 이상은 Amazon EMR on EKS의 작업 제출 모델로 Apache Flink 또는 Flink Kubernetes 운영자를 사용하는 Amazon EMR on EKS를 지원합니다. Apache Flink가 포함된 Amazon EMR on EKS를 사용하면 자체 Amazon EKS 클러스터에서 Amazon EMR 릴리스 런타임으로 Flink 애플리케이션을 배포하고 관리할 수 있습니다. Amazon EKS 클러스터에 Flink Kubernetes 운영자를 배포한 후에는 운영자를 통해 Flink 애플리케이션을 직접 제출할 수 있습니다. 운영자는 Flink 애플리케이션의 수명 주기를 관리합니다.

**Topics**
+ [Flink Kubernetes 연산자 설정 및 사용](jobruns-flink-kubernetes-operator.md)
+ [Flink 네이티브 Kubernetes 사용](jobruns-flink-native-kubernetes.md)
+ [Flink 및 FluentD에 대한 Docker 이미지 사용자 지정](jobruns-flink-docker-flink-fluentd.md)
+ [Flink Kubernetes 운영자 및 Flink 작업 모니터링](jobruns-flink-monitoring.md)
+ [Flink에서 고가용성 및 작업 복원력을 지원하는 방법](jobruns-flink-resiliency.md)
+ [Flink 애플리케이션에 대한 Autoscaler 사용](jobruns-flink-autoscaler.md)
+ [Amazon EMR on EKS에서 Flink 작업에 대한 유지 관리 및 문제 해결](jobruns-flink-troubleshooting.md)
+ [Apache Flink가 포함된 Amazon EMR on EKS에 대해 지원되는 릴리스](jobruns-flink-security-release-versions.md)

# Flink Kubernetes 연산자 설정 및 사용
<a name="jobruns-flink-kubernetes-operator"></a>

다음 페이지에서는 Flink Kubernetes 운영자를 설정 및 사용하여, Amazon EMR on EKS에서 Flink 작업을 실행하는 방법을 설명합니다. 사용 가능한 주제에는 필수 사전 조건, 환경 설정 방법, Amazon EMR on EKS에서 Flink 애플리케이션 실행이 포함됩니다.

**Topics**
+ [Amazon EMR on EKS에 대한 Flink Kubernetes 운영자 설정](jobruns-flink-kubernetes-operator-setup.md)
+ [Amazon EMR on EKS에 대한 Flink Kubernetes 연산자 설치](jobruns-flink-kubernetes-operator-getting-started.md)
+ [Flink 애플리케이션 실행](jobruns-flink-kubernetes-operator-run-application.md)
+ [Flink 애플리케이션을 실행하기 위한 보안 역할 권한](jobruns-flink-kubernetes-security.md)
+ [Amazon EMR on EKS에 대한 Flink Kubernetes 운영자 제거](jobruns-flink-kubernetes-operator-uninstall.md)

# Amazon EMR on EKS에 대한 Flink Kubernetes 운영자 설정
<a name="jobruns-flink-kubernetes-operator-setup"></a>

Amazon EKS에서 Flink Kubernetes 운영자를 설치하려면 먼저 설정을 위해 다음 작업을 수행합니다. Amazon Web Services(AWS)에 이미 가입했고 Amazon EKS를 사용하고 있는 경우 Amazon EMR on EKS를 사용할 준비를 거의 마친 상태입니다. Amazon EKS에서 Flink 운영자를 설정하려면 다음 작업을 수행합니다. 필수 조건 중 하나를 이미 완료한 경우 해당 조건을 건너뛰고 다음 조건으로 넘어갈 수 있습니다.
+ **[의 최신 버전 설치 또는 업데이트 AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html) ** -를 이미 설치한 경우 최신 버전이 있는지 AWS CLI확인합니다.
+ **[kubectl 및 eksctl 설치](https://docs.aws.amazon.com/eks/latest/userguide/install-kubectl.html)** - eksctl은 Amazon EKS와 통신하는 데 사용하는 명령줄 도구입니다.
+ **[Install Helm](https://docs.aws.amazon.com/eks/latest/userguide/helm.html)** – Kubernetes용 Helm 패키지 관리자는 Kubernetes 클러스터에서 애플리케이션을 설치하고 관리하는 데 도움이 됩니다.
+ **[Amazon EKS 시작하기 – eksctl](https://docs.aws.amazon.com/eks/latest/userguide/getting-started-eksctl.html) ** - 관련 단계를 수행하여 Amazon EKS에서 노드를 포함하는 새 Kubernetes 클러스터를 생성합니다.
+ **[Amazon EMR 릴리스 레이블 선택](jobruns-flink-security-release-versions.md)(릴리스 6.13.0 이상)** - Flink Kubernetes 연산자는 Amazon EMR 릴리스 6.13.0 이상에서 지원됩니다.
+ **[Amazon EKS 클러스터에서 서비스 계정에 대한 IAM 역할(IRSA)을 활성화](setting-up-enable-IAM.md)**합니다.
+ **[작업 실행 역할을 생성](creating-job-execution-role.md)**합니다.
+ **[작업 실행 역할의 신뢰 정책을 업데이트](setting-up-trust-policy.md)**합니다.
+ 운영자 실행 역할을 생성합니다. 이 단계는 선택 사항입니다. Flink 작업과 운영자에 동일한 역할을 사용할 수 있습니다. 운영자에 대해 다른 IAM 역할을 사용하려는 경우 별도의 역할을 생성할 수 있습니다.
+ 운영자 실행 역할의 신뢰 정책을 업데이트합니다. Amazon EMR Flink Kubernetes 운영자 서비스 계정에 사용할 역할에 대해 신뢰 정책 항목 하나를 명시적으로 추가해야 합니다. 다음 예제 형식을 따를 수 있습니다.

------
#### [ JSON ]

****  

  ```
  {
    "Version":"2012-10-17",		 	 	 
    "Statement": [
      {
        "Effect": "Allow",
        "Action": [
          "sts:AssumeRoleWithWebIdentity"
        ],
        "Resource": [
          "*"
        ],
        "Condition": {
          "StringLike": {
            "aws:userid": "system:serviceaccount:emr:emr-containers-sa-flink-operator"
          }
        },
        "Sid": "AllowSTSAssumerolewithwebidentity"
      }
    ]
  }
  ```

------

# Amazon EMR on EKS에 대한 Flink Kubernetes 연산자 설치
<a name="jobruns-flink-kubernetes-operator-getting-started"></a>

이 주제는 Flink 배포를 준비하여 Amazon EKS에서 Flink Kubernetes 연산자 사용을 시작하는 데 도움을 줍니다.

## Kubernetes 연산자 설치
<a name="jobruns-flink-kubernetes-operator-getting-started-install-operator"></a>

다음 단계를 사용하여 Apache Flink용 Kubernetes 운영자를 설치합니다.

1. 아직 실행하지 않았다면, [Amazon EMR on EKS에 대한 Flink Kubernetes 운영자 설정](jobruns-flink-kubernetes-operator-setup.md)의 단계를 완료합니다.

1. *cert-manager*(Amazon EKS 클러스터당 하나)를 설치하여 웹후크 구성 요소 추가를 지원합니다.

   ```
   kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.12.0/cert-manager.yaml
   ```

1. 차트 Helm을 설치합니다.

   ```
   export VERSION=7.12.0 # The Amazon EMR release version
   export NAMESPACE=The Kubernetes namespace to deploy the operator
   
   helm install flink-kubernetes-operator \
   oci://public.ecr.aws/emr-on-eks/flink-kubernetes-operator \
   --version $VERSION \
   --namespace $NAMESPACE
   ```

   출력 예시:

   ```
   NAME: flink-kubernetes-operator
   LAST DEPLOYED: Tue May 31 17:38:56 2022
   NAMESPACE: $NAMESPACE
   STATUS: deployed
   REVISION: 1
   TEST SUITE: None
   ```

1. 배포가 완료될 때까지 기다린 후 차트 설치를 확인합니다.

   ```
   kubectl wait deployment flink-kubernetes-operator --namespace $NAMESPACE --for condition=Available=True --timeout=30s
   ```

1. 배포가 완료되면 다음 메시지가 표시됩니다.

   ```
   deployment.apps/flink-kubernetes-operator condition met
   ```

1. 다음 명령을 사용하여 배치된 운영자를 확인합니다.

   ```
   helm list --namespace $NAMESPACE
   ```

   다음은 앱 버전 `x.y.z-amzn-n`이 Amazon EMR on EKS 릴리스의 Flink 운영자 버전과 일치하는 예제 출력을 보여줍니다. 자세한 내용은 [Apache Flink가 포함된 Amazon EMR on EKS에 대해 지원되는 릴리스](jobruns-flink-security-release-versions.md) 단원을 참조하십시오.

   ```
   NAME                              NAMESPACE    REVISION    UPDATED                                STATUS      CHART                                   APP VERSION          
   flink-kubernetes-operator    $NAMESPACE   1           2023-02-22 16:43:45.24148 -0500 EST    deployed    flink-kubernetes-operator-emr-7.12.0    x.y.z-amzn-n
   ```

### Kubernetes 연산자 업그레이드
<a name="jobruns-flink-kubernetes-operator-upgrade"></a>

Flink 연산자의 버전을 업그레이드하려면 다음 단계를 따르세요.

1. 이전 `flink-kubernetes-operator`: `helm uninstall flink-kubernetes-operator -n <NAMESPACE>`를 제거합니다.

1. `kubectl delete crd flinkdeployments.flink.apache.org flinksessionjobs.flink.apache.org` CRD를 삭제(helm은 이전 CRD를 자동으로 삭제하지 않으므로)합니다.

1. `flink-kubernetes-operator`를 최신 버전으로 다시 설치합니다.

# Flink 애플리케이션 실행
<a name="jobruns-flink-kubernetes-operator-run-application"></a>

Amazon EMR 6.13.0 이상을 사용할 경우 Amazon EMR on EKS의 애플리케이션 모드에서 Flink Kubernetes 연산자로 Flink 애플리케이션을 실행할 수 있습니다. Amazon EMR 6.15.0 이상을 사용할 경우 세션 모드에서 Flink 애플리케이션을 실행할 수도 있습니다. 이 페이지는 Amazon EMR on EKS로 Flink 애플리케이션을 실행하기 위해 활용할 수 있는 두 가지 방법을 모두 설명합니다.

**Topics**

**참고**  
Flink 작업을 제출할 때 고가용성 메타데이터를 저장할 Amazon S3 버킷을 생성해야 합니다. 이 기능을 사용하고 싶지 않은 경우 비활성화할 수 있습니다. 기본적으로 활성화됩니다.

**전제 조건** – Flink Kubernetes 운영자로 Flink 애플리케이션을 실행하기 전에 [Amazon EMR on EKS에 대한 Flink Kubernetes 운영자 설정](jobruns-flink-kubernetes-operator-setup.md) 및 [Kubernetes 연산자 설치](jobruns-flink-kubernetes-operator-getting-started.md#jobruns-flink-kubernetes-operator-getting-started-install-operator)의 단계를 완료합니다.

------
#### [ Application mode ]

Amazon EMR 6.13.0 이상을 사용할 경우 Amazon EMR on EKS의 애플리케이션 모드에서 Flink Kubernetes 연산자로 Flink 애플리케이션을 실행할 수 있습니다.

1. 다음 예제에서 `FlinkDeployment` 정의 파일 `basic-example-app-cluster.yaml`을 생성합니다. [옵트인 AWS 리전](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html) 중 하나를 활성화하고 사용하는 경우 구성의 주석 처리를 해제하고 구성해야 합니다`fs.s3a.endpoint.region`.

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example-app-cluster
   spec:
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
       #fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
       state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
       state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH 
     flinkVersion: v1_17
     executionRoleArn: JOB_EXECUTION_ROLE_ARN
     emrReleaseLabel: "emr-6.13.0-flink-latest" # 6.13 or higher
     jobManager:
       storageDir: HIGH_AVAILABILITY_STORAGE_PATH
       resource:
         memory: "2048m"
         cpu: 1
     taskManager:
       resource:
         memory: "2048m"
         cpu: 1
     job:
       # if you have your job jar in S3 bucket you can use that path as well
       jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
       parallelism: 2
       upgradeMode: savepoint
       savepointTriggerNonce: 0
     monitoringConfiguration:    
       cloudWatchMonitoringConfiguration:
          logGroupName: LOG_GROUP_NAME
   ```

1. 다음 명령으로 Flink 배포를 제출합니다. 이렇게 하면 `FlinkDeployment` 객체(`basic-example-app-cluster`)도 생성됩니다.

   ```
   kubectl create -f basic-example-app-cluster.yaml -n <NAMESPACE>
   ```

1. Flink UI에 액세스합니다.

   ```
   kubectl port-forward deployments/basic-example-app-cluster 8081 -n NAMESPACE
   ```

1. `localhost:8081`을 열어서 Flink 작업을 로컬에서 확인합니다.

1. 작업을 정리합니다. 이 작업을 위해 생성된 S3 아티팩트(예: 검사, 고가용성, 메타데이터 세이브포인트 및 CloudWatch 로그)를 정리해야 합니다.

Flink Kubernetes 운영자를 통해 Flink에 애플리케이션을 제출하는 방법에 대한 자세한 내용은 GitHub의 `apache/flink-kubernetes-operator` 폴더에 있는 [Flink Kubernetes operator examples](https://github.com/apache/flink-kubernetes-operator/tree/main/examples)를 참조하세요.

------
#### [ Session mode ]

Amazon EMR 6.15.0 이상을 사용할 경우 Amazon EMR on EKS의 세션 모드에서 Flink Kubernetes 연산자로 Flink 애플리케이션을 실행할 수 있습니다.

1. 다음 예제에서 이름이 `basic-example-app-cluster.yaml`인 `FlinkDeployment` 정의 파일을 생성합니다. [옵트인 AWS 리전](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html) 중 하나를 활성화하고 사용하는 경우 구성의 주석 처리를 해제하고 구성해야 합니다`fs.s3a.endpoint.region`.

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example-session-cluster
   spec:
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
       #fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
       state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
       state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH
     flinkVersion: v1_17
     executionRoleArn: JOB_EXECUTION_ROLE_ARN
     emrReleaseLabel: "emr-6.15.0-flink-latest"
     jobManager:
       storageDir: HIGH_AVAILABILITY_S3_STORAGE_PATH
       resource:
         memory: "2048m"
         cpu: 1
     taskManager:
       resource:
         memory: "2048m"
         cpu: 1
     monitoringConfiguration:    
       s3MonitoringConfiguration:
          logUri: 
       cloudWatchMonitoringConfiguration:
          logGroupName: LOG_GROUP_NAME
   ```

1. 다음 명령으로 Flink 배포를 제출합니다. 이렇게 하면 `FlinkDeployment` 객체(`basic-example-session-cluster`)도 생성됩니다.

   ```
   kubectl create -f basic-example-app-cluster.yaml -n NAMESPACE
   ```

1. 다음과 같은 명령을 사용하여 세션 클러스터 `LIFECYCLE`이 `STABLE`인지 확인하세요.

   ```
   kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
   ```

   출력은 다음 예시와 비슷해야 합니다.

   ```
   NAME                              JOB STATUS   LIFECYCLE STATE
   basic-example-session-cluster                          STABLE
   ```

1. 다음 콘텐츠 예제가 포함된 `FlinkSessionJob` 사용자 지정 정의 리소스 파일 `basic-session-job.yaml`을 생성합니다.

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkSessionJob
   metadata:
     name: basic-session-job
   spec:
     deploymentName: basic-session-deployment
     job:
       # If you have your job jar in an S3 bucket you can use that path.
       # To use jar in S3 bucket, set 
       # OPERATOR_EXECUTION_ROLE_ARN (--set emrContainers.operatorExecutionRoleArn=$OPERATOR_EXECUTION_ROLE_ARN)
       # when you install Spark operator
       jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar
       parallelism: 2
       upgradeMode: stateless
   ```

1. 다음 명령으로 Flink 세션 작업을 제출합니다. 이렇게 하면 `FlinkSessionJob` 객체 `basic-session-job`이 생성됩니다.

   ```
   kubectl apply -f basic-session-job.yaml -n $NAMESPACE
   ```

1. 다음과 같은 명령을 사용하여 세션 클러스터 `LIFECYCLE`이 `STABLE`이고 `JOB STATUS`가 `RUNNING`인지 확인하세요.

   ```
   kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
   ```

   출력은 다음 예시와 비슷해야 합니다.

   ```
   NAME                              JOB STATUS   LIFECYCLE STATE
   basic-example-session-cluster     RUNNING      STABLE
   ```

1. Flink UI에 액세스합니다.

   ```
   kubectl port-forward deployments/basic-example-session-cluster 8081 -n NAMESPACE
   ```

1. `localhost:8081`을 열어서 Flink 작업을 로컬에서 확인합니다.

1. 작업을 정리합니다. 이 작업을 위해 생성된 S3 아티팩트(예: 검사, 고가용성, 메타데이터 세이브포인트 및 CloudWatch 로그)를 정리해야 합니다.

------

# Flink 애플리케이션을 실행하기 위한 보안 역할 권한
<a name="jobruns-flink-kubernetes-security"></a>

이 주제에서는 Flink 애플리케이션을 배포하고 실행하기 위한 보안 역할에 대해 설명합니다. 배포를 관리하고 작업을 생성 및 관리하는 데 필요한 두 가지 역할(작업자 역할과 작업 역할)이 있습니다. 이 주제에서는 이러한 역할을 소개하고 해당 권한을 나열합니다.

## 역할 기반 액세스 제어
<a name="jobruns-flink-kubernetes-security-rbac"></a>

운영자를 배포하고 Flink 작업을 실행하려면 두 개의 Kubernetes 역할(즉, 운영자 하나와 작업 역할 하나)을 생성해야 합니다. Amazon EMR은 운영자를 설치할 때 기본적으로 두 가지 역할을 생성합니다.

## 운영자 역할
<a name="jobruns-flink-kubernetes-security-operator-role"></a>

운영자 역할을 사용하여 `flinkdeployments`를 관리함으로써 각 Flink 작업 및 기타 리소스(예: 서비스)에 대한 JobManager를 생성하고 관리합니다.

운영자 역할의 기본 이름은 `emr-containers-sa-flink-operator`이며, 다음과 같은 권한이 필요합니다.

```
rules:
- apiGroups:
  - ""
  resources:
  - pods
  - services
  - events
  - configmaps
  - secrets
  - serviceaccounts
  verbs:
  - '*'
- apiGroups:
  - rbac.authorization.k8s.io
  resources:
  - roles
  - rolebindings
  verbs:
  - '*'
- apiGroups:
  - apps
  resources:
  - deployments
  - deployments/finalizers
  - replicasets
  verbs:
  - '*'
- apiGroups:
  - extensions
  resources:
  - deployments
  - ingresses
  verbs:
  - '*'
- apiGroups:
  - flink.apache.org
  resources:
  - flinkdeployments
  - flinkdeployments/status
  - flinksessionjobs
  - flinksessionjobs/status
  verbs:
  - '*'
- apiGroups:
  - networking.k8s.io
  resources:
  - ingresses
  verbs:
  - '*'
- apiGroups:
  - coordination.k8s.io
  resources:
  - leases
  verbs:
  - '*'
```

## 작업 역할
<a name="jobruns-flink-security-job-role"></a>

JobManager는 작업 역할을 사용하여 각 작업에 대한 TaskManager 및 ConfigMap을 생성하고 관리합니다.

```
rules:
- apiGroups:
  - ""
  resources:
  - pods
  - configmaps
  verbs:
  - '*'
- apiGroups:
  - apps
  resources:
  - deployments
  - deployments/finalizers
  verbs:
  - '*'
```

# Amazon EMR on EKS에 대한 Flink Kubernetes 운영자 제거
<a name="jobruns-flink-kubernetes-operator-uninstall"></a>

다음 단계에 따라 Flink Kubernetes 운영자를 제거합니다.

1. 운영자를 삭제합니다.

   ```
   helm uninstall flink-kubernetes-operator -n <NAMESPACE>
   ```

1. Helm에서 제거하지 않는 Kubernetes 리소스를 삭제합니다.

   ```
   kubectl delete serviceaccounts, roles, rolebindings -l emr-containers.amazonaws.com/component=flink.operator --namespace <namespace>
   kubectl delete crd flinkdeployments.flink.apache.org flinksessionjobs.flink.apache.org
   ```

1. (선택 사항) cert-manager를 삭제합니다.

   ```
   kubectl delete -f https://github.com/jetstack/cert-manager/releases/download/v1.12.0/cert-manager.yaml
   ```

# Flink 네이티브 Kubernetes 사용
<a name="jobruns-flink-native-kubernetes"></a>

Amazon EMR 릴리스 6.13.0 이상에서는 Flink 애플리케이션을 Amazon EMR on EKS 클러스터에 제출하고 해당 애플리케이션을 실행하는 데 사용할 수 있는 명령줄 도구로 Flink 네이티브 Kubernetes를 지원합니다.

**Topics**
+ [Amazon EMR on EKS에서 Flink 네이티브 Kubernetes 설정](jobruns-flink-native-kubernetes-setup.md)
+ [Amazon EMR on EKS용 Flink 네이티브 Kubernetes 시작하기](jobruns-flink-native-kubernetes-getting-started.md)
+ [네이티브 Kubernetes에 대한 Flink JobManager 서비스 계정 보안 요구 사항](jobruns-flink-native-kubernetes-security-requirements.md)

# Amazon EMR on EKS에서 Flink 네이티브 Kubernetes 설정
<a name="jobruns-flink-native-kubernetes-setup"></a>

Amazon EMR on EKS에서 Flink CLI를 사용하여 애플리케이션을 실행하기 전에 다음 작업을 완료합니다. Amazon Web Services(AWS)에 이미 가입했고 Amazon EKS를 사용하고 있는 경우 Amazon EMR on EKS를 사용할 준비를 거의 마친 상태입니다. 필수 조건 중 하나를 이미 완료한 경우 해당 조건을 건너뛰고 다음 조건으로 넘어갈 수 있습니다.
+ **[의 최신 버전 설치 또는 업데이트 AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html) ** -를 이미 설치한 경우 최신 버전이 있는지 AWS CLI확인합니다.
+ **[Amazon EKS 시작하기 – eksctl](https://docs.aws.amazon.com/eks/latest/userguide/getting-started-eksctl.html) ** - 관련 단계를 수행하여 Amazon EKS에서 노드를 포함하는 새 Kubernetes 클러스터를 생성합니다.
+ **[Amazon EMR 기본 이미지 URI 선택](docker-custom-images-tag.md)(릴리스 6.13.0 이상)** - Flink Kubernetes 명령은 Amazon EMR 릴리스 6.13.0 이상에서 지원됩니다.
+ JobManager 서비스 계정에 TaskManager 포드를 생성하고 감시할 수 있는 적절한 권한이 있는지 확인합니다. 자세한 내용은 [네이티브 Kubernetes에 대한 Flink JobManager 서비스 계정 보안 요구 사항](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/jobruns-flink-native-kubernetes-security-requirements.html)을 참조하세요.
+ 로컬 [AWS 보안 인증 프로파일](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html)을 설정합니다.
+ Flink 애플리케이션을 실행하려는 [Amazon EKS 클러스터용 kubeconfig 파일을 생성 또는 업데이트](https://docs.aws.amazon.com/eks/latest/userguide/create-kubeconfig.html)합니다.

# Amazon EMR on EKS용 Flink 네이티브 Kubernetes 시작하기
<a name="jobruns-flink-native-kubernetes-getting-started"></a>

이 단계에서는 Flink 애플리케이션을 구성하고, 이에 대한 서비스 계정을 설정하며, 실행하는 방법을 보여줍니다. Flink 네이티브 Kubernetes는 실행 중인 Kubernetes 클러스터에 Flink를 배포하는 데 사용됩니다.

## Flink 애플리케이션 구성 및 실행
<a name="jobruns-flink-native-kubernetes-getting-started-run-application"></a>

Amazon EMR 6.13.0 이상은 Amazon EKS 클러스터에서 Flink 애플리케이션을 실행하기 위한 Flink 네이티브 Kubernetes를 지원합니다. Flink 애플리케이션을 실행하려면 다음 단계를 수행합니다.

1. Flink 네이티브 Kubernetes 명령으로 Flink 애플리케이션을 실행하려면 먼저 [Amazon EMR on EKS에서 Flink 네이티브 Kubernetes 설정](jobruns-flink-native-kubernetes-setup.md)의 단계를 완료합니다.

1. [Flink 다운로드 및 설치](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/try-flink/local_installation).

1. 다음과 같은 환경 변수의 값을 설정합니다.

   ```
   #Export the FLINK_HOME environment variable to your local installation of Flink
   export FLINK_HOME=/usr/local/bin/flink #Will vary depending on your installation
   export NAMESPACE=flink
   export CLUSTER_ID=flink-application-cluster
   export IMAGE=<123456789012.dkr.ecr.sample-AWS 리전-.amazonaws.com/flink/emr-6.13.0-flink:latest>
   export FLINK_SERVICE_ACCOUNT=emr-containers-sa-flink
   export FLINK_CLUSTER_ROLE_BINDING=emr-containers-crb-flink
   ```

1. Kubernetes 리소스를 관리할 서비스 계정을 생성합니다.

   ```
   kubectl create serviceaccount $FLINK_SERVICE_ACCOUNT -n $NAMESPACE
   kubectl create clusterrolebinding $FLINK_CLUSTER_ROLE_BINDING --clusterrole=edit --serviceaccount=$NAMESPACE:$FLINK_SERVICE_ACCOUNT
   ```

1. `run-application` CLI 명령을 실행합니다.

   ```
   $FLINK_HOME/bin/flink run-application \
       --target kubernetes-application \
       -Dkubernetes.namespace=$NAMESPACE \
       -Dkubernetes.cluster-id=$CLUSTER_ID \
       -Dkubernetes.container.image.ref=$IMAGE \
       -Dkubernetes.service-account=$FLINK_SERVICE_ACCOUNT \
       local:///opt/flink/examples/streaming/Iteration.jar
   2022-12-29 21:13:06,947 INFO  org.apache.flink.kubernetes.utils.KubernetesUtils            [] - Kubernetes deployment requires a fixed port. Configuration blob.server.port will be set to 6124
   2022-12-29 21:13:06,948 INFO  org.apache.flink.kubernetes.utils.KubernetesUtils            [] - Kubernetes deployment requires a fixed port. Configuration taskmanager.rpc.port will be set to 6122
   2022-12-29 21:13:07,861 WARN  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Please note that Flink client operations(e.g. cancel, list, stop, savepoint, etc.) won't work from outside the Kubernetes cluster since 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
   2022-12-29 21:13:07,868 INFO  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Create flink application cluster flink-application-cluster successfully, JobManager Web Interface: http://flink-application-cluster-rest.flink:8081
   ```

1. 생성된 Kubernetes 리소스를 검사합니다.

   ```
   kubectl get all -n <namespace>
   NAME READY STATUS RESTARTS AGE
   pod/flink-application-cluster-546687cb47-w2p2z 1/1 Running 0 3m37s
   pod/flink-application-cluster-taskmanager-1-1 1/1 Running 0 3m24s
   
   NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
   service/flink-application-cluster ClusterIP None <none> 6123/TCP,6124/TCP 3m38s
   service/flink-application-cluster-rest ClusterIP 10.100.132.158 <none> 8081/TCP 3m38s
   
   NAME READY UP-TO-DATE AVAILABLE AGE
   deployment.apps/flink-application-cluster 1/1 1 1 3m38s
   
   NAME DESIRED CURRENT READY AGE
   replicaset.apps/flink-application-cluster-546687cb47 1 1 1 3m38s
   ```

1. 포트는 8081로 포워딩됩니다.

   ```
   kubectl port-forward service/flink-application-cluster-rest 8081 -n <namespace>
   Forwarding from 127.0.0.1:8081 -> 8081
   ```

1. Flink UI에 로컬로 액세스합니다.  
![\[Flink UI에 액세스합니다.\]](http://docs.aws.amazon.com/ko_kr/emr/latest/EMR-on-EKS-DevelopmentGuide/images/jobruns-flink-native-kubernetes-ui.png)

1. Flink 애플리케이션을 삭제합니다.

   ```
   kubectl delete deployment.apps/flink-application-cluster -n <namespace>
   deployment.apps "flink-application-cluster" deleted
   ```

Flink에 애플리케이션을 제출하는 방법에 대한 자세한 내용은 Apache Flink 설명서에서 [Native Kubernetes](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/)를 참조하세요.

# 네이티브 Kubernetes에 대한 Flink JobManager 서비스 계정 보안 요구 사항
<a name="jobruns-flink-native-kubernetes-security-requirements"></a>

Flink JobManager 포드는 Kubernetes 서비스 계정을 사용함으로써 Kubernetes API 서버에 액세스하여 TaskManager 포드를 생성하고 감시합니다. JobManager 서비스 계정에는 TaskManager 포드를 생성 및 삭제할 수 있는 적절한 권한이 있어야 하며, 클러스터에서 JobManager 및 ResourceManager의 주소를 검색하기 위해 TaskManager에서 리더 ConfigMaps를 감시하도록 허용해야 합니다.

이 서비스 계정에는 다음 규칙이 적용됩니다.

```
rules:
- apiGroups:
  - ""
  resources:
  - pods
  verbs:
  - "*"
- apiGroups:
  - ""
  resources:
  - services
  verbs:
  - "*"
- apiGroups:
  - ""
  resources:
  - configmaps
  verbs:
  - "*"
- apiGroups:
  - "apps"
  resources:
  - deployments
  verbs:
  - "*"
```

# Flink 및 FluentD에 대한 Docker 이미지 사용자 지정
<a name="jobruns-flink-docker-flink-fluentd"></a>

다음 단계를 수행하여 Apache Flink 또는 FluentD 이미지를 사용해 Amazon EMR on EKS에 대한 Docker 이미지를 사용자 지정합니다. 여기에는 기본 이미지 가져오기, 사용자 지정, 게시 및 워크로드 제출에 대한 기술 지침이 포함됩니다.

**Topics**
+ [사전 조건](#jobruns-flink-docker-flink-fluentd-prereqs)
+ [1단계: Amazon Elastic Container Registry에서 기본 이미지 검색](#jobruns-flink-docker-flink-fluentd-retrieve-base)
+ [2단계: 기본 이미지 사용자 지정](#jobruns-flink-docker-flink-fluentd-customize-image)
+ [3단계: 사용자 지정 이미지 게시](#jobruns-flink-docker-flink-fluentd-publish-image)
+ [4단계: 사용자 지정 이미지를 사용하여 Amazon EMR에서 Flink 워크로드 제출](#jobruns-flink-docker-flink-fluentd-submit-workload)

## 사전 조건
<a name="jobruns-flink-docker-flink-fluentd-prereqs"></a>

Docker 이미지를 사용자 지정하기 전에 다음 사전 조건을 완료했는지 확인합니다.
+ [Amazon EMR on EKS에 대한 Flink Kubernetes 연산자 설정](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/jobruns-flink-kubernetes-operator-setup.html)을 완료했습니다.
+ 환경에 Docker를 설치했습니다. 자세한 내용은 [Get Docker](https://docs.docker.com/get-docker/)를 참조하세요.

## 1단계: Amazon Elastic Container Registry에서 기본 이미지 검색
<a name="jobruns-flink-docker-flink-fluentd-retrieve-base"></a>

기본 이미지에는 다른 AWS 서비스에 액세스하는 데 사용되는 Amazon EMR 런타임 및 커넥터가 포함되어 있습니다. Flink 버전 6.14.0 이상의 EKS에서 Amazon EMR on EKS를 사용하는 경우 Amazon ECR 퍼블릭 갤러리에서 기본 이미지를 가져올 수 있습니다. 갤러리를 탐색하여 이미지 링크를 찾은 다음, 이미지를 로컬 Workspace로 가져옵니다. 예를 들어 Amazon EMR 6.14.0 릴리스의 경우 다음 `docker pull` 명령은 최신 표준 기본 이미지를 반환합니다. `emr-6.14.0:latest`를 원하는 릴리스 버전으로 바꿉니다.

```
docker pull public.ecr.aws/emr-on-eks/flink/emr-6.14.0-flink:latest
```

다음은 Flink 갤러리 이미지 및 Fluentd 갤러리 이미지에 대한 링크입니다.
+ [emr-on-eks/flink/emr-6.14.0-flink](https://gallery.ecr.aws/emr-on-eks/flink/emr-6.14.0-flink)
+ [emr-on-eks/fluentd/emr-6.14.0](https://gallery.ecr.aws/emr-on-eks/fluentd/emr-6.14.0)

## 2단계: 기본 이미지 사용자 지정
<a name="jobruns-flink-docker-flink-fluentd-customize-image"></a>

다음 단계에서는 Amazon ECR에서 가져온 기본 이미지를 사용자 지정하는 방법을 설명합니다.

1. 로컬 Workspace에 새 `Dockerfile`을 생성합니다.

1. `Dockerfile`을 수정하고 다음 콘텐츠를 추가합니다. 이 `Dockerfile`에서는 `public.ecr.aws/emr-on-eks/flink/emr-7.12.0-flink:latest`에서 가져온 컨테이너 이미지를 사용합니다.

   ```
   FROM public.ecr.aws/emr-on-eks/flink/emr-7.12.0-flink:latest
   USER root
   ### Add customization commands here ####
   USER hadoop:hadoop
   ```

   `Fluentd`를 사용하는 경우 다음 구성을 사용합니다.

   ```
   FROM public.ecr.aws/emr-on-eks/fluentd/emr-7.12.0:latest
   USER root
   ### Add customization commands here ####
   USER hadoop:hadoop
   ```

1. `Dockerfile`에 명령을 추가하여 기본 이미지를 사용자 지정합니다. 다음 명령은 Python 라이브러리를 설치하는 명령을 시연합니다.

   ```
   FROM public.ecr.aws/emr-on-eks/flink/emr-7.12.0-flink:latest
   USER root
   RUN pip3 install --upgrade boto3 pandas numpy // For python 3
   USER hadoop:hadoop
   ```

1. `DockerFile`을 생성한 같은 디렉터리에서 다음 명령을 실행하여 Docker 이미지를 빌드합니다. `-t` 플래그 다음에 제공하는 필드는 이미지의 사용자 지정 이름입니다.

   ```
   docker build -t <YOUR_ACCOUNT_ID>.dkr.ecr.<YOUR_ECR_REGION>.amazonaws.com/<ECR_REPO>:<ECR_TAG>
   ```

## 3단계: 사용자 지정 이미지 게시
<a name="jobruns-flink-docker-flink-fluentd-publish-image"></a>

이제 Amazon ECR 레지스트리에 새 Docker 이미지를 게시할 수 있습니다.

1. 다음 명령을 실행하여 Docker 이미지를 저장할 Amazon ECR 리포지토리를 생성합니다. 리포지토리의 이름을 입력합니다(예: `emr_custom_repo.`). 자세한 내용은 Amazon Elastic Container Registry 사용 설명서의 [리포지토리 생성](https://docs.aws.amazon.com/AmazonECR/latest/userguide/getting-started-cli.html#cli-create-repository)을 참조하세요.

   ```
   aws ecr create-repository \
          --repository-name emr_custom_repo \
          --image-scanning-configuration scanOnPush=true \
          --region <AWS_REGION>
   ```

1. 다음 명령을 실행하여 기본 레지스트리에 인증합니다. 자세한 내용은 Amazon Elastic Container Registry 사용 설명서의 [기본 레지스트리에 대해 인증](https://docs.aws.amazon.com/AmazonECR/latest/userguide/getting-started-cli.html#cli-authenticate-registry)을 참조하세요.

   ```
   aws ecr get-login-password --region <AWS_REGION> | docker login --username AWS --password-stdin <AWS_ACCOUNT_ID>.dkr.ecr.<YOUR_ECR_REGION>.amazonaws.com
   ```

1. 이미지를 푸시합니다. 자세한 내용은 Amazon Elastic Container Registry 사용 설명서의 [Amazon ECR에 이미지 푸시](https://docs.aws.amazon.com/AmazonECR/latest/userguide/getting-started-cli.html#cli-push-image)를 참조하세요.

   ```
   docker push <YOUR_ACCOUNT_ID>.dkr.ecr.<YOUR_ECR_REGION>.amazonaws.com/<ECR_REPO>:<ECR_TAG>
   ```

## 4단계: 사용자 지정 이미지를 사용하여 Amazon EMR에서 Flink 워크로드 제출
<a name="jobruns-flink-docker-flink-fluentd-submit-workload"></a>

사용자 지정 이미지를 사용하려면 `FlinkDeployment` 사양을 다음과 같이 변경합니다. 이를 수행하려면 배포 사양의 `spec.image` 줄에 자체 이미지를 입력합니다.

```
apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example
   spec:
     flinkVersion: v1_18
     image: <YOUR_ACCOUNT_ID>.dkr.ecr.<YOUR_ECR_REGION>.amazonaws.com/<ECR_REPO>:<ECR_TAG>
     imagePullPolicy: Always
     flinkConfiguration:
           taskmanager.numberOfTaskSlots: "1"
```

Fluentd 작업에 사용자 지정 이미지를 사용하려면 배포 사양의 `monitoringConfiguration.image` 줄에 자체 이미지를 입력합니다.

```
  monitoringConfiguration:
       image: <YOUR_ACCOUNT_ID>.dkr.ecr.<YOUR_ECR_REGION>.amazonaws.com/<ECR_REPO>:<ECR_TAG>
       cloudWatchMonitoringConfiguration:
         logGroupName: flink-log-group
         logStreamNamePrefix: custom-fluentd
```

# Flink Kubernetes 운영자 및 Flink 작업 모니터링
<a name="jobruns-flink-monitoring"></a>

이 섹션에서는 Amazon EMR on EKS를 사용하여 Flink 작업을 모니터링할 수 있는 여러 가지 방법을 설명합니다. 여기에는 Amazon Managed Service for Prometheus와의 Flink 통합, 작업 상태 및 지표를 제공하는 *Flink 웹 대시보드* 사용 또는 모니터링 구성을 사용하여 Amazon S3 및 Amazon CloudWatch로 로그 데이터 전송이 포함됩니다.

**Topics**
+ [Amazon Managed Service for Prometheus를 사용하여 Flink 작업 모니터링](jobruns-flink-monitoring-prometheus.md)
+ [Flink UI를 사용하여 Flink 작업 모니터링](jobruns-flink-monitoring-ui.md)
+ [모니터링 구성을 사용하여 Flink Kubernetes 연산자 및 Flink 작업 모니터링](jobruns-flink-monitoring-configuration.md)

# Amazon Managed Service for Prometheus를 사용하여 Flink 작업 모니터링
<a name="jobruns-flink-monitoring-prometheus"></a>

Amazon Managed Service for Prometheus(관리 포털)와 Apache Flink를 통합할 수 있습니다. Amazon Managed Service for Prometheus는 Amazon EKS에서 실행되는 클러스터에서 Amazon Managed Service for Prometheus 서버의 지표 수집을 지원합니다. Amazon Managed Service for Prometheus는 Amazon EKS 클러스터에서 이미 실행 중인 Prometheus 서버와 함께 작동합니다. Amazon EMR Flink 운영자와의 Amazon Managed Service for Prometheus 통합을 실행하면 Amazon Managed Service for Prometheus와 통합되도록 Prometheus 서버를 자동으로 배포하고 구성합니다.

1. [Amazon Managed Service for Prometheus Workspace를 생성합니다](https://docs.aws.amazon.com/prometheus/latest/userguide/AMP-onboard-create-workspace.html). 이 Workspace는 수집 엔드포인트 역할을 합니다. 나중에 원격 쓰기 URL이 필요합니다.

1. 서비스 계정에 대한 IAM 역할 설정.

   이 온보딩 방법에서는 Prometheus 서버가 실행되는 Amazon EKS 클러스터의 서비스 계정에 대한 IAM 역할을 사용합니다. 이러한 역할을 *서비스 역할*이라고도 합니다.

   아직 역할이 없는 경우 [Amazon EKS 클러스터의 지표 수집을 위한 서비스 역할을 설정](https://docs.aws.amazon.com/prometheus/latest/userguide/set-up-irsa.html)합니다.

   계속하기 전에 `amp-iamproxy-ingest-role`이라는 IAM 역할을 생성합니다.

1. Amazon Managed Service for Prometheus와 함께 Amazon EMR Flink 운영자를 설치합니다.

이제 Amazon Managed Service for Prometheus Workspace, Amazon Managed Service for Prometheus에 대한 전용 IAM 역할 및 필요한 권한이 확보되었으므로 Amazon EMR Flink 운영자를 설치할 수 있습니다.

`enable-amp.yaml` 파일을 생성합니다. 이 파일을 사용하면 사용자 지정 구성을 사용하여 Amazon Managed Service for Prometheus 설정을 재정의할 수 있습니다. 고유한 역할을 사용해야 합니다.

```
kube-prometheus-stack:
    prometheus:
    serviceAccount:
        create: true
        name: "amp-iamproxy-ingest-service-account"
        annotations:
            eks.amazonaws.com/role-arn: "arn:aws:iam::<AWS_ACCOUNT_ID>:role/amp-iamproxy-ingest-role"
    remoteWrite:
        - url: <AMAZON_MANAGED_PROMETHEUS_REMOTE_WRITE_URL>
        sigv4:
            region: <AWS_REGION>
        queueConfig:
            maxSamplesPerSend: 1000
            maxShards: 200
            capacity: 2500
```

[https://helm.sh/docs/helm/helm_install/](https://helm.sh/docs/helm/helm_install/) 명령을 사용하여 `flink-kubernetes-operator` 차트에 재정의를 전달합니다.

```
helm upgrade -n <namespace> flink-kubernetes-operator \
   oci://public.ecr.aws/emr-on-eks/flink-kubernetes-operator \
   --set prometheus.enabled=true
   -f enable-amp.yaml
```

이 명령은 포트 9999의 연산자에 Prometheus 리포터를 자동으로 설치합니다. 향후 `FlinkDeployment`는 9249의 `metrics` 포트를 공개합니다.
+ Flink 운영자 지표는 Prometheus의 `flink_k8soperator_` 레이블 아래에 표시됩니다.
+ Flink 태스크 관리자 지표는 Prometheus의 `flink_taskmanager_` 레이블 아래에 표시됩니다.
+ Flink 작업 관리자 지표는 Prometheus의 `flink_jobmanager_` 레이블 아래에 표시됩니다.

# Flink UI를 사용하여 Flink 작업 모니터링
<a name="jobruns-flink-monitoring-ui"></a>

실행 중인 Flink 애플리케이션의 상태와 성능을 모니터링하려면 *Flink 웹 대시보드*를 사용합니다. 이 대시보드는 작업 상태, TaskManager 수, 작업에 대한 지표 및 로그에 대한 정보를 제공합니다. 또한 Flink 작업의 구성을 확인 및 수정하고 Flink 클러스터와 상호 작용하여 작업을 제출하거나 취소할 수 있습니다.

Kubernetes에서 실행 중인 Flink 애플리케이션의 Flink 웹 대시보드에 액세스하는 방법:

1. `kubectl port-forward` 명령을 사용하여 Flink 애플리케이션의 TaskManager 포드에서 Flink 웹 대시보드가 실행되는 포트에 로컬 포트를 전달합니다. 기본적으로 이 포트는 8081입니다. *deployment-name*을 위에서 언급한 Flink 애플리케이션 배포 이름으로 바꿉니다.

   ```
   kubectl get deployments -n namespace
   ```

   출력 예시:

   ```
   kubectl get deployments -n flink-namespace
   NAME                        READY   UP-TO-DATE   AVAILABLE  AGE
   basic-example               1/1       1            1           11m
   flink-kubernetes-operator   1/1       1            1           21h
   ```

   ```
   kubectl port-forward deployments/deployment-name 8081 -n namespace
   ```

1. 로컬에서 다른 포트를 사용하려면 *local-port*:8081 파라미터를 사용합니다.

   ```
   kubectl port-forward -n flink deployments/basic-example 8080:8081
   ```

1. 웹 브라우저에서 Flink 웹 대시보드에 액세스하려면 `http://localhost:8081` 또는 사용자 지정 로컬 포트를 사용하는 경우 `http://localhost:local-port`로 이동합니다. 이 대시보드에는 작업 상태, TaskManager 수, 작업에 대한 지표 및 로그와 같은 실행 중인 Flink 애플리케이션에 대한 정보가 표시됩니다.  
![\[샘플 Flink 대시보드 UI\]](http://docs.aws.amazon.com/ko_kr/emr/latest/EMR-on-EKS-DevelopmentGuide/images/sample-flink-dashboard-ui.png)

# 모니터링 구성을 사용하여 Flink Kubernetes 연산자 및 Flink 작업 모니터링
<a name="jobruns-flink-monitoring-configuration"></a>

모니터링 구성을 사용하면 Flink 애플리케이션 및 운영자 로그의 로그 아카이브를 S3 및/또는 CloudWatch로 쉽게 설정할 수 있습니다(둘 중 하나 또는 둘 다 선택 가능). 이렇게 하면 JobManager 및 TaskManager 포드에 FluentD sidecar가 추가되고 이후에 이러한 구성 요소의 로그가 구성된 싱크로 전달됩니다.

**참고**  
이 기능을 사용하려면 다른 AWS 서비스와 상호 작용해야 하므로 Flink 운영자의 서비스 계정 및 Flink 작업(서비스 계정)에 대해 IAM 역할을 설정해야 합니다. [Amazon EMR on EKS에 대한 Flink Kubernetes 운영자 설정](jobruns-flink-kubernetes-operator-setup.md)에서 IRSA를 사용하여 설정해야 합니다.

## Flink 애플리케이션 로그
<a name="jobruns-flink-monitoring-configuration-application-logs"></a>

다음과 같은 방법으로 이 구성을 정의할 수 있습니다.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: FLINK IMAGE TAG
  imagePullPolicy: Always
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  executionRoleArn: JOB EXECUTION ROLE
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
  monitoringConfiguration:
    s3MonitoringConfiguration:
      logUri: S3 BUCKET
    cloudWatchMonitoringConfiguration:
      logGroupName: LOG GROUP NAME
      logStreamNamePrefix: LOG GROUP STREAM PREFIX
    sideCarResources:
      limits:
        cpuLimit: 500m
        memoryLimit: 250Mi
    containerLogRotationConfiguration:
        rotationSize: 2GB
        maxFilesToKeep: 10
```

유효한 구성 옵션은 다음과 같습니다.
+ `s3MonitoringConfiguration` - S3로의 전달을 설정하기 위한 구성 키
  + `logUri`(필수) - 로그를 저장할 S3 버킷 경로.
  + 로그가 업로드되고 나면 S3의 경로는 다음과 같습니다.
    + 로그 로테이션이 활성화되지 않았습니다.

      ```
      s3://${logUri}/${POD NAME}/STDOUT or STDERR.gz
      ```
    + 로그 로테이션이 활성화되었습니다. 로테이션된 파일과 현재 파일(날짜 스탬프가 없는 파일)을 모두 사용할 수 있습니다.

      ```
      s3://${logUri}/${POD NAME}/STDOUT or STDERR.gz
      ```

      다음 형식은 증가하는 숫자입니다.

      ```
      s3://${logUri}/${POD NAME}/stdout_YYYYMMDD_index.gz
      ```
  + 이 전달자를 사용하려면 다음 IAM 권한이 필요합니다.

    ```
    {
        "Effect": "Allow",
        "Action": [
            "s3:PutObject"
        ],
        "Resource": [
           "S3_BUCKET_URI/*",
           "S3_BUCKET_URI"
        ]
    }
    ```
+ `cloudWatchMonitoringConfiguration` - CloudWatch로의 전달을 설정하기 위한 구성 키.
  + `logGroupName`(필수) - 로그를 전송하려는 CloudWatch 로그 그룹의 이름(그룹이 없는 경우 자동으로 생성됨).
  + `logStreamNamePrefix`(선택 사항) - 로그를 보낼 로그 스트림의 이름입니다. 기본값은 빈 문자열입니다. 형식은 다음과 같습니다.

    ```
    ${logStreamNamePrefix}/${POD NAME}/STDOUT or STDERR
    ```
  + 이 전달자를 사용하려면 다음 IAM 권한이 필요합니다.

    ```
    {
        "Effect": "Allow",
        "Action": [
            "logs:CreateLogStream",
            "logs:CreateLogGroup",
            "logs:PutLogEvents"
        ],
        "Resource": [
            "arn:aws:logs:REGION:ACCOUNT-ID:log-group:{YOUR_LOG_GROUP_NAME}:*",
            "arn:aws:logs:REGION:ACCOUNT-ID:log-group:{YOUR_LOG_GROUP_NAME}"
        ]
    }
    ```
+ `sideCarResources`(선택 사항) - 시작된 Fluentbit sidecar 컨테이너에서 리소스 한도를 설정하기 위한 구성 키.
  + `memoryLimit`(선택 사항) - 기본값은 512Mi입니다. 필요에 따라 조정합니다.
  + `cpuLimit`(선택 사항) - 이 옵션에는 기본값이 없습니다. 필요에 따라 조정합니다.
+ `containerLogRotationConfiguration`(선택 사항) - 컨테이너 로그 로테이션 동작을 제어합니다. 기본적으로 활성화됩니다.
  + `rotationSize`(필수) - 로그 로테이션을 위한 파일 크기를 지정합니다. 가능한 값 범위는 2KB에서 2GB 사이입니다. rotationSize 파라미터의 숫자 단위 부분은 정수로 전달됩니다. 십진수는 지원되지 않으므로 로테이션 크기를 1.5GB(예: 1,500MB 값)로 지정할 수 있습니다. 기본값은 2GB입니다.
  + `maxFilesToKeep`(필수) - 로테이션을 수행한 후 컨테이너에서 보존할 최대 파일 수를 지정합니다. 최솟값은 1이고 최댓값은 50입니다. 기본값은 10입니다.

## Flink 운영자 로그
<a name="jobruns-flink-monitoring-configuration-operator-logs"></a>

또한 차트 Helm 설치에 관한 `values.yaml` 파일에서 다음 옵션을 사용하여 운영자를 위한 로그 아카이브를 활성화할 수 있습니다. S3, CloudWatch 또는 둘 다 활성화할 수 있습니다.

```
monitoringConfiguration: 
  s3MonitoringConfiguration:
    logUri: "S3-BUCKET"
    totalFileSize: "1G"
    uploadTimeout: "1m"
  cloudWatchMonitoringConfiguration:
    logGroupName: "flink-log-group"
    logStreamNamePrefix: "example-job-prefix-test-2"
  sideCarResources:
    limits:
      cpuLimit: 1
      memoryLimit: 800Mi
  memoryBufferLimit: 700M
```

`monitoringConfiguration` 아래에서 다음과 같은 구성 옵션을 사용할 수 있습니다.
+ `s3MonitoringConfiguration` - S3에 아카이브하려면 이 옵션을 설정합니다.
+ `logUri`(필수) - 로그를 저장할 S3 버킷 경로.
+ 다음은 로그가 업로드된 후의 S3 버킷 경로 형식입니다.
  + 로그 로테이션이 활성화되지 않았습니다.

    ```
    s3://${logUri}/${POD NAME}/OPERATOR or WEBHOOK/STDOUT or STDERR.gz
    ```
  + 로그 로테이션이 활성화되었습니다. 로테이션된 파일과 현재 파일(날짜 스탬프가 없는 파일)을 모두 사용할 수 있습니다.

    ```
    s3://${logUri}/${POD NAME}/OPERATOR or WEBHOOK/STDOUT or STDERR.gz
    ```

    다음 형식 인덱스는 증가하는 숫자입니다.

    ```
    s3://${logUri}/${POD NAME}/OPERATOR or WEBHOOK/stdout_YYYYMMDD_index.gz
    ```
+ `cloudWatchMonitoringConfiguration` - CloudWatch로의 전달을 설정하기 위한 구성 키.
  + `logGroupName`(필수) - 로그를 보낼 CloudWatch 로그 그룹의 이름. 그룹이 없으면 그룹이 자동으로 생성됩니다.
  + `logStreamNamePrefix`(선택 사항) - 로그를 보낼 로그 스트림의 이름. 기본값은 빈 문자열입니다. CloudWatch의 형식은 다음과 같습니다.

    ```
    ${logStreamNamePrefix}/${POD NAME}/STDOUT or STDERR
    ```
+ `sideCarResources`(선택 사항) - 시작된 Fluentbit sidecar 컨테이너에서 리소스 한도를 설정하기 위한 구성 키.
  + `memoryLimit`(선택 사항) - 메모리 한도. 필요에 따라 조정합니다. 기본값은 512Mi입니다.
  + `cpuLimit` - CPU 한도. 필요에 따라 조정합니다. 기본값은 없습니다.
+ `containerLogRotationConfiguration`(선택 사항) - 컨테이너 로그 로테이션 동작을 제어합니다. 기본적으로 활성화됩니다.
  + `rotationSize`(필수) - 로그 로테이션을 위한 파일 크기를 지정합니다. 가능한 값 범위는 2KB에서 2GB 사이입니다. rotationSize 파라미터의 숫자 단위 부분은 정수로 전달됩니다. 십진수는 지원되지 않으므로 로테이션 크기를 1.5GB(예: 1,500MB 값)로 지정할 수 있습니다. 기본값은 2GB입니다.
  + `maxFilesToKeep`(필수) - 로테이션을 수행한 후 컨테이너에서 보존할 최대 파일 수를 지정합니다. 최솟값은 1이고 최댓값은 50입니다. 기본값은 10입니다.

# Flink에서 고가용성 및 작업 복원력을 지원하는 방법
<a name="jobruns-flink-resiliency"></a>

다음 섹션에서는 Flink 작업의 신뢰성과 가용성을 높이는 방법을 간략하게 설명합니다. Flink 고가용성 및 장애가 발생할 경우 다양한 복구 기능과 같은 내장 기능을 통해 이를 수행합니다.

**Topics**
+ [Flink 운영자 및 Flink 애플리케이션을 위한 고가용성(HA) 사용](jobruns-flink-using-ha.md)
+ [Amazon EMR on EKS로 태스크 복구 및 작업 규모 조정을 위한 Flink 작업 재시작 시간 최적화](jobruns-flink-restart.md)
+ [Amazon EMR on EKS에서 Flink를 사용한 스팟 인스턴스의 정상적인 서비스 해제](jobruns-flink-decommission.md)

# Flink 운영자 및 Flink 애플리케이션을 위한 고가용성(HA) 사용
<a name="jobruns-flink-using-ha"></a>

이 주제에서는 고가용성을 구성하는 방법 및 몇 가지 사용 사례에 대한 작동 방식을 설명합니다. 여기에는 작업 관리자를 사용하는 경우와 Flink 네이티브 kubernetes를 사용하는 경우가 포함됩니다.

## Flink 운영자 고가용성
<a name="jobruns-flink-ha-operator"></a>

Flink 운영자의 *고가용성*을 활성화하여 장애 발생 시 대기 중인 Flink 운영자로 장애 조치하여 운영자 제어 루프에서 가동 중단을 최소화할 수 있습니다. 고가용성은 기본적으로 활성화되어 있으며 시작 운영자 복제본의 기본 수는 2입니다. 차트 Helm에 대해 `values.yaml` 파일의 복제본 필드를 구성할 수 있습니다.

다음 필드를 사용자 지정 가능합니다.
+ `replicas`(선택 사항, 기본값: 2): 이 숫자를 1보다 크게 설정하면 다른 대기 운영자가 생성되고 작업을 더 빠르게 복구할 수 있습니다.
+ `highAvailabilityEnabled`(선택 사항, 기본값: true): HA 활성화 여부를 제어합니다. 이 파라미터를 true로 지정하면 다중 AZ 배포를 지원하고 올바른 `flink-conf.yaml` 파라미터를 설정할 수 있습니다.

`values.yaml` 파일에서 다음 구성을 설정하여 운영자의 HA를 비활성화할 수 있습니다.

```
...
imagePullSecrets: []

replicas: 1

# set this to false if you don't want HA
highAvailabilityEnabled: false
...
```

**다중 AZ 배포**

여러 가용 영역에서 운영자 포드를 생성합니다. 이는 약한 제약 조건이며, 다른 AZ에 충분한 리소스가 없는 경우 운영자 포드가 동일한 AZ에서 예약됩니다.

**리더 복제본 결정**

 HA가 활성화된 경우 복제본은 리스 기능을 사용하여 어떤 JM이 리더인지 결정하고 리더 선택에 K8s Lease를 사용합니다. 리스 기능을 설명하고 .Spec.Holder Identity 필드를 확인하여 현재 리더를 결정할 수 있습니다.

```
kubectl describe lease <Helm Install Release Name>-<NAMESPACE>-lease -n <NAMESPACE> | grep "Holder Identity"
```

**Flink-S3 상호 작용**

**액세스 보안 인증 구성**

S3 버킷에 액세스할 수 있는 적절한 IAM 권한으로 IRSA를 구성했는지 확인하세요.

**S3 애플리케이션 모드에서 작업 jar 가져오기**

Flink 운영자는 S3에서 애플리케이션 jar 가져오기도 지원합니다. FlinkDeployment 사양에서 JarURI의 S3 위치를 제공하면 됩니다.

이 기능을 사용하여 PyFlink 스크립트와 같은 다른 아티팩트를 다운로드할 수도 있습니다. 결과 Python 스크립트는 `/opt/flink/usrlib/` 경로 아래에 배치됩니다.

다음 예제에서는 PyFlink 작업에 대해 이 기능을 사용하는 방법을 보여줍니다. jarURI 및 args 필드를 기록합니다.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: python-example
spec:
  image: <YOUR CUSTOM PYFLINK IMAGE>
  emrReleaseLabel: "emr-6.12.0-flink-latest"
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    highAvailabilityEnabled: false
    replicas: 1
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: "s3://<S3-BUCKET>/scripts/pyflink.py" # Note, this will trigger the artifact download process
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pyflink.py"]
    parallelism: 1
    upgradeMode: stateless
```

**Flink S3 커넥터**

Flink는 두 개의 S3 커넥터(아래 목록 참조)와 함께 제공됩니다. 다음 섹션에서는 어떤 커넥터를 언제 사용해야 하는지를 설명합니다.

**검사: Presto S3 커넥터**
+ S3 스키마를 s3p://로 설정
+ s3에 대한 검사에 사용할 권장 커넥터. 자세한 내용은 Apache Flink 설명서의 [S3-specific](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#s3-specific)을 참조하세요.

FlinkDeployment 사양 예제:

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.checkpoints.dir: s3p://<BUCKET-NAME>/flink-checkpoint/
```

**S3: Hadoop S3 커넥터에 대한 읽기 및 쓰기**
+ S3 스키마를 `s3://` 또는 `s3a://`로 설정
+ S3에서 파일을 읽고 쓰는 데 권장되는 커넥터([Flinks 파일 시스템 인터페이스](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/)를 구현하는 S3 커넥터만 해당).
+ 기본적으로 `flink-conf.yaml` 파일에서 `fs.s3a.aws.credentials.provider`를 설정합니다(`com.amazonaws.auth.WebIdentityTokenCredentialsProvider`). 기본값(`flink-conf`)을 완전히 재정의하고 S3와 상호 작용하는 경우 이 제공업체를 사용해야 합니다.

FlinkDeployment 사양 예제

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  job:
    jarURI: local:///opt/flink/examples/streaming/WordCount.jar
    args: [ "--input", "s3a://<INPUT BUCKET>/PATH", "--output", "s3a://<OUTPUT BUCKET>/PATH" ]
    parallelism: 2
    upgradeMode: stateless
```

## Flink 작업 관리자
<a name="jobruns-flink-ha-manager"></a>

Flink 배포용 고가용성(HA)을 통해 일시적인 오류가 발생하여 JobManager가 충돌하더라도 작업을 계속 진행할 수 있습니다. 작업은 HA가 활성화된 상태에서 마지막으로 성공한 체크포인트부터 다시 시작됩니다. HA를 활성화하지 않으면 Kubernetes는 JobManager를 다시 시작하지만 작업은 새 작업으로 시작되고 진행 상황은 사라집니다. HA를 구성한 후에 JobManager에서 일시적인 오류가 발생할 경우 참조할 수 있도록 HA 메타데이터를 영구 스토리지에 저장한 다음, 마지막으로 성공한 체크포인트에서 작업을 재개하도록 Kubernetes에 지시할 수 있습니다.

Flink 작업에 대해 HA는 기본적으로 활성화되어 있습니다(복제본 수는 2로 설정되어 있으며, 이 경우 HA 메타데이터가 지속되려면 S3 스토리지 위치를 제공해야 함).

**HA 구성**

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  executionRoleArn: "<JOB EXECUTION ROLE ARN>"
  emrReleaseLabel: "emr-6.13.0-flink-latest"
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    replicas: 2
    highAvailabilityEnabled: true
    storageDir: "s3://<S3 PERSISTENT STORAGE DIR>"
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
```

다음은 작업 관리자(.spec.jobManager에서 정의됨)에서 위 HA 구성에 대한 설명입니다.
+ `highAvailabilityEnabled`(선택 사항, 기본값: true): HA를 활성화하지 않고 제공된 HA 구성을 사용하지 않으려면 이 옵션을 `false `로 설정합니다. 여전히 '복제본' 필드를 조작하여 HA를 수동으로 구성할 수 있습니다.
+ `replicas`(선택 사항, 기본값: 2): 이 숫자를 1보다 크게 설정하면 다른 대기 JobManagers가 생성되고 작업을 더 빠르게 복구할 수 있습니다. HA를 비활성화하는 경우 복제본 수를 1로 설정해야 합니다. 그렇지 않으면 검증 오류가 계속 발생합니다(HA가 활성화되지 않은 경우 복제본 1개만 지원됨).
+ `storageDir`(필수): 기본적으로 복제본 수를 2로 사용하기 때문에 영구 storageDir을 제공해야 합니다. 현재 이 필드에는 스토리지 위치로 S3 경로만 허용합니다.

**포드 지역성**

 또한 HA를 활성화하는 경우 동일한 AZ에 파드를 배치하려고 시도하므로 성능이 개선됩니다(동일한 AZ에 포드를 배치하여 네트워크 지연 시간 감소). 이는 최대한의 원칙이 적용되는 프로세스입니다. 즉, 대부분의 포드가 예약된 AZ에 충분한 리소스가 없는 경우 나머지 포드는 여전히 예약되지만 결국 이 AZ 외부의 노드에 배치될 수 있습니다.

**리더 복제본 결정**

HA가 활성화된 경우 복제본은 리스 기능을 통해 어떤 JM이 리더인지 결정하고 이 메타데이터를 저장할 데이터 스토어로 K8s Configmap을 사용합니다. 리더를 결정하려면 Configmap의 콘텐츠를 살펴보고 데이터 아래에 있는 `org.apache.flink.k8s.leader.restserver` 키를 확인하여 IP 주소가 있는 K8s 포드를 찾을 수 있습니다. 다음과 같은 bash 명령을 사용할 수 있습니다.

```
ip=$(kubectl get configmap -n <NAMESPACE> <JOB-NAME>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')
kubectl get pods -n NAMESPACE  -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

## Flink 작업 - 네이티브 Kubernetes
<a name="jobruns-flink-ha-kubernetes"></a>

Amazon EMR 6.13.0 이상은 Amazon EKS 클러스터에서 고가용성 모드의 Flink 애플리케이션을 실행하기 위한 Flink 네이티브 Kubernetes를 지원합니다.

**참고**  
Flink 작업을 제출할 때 고가용성 메타데이터를 저장할 Amazon S3 버킷을 생성해야 합니다. 이 기능을 사용하고 싶지 않은 경우 비활성화할 수 있습니다. 기본적으로 활성화됩니다.

Flink 고가용성 특성을 활성화하려면 [`run-application` CLI 명령](jobruns-flink-native-kubernetes-getting-started.md#jobruns-flink-native-kubernetes-getting-started-run-application)을 실행할 때 다음 Flink 파라미터를 입력하세요. 파라미터는 예제 아래에 정의되어 있습니다.

```
-Dhigh-availability.type=kubernetes \
-Dhigh-availability.storageDir=S3://DOC-EXAMPLE-STORAGE-BUCKET \
-Dfs.s3a.aws.credentials.provider="com.amazonaws.auth.WebIdentityTokenCredentialsProvider" \
-Dkubernetes.jobmanager.replicas=3 \
-Dkubernetes.cluster-id=example-cluster
```
+ **`Dhigh-availability.storageDir`** – 작업을 위한 고가용성 메타데이터를 저장할 Amazon S3 버킷입니다.

  **`Dkubernetes.jobmanager.replicas`** – `1`보다 큰 정수로 생성할 작업 관리자 포드의 수입니다.

  **`Dkubernetes.cluster-id`** – Flink 클러스터를 식별하는 고유한 ID입니다.

# Amazon EMR on EKS로 태스크 복구 및 작업 규모 조정을 위한 Flink 작업 재시작 시간 최적화
<a name="jobruns-flink-restart"></a>

태스크가 실패하거나 규모 조정 작업이 발생할 경우 Flink는 마지막으로 완료된 체크포인트의 태스크를 다시 재실행하려고 시도합니다. 체크포인트 상태의 크기와 병렬 태스크의 수에 따라 재시작 프로세스를 실행하는 데 1분 이상 소요될 수 있습니다. 프로세스를 다시 시작하는 동안에는 작업에 대한 백로그 태스크가 누적될 수 있습니다. 그렇지만 Flink는 실행 그래프의 복구 및 재시작 속도를 최적화하여 작업 안정성을 향상시킬 수 있는 방법이 몇 가지 있습니다.

이 페이지에서는 Amazon EMR Flink를 사용하여 스팟 인스턴스에서의 태스크 복구 및 조정 작업 중에 작업 재시작 시간을 개선할 수 있는 몇 가지 방법을 설명합니다. 스팟 인스턴스는 할인된 가격으로 사용할 수 있는 미사용 컴퓨팅 용량입니다. 가끔 중단되는 것을 포함하여 고유한 동작이 있으므로, Amazon EMR on EKS가 서비스 해제를 수행하고 작업을 재시작하는 방법을 비롯해 Amazon EMR on EKS에서 이러한 동작을 처리하는 방법을 이해하는 것이 중요합니다.

**Topics**
+ [태스크-로컬 복구](#flink-restart-task-local)
+ [Amazon EBS 볼륨 마운트를 통한 태스크-로컬 복구](#flink-restart-task-local-ebs)
+ [일반 로그 기반 증분 체크포인트](#flink-restart-log-check)
+ [세분화된 복구](#flink-restart-fine-grained)
+ [적응형 스케줄러의 결합된 재시작 메커니즘](#flink-restart-combined)

## 태스크-로컬 복구
<a name="flink-restart-task-local"></a>

**참고**  
태스크-로컬 복구는 EKS 6.14.0 이상의 Amazon EMR on EKS에서 Flink를 통해 지원됩니다.

Flink 체크포인트를 사용할 경우 각 태스크에서 Flink가 Amazon S3와 같은 분산 스토리지에 기록하는 상태 스냅샷을 만듭니다. 복구의 경우 태스크는 분산 스토리지를 통해 해당 상태를 복원합니다. 분산 스토리지에서는 내결함성을 제공하며 모든 노드에서 액세스가 가능하기 때문에 크기 재조정이 이뤄지는 동안 상태를 재분배할 수 있습니다.

하지만 원격 분산 저장소에는 모든 태스크에서 네트워크를 통해 원격 위치에서 해당 상태를 읽어야 한다는 단점도 있습니다. 이러한 한계로 인해 태스크 복구 또는 규모 조정 작업 중에 대규모 상태의 복구 시간이 길어질 수 있습니다.

이와 같은 긴 복구 시간 문제는 **태스크-로컬 복구를 통해 해결됩니다. 태스크에서는 체크포인트의 상태를 로컬 디스크와 같이 해당 작업에 대해 로컬인 보조 스토리지에 기록합니다. 또한 태스크는 기본 스토리지(이 경우 Amazon S3)에 상태를 저장합니다. 복구가 진행되는 동안 스케줄러는 태스크가 이전에 실행된 동일한 태스크 관리자에서 태스크를 예약하기 때문에 원격 상태 저장소에서 데이터를 읽는 대신 로컬 상태 저장소에서 복구할 수 있습니다. 자세한 내용을 알아보려면 *Apache Flink 설명서*의 [태스크 로컬 복구](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/large_state_tuning/#task-local-recovery)를 참조하세요.

샘플 작업을 사용한 벤치마크 테스트 결과에 따르면 태스크-로컬 복구가 활성화된 상태에서는 복구 시간이 몇 분에서 몇 초로 단축된 것으로 확인되었습니다.

태스크-로컬 복구를 활성화하려면 `flink-conf.yaml` 파일에 다음과 같은 구성을 설정하세요. 체크포인트 간격의 값을 밀리초 단위로 지정하세요.

```
    state.backend.local-recovery: true
    state.backend: hasmap or rocksdb
    state.checkpoints.dir: s3://STORAGE-BUCKET-PATH/checkpoint
    execution.checkpointing.interval: 15000
```

## Amazon EBS 볼륨 마운트를 통한 태스크-로컬 복구
<a name="flink-restart-task-local-ebs"></a>

**참고**  
Amazon EBS를 통한 태스크-로컬 복구는 Amazon EMR on EKS 6.15.0 이상의 Flink를 통해 지원됩니다.

Amazon EMR on EKS에서 Flink를 사용할 경우 태스크 로컬 복구를 위해 Amazon EBS 볼륨을 TaskManager 포드에 자동 프로비저닝할 수 있습니다. 기본 오버레이 마운트에는 10GB 볼륨이 함께 제공되어 상태가 낮은 작업에 충분합니다. 상태가 큰 작업에서는 *자동 EBS 볼륨 마운트* 옵션을 활성화할 수 있습니다. TaskManager 포드는 포드 생성 과정에서 자동으로 생성되어 마운트되며 포드 삭제 중에는 제거됩니다.

다음 단계를 따라 Amazon EMR on EKS에서 Flink용 자동 EBS 볼륨 마운트를 활성화하세요.

1. 이후 단계에서 사용할 다음 변수의 값을 내보내세요.

   ```
   export AWS_REGION=aa-example-1 
   export FLINK_EKS_CLUSTER_NAME=my-cluster
   export AWS_ACCOUNT_ID=111122223333
   ```

1. 클러스터에 대해 `kubeconfig` YAML 파일을 생성 또는 업데이트합니다.

   ```
   aws eks update-kubeconfig --name $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
   ```

1. Amazon EKS 클러스터에서 Amazon EBS CSI(컨테이너 스토리지 인터페이스) 드라이버에 대한 IAM 서비스 계정을 생성합니다.

   ```
   eksctl create iamserviceaccount \
      --name ebs-csi-controller-sa \
      --namespace kube-system \
      --region $AWS_REGION \
      --cluster $FLINK_EKS_CLUSTER_NAME\
      --role-name TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} \
      --role-only \
      --attach-policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy \
      --approve
   ```

1. 다음 명령에 따라 Amazon EBS CSI 드라이버를 생성합니다.

   ```
   eksctl create addon \
      --name aws-ebs-csi-driver \
      --region $AWS_REGION \
      --cluster $FLINK_EKS_CLUSTER_NAME \
      --service-account-role-arn arn:aws:iam::${AWS_ACCOUNT_ID}:role/TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
   ```

1. 다음 명령에 따라 Amazon EBS 스토리지 클래스를 생성합니다.

   ```
   cat ≪ EOF ≫ storage-class.yaml
   apiVersion: storage.k8s.io/v1
   kind: StorageClass
   metadata:
     name: ebs-sc
   provisioner: ebs.csi.aws.com
   volumeBindingMode: WaitForFirstConsumer
   EOF
   ```

   그런 후 다음과 같이 클래스를 적용합니다.

   ```
   kubectl apply -f storage-class.yaml
   ```

1. Helm에서는 서비스 계정을 생성할 수 있는 옵션과 함께 Amazon EMR Flink Kubernetes 연산자를 설치합니다. 그러면 Flink 배포에 사용할 수 있는 `emr-containers-sa-flink`가 생성됩니다.

   ```
   helm install flink-kubernetes-operator flink-kubernetes-operator/ \
      --set jobServiceAccount.create=true \
      --set rbac.jobRole.create=true \
      --set rbac.jobRoleBinding.create=true
   ```

1. Flink 작업을 제출하고 태스크-로컬 복구를 위한 EBS 볼륨의 자동 프로비저닝을 활성화하려면 `flink-conf.yaml` 파일에 다음과 같은 구성을 설정하세요. 작업의 상태 크기에 맞게 크기 제한을 조정하세요. `serviceAccount`를 `emr-containers-sa-flink`으로 설정합니다. 체크포인트 간격의 값을 밀리초 단위로 지정하세요. `executionRoleArn`은 생략하세요.

   ```
   flinkConfiguration:
       task.local-recovery.ebs.enable: true
       kubernetes.taskmanager.local-recovery.persistentVolumeClaim.sizeLimit: 10Gi
       state.checkpoints.dir: s3://BUCKET-PATH/checkpoint
       state.backend.local-recovery: true
       state.backend: hasmap or rocksdb
       state.backend.incremental: "true"
       execution.checkpointing.interval: 15000
     serviceAccount: emr-containers-sa-flink
   ```

Amazon EBS CSI 드라이버 플러그인을 삭제할 준비가 끝나면 다음과 같은 명령을 사용하세요.

```
  # Detach Attached Policy
  aws iam detach-role-policy --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} --policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy
  # Delete the created Role
  aws iam delete-role --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
  # Delete the created service account
  eksctl delete iamserviceaccount --name ebs-csi-controller-sa --namespace kube-system --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  # Delete Addon
  eksctl delete addon --name aws-ebs-csi-driver --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  # Delete the EBS storage class
  kubectl delete -f storage-class.yaml
```

## 일반 로그 기반 증분 체크포인트
<a name="flink-restart-log-check"></a>

**참고**  
Amazon EMR on EKS 6.14.0 이상에서 Flink를 사용하면 일반 로그 기반 증분 체크포인트가 지원됩니다.

체크포인트의 속도를 높이기 위해 일반 로그 기반 증분 체크포인트가 Flink 1.16에 추가되었습니다. 체크포인트 간격을 빠르게 하면 복구 후 다시 처리해야 하는 이벤트가 줄어들기 때문에 복구 작업이 적어지는 경우가 많습니다. 자세한 내용은 *Apache Flink 블로그*에서 [일반 로그 기반 증분 체크포인트로 체크포인트의 속도 및 안정성 강화](https://flink.apache.org/2022/05/30/improving-speed-and-stability-of-checkpointing-with-generic-log-based-incremental-checkpoints/)를 참조하세요.

샘플 작업을 이용하여 수행한 벤치마크 테스트에서 일반 로그 기반 증분 체크포인트를 사용하면 체크포인트 시간이 몇 분에서 몇 초로 단축된 것이 확인되었습니다.

일반 로그 기반 증분 체크포인트를 활성화하려면 `flink-conf.yaml` 파일에 다음 구성을 설정하세요. 체크포인트 간격의 값을 밀리초 단위로 지정하세요.

```
    state.backend.changelog.enabled: true 
    state.backend.changelog.storage: filesystem
    dstl.dfs.base-path: s3://bucket-path/changelog
    state.backend.local-recovery: true
    state.backend: rocksdb
    state.checkpoints.dir: s3://bucket-path/checkpoint
    execution.checkpointing.interval: 15000
```

## 세분화된 복구
<a name="flink-restart-fine-grained"></a>

**참고**  
Amazon EMR on EKS 6.14.0 이상에서 Flink를 사용할 경우 기본 스케줄러에 대해 세분화된 복구 지원이 제공됩니다. 적응형 스케줄러에서의 세분화된 복구 지원은 Amazon EMR on EKS 6.15.0 이상에서 Flink를 통해 이용할 수 있습니다.

실행 중에 태스크가 실패하면 Flink에서는 전체 실행 그래프를 재설정하고 마지막으로 완료된 체크포인트에서 전체 재실행을 트리거합니다. 이 방식은 실패한 태스크를 재실행하는 것보다 비용이 많이 듭니다. 세분화된 복구에서는 실패한 태스크의 파이프라인으로 연결된 구성 요소만 재시작합니다. 다음 예제의 작업 그래프에는 버텍스가 5개(`A`\$1`E`) 있습니다. 버텍스 사이에 있는 모든 연결은 포인트별 분포로 파이프라인되며 작업의 `parallelism.default`는 `2`로 설정됩니다.

```
A → B → C → D → E
```

이 예시에서는 태스크가 총 10개 실행 중입니다. 첫 번째 파이프라인(`a1`\$1`e1`)은 TaskManager(`TM1`)에서 실행되고 두 번째 파이프라인(`a2`\$1`e2`)은 또 다른 TaskManager(`TM2`)에서 실행됩니다.

```
a1 → b1 → c1 → d1 → e1
a2 → b2 → c2 → d2 → e2
```

`a1 → e1` 및 `a2 → e2`라는 두 개의 구성 요소가 파이프라인으로 연결되어 있습니다. `TM1` 또는 `TM2` 중에 하나가 실패하면 실패는 TaskManager가 중이던 파이프라인에 있는 태스크 5개에만 영향을 미칩니다. 재시작 전략에 따라 영향을 받는 파이프라인 구성 요소만 시작됩니다.

세분화된 복구는 완벽히 병렬화된 Flink 작업에서만 작동합니다. `keyBy()` 또는 `redistribute()` 작업에서는 지원되지 않습니다. 자세한 내용은 *Flink 개선 제안* Jira 프로젝트의 [FLIP-1: 태스크 실패에서 세분화된 복구](https://cwiki.apache.org/confluence/display/FLINK/FLIP-1%3A+Fine+Grained+Recovery+from+Task+Failures)를 참조하세요.

세분화된 복구를 활성화하려면 `flink-conf.yaml` 파일에 다음과 같은 구성을 설정하세요.

```
jobmanager.execution.failover-strategy: region 
restart-strategy: exponential-delay or fixed-delay
```

## 적응형 스케줄러의 결합된 재시작 메커니즘
<a name="flink-restart-combined"></a>

**참고**  
적응형 스케줄러의 결합된 재시작 메커니즘은 Amazon EMR on EKS 6.15.0 이상의 Flink에서 지원됩니다.

적응형 스케줄러에서는 가용 슬롯을 기반으로 작업 병렬성을 조정할 수 있습니다. 이 스케줄러는 구성된 작업 병렬 처리에 적합한 가용 슬롯이 충분하지 않은 경우 병렬 처리의 수를 자동으로 줄입니다. 새 슬롯이 가용 상태가 되면 작업은 구성된 작업 병렬 처리로 다시 확장됩니다. 적응형 스케줄러는 가용 리소스가 충분하지 않은 경우 작업에서 가동 중지가 발생하는 것은 방지합니다. Flink Autoscaler에 대해 지원되는 스케줄러입니다. 이러한 이유들로 인해 적응형 스케줄러를 Amazon EMR Flink와 함께 사용하는 것이 좋습니다. 단, 적응형 스케줄러는 짧은 시간 내에 여러 번 재시작을 수행할 수 있으며, 새 리소스가 추가될 때마다 한 번씩 다시 시작됩니다. 이로 인해 작업 성능이 떨어질 수 있습니다.

Amazon EMR 6.15.0 이상에서는 Flink에 첫 번째 리소스가 추가될 때 재시작 기간을 연 다음, 구성된 기본 1분 간격까지 기다리는 적응형 스케줄러의 결합된 재시작 메커니즘이 있습니다. 이 메커니즘에서는 구성된 병렬 처리로 작업을 실행하기 위한 가용 리소스가 충분하거나 간격 제한 시간이 초과될 경우 단일 재시작을 수행합니다.

샘플 작업을 이용한 벤치마크 테스트에서는 적응형 스케줄러와 Flink Autoscaler를 사용할 경우 이 기능이 기본 동작보다 10% 더 많은 레코드를 처리하는 것이 입증되었습니다.

결합된 재시작 메커니즘을 활성화하려면 `flink-conf.yaml` 파일에 다음 구성을 설정하세요.

```
jobmanager.adaptive-scheduler.combined-restart.enabled: true 
jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m
```

# Amazon EMR on EKS에서 Flink를 사용한 스팟 인스턴스의 정상적인 서비스 해제
<a name="jobruns-flink-decommission"></a>

Amazon EMR on EKS가 포함된 Flink를 사용하면 작업 복구 및 작업 규모 조정 과정에서 작업 재시작 시간을 개선할 수 있습니다.

## 개요
<a name="jobruns-flink-decommission-overview"></a>

Amazon EMR on EKS 릴리스 6.15.0 이상에서는 Apache Flink를 사용하는 Amazon EMR on EKS의 스팟 인스턴스에 대한 태스크 관리자의 정상적인 서비스 해제를 지원합니다. 이 기능의 일부로 Flink가 포함된 Amazon EMR on EKS에서는 다음과 같은 기능을 제공합니다.
+ **적시 체크포인트** – Flink 스트리밍 작업은 스팟 인스턴스 중단에 대응하고, 실행 중인 작업에 대한 JIT(적시) 체크포인트를 수행하고, 이러한 스팟 인스턴스에 추가 태스크를 예약하는 것을 방지할 수 있습니다. JIT 체크포인트는 기본 및 적응형 스케줄러에서 지원됩니다.
+ **결합된 재시작 메커니즘** – 결합된 재시작 메커니즘은 작업이 대상 리소스 병렬 처리에 도달하거나 현재 구성된 기간의 종료 시점에 도달한 후 작업을 재시작하기 위해 최대한 노력합니다. 또한 복수의 스팟 인스턴스 종료로 인해 발생할 수 있는 연속된 작업 재시작을 방지할 수도 있습니다. 결합된 재시작 메커니즘은 적응형 스케줄러에서만 이용 가능합니다.

이러한 기능에는 다음과 같은 이점이 있습니다.
+ 스팟 인스턴스를 활용하여 태스크 관리자를 실행하고 클러스터 지출을 줄일 수 있습니다.
+ 스팟 인스턴스 태스크 관리자의 활성 상태가 개선되면 복원력이 높아지고 작업 예약의 효율성도 증가합니다.
+ 스팟 인스턴스 종료 후 재시작 횟수가 감소하기 때문에 Flink 작업의 가동 시간이 증가합니다.

## 정상적인 서비스 해제 작동 방식
<a name="jobruns-flink-decommission-howitworks"></a>

Apache Flink가 실행되는 Amazon EMR on EKS 클러스터에서 Amazon EMR을 프로비저닝하고 작업 관리자에는 온디맨드 노드를, 태스크 관리자에는 스팟 인스턴스 노드를 각각 지정하는 사례를 생각해 보세요. 태스크 관리자는 종료 2분 전에 중단 알림을 받습니다.

이 시나리오에서 작업 관리자는 스팟 인스턴스 중단 신호를 처리하고, 스팟 인스턴스에서 추가 작업 예약을 차단하고, 스트리밍 작업에 대한 JIT 체크포인트를 시작합니다.

그러면 작업 관리자는 현재 재시작 간격 기간에서 현재 작업 병렬 처리를 이행할 수 있을 만큼 가용 신규 리소스가 충분히 존재하는 경우에만 작업 그래프를 다시 시작합니다. 재시작 기간 간격은 스팟 인스턴스 교체 기간, 새 태스크 관리자 포드 생성 및 작업 관리자 등록을 근거로 정해집니다.

## 사전 조건
<a name="jobruns-flink-decommission-prereqs"></a>

정상적인 서비스 해제를 사용하려면 Apache Flink가 실행되는 Amazon EMR on EKS 클러스터에서 스트리밍 작업을 생성하고 실행합니다. 다음 예시와 같이 하나 이상의 스팟 인스턴스에 대해 Adaptive Scheduler 및 태스크 관리자가 예약되도록 활성화하세요. 작업 관리자에는 온디맨드 노드를 사용해야 하며, 스팟 인스턴스도 하나 이상 존재하는 경우에는 태스크 관리자에도 온디맨드 노드를 사용할 수 있습니다.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: deployment_name
spec:
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    cluster.taskmanager.graceful-decommission.enabled: "true"
    execution.checkpointing.interval: "240s"
    jobmanager.adaptive-scheduler.combined-restart.enabled: "true"
    jobmanager.adaptive-scheduler.combined-restart.window-interval : "1m"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    nodeSelector:
      'eks.amazonaws.com/capacityType': 'ON_DEMAND'
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
    nodeSelector:
      'eks.amazonaws.com/capacityType': 'SPOT'
  job:
    jarURI: flink_job_jar_path
```

## 구성
<a name="jobruns-flink-decommission-config"></a>

이 섹션에서는 서비스 해제 요구 사항을 위해 지정할 수 있는 구성들을 대부분 설명합니다.


| Key(키) | 설명 | 기본값  | 허용되는 값 | 
| --- | --- | --- | --- | 
|  cluster.taskmanager.graceful-decommission.enabled  |  태스크 관리자의 정상적인 서비스 해제를 활성화합니다.  |  true  |  true, false  | 
|  jobmanager.adaptive-scheduler.combined-restart.enabled  |  적응형 스케줄러에서 결합된 재시작 메커니즘을 활성화합니다.  |  false  |  true, false  | 
|  jobmanager.adaptive-scheduler.combined-restart.window-interval  |  작업에 대해 병합된 재시작을 수행하는 결합된 재시작 기간 간격입니다. 단위가 없는 정수는 밀리초로 해석합니다.  |  1m  |  예: 30, 60s, 3m, 1h  | 

# Flink 애플리케이션에 대한 Autoscaler 사용
<a name="jobruns-flink-autoscaler"></a>

운영자의 Autoscaler를 사용하면 Flink 작업에서 지표를 수집하고 작업 버텍스 수준에서 병렬 처리를 자동으로 조정하여 역압을 완화할 수 있습니다. 다음은 구성에 대한 예제입니다.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  ...
spec:
  ...
  flinkVersion: v1_18
  flinkConfiguration:
    job.autoscaler.enabled: "true"
    job.autoscaler.stabilization.interval: 1m
    job.autoscaler.metrics.window: 5m
    job.autoscaler.target.utilization: "0.6"
    job.autoscaler.target.utilization.boundary: "0.2"
    job.autoscaler.restart.time: 2m
    job.autoscaler.catch-up.duration: 5m
    pipeline.max-parallelism: "720"
  ...
```

이 구성은 Amazon EMR의 최신 릴리스에 대한 기본값을 사용합니다. 다른 버전을 사용하는 경우 값이 다를 수 있습니다.

**참고**  
Amazon EMR 7.2.0부터는 구성에 `kubernetes.operator` 접두사를 포함하지 않아도 됩니다. 7.1.0 이하를 사용하는 경우 각 구성 전에 접두사를 사용해야 합니다. 예를 들어, `kubernetes.operator.job.autoscaler.scaling.enabled`를 지정해야 합니다.

다음은 Autoscaler의 구성 옵션입니다.
+ `job.autoscaler.scaling.enabled` – 오토스케일러에서 버텍스 조정 실행을 활성화할지 여부를 지정합니다. 기본값은 `true`입니다. 이 구성을 비활성화하면 오토스케일러는 지표만 수집하고 각 버텍스에 대해 제안된 병렬 처리를 평가하지만 작업을 업그레이드하지는 않습니다.
+ `job.autoscaler.stabilization.interval` - 새로운 조정이 실행되지 않는 안정화 기간. 기본값은 5분입니다.
+ `job.autoscaler.metrics.window` - 조정 지표 집계 기간. 기간이 길수록 더 원활하고 안정적이지만 갑작스러운 로드 변경에 대응하는 경우 Autoscaler 속도가 느려질 수 있습니다. 기본값은 15분입니다. 3\$160분의 값을 사용하여 실험해 보는 것이 좋습니다.
+ `job.autoscaler.target.utilization` - 안정적인 작업 성능을 제공하고 로드 변동을 위한 약간의 버퍼를 제공하기 위한 목표 버텍스 사용률. 기본값은 `0.7`로, 작업 버텍스의 사용률/로드를 70%로 설정합니다.
+ `job.autoscaler.target.utilization.boundary` - 로드 변동에서 즉각적인 조정을 피하기 위해 추가 버퍼 역할을 하는 목표 버텍스 사용률 경계. 기본값은 `0.3`이며, 조정 작업을 트리거하기 전에 목표 사용률과 30%의 편차가 허용됨을 의미합니다.
+ `ob.autoscaler.restart.time` - 애플리케이션을 다시 시작하는 데 걸리는 예상 시간. 기본값은 5분입니다.
+ `job.autoscaler.catch-up.duration` - 예상 캐치업 시간으로, 조정 작업이 완료된 후 모든 백로그를 완전히 처리합니다. 기본값은 5분입니다. 캐치업 기간을 줄임으로써 Autoscaler는 조정 작업을 위한 추가 용량을 예약해야 합니다.
+ `pipeline.max-parallelism` - Autoscaler에서 사용할 수 있는 최대 병렬 처리. 이 한도가 Flink 구성 또는 각 운영자에 구성된 최대 병렬 처리보다 더 높은 경우 Autoscaler는 이 한도를 무시합니다. 기본값은 -1입니다. Autoscaler는 최대 병렬 처리의 나눗수로 병렬 처리를 계산하므로, Flink에서 제공하는 기본값을 사용하는 대신, 나눗수가 많은 최대 병렬 처리를 선택하는 것이 좋습니다. 이 구성에서는 120, 180, 240, 360, 720 등과 같이 60의 배수를 사용하는 것이 좋습니다.

구성 참조 페이지에 대한 자세한 내용은 [Autoscaler configuration](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/#autoscaler-configuration)을 참조하세요.

# 오토스케일러 파라미터 자동 조정
<a name="jobruns-flink-autoscaler-parameter-autotuning"></a>

이 섹션에서는 다양한 Amazon EMR 버전에 대한 자동 조정 동작을 설명합니다. 또한 다양한 오토 스케일링 구성도 자세히 설명합니다.

**참고**  
Amazon EMR 7.2.0 이상은 오픈 소스 구성 `job.autoscaler.restart.time-tracking.enabled`를 사용하여 **재조정 시간 예측**을 활성화합니다. 재조정 시간 예측은 Amazon EMR 자동 조정과 동일한 기능을 제공하므로, 재시작 시간에 경험적 값을 수동으로 할당하지 않아도 됩니다.  
Amazon EMR 7.1.0 이하를 사용하는 경우에도 Amazon EMR 자동 조정을 계속 사용할 수 있습니다.

------
#### [ 7.2.0 and higher ]

Amazon EMR 7.2.0 이상은 오토 스케일링 결정을 적용하는 데 필요한 실제 재시작 시간을 측정합니다. 릴리스 7.1.0 이하에서는 `job.autoscaler.restart.time` 구성을 사용하여 예상 최대 재시작 시간을 수동으로 구성해야 했습니다. `job.autoscaler.restart.time-tracking.enabled` 구성을 사용하면 첫 번째 조정에 대한 재시작 시간만 입력하면 됩니다. 이후 작업자는 실제 재시작 시간을 기록하고 후속 조정에 사용합니다.

이 추적을 활성화하려면 다음 명령을 사용합니다.

```
job.autoscaler.restart.time-tracking.enabled: true
```

다음은 재조정 시간 예측을 위한 관련 구성입니다.


| 구성 | 필수 | 기본값 | 설명 | 
| --- | --- | --- | --- | 
| job.autoscaler.restart.time-tracking.enabled | 아니요 | False | Flink 오토스케일러가 시간 경과에 따라 구성을 자동으로 조정하여 조정 결정을 최적화해야 하는지 여부를 나타냅니다. 오토스케일러는 오토스케일러 파라미터 restart.time만 자동 조정할 수 있습니다. | 
| job.autoscaler.restart.time | 아니요 | 5분 | 연산자가 이전 조정에서 실제 재시작 시간을 확인할 수 있을 때까지 Amazon EMR on EKS에서 사용하는 예상 재시작 시간. | 
| job.autoscaler.restart.time-tracking.limit | 아니요 | 15분 | job.autoscaler.restart.time-tracking.enabled가 true로 설정된 경우 관찰된 최대 재시작 시간. | 

다음은 재조정 시간 예측을 시도하는 데 사용할 수 있는 배포 사양의 예입니다.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: autoscaling-example
spec:
  flinkVersion: v1_18
  flinkConfiguration:

    # Autoscaler parameters
    job.autoscaler.enabled: "true"
    job.autoscaler.scaling.enabled: "true"
    job.autoscaler.stabilization.interval: "5s"
    job.autoscaler.metrics.window: "1m"
    
    job.autoscaler.restart.time-tracking.enabled: "true"
    job.autoscaler.restart.time: "2m"
    job.autoscaler.restart.time-tracking.limit: "10m"
    
    jobmanager.scheduler: adaptive
    taskmanager.numberOfTaskSlots: "1"
    pipeline.max-parallelism: "12"

  executionRoleArn: <JOB ARN>
  emrReleaseLabel: emr-7.12.0-flink-latest
  jobManager:
    highAvailabilityEnabled: false
    storageDir: s3://<s3_bucket>/flink/autoscaling/ha/
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: s3://<s3_bucket>/some-job-with-back-pressure
    parallelism: 1
    upgradeMode: stateless
```

역압을 시뮬레이션하려면 다음 배포 사양을 사용합니다.

```
job:
    jarURI: s3://<s3_bucket>/pyflink-script.py
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-py", "/opt/flink/usrlib/pyflink-script.py"]
    parallelism: 1
    upgradeMode: stateless
```

다음 Python 스크립트를 S3 버킷에 업로드합니다.

```
import logging
import sys
import time
import random

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

TABLE_NAME="orders"
QUERY=f"""
CREATE TABLE {TABLE_NAME} (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);
"""

def create_backpressure(i):
    time.sleep(2)
    return i

def autoscaling_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    t_env.execute_sql(QUERY)
    res_table = t_env.from_path(TABLE_NAME)

    stream =  t_env.to_data_stream(res_table) \
      .shuffle().map(lambda x: create_backpressure(x))\
      .print()
    env.execute("Autoscaling demo")

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    autoscaling_demo()
```

재조정 시간 예측이 작동하는지 확인하려면 Flink 연산자의 `DEBUG` 수준 로깅이 활성화되어 있는지 확인합니다. 아래 예제에서는 헬름 차트 파일(`values.yaml`)을 업데이트하는 방법을 보여줍니다. 그런 다음, 업데이트된 헬름 차트를 다시 설치하고 Flink 작업을 다시 실행합니다.

```
log4j-operator.properties: |+
  # Flink Operator Logging Overrides
  rootLogger.level = DEBUG
```

리더 포드의 이름을 가져옵니다.

```
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')

kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

다음 명령을 실행하여 지표 평가에 사용된 실제 재시작 시간을 가져옵니다.

```
kubectl logs <FLINK-OPERATOR-POD-NAME> -c flink-kubernetes-operator -n <OPERATOR-NAMESPACE> -f | grep "Restart time used in scaling summary computation"
```

다음과 비슷한 로그가 표시됩니다. 첫 번째 조정에서만 ` job.autoscaler.restart.time`을 사용합니다. 후속 조정에서는 관찰된 재시작 시간을 사용합니다.

```
2024-05-16 17:17:32,590 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT2M
2024-05-16 17:19:03,787 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:19:18,976 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:20:50,283 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:22:21,691 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
```

------
#### [ 7.0.0 and 7.1.0 ]

오픈 소스 기본 제공 Flink 오토스케일러는 다양한 지표를 사용하여 최적의 조정 결정을 내립니다. 그러나 계산에 사용하는 기본값은 대부분의 워크로드에 적용 가능한 값이지만, 주어진 작업에 적합하지 않을 수 있습니다. Flink 연산자의 Amazon EMR on EKS 버전에 추가된 자동 조정 기능은 캡처된 특정 지표에서 관찰된 과거 추세를 검토한 다음, 해당 작업에 맞게 조정된 가장 최적의 값을 계산하려고 시도합니다.


| 구성 | 필수 | 기본값 | 설명 | 
| --- | --- | --- | --- | 
| kubernetes.operator.job.autoscaler.autotune.enable | 아니요 | False | Flink 오토스케일러가 시간 경과에 따라 구성을 자동으로 조정하여 오토스케일러 조정 결정을 최적화해야 하는지 여부를 나타냅니다. 현재 오토스케일러는 오토스케일러 파라미터 restart.time만 자동 조정할 수 있습니다. | 
| kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count | 아니요 | 3 | 오토스케일러가 Amazon EMR on EKS 지표 구성 맵에 보관하는 Amazon EMR on EKS의 기록 지표 수를 나타냅니다. | 
| kubernetes.operator.job.autoscaler.autotune.metrics.restart.count | 아니요 | 3 | 주어진 작업의 평균 재시작 시간 계산을 시작하기 전에 오토스케일러가 수행하는 재시작 횟수를 나타냅니다. | 

자동 조정을 활성화하려면 다음을 완료해야 합니다.
+ `kubernetes.operator.job.autoscaler.autotune.enable:`를 `true`로 설정합니다.
+ `metrics.job.status.enable:`를 `TOTAL_TIME`로 설정합니다.
+ 자동 조정을 활성화하기 위해 [Flink 애플리케이션에 대해 오토스케이러 사용](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/jobruns-flink-autoscaler.html) 설정을 따랐습니다.

다음은 자동 조정을 시도하는 데 사용할 수 있는 배포 사양에 대한 예제입니다.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: autoscaling-example
spec:
  flinkVersion: v1_18
  flinkConfiguration:

    # Autotuning parameters
    kubernetes.operator.job.autoscaler.autotune.enable: "true"
    kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count: "2"
    kubernetes.operator.job.autoscaler.autotune.metrics.restart.count: "1"
    metrics.job.status.enable: TOTAL_TIME

    # Autoscaler parameters
    kubernetes.operator.job.autoscaler.enabled: "true"
    kubernetes.operator.job.autoscaler.scaling.enabled: "true"
    kubernetes.operator.job.autoscaler.stabilization.interval: "5s"
    kubernetes.operator.job.autoscaler.metrics.window: "1m"

    jobmanager.scheduler: adaptive

    taskmanager.numberOfTaskSlots: "1"
    state.savepoints.dir: s3://<S3_bucket>/autoscaling/savepoint/
    state.checkpoints.dir: s3://<S3_bucket>/flink/autoscaling/checkpoint/
    pipeline.max-parallelism: "4"

  executionRoleArn: <JOB ARN>
  emrReleaseLabel: emr-6.14.0-flink-latest
  jobManager:
    highAvailabilityEnabled: true
    storageDir: s3://<S3_bucket>/flink/autoscaling/ha/
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: s3://<S3_bucket>/some-job-with-back-pressure
    parallelism: 1
    upgradeMode: last-state
```

역압을 시뮬레이션하려면 다음 배포 사양을 사용합니다.

```
  job:
    jarURI: s3://<S3_bucket>/pyflink-script.py
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-py", "/opt/flink/usrlib/pyflink-script.py"]
    parallelism: 1
    upgradeMode: last-state
```

다음 Python 스크립트를 S3 버킷에 업로드합니다.

```
import logging
import sys
import time
import random

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

TABLE_NAME="orders"
QUERY=f"""
CREATE TABLE {TABLE_NAME} (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);
"""

def create_backpressure(i):
    time.sleep(2)
    return i

def autoscaling_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    t_env.execute_sql(QUERY)
    res_table = t_env.from_path(TABLE_NAME)

    stream =  t_env.to_data_stream(res_table) \
      .shuffle().map(lambda x: create_backpressure(x))\
      .print()
    env.execute("Autoscaling demo")

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    autoscaling_demo()
```

자동 튜너가 작동하는지 확인하려면 다음 명령을 사용합니다. Flink 연산자에 대한 자체 리더 포드 정보를 사용해야 합니다.

먼저 리더 포드의 이름을 가져옵니다.

```
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')

kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

리더 포드의 이름을 지정한 후 다음 명령을 실행할 수 있습니다.

```
kubectl logs -n $NAMESPACE  -c flink-kubernetes-operator --follow <YOUR-FLINK-OPERATOR-POD-NAME>  | grep -E 'EmrEks|autotun|calculating|restart|autoscaler'
```

다음과 비슷한 로그가 표시됩니다.

```
[m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [36m[DEBUG][flink/autoscaling-example] Using the latest Emr Eks Metric for calculating restart.time for autotuning: EmrEksMetrics(restartMetric=RestartMetric(restartingTime=65, numRestarts=1))

[m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [32m[INFO ][flink/autoscaling-example] Calculated average restart.time metric via autotuning to be: PT0.065S
```

------

# Amazon EMR on EKS에서 Flink 작업에 대한 유지 관리 및 문제 해결
<a name="jobruns-flink-troubleshooting"></a>

다음 섹션에서는 장기 실행 중인 Flink 작업을 유지 관리하는 방법을 개략적으로 살펴보고 자주 발생하는 Flink 관련 문제 몇 가지를 해결하는 방법에 대한 지침을 제공합니다.

# Flink 애플리케이션 유지 관리
<a name="jobruns-flink-maintain"></a>

**Topics**
+ [업그레이드 모드](#jobruns-flink-upgrademode)

Flink 애플리케이션은 일반적으로 몇 주, 몇 달 또는 몇 년 등 장기간 실행되도록 설계되었습니다. 모든 장기 실행 서비스와 마찬가지로 Flink 스트리밍 애플리케이션에도 유지 관리가 필요합니다. 유지 관리는 버그 수정, 개선 및 이후 버전의 Flink 클러스터로의 마이그레이션을 포함합니다.

`FlinkDeployment` 및 `FlinkSessionJob` 리소스에 대한 사양이 변경될 경우 실행 중인 애플리케이션을 업그레이드해야 합니다. 이 작업을 수행하기 위해 운영자는 (이미 일시 중단된 경우를 제외하고) 실행 중인 작업을 중지하고 해당 작업을 최신 사양을 이용해 다시 배포하고, 상태 저장 애플리케이션의 경우에는 이전 실행의 상태를 이용해 다시 배포합니다.

사용자는 상태 저장 애플리케이션이 중지되고 `JobSpec`의 `upgradeMode` 설정을 이용해 복원될 때 상태를 관리하는 방법을 제어합니다.

## 업그레이드 모드
<a name="jobruns-flink-upgrademode"></a>

선택적 도입

**상태 비저장**  
빈 상태에서의 상태 비저장 애플리케이션 업그레이드입니다.

**마지막 상태**  
모든 애플리케이션 상태(실패한 작업 포함)의 간편 업그레이드에서는 항상 가장 최근에 성공한 체크포인트를 사용하기 때문에 정상 작업이 필요하지 않습니다. HA 메타데이터가 손실된 경우에는 수동으로 복구해야 할 수 있습니다. 최신 체크포인트를 선택할 때 작업이 대체될 수 있는 시간을 제한하기 위해 `kubernetes.operator.job.upgrade.last-state.max.allowed.checkpoint.age`를 구성할 수 있습니다. 체크포인트가 구성된 값보다 오래된 경우 정상 작업에 대한 세이브포인트가 대신 사용됩니다. 세션 모드에서는 이 작업이 지원되지 않습니다.

**세이브포인트**  
업그레이드를 위해 세이브포인트를 사용하여 최대한의 안전성과 백업/포크 포인트로 사용될 가능성을 제공합니다. 업그레이드 프로세스가 진행되는 동안 세이브포인트가 생성됩니다. 세이브포인트가 생성되도록 하려면 Flink 작업이 실행 중이어야 합니다. 작업이 비정상 상태인 경우 마지막 체크포인트가 사용됩니다(kubernetes.operator.job.upgrade.last-state-fallback.enabled가 false로 설정된 경우는 제외). 마지막 체크포인트를 사용할 수 없다면 작업 업그레이드가 실패합니다.

# 문제 해결
<a name="jobruns-flink-troubleshoot"></a>

이 섹션에서는 Amazon EMR on EKS의 문제를 해결하는 방법을 설명합니다. Amazon EMR과 관련된 일반적인 문제를 해결하는 방법에 대한 자세한 내용은 *Amazon EMR 관리 안내서*에서 [클러스터 문제 해결](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-troubleshoot.html)을 참조하세요.
+ [PersistentVolumeClaims(PVC)를 사용하는 작업 문제 해결](permissions-for-pvc.md)
+ [Amazon EMR on EKS 수직 자동 조정 문제 해결](troubleshooting-vas.md)
+ [Amazon EMR on EKS Spark 운영자 문제 해결](troubleshooting-sparkop.md)

## Amazon EMR on EKS에서 Apache Flink 문제 해결
<a name="jobruns-flink-troubleshooting-apache-flink"></a>

### 차트 Helm 설치 시 리소스 매핑을 찾을 수 없음
<a name="w2aac21c21b7b7b3"></a>

차트 Helm 설치 시 다음과 같은 오류 메시지가 나타날 수 있다.

```
Error: INSTALLATION FAILED: pulling from host 1234567890.dkr.ecr.us-west-2.amazonaws.com failed with status code [manifests 6.13.0]: 403 Forbidden Error: INSTALLATION FAILED: unable to build kubernetes objects from release manifest: [resource mapping not found for name: "flink-operator-serving-cert" namespace: "<the namespace to install your operator>" from "": no matches for kind "Certificate" in version "cert-manager.io/v1"

ensure CRDs are installed first, resource mapping not found for name: "flink-operator-selfsigned-issuer" namespace: "<the namespace to install your operator>" " from "": no matches for kind "Issuer" in version "cert-manager.io/v1"

ensure CRDs are installed first].
```

이 오류를 해결하려면 웹후크 구성 요소 추가를 활성화하도록 cert-manager를 설치합니다. 사용하는 각 Amazon EKS 클러스터에 cert-manager를 설치해야 합니다.

```
kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.12.0
```

### AWS 서비스 액세스 거부 오류
<a name="jobruns-flink-troubleshooting-access-denied"></a>

*access denied* 오류가 표시되는 경우 차트 Helm `values.yaml` 파일에서 `operatorExecutionRoleArn`에 대한 IAM 역할에 올바른 권한이 있는지 확인합니다. 또한 `FlinkDeployment` 사양에서 `executionRoleArn` 아래 IAM 역할에 올바른 권한이 있는지 확인합니다.

### `FlinkDeployment`가 멈춤
<a name="jobruns-flink-troubleshooting-stuck"></a>

`FlinkDeployment`가 중단된 상태로 멈추면 다음 단계에 따라 배포를 강제로 삭제합니다.

1. 배포 실행을 편집합니다.

   ```
   kubectl edit -n Flink Namespace flinkdeployments/App Name
   ```

1. 이 파이널라이저를 제거합니다.

   ```
   finalizers:
     - flinkdeployments.flink.apache.org/finalizer
   ```

1. 배포를 삭제합니다.

   ```
   kubectl delete -n Flink Namespace flinkdeployments/App Name
   ```

### 옵트인에서 Flink 애플리케이션을 실행할 때 s3a AWSBadRequestException 문제 AWS 리전
<a name="jobruns-flink-troubleshooting-optin-region"></a>

[옵트인 AWS 리전](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html)에서 Flink 애플리케이션을 실행하는 경우 다음 오류가 표시될 수 있습니다.

```
Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: getFileStatus on 
s3://flink.txt: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request 
(Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: ABCDEFGHIJKL; S3 Extended Request ID:
ABCDEFGHIJKLMNOP=; Proxy: null), S3 Extended Request ID: ABCDEFGHIJKLMNOP=:400 Bad Request: Bad Request 
(Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: ABCDEFGHIJKL; S3 Extended Request ID: ABCDEFGHIJKLMNOP=; Proxy: null)
```

```
Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: getS3Region on flink-application: software.amazon.awssdk.services.s3.model.S3Exception: null 
(Service: S3, Status Code: 400, Request ID: ABCDEFGHIJKLMNOP, Extended Request ID: ABCDEFGHIJKLMNOPQRST==):null: null 
(Service: S3, Status Code: 400, Request ID: ABCDEFGHIJKLMNOP, Extended Request ID: AHl42uDNaTUFOus/5IIVNvSakBcMjMCH7dd37ky0vE6jhABCDEFGHIJKLMNOPQRST==)
```

이러한 오류를 해결하려면 `FlinkDeployment` 정의 파일에서 다음 구성을 사용합니다.

```
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
```

또한 SDKv2 자격 증명 제공업체를 사용하는 것이 좋습니다.

```
fs.s3a.aws.credentials.provider: software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider
```

SDKv1 자격 증명 제공업체를 사용하려면 SDK가 옵트인 리전을 지원하는지 확인합니다. 자세한 내용은 [aws-sdk-java GitHub repository](https://github.com/aws/aws-sdk-java)를 참조하세요.

옵트인 리전에서 Flink SQL 문을 실행할 때 `S3 AWSBadRequestException`이 표시되면 Flink 구성 사양에서 `fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME` 구성을 설정해야 합니다.

### CN 리전에서 Flink 세션 작업을 실행하는 경우 S3A AWSBadRequestException
<a name="jobruns-flink-troubleshooting-optin-region"></a>

Amazon EMR 릴리스 6.15.0\$17.2.0의 경우 CN 리전에서 Flink 세션 작업을 실행할 때 다음과 같은 오류 메시지가 표시될 수 있습니다. 여기에는 중국(베이징) 및 중국(닝샤)이 포함됩니다.

```
Error:  {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.hadoop.fs.s3a.AWSBadRequestException: 
                    getFileStatus on s3://ABCDPath: software.amazon.awssdk.services.s3.model.S3Exception: null (Service: S3, Status Code: 400, Request ID: ABCDEFGH, Extended Request ID: 
                    ABCDEFGH:null: null (Service: S3, Status Code: 400, Request ID: ABCDEFGH, Extended Request ID: ABCDEFGH","additionalMetadata":{},"throwableList":
                    [{"type":"org.apache.hadoop.fs.s3a.AWSBadRequestException","message":"getFileStatus on s3://ABCDPath: software.amazon.awssdk.services.s3.model.S3Exception: 
                    null (Service: S3, Status Code: 400, Request ID: ABCDEFGH, Extended Request ID: ABCDEFGH:null: null (Service: S3, Status Code: 400, Request ID: ABCDEFGH, 
                    Extended Request ID: ABCDEFGH","additionalMetadata":{}},{"type":"software.amazon.awssdk.services.s3.model.S3Exception","message":"null (Service: S3, Status Code: 400, 
                    Request ID: ABCDEFGH, Extended Request ID: ABCDEFGH","additionalMetadata":{}}]}
```

이 문제를 인지하고 있습니다. 팀은 이러한 모든 릴리스 버전에 대해 flink 연산자를 패치하는 작업을 하고 있습니다. 그러나 패치를 완료하기 전에 이 오류를 해결하려면 Flink 연산자 헬름 차트를 다운로드하고, 압축 해제하며(압축된 파일 추출), 헬름 차트에서 구성을 변경해야 합니다.

특정 단계는 다음과 같습니다.

1. 특히 디렉터리를 헬름 차트의 로컬 폴더로 변경하고, 다음 명령줄을 실행하여 헬름 차트를 가져와 압축 해제(추출)합니다.

   ```
   helm pull oci://public.ecr.aws/emr-on-eks/flink-kubernetes-operator \
   --version $VERSION \
   --namespace $NAMESPACE
   ```

   ```
   tar -zxvf flink-kubernetes-operator-$VERSION.tgz
   ```

1. 헬름 차트 폴더로 이동하여 `templates/flink-operator.yaml` 파일을 찾습니다.

1. `flink-operator-config` ConfigMap을 찾아 `flink-conf.yaml`의 다음 `fs.s3a.endpoint.region` 구성을 추가합니다. 예제:

   ```
   {{- if .Values.defaultConfiguration.create }}
   apiVersion: v1
   kind: ConfigMap
   metadata:
     name: flink-operator-config
     namespace: {{ .Release.Namespace }}
     labels:
       {{- include "flink-operator.labels" . | nindent 4 }}
   data:
     flink-conf.yaml: |+
   fs.s3a.endpoint.region: {{ .Values.emrContainers.awsRegion }}
   ```

1. 로컬 헬름 차트를 설치하고 작업을 실행합니다.

# Apache Flink가 포함된 Amazon EMR on EKS에 대해 지원되는 릴리스
<a name="jobruns-flink-security-release-versions"></a>

Apache Flink는 다음 Amazon EMR on EKS 릴리스에서 사용할 수 있습니다. 사용 가능한 모든 릴리스에 대한 자세한 내용은 [Amazon EMR on EKS 릴리스](emr-eks-releases.md) 섹션을 참조하세요.


| 릴리스 레이블 | Java | Flink | Flink 운영자 | 
| --- | --- | --- | --- | 
|  **emr-7.2.0-flink-latest**  |  17  |  1.18.1  |  -  | 
|  **emr-7.2.0-flink-k8s-operator-latest**  |  11  |  -  |  1.8.0  | 
|  **emr-7.1.0-flink-latest**  |  17  |  1.18.1  |  -  | 
|  **emr-7.1.0-flink-k8s-operator-latest**  |  11  |  -  |  1.6.1  | 
|  **emr-7.0.0-flink-latest**  |  11  |  1.18.0  |  -  | 
|  **emr-7.0.0-flink-k8s-operator-latest**  |  11  |  -  |  1.6.1  | 
|  **emr-6.15.0-flink-latest**  |  11  |  1.17.1  |  -  | 
|  **emr-6.15.0-flink-k8s-operator-latest**  |  11  |  -  |  1.6.0  | 
|  **emr-6.14.0-flink-latest**  |  11  |  1.17.1  |  -  | 
|  **emr-6.14.0-flink-k8s-operator-latest**  |  11  |  -  |  1.6.0  | 
|  **emr-6.13.0-flink-latest**  |  11  |  1.17.0  |  -  | 
|  **emr-6.13.0-flink-k8s-operator-latest**  |  11  |  -  |  1.5.0  | 