

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Flink アプリケーションでの Autoscaler の使用
<a name="jobruns-flink-autoscaler"></a>

演算子 autoscaler を使用すると、Flink ジョブからメトリクスを収集してジョブの頂点レベルで並列処理を自動的に調整することで、バックプレッシャーを緩和できます。次に、設定がどのようになるか、その一例を示します。

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  ...
spec:
  ...
  flinkVersion: v1_18
  flinkConfiguration:
    job.autoscaler.enabled: "true"
    job.autoscaler.stabilization.interval: 1m
    job.autoscaler.metrics.window: 5m
    job.autoscaler.target.utilization: "0.6"
    job.autoscaler.target.utilization.boundary: "0.2"
    job.autoscaler.restart.time: 2m
    job.autoscaler.catch-up.duration: 5m
    pipeline.max-parallelism: "720"
  ...
```

この設定では、Amazon EMR の最新リリースのデフォルト値を使用します。他のバージョンを使用する場合、値が異なる場合があります。

**注記**  
Amazon EMR 7.2.0 以降、設定にプレフィックス `kubernetes.operator` を含める必要はありません。7.1.0 以前を使用する場合は、各設定の前にプレフィックスを使用する必要があります。例えば、`kubernetes.operator.job.autoscaler.scaling.enabled` と指定する必要があります。

次に、autoscaler の設定オプションを示します。
+ `job.autoscaler.scaling.enabled` – autoscaler による頂点スケーリングの実行を有効にするかどうかを指定します。デフォルトは `true` です。この設定を無効にすると、autoscaler はメトリクスのみを収集し、頂点ごとに推奨される並列処理を評価しますが、ジョブはアップグレードされません。
+ `job.autoscaler.stabilization.interval` — 新しいスケーリングが実行されない安定化期間。デフォルトは 5 分です。
+ `job.autoscaler.metrics.window` — スケーリングメトリクスの集計期間ウィンドウサイズ。ウィンドウサイズが大きいほど、スムーズで安定性が増しますが、負荷の突然の変化に対して autoscaler の対応が遅くなる可能性があります。デフォルトは 15 分です。3～60 分の値を試してみることをお勧めします。
+ `job.autoscaler.target.utilization` — 頂点の目標使用率であり、ジョブのパフォーマンスを安定させ、負荷の変動をある程度和らげることができます。デフォルトは `0.7` で、ジョブ頂点の目標使用率/負荷は 70% です。
+ `job.autoscaler.target.utilization.boundary` — 頂点の目標使用率の境界であり、負荷が変動してもすぐにはスケールしないよう、バッファとしての役割を果たします。デフォルトは `0.3` です。つまり、目標使用率からの偏差を 30% まで許容します。この値を超えると、スケーリングアクションをトリガーします。
+ `ob.autoscaler.restart.time` — アプリケーションを再起動するまでの想定時間。デフォルトは 5 分です。
+ `job.autoscaler.catch-up.duration` — キャッチアップするまでの想定時間。キャッチアップとは、スケーリング操作が完了した後でバックログを完全に処理することです。デフォルトは 5 分です。このキャッチアップ期間を短くすると、autoscaler がスケーリングアクションのために予約しなければならないキャパシティが増えます。
+ `pipeline.max-parallelism` — autoscaler が使用できる最大並列処理。この値が Flink 設定か各オペレータに直接設定されている最大並列処理よりも高い場合、autoscaler はこの制限を無視します。デフォルトは -1 です。autoscaler は、最大並列処理数の除数として並列処理を計算することに注意してください。このため、Flink が提供するデフォルトを利用するのではなく、除数が多い最大並列処理設定を選択することをお勧めします。この設定には、120、180、240、360、720 といった 60 の倍数を使用することをお勧めします。

詳細な設定リファレンスページについては、「[Autoscaler configuration](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/#autoscaler-configuration)」を参照してください。

# autoscaler パラメータの自動調整
<a name="jobruns-flink-autoscaler-parameter-autotuning"></a>

このセクションでは、さまざまな Amazon EMR バージョンの自動調整の動作について説明します。また、さまざまな自動スケーリング設定についても詳しく説明します。

**注記**  
Amazon EMR 7.2.0 以降では、オープンソース設定 `job.autoscaler.restart.time-tracking.enabled` を使用して、**再スケール時間推定**を有効にします。再スケール時間推定には Amazon EMR 自動調整と同じ機能があるため、再起動時間に経験値を手動で割り当てる必要はありません。  
Amazon EMR 7.1.0 以前を使用している場合でも、Amazon EMR 自動調整を使用できます。

------
#### [ 7.2.0 and higher ]

Amazon EMR 7.2.0 以降では、自動スケーリングの決定を適用するために必要な実際の再起動時間を測定します。リリース 7.1.0 以前では、`job.autoscaler.restart.time` 設定を使用して推定最大再起動時間を手動で設定する必要がありました。設定 `job.autoscaler.restart.time-tracking.enabled` を使用すると、最初のスケーリングの再起動時間を入力するだけで済みます。その後、オペレータは実際の再起動時間を記録してその後のスケーリングに使用します。

この追跡を有効にするには、次のコマンドを使用します。

```
job.autoscaler.restart.time-tracking.enabled: true
```

以下は、再スケール時間推定の関連設定です。


| 設定 | [Required] (必須) | デフォルト | 説明  | 
| --- | --- | --- | --- | 
| job.autoscaler.restart.time-tracking.enabled | 不可 | 誤 | Flink Autoscaler が時間の経過とともに設定を自動的に調整してスケーリングの決定を最適化するかどうかを示します。Autoscaler は Autoscaler パラメータ restart.time のみを自動チューニングできることに注意してください。 | 
| job.autoscaler.restart.time | 不可 | 5m | オペレータが以前のスケーリングから実際の再起動時間を決定するまで、Amazon EMR on EKS が使用する予想される再起動時間です。 | 
| job.autoscaler.restart.time-tracking.limit | 不可 | 15m | job.autoscaler.restart.time-tracking.enabled が true に設定されている場合の最大観察再起動時間。 | 

以下は、再スケール時間推定を試すために使用できるデプロイ仕様の例です。

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: autoscaling-example
spec:
  flinkVersion: v1_18
  flinkConfiguration:

    # Autoscaler parameters
    job.autoscaler.enabled: "true"
    job.autoscaler.scaling.enabled: "true"
    job.autoscaler.stabilization.interval: "5s"
    job.autoscaler.metrics.window: "1m"
    
    job.autoscaler.restart.time-tracking.enabled: "true"
    job.autoscaler.restart.time: "2m"
    job.autoscaler.restart.time-tracking.limit: "10m"
    
    jobmanager.scheduler: adaptive
    taskmanager.numberOfTaskSlots: "1"
    pipeline.max-parallelism: "12"

  executionRoleArn: <JOB ARN>
  emrReleaseLabel: emr-7.12.0-flink-latest
  jobManager:
    highAvailabilityEnabled: false
    storageDir: s3://<s3_bucket>/flink/autoscaling/ha/
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: s3://<s3_bucket>/some-job-with-back-pressure
    parallelism: 1
    upgradeMode: stateless
```

バックプレッシャーをシミュレートするには、次のデプロイ仕様を使用します。

```
job:
    jarURI: s3://<s3_bucket>/pyflink-script.py
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-py", "/opt/flink/usrlib/pyflink-script.py"]
    parallelism: 1
    upgradeMode: stateless
```

次の Python スクリプトを S3 バケットにアップロードします。

```
import logging
import sys
import time
import random

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

TABLE_NAME="orders"
QUERY=f"""
CREATE TABLE {TABLE_NAME} (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);
"""

def create_backpressure(i):
    time.sleep(2)
    return i

def autoscaling_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    t_env.execute_sql(QUERY)
    res_table = t_env.from_path(TABLE_NAME)

    stream =  t_env.to_data_stream(res_table) \
      .shuffle().map(lambda x: create_backpressure(x))\
      .print()
    env.execute("Autoscaling demo")

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    autoscaling_demo()
```

再スケール時間推定が機能していることを確認するには、Flink オペレータの `DEBUG` レベルのログ記録が有効になるようにします。以下の例は、Helm チャートファイル `values.yaml` を更新する方法を示しています。次に、更新された helm チャートを再インストールし、Flink ジョブを再度実行します。

```
log4j-operator.properties: |+
  # Flink Operator Logging Overrides
  rootLogger.level = DEBUG
```

リーダーポッドの名前を取得します。

```
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')

kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

次のコマンドを実行して、メトリクス評価で使用される実際の再起動時間を取得します。

```
kubectl logs <FLINK-OPERATOR-POD-NAME> -c flink-kubernetes-operator -n <OPERATOR-NAMESPACE> -f | grep "Restart time used in scaling summary computation"
```

次のようなログが表示されます。最初のスケーリングのみが ` job.autoscaler.restart.time` を使用することに注意してください。その後のスケーリングでは、観察された再起動時間が使用されます。

```
2024-05-16 17:17:32,590 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT2M
2024-05-16 17:19:03,787 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:19:18,976 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:20:50,283 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:22:21,691 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
```

------
#### [ 7.0.0 and 7.1.0 ]

オープンソースの組み込み Flink Autoscaler は、多くのメトリクスを使用して最適なスケーリング決定を行います。ただし、計算に使用するデフォルト値は、ほとんどのワークロードに適用できることを目的としており、特定のジョブには最適ではない場合があります。Flink オペレータの Amazon EMR on EKS バージョンに追加された自動調整機能は、特定のキャプチャされたメトリクスで観察された過去の傾向を調べ、それに応じて特定のジョブに合わせて調整された最適な値を計算しようとします。


| 設定 | [Required] (必須) | デフォルト | 説明  | 
| --- | --- | --- | --- | 
| kubernetes.operator.job.autoscaler.autotune.enable | 不可 | 誤 | Flink Autoscaler が時間の経過とともに設定を自動的に調整して autoscaler のスケーリングの決定を最適化するかどうかを示します。現在、Autoscaler は Autoscaler restart.time パラメータ のみを自動チューニングできます。 | 
| kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count | 不可 | 3 | Autoscaler が Amazon EMR on EKS メトリクス設定マップに保持する Amazon EMR on EKS メトリクスの履歴数を示します。 | 
| kubernetes.operator.job.autoscaler.autotune.metrics.restart.count | 不可 | 3 | Autoscaler が特定のジョブの平均再起動時間の計算を開始する前に実行する再起動の数を示します。 | 

自動調整を有効にするには、以下を完了している必要があります。
+ `kubernetes.operator.job.autoscaler.autotune.enable:` を `true` に設定します。
+ `metrics.job.status.enable:` を `TOTAL_TIME` に設定します。
+ [Autoscaler for Flink アプリケーションの使用](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/jobruns-flink-autoscaler.html)のセットアップに従って Autoscaling を有効にしている。

以下は、自動調整を試すために使用できるデプロイ仕様の例です。

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: autoscaling-example
spec:
  flinkVersion: v1_18
  flinkConfiguration:

    # Autotuning parameters
    kubernetes.operator.job.autoscaler.autotune.enable: "true"
    kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count: "2"
    kubernetes.operator.job.autoscaler.autotune.metrics.restart.count: "1"
    metrics.job.status.enable: TOTAL_TIME

    # Autoscaler parameters
    kubernetes.operator.job.autoscaler.enabled: "true"
    kubernetes.operator.job.autoscaler.scaling.enabled: "true"
    kubernetes.operator.job.autoscaler.stabilization.interval: "5s"
    kubernetes.operator.job.autoscaler.metrics.window: "1m"

    jobmanager.scheduler: adaptive

    taskmanager.numberOfTaskSlots: "1"
    state.savepoints.dir: s3://<S3_bucket>/autoscaling/savepoint/
    state.checkpoints.dir: s3://<S3_bucket>/flink/autoscaling/checkpoint/
    pipeline.max-parallelism: "4"

  executionRoleArn: <JOB ARN>
  emrReleaseLabel: emr-6.14.0-flink-latest
  jobManager:
    highAvailabilityEnabled: true
    storageDir: s3://<S3_bucket>/flink/autoscaling/ha/
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: s3://<S3_bucket>/some-job-with-back-pressure
    parallelism: 1
    upgradeMode: last-state
```

バックプレッシャーをシミュレートするには、次のデプロイ仕様を使用します。

```
  job:
    jarURI: s3://<S3_bucket>/pyflink-script.py
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-py", "/opt/flink/usrlib/pyflink-script.py"]
    parallelism: 1
    upgradeMode: last-state
```

次の Python スクリプトを S3 バケットにアップロードします。

```
import logging
import sys
import time
import random

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

TABLE_NAME="orders"
QUERY=f"""
CREATE TABLE {TABLE_NAME} (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);
"""

def create_backpressure(i):
    time.sleep(2)
    return i

def autoscaling_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    t_env.execute_sql(QUERY)
    res_table = t_env.from_path(TABLE_NAME)

    stream =  t_env.to_data_stream(res_table) \
      .shuffle().map(lambda x: create_backpressure(x))\
      .print()
    env.execute("Autoscaling demo")

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    autoscaling_demo()
```

自動チューナーが動作していることを確認するには、次のコマンドを使用します。Flink オペレータには独自のリーダーポッド情報を使用する必要があります。

まず、リーダーポッドの名前を取得します。

```
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')

kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

リーダーポッドの名前を取得したら、次のコマンドを実行できます。

```
kubectl logs -n $NAMESPACE  -c flink-kubernetes-operator --follow <YOUR-FLINK-OPERATOR-POD-NAME>  | grep -E 'EmrEks|autotun|calculating|restart|autoscaler'
```

次のようなログが表示されます。

```
[m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [36m[DEBUG][flink/autoscaling-example] Using the latest Emr Eks Metric for calculating restart.time for autotuning: EmrEksMetrics(restartMetric=RestartMetric(restartingTime=65, numRestarts=1))

[m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [32m[INFO ][flink/autoscaling-example] Calculated average restart.time metric via autotuning to be: PT0.065S
```

------