

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 針對 Flink 應用程式使用 Autoscaler
<a name="jobruns-flink-autoscaler"></a>

運算子自動擴展器可透過從 Flink 作業中收集指標並在作業頂點層級自動調整平行程度來協助減輕背壓。以下為組態具體形式的範例：

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  ...
spec:
  ...
  flinkVersion: v1_18
  flinkConfiguration:
    job.autoscaler.enabled: "true"
    job.autoscaler.stabilization.interval: 1m
    job.autoscaler.metrics.window: 5m
    job.autoscaler.target.utilization: "0.6"
    job.autoscaler.target.utilization.boundary: "0.2"
    job.autoscaler.restart.time: 2m
    job.autoscaler.catch-up.duration: 5m
    pipeline.max-parallelism: "720"
  ...
```

此組態會使用 Amazon EMR 最新版本的預設值。如果您使用其他版本，則可能會有不同的值。

**注意**  
從 Amazon EMR 7.2.0 開始，您不需要在組態`kubernetes.operator`中包含 字首。如果您使用 7.1.0 或更低版本，您必須在每個組態之前使用 字首。例如，您必須指定 `kubernetes.operator.job.autoscaler.scaling.enabled`。

以下是自動擴展器的組態選項。
+ `job.autoscaler.scaling.enabled` – 指定是否啟用自動擴展器的頂點擴展執行。預設值為 `true`。如果您停用此組態，自動擴展器只會收集指標並評估每個頂點的建議平行處理，但不會升級任務。
+ `job.autoscaler.stabilization.interval` - 不會執行新擴展的穩定期。預設為 5 分鐘。
+ `job.autoscaler.metrics.window` - 擴展指標彙總視窗大小。視窗越大，越平滑和穩定，但自動擴展器可能會更慢，以應對突然的負載變化。預設為 15 分鐘。建議您使用 3 到 60 分鐘之間的值進行實驗。
+ `job.autoscaler.target.utilization` - 目標頂點利用率，以提供穩定的作業效能和一些負載波動緩衝。預設值為 `0.7`，目標是作業頂點 70% 的使用率/負載。
+ `job.autoscaler.target.utilization.boundary` - 作為額外緩衝的目標頂點利用率邊界，以避免負載波動的立即擴展。預設值為 `0.3`，這表示在觸發擴展動作之前，允許偏離目標使用率 30%。
+ `ob.autoscaler.restart.time` - 重新啟動應用程式的預期時間。預設為 5 分鐘。
+ `job.autoscaler.catch-up.duration` - 追趕的預期時間，這意味著在擴展操作完成後完全處理任何待處理項。預設為 5 分鐘。透過降低追趕時間，自動擴展器必須為擴展動作保留更多額外容量。
+ `pipeline.max-parallelism` - 自動擴展器可以使用的最大並行度。如果自動擴展器高於 Flink 組態中或直接在每個運算子上設定的最大並行度，則會忽略此限制。預設值為 -1。請注意，自動擴展器將並行度計算為最大並行數的除數，因此建議選擇具有大量除數的最大並行度設定，而不是依賴 Flink 提供的預設值。建議此組態使用 60 的倍數，例如 120、180、240、360、720 等。

如需更詳細的組態參考頁面，請參閱[自動擴展器組態](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/#autoscaler-configuration)。

# Autoscaler 參數自動調校
<a name="jobruns-flink-autoscaler-parameter-autotuning"></a>

本節說明各種 Amazon EMR 版本的自動調校行為。它也會詳細說明不同的自動調整規模組態。

**注意**  
Amazon EMR 7.2.0 及更高版本使用開放原始碼組態`job.autoscaler.restart.time-tracking.enabled`來啟用**重新擴展時間估算**。重新調整時間估算的功能與 Amazon EMR 自動調校功能相同，因此您不需要手動將經驗值指派給重新啟動時間。  
如果您使用的是 Amazon EMR 7.1.0 或更低版本，您仍然可以使用 Amazon EMR 自動調校。

------
#### [ 7.2.0 and higher ]

Amazon EMR 7.2.0 及更高版本會測量套用自動調整規模決策所需的實際重新啟動時間。在 7.1.0 及更低版本中，您必須使用 組態`job.autoscaler.restart.time`來手動設定預估的最長重新啟動時間。透過使用組態 `job.autoscaler.restart.time-tracking.enabled`，您只需輸入第一個擴展的重新啟動時間。之後，運算子會記錄實際的重新啟動時間，並將它用於後續的擴展。

若要啟用此追蹤，請使用下列命令：

```
job.autoscaler.restart.time-tracking.enabled: true
```

以下是重新調整規模時間估算的相關組態。


| Configuration | 必要 | 預設 | Description | 
| --- | --- | --- | --- | 
| job.autoscaler.restart.time-tracking.enabled | 否 | False | 指出 Flink Autoscaler 是否應隨時間自動調整組態，以最佳化擴展決策。請注意，Autoscaler 只能自動調整 Autoscaler 參數 restart.time。 | 
| job.autoscaler.restart.time | 否 | 5m | Amazon EMR on EKS 使用的預期重新啟動時間，直到運算子可以判斷先前擴展的實際重新啟動時間為止。 | 
| job.autoscaler.restart.time-tracking.limit | 否 | 15m | job.autoscaler.restart.time-tracking.enabled 將 設定為 時觀察到的重新啟動時間上限true。 | 

以下是您可以用來嘗試重新調整規模時間估算的範例部署規格：

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: autoscaling-example
spec:
  flinkVersion: v1_18
  flinkConfiguration:

    # Autoscaler parameters
    job.autoscaler.enabled: "true"
    job.autoscaler.scaling.enabled: "true"
    job.autoscaler.stabilization.interval: "5s"
    job.autoscaler.metrics.window: "1m"
    
    job.autoscaler.restart.time-tracking.enabled: "true"
    job.autoscaler.restart.time: "2m"
    job.autoscaler.restart.time-tracking.limit: "10m"
    
    jobmanager.scheduler: adaptive
    taskmanager.numberOfTaskSlots: "1"
    pipeline.max-parallelism: "12"

  executionRoleArn: <JOB ARN>
  emrReleaseLabel: emr-7.12.0-flink-latest
  jobManager:
    highAvailabilityEnabled: false
    storageDir: s3://<s3_bucket>/flink/autoscaling/ha/
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: s3://<s3_bucket>/some-job-with-back-pressure
    parallelism: 1
    upgradeMode: stateless
```

若要模擬背壓，請使用下列部署規格。

```
job:
    jarURI: s3://<s3_bucket>/pyflink-script.py
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-py", "/opt/flink/usrlib/pyflink-script.py"]
    parallelism: 1
    upgradeMode: stateless
```

將下列 Python 指令碼上傳至 S3 儲存貯體。

```
import logging
import sys
import time
import random

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

TABLE_NAME="orders"
QUERY=f"""
CREATE TABLE {TABLE_NAME} (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);
"""

def create_backpressure(i):
    time.sleep(2)
    return i

def autoscaling_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    t_env.execute_sql(QUERY)
    res_table = t_env.from_path(TABLE_NAME)

    stream =  t_env.to_data_stream(res_table) \
      .shuffle().map(lambda x: create_backpressure(x))\
      .print()
    env.execute("Autoscaling demo")

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    autoscaling_demo()
```

若要驗證重新調整規模時間估算是否正常運作，請確定 Flink 運算子的`DEBUG`層級記錄已啟用。以下範例示範如何更新 helm Chart 檔案 `values.yaml`。然後重新安裝更新的 Helm Chart，然後再次執行 Flink 任務。

```
log4j-operator.properties: |+
  # Flink Operator Logging Overrides
  rootLogger.level = DEBUG
```

取得領導 Pod 的名稱。

```
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')

kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

執行下列命令以取得指標評估中使用的實際重新啟動時間。

```
kubectl logs <FLINK-OPERATOR-POD-NAME> -c flink-kubernetes-operator -n <OPERATOR-NAMESPACE> -f | grep "Restart time used in scaling summary computation"
```

您應該會看到類似以下的日誌。請注意，只有第一個擴展使用 ` job.autoscaler.restart.time`。後續擴展會使用觀察到的重新啟動時間。

```
2024-05-16 17:17:32,590 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT2M
2024-05-16 17:19:03,787 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:19:18,976 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:20:50,283 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:22:21,691 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
```

------
#### [ 7.0.0 and 7.1.0 ]

開放原始碼內建的 Flink Autoscaler 使用許多指標來做出最佳的擴展決策。不過，它用於計算的預設值適用於大多數工作負載，可能不適用於指定的任務。新增至 Amazon EMR on EKS 版本的 Flink Operator 的自動調校功能會查看在特定擷取指標上觀察到的歷史趨勢，然後嘗試計算針對特定任務量身打造的最佳值。


| Configuration | 必要 | 預設 | Description | 
| --- | --- | --- | --- | 
| kubernetes.operator.job.autoscaler.autotune.enable | 否 | False | 指出 Flink Autoscaler 是否應該隨著時間自動調整組態，以最佳化自動擴展器擴展決策。目前，Autoscaler 只能自動調整 Autoscaler 參數 restart.time。 | 
| kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count | 否 | 3 | 指出 Autoscaler 在 Amazon EMR on EKS 指標組態映射中保留多少歷史 Amazon EMR on EKS 指標。 | 
| kubernetes.operator.job.autoscaler.autotune.metrics.restart.count | 否 | 3 | 指出 Autoscaler 開始計算指定任務的平均重新啟動時間之前執行的重新啟動次數。 | 

若要啟用自動調校，您必須完成下列操作：
+ 將 `kubernetes.operator.job.autoscaler.autotune.enable:` 設定為 `true`
+ 將 `metrics.job.status.enable:` 設定為 `TOTAL_TIME`
+ 遵循為 [Flink 應用程式使用 Autoscaler ](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/jobruns-flink-autoscaler.html)的設定，以啟用 Autoscaling。

以下是您可以用來嘗試自動調校的範例部署規格。

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: autoscaling-example
spec:
  flinkVersion: v1_18
  flinkConfiguration:

    # Autotuning parameters
    kubernetes.operator.job.autoscaler.autotune.enable: "true"
    kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count: "2"
    kubernetes.operator.job.autoscaler.autotune.metrics.restart.count: "1"
    metrics.job.status.enable: TOTAL_TIME

    # Autoscaler parameters
    kubernetes.operator.job.autoscaler.enabled: "true"
    kubernetes.operator.job.autoscaler.scaling.enabled: "true"
    kubernetes.operator.job.autoscaler.stabilization.interval: "5s"
    kubernetes.operator.job.autoscaler.metrics.window: "1m"

    jobmanager.scheduler: adaptive

    taskmanager.numberOfTaskSlots: "1"
    state.savepoints.dir: s3://<S3_bucket>/autoscaling/savepoint/
    state.checkpoints.dir: s3://<S3_bucket>/flink/autoscaling/checkpoint/
    pipeline.max-parallelism: "4"

  executionRoleArn: <JOB ARN>
  emrReleaseLabel: emr-6.14.0-flink-latest
  jobManager:
    highAvailabilityEnabled: true
    storageDir: s3://<S3_bucket>/flink/autoscaling/ha/
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: s3://<S3_bucket>/some-job-with-back-pressure
    parallelism: 1
    upgradeMode: last-state
```

若要模擬背壓，請使用下列部署規格。

```
  job:
    jarURI: s3://<S3_bucket>/pyflink-script.py
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-py", "/opt/flink/usrlib/pyflink-script.py"]
    parallelism: 1
    upgradeMode: last-state
```

將下列 Python 指令碼上傳至 S3 儲存貯體。

```
import logging
import sys
import time
import random

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

TABLE_NAME="orders"
QUERY=f"""
CREATE TABLE {TABLE_NAME} (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);
"""

def create_backpressure(i):
    time.sleep(2)
    return i

def autoscaling_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    t_env.execute_sql(QUERY)
    res_table = t_env.from_path(TABLE_NAME)

    stream =  t_env.to_data_stream(res_table) \
      .shuffle().map(lambda x: create_backpressure(x))\
      .print()
    env.execute("Autoscaling demo")

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    autoscaling_demo()
```

若要確認您的自動調校器是否正常運作，請使用下列命令。請注意，您必須使用自己的 Flink Operator 領導 Pod 資訊。

首先取得領導 Pod 的名稱。

```
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')

kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

取得領導 Pod 的名稱後，您可以執行下列命令。

```
kubectl logs -n $NAMESPACE  -c flink-kubernetes-operator --follow <YOUR-FLINK-OPERATOR-POD-NAME>  | grep -E 'EmrEks|autotun|calculating|restart|autoscaler'
```

您應該會看到類似以下的日誌。

```
[m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [36m[DEBUG][flink/autoscaling-example] Using the latest Emr Eks Metric for calculating restart.time for autotuning: EmrEksMetrics(restartMetric=RestartMetric(restartingTime=65, numRestarts=1))

[m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [32m[INFO ][flink/autoscaling-example] Calculated average restart.time metric via autotuning to be: PT0.065S
```

------