

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Verwenden von Autoscaler für Flink-Anwendungen
<a name="jobruns-flink-autoscaler"></a>

Der Operator-Autoscaler kann dazu beitragen, den Gegendruck zu verringern, indem er Messwerte von Flink-Aufträge sammelt und die Parallelität automatisch auf Auftrag-Scheitelpunktebene anpasst. Im Folgenden finden Sie ein Beispiel dafür, wie Ihre Konfiguration aussehen könnte:

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  ...
spec:
  ...
  flinkVersion: v1_18
  flinkConfiguration:
    job.autoscaler.enabled: "true"
    job.autoscaler.stabilization.interval: 1m
    job.autoscaler.metrics.window: 5m
    job.autoscaler.target.utilization: "0.6"
    job.autoscaler.target.utilization.boundary: "0.2"
    job.autoscaler.restart.time: 2m
    job.autoscaler.catch-up.duration: 5m
    pipeline.max-parallelism: "720"
  ...
```

Diese Konfiguration verwendet Standardwerte für die neueste Version von Amazon EMR. Wenn Sie andere Versionen verwenden, haben Sie möglicherweise andere Werte.

**Anmerkung**  
Ab Amazon EMR 7.2.0 müssen Sie das Präfix nicht mehr `kubernetes.operator` in Ihre Konfiguration aufnehmen. Wenn Sie 7.1.0 oder eine niedrigere Version verwenden, müssen Sie das Präfix vor jeder Konfiguration verwenden. Beispielsweise müssen Sie angeben`kubernetes.operator.job.autoscaler.scaling.enabled`.

Im Folgenden finden Sie die Konfigurationsoptionen für den Autoscaler.
+ `job.autoscaler.scaling.enabled`— gibt an, ob die Ausführung der Scheitelpunktskalierung durch das Autoscaling aktiviert werden soll. Der Standardwert ist `true`. Wenn Sie diese Konfiguration deaktivieren, sammelt der Autoscaler nur Metriken und bewertet die vorgeschlagene Parallelität für jeden Scheitelpunkt, aktualisiert die Jobs jedoch nicht.
+ `job.autoscaler.stabilization.interval` – der Stabilisierungszeitraum, in dem keine neue Skalierung durchgeführt wird. Standard ist 5 Minuten.
+ `job.autoscaler.metrics.window` – die Größe des Aggregationsfensters für Skalierungsmetriken. Je größer das Fenster, desto flüssiger und stabiler, aber der Autoscaler reagiert möglicherweise langsamer auf plötzliche Laständerungen. Die Standardeinstellung ist 15 Minuten. Wir empfehlen Ihnen, zu experimentieren, indem Sie einen Wert zwischen 3 und 60 Minuten verwenden.
+ `job.autoscaler.target.utilization` – die Zielauslastung der Scheitelpunkte, um eine stabile Arbeitsleistung und einen gewissen Puffer für Lastschwankungen zu gewährleisten. In der Standardeinstellung werden 70% utilization/load für die Scheitelpunkte des Jobs `0.7` angepeilt.
+ `job.autoscaler.target.utilization.boundary` – die Auslastungsgrenze für den Zielscheitelpunkt, die als zusätzlicher Puffer dient, um eine sofortige Skalierung bei Lastschwankungen zu vermeiden. Die Standardeinstellung ist`0.3`, was bedeutet, dass eine Abweichung von 30% von der Zielauslastung zulässig ist, bevor eine Skalierungsaktion ausgelöst wird.
+ `ob.autoscaler.restart.time` – die erwartete Zeit für den Neustart der Anwendung. Standard ist 5 Minuten.
+ `job.autoscaler.catch-up.duration` – die erwartete Aufholzeit, d. h. die vollständige Bearbeitung von etwaigen Rückständen nach Abschluss eines Skalierungsvorgangs. Standard ist 5 Minuten. Durch die Verkürzung der Nachholdauer muss der Autoscaler mehr zusätzliche Kapazität für die Skalierungsaktionen reservieren.
+ `pipeline.max-parallelism` – die maximale Parallelität, die der Autoscaler verwenden kann. Der Autoscaler ignoriert dieses Limit, wenn es höher ist als die maximale Parallelität, die in der Flink-Konfiguration oder direkt für jeden Operator konfiguriert wurde. Die Standardeinstellung ist -1. Beachten Sie, dass der Autoscaler die Parallelität als Divisor der maximalen Parallelitätszahl berechnet. Es wird daher empfohlen, Einstellungen für maximale Parallelität zu wählen, die viele Teiler haben, anstatt sich auf die von Flink bereitgestellten Standardwerte zu verlassen. Wir empfehlen, für diese Konfiguration ein Vielfaches von 60 zu verwenden, z. B. 120, 180, 240, 360, 720 usw.

Eine detailliertere Referenzseite zur Konfiguration finden Sie unter [Autoscaler-Konfiguration](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/#autoscaler-configuration).

# Autotuning der Autoscaler-Parameter
<a name="jobruns-flink-autoscaler-parameter-autotuning"></a>

In diesem Abschnitt wird das Autotuning-Verhalten für verschiedene Amazon EMR-Versionen beschrieben. Es geht auch detailliert auf verschiedene Konfigurationen für die auto-scaling ein.

**Anmerkung**  
Amazon EMR 7.2.0 und höher verwendet die Open-Source-Konfiguration, um die Zeitschätzung für die **Neuskalierung `job.autoscaler.restart.time-tracking.enabled`** zu ermöglichen. Die Zeitschätzung für die Neuskalierung hat dieselbe Funktionalität wie Amazon EMR Autotuning, sodass Sie der Neustartzeit keine empirischen Werte manuell zuweisen müssen.  
Sie können Amazon EMR Autotuning weiterhin verwenden, wenn Sie Amazon EMR 7.1.0 oder niedriger verwenden.

------
#### [ 7.2.0 and higher ]

Amazon EMR 7.2.0 und höher misst die tatsächlich erforderliche Neustartzeit, um Autoscaling-Entscheidungen anzuwenden. In den Versionen 7.1.0 und niedriger mussten Sie die Konfiguration verwenden, um die geschätzte maximale `job.autoscaler.restart.time` Neustartzeit manuell zu konfigurieren. Wenn Sie die Konfiguration verwenden`job.autoscaler.restart.time-tracking.enabled`, müssen Sie nur eine Neustartzeit für die erste Skalierung eingeben. Danach zeichnet der Bediener die tatsächliche Neustartzeit auf und verwendet sie für nachfolgende Skalierungen.

Verwenden Sie den folgenden Befehl, um dieses Tracking zu aktivieren:

```
job.autoscaler.restart.time-tracking.enabled: true
```

Im Folgenden sind die zugehörigen Konfigurationen für die Schätzung der Zeit bei der Neuskalierung aufgeführt.


| Konfiguration | Erforderlich | Standard | Description | 
| --- | --- | --- | --- | 
| job.autoscaler.restart.time-tracking.enabled | Nein | Falsch | Gibt an, ob der Flink Autoscaler die Konfigurationen im Laufe der Zeit automatisch anpassen soll, um die Skalierungsentscheidungen zu optimieren. Beachten Sie, dass der Autoscaler nur den Autoscaler-Parameter automatisch abstimmen kann. restart.time | 
| job.autoscaler.restart.time | Nein | 5m | Die erwartete Neustartzeit, die Amazon EMR auf EKS verwendet, bis der Betreiber die tatsächliche Neustartzeit anhand früherer Skalierungen ermitteln kann. | 
| job.autoscaler.restart.time-tracking.limit | Nein | 15m | Die maximale beobachtete Neustartzeit, wenn auf eingestellt ist. job.autoscaler.restart.time-tracking.enabled true | 

Im Folgenden finden Sie ein Beispiel für eine Bereitstellungsspezifikation, mit der Sie die Zeitschätzung für die Neuskalierung ausprobieren können:

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: autoscaling-example
spec:
  flinkVersion: v1_18
  flinkConfiguration:

    # Autoscaler parameters
    job.autoscaler.enabled: "true"
    job.autoscaler.scaling.enabled: "true"
    job.autoscaler.stabilization.interval: "5s"
    job.autoscaler.metrics.window: "1m"
    
    job.autoscaler.restart.time-tracking.enabled: "true"
    job.autoscaler.restart.time: "2m"
    job.autoscaler.restart.time-tracking.limit: "10m"
    
    jobmanager.scheduler: adaptive
    taskmanager.numberOfTaskSlots: "1"
    pipeline.max-parallelism: "12"

  executionRoleArn: <JOB ARN>
  emrReleaseLabel: emr-7.12.0-flink-latest
  jobManager:
    highAvailabilityEnabled: false
    storageDir: s3://<s3_bucket>/flink/autoscaling/ha/
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: s3://<s3_bucket>/some-job-with-back-pressure
    parallelism: 1
    upgradeMode: stateless
```

Verwenden Sie die folgende Einsatzspezifikation, um Gegendruck zu simulieren.

```
job:
    jarURI: s3://<s3_bucket>/pyflink-script.py
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-py", "/opt/flink/usrlib/pyflink-script.py"]
    parallelism: 1
    upgradeMode: stateless
```

Laden Sie das folgende Python-Skript in Ihren S3-Bucket hoch.

```
import logging
import sys
import time
import random

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

TABLE_NAME="orders"
QUERY=f"""
CREATE TABLE {TABLE_NAME} (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);
"""

def create_backpressure(i):
    time.sleep(2)
    return i

def autoscaling_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    t_env.execute_sql(QUERY)
    res_table = t_env.from_path(TABLE_NAME)

    stream =  t_env.to_data_stream(res_table) \
      .shuffle().map(lambda x: create_backpressure(x))\
      .print()
    env.execute("Autoscaling demo")

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    autoscaling_demo()
```

Um zu überprüfen, ob die Zeitschätzung für die Neuskalierung funktioniert, stellen Sie sicher, dass die `DEBUG` Level-Protokollierung des Flink-Operators aktiviert ist. Das folgende Beispiel zeigt, wie die Helm-Chart-Datei aktualisiert wird. `values.yaml` Installieren Sie dann das aktualisierte Helmdiagramm erneut und führen Sie Ihren Flink-Job erneut aus.

```
log4j-operator.properties: |+
  # Flink Operator Logging Overrides
  rootLogger.level = DEBUG
```

Holen Sie sich den Namen Ihres Leader-Pods.

```
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')

kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

Führen Sie den folgenden Befehl aus, um die tatsächliche Neustartzeit zu ermitteln, die bei der Auswertung der Kennzahlen verwendet wird.

```
kubectl logs <FLINK-OPERATOR-POD-NAME> -c flink-kubernetes-operator -n <OPERATOR-NAMESPACE> -f | grep "Restart time used in scaling summary computation"
```

Sie sollten Protokolle sehen, die den folgenden ähneln. Beachten Sie, dass nur bei der ersten Skalierung verwendet wird` job.autoscaler.restart.time`. Nachfolgende Skalierungen verwenden die beobachtete Neustartzeit.

```
2024-05-16 17:17:32,590 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT2M
2024-05-16 17:19:03,787 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:19:18,976 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:20:50,283 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:22:21,691 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
```

------
#### [ 7.0.0 and 7.1.0 ]

Der integrierte Open-Source-Flink Autoscaler verwendet zahlreiche Metriken, um die besten Skalierungsentscheidungen zu treffen. Die Standardwerte, die er für seine Berechnungen verwendet, sind jedoch so konzipiert, dass sie für die meisten Workloads gelten und für einen bestimmten Job möglicherweise nicht optimal sind. Die Autotuning-Funktion, die der Amazon EMR on EKS-Version des Flink Operators hinzugefügt wurde, berücksichtigt historische Trends, die bei bestimmten erfassten Metriken beobachtet wurden, und versucht dann entsprechend, den optimalsten Wert zu berechnen, der auf den jeweiligen Job zugeschnitten ist.


| Konfiguration | Erforderlich | Standard | Description | 
| --- | --- | --- | --- | 
| kubernetes.operator.job.autoscaler.autotune.enable | Nein | Falsch | Gibt an, ob der Flink Autoscaler die Konfigurationen im Laufe der Zeit automatisch anpassen soll, um die Skalierungsentscheidungen des Autoscalers zu optimieren. Derzeit kann der Autoscaler nur den Autoscaler-Parameter automatisch abstimmen. restart.time | 
| kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count | Nein | 3 | Gibt an, wie viele historische Amazon EMR on EKS-Metriken der Autoscaler in der Amazon EMR on EKS-Metrik-Konfigurationsübersicht speichert. | 
| kubernetes.operator.job.autoscaler.autotune.metrics.restart.count | Nein | 3 | Gibt an, wie viele Neustarts der Autoscaler durchführt, bevor er mit der Berechnung der durchschnittlichen Neustartzeit für einen bestimmten Job beginnt. | 

Um Autotuning zu aktivieren, müssen Sie die folgenden Schritte abgeschlossen haben:
+ Setzen Sie `kubernetes.operator.job.autoscaler.autotune.enable:` auf `true`
+ Setzen Sie `metrics.job.status.enable:` auf `TOTAL_TIME`
+ Folgte der Einrichtung von [Using Autoscaler for Flink applications](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/jobruns-flink-autoscaler.html), um Autoscaling zu aktivieren.

Im Folgenden finden Sie ein Beispiel für eine Bereitstellungsspezifikation, mit der Sie Autotuning ausprobieren können.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: autoscaling-example
spec:
  flinkVersion: v1_18
  flinkConfiguration:

    # Autotuning parameters
    kubernetes.operator.job.autoscaler.autotune.enable: "true"
    kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count: "2"
    kubernetes.operator.job.autoscaler.autotune.metrics.restart.count: "1"
    metrics.job.status.enable: TOTAL_TIME

    # Autoscaler parameters
    kubernetes.operator.job.autoscaler.enabled: "true"
    kubernetes.operator.job.autoscaler.scaling.enabled: "true"
    kubernetes.operator.job.autoscaler.stabilization.interval: "5s"
    kubernetes.operator.job.autoscaler.metrics.window: "1m"

    jobmanager.scheduler: adaptive

    taskmanager.numberOfTaskSlots: "1"
    state.savepoints.dir: s3://<S3_bucket>/autoscaling/savepoint/
    state.checkpoints.dir: s3://<S3_bucket>/flink/autoscaling/checkpoint/
    pipeline.max-parallelism: "4"

  executionRoleArn: <JOB ARN>
  emrReleaseLabel: emr-6.14.0-flink-latest
  jobManager:
    highAvailabilityEnabled: true
    storageDir: s3://<S3_bucket>/flink/autoscaling/ha/
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: s3://<S3_bucket>/some-job-with-back-pressure
    parallelism: 1
    upgradeMode: last-state
```

Verwenden Sie die folgende Einsatzspezifikation, um Gegendruck zu simulieren.

```
  job:
    jarURI: s3://<S3_bucket>/pyflink-script.py
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-py", "/opt/flink/usrlib/pyflink-script.py"]
    parallelism: 1
    upgradeMode: last-state
```

Laden Sie das folgende Python-Skript in Ihren S3-Bucket hoch.

```
import logging
import sys
import time
import random

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

TABLE_NAME="orders"
QUERY=f"""
CREATE TABLE {TABLE_NAME} (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);
"""

def create_backpressure(i):
    time.sleep(2)
    return i

def autoscaling_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    t_env.execute_sql(QUERY)
    res_table = t_env.from_path(TABLE_NAME)

    stream =  t_env.to_data_stream(res_table) \
      .shuffle().map(lambda x: create_backpressure(x))\
      .print()
    env.execute("Autoscaling demo")

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    autoscaling_demo()
```

Verwenden Sie die folgenden Befehle, um zu überprüfen, ob Ihr Autotuner funktioniert. Beachten Sie, dass Sie Ihre eigenen Leader-Pod-Informationen für den Flink-Operator verwenden müssen.

Ermitteln Sie zunächst den Namen Ihres Leader-Pods.

```
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')

kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

Sobald Sie den Namen Ihres Leader-Pods haben, können Sie den folgenden Befehl ausführen.

```
kubectl logs -n $NAMESPACE  -c flink-kubernetes-operator --follow <YOUR-FLINK-OPERATOR-POD-NAME>  | grep -E 'EmrEks|autotun|calculating|restart|autoscaler'
```

Sie sollten Protokolle sehen, die den folgenden ähneln.

```
[m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [36m[DEBUG][flink/autoscaling-example] Using the latest Emr Eks Metric for calculating restart.time for autotuning: EmrEksMetrics(restartMetric=RestartMetric(restartingTime=65, numRestarts=1))

[m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [32m[INFO ][flink/autoscaling-example] Calculated average restart.time metric via autotuning to be: PT0.065S
```

------