

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Uso del escalador automático para aplicaciones de Flink
<a name="jobruns-flink-autoscaler"></a>

El escalador automático del operador puede ayudar a reducir la contrapresión mediante la recopilación de métricas de los trabajos de Flink y el ajuste automático del paralelismo a nivel de vértice de los trabajos. A continuación se muestra un ejemplo de cómo podría ser su configuración:

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  ...
spec:
  ...
  flinkVersion: v1_18
  flinkConfiguration:
    job.autoscaler.enabled: "true"
    job.autoscaler.stabilization.interval: 1m
    job.autoscaler.metrics.window: 5m
    job.autoscaler.target.utilization: "0.6"
    job.autoscaler.target.utilization.boundary: "0.2"
    job.autoscaler.restart.time: 2m
    job.autoscaler.catch-up.duration: 5m
    pipeline.max-parallelism: "720"
  ...
```

Esta configuración usa valores predeterminados para la última versión de Amazon EMR. Si utiliza otras versiones, es posible que tengan valores diferentes.

**nota**  
A partir de Amazon EMR 7.2.0, no necesita incluir el prefijo `kubernetes.operator` en la configuración. Si utiliza la versión 7.1.0 o una versión anterior, debe usar el prefijo antes de cada configuración. Por ejemplo, debe especificar `kubernetes.operator.job.autoscaler.scaling.enabled`.

A continuación se indican las opciones de configuración del escalado automático.
+ `job.autoscaler.scaling.enabled`: especifica si el escalador automático debe habilitar la ejecución del escalado de vértices. El valor predeterminado es `true`. Si deshabilita esta configuración, el escalador automático solo recopila métricas y evalúa el paralelismo sugerido para cada vértice, pero no actualiza los trabajos.
+ `job.autoscaler.stabilization.interval`: el periodo de estabilización en el que no se ejecutará ningún nuevo escalado. El tiempo predeterminado es 5 minutos.
+ `job.autoscaler.metrics.window`: el tamaño de la ventana de agregación de métricas de escalado. Cuanto más grande sea la ventana, mayor será la suavidad y la estabilidad, pero el escalador automático puede ser más lento a la hora de reaccionar ante los cambios repentinos de carga. El tiempo predeterminado es 15 minutos. Le recomendamos que experimente con un valor de entre 3 y 60 minutos.
+ `job.autoscaler.target.utilization`: el uso del vértice de destino para proporcionar un rendimiento estable del trabajo y algo de búfer para las fluctuaciones de carga. El `0.7` objetivo predeterminado es el 70% de utilization/load los vértices de trabajo.
+ `job.autoscaler.target.utilization.boundary`: el límite de uso del vértice de destino que sirve como búfer adicional para evitar el escalado inmediato en caso de fluctuaciones de carga. El valor predeterminado es `0.3`, lo que significa que se permite una desviación del 30 % con respecto a la utilización objetivo antes de activar una acción de escalado.
+ `ob.autoscaler.restart.time`: el tiempo previsto para reiniciar la aplicación. El tiempo predeterminado es 5 minutos.
+ `job.autoscaler.catch-up.duration`: el tiempo previsto para la recuperación, es decir, procesar por completo cualquier atraso una vez finalizada la operación de escalado. El tiempo predeterminado es 5 minutos. Al reducir la duración de la recuperación, el escalador automático debe reservar más capacidad adicional para las acciones de escalado.
+ `pipeline.max-parallelism`: el paralelismo máximo que puede utilizar el escalador automático. El escalador automático ignora este límite si es superior al paralelismo máximo configurado en la configuración de Flink o directamente en cada operador. El valor predeterminado es -1. Tenga en cuenta que el escalador automático calcula el paralelismo como un divisor del número máximo de paralelismo, por lo que se recomienda elegir una configuración de paralelismo máximo que tenga muchos divisores en lugar de confiar en los valores predeterminados proporcionados por Flink. Recomendamos utilizar múltiplos de 60 para esta configuración, como 120, 180, 240, 360, 720, etc.

Para ver una página de referencia de configuración más detallada, consulte [Configuración del escalador automático](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/#autoscaler-configuration).

# Ajuste automático de los parámetros del escalador automático
<a name="jobruns-flink-autoscaler-parameter-autotuning"></a>

En esta sección se describe el comportamiento del ajuste automático de varias versiones de Amazon EMR. También trata detalles sobre las diferentes configuraciones de escalado automático.

**nota**  
Amazon EMR 7.2.0 y versiones posteriores utilizan la configuración de código abierto `job.autoscaler.restart.time-tracking.enabled` para permitir la **estimación del tiempo de reescalado**. La estimación del tiempo de reescalado tiene la misma funcionalidad que el ajuste automático de Amazon EMR, por lo que no es necesario asignar manualmente valores empíricos al tiempo de reinicio.  
Puede seguir utilizando el escalado automático de Amazon EMR, si utiliza las versiones 7.1.0 o anteriores de Amazon EMR.

------
#### [ 7.2.0 and higher ]

Amazon EMR 7.2.0 y versiones posteriores miden el tiempo real de reinicio necesario para aplicar las decisiones de escalado automático. En las versiones 7.1.0 y anteriores, tenía que usar la configuración `job.autoscaler.restart.time` para configurar manualmente el tiempo máximo de reinicio estimado. Al usar la configuración `job.autoscaler.restart.time-tracking.enabled`, solo necesita introducir una hora de reinicio para el primer escalado. Después, el operador registra el tiempo de reinicio real y lo usará para los escalados posteriores.

Para habilitar este seguimiento, utilice el siguiente comando:

```
job.autoscaler.restart.time-tracking.enabled: true
```

Las siguientes son las configuraciones relacionadas para la estimación del tiempo de reescalado.


| Configuración | Obligatorio | Predeterminado | Description (Descripción) | 
| --- | --- | --- | --- | 
| job.autoscaler.restart.time-tracking.enabled | No | False | Indica si el escalador automático de Flink debe ajustar automáticamente las configuraciones a lo largo del tiempo para optimizar las decisiones de escalado. Tenga en cuenta que el escalador automático solo puede ajustar automáticamente el parámetro restart.time del escalador automático. | 
| job.autoscaler.restart.time | No | 5 m | El tiempo de reinicio esperado que utiliza Amazon EMR en EKS hasta que el operador pueda determinar el tiempo de reinicio real a partir de las escalados anteriores. | 
| job.autoscaler.restart.time-tracking.enabled | No | 15 m | El tiempo máximo de reinicio observado cuando job.autoscaler.restart.time-tracking.enabled está establecido en true. | 

El siguiente es un ejemplo de especificación de implementación que puede utilizar para probar la estimación del tiempo de reescalado:

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: autoscaling-example
spec:
  flinkVersion: v1_18
  flinkConfiguration:

    # Autoscaler parameters
    job.autoscaler.enabled: "true"
    job.autoscaler.scaling.enabled: "true"
    job.autoscaler.stabilization.interval: "5s"
    job.autoscaler.metrics.window: "1m"
    
    job.autoscaler.restart.time-tracking.enabled: "true"
    job.autoscaler.restart.time: "2m"
    job.autoscaler.restart.time-tracking.limit: "10m"
    
    jobmanager.scheduler: adaptive
    taskmanager.numberOfTaskSlots: "1"
    pipeline.max-parallelism: "12"

  executionRoleArn: <JOB ARN>
  emrReleaseLabel: emr-7.12.0-flink-latest
  jobManager:
    highAvailabilityEnabled: false
    storageDir: s3://<s3_bucket>/flink/autoscaling/ha/
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: s3://<s3_bucket>/some-job-with-back-pressure
    parallelism: 1
    upgradeMode: stateless
```

Para simular la contrapresión, utilice la siguiente especificación de implementación.

```
job:
    jarURI: s3://<s3_bucket>/pyflink-script.py
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-py", "/opt/flink/usrlib/pyflink-script.py"]
    parallelism: 1
    upgradeMode: stateless
```

Cargue el siguiente script de Python en el bucket de S3.

```
import logging
import sys
import time
import random

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

TABLE_NAME="orders"
QUERY=f"""
CREATE TABLE {TABLE_NAME} (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);
"""

def create_backpressure(i):
    time.sleep(2)
    return i

def autoscaling_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    t_env.execute_sql(QUERY)
    res_table = t_env.from_path(TABLE_NAME)

    stream =  t_env.to_data_stream(res_table) \
      .shuffle().map(lambda x: create_backpressure(x))\
      .print()
    env.execute("Autoscaling demo")

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    autoscaling_demo()
```

Para comprobar que la estimación del tiempo de reescalado funciona, asegúrese de que el registro de nivel `DEBUG` del operador Flink esté habilitado. En el siguiente ejemplo se muestra cómo actualizar el archivo `values.yaml` del gráfico de Helm. A continuación, vuelva a instalar el gráfico de Helm actualizado y vuelva a ejecutar el trabajo de Flink.

```
log4j-operator.properties: |+
  # Flink Operator Logging Overrides
  rootLogger.level = DEBUG
```

Obtenga el nombre de tu pod líder.

```
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')

kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

Ejecute el siguiente comando para obtener el tiempo de reinicio real utilizado en las evaluaciones de las métricas.

```
kubectl logs <FLINK-OPERATOR-POD-NAME> -c flink-kubernetes-operator -n <OPERATOR-NAMESPACE> -f | grep "Restart time used in scaling summary computation"
```

Deben aparecer registros similares a los siguientes. Tenga en cuenta que solo el primer escalado utiliza ` job.autoscaler.restart.time`. Los escalados posteriores utilizan el tiempo de reinicio observado.

```
2024-05-16 17:17:32,590 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT2M
2024-05-16 17:19:03,787 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:19:18,976 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:20:50,283 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:22:21,691 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
```

------
#### [ 7.0.0 and 7.1.0 ]

El escalador automático Flink integrado de código abierto utiliza numerosas métricas para tomar las mejores decisiones de escalado. Sin embargo, los valores predeterminados que utiliza para sus cálculos están pensados para ser aplicables a la mayoría de las cargas de trabajo y es posible que no sean óptimos para un trabajo determinado. La característica de ajuste automático añadida a la versión Amazon EMR en EKS del Flink Operator analiza las tendencias históricas observadas en métricas capturadas específicas y, en consecuencia, trata de calcular el valor más óptimo adaptado al trabajo en cuestión.


| Configuración | Obligatorio | Predeterminado | Description (Descripción) | 
| --- | --- | --- | --- | 
| kubernetes.operator.job.autoscaler.autotune.enable | No | False | Indica si el escalador automático de Flink debe ajustar automáticamente las configuraciones a lo largo del tiempo para optimizar las decisiones de escalado de los escaladores automáticos. Actualmente, el escalador automático solo puede ajustar automáticamente el parámetro restart.time del escalador automático. | 
| kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count | No | 3 | Indica cuántas métricas históricas de Amazon EMR en EKS guarda el escalador automático en el mapa de configuración de métricas de Amazon EMR en EKS. | 
| kubernetes.operator.job.autoscaler.autotune.metrics.restart.count | No | 3 | Indica el número de reinicios que realiza el escalador automático antes de empezar a calcular el tiempo medio de reinicio de un trabajo determinado. | 

Para habilitar el ajuste automático, debe haber completado lo siguiente:
+ Establezca `kubernetes.operator.job.autoscaler.autotune.enable:` en `true`
+ Establezca `metrics.job.status.enable:` en `TOTAL_TIME`
+ Se sigue la configuración del [Uso del escalador automático para las aplicaciones de Flink](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/jobruns-flink-autoscaler.html) para habilitar el escalado automático.

El siguiente es un ejemplo de especificación de implementación que puede utilizar para probar el ajuste automático.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: autoscaling-example
spec:
  flinkVersion: v1_18
  flinkConfiguration:

    # Autotuning parameters
    kubernetes.operator.job.autoscaler.autotune.enable: "true"
    kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count: "2"
    kubernetes.operator.job.autoscaler.autotune.metrics.restart.count: "1"
    metrics.job.status.enable: TOTAL_TIME

    # Autoscaler parameters
    kubernetes.operator.job.autoscaler.enabled: "true"
    kubernetes.operator.job.autoscaler.scaling.enabled: "true"
    kubernetes.operator.job.autoscaler.stabilization.interval: "5s"
    kubernetes.operator.job.autoscaler.metrics.window: "1m"

    jobmanager.scheduler: adaptive

    taskmanager.numberOfTaskSlots: "1"
    state.savepoints.dir: s3://<S3_bucket>/autoscaling/savepoint/
    state.checkpoints.dir: s3://<S3_bucket>/flink/autoscaling/checkpoint/
    pipeline.max-parallelism: "4"

  executionRoleArn: <JOB ARN>
  emrReleaseLabel: emr-6.14.0-flink-latest
  jobManager:
    highAvailabilityEnabled: true
    storageDir: s3://<S3_bucket>/flink/autoscaling/ha/
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: s3://<S3_bucket>/some-job-with-back-pressure
    parallelism: 1
    upgradeMode: last-state
```

Para simular la contrapresión, utilice la siguiente especificación de implementación.

```
  job:
    jarURI: s3://<S3_bucket>/pyflink-script.py
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-py", "/opt/flink/usrlib/pyflink-script.py"]
    parallelism: 1
    upgradeMode: last-state
```

Cargue el siguiente script de Python en el bucket de S3.

```
import logging
import sys
import time
import random

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

TABLE_NAME="orders"
QUERY=f"""
CREATE TABLE {TABLE_NAME} (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);
"""

def create_backpressure(i):
    time.sleep(2)
    return i

def autoscaling_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    t_env.execute_sql(QUERY)
    res_table = t_env.from_path(TABLE_NAME)

    stream =  t_env.to_data_stream(res_table) \
      .shuffle().map(lambda x: create_backpressure(x))\
      .print()
    env.execute("Autoscaling demo")

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    autoscaling_demo()
```

Para comprobar que el ajustador automático funciona, utilice los siguientes comandos. Tenga en cuenta que debe utilizar la información de su propio pod líder para el Flink Operator.

Obtenga primero el nombre de su pod líder.

```
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')

kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

Una vez que tenga el nombre de su pod líder, puede ejecutar el siguiente comando.

```
kubectl logs -n $NAMESPACE  -c flink-kubernetes-operator --follow <YOUR-FLINK-OPERATOR-POD-NAME>  | grep -E 'EmrEks|autotun|calculating|restart|autoscaler'
```

Deben aparecer registros similares a los siguientes.

```
[m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [36m[DEBUG][flink/autoscaling-example] Using the latest Emr Eks Metric for calculating restart.time for autotuning: EmrEksMetrics(restartMetric=RestartMetric(restartingTime=65, numRestarts=1))

[m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [32m[INFO ][flink/autoscaling-example] Calculated average restart.time metric via autotuning to be: PT0.065S
```

------