

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Uso di Autoscaler per le applicazioni Flink
<a name="jobruns-flink-autoscaler"></a>

L'autoscaler dell'operatore può contribuire ad alleviare la congestione raccogliendo i parametri dai processi Flink e regolando in automatico il parallelismo a livello di vertice di processo. Di seguito è riportato un esempio di come potrebbe presentarsi la tua configurazione:

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  ...
spec:
  ...
  flinkVersion: v1_18
  flinkConfiguration:
    job.autoscaler.enabled: "true"
    job.autoscaler.stabilization.interval: 1m
    job.autoscaler.metrics.window: 5m
    job.autoscaler.target.utilization: "0.6"
    job.autoscaler.target.utilization.boundary: "0.2"
    job.autoscaler.restart.time: 2m
    job.autoscaler.catch-up.duration: 5m
    pipeline.max-parallelism: "720"
  ...
```

Questa configurazione utilizza i valori predefiniti per l'ultima versione di Amazon EMR. Se utilizzi altre versioni, potresti avere valori diversi.

**Nota**  
A partire da Amazon EMR 7.2.0, non è necessario includere il prefisso nella configurazione`kubernetes.operator`. Se utilizzi la versione 7.1.0 o una versione precedente, devi utilizzare il prefisso prima di ogni configurazione. Ad esempio, è necessario specificare. `kubernetes.operator.job.autoscaler.scaling.enabled`

Di seguito sono elencate le opzioni di configurazione dell'autoscaler.
+ `job.autoscaler.scaling.enabled`— specifica se abilitare l'esecuzione della scala dei vertici da parte dell'autoscaler. Il valore predefinito è `true`. Se disabilitate questa configurazione, l'autoscaler raccoglie solo le metriche e valuta il parallelismo suggerito per ogni vertice, ma non aggiorna i lavori.
+ `job.autoscaler.stabilization.interval`: il periodo di stabilizzazione in cui non verrà eseguita alcun nuovo dimensionamento. L'impostazione predefinita è 5 minuti.
+ `job.autoscaler.metrics.window`: la dimensione della finestra di aggregazione dei parametri di dimensionamento. Più grande è la finestra, più è fluida e stabile, ma l'autoscaler potrebbe essere più lento a reagire a variazioni improvvise del carico. L'impostazione predefinita è 15 minuti. Consigliamo di sperimentare utilizzando un valore compreso tra 3 e 60 minuti.
+ `job.autoscaler.target.utilization`: l'utilizzo del vertice obiettivo per fornire prestazioni lavorative stabili e un certo buffer per le fluttuazioni del carico. L'impostazione predefinita è `0.7` il 70% utilization/load per i vertici del lavoro.
+ `job.autoscaler.target.utilization.boundary`: il limite di utilizzo del vertice obiettivo che funge da buffer aggiuntivo per evitare il dimensionamento immediato in caso di fluttuazioni del carico. L'impostazione predefinita è`0.3`, il che significa che è consentita una deviazione del 30% dall'utilizzo target prima di attivare un'azione di ridimensionamento.
+ `ob.autoscaler.restart.time`: il tempo previsto per il riavvio dell'applicazione. L'impostazione predefinita è 5 minuti.
+ `job.autoscaler.catch-up.duration`: il tempo previsto per il recupero, ovvero l'elaborazione completa di qualsiasi backlog dopo il completamento di un'operazione di dimensionamento. L'impostazione predefinita è 5 minuti. Riducendo la durata del tempo di recupero, l'autoscaler deve riservare maggiore capacità aggiuntiva per le azioni di dimensionamento.
+ `pipeline.max-parallelism`: il parallelismo massimo che l'autoscaler può utilizzare. L'autoscaler ignora questo limite se è superiore al parallelismo massimo configurato nella configurazione di Flink o direttamente su ciascun operatore. L'impostazione predefinita è -1. Tieni presente che l'autoscaler calcola il parallelismo come divisore del numero di parallelismo massimo, pertanto si consiglia di scegliere impostazioni di parallelismo massimo che prevedano molti divisori, invece di fare affidamento sui valori predefiniti forniti da Flink. Consigliamo di utilizzare multipli di 60 per questa configurazione, ad esempio 120, 180, 240, 360, 720, eccetera.

Per una pagina di riferimento più dettagliata sulla configurazione, consulta [Configurazione di autoscaler](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/#autoscaler-configuration).

# Autotuning dei parametri Autoscaler
<a name="jobruns-flink-autoscaler-parameter-autotuning"></a>

Questa sezione descrive il comportamento di ottimizzazione automatica per varie versioni di Amazon EMR. Inoltre, approfondisce le diverse configurazioni di auto-scaling.

**Nota**  
Amazon EMR 7.2.0 e versioni successive utilizzano la configurazione open source `job.autoscaler.restart.time-tracking.enabled` per consentire la stima del tempo di **ridimensionamento**. La stima del tempo di riscala ha le stesse funzionalità dell'autotuning di Amazon EMR, quindi non è necessario assegnare manualmente valori empirici al tempo di riavvio.  
Puoi comunque utilizzare l'autotuning di Amazon EMR se utilizzi Amazon EMR 7.1.0 o versioni precedenti.

------
#### [ 7.2.0 and higher ]

Amazon EMR 7.2.0 e versioni successive misurano il tempo di riavvio effettivo richiesto per applicare le decisioni di scalabilità automatica. Nelle versioni 7.1.0 e precedenti, era necessario utilizzare la configurazione per configurare manualmente il tempo massimo di `job.autoscaler.restart.time` riavvio stimato. Utilizzando la configurazione`job.autoscaler.restart.time-tracking.enabled`, è sufficiente inserire un orario di riavvio per il primo ridimensionamento. Successivamente, l'operatore registra l'orario di riavvio effettivo e lo utilizzerà per i ridimensionamenti successivi.

Per abilitare questo tracciamento, utilizzate il seguente comando:

```
job.autoscaler.restart.time-tracking.enabled: true
```

Di seguito sono riportate le configurazioni correlate per la stima del tempo di ridimensionamento.


| Configurazione | Obbligatorio | Predefinita | Description | 
| --- | --- | --- | --- | 
| job.autoscaler.restart.time-tracking.enabled | No | False | Indica se Flink Autoscaler deve ottimizzare automaticamente le configurazioni nel tempo per ottimizzare le decisioni di ridimensionamento. Nota che Autoscaler può solo regolare automaticamente il parametro Autoscaler. restart.time | 
| job.autoscaler.restart.time | No | 5 min | Il tempo di riavvio previsto utilizzato da Amazon EMR su EKS fino a quando l'operatore non sarà in grado di determinare il tempo di riavvio effettivo in base alle scalature precedenti. | 
| job.autoscaler.restart.time-tracking.limit | No | 15 min | Il tempo di riavvio massimo osservato quando è impostato su. job.autoscaler.restart.time-tracking.enabled true | 

Di seguito è riportato un esempio di specifica di distribuzione che è possibile utilizzare per provare la stima del tempo di ridimensionamento:

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: autoscaling-example
spec:
  flinkVersion: v1_18
  flinkConfiguration:

    # Autoscaler parameters
    job.autoscaler.enabled: "true"
    job.autoscaler.scaling.enabled: "true"
    job.autoscaler.stabilization.interval: "5s"
    job.autoscaler.metrics.window: "1m"
    
    job.autoscaler.restart.time-tracking.enabled: "true"
    job.autoscaler.restart.time: "2m"
    job.autoscaler.restart.time-tracking.limit: "10m"
    
    jobmanager.scheduler: adaptive
    taskmanager.numberOfTaskSlots: "1"
    pipeline.max-parallelism: "12"

  executionRoleArn: <JOB ARN>
  emrReleaseLabel: emr-7.12.0-flink-latest
  jobManager:
    highAvailabilityEnabled: false
    storageDir: s3://<s3_bucket>/flink/autoscaling/ha/
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: s3://<s3_bucket>/some-job-with-back-pressure
    parallelism: 1
    upgradeMode: stateless
```

Per simulare la contropressione, utilizza le seguenti specifiche di distribuzione.

```
job:
    jarURI: s3://<s3_bucket>/pyflink-script.py
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-py", "/opt/flink/usrlib/pyflink-script.py"]
    parallelism: 1
    upgradeMode: stateless
```

Carica il seguente script Python nel tuo bucket S3.

```
import logging
import sys
import time
import random

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

TABLE_NAME="orders"
QUERY=f"""
CREATE TABLE {TABLE_NAME} (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);
"""

def create_backpressure(i):
    time.sleep(2)
    return i

def autoscaling_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    t_env.execute_sql(QUERY)
    res_table = t_env.from_path(TABLE_NAME)

    stream =  t_env.to_data_stream(res_table) \
      .shuffle().map(lambda x: create_backpressure(x))\
      .print()
    env.execute("Autoscaling demo")

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    autoscaling_demo()
```

Per verificare che la stima del tempo di ridimensionamento funzioni, assicurati che la registrazione dei `DEBUG` livelli dell'operatore Flink sia abilitata. L'esempio seguente mostra come aggiornare il file del grafico Helm. `values.yaml` Quindi reinstalla la tabella di guida aggiornata ed esegui nuovamente il job Flink.

```
log4j-operator.properties: |+
  # Flink Operator Logging Overrides
  rootLogger.level = DEBUG
```

Ottieni il nome del tuo leader pod.

```
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')

kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

Esegui il comando seguente per ottenere il tempo di riavvio effettivo utilizzato nelle valutazioni delle metriche.

```
kubectl logs <FLINK-OPERATOR-POD-NAME> -c flink-kubernetes-operator -n <OPERATOR-NAMESPACE> -f | grep "Restart time used in scaling summary computation"
```

Dovrebbero essere visualizzati log simili ai seguenti. Nota che viene utilizzato solo il primo ridimensionamento. ` job.autoscaler.restart.time` I ridimensionamenti successivi utilizzano il tempo di riavvio osservato.

```
2024-05-16 17:17:32,590 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT2M
2024-05-16 17:19:03,787 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:19:18,976 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:20:50,283 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:22:21,691 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
```

------
#### [ 7.0.0 and 7.1.0 ]

Il Flink Autoscaler open source integrato utilizza numerose metriche per prendere le migliori decisioni di scalabilità. Tuttavia, i valori predefiniti che utilizza per i suoi calcoli sono pensati per essere applicabili alla maggior parte dei carichi di lavoro e potrebbero non essere ottimali per un determinato lavoro. La funzionalità di autotuning aggiunta alla versione Amazon EMR on EKS di Flink Operator esamina le tendenze storiche osservate su specifiche metriche acquisite e quindi cerca di calcolare il valore più ottimale su misura per il determinato lavoro.


| Configurazione | Obbligatorio | Predefinita | Description | 
| --- | --- | --- | --- | 
| kubernetes.operator.job.autoscaler.autotune.enable | No | False | Indica se Flink Autoscaler deve ottimizzare automaticamente le configurazioni nel tempo per ottimizzare le decisioni di ridimensionamento degli autoscaler. Attualmente, Autoscaler può solo regolare automaticamente il parametro Autoscaler. restart.time | 
| kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count | No | 3 | Indica il numero di parametri storici di Amazon EMR su EKS che Autoscaler conserva nella mappa di configurazione dei parametri di Amazon EMR on EKS. | 
| kubernetes.operator.job.autoscaler.autotune.metrics.restart.count | No | 3 | Indica il numero di riavvii che Autoscaler esegue prima di iniziare a calcolare il tempo di riavvio medio per un determinato lavoro. | 

Per abilitare l'autotuning, è necessario aver completato quanto segue:
+ Imposta `kubernetes.operator.job.autoscaler.autotune.enable:` su `true`
+ Imposta `metrics.job.status.enable:` su `TOTAL_TIME`
+ È seguita la configurazione di [Using Autoscaler for Flink applications per abilitare la scalabilità](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/jobruns-flink-autoscaler.html) automatica.

Di seguito è riportato un esempio di specifica di distribuzione che è possibile utilizzare per provare l'autotuning.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: autoscaling-example
spec:
  flinkVersion: v1_18
  flinkConfiguration:

    # Autotuning parameters
    kubernetes.operator.job.autoscaler.autotune.enable: "true"
    kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count: "2"
    kubernetes.operator.job.autoscaler.autotune.metrics.restart.count: "1"
    metrics.job.status.enable: TOTAL_TIME

    # Autoscaler parameters
    kubernetes.operator.job.autoscaler.enabled: "true"
    kubernetes.operator.job.autoscaler.scaling.enabled: "true"
    kubernetes.operator.job.autoscaler.stabilization.interval: "5s"
    kubernetes.operator.job.autoscaler.metrics.window: "1m"

    jobmanager.scheduler: adaptive

    taskmanager.numberOfTaskSlots: "1"
    state.savepoints.dir: s3://<S3_bucket>/autoscaling/savepoint/
    state.checkpoints.dir: s3://<S3_bucket>/flink/autoscaling/checkpoint/
    pipeline.max-parallelism: "4"

  executionRoleArn: <JOB ARN>
  emrReleaseLabel: emr-6.14.0-flink-latest
  jobManager:
    highAvailabilityEnabled: true
    storageDir: s3://<S3_bucket>/flink/autoscaling/ha/
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: s3://<S3_bucket>/some-job-with-back-pressure
    parallelism: 1
    upgradeMode: last-state
```

Per simulare la contropressione, utilizzate le seguenti specifiche di distribuzione.

```
  job:
    jarURI: s3://<S3_bucket>/pyflink-script.py
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-py", "/opt/flink/usrlib/pyflink-script.py"]
    parallelism: 1
    upgradeMode: last-state
```

Carica il seguente script Python nel tuo bucket S3.

```
import logging
import sys
import time
import random

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

TABLE_NAME="orders"
QUERY=f"""
CREATE TABLE {TABLE_NAME} (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);
"""

def create_backpressure(i):
    time.sleep(2)
    return i

def autoscaling_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    t_env.execute_sql(QUERY)
    res_table = t_env.from_path(TABLE_NAME)

    stream =  t_env.to_data_stream(res_table) \
      .shuffle().map(lambda x: create_backpressure(x))\
      .print()
    env.execute("Autoscaling demo")

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    autoscaling_demo()
```

Per verificare che l'autotuner funzioni, usa i seguenti comandi. Nota che devi usare le informazioni del tuo leader pod per l'operatore Flink.

Per prima cosa procuratevi il nome del vostro leader pod.

```
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')

kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

Una volta che hai il nome del tuo leader pod, puoi eseguire il seguente comando.

```
kubectl logs -n $NAMESPACE  -c flink-kubernetes-operator --follow <YOUR-FLINK-OPERATOR-POD-NAME>  | grep -E 'EmrEks|autotun|calculating|restart|autoscaler'
```

Dovreste vedere dei log simili ai seguenti.

```
[m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [36m[DEBUG][flink/autoscaling-example] Using the latest Emr Eks Metric for calculating restart.time for autotuning: EmrEksMetrics(restartMetric=RestartMetric(restartingTime=65, numRestarts=1))

[m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [32m[INFO ][flink/autoscaling-example] Calculated average restart.time metric via autotuning to be: PT0.065S
```

------