

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 在 Flink 应用程序中使用 Autoscaler
<a name="jobruns-flink-autoscaler"></a>

Operator Autoscaler 可以从 Flink 任务中收集指标，并自动调整任务顶点级别的并行度，从而帮助缓解反向压力。配置可能类似于如下示例：

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  ...
spec:
  ...
  flinkVersion: v1_18
  flinkConfiguration:
    job.autoscaler.enabled: "true"
    job.autoscaler.stabilization.interval: 1m
    job.autoscaler.metrics.window: 5m
    job.autoscaler.target.utilization: "0.6"
    job.autoscaler.target.utilization.boundary: "0.2"
    job.autoscaler.restart.time: 2m
    job.autoscaler.catch-up.duration: 5m
    pipeline.max-parallelism: "720"
  ...
```

此配置使用最新版本的 Amazon EMR 的默认值。如果使用其他版本，值可能会有所不同。

**注意**  
从 Amazon EMR 7.2.0 开始，无需在配置中包含前缀 `kubernetes.operator`。如果使用 7.1.0 或更低版本，则必须在每次配置前使用前缀。例如，必须指定 `kubernetes.operator.job.autoscaler.scaling.enabled`。

以下是 Autoscaler 的配置选项。
+ `job.autoscaler.scaling.enabled`：指定是否允许 Autoscaler 执行顶点缩放。默认值为 `true`。如果禁用此配置，Autoscaler 只会收集指标并评估每个顶点的建议并行度，但不会升级作业。
+ `job.autoscaler.stabilization.interval`：不会执行新扩展的稳定期。默认值为 5 分钟。
+ `job.autoscaler.metrics.window`：扩展指标聚合窗口大小。窗口越大，就越流畅、越稳定，但 Autoscaler 对负载突变做出反应的速度可能会变慢。默认值为 15 分钟。建议使用 3 到 60 分钟之间的值进行实验。
+ `job.autoscaler.target.utilization`：提供稳定任务性能和一定负载波动缓冲能力的目标顶点利用率。默认情况下，作业顶点 utilization/load 的`0.7`目标为 70%。
+ `job.autoscaler.target.utilization.boundary`：目标顶点利用率边界，用作额外的缓冲，避免在负载波动时立即扩展。默认值为 `0.3`，即在触发缩放操作之前，允许与目标利用率有 30% 的偏差。
+ `ob.autoscaler.restart.time`：重新启动应用程序的预计时间。默认值为 5 分钟。
+ `job.autoscaler.catch-up.duration`：赶上进度的预计时间，即在扩展操作完成后完全处理积压工作的时间。默认值为 5 分钟。通过缩短赶上进度的持续时间，Autoscaler 必须为扩展操作预留更多额外容量。
+ `pipeline.max-parallelism`：Autoscaler 可以使用的最大并行度。如果该限值高于 Flink 配置中设定的最大并行度或直接在每个 Operator 上设定的最大并行度，则 Autoscaler 会忽略此限值。默认值为 -1。请注意，Autoscaler 将并行度计算为最大并行度数的除数，因此建议选择具有大量除数的最大并行度设置，而非依赖 Flink 提供的默认设置。建议对此配置使用 60 的倍数，例如 120、180、240、360、720。

有关详细配置的参考页面，请参阅 [Autoscaler configuration](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/#autoscaler-configuration)。

# Autoscaler 参数自动调整
<a name="jobruns-flink-autoscaler-parameter-autotuning"></a>

本节介绍了各个 Amazon EMR 版本的自动调整行为。还详细介绍了不同的自动扩缩配置。

**注意**  
Amazon EMR 7.2.0 及更高版本使用开源配置 `job.autoscaler.restart.time-tracking.enabled` 来实现**重新缩放时间估计**。重新缩放时间估计与 Amazon EMR 自动调整的功能相同，因此无需为重启时间手动分配经验值。  
如果运行的是 Amazon EMR 7.1.0 或更低版本，您仍然可以使用 Amazon EMR 自动调整。

------
#### [ 7.2.0 and higher ]

Amazon EMR 7.2.0 及更高版本测量会应用自动扩缩决策所需的实际重启时间。在 7.1.0 及更低版本中，必须使用配置 `job.autoscaler.restart.time` 来手动配置估计的最长重启时间。通过使用配置 `job.autoscaler.restart.time-tracking.enabled`，您只需输入第一次缩放的重启时间。之后，Operator 会记录实际的重启时间，并将其用于后续缩放。

要启用此跟踪，请使用以下命令：

```
job.autoscaler.restart.time-tracking.enabled: true
```

以下是重新缩放时间估计的相关配置。


| 配置 | 必需 | 默认值 | 说明 | 
| --- | --- | --- | --- | 
| job.autoscaler.restart.time-tracking.enabled | 否 | False | 指示 Flink Autoscaler 是否应随着时间的推移自动调整配置，以优化缩放决策。请注意，Autoscaler 只能自动调整 Autoscaler 参数 restart.time。 | 
| job.autoscaler.restart.time | 否 | 5m | Amazon EMR on EKS 使用的预期重启时间，直到 Operator 可根据之前的缩放确定实际重启时间。 | 
| job.autoscaler.restart.time-tracking.limit | 否 | 15m | 当 job.autoscaler.restart.time-tracking.enabled 设置为 true 时观察到的最长重启时间。 | 

下面是一个示例部署规范，可用来尝试重新缩放时间估计：

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: autoscaling-example
spec:
  flinkVersion: v1_18
  flinkConfiguration:

    # Autoscaler parameters
    job.autoscaler.enabled: "true"
    job.autoscaler.scaling.enabled: "true"
    job.autoscaler.stabilization.interval: "5s"
    job.autoscaler.metrics.window: "1m"
    
    job.autoscaler.restart.time-tracking.enabled: "true"
    job.autoscaler.restart.time: "2m"
    job.autoscaler.restart.time-tracking.limit: "10m"
    
    jobmanager.scheduler: adaptive
    taskmanager.numberOfTaskSlots: "1"
    pipeline.max-parallelism: "12"

  executionRoleArn: <JOB ARN>
  emrReleaseLabel: emr-7.12.0-flink-latest
  jobManager:
    highAvailabilityEnabled: false
    storageDir: s3://<s3_bucket>/flink/autoscaling/ha/
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: s3://<s3_bucket>/some-job-with-back-pressure
    parallelism: 1
    upgradeMode: stateless
```

要模拟背压，请使用以下部署规范。

```
job:
    jarURI: s3://<s3_bucket>/pyflink-script.py
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-py", "/opt/flink/usrlib/pyflink-script.py"]
    parallelism: 1
    upgradeMode: stateless
```

将以下 Python 脚本上传到 S3 存储桶。

```
import logging
import sys
import time
import random

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

TABLE_NAME="orders"
QUERY=f"""
CREATE TABLE {TABLE_NAME} (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);
"""

def create_backpressure(i):
    time.sleep(2)
    return i

def autoscaling_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    t_env.execute_sql(QUERY)
    res_table = t_env.from_path(TABLE_NAME)

    stream =  t_env.to_data_stream(res_table) \
      .shuffle().map(lambda x: create_backpressure(x))\
      .print()
    env.execute("Autoscaling demo")

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    autoscaling_demo()
```

要验证重缩时间估计是否有效，请确保已启用 Flink Operator 的 `DEBUG` 级别日志记录。下面的示例演示了如何更新 Helm 图表文件 `values.yaml`。然后重新安装更新后的 Helm 图表并再次运行 Flink 作业。

```
log4j-operator.properties: |+
  # Flink Operator Logging Overrides
  rootLogger.level = DEBUG
```

获取主容器组（pod）的名称。

```
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')

kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

运行以下命令，获取指标评估中使用的实际重启时间。

```
kubectl logs <FLINK-OPERATOR-POD-NAME> -c flink-kubernetes-operator -n <OPERATOR-NAMESPACE> -f | grep "Restart time used in scaling summary computation"
```

您应该会看到类似下面的日志。请注意，只有第一次缩放才会使用 ` job.autoscaler.restart.time`。后续缩放使用观察到的重启时间。

```
2024-05-16 17:17:32,590 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT2M
2024-05-16 17:19:03,787 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:19:18,976 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:20:50,283 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:22:21,691 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
```

------
#### [ 7.0.0 and 7.1.0 ]

开源内置 Flink Autoscaler 使用大量指标来做出最佳缩放决策。但在计算时使用的默认值适用于大多数工作负载，对于给定作业来说可能不是最佳值。Amazon EMR on EKS 版本的 Flink Operator 中添加的自动调整功能会查看针对特定捕获指标观察到的历史趋势，然后相应地尝试计算为给定作业定制的最佳值。


| 配置 | 必需 | 默认值 | 说明 | 
| --- | --- | --- | --- | 
| kubernetes.operator.job.autoscaler.autotune.enable | 否 | False | 指示 Flink Autoscaler 是否应随着时间的推移自动调整配置，以优化 Autoscaler 缩放决策。目前，Autoscaler 只能自动调整 Autoscaler 参数 restart.time。 | 
| kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count | 否 | 3 | 指示 Autoscaler 在 Amazon EMR on EKS 指标配置映射中保留的 Amazon EMR on EKS 历史指标数。 | 
| kubernetes.operator.job.autoscaler.autotune.metrics.restart.count | 否 | 3 | 指示 Autoscaler 在开始计算给定作业的平均重启时间之前执行的重启次数。 | 

要启用自动调整，必须完成以下操作：
+ 将 `kubernetes.operator.job.autoscaler.autotune.enable:` 设置为 `true`
+ 将 `metrics.job.status.enable:` 设置为 `TOTAL_TIME`
+ 按照[在 Flink 应用程序中使用 Autoscaler](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/jobruns-flink-autoscaler.html) 的设置启用自动扩缩功能。

下面是一个示例部署规范，可用来尝试自动调整。

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: autoscaling-example
spec:
  flinkVersion: v1_18
  flinkConfiguration:

    # Autotuning parameters
    kubernetes.operator.job.autoscaler.autotune.enable: "true"
    kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count: "2"
    kubernetes.operator.job.autoscaler.autotune.metrics.restart.count: "1"
    metrics.job.status.enable: TOTAL_TIME

    # Autoscaler parameters
    kubernetes.operator.job.autoscaler.enabled: "true"
    kubernetes.operator.job.autoscaler.scaling.enabled: "true"
    kubernetes.operator.job.autoscaler.stabilization.interval: "5s"
    kubernetes.operator.job.autoscaler.metrics.window: "1m"

    jobmanager.scheduler: adaptive

    taskmanager.numberOfTaskSlots: "1"
    state.savepoints.dir: s3://<S3_bucket>/autoscaling/savepoint/
    state.checkpoints.dir: s3://<S3_bucket>/flink/autoscaling/checkpoint/
    pipeline.max-parallelism: "4"

  executionRoleArn: <JOB ARN>
  emrReleaseLabel: emr-6.14.0-flink-latest
  jobManager:
    highAvailabilityEnabled: true
    storageDir: s3://<S3_bucket>/flink/autoscaling/ha/
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: s3://<S3_bucket>/some-job-with-back-pressure
    parallelism: 1
    upgradeMode: last-state
```

要模拟背压，请使用以下部署规范。

```
  job:
    jarURI: s3://<S3_bucket>/pyflink-script.py
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-py", "/opt/flink/usrlib/pyflink-script.py"]
    parallelism: 1
    upgradeMode: last-state
```

将以下 Python 脚本上传到 S3 存储桶。

```
import logging
import sys
import time
import random

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

TABLE_NAME="orders"
QUERY=f"""
CREATE TABLE {TABLE_NAME} (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);
"""

def create_backpressure(i):
    time.sleep(2)
    return i

def autoscaling_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    t_env.execute_sql(QUERY)
    res_table = t_env.from_path(TABLE_NAME)

    stream =  t_env.to_data_stream(res_table) \
      .shuffle().map(lambda x: create_backpressure(x))\
      .print()
    env.execute("Autoscaling demo")

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    autoscaling_demo()
```

要验证 Autosutiner 是否正在运行，请使用以下命令。请注意，必须对 Flink Operator 使用您自己的主容器组（pod）信息。

首先，获取主容器组（pod）的名称。

```
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')

kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

获取主容器组（pod）的名称后，您可以运行以下命令。

```
kubectl logs -n $NAMESPACE  -c flink-kubernetes-operator --follow <YOUR-FLINK-OPERATOR-POD-NAME>  | grep -E 'EmrEks|autotun|calculating|restart|autoscaler'
```

您应该会看到类似下面的日志。

```
[m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [36m[DEBUG][flink/autoscaling-example] Using the latest Emr Eks Metric for calculating restart.time for autotuning: EmrEksMetrics(restartMetric=RestartMetric(restartingTime=65, numRestarts=1))

[m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [32m[INFO ][flink/autoscaling-example] Calculated average restart.time metric via autotuning to be: PT0.065S
```

------