

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Utilisation d'Autoscaler pour les applications Flink
<a name="jobruns-flink-autoscaler"></a>

L'outil de mise à l'échelle automatique de l'opérateur peut contribuer à réduire la surcharge en recueillant des métriques des tâches Flink et en modifiant automatiquement le parallélisme au niveau du sommet de la tâche. Voici un exemple de ce à quoi pourrait ressembler votre configuration :

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  ...
spec:
  ...
  flinkVersion: v1_18
  flinkConfiguration:
    job.autoscaler.enabled: "true"
    job.autoscaler.stabilization.interval: 1m
    job.autoscaler.metrics.window: 5m
    job.autoscaler.target.utilization: "0.6"
    job.autoscaler.target.utilization.boundary: "0.2"
    job.autoscaler.restart.time: 2m
    job.autoscaler.catch-up.duration: 5m
    pipeline.max-parallelism: "720"
  ...
```

Cette configuration utilise les valeurs par défaut de la dernière version d'Amazon EMR. Si vous utilisez d'autres versions, vous pouvez avoir des valeurs différentes.

**Note**  
Depuis Amazon EMR 7.2.0, il n'est pas nécessaire d'inclure le préfixe `kubernetes.operator` dans votre configuration. Si vous utilisez la version 7.1.0 ou une version antérieure, vous devez utiliser le préfixe avant chaque configuration. Par exemple, vous devez spécifier`kubernetes.operator.job.autoscaler.scaling.enabled`.

Les options de configuration de l'outil de mise à l'échelle automatique sont les suivantes.
+ `job.autoscaler.scaling.enabled`— indique s'il faut activer l'exécution de la mise à l'échelle des sommets par l'autoscaler. La valeur par défaut est `true`. Si vous désactivez cette configuration, l'autoscaler collecte uniquement les métriques et évalue le parallélisme suggéré pour chaque sommet, mais ne met pas à niveau les tâches.
+ `job.autoscaler.stabilization.interval` : la période de stabilisation au cours de laquelle aucune nouvelle mise à l'échelle ne sera exécutée. La valeur par défaut est de 5 minutes.
+ `job.autoscaler.metrics.window` : la taille de la fenêtre d'agrégation des métriques de mise à l'échelle. Une fenêtre de taille plus grande rend le processus plus fluide et stable, toutefois, cela peut ralentir la réactivité de l'outil de mise à l'échelle automatique face à des variations brusques de charge. La valeur par défaut est de 15 minutes. Nous vous recommandons d'expérimenter en utilisant une valeur comprise entre 3 et 60 minutes.
+ `job.autoscaler.target.utilization` : l'utilisation du sommet cible pour assurer des performances stables de la tâche et une marge pour les fluctuations de charge. Le `0.7` ciblage par défaut est de 70 % utilization/load pour les sommets des tâches.
+ `job.autoscaler.target.utilization.boundary` : la limite d'utilisation du sommet cible qui sert de tampon supplémentaire pour éviter une mise à l'échelle immédiate en cas de fluctuations de charge. La valeur par défaut est`0.3`, ce qui signifie qu'un écart de 30 % par rapport à l'utilisation cible est autorisé avant de déclencher une action de dimensionnement.
+ `ob.autoscaler.restart.time` : l'heure prévue pour redémarrer l'application. La valeur par défaut est de 5 minutes.
+ `job.autoscaler.catch-up.duration` : le temps estimé pour rattraper le retard, c'est-à-dire pour traiter entièrement tout retard accumulé après l'achèvement d'une opération de mise à l'échelle. La valeur par défaut est de 5 minutes. En réduisant la durée de rattrapage, l'outil de mise à l'échelle automatique doit réserver une plus grande capacité supplémentaire pour les actions de mise à l'échelle.
+ `pipeline.max-parallelism` : le parallélisme maximal que l'outil de mise à l'échelle automatique peut utiliser. L'outil de mise à l'échelle automatique ignore cette limite si elle est supérieure au parallélisme maximal configuré dans la configuration Flink ou directement sur chaque opérateur. La valeur par défaut est -1. Notez que l'outil de mise à l'échelle automatique calcule le parallélisme comme un diviseur du parallélisme maximum. Il est donc recommandé de sélectionner des paramètres de parallélisme maximum qui offrent une large gamme de diviseurs potentiels, plutôt que de se baser uniquement sur les valeurs par défaut proposées par Flink. Nous recommandons d'utiliser des multiples de 60 pour cette configuration, tels que 120, 180, 240, 360, 720, etc.

Pour une page de référence plus détaillée sur la configuration, consultez la rubrique [Configuration de l'outil de mise à l'échelle automatique](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/#autoscaler-configuration).

# Réglage automatique des paramètres de l'Autoscaler
<a name="jobruns-flink-autoscaler-parameter-autotuning"></a>

Cette section décrit le comportement de réglage automatique pour les différentes versions d'Amazon EMR. Il décrit également en détail les différentes configurations d'auto-scaling.

**Note**  
Amazon EMR 7.2.0 et versions ultérieures utilisent la configuration open source `job.autoscaler.restart.time-tracking.enabled` pour permettre l'estimation du temps de **redimensionnement**. L'estimation du temps de redimensionnement possède les mêmes fonctionnalités que le réglage automatique d'Amazon EMR. Vous n'avez donc pas à attribuer manuellement des valeurs empiriques à l'heure de redémarrage.  
Vous pouvez toujours utiliser le réglage automatique d'Amazon EMR si vous utilisez Amazon EMR 7.1.0 ou une version antérieure.

------
#### [ 7.2.0 and higher ]

Amazon EMR 7.2.0 et versions ultérieures mesurent le temps de redémarrage réel requis pour appliquer les décisions de dimensionnement automatique. Dans les versions 7.1.0 et antérieures, vous deviez utiliser la configuration `job.autoscaler.restart.time` pour configurer manuellement le temps de redémarrage maximal estimé. En utilisant la configuration`job.autoscaler.restart.time-tracking.enabled`, il vous suffit de saisir une heure de redémarrage pour la première mise à l'échelle. Ensuite, l'opérateur enregistre l'heure de redémarrage réelle et l'utilisera pour les redimensionnements ultérieurs.

Pour activer ce suivi, utilisez la commande suivante :

```
job.autoscaler.restart.time-tracking.enabled: true
```

Les configurations associées pour l'estimation du temps de redimensionnement sont les suivantes.


| Configuration | Obligatoire | Par défaut | Description | 
| --- | --- | --- | --- | 
| job.autoscaler.restart.time-tracking.enabled | Non | False | Indique si le Flink Autoscaler doit ajuster automatiquement les configurations au fil du temps afin d'optimiser les décisions de dimensionnement. Notez que l'Autoscaler peut uniquement régler automatiquement le paramètre Autoscaler. restart.time | 
| job.autoscaler.restart.time | Non | 5 min | Temps de redémarrage prévu utilisé par Amazon EMR on EKS jusqu'à ce que l'opérateur puisse déterminer le temps de redémarrage réel à partir des mises à l'échelle précédentes. | 
| job.autoscaler.restart.time-tracking.limit | Non | 15 min | Durée de redémarrage maximale observée lorsque le job.autoscaler.restart.time-tracking.enabled paramètre est réglé surtrue. | 

Voici un exemple de spécification de déploiement que vous pouvez utiliser pour essayer d'estimer le temps de redimensionnement :

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: autoscaling-example
spec:
  flinkVersion: v1_18
  flinkConfiguration:

    # Autoscaler parameters
    job.autoscaler.enabled: "true"
    job.autoscaler.scaling.enabled: "true"
    job.autoscaler.stabilization.interval: "5s"
    job.autoscaler.metrics.window: "1m"
    
    job.autoscaler.restart.time-tracking.enabled: "true"
    job.autoscaler.restart.time: "2m"
    job.autoscaler.restart.time-tracking.limit: "10m"
    
    jobmanager.scheduler: adaptive
    taskmanager.numberOfTaskSlots: "1"
    pipeline.max-parallelism: "12"

  executionRoleArn: <JOB ARN>
  emrReleaseLabel: emr-7.12.0-flink-latest
  jobManager:
    highAvailabilityEnabled: false
    storageDir: s3://<s3_bucket>/flink/autoscaling/ha/
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: s3://<s3_bucket>/some-job-with-back-pressure
    parallelism: 1
    upgradeMode: stateless
```

Pour simuler la contre-pression, utilisez les spécifications de déploiement suivantes.

```
job:
    jarURI: s3://<s3_bucket>/pyflink-script.py
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-py", "/opt/flink/usrlib/pyflink-script.py"]
    parallelism: 1
    upgradeMode: stateless
```

Téléchargez le script Python suivant dans votre compartiment S3.

```
import logging
import sys
import time
import random

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

TABLE_NAME="orders"
QUERY=f"""
CREATE TABLE {TABLE_NAME} (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);
"""

def create_backpressure(i):
    time.sleep(2)
    return i

def autoscaling_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    t_env.execute_sql(QUERY)
    res_table = t_env.from_path(TABLE_NAME)

    stream =  t_env.to_data_stream(res_table) \
      .shuffle().map(lambda x: create_backpressure(x))\
      .print()
    env.execute("Autoscaling demo")

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    autoscaling_demo()
```

Pour vérifier que l'estimation du temps de redimensionnement fonctionne, assurez-vous que l'enregistrement des `DEBUG` niveaux par l'opérateur Flink est activé. L'exemple ci-dessous montre comment mettre à jour le fichier de diagramme de barre`values.yaml`. Réinstallez ensuite le tableau de bord mis à jour et réexécutez votre tâche Flink.

```
log4j-operator.properties: |+
  # Flink Operator Logging Overrides
  rootLogger.level = DEBUG
```

Obtenez le nom de votre module leader.

```
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')

kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

Exécutez la commande suivante pour obtenir le temps de redémarrage réel utilisé dans les évaluations des métriques.

```
kubectl logs <FLINK-OPERATOR-POD-NAME> -c flink-kubernetes-operator -n <OPERATOR-NAMESPACE> -f | grep "Restart time used in scaling summary computation"
```

Vous devriez voir des journaux similaires aux suivants. Notez que seule la première mise à l'échelle l'utilise` job.autoscaler.restart.time`. Les mises à l'échelle suivantes utilisent le temps de redémarrage observé.

```
2024-05-16 17:17:32,590 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT2M
2024-05-16 17:19:03,787 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:19:18,976 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:20:50,283 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:22:21,691 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
```

------
#### [ 7.0.0 and 7.1.0 ]

Le logiciel open source intégré Flink Autoscaler utilise de nombreux indicateurs pour prendre les meilleures décisions en matière de dimensionnement. Cependant, les valeurs par défaut qu'il utilise pour ses calculs sont censées s'appliquer à la plupart des charges de travail et peuvent ne pas être optimales pour une tâche donnée. La fonction de réglage automatique ajoutée à la version Amazon EMR on EKS du Flink Operator examine les tendances historiques observées sur des métriques capturées spécifiques, puis tente de calculer la valeur la plus optimale adaptée à la tâche donnée.


| Configuration | Obligatoire | Par défaut | Description | 
| --- | --- | --- | --- | 
| kubernetes.operator.job.autoscaler.autotune.enable | Non | False | Indique si le Flink Autoscaler doit ajuster automatiquement les configurations au fil du temps afin d'optimiser les décisions des autoscalers. Actuellement, l'Autoscaler peut uniquement régler automatiquement le paramètre Autoscaler. restart.time | 
| kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count | Non | 3 | Indique le nombre de métriques historiques Amazon EMR sur EKS que l'Autoscaler conserve dans la carte de configuration des métriques Amazon EMR on EKS. | 
| kubernetes.operator.job.autoscaler.autotune.metrics.restart.count | Non | 3 | Indique le nombre de redémarrages effectués par Autoscaler avant de commencer à calculer le temps de redémarrage moyen pour une tâche donnée. | 

Pour activer le réglage automatique, vous devez avoir effectué les opérations suivantes :
+ Définissez `kubernetes.operator.job.autoscaler.autotune.enable:` sur `true`
+ Définissez `metrics.job.status.enable:` sur `TOTAL_TIME`
+ A suivi la configuration de l'[utilisation d'Autoscaler pour les applications Flink pour activer la](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/jobruns-flink-autoscaler.html) mise à l'échelle automatique.

Voici un exemple de spécification de déploiement que vous pouvez utiliser pour essayer le réglage automatique.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: autoscaling-example
spec:
  flinkVersion: v1_18
  flinkConfiguration:

    # Autotuning parameters
    kubernetes.operator.job.autoscaler.autotune.enable: "true"
    kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count: "2"
    kubernetes.operator.job.autoscaler.autotune.metrics.restart.count: "1"
    metrics.job.status.enable: TOTAL_TIME

    # Autoscaler parameters
    kubernetes.operator.job.autoscaler.enabled: "true"
    kubernetes.operator.job.autoscaler.scaling.enabled: "true"
    kubernetes.operator.job.autoscaler.stabilization.interval: "5s"
    kubernetes.operator.job.autoscaler.metrics.window: "1m"

    jobmanager.scheduler: adaptive

    taskmanager.numberOfTaskSlots: "1"
    state.savepoints.dir: s3://<S3_bucket>/autoscaling/savepoint/
    state.checkpoints.dir: s3://<S3_bucket>/flink/autoscaling/checkpoint/
    pipeline.max-parallelism: "4"

  executionRoleArn: <JOB ARN>
  emrReleaseLabel: emr-6.14.0-flink-latest
  jobManager:
    highAvailabilityEnabled: true
    storageDir: s3://<S3_bucket>/flink/autoscaling/ha/
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: s3://<S3_bucket>/some-job-with-back-pressure
    parallelism: 1
    upgradeMode: last-state
```

Pour simuler la contre-pression, utilisez les spécifications de déploiement suivantes.

```
  job:
    jarURI: s3://<S3_bucket>/pyflink-script.py
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-py", "/opt/flink/usrlib/pyflink-script.py"]
    parallelism: 1
    upgradeMode: last-state
```

Téléchargez le script Python suivant dans votre compartiment S3.

```
import logging
import sys
import time
import random

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

TABLE_NAME="orders"
QUERY=f"""
CREATE TABLE {TABLE_NAME} (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);
"""

def create_backpressure(i):
    time.sleep(2)
    return i

def autoscaling_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    t_env.execute_sql(QUERY)
    res_table = t_env.from_path(TABLE_NAME)

    stream =  t_env.to_data_stream(res_table) \
      .shuffle().map(lambda x: create_backpressure(x))\
      .print()
    env.execute("Autoscaling demo")

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    autoscaling_demo()
```

Pour vérifier que votre autotuner fonctionne, utilisez les commandes suivantes. Notez que vous devez utiliser les informations de votre propre module leader pour le Flink Operator.

Obtenez d'abord le nom de votre module leader.

```
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')

kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

Une fois que vous avez le nom de votre module leader, vous pouvez exécuter la commande suivante.

```
kubectl logs -n $NAMESPACE  -c flink-kubernetes-operator --follow <YOUR-FLINK-OPERATOR-POD-NAME>  | grep -E 'EmrEks|autotun|calculating|restart|autoscaler'
```

Vous devriez voir des journaux similaires aux suivants.

```
[m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [36m[DEBUG][flink/autoscaling-example] Using the latest Emr Eks Metric for calculating restart.time for autotuning: EmrEksMetrics(restartMetric=RestartMetric(restartingTime=65, numRestarts=1))

[m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [32m[INFO ][flink/autoscaling-example] Calculated average restart.time metric via autotuning to be: PT0.065S
```

------