

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

# Como o Flink oferece suporte à alta disponibilidade e resiliência do trabalho
<a name="jobruns-flink-resiliency"></a>

As seções a seguir descrevem como o Flink torna os trabalhos mais confiáveis ​​e altamente disponíveis. Ele faz isso por meio de capacidades integradas, como a alta disponibilidade do Flink e vários recursos de recuperação, caso ocorram falhas.

**Topics**
+ [Uso da alta disponibilidade (HA) para operadores e aplicações do Flink](jobruns-flink-using-ha.md)
+ [Otimização dos tempos de reinicialização de trabalhos do Flink para operações de recuperação e ajuste de escala de tarefas com o Amazon EMR no EKS](jobruns-flink-restart.md)
+ [Desativação tranquila de instâncias spot com Flink no Amazon EMR no EKS](jobruns-flink-decommission.md)

# Uso da alta disponibilidade (HA) para operadores e aplicações do Flink
<a name="jobruns-flink-using-ha"></a>

Este tópico mostra como configurar a alta disponibilidade e descreve como ela funciona em alguns casos de uso diferentes. Isso inclui o uso do gerenciador de trabalhos e do Kubernetes nativo para Flink.

## Alta disponibilidade do operador do Flink
<a name="jobruns-flink-ha-operator"></a>

Habilitamos a *alta disponibilidade* do operador do Flink para que possamos fazer failover para um operador do Flink em espera a fim de minimizar o tempo de inatividade no loop de controle do operador se ocorrerem falhas. A alta disponibilidade é habilitada por padrão e o número padrão de réplicas iniciais para o operador é dois. É possível configurar o campo de réplicas em seu arquivo `values.yaml` para o chart do Helm.

Os seguintes campos são personalizáveis:
+ `replicas` (opcional, o padrão é dois): definir esse número como maior que um cria outros operadores em espera e permite uma recuperação mais rápida do trabalho.
+ `highAvailabilityEnabled` (opcional, o padrão é “true”): controla se você deseja habilitar a HA. Especificar esse parâmetro como “true” habilita o suporte à implantação multi-AZ e define os parâmetros `flink-conf.yaml` corretos.

Você pode desativar a HA para seu operador ao definir a configuração apresentada a seguir em seu arquivo `values.yaml`.

```
...
imagePullSecrets: []

replicas: 1

# set this to false if you don't want HA
highAvailabilityEnabled: false
...
```

**Implantação multi-AZ**

Criamos os pods do operador em várias zonas de disponibilidade. Esta é uma restrição leve, e seus pods do operador serão programados na mesma AZ, se você não tiver recursos suficientes em uma AZ diferente.

**Determinação da réplica líder**

 Se o HA estiver habilitado, as réplicas usarão uma concessão para determinar qual delas JMs é a líder e usarão uma locação K8s para a eleição do líder. Você pode descrever a concessão e consultar o campo .Spec.Holder Identity para determinar o líder atual.

```
kubectl describe lease <Helm Install Release Name>-<NAMESPACE>-lease -n <NAMESPACE> | grep "Holder Identity"
```

**Interação entre o Flink e o S3**

**Configuração de credenciais de acesso**

Certifique-se de ter configurado o IRSA com as permissões do IAM apropriadas para acessar o bucket do S3.

**Busca por trabalhos em JARs do modo de aplicação do S3**

O operador do Flink também oferece suporte à busca de aplicações do S3 em JARs. Você acabou de fornecer a localização S3 para o JARuri em sua FlinkDeployment especificação.

Você também pode usar esse recurso para baixar outros artefatos, como PyFlink scripts. O script do Python resultante é descartado no caminho `/opt/flink/usrlib/`.

O exemplo a seguir demonstra como usar esse recurso para um PyFlink trabalho. Observe os campos jarURI e args.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: python-example
spec:
  image: <YOUR CUSTOM PYFLINK IMAGE>
  emrReleaseLabel: "emr-6.12.0-flink-latest"
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    highAvailabilityEnabled: false
    replicas: 1
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: "s3://<S3-BUCKET>/scripts/pyflink.py" # Note, this will trigger the artifact download process
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pyflink.py"]
    parallelism: 1
    upgradeMode: stateless
```

**Conectores do S3 para Flink**

O Flink vem com dois conectores do S3 (listados abaixo). As seções a seguir debatem sobre o momento de usar cada conector.

**Ponto de verificação: conector do S3 para Presto**
+ Defina o esquema do S3 como s3p://.
+ O conector recomendado a ser usado como ponto de verificação para o s3. Para obter mais informações, consulte [S3-specific](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#s3-specific) na documentação do Apache Flink.

Exemplo de FlinkDeployment especificação:

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.checkpoints.dir: s3p://<BUCKET-NAME>/flink-checkpoint/
```

**Leitura e gravação no S3: conector Hadoop S3**
+ Defina o esquema do S3 como `s3://` ou (`s3a://`).
+ O conector recomendado para a leitura e a gravação de arquivos do S3 (somente um conector do S3 que implementa a [interface Flink Filesystem](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/)).
+ Por padrão, definimos `fs.s3a.aws.credentials.provider` no arquivo `flink-conf.yaml`, que é `com.amazonaws.auth.WebIdentityTokenCredentialsProvider`. Se você substituir completamente o `flink-conf` padrão e estiver interagindo com o S3, certifique-se de usar este provedor.

Exemplo de FlinkDeployment especificação

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  job:
    jarURI: local:///opt/flink/examples/streaming/WordCount.jar
    args: [ "--input", "s3a://<INPUT BUCKET>/PATH", "--output", "s3a://<OUTPUT BUCKET>/PATH" ]
    parallelism: 2
    upgradeMode: stateless
```

## JobManager do Flink
<a name="jobruns-flink-ha-manager"></a>

A alta disponibilidade (HA) para implantações do Flink permite que os trabalhos continuem progredindo mesmo que um erro transitório seja encontrado e você falhe. JobManager Com a HA habilitada, os trabalhos serão reiniciados, mas a partir do último ponto de verificação com êxito. Sem o HA ativado, o Kubernetes reiniciará o seu JobManager, mas seu trabalho começará como um novo trabalho e perderá o progresso. Depois de configurar o HA, podemos pedir ao Kubernetes que armazene os metadados de HA em um armazenamento persistente para referência no caso de uma falha transitória no JobManager e, em seguida, retome nossos trabalhos a partir do último ponto de verificação bem-sucedido.

A HA é habilitada, por padrão, para os trabalhos do Flink (a contagem de réplicas é definida como dois, o que exigirá que você forneça um local de armazenamento do S3 para que os metadados de HA persistam).

**Configurações de HA**

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  executionRoleArn: "<JOB EXECUTION ROLE ARN>"
  emrReleaseLabel: "emr-6.13.0-flink-latest"
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    replicas: 2
    highAvailabilityEnabled: true
    storageDir: "s3://<S3 PERSISTENT STORAGE DIR>"
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
```

A seguir estão as descrições das configurações de HA apresentadas acima no Job Manager (definidas em .spec.jobManager):
+ `highAvailabilityEnabled` (opcional, o padrão é “true”): defina como `false ` se você não desejar que a HA seja habilitada e não quiser usar as configurações de HA fornecidas. Você ainda pode manipular o campo “réplicas” para configurar manualmente a HA.
+ `replicas`(opcional, o padrão é 2): definir esse número como maior que 1 cria outro modo de espera JobManagers e permite uma recuperação mais rápida do seu trabalho. Se você desabilitar a HA, deverá definir a contagem de réplicas como um ou continuará recebendo erros de validação (somente uma réplica tem suporte se a HA não estiver habilitada).
+ `storageDir` (obrigatório): por usar a contagem de réplicas como dois, por padrão, é necessário fornecer um storageDir persistente. No momento, este campo aceita somente caminhos do S3 como local de armazenamento.

**Localidade de pod**

 Se você habilitar o HA, também tentaremos colocar pods na mesma AZ, o que leva a um melhor desempenho (latência de rede reduzida ao ter pods no mesmo). AZs Este é um processo de melhor esforço, ou seja, se você não tiver recursos suficientes na AZ em que a maioria dos seus pods está programada, os pods restantes ainda serão programados, mas poderão acabar em um nó externo a esta AZ.

**Determinação da réplica líder**

Se o HA estiver habilitado, as réplicas usarão uma concessão para determinar qual delas JMs é a líder e usam um K8s Configmap como armazenamento de dados para armazenar esses metadados. Se quiser determinar o líder, você pode consultar o conteúdo do Configmap e a chave `org.apache.flink.k8s.leader.restserver` nos dados para encontrar o pod do K8s com o endereço IP. Você também pode usar os comandos bash apresentados a seguir.

```
ip=$(kubectl get configmap -n <NAMESPACE> <JOB-NAME>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')
kubectl get pods -n NAMESPACE  -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

## Trabalho do Flink: Kubernetes nativo
<a name="jobruns-flink-ha-kubernetes"></a>

As versões 6.13.0 e superiores do Amazon EMR oferecem suporte ao Kubernetes nativo para Flink para a execução de aplicações do Flink no modo de alta disponibilidade em um cluster do Amazon EKS. 

**nota**  
Você deve ter um bucket do Amazon S3 criado para armazenar os metadados de alta disponibilidade ao enviar o trabalho do Flink. Se não desejar esse atributo, você poderá desativá-lo. Por padrão, ele é habilitado.

Para ativar o recurso de alta disponibilidade do Flink, forneça os parâmetros do Flink descritos a seguir ao [executar o comando da CLI `run-application`](jobruns-flink-native-kubernetes-getting-started.md#jobruns-flink-native-kubernetes-getting-started-run-application). Os parâmetros são definidos abaixo do exemplo.

```
-Dhigh-availability.type=kubernetes \
-Dhigh-availability.storageDir=S3://DOC-EXAMPLE-STORAGE-BUCKET \
-Dfs.s3a.aws.credentials.provider="com.amazonaws.auth.WebIdentityTokenCredentialsProvider" \
-Dkubernetes.jobmanager.replicas=3 \
-Dkubernetes.cluster-id=example-cluster
```
+ **`Dhigh-availability.storageDir`**: o bucket do Amazon S3 onde você deseja armazenar os metadados de alta disponibilidade para o trabalho.

  **`Dkubernetes.jobmanager.replicas`**: o número de pods do gerenciador de trabalhos a serem criados como um número inteiro maior que `1`.

  **`Dkubernetes.cluster-id`**: um ID exclusivo que identifica o cluster do Flink.

# Otimização dos tempos de reinicialização de trabalhos do Flink para operações de recuperação e ajuste de escala de tarefas com o Amazon EMR no EKS
<a name="jobruns-flink-restart"></a>

Quando uma tarefa falha ou ocorre uma operação de ajuste de escala, o Flink tenta reexecutar a tarefa com base no último ponto de verificação concluído. O processo de reinicialização pode levar um minuto ou mais para ser executado, dependendo do tamanho do estado do ponto de verificação e do número de tarefas paralelas. Durante o período de reinicialização, as tarefas de backlog podem se acumular para o trabalho. No entanto, existem algumas maneiras de o Flink otimizar a velocidade de recuperação e reinicialização dos gráficos de execução para melhorar a estabilidade do trabalho.

Esta página descreve algumas maneiras que o Flink no Amazon EMR pode usar para melhorar o tempo de reinício de trabalhos durante as operações de recuperação ou ajuste de escala de tarefas em instâncias spot. As instâncias spot são capacidades computacionais não utilizadas que estão disponíveis com desconto. Elas têm comportamentos únicos, incluindo interrupções ocasionais, por isso é importante entender como o Amazon EMR no EKS lida com elas e como o Amazon EMR no EKS realiza a desativação e a reinicialização do trabalho.

**Topics**
+ [Recuperação local de tarefas](#flink-restart-task-local)
+ [Recuperação local de tarefas por meio da montagem de volume do Amazon EBS](#flink-restart-task-local-ebs)
+ [Ponto de verificação incremental genérico baseado em logs](#flink-restart-log-check)
+ [Recuperação refinada](#flink-restart-fine-grained)
+ [Mecanismo de reinício combinado no agendador adaptável](#flink-restart-combined)

## Recuperação local de tarefas
<a name="flink-restart-task-local"></a>

**nota**  
A recuperação local de tarefas é compatível com o Flink nas versões 6.14.0 e superiores do Amazon EMR no EKS.

Com os pontos de verificação do Flink, cada tarefa produz um snapshot do próprio estado que o Flink grava em um armazenamento distribuído, como o Amazon S3. Em casos de recuperação, as tarefas restauram o estado no armazenamento distribuído. O armazenamento distribuído oferece tolerância a falhas e pode redistribuir o estado durante o reajuste de escala por ser acessível a todos os nós.

No entanto, um armazenamento distribuído remoto também tem uma desvantagem: todas as tarefas devem ler seu estado de um local remoto na rede. Isso pode resultar em tempos de recuperação longos para estados grandes durante operações de recuperação ou ajuste de escala de tarefas.

Esse problema de tempo de recuperação longo é resolvido pela *recuperação local de tarefas*. As tarefas gravam seu estado no ponto de verificação em um armazenamento secundário local para a tarefa, como em um disco local. Elas também armazenam seu estado no armazenamento principal, ou no Amazon S3, no nosso caso. Durante a recuperação, o agendador programa as tarefas no mesmo gerenciador de tarefas em que elas foram executadas anteriormente, para que possam se recuperar do armazenamento de estado local em vez de ler do armazenamento de estado remoto. Para obter mais informações, consulte [Task-Local Recovery](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/large_state_tuning/#task-local-recovery) na *Documentação do Apache Flink*.

Nossos testes de benchmark com exemplos de trabalhos mostraram que o tempo de recuperação foi reduzido de minutos para alguns segundos com a recuperação local da tarefa ativada.

Para habilitar a recuperação local de tarefas, defina as configurações a seguir no seu arquivo `flink-conf.yaml`. Especifique o valor do intervalo do ponto de verificação em milissegundos.

```
    state.backend.local-recovery: true
    state.backend: hasmap or rocksdb
    state.checkpoints.dir: s3://STORAGE-BUCKET-PATH/checkpoint
    execution.checkpointing.interval: 15000
```

## Recuperação local de tarefas por meio da montagem de volume do Amazon EBS
<a name="flink-restart-task-local-ebs"></a>

**nota**  
A recuperação local de tarefas por meio do Amazon EBS é compatível com o Flink nas versões 6.15.0 e superiores do Amazon EMR no EKS.

Com o Flink para Amazon EMR no EKS, você pode provisionar automaticamente os volumes do Amazon EBS nos pods do TaskManager para a recuperação local de tarefas. A montagem de sobreposição padrão vem com um volume de 10 GB, o que é suficiente para trabalhos com um estado inferior. Trabalhos com estados grandes podem habilitar a opção de *montagem automática de volume do EBS*. Os pods do TaskManager são criados e montados automaticamente durante a criação do pod e removidos durante a exclusão do pod.

Use as etapas a seguir para habilitar a montagem automática de volume do EBS para o Flink no Amazon EMR no EKS.

1. Exporte os valores das seguintes variáveis que você usará nas próximas etapas:

   ```
   export AWS_REGION=aa-example-1 
   export FLINK_EKS_CLUSTER_NAME=my-cluster
   export AWS_ACCOUNT_ID=111122223333
   ```

1. Crie ou atualize um arquivo `kubeconfig` YAML para o cluster.

   ```
   aws eks update-kubeconfig --name $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
   ```

1. Crie uma conta de serviço do IAM para o driver de interface de armazenamento de contêiner (CSI) do Amazon EBS no cluster do Amazon EKS. 

   ```
   eksctl create iamserviceaccount \
      --name ebs-csi-controller-sa \
      --namespace kube-system \
      --region $AWS_REGION \
      --cluster $FLINK_EKS_CLUSTER_NAME\
      --role-name TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} \
      --role-only \
      --attach-policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy \
      --approve
   ```

1. Crie o driver CSI do Amazon EBS com o seguinte comando:

   ```
   eksctl create addon \
      --name aws-ebs-csi-driver \
      --region $AWS_REGION \
      --cluster $FLINK_EKS_CLUSTER_NAME \
      --service-account-role-arn arn:aws:iam::${AWS_ACCOUNT_ID}:role/TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
   ```

1. Crie a classe de armazenamento do Amazon EBS com o seguinte comando:

   ```
   cat ≪ EOF ≫ storage-class.yaml
   apiVersion: storage.k8s.io/v1
   kind: StorageClass
   metadata:
     name: ebs-sc
   provisioner: ebs.csi.aws.com
   volumeBindingMode: WaitForFirstConsumer
   EOF
   ```

   Depois, aplique a classe:

   ```
   kubectl apply -f storage-class.yaml
   ```

1. O Helm instala o operador do Kubernetes para Flink do Amazon EMR com opções para criar uma conta de serviço. Isso cria o `emr-containers-sa-flink` para ser usado na implantação do Flink.

   ```
   helm install flink-kubernetes-operator flink-kubernetes-operator/ \
      --set jobServiceAccount.create=true \
      --set rbac.jobRole.create=true \
      --set rbac.jobRoleBinding.create=true
   ```

1. Para enviar o trabalho do Flink e habilitar o provisionamento automático de volumes do EBS para a recuperação local de tarefas, defina as configurações a seguir no seu arquivo `flink-conf.yaml`. Ajuste o limite de tamanho para o tamanho do estado do trabalho. Defina `serviceAccount` como `emr-containers-sa-flink`. Especifique o valor do intervalo do ponto de verificação em milissegundos. Omita o `executionRoleArn`.

   ```
   flinkConfiguration:
       task.local-recovery.ebs.enable: true
       kubernetes.taskmanager.local-recovery.persistentVolumeClaim.sizeLimit: 10Gi
       state.checkpoints.dir: s3://BUCKET-PATH/checkpoint
       state.backend.local-recovery: true
       state.backend: hasmap or rocksdb
       state.backend.incremental: "true"
       execution.checkpointing.interval: 15000
     serviceAccount: emr-containers-sa-flink
   ```

Quando estiver tudo pronto para excluir o plug-in do driver CSI do Amazon EBS, use os seguintes comandos:

```
  # Detach Attached Policy
  aws iam detach-role-policy --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} --policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy
  # Delete the created Role
  aws iam delete-role --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
  # Delete the created service account
  eksctl delete iamserviceaccount --name ebs-csi-controller-sa --namespace kube-system --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  # Delete Addon
  eksctl delete addon --name aws-ebs-csi-driver --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  # Delete the EBS storage class
  kubectl delete -f storage-class.yaml
```

## Ponto de verificação incremental genérico baseado em logs
<a name="flink-restart-log-check"></a>

**nota**  
O ponto de verificação incremental genérico baseado em logs é compatível com o Flink para as versões 6.14.0 e superiores do Amazon EMR no EKS.

O recurso de ponto de verificação incremental genérico baseado em logs foi adicionado ao Flink 1.16 para melhorar a velocidade dos pontos de verificação. Um intervalo de ponto de verificação mais rápido costuma resultar em uma redução do trabalho de recuperação porque menos eventos precisam ser reprocessados após a recuperação. Para obter mais informações, consulte [Improving speed and stability of checkpointing with generic log-based incremental checkpoints](https://flink.apache.org/2022/05/30/improving-speed-and-stability-of-checkpointing-with-generic-log-based-incremental-checkpoints/) no *blog do Apache Flink*.

Com exemplos de trabalhos, nossos testes de comparação mostraram que o tempo do ponto de verificação foi reduzido de minutos para alguns segundos com o ponto de verificação incremental genérico baseado em logs.

Para habilitar os pontos de verificação incrementais genéricos baseados em logs, defina as configurações a seguir no seu arquivo `flink-conf.yaml`. Especifique o valor do intervalo do ponto de verificação em milissegundos.

```
    state.backend.changelog.enabled: true 
    state.backend.changelog.storage: filesystem
    dstl.dfs.base-path: s3://bucket-path/changelog
    state.backend.local-recovery: true
    state.backend: rocksdb
    state.checkpoints.dir: s3://bucket-path/checkpoint
    execution.checkpointing.interval: 15000
```

## Recuperação refinada
<a name="flink-restart-fine-grained"></a>

**nota**  
O suporte de recuperação refinada para o agendador padrão é compatível com o Flink nas versões 6.14.0 e superiores do Amazon EMR no EKS. O suporte de recuperação refinada no agendador adaptável está disponível com o Flink nas versões 6.15.0 e superiores do Amazon EMR no EKS.

Quando uma tarefa falha durante a execução, o Flink redefine todo o gráfico de execução e aciona a reexecução completa com base no último ponto de verificação concluído. Isso é mais caro do que apenas reexecutar as tarefas que falharam. A recuperação refinada reinicia somente o componente conectado ao pipeline da tarefa que falhou. No exemplo a seguir, o gráfico do trabalho tem cinco vértices (de `A` a `E`). É feito o pipeline de todas as conexões entre os vértices com distribuição pontual, e o `parallelism.default` do trabalho é definido como `2`. 

```
A → B → C → D → E
```

Neste exemplo, há um total de dez tarefas em execução. O primeiro pipeline (de `a1` a `e1`) é executado em um TaskManager (`TM1`) e o segundo pipeline (de `a2` a `e2`) é executado em outro TaskManager (`TM2`).

```
a1 → b1 → c1 → d1 → e1
a2 → b2 → c2 → d2 → e2
```

Há dois componentes conectados por pipeline: `a1 → e1` e `a2 → e2`. Se `TM1` ou `TM2` falhar, a falha afetará somente as cinco tarefas no pipeline em que TaskManager estava em execução. A estratégia de reinicialização só inicia o componente do pipeline afetado. 

A recuperação refinada funciona somente com trabalhos perfeitamente paralelos do Flink. Não é compatível com operações de `keyBy()` ou `redistribute()`. Para obter mais informações, consulte [FLIP-1: Fine Grained Recovery from Task Failures](https://cwiki.apache.org/confluence/display/FLINK/FLIP-1%3A+Fine+Grained+Recovery+from+Task+Failures) no projeto Jira *Flink Improvement Proposal*.

Para habilitar a recuperação refinada, defina as configurações a seguir no seu arquivo `flink-conf.yaml`.

```
jobmanager.execution.failover-strategy: region 
restart-strategy: exponential-delay or fixed-delay
```

## Mecanismo de reinício combinado no agendador adaptável
<a name="flink-restart-combined"></a>

**nota**  
O mecanismo de reinicialização combinado no agendador adaptável é compatível com o Flink nas versões 6.15.0 e superiores do Amazon EMR no EKS.

O agendador adaptável pode ajustar o paralelismo do trabalho com base nos slots disponíveis. Ele reduz automaticamente o paralelismo se não houver slots suficientes disponíveis para atender ao paralelismo do trabalho configurado. Se novos slots ficarem disponíveis, o trabalho aumentará a escala verticalmente mais uma vez para o paralelismo do trabalho configurado. Um agendador adaptável evita o tempo de inatividade no trabalho quando não há recursos suficientes disponíveis. Esse é o agendador compatível com o escalador automático do Flink. Recomendamos o agendador adaptável com o Flink no Amazon EMR por esses motivos. No entanto, os agendadores adaptáveis podem fazer várias reinicializações em um curto período; uma reinicialização para cada novo recurso adicionado. Isso pode levar a uma queda na performance do trabalho.

Com o Amazon EMR 6.15.0 e versões superiores, o Flink tem um mecanismo de reinício combinado no agendador adaptável que abre uma janela de reinicialização quando o primeiro recurso é adicionado e aguarda pelo intervalo configurado da janela do padrão de um minuto. Ele executa uma única reinicialização quando há recursos suficientes disponíveis para executar o trabalho com o paralelismo configurado ou quando o intervalo expira.

Com os exemplos de trabalhos, nossos testes de comparação mostraram que esse recurso processa 10% a mais dos registros do que o comportamento padrão ao usar o agendador adaptável e o escalador automático do Flink.

Para habilitar o mecanismo de reinício combinado, defina as configurações a seguir no seu arquivo `flink-conf.yaml`.

```
jobmanager.adaptive-scheduler.combined-restart.enabled: true 
jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m
```

# Desativação tranquila de instâncias spot com Flink no Amazon EMR no EKS
<a name="jobruns-flink-decommission"></a>

O Flink com o Amazon EMR no EKS pode melhorar o tempo de reinicialização de trabalhos durante as operações de recuperação e ajuste de escala de tarefas.

## Visão geral do
<a name="jobruns-flink-decommission-overview"></a>

As versões 6.15.0 e superiores do Amazon EMR no EKS oferecem suporte à desativação tranquila de gerenciadores de tarefas em instâncias spot no Amazon EMR no EKS com o Flink para Apache. Como parte desse recurso, o Amazon EMR no EKS com o Flink fornece as funcionalidades a seguir.
+ Just-in-time ponto **de verificação** — As tarefas de streaming do Flink podem responder à interrupção da Instância Spot, realizar o ponto de verificação just-in-time (JIT) das tarefas em execução e impedir o agendamento de tarefas adicionais nessas Instâncias Spot. O ponto de verificação de JIT é compatível com agendadores padrão e adaptáveis.
+ **Mecanismo de reinicialização combinado**: um mecanismo de reinicialização combinado faz a melhor tentativa de reiniciar o trabalho depois que ele atinge o paralelismo de recursos de destino ou o final da janela configurada atual. Isso também evita reinicializações consecutivas de trabalhos que podem ser causadas por vários encerramentos de instâncias spot. O mecanismo de reinicialização combinado está disponível somente com o agendador adaptável.

Esses recursos fornecem os seguintes benefícios:
+ Você pode aproveitar as instâncias spot para executar gerenciadores de tarefas e reduzir os gastos com clusters.
+ O aprimoramento de liveness do gerenciador de tarefas da instância spot resulta em maior resiliência e agendamento de trabalhos mais eficiente.
+ Seus trabalhos do Flink terão mais tempo de atividade porque haverá menos reinicializações após o encerramento da instância spot.

## Como funciona a desativação normal
<a name="jobruns-flink-decommission-howitworks"></a>

Considere o seguinte exemplo: você provisiona um cluster do Amazon EMR no EKS executando o Flink para Apache e especifica nós sob demanda para o gerenciador de trabalhos e nós de instância spot para o gerenciador de tarefas. Dois minutos antes do encerramento, o gerenciador de tarefas recebe um aviso de interrupção.

Nesse cenário, o gerenciador de trabalhos manipularia o sinal de interrupção da instância spot, bloquearia o agendamento de tarefas adicionais na instância spot e iniciaria o ponto de verificação de JIT do trabalho de streaming.

Em seguida, o gerenciador de trabalhos reiniciaria o gráfico do trabalho somente depois que houvesse disponibilidade suficiente de novos recursos para satisfazer o paralelismo atual de trabalhos na janela atual do intervalo de reinicialização. O intervalo da janela de reinicialização é decidido com base na duração da substituição da instância spot, na criação de novos pods do gerenciador de tarefas e no registro no gerenciador de trabalhos.

## Pré-requisitos
<a name="jobruns-flink-decommission-prereqs"></a>

Para usar a desativação normal, crie e execute um trabalho de streaming em um cluster do Amazon EMR no EKS executando o Apache Flink. Habilite o agendador adaptável e os gerenciadores de tarefas agendados em, pelo menos, uma instância spot, conforme mostrado no exemplo a seguir. Você deve usar nós sob demanda para o gerenciador de trabalhos e pode usar nós sob demanda para os gerenciadores de tarefas, desde que também haja pelo menos uma instância spot.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: deployment_name
spec:
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    cluster.taskmanager.graceful-decommission.enabled: "true"
    execution.checkpointing.interval: "240s"
    jobmanager.adaptive-scheduler.combined-restart.enabled: "true"
    jobmanager.adaptive-scheduler.combined-restart.window-interval : "1m"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    nodeSelector:
      'eks.amazonaws.com/capacityType': 'ON_DEMAND'
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
    nodeSelector:
      'eks.amazonaws.com/capacityType': 'SPOT'
  job:
    jarURI: flink_job_jar_path
```

## Configuração
<a name="jobruns-flink-decommission-config"></a>

Esta seção aborda a maioria das configurações que você pode especificar para as suas necessidades de desativação. 


| Chave | Description | Valor padrão  | Valores aceitos | 
| --- | --- | --- | --- | 
|  cluster.taskmanager.graceful-decommission.enabled  |  Habilita a desativação tranquila do gerenciador de tarefas.  |  true  |  true, false  | 
|  jobmanager.adaptive-scheduler.combined-restart.enabled  |  Habilita o mecanismo de reinicialização combinado no agendador adaptável.  |  false  |  true, false  | 
|  jobmanager.adaptive-scheduler.combined-restart.window-interval  |  O intervalo combinado da janela de reinicialização para efetuar reinicializações mescladas do trabalho. Um número inteiro sem uma unidade é interpretado como milissegundos.  |  1m  |  Exemplos: 30, 60s, 3m, 1h  | 