

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

# Uso do Autoscaler para aplicações do Flink
<a name="jobruns-flink-autoscaler"></a>

O Autoscaler do operador pode ajudar a aliviar a contrapressão ao coletar métricas de trabalhos do Flink e ajustar automaticamente o paralelismo em nível de vértice do trabalho. Confira o seguinte exemplo de como a configuração pode se parecer:

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  ...
spec:
  ...
  flinkVersion: v1_18
  flinkConfiguration:
    job.autoscaler.enabled: "true"
    job.autoscaler.stabilization.interval: 1m
    job.autoscaler.metrics.window: 5m
    job.autoscaler.target.utilization: "0.6"
    job.autoscaler.target.utilization.boundary: "0.2"
    job.autoscaler.restart.time: 2m
    job.autoscaler.catch-up.duration: 5m
    pipeline.max-parallelism: "720"
  ...
```

Essa configuração usa valores padrão para a versão mais recente do Amazon EMR. Se você usar outras versões, é possível que tenha valores diferentes.

**nota**  
A partir do Amazon EMR 7.2.0, você não precisa incluir o prefixo `kubernetes.operator` na sua configuração. Se você usa a versão 7.1.0 ou inferior, deve usar o prefixo antes de cada configuração. Por exemplo, você deve especificar `kubernetes.operator.job.autoscaler.scaling.enabled`.

A seguir, são apresentadas as opções de configuração para o Autoscaler.
+ `job.autoscaler.scaling.enabled`: especifica se a execução do ajuste de escala de vértices deve ser habilitada pelo escalador automático. O padrão é `true`. Se você desabilitar essa configuração, o escalador automático coletará apenas métricas e avaliará o paralelismo sugerido para cada vértice, mas não atualizará os trabalhos.
+ `job.autoscaler.stabilization.interval`: o período de estabilização no qual nenhuma nova escalabilidade será executada. O padrão é de cinco minutos.
+ `job.autoscaler.metrics.window`: o tamanho da janela de agregação de métricas de escalabilidade. Quanto mais ampla for a janela, mais harmoniosa e estável ela será, mas o Autoscaler pode demorar mais para reagir a alterações repentinas de carga. O padrão é de 15 minutos. Recomendamos que você experimente usando um valor entre 3 e 60 minutos.
+ `job.autoscaler.target.utilization`: a utilização desejada para o vértice para fornecer uma performance de trabalho estável e algum buffer para flutuações de carga. O padrão é `0.7` atingir 70% utilization/load para os vértices do trabalho.
+ `job.autoscaler.target.utilization.boundary`: o limite da utilização desejada para o vértice, que serve como buffer extra para evitar uma escalabilidade imediata em flutuações de carga. O padrão é `0.3`, o que significa que é permitido um desvio de 30% da utilização desejada antes do acionamento de uma ação de ajuste de escala.
+ `ob.autoscaler.restart.time`: o tempo esperado para reiniciar a aplicação. O padrão é de cinco minutos.
+ `job.autoscaler.catch-up.duration`: o tempo esperado para a recuperação, ou seja, o processamento total de qualquer backlog após a conclusão de uma operação de escalabilidade. O padrão é de cinco minutos. Ao reduzir a duração da recuperação, é necessário que o Autoscaler reserve uma capacidade extra para as ações de escalabilidade.
+ `pipeline.max-parallelism`: o paralelismo máximo que o Autoscaler pode usar. O Autoscaler ignora esse limite se ele for maior que o paralelismo máximo configurado na configuração do Flink ou diretamente em cada operador. O padrão é -1. Observe que o Autoscaler calcula o paralelismo como um divisor do número de paralelismo máximo, portanto, é recomendado escolher configurações de paralelismo máximo com muitos divisores em vez de confiar nos padrões fornecidos pelo Flink. Recomendamos usar múltiplos de 60 para esta configuração, por exemplo, 120, 180, 240, 360, 720 etc.

Para obter uma página de referência de configurações mais detalhada, consulte [Autoscaler Configuration](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/#autoscaler-configuration).

# Ajuste automático dos parâmetros do escalador automático
<a name="jobruns-flink-autoscaler-parameter-autotuning"></a>

Esta seção descreve o comportamento de ajuste automático para várias versões do Amazon EMR. Ela também detalha as diferentes configurações de ajuste de escala automático.

**nota**  
O Amazon EMR 7.2.0 e versões posteriores usam a configuração de código aberto `job.autoscaler.restart.time-tracking.enabled` para habilitar a **estimativa do tempo de redimensionamento**. A estimativa do tempo de redimensionamento tem a mesma funcionalidade do ajuste automático do Amazon EMR, então você não precisa atribuir manualmente valores empíricos ao horário de reinicialização.  
Você ainda pode usar o ajuste automático do Amazon EMR se estiver utilizando o Amazon EMR 7.1.0 ou inferior.

------
#### [ 7.2.0 and higher ]

O Amazon EMR 7.2.0 e versões posteriores medem o tempo real de reinicialização necessário para aplicar decisões de ajuste de escala automático. Nas versões 7.1.0 e inferiores, você precisava usar a configuração `job.autoscaler.restart.time` para configurar manualmente o tempo máximo estimado de reinicialização. Ao usar a configuração `job.autoscaler.restart.time-tracking.enabled`, você só precisa inserir um horário de reinicialização para o primeiro ajuste de escala. Depois, o operador registra o tempo real de reinicialização e o usará para ajustes de escala subsequentes.

Para habilitar esse rastreamento, use o seguinte comando:

```
job.autoscaler.restart.time-tracking.enabled: true
```

A seguir estão as configurações relacionadas para a estimativa do tempo de redimensionamento.


| Configuração | Obrigatório | Padrão | Description | 
| --- | --- | --- | --- | 
| job.autoscaler.restart.time-tracking.enabled | Não | Falso | Indica se o escalador automático do Flink deve ajustar automaticamente as configurações ao longo do tempo para otimizar as decisões de escalabilidade. Observe que o escalador automático só pode ajustar automaticamente o parâmetro restart.time do escalador automático. | 
| job.autoscaler.restart.time | Não | 5 minutos | O tempo de reinicialização esperado que o Amazon EMR no EKS usa até que o operador possa determinar o tempo real de reinicialização com base nos ajustes de escala anteriores. | 
| job.autoscaler.restart.time-tracking.limit | Não | 15 minutos | O tempo máximo de reinicialização observado quando job.autoscaler.restart.time-tracking.enabled está definido como true. | 

Confira este exemplo de especificação de implantação que você pode usar para testar a estimativa do tempo de redimensionamento:

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: autoscaling-example
spec:
  flinkVersion: v1_18
  flinkConfiguration:

    # Autoscaler parameters
    job.autoscaler.enabled: "true"
    job.autoscaler.scaling.enabled: "true"
    job.autoscaler.stabilization.interval: "5s"
    job.autoscaler.metrics.window: "1m"
    
    job.autoscaler.restart.time-tracking.enabled: "true"
    job.autoscaler.restart.time: "2m"
    job.autoscaler.restart.time-tracking.limit: "10m"
    
    jobmanager.scheduler: adaptive
    taskmanager.numberOfTaskSlots: "1"
    pipeline.max-parallelism: "12"

  executionRoleArn: <JOB ARN>
  emrReleaseLabel: emr-7.12.0-flink-latest
  jobManager:
    highAvailabilityEnabled: false
    storageDir: s3://<s3_bucket>/flink/autoscaling/ha/
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: s3://<s3_bucket>/some-job-with-back-pressure
    parallelism: 1
    upgradeMode: stateless
```

Para simular a contrapressão, use a especificação de implantação a seguir.

```
job:
    jarURI: s3://<s3_bucket>/pyflink-script.py
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-py", "/opt/flink/usrlib/pyflink-script.py"]
    parallelism: 1
    upgradeMode: stateless
```

Faça upload do script Python a seguir no bucket do S3.

```
import logging
import sys
import time
import random

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

TABLE_NAME="orders"
QUERY=f"""
CREATE TABLE {TABLE_NAME} (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);
"""

def create_backpressure(i):
    time.sleep(2)
    return i

def autoscaling_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    t_env.execute_sql(QUERY)
    res_table = t_env.from_path(TABLE_NAME)

    stream =  t_env.to_data_stream(res_table) \
      .shuffle().map(lambda x: create_backpressure(x))\
      .print()
    env.execute("Autoscaling demo")

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    autoscaling_demo()
```

Para verificar se a estimativa do tempo de redimensionamento está funcionando, confira se o registro em log em nível de `DEBUG` do operador do Flink está habilitado. O exemplo abaixo demonstra como atualizar o arquivo `values.yaml` do chart do Helm. Em seguida, reinstale o chart do Helm atualizado e execute o trabalho do Flink novamente.

```
log4j-operator.properties: |+
  # Flink Operator Logging Overrides
  rootLogger.level = DEBUG
```

Obtenha o nome do pod líder.

```
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')

kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

Execute o comando a seguir para obter o tempo real de reinicialização usado nas avaliações de métricas.

```
kubectl logs <FLINK-OPERATOR-POD-NAME> -c flink-kubernetes-operator -n <OPERATOR-NAMESPACE> -f | grep "Restart time used in scaling summary computation"
```

Você deve ver logs semelhantes aos mostrados a seguir. Observe que somente o primeiro ajuste de escala usa ` job.autoscaler.restart.time`. Os ajustes de escala subsequentes usam o tempo de reinicialização observado.

```
2024-05-16 17:17:32,590 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT2M
2024-05-16 17:19:03,787 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:19:18,976 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:20:50,283 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:22:21,691 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
```

------
#### [ 7.0.0 and 7.1.0 ]

O escalador automático integrado do Flink e de código aberto usa várias métricas para tomar as melhores decisões de ajuste de escala. No entanto, os valores padrão que ele usa para seus cálculos devem ser aplicáveis à maioria das workloads e podem não ser ideais para um determinado trabalho. O recurso de ajuste automático adicionado à versão do Amazon EMR no EKS do operador do Flink analisa as tendências históricas observadas em métricas específicas capturadas e, em seguida, tenta calcular o valor mais adequado para o trabalho em questão.


| Configuração | Obrigatório | Padrão | Description | 
| --- | --- | --- | --- | 
| kubernetes.operator.job.autoscaler.autotune.enable | Não | Falso | Indica se o escalador automático do Flink deve ajustar automaticamente as configurações ao longo do tempo para otimizar as decisões de escalabilidade do escalador automático. Atualmente, o escalador automático só pode ajustar automaticamente seu parâmetro restart.time. | 
| kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count | Não | 3 | Indica quantas métricas históricas do Amazon EMR no EKS o escalador automático mantém no mapa de configuração de métricas do Amazon EMR no EKS. | 
| kubernetes.operator.job.autoscaler.autotune.metrics.restart.count | Não | 3 | Indica quantas reinicializações o escalador automático executa antes de começar a calcular o tempo médio de reinicialização de um determinado trabalho. | 

Para habilitar o ajuste automático, você deve concluir as seguintes etapas:
+ Defina `kubernetes.operator.job.autoscaler.autotune.enable:` como `true`
+ Defina `metrics.job.status.enable:` como `TOTAL_TIME`
+ Seguir a configuração em [Using Autoscaler for Flink applications](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/jobruns-flink-autoscaler.html) para habilitar o ajuste de escala automático.

Confira a seguir um exemplo de especificação de implantação que você pode usar para experimentar o ajuste automático.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: autoscaling-example
spec:
  flinkVersion: v1_18
  flinkConfiguration:

    # Autotuning parameters
    kubernetes.operator.job.autoscaler.autotune.enable: "true"
    kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count: "2"
    kubernetes.operator.job.autoscaler.autotune.metrics.restart.count: "1"
    metrics.job.status.enable: TOTAL_TIME

    # Autoscaler parameters
    kubernetes.operator.job.autoscaler.enabled: "true"
    kubernetes.operator.job.autoscaler.scaling.enabled: "true"
    kubernetes.operator.job.autoscaler.stabilization.interval: "5s"
    kubernetes.operator.job.autoscaler.metrics.window: "1m"

    jobmanager.scheduler: adaptive

    taskmanager.numberOfTaskSlots: "1"
    state.savepoints.dir: s3://<S3_bucket>/autoscaling/savepoint/
    state.checkpoints.dir: s3://<S3_bucket>/flink/autoscaling/checkpoint/
    pipeline.max-parallelism: "4"

  executionRoleArn: <JOB ARN>
  emrReleaseLabel: emr-6.14.0-flink-latest
  jobManager:
    highAvailabilityEnabled: true
    storageDir: s3://<S3_bucket>/flink/autoscaling/ha/
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: s3://<S3_bucket>/some-job-with-back-pressure
    parallelism: 1
    upgradeMode: last-state
```

Para simular a contrapressão, use a especificação de implantação a seguir.

```
  job:
    jarURI: s3://<S3_bucket>/pyflink-script.py
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-py", "/opt/flink/usrlib/pyflink-script.py"]
    parallelism: 1
    upgradeMode: last-state
```

Faça upload do script Python a seguir no bucket do S3.

```
import logging
import sys
import time
import random

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

TABLE_NAME="orders"
QUERY=f"""
CREATE TABLE {TABLE_NAME} (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);
"""

def create_backpressure(i):
    time.sleep(2)
    return i

def autoscaling_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    t_env.execute_sql(QUERY)
    res_table = t_env.from_path(TABLE_NAME)

    stream =  t_env.to_data_stream(res_table) \
      .shuffle().map(lambda x: create_backpressure(x))\
      .print()
    env.execute("Autoscaling demo")

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    autoscaling_demo()
```

Para verificar se o escalador automático está funcionando, use os comandos a seguir. Observe que você deve usar as informações do seu próprio pod líder no operador do Flink.

Primeiro, obtenha o nome do seu pod líder.

```
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')

kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

Depois de ter o nome do pod líder, será possível executar o comando a seguir.

```
kubectl logs -n $NAMESPACE  -c flink-kubernetes-operator --follow <YOUR-FLINK-OPERATOR-POD-NAME>  | grep -E 'EmrEks|autotun|calculating|restart|autoscaler'
```

Você deve ver logs semelhantes aos mostrados a seguir.

```
[m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [36m[DEBUG][flink/autoscaling-example] Using the latest Emr Eks Metric for calculating restart.time for autotuning: EmrEksMetrics(restartMetric=RestartMetric(restartingTime=65, numRestarts=1))

[m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [32m[INFO ][flink/autoscaling-example] Calculated average restart.time metric via autotuning to be: PT0.065S
```

------