

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# Flink 애플리케이션에 대한 Autoscaler 사용
<a name="jobruns-flink-autoscaler"></a>

운영자의 Autoscaler를 사용하면 Flink 작업에서 지표를 수집하고 작업 버텍스 수준에서 병렬 처리를 자동으로 조정하여 역압을 완화할 수 있습니다. 다음은 구성에 대한 예제입니다.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  ...
spec:
  ...
  flinkVersion: v1_18
  flinkConfiguration:
    job.autoscaler.enabled: "true"
    job.autoscaler.stabilization.interval: 1m
    job.autoscaler.metrics.window: 5m
    job.autoscaler.target.utilization: "0.6"
    job.autoscaler.target.utilization.boundary: "0.2"
    job.autoscaler.restart.time: 2m
    job.autoscaler.catch-up.duration: 5m
    pipeline.max-parallelism: "720"
  ...
```

이 구성은 Amazon EMR의 최신 릴리스에 대한 기본값을 사용합니다. 다른 버전을 사용하는 경우 값이 다를 수 있습니다.

**참고**  
Amazon EMR 7.2.0부터는 구성에 `kubernetes.operator` 접두사를 포함하지 않아도 됩니다. 7.1.0 이하를 사용하는 경우 각 구성 전에 접두사를 사용해야 합니다. 예를 들어, `kubernetes.operator.job.autoscaler.scaling.enabled`를 지정해야 합니다.

다음은 Autoscaler의 구성 옵션입니다.
+ `job.autoscaler.scaling.enabled` – 오토스케일러에서 버텍스 조정 실행을 활성화할지 여부를 지정합니다. 기본값은 `true`입니다. 이 구성을 비활성화하면 오토스케일러는 지표만 수집하고 각 버텍스에 대해 제안된 병렬 처리를 평가하지만 작업을 업그레이드하지는 않습니다.
+ `job.autoscaler.stabilization.interval` - 새로운 조정이 실행되지 않는 안정화 기간. 기본값은 5분입니다.
+ `job.autoscaler.metrics.window` - 조정 지표 집계 기간. 기간이 길수록 더 원활하고 안정적이지만 갑작스러운 로드 변경에 대응하는 경우 Autoscaler 속도가 느려질 수 있습니다. 기본값은 15분입니다. 3\$160분의 값을 사용하여 실험해 보는 것이 좋습니다.
+ `job.autoscaler.target.utilization` - 안정적인 작업 성능을 제공하고 로드 변동을 위한 약간의 버퍼를 제공하기 위한 목표 버텍스 사용률. 기본값은 `0.7`로, 작업 버텍스의 사용률/로드를 70%로 설정합니다.
+ `job.autoscaler.target.utilization.boundary` - 로드 변동에서 즉각적인 조정을 피하기 위해 추가 버퍼 역할을 하는 목표 버텍스 사용률 경계. 기본값은 `0.3`이며, 조정 작업을 트리거하기 전에 목표 사용률과 30%의 편차가 허용됨을 의미합니다.
+ `ob.autoscaler.restart.time` - 애플리케이션을 다시 시작하는 데 걸리는 예상 시간. 기본값은 5분입니다.
+ `job.autoscaler.catch-up.duration` - 예상 캐치업 시간으로, 조정 작업이 완료된 후 모든 백로그를 완전히 처리합니다. 기본값은 5분입니다. 캐치업 기간을 줄임으로써 Autoscaler는 조정 작업을 위한 추가 용량을 예약해야 합니다.
+ `pipeline.max-parallelism` - Autoscaler에서 사용할 수 있는 최대 병렬 처리. 이 한도가 Flink 구성 또는 각 운영자에 구성된 최대 병렬 처리보다 더 높은 경우 Autoscaler는 이 한도를 무시합니다. 기본값은 -1입니다. Autoscaler는 최대 병렬 처리의 나눗수로 병렬 처리를 계산하므로, Flink에서 제공하는 기본값을 사용하는 대신, 나눗수가 많은 최대 병렬 처리를 선택하는 것이 좋습니다. 이 구성에서는 120, 180, 240, 360, 720 등과 같이 60의 배수를 사용하는 것이 좋습니다.

구성 참조 페이지에 대한 자세한 내용은 [Autoscaler configuration](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/#autoscaler-configuration)을 참조하세요.

# 오토스케일러 파라미터 자동 조정
<a name="jobruns-flink-autoscaler-parameter-autotuning"></a>

이 섹션에서는 다양한 Amazon EMR 버전에 대한 자동 조정 동작을 설명합니다. 또한 다양한 오토 스케일링 구성도 자세히 설명합니다.

**참고**  
Amazon EMR 7.2.0 이상은 오픈 소스 구성 `job.autoscaler.restart.time-tracking.enabled`를 사용하여 **재조정 시간 예측**을 활성화합니다. 재조정 시간 예측은 Amazon EMR 자동 조정과 동일한 기능을 제공하므로, 재시작 시간에 경험적 값을 수동으로 할당하지 않아도 됩니다.  
Amazon EMR 7.1.0 이하를 사용하는 경우에도 Amazon EMR 자동 조정을 계속 사용할 수 있습니다.

------
#### [ 7.2.0 and higher ]

Amazon EMR 7.2.0 이상은 오토 스케일링 결정을 적용하는 데 필요한 실제 재시작 시간을 측정합니다. 릴리스 7.1.0 이하에서는 `job.autoscaler.restart.time` 구성을 사용하여 예상 최대 재시작 시간을 수동으로 구성해야 했습니다. `job.autoscaler.restart.time-tracking.enabled` 구성을 사용하면 첫 번째 조정에 대한 재시작 시간만 입력하면 됩니다. 이후 작업자는 실제 재시작 시간을 기록하고 후속 조정에 사용합니다.

이 추적을 활성화하려면 다음 명령을 사용합니다.

```
job.autoscaler.restart.time-tracking.enabled: true
```

다음은 재조정 시간 예측을 위한 관련 구성입니다.


| 구성 | 필수 | 기본값 | 설명 | 
| --- | --- | --- | --- | 
| job.autoscaler.restart.time-tracking.enabled | 아니요 | False | Flink 오토스케일러가 시간 경과에 따라 구성을 자동으로 조정하여 조정 결정을 최적화해야 하는지 여부를 나타냅니다. 오토스케일러는 오토스케일러 파라미터 restart.time만 자동 조정할 수 있습니다. | 
| job.autoscaler.restart.time | 아니요 | 5분 | 연산자가 이전 조정에서 실제 재시작 시간을 확인할 수 있을 때까지 Amazon EMR on EKS에서 사용하는 예상 재시작 시간. | 
| job.autoscaler.restart.time-tracking.limit | 아니요 | 15분 | job.autoscaler.restart.time-tracking.enabled가 true로 설정된 경우 관찰된 최대 재시작 시간. | 

다음은 재조정 시간 예측을 시도하는 데 사용할 수 있는 배포 사양의 예입니다.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: autoscaling-example
spec:
  flinkVersion: v1_18
  flinkConfiguration:

    # Autoscaler parameters
    job.autoscaler.enabled: "true"
    job.autoscaler.scaling.enabled: "true"
    job.autoscaler.stabilization.interval: "5s"
    job.autoscaler.metrics.window: "1m"
    
    job.autoscaler.restart.time-tracking.enabled: "true"
    job.autoscaler.restart.time: "2m"
    job.autoscaler.restart.time-tracking.limit: "10m"
    
    jobmanager.scheduler: adaptive
    taskmanager.numberOfTaskSlots: "1"
    pipeline.max-parallelism: "12"

  executionRoleArn: <JOB ARN>
  emrReleaseLabel: emr-7.12.0-flink-latest
  jobManager:
    highAvailabilityEnabled: false
    storageDir: s3://<s3_bucket>/flink/autoscaling/ha/
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: s3://<s3_bucket>/some-job-with-back-pressure
    parallelism: 1
    upgradeMode: stateless
```

역압을 시뮬레이션하려면 다음 배포 사양을 사용합니다.

```
job:
    jarURI: s3://<s3_bucket>/pyflink-script.py
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-py", "/opt/flink/usrlib/pyflink-script.py"]
    parallelism: 1
    upgradeMode: stateless
```

다음 Python 스크립트를 S3 버킷에 업로드합니다.

```
import logging
import sys
import time
import random

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

TABLE_NAME="orders"
QUERY=f"""
CREATE TABLE {TABLE_NAME} (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);
"""

def create_backpressure(i):
    time.sleep(2)
    return i

def autoscaling_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    t_env.execute_sql(QUERY)
    res_table = t_env.from_path(TABLE_NAME)

    stream =  t_env.to_data_stream(res_table) \
      .shuffle().map(lambda x: create_backpressure(x))\
      .print()
    env.execute("Autoscaling demo")

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    autoscaling_demo()
```

재조정 시간 예측이 작동하는지 확인하려면 Flink 연산자의 `DEBUG` 수준 로깅이 활성화되어 있는지 확인합니다. 아래 예제에서는 헬름 차트 파일(`values.yaml`)을 업데이트하는 방법을 보여줍니다. 그런 다음, 업데이트된 헬름 차트를 다시 설치하고 Flink 작업을 다시 실행합니다.

```
log4j-operator.properties: |+
  # Flink Operator Logging Overrides
  rootLogger.level = DEBUG
```

리더 포드의 이름을 가져옵니다.

```
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')

kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

다음 명령을 실행하여 지표 평가에 사용된 실제 재시작 시간을 가져옵니다.

```
kubectl logs <FLINK-OPERATOR-POD-NAME> -c flink-kubernetes-operator -n <OPERATOR-NAMESPACE> -f | grep "Restart time used in scaling summary computation"
```

다음과 비슷한 로그가 표시됩니다. 첫 번째 조정에서만 ` job.autoscaler.restart.time`을 사용합니다. 후속 조정에서는 관찰된 재시작 시간을 사용합니다.

```
2024-05-16 17:17:32,590 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT2M
2024-05-16 17:19:03,787 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:19:18,976 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:20:50,283 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:22:21,691 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
```

------
#### [ 7.0.0 and 7.1.0 ]

오픈 소스 기본 제공 Flink 오토스케일러는 다양한 지표를 사용하여 최적의 조정 결정을 내립니다. 그러나 계산에 사용하는 기본값은 대부분의 워크로드에 적용 가능한 값이지만, 주어진 작업에 적합하지 않을 수 있습니다. Flink 연산자의 Amazon EMR on EKS 버전에 추가된 자동 조정 기능은 캡처된 특정 지표에서 관찰된 과거 추세를 검토한 다음, 해당 작업에 맞게 조정된 가장 최적의 값을 계산하려고 시도합니다.


| 구성 | 필수 | 기본값 | 설명 | 
| --- | --- | --- | --- | 
| kubernetes.operator.job.autoscaler.autotune.enable | 아니요 | False | Flink 오토스케일러가 시간 경과에 따라 구성을 자동으로 조정하여 오토스케일러 조정 결정을 최적화해야 하는지 여부를 나타냅니다. 현재 오토스케일러는 오토스케일러 파라미터 restart.time만 자동 조정할 수 있습니다. | 
| kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count | 아니요 | 3 | 오토스케일러가 Amazon EMR on EKS 지표 구성 맵에 보관하는 Amazon EMR on EKS의 기록 지표 수를 나타냅니다. | 
| kubernetes.operator.job.autoscaler.autotune.metrics.restart.count | 아니요 | 3 | 주어진 작업의 평균 재시작 시간 계산을 시작하기 전에 오토스케일러가 수행하는 재시작 횟수를 나타냅니다. | 

자동 조정을 활성화하려면 다음을 완료해야 합니다.
+ `kubernetes.operator.job.autoscaler.autotune.enable:`를 `true`로 설정합니다.
+ `metrics.job.status.enable:`를 `TOTAL_TIME`로 설정합니다.
+ 자동 조정을 활성화하기 위해 [Flink 애플리케이션에 대해 오토스케이러 사용](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/jobruns-flink-autoscaler.html) 설정을 따랐습니다.

다음은 자동 조정을 시도하는 데 사용할 수 있는 배포 사양에 대한 예제입니다.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: autoscaling-example
spec:
  flinkVersion: v1_18
  flinkConfiguration:

    # Autotuning parameters
    kubernetes.operator.job.autoscaler.autotune.enable: "true"
    kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count: "2"
    kubernetes.operator.job.autoscaler.autotune.metrics.restart.count: "1"
    metrics.job.status.enable: TOTAL_TIME

    # Autoscaler parameters
    kubernetes.operator.job.autoscaler.enabled: "true"
    kubernetes.operator.job.autoscaler.scaling.enabled: "true"
    kubernetes.operator.job.autoscaler.stabilization.interval: "5s"
    kubernetes.operator.job.autoscaler.metrics.window: "1m"

    jobmanager.scheduler: adaptive

    taskmanager.numberOfTaskSlots: "1"
    state.savepoints.dir: s3://<S3_bucket>/autoscaling/savepoint/
    state.checkpoints.dir: s3://<S3_bucket>/flink/autoscaling/checkpoint/
    pipeline.max-parallelism: "4"

  executionRoleArn: <JOB ARN>
  emrReleaseLabel: emr-6.14.0-flink-latest
  jobManager:
    highAvailabilityEnabled: true
    storageDir: s3://<S3_bucket>/flink/autoscaling/ha/
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: s3://<S3_bucket>/some-job-with-back-pressure
    parallelism: 1
    upgradeMode: last-state
```

역압을 시뮬레이션하려면 다음 배포 사양을 사용합니다.

```
  job:
    jarURI: s3://<S3_bucket>/pyflink-script.py
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-py", "/opt/flink/usrlib/pyflink-script.py"]
    parallelism: 1
    upgradeMode: last-state
```

다음 Python 스크립트를 S3 버킷에 업로드합니다.

```
import logging
import sys
import time
import random

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

TABLE_NAME="orders"
QUERY=f"""
CREATE TABLE {TABLE_NAME} (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);
"""

def create_backpressure(i):
    time.sleep(2)
    return i

def autoscaling_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    t_env.execute_sql(QUERY)
    res_table = t_env.from_path(TABLE_NAME)

    stream =  t_env.to_data_stream(res_table) \
      .shuffle().map(lambda x: create_backpressure(x))\
      .print()
    env.execute("Autoscaling demo")

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    autoscaling_demo()
```

자동 튜너가 작동하는지 확인하려면 다음 명령을 사용합니다. Flink 연산자에 대한 자체 리더 포드 정보를 사용해야 합니다.

먼저 리더 포드의 이름을 가져옵니다.

```
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')

kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

리더 포드의 이름을 지정한 후 다음 명령을 실행할 수 있습니다.

```
kubectl logs -n $NAMESPACE  -c flink-kubernetes-operator --follow <YOUR-FLINK-OPERATOR-POD-NAME>  | grep -E 'EmrEks|autotun|calculating|restart|autoscaler'
```

다음과 비슷한 로그가 표시됩니다.

```
[m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [36m[DEBUG][flink/autoscaling-example] Using the latest Emr Eks Metric for calculating restart.time for autotuning: EmrEksMetrics(restartMetric=RestartMetric(restartingTime=65, numRestarts=1))

[m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [32m[INFO ][flink/autoscaling-example] Calculated average restart.time metric via autotuning to be: PT0.065S
```

------