

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# Flink에서 고가용성 및 작업 복원력을 지원하는 방법
<a name="jobruns-flink-resiliency"></a>

다음 섹션에서는 Flink 작업의 신뢰성과 가용성을 높이는 방법을 간략하게 설명합니다. Flink 고가용성 및 장애가 발생할 경우 다양한 복구 기능과 같은 내장 기능을 통해 이를 수행합니다.

**Topics**
+ [Flink 운영자 및 Flink 애플리케이션을 위한 고가용성(HA) 사용](jobruns-flink-using-ha.md)
+ [Amazon EMR on EKS로 태스크 복구 및 작업 규모 조정을 위한 Flink 작업 재시작 시간 최적화](jobruns-flink-restart.md)
+ [Amazon EMR on EKS에서 Flink를 사용한 스팟 인스턴스의 정상적인 서비스 해제](jobruns-flink-decommission.md)

# Flink 운영자 및 Flink 애플리케이션을 위한 고가용성(HA) 사용
<a name="jobruns-flink-using-ha"></a>

이 주제에서는 고가용성을 구성하는 방법 및 몇 가지 사용 사례에 대한 작동 방식을 설명합니다. 여기에는 작업 관리자를 사용하는 경우와 Flink 네이티브 kubernetes를 사용하는 경우가 포함됩니다.

## Flink 운영자 고가용성
<a name="jobruns-flink-ha-operator"></a>

Flink 운영자의 *고가용성*을 활성화하여 장애 발생 시 대기 중인 Flink 운영자로 장애 조치하여 운영자 제어 루프에서 가동 중단을 최소화할 수 있습니다. 고가용성은 기본적으로 활성화되어 있으며 시작 운영자 복제본의 기본 수는 2입니다. 차트 Helm에 대해 `values.yaml` 파일의 복제본 필드를 구성할 수 있습니다.

다음 필드를 사용자 지정 가능합니다.
+ `replicas`(선택 사항, 기본값: 2): 이 숫자를 1보다 크게 설정하면 다른 대기 운영자가 생성되고 작업을 더 빠르게 복구할 수 있습니다.
+ `highAvailabilityEnabled`(선택 사항, 기본값: true): HA 활성화 여부를 제어합니다. 이 파라미터를 true로 지정하면 다중 AZ 배포를 지원하고 올바른 `flink-conf.yaml` 파라미터를 설정할 수 있습니다.

`values.yaml` 파일에서 다음 구성을 설정하여 운영자의 HA를 비활성화할 수 있습니다.

```
...
imagePullSecrets: []

replicas: 1

# set this to false if you don't want HA
highAvailabilityEnabled: false
...
```

**다중 AZ 배포**

여러 가용 영역에서 운영자 포드를 생성합니다. 이는 약한 제약 조건이며, 다른 AZ에 충분한 리소스가 없는 경우 운영자 포드가 동일한 AZ에서 예약됩니다.

**리더 복제본 결정**

 HA가 활성화된 경우 복제본은 리스 기능을 사용하여 어떤 JM이 리더인지 결정하고 리더 선택에 K8s Lease를 사용합니다. 리스 기능을 설명하고 .Spec.Holder Identity 필드를 확인하여 현재 리더를 결정할 수 있습니다.

```
kubectl describe lease <Helm Install Release Name>-<NAMESPACE>-lease -n <NAMESPACE> | grep "Holder Identity"
```

**Flink-S3 상호 작용**

**액세스 보안 인증 구성**

S3 버킷에 액세스할 수 있는 적절한 IAM 권한으로 IRSA를 구성했는지 확인하세요.

**S3 애플리케이션 모드에서 작업 jar 가져오기**

Flink 운영자는 S3에서 애플리케이션 jar 가져오기도 지원합니다. FlinkDeployment 사양에서 JarURI의 S3 위치를 제공하면 됩니다.

이 기능을 사용하여 PyFlink 스크립트와 같은 다른 아티팩트를 다운로드할 수도 있습니다. 결과 Python 스크립트는 `/opt/flink/usrlib/` 경로 아래에 배치됩니다.

다음 예제에서는 PyFlink 작업에 대해 이 기능을 사용하는 방법을 보여줍니다. jarURI 및 args 필드를 기록합니다.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: python-example
spec:
  image: <YOUR CUSTOM PYFLINK IMAGE>
  emrReleaseLabel: "emr-6.12.0-flink-latest"
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    highAvailabilityEnabled: false
    replicas: 1
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: "s3://<S3-BUCKET>/scripts/pyflink.py" # Note, this will trigger the artifact download process
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pyflink.py"]
    parallelism: 1
    upgradeMode: stateless
```

**Flink S3 커넥터**

Flink는 두 개의 S3 커넥터(아래 목록 참조)와 함께 제공됩니다. 다음 섹션에서는 어떤 커넥터를 언제 사용해야 하는지를 설명합니다.

**검사: Presto S3 커넥터**
+ S3 스키마를 s3p://로 설정
+ s3에 대한 검사에 사용할 권장 커넥터. 자세한 내용은 Apache Flink 설명서의 [S3-specific](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#s3-specific)을 참조하세요.

FlinkDeployment 사양 예제:

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.checkpoints.dir: s3p://<BUCKET-NAME>/flink-checkpoint/
```

**S3: Hadoop S3 커넥터에 대한 읽기 및 쓰기**
+ S3 스키마를 `s3://` 또는 `s3a://`로 설정
+ S3에서 파일을 읽고 쓰는 데 권장되는 커넥터([Flinks 파일 시스템 인터페이스](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/)를 구현하는 S3 커넥터만 해당).
+ 기본적으로 `flink-conf.yaml` 파일에서 `fs.s3a.aws.credentials.provider`를 설정합니다(`com.amazonaws.auth.WebIdentityTokenCredentialsProvider`). 기본값(`flink-conf`)을 완전히 재정의하고 S3와 상호 작용하는 경우 이 제공업체를 사용해야 합니다.

FlinkDeployment 사양 예제

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  job:
    jarURI: local:///opt/flink/examples/streaming/WordCount.jar
    args: [ "--input", "s3a://<INPUT BUCKET>/PATH", "--output", "s3a://<OUTPUT BUCKET>/PATH" ]
    parallelism: 2
    upgradeMode: stateless
```

## Flink 작업 관리자
<a name="jobruns-flink-ha-manager"></a>

Flink 배포용 고가용성(HA)을 통해 일시적인 오류가 발생하여 JobManager가 충돌하더라도 작업을 계속 진행할 수 있습니다. 작업은 HA가 활성화된 상태에서 마지막으로 성공한 체크포인트부터 다시 시작됩니다. HA를 활성화하지 않으면 Kubernetes는 JobManager를 다시 시작하지만 작업은 새 작업으로 시작되고 진행 상황은 사라집니다. HA를 구성한 후에 JobManager에서 일시적인 오류가 발생할 경우 참조할 수 있도록 HA 메타데이터를 영구 스토리지에 저장한 다음, 마지막으로 성공한 체크포인트에서 작업을 재개하도록 Kubernetes에 지시할 수 있습니다.

Flink 작업에 대해 HA는 기본적으로 활성화되어 있습니다(복제본 수는 2로 설정되어 있으며, 이 경우 HA 메타데이터가 지속되려면 S3 스토리지 위치를 제공해야 함).

**HA 구성**

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  executionRoleArn: "<JOB EXECUTION ROLE ARN>"
  emrReleaseLabel: "emr-6.13.0-flink-latest"
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    replicas: 2
    highAvailabilityEnabled: true
    storageDir: "s3://<S3 PERSISTENT STORAGE DIR>"
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
```

다음은 작업 관리자(.spec.jobManager에서 정의됨)에서 위 HA 구성에 대한 설명입니다.
+ `highAvailabilityEnabled`(선택 사항, 기본값: true): HA를 활성화하지 않고 제공된 HA 구성을 사용하지 않으려면 이 옵션을 `false `로 설정합니다. 여전히 '복제본' 필드를 조작하여 HA를 수동으로 구성할 수 있습니다.
+ `replicas`(선택 사항, 기본값: 2): 이 숫자를 1보다 크게 설정하면 다른 대기 JobManagers가 생성되고 작업을 더 빠르게 복구할 수 있습니다. HA를 비활성화하는 경우 복제본 수를 1로 설정해야 합니다. 그렇지 않으면 검증 오류가 계속 발생합니다(HA가 활성화되지 않은 경우 복제본 1개만 지원됨).
+ `storageDir`(필수): 기본적으로 복제본 수를 2로 사용하기 때문에 영구 storageDir을 제공해야 합니다. 현재 이 필드에는 스토리지 위치로 S3 경로만 허용합니다.

**포드 지역성**

 또한 HA를 활성화하는 경우 동일한 AZ에 파드를 배치하려고 시도하므로 성능이 개선됩니다(동일한 AZ에 포드를 배치하여 네트워크 지연 시간 감소). 이는 최대한의 원칙이 적용되는 프로세스입니다. 즉, 대부분의 포드가 예약된 AZ에 충분한 리소스가 없는 경우 나머지 포드는 여전히 예약되지만 결국 이 AZ 외부의 노드에 배치될 수 있습니다.

**리더 복제본 결정**

HA가 활성화된 경우 복제본은 리스 기능을 통해 어떤 JM이 리더인지 결정하고 이 메타데이터를 저장할 데이터 스토어로 K8s Configmap을 사용합니다. 리더를 결정하려면 Configmap의 콘텐츠를 살펴보고 데이터 아래에 있는 `org.apache.flink.k8s.leader.restserver` 키를 확인하여 IP 주소가 있는 K8s 포드를 찾을 수 있습니다. 다음과 같은 bash 명령을 사용할 수 있습니다.

```
ip=$(kubectl get configmap -n <NAMESPACE> <JOB-NAME>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')
kubectl get pods -n NAMESPACE  -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

## Flink 작업 - 네이티브 Kubernetes
<a name="jobruns-flink-ha-kubernetes"></a>

Amazon EMR 6.13.0 이상은 Amazon EKS 클러스터에서 고가용성 모드의 Flink 애플리케이션을 실행하기 위한 Flink 네이티브 Kubernetes를 지원합니다.

**참고**  
Flink 작업을 제출할 때 고가용성 메타데이터를 저장할 Amazon S3 버킷을 생성해야 합니다. 이 기능을 사용하고 싶지 않은 경우 비활성화할 수 있습니다. 기본적으로 활성화됩니다.

Flink 고가용성 특성을 활성화하려면 [`run-application` CLI 명령](jobruns-flink-native-kubernetes-getting-started.md#jobruns-flink-native-kubernetes-getting-started-run-application)을 실행할 때 다음 Flink 파라미터를 입력하세요. 파라미터는 예제 아래에 정의되어 있습니다.

```
-Dhigh-availability.type=kubernetes \
-Dhigh-availability.storageDir=S3://DOC-EXAMPLE-STORAGE-BUCKET \
-Dfs.s3a.aws.credentials.provider="com.amazonaws.auth.WebIdentityTokenCredentialsProvider" \
-Dkubernetes.jobmanager.replicas=3 \
-Dkubernetes.cluster-id=example-cluster
```
+ **`Dhigh-availability.storageDir`** – 작업을 위한 고가용성 메타데이터를 저장할 Amazon S3 버킷입니다.

  **`Dkubernetes.jobmanager.replicas`** – `1`보다 큰 정수로 생성할 작업 관리자 포드의 수입니다.

  **`Dkubernetes.cluster-id`** – Flink 클러스터를 식별하는 고유한 ID입니다.

# Amazon EMR on EKS로 태스크 복구 및 작업 규모 조정을 위한 Flink 작업 재시작 시간 최적화
<a name="jobruns-flink-restart"></a>

태스크가 실패하거나 규모 조정 작업이 발생할 경우 Flink는 마지막으로 완료된 체크포인트의 태스크를 다시 재실행하려고 시도합니다. 체크포인트 상태의 크기와 병렬 태스크의 수에 따라 재시작 프로세스를 실행하는 데 1분 이상 소요될 수 있습니다. 프로세스를 다시 시작하는 동안에는 작업에 대한 백로그 태스크가 누적될 수 있습니다. 그렇지만 Flink는 실행 그래프의 복구 및 재시작 속도를 최적화하여 작업 안정성을 향상시킬 수 있는 방법이 몇 가지 있습니다.

이 페이지에서는 Amazon EMR Flink를 사용하여 스팟 인스턴스에서의 태스크 복구 및 조정 작업 중에 작업 재시작 시간을 개선할 수 있는 몇 가지 방법을 설명합니다. 스팟 인스턴스는 할인된 가격으로 사용할 수 있는 미사용 컴퓨팅 용량입니다. 가끔 중단되는 것을 포함하여 고유한 동작이 있으므로, Amazon EMR on EKS가 서비스 해제를 수행하고 작업을 재시작하는 방법을 비롯해 Amazon EMR on EKS에서 이러한 동작을 처리하는 방법을 이해하는 것이 중요합니다.

**Topics**
+ [태스크-로컬 복구](#flink-restart-task-local)
+ [Amazon EBS 볼륨 마운트를 통한 태스크-로컬 복구](#flink-restart-task-local-ebs)
+ [일반 로그 기반 증분 체크포인트](#flink-restart-log-check)
+ [세분화된 복구](#flink-restart-fine-grained)
+ [적응형 스케줄러의 결합된 재시작 메커니즘](#flink-restart-combined)

## 태스크-로컬 복구
<a name="flink-restart-task-local"></a>

**참고**  
태스크-로컬 복구는 EKS 6.14.0 이상의 Amazon EMR on EKS에서 Flink를 통해 지원됩니다.

Flink 체크포인트를 사용할 경우 각 태스크에서 Flink가 Amazon S3와 같은 분산 스토리지에 기록하는 상태 스냅샷을 만듭니다. 복구의 경우 태스크는 분산 스토리지를 통해 해당 상태를 복원합니다. 분산 스토리지에서는 내결함성을 제공하며 모든 노드에서 액세스가 가능하기 때문에 크기 재조정이 이뤄지는 동안 상태를 재분배할 수 있습니다.

하지만 원격 분산 저장소에는 모든 태스크에서 네트워크를 통해 원격 위치에서 해당 상태를 읽어야 한다는 단점도 있습니다. 이러한 한계로 인해 태스크 복구 또는 규모 조정 작업 중에 대규모 상태의 복구 시간이 길어질 수 있습니다.

이와 같은 긴 복구 시간 문제는 **태스크-로컬 복구를 통해 해결됩니다. 태스크에서는 체크포인트의 상태를 로컬 디스크와 같이 해당 작업에 대해 로컬인 보조 스토리지에 기록합니다. 또한 태스크는 기본 스토리지(이 경우 Amazon S3)에 상태를 저장합니다. 복구가 진행되는 동안 스케줄러는 태스크가 이전에 실행된 동일한 태스크 관리자에서 태스크를 예약하기 때문에 원격 상태 저장소에서 데이터를 읽는 대신 로컬 상태 저장소에서 복구할 수 있습니다. 자세한 내용을 알아보려면 *Apache Flink 설명서*의 [태스크 로컬 복구](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/large_state_tuning/#task-local-recovery)를 참조하세요.

샘플 작업을 사용한 벤치마크 테스트 결과에 따르면 태스크-로컬 복구가 활성화된 상태에서는 복구 시간이 몇 분에서 몇 초로 단축된 것으로 확인되었습니다.

태스크-로컬 복구를 활성화하려면 `flink-conf.yaml` 파일에 다음과 같은 구성을 설정하세요. 체크포인트 간격의 값을 밀리초 단위로 지정하세요.

```
    state.backend.local-recovery: true
    state.backend: hasmap or rocksdb
    state.checkpoints.dir: s3://STORAGE-BUCKET-PATH/checkpoint
    execution.checkpointing.interval: 15000
```

## Amazon EBS 볼륨 마운트를 통한 태스크-로컬 복구
<a name="flink-restart-task-local-ebs"></a>

**참고**  
Amazon EBS를 통한 태스크-로컬 복구는 Amazon EMR on EKS 6.15.0 이상의 Flink를 통해 지원됩니다.

Amazon EMR on EKS에서 Flink를 사용할 경우 태스크 로컬 복구를 위해 Amazon EBS 볼륨을 TaskManager 포드에 자동 프로비저닝할 수 있습니다. 기본 오버레이 마운트에는 10GB 볼륨이 함께 제공되어 상태가 낮은 작업에 충분합니다. 상태가 큰 작업에서는 *자동 EBS 볼륨 마운트* 옵션을 활성화할 수 있습니다. TaskManager 포드는 포드 생성 과정에서 자동으로 생성되어 마운트되며 포드 삭제 중에는 제거됩니다.

다음 단계를 따라 Amazon EMR on EKS에서 Flink용 자동 EBS 볼륨 마운트를 활성화하세요.

1. 이후 단계에서 사용할 다음 변수의 값을 내보내세요.

   ```
   export AWS_REGION=aa-example-1 
   export FLINK_EKS_CLUSTER_NAME=my-cluster
   export AWS_ACCOUNT_ID=111122223333
   ```

1. 클러스터에 대해 `kubeconfig` YAML 파일을 생성 또는 업데이트합니다.

   ```
   aws eks update-kubeconfig --name $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
   ```

1. Amazon EKS 클러스터에서 Amazon EBS CSI(컨테이너 스토리지 인터페이스) 드라이버에 대한 IAM 서비스 계정을 생성합니다.

   ```
   eksctl create iamserviceaccount \
      --name ebs-csi-controller-sa \
      --namespace kube-system \
      --region $AWS_REGION \
      --cluster $FLINK_EKS_CLUSTER_NAME\
      --role-name TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} \
      --role-only \
      --attach-policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy \
      --approve
   ```

1. 다음 명령에 따라 Amazon EBS CSI 드라이버를 생성합니다.

   ```
   eksctl create addon \
      --name aws-ebs-csi-driver \
      --region $AWS_REGION \
      --cluster $FLINK_EKS_CLUSTER_NAME \
      --service-account-role-arn arn:aws:iam::${AWS_ACCOUNT_ID}:role/TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
   ```

1. 다음 명령에 따라 Amazon EBS 스토리지 클래스를 생성합니다.

   ```
   cat ≪ EOF ≫ storage-class.yaml
   apiVersion: storage.k8s.io/v1
   kind: StorageClass
   metadata:
     name: ebs-sc
   provisioner: ebs.csi.aws.com
   volumeBindingMode: WaitForFirstConsumer
   EOF
   ```

   그런 후 다음과 같이 클래스를 적용합니다.

   ```
   kubectl apply -f storage-class.yaml
   ```

1. Helm에서는 서비스 계정을 생성할 수 있는 옵션과 함께 Amazon EMR Flink Kubernetes 연산자를 설치합니다. 그러면 Flink 배포에 사용할 수 있는 `emr-containers-sa-flink`가 생성됩니다.

   ```
   helm install flink-kubernetes-operator flink-kubernetes-operator/ \
      --set jobServiceAccount.create=true \
      --set rbac.jobRole.create=true \
      --set rbac.jobRoleBinding.create=true
   ```

1. Flink 작업을 제출하고 태스크-로컬 복구를 위한 EBS 볼륨의 자동 프로비저닝을 활성화하려면 `flink-conf.yaml` 파일에 다음과 같은 구성을 설정하세요. 작업의 상태 크기에 맞게 크기 제한을 조정하세요. `serviceAccount`를 `emr-containers-sa-flink`으로 설정합니다. 체크포인트 간격의 값을 밀리초 단위로 지정하세요. `executionRoleArn`은 생략하세요.

   ```
   flinkConfiguration:
       task.local-recovery.ebs.enable: true
       kubernetes.taskmanager.local-recovery.persistentVolumeClaim.sizeLimit: 10Gi
       state.checkpoints.dir: s3://BUCKET-PATH/checkpoint
       state.backend.local-recovery: true
       state.backend: hasmap or rocksdb
       state.backend.incremental: "true"
       execution.checkpointing.interval: 15000
     serviceAccount: emr-containers-sa-flink
   ```

Amazon EBS CSI 드라이버 플러그인을 삭제할 준비가 끝나면 다음과 같은 명령을 사용하세요.

```
  # Detach Attached Policy
  aws iam detach-role-policy --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} --policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy
  # Delete the created Role
  aws iam delete-role --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
  # Delete the created service account
  eksctl delete iamserviceaccount --name ebs-csi-controller-sa --namespace kube-system --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  # Delete Addon
  eksctl delete addon --name aws-ebs-csi-driver --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  # Delete the EBS storage class
  kubectl delete -f storage-class.yaml
```

## 일반 로그 기반 증분 체크포인트
<a name="flink-restart-log-check"></a>

**참고**  
Amazon EMR on EKS 6.14.0 이상에서 Flink를 사용하면 일반 로그 기반 증분 체크포인트가 지원됩니다.

체크포인트의 속도를 높이기 위해 일반 로그 기반 증분 체크포인트가 Flink 1.16에 추가되었습니다. 체크포인트 간격을 빠르게 하면 복구 후 다시 처리해야 하는 이벤트가 줄어들기 때문에 복구 작업이 적어지는 경우가 많습니다. 자세한 내용은 *Apache Flink 블로그*에서 [일반 로그 기반 증분 체크포인트로 체크포인트의 속도 및 안정성 강화](https://flink.apache.org/2022/05/30/improving-speed-and-stability-of-checkpointing-with-generic-log-based-incremental-checkpoints/)를 참조하세요.

샘플 작업을 이용하여 수행한 벤치마크 테스트에서 일반 로그 기반 증분 체크포인트를 사용하면 체크포인트 시간이 몇 분에서 몇 초로 단축된 것이 확인되었습니다.

일반 로그 기반 증분 체크포인트를 활성화하려면 `flink-conf.yaml` 파일에 다음 구성을 설정하세요. 체크포인트 간격의 값을 밀리초 단위로 지정하세요.

```
    state.backend.changelog.enabled: true 
    state.backend.changelog.storage: filesystem
    dstl.dfs.base-path: s3://bucket-path/changelog
    state.backend.local-recovery: true
    state.backend: rocksdb
    state.checkpoints.dir: s3://bucket-path/checkpoint
    execution.checkpointing.interval: 15000
```

## 세분화된 복구
<a name="flink-restart-fine-grained"></a>

**참고**  
Amazon EMR on EKS 6.14.0 이상에서 Flink를 사용할 경우 기본 스케줄러에 대해 세분화된 복구 지원이 제공됩니다. 적응형 스케줄러에서의 세분화된 복구 지원은 Amazon EMR on EKS 6.15.0 이상에서 Flink를 통해 이용할 수 있습니다.

실행 중에 태스크가 실패하면 Flink에서는 전체 실행 그래프를 재설정하고 마지막으로 완료된 체크포인트에서 전체 재실행을 트리거합니다. 이 방식은 실패한 태스크를 재실행하는 것보다 비용이 많이 듭니다. 세분화된 복구에서는 실패한 태스크의 파이프라인으로 연결된 구성 요소만 재시작합니다. 다음 예제의 작업 그래프에는 버텍스가 5개(`A`\$1`E`) 있습니다. 버텍스 사이에 있는 모든 연결은 포인트별 분포로 파이프라인되며 작업의 `parallelism.default`는 `2`로 설정됩니다.

```
A → B → C → D → E
```

이 예시에서는 태스크가 총 10개 실행 중입니다. 첫 번째 파이프라인(`a1`\$1`e1`)은 TaskManager(`TM1`)에서 실행되고 두 번째 파이프라인(`a2`\$1`e2`)은 또 다른 TaskManager(`TM2`)에서 실행됩니다.

```
a1 → b1 → c1 → d1 → e1
a2 → b2 → c2 → d2 → e2
```

`a1 → e1` 및 `a2 → e2`라는 두 개의 구성 요소가 파이프라인으로 연결되어 있습니다. `TM1` 또는 `TM2` 중에 하나가 실패하면 실패는 TaskManager가 중이던 파이프라인에 있는 태스크 5개에만 영향을 미칩니다. 재시작 전략에 따라 영향을 받는 파이프라인 구성 요소만 시작됩니다.

세분화된 복구는 완벽히 병렬화된 Flink 작업에서만 작동합니다. `keyBy()` 또는 `redistribute()` 작업에서는 지원되지 않습니다. 자세한 내용은 *Flink 개선 제안* Jira 프로젝트의 [FLIP-1: 태스크 실패에서 세분화된 복구](https://cwiki.apache.org/confluence/display/FLINK/FLIP-1%3A+Fine+Grained+Recovery+from+Task+Failures)를 참조하세요.

세분화된 복구를 활성화하려면 `flink-conf.yaml` 파일에 다음과 같은 구성을 설정하세요.

```
jobmanager.execution.failover-strategy: region 
restart-strategy: exponential-delay or fixed-delay
```

## 적응형 스케줄러의 결합된 재시작 메커니즘
<a name="flink-restart-combined"></a>

**참고**  
적응형 스케줄러의 결합된 재시작 메커니즘은 Amazon EMR on EKS 6.15.0 이상의 Flink에서 지원됩니다.

적응형 스케줄러에서는 가용 슬롯을 기반으로 작업 병렬성을 조정할 수 있습니다. 이 스케줄러는 구성된 작업 병렬 처리에 적합한 가용 슬롯이 충분하지 않은 경우 병렬 처리의 수를 자동으로 줄입니다. 새 슬롯이 가용 상태가 되면 작업은 구성된 작업 병렬 처리로 다시 확장됩니다. 적응형 스케줄러는 가용 리소스가 충분하지 않은 경우 작업에서 가동 중지가 발생하는 것은 방지합니다. Flink Autoscaler에 대해 지원되는 스케줄러입니다. 이러한 이유들로 인해 적응형 스케줄러를 Amazon EMR Flink와 함께 사용하는 것이 좋습니다. 단, 적응형 스케줄러는 짧은 시간 내에 여러 번 재시작을 수행할 수 있으며, 새 리소스가 추가될 때마다 한 번씩 다시 시작됩니다. 이로 인해 작업 성능이 떨어질 수 있습니다.

Amazon EMR 6.15.0 이상에서는 Flink에 첫 번째 리소스가 추가될 때 재시작 기간을 연 다음, 구성된 기본 1분 간격까지 기다리는 적응형 스케줄러의 결합된 재시작 메커니즘이 있습니다. 이 메커니즘에서는 구성된 병렬 처리로 작업을 실행하기 위한 가용 리소스가 충분하거나 간격 제한 시간이 초과될 경우 단일 재시작을 수행합니다.

샘플 작업을 이용한 벤치마크 테스트에서는 적응형 스케줄러와 Flink Autoscaler를 사용할 경우 이 기능이 기본 동작보다 10% 더 많은 레코드를 처리하는 것이 입증되었습니다.

결합된 재시작 메커니즘을 활성화하려면 `flink-conf.yaml` 파일에 다음 구성을 설정하세요.

```
jobmanager.adaptive-scheduler.combined-restart.enabled: true 
jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m
```

# Amazon EMR on EKS에서 Flink를 사용한 스팟 인스턴스의 정상적인 서비스 해제
<a name="jobruns-flink-decommission"></a>

Amazon EMR on EKS가 포함된 Flink를 사용하면 작업 복구 및 작업 규모 조정 과정에서 작업 재시작 시간을 개선할 수 있습니다.

## 개요
<a name="jobruns-flink-decommission-overview"></a>

Amazon EMR on EKS 릴리스 6.15.0 이상에서는 Apache Flink를 사용하는 Amazon EMR on EKS의 스팟 인스턴스에 대한 태스크 관리자의 정상적인 서비스 해제를 지원합니다. 이 기능의 일부로 Flink가 포함된 Amazon EMR on EKS에서는 다음과 같은 기능을 제공합니다.
+ **적시 체크포인트** – Flink 스트리밍 작업은 스팟 인스턴스 중단에 대응하고, 실행 중인 작업에 대한 JIT(적시) 체크포인트를 수행하고, 이러한 스팟 인스턴스에 추가 태스크를 예약하는 것을 방지할 수 있습니다. JIT 체크포인트는 기본 및 적응형 스케줄러에서 지원됩니다.
+ **결합된 재시작 메커니즘** – 결합된 재시작 메커니즘은 작업이 대상 리소스 병렬 처리에 도달하거나 현재 구성된 기간의 종료 시점에 도달한 후 작업을 재시작하기 위해 최대한 노력합니다. 또한 복수의 스팟 인스턴스 종료로 인해 발생할 수 있는 연속된 작업 재시작을 방지할 수도 있습니다. 결합된 재시작 메커니즘은 적응형 스케줄러에서만 이용 가능합니다.

이러한 기능에는 다음과 같은 이점이 있습니다.
+ 스팟 인스턴스를 활용하여 태스크 관리자를 실행하고 클러스터 지출을 줄일 수 있습니다.
+ 스팟 인스턴스 태스크 관리자의 활성 상태가 개선되면 복원력이 높아지고 작업 예약의 효율성도 증가합니다.
+ 스팟 인스턴스 종료 후 재시작 횟수가 감소하기 때문에 Flink 작업의 가동 시간이 증가합니다.

## 정상적인 서비스 해제 작동 방식
<a name="jobruns-flink-decommission-howitworks"></a>

Apache Flink가 실행되는 Amazon EMR on EKS 클러스터에서 Amazon EMR을 프로비저닝하고 작업 관리자에는 온디맨드 노드를, 태스크 관리자에는 스팟 인스턴스 노드를 각각 지정하는 사례를 생각해 보세요. 태스크 관리자는 종료 2분 전에 중단 알림을 받습니다.

이 시나리오에서 작업 관리자는 스팟 인스턴스 중단 신호를 처리하고, 스팟 인스턴스에서 추가 작업 예약을 차단하고, 스트리밍 작업에 대한 JIT 체크포인트를 시작합니다.

그러면 작업 관리자는 현재 재시작 간격 기간에서 현재 작업 병렬 처리를 이행할 수 있을 만큼 가용 신규 리소스가 충분히 존재하는 경우에만 작업 그래프를 다시 시작합니다. 재시작 기간 간격은 스팟 인스턴스 교체 기간, 새 태스크 관리자 포드 생성 및 작업 관리자 등록을 근거로 정해집니다.

## 사전 조건
<a name="jobruns-flink-decommission-prereqs"></a>

정상적인 서비스 해제를 사용하려면 Apache Flink가 실행되는 Amazon EMR on EKS 클러스터에서 스트리밍 작업을 생성하고 실행합니다. 다음 예시와 같이 하나 이상의 스팟 인스턴스에 대해 Adaptive Scheduler 및 태스크 관리자가 예약되도록 활성화하세요. 작업 관리자에는 온디맨드 노드를 사용해야 하며, 스팟 인스턴스도 하나 이상 존재하는 경우에는 태스크 관리자에도 온디맨드 노드를 사용할 수 있습니다.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: deployment_name
spec:
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    cluster.taskmanager.graceful-decommission.enabled: "true"
    execution.checkpointing.interval: "240s"
    jobmanager.adaptive-scheduler.combined-restart.enabled: "true"
    jobmanager.adaptive-scheduler.combined-restart.window-interval : "1m"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    nodeSelector:
      'eks.amazonaws.com/capacityType': 'ON_DEMAND'
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
    nodeSelector:
      'eks.amazonaws.com/capacityType': 'SPOT'
  job:
    jarURI: flink_job_jar_path
```

## 구성
<a name="jobruns-flink-decommission-config"></a>

이 섹션에서는 서비스 해제 요구 사항을 위해 지정할 수 있는 구성들을 대부분 설명합니다.


| Key(키) | 설명 | 기본값  | 허용되는 값 | 
| --- | --- | --- | --- | 
|  cluster.taskmanager.graceful-decommission.enabled  |  태스크 관리자의 정상적인 서비스 해제를 활성화합니다.  |  true  |  true, false  | 
|  jobmanager.adaptive-scheduler.combined-restart.enabled  |  적응형 스케줄러에서 결합된 재시작 메커니즘을 활성화합니다.  |  false  |  true, false  | 
|  jobmanager.adaptive-scheduler.combined-restart.window-interval  |  작업에 대해 병합된 재시작을 수행하는 결합된 재시작 기간 간격입니다. 단위가 없는 정수는 밀리초로 해석합니다.  |  1m  |  예: 30, 60s, 3m, 1h  | 