

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

# Execução de trabalhos do Flink com o Amazon EMR no EKS
<a name="run-flink-jobs"></a>

As versões 6.13.0 e superiores do Amazon EMR oferecem suporte ao Amazon EMR no EKS com o Apache Flink ou ao operador do Kubernetes para Flink, como um modelo de envio de trabalhos para o Amazon EMR no EKS. Com o Amazon EMR no EKS com o Apache Flink, você pode implantar e gerenciar aplicações do Flink com o runtime da versão do Amazon EMR em seus próprios clusters do Amazon EKS. Depois de implantar o operador do Kubernetes para Flink em seu cluster do Amazon EKS, você poderá enviar aplicações do Flink diretamente com o operador. O operador gerencia o ciclo de vida das aplicações do Flink.

**Topics**
+ [Configuração e uso do operador do Kubernetes do Flink](jobruns-flink-kubernetes-operator.md)
+ [Uso do Kubernetes nativo do Flink](jobruns-flink-native-kubernetes.md)
+ [Personalização de imagens do Docker para o Flink e o FluentD](jobruns-flink-docker-flink-fluentd.md)
+ [Monitoramento do operador do Kubernetes para Flink e dos trabalhos do Flink](jobruns-flink-monitoring.md)
+ [Como o Flink oferece suporte à alta disponibilidade e resiliência do trabalho](jobruns-flink-resiliency.md)
+ [Uso do Autoscaler para aplicações do Flink](jobruns-flink-autoscaler.md)
+ [Manutenção e solução de problemas para trabalhos do Flink no Amazon EMR no EKS](jobruns-flink-troubleshooting.md)
+ [Versões compatíveis para Amazon EMR no EKS com Apache Flink](jobruns-flink-security-release-versions.md)

# Configuração e uso do operador do Kubernetes do Flink
<a name="jobruns-flink-kubernetes-operator"></a>

As páginas apresentadas a seguir descrevem como configurar e usar o operador do Kubernetes para Flink com a finalidade de executar trabalhos do Flink com o Amazon EMR no EKS. Os tópicos disponíveis incluem pré-requisitos obrigatórios, como configurar seu ambiente e executar uma aplicação do Flink no Amazon EMR no EKS.

**Topics**
+ [Configuração do operador do Kubernetes para Flink para o Amazon EMR no EKS](jobruns-flink-kubernetes-operator-setup.md)
+ [Desinstalação do operador do Kubernetes do Flink no Amazon EMR no EKS](jobruns-flink-kubernetes-operator-getting-started.md)
+ [Execução de uma aplicação do Flink](jobruns-flink-kubernetes-operator-run-application.md)
+ [Permissões do perfil de segurança para executar uma aplicação do Flink](jobruns-flink-kubernetes-security.md)
+ [Desinstalação do operador do Kubernetes para Flink para o Amazon EMR no EKS](jobruns-flink-kubernetes-operator-uninstall.md)

# Configuração do operador do Kubernetes para Flink para o Amazon EMR no EKS
<a name="jobruns-flink-kubernetes-operator-setup"></a>

Conclua as tarefas apresentadas a seguir para se preparar antes de instalar o operador do Kubernetes para Flink no Amazon EKS. Se você já se inscreveu na Amazon Web Services (AWS) e usou o Amazon EKS, está com quase tudo pronto para usar o Amazon EMR no EKS. Conclua as tarefas apresentadas a seguir para se preparar para usar o operador do Flink no Amazon EKS. Se você já completou algum dos pré-requisitos, pode ignorá-los e passar para os próximos.
+ **[Instale ou atualize para a versão mais recente do AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html)** — Se você já instalou o AWS CLI, confirme se você tem a versão mais recente.
+ **[Instale kubectl e eksctl](https://docs.aws.amazon.com/eks/latest/userguide/install-kubectl.html)**: o eksctl é uma ferramenta de linha de comando usada para se comunicar com o Amazon EKS.
+ **[Instale o Helm](https://docs.aws.amazon.com/eks/latest/userguide/helm.html)**: o gerenciador de pacotes Helm para o Kubernetes ajuda a instalar e gerenciar aplicações em seu cluster do Kubernetes. 
+ **[Comece a usar o Amazon EKS (eksctl):](https://docs.aws.amazon.com/eks/latest/userguide/getting-started-eksctl.html) **siga as etapas para criar um cluster do Kubernetes com nós no Amazon EKS.
+ **[Escolha um rótulo de lançamento do Amazon EMR](jobruns-flink-security-release-versions.md) (versão 6.13.0 ou superiores)**: o operador do Kubernetes do Flink é compatível com as versões 6.13.0 e superiores do Amazon EMR.
+ **[Habilite perfis do IAM para contas de serviço (IRSA) no cluster do Amazon EKS](setting-up-enable-IAM.md)**.
+ **[Crie um perfil de execução de trabalho](creating-job-execution-role.md)**.
+ **[Atualize a política de confiança do perfil de execução de trabalho](setting-up-trust-policy.md)**.
+ Crie um perfil de execução de operador. Esta etapa é opcional. É possível usar o mesmo perfil para trabalhos e operadores do Flink. Se desejar ter um perfil do IAM diferente para o operador, você poderá criar um perfil separado.
+ Atualize a política de confiança do perfil de execução do operador. Você deve adicionar explicitamente uma entrada de política de confiança para os perfis que deseja usar para a conta de serviço do operador do Kubernetes para Flink no Amazon EMR. Você pode seguir este formato de exemplo:

------
#### [ JSON ]

****  

  ```
  {
    "Version":"2012-10-17",		 	 	 
    "Statement": [
      {
        "Effect": "Allow",
        "Action": [
          "sts:AssumeRoleWithWebIdentity"
        ],
        "Resource": [
          "*"
        ],
        "Condition": {
          "StringLike": {
            "aws:userid": "system:serviceaccount:emr:emr-containers-sa-flink-operator"
          }
        },
        "Sid": "AllowSTSAssumerolewithwebidentity"
      }
    ]
  }
  ```

------

# Desinstalação do operador do Kubernetes do Flink no Amazon EMR no EKS
<a name="jobruns-flink-kubernetes-operator-getting-started"></a>

Este tópico ajuda você a começar a usar o operador do Kubernetes do Flink no Amazon EKS ao preparar uma implantação do Flink.

## Como instalar o operador do Kubernetes
<a name="jobruns-flink-kubernetes-operator-getting-started-install-operator"></a>

Use as etapas a seguir para instalar o operador do Kubernetes para Apache Flink.

1. Caso ainda não tenha feito, conclua as etapas em [Configuração do operador do Kubernetes para Flink para o Amazon EMR no EKS](jobruns-flink-kubernetes-operator-setup.md).

1. Instale o *cert-manager* (uma vez por cluster do Amazon EKS) para permitir a adição do componente webhook.

   ```
   kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.12.0/cert-manager.yaml
   ```

1. Instale o chart do Helm.

   ```
   export VERSION=7.12.0 # The Amazon EMR release version
   export NAMESPACE=The Kubernetes namespace to deploy the operator
   
   helm install flink-kubernetes-operator \
   oci://public.ecr.aws/emr-on-eks/flink-kubernetes-operator \
   --version $VERSION \
   --namespace $NAMESPACE
   ```

   Resultado do exemplo:

   ```
   NAME: flink-kubernetes-operator
   LAST DEPLOYED: Tue May 31 17:38:56 2022
   NAMESPACE: $NAMESPACE
   STATUS: deployed
   REVISION: 1
   TEST SUITE: None
   ```

1. Aguarde a conclusão da implantação e verifique a instalação do chart.

   ```
   kubectl wait deployment flink-kubernetes-operator --namespace $NAMESPACE --for condition=Available=True --timeout=30s
   ```

1. Você deverá visualizar a mensagem a seguir quando a implantação for concluída.

   ```
   deployment.apps/flink-kubernetes-operator condition met
   ```

1. Use o comando apresentado a seguir para visualizar o operador implantado.

   ```
   helm list --namespace $NAMESPACE
   ```

   O exemplo a seguir mostra um exemplo de saída, em que a versão da aplicação `x.y.z-amzn-n` corresponderia à versão do operador do Flink para seu Amazon EMR na versão EKS. Para obter mais informações, consulte [Versões compatíveis para Amazon EMR no EKS com Apache Flink](jobruns-flink-security-release-versions.md).

   ```
   NAME                              NAMESPACE    REVISION    UPDATED                                STATUS      CHART                                   APP VERSION          
   flink-kubernetes-operator    $NAMESPACE   1           2023-02-22 16:43:45.24148 -0500 EST    deployed    flink-kubernetes-operator-emr-7.12.0    x.y.z-amzn-n
   ```

### Faça upgrade do operador Kubernetes
<a name="jobruns-flink-kubernetes-operator-upgrade"></a>

Para fazer upgrade para a versão do operador Flink, siga estas etapas:

1. Desinstale o antigo `flink-kubernetes-operator`: `helm uninstall flink-kubernetes-operator -n <NAMESPACE>`.

1. Exclua o CRD (já que o helm não exclui automaticamente o CRD antigo): `kubectl delete crd flinkdeployments.flink.apache.org flinksessionjobs.flink.apache.org`.

1. Reinstale `flink-kubernetes-operator` com a versão mais recente.

# Execução de uma aplicação do Flink
<a name="jobruns-flink-kubernetes-operator-run-application"></a>

Com as versões 6.13.0 e superiores do Amazon EMR, você pode executar uma aplicação do Flink usando o operador do Kubernetes para Flink no modo Aplicação do Amazon EMR no EKS. Com as versões 6.15.0 e superiores do Amazon EMR, você também pode executar uma aplicação do Flink no modo Sessão. Esta página descreve ambos os métodos para executar uma aplicação do Flink com o Amazon EMR no EKS.

**Topics**

**nota**  
Você deve ter um bucket do Amazon S3 criado para armazenar os metadados de alta disponibilidade ao enviar o trabalho do Flink. Se não desejar esse atributo, você poderá desativá-lo. Por padrão, ele é habilitado.

**Pré-requisito**: antes de executar uma aplicação do Flink com o operador do Kubernetes para Flink, conclua as etapas em [Configuração do operador do Kubernetes para Flink para o Amazon EMR no EKS](jobruns-flink-kubernetes-operator-setup.md) e [Como instalar o operador do Kubernetes](jobruns-flink-kubernetes-operator-getting-started.md#jobruns-flink-kubernetes-operator-getting-started-install-operator).

------
#### [ Application mode ]

Com as versões 6.13.0 e superiores do Amazon EMR, você pode executar uma aplicação do Flink usando o operador do Kubernetes para Flink no modo Aplicação do Amazon EMR no EKS.

1. Crie um arquivo `basic-example-app-cluster.yaml` de definição `FlinkDeployment` como no exemplo a seguir. Se você ativou e usou um dos [opt-in Regiões da AWS](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html), certifique-se de descomentar e configurar a configuração. `fs.s3a.endpoint.region`

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example-app-cluster
   spec:
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
       #fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
       state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
       state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH 
     flinkVersion: v1_17
     executionRoleArn: JOB_EXECUTION_ROLE_ARN
     emrReleaseLabel: "emr-6.13.0-flink-latest" # 6.13 or higher
     jobManager:
       storageDir: HIGH_AVAILABILITY_STORAGE_PATH
       resource:
         memory: "2048m"
         cpu: 1
     taskManager:
       resource:
         memory: "2048m"
         cpu: 1
     job:
       # if you have your job jar in S3 bucket you can use that path as well
       jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
       parallelism: 2
       upgradeMode: savepoint
       savepointTriggerNonce: 0
     monitoringConfiguration:    
       cloudWatchMonitoringConfiguration:
          logGroupName: LOG_GROUP_NAME
   ```

1. Envie a implantação do Flink com o comando apresentado a seguir. Isso também criará um objeto `FlinkDeployment` chamado `basic-example-app-cluster`.

   ```
   kubectl create -f basic-example-app-cluster.yaml -n <NAMESPACE>
   ```

1. Acesse a interface do usuário do Flink.

   ```
   kubectl port-forward deployments/basic-example-app-cluster 8081 -n NAMESPACE
   ```

1. Abra `localhost:8081` para visualizar os trabalhos do Flink localmente.

1. Limpe o trabalho. Lembre-se de limpar os artefatos do S3 que foram criados para esse trabalho, como pontos de verificação, alta disponibilidade, metadados de pontos de salvamento e registros. CloudWatch

Para obter mais informações sobre o envio de inscrições para o Flink por meio do operador Flink Kubernetes, consulte exemplos de operadores do [Flink Kubernetes](https://github.com/apache/flink-kubernetes-operator/tree/main/examples) na pasta em. `apache/flink-kubernetes-operator` GitHub

------
#### [ Session mode ]

Com as versões 6.15.0 e superiores do Amazon EMR, você pode executar uma aplicação do Flink usando o operador do Kubernetes para Flink no modo Sessão no Amazon EMR no EKS.

1. Crie um arquivo de definição `FlinkDeployment` chamado `basic-example-app-cluster.yaml` como no exemplo a seguir. Se você ativou e usou um dos [opt-in Regiões da AWS](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html), certifique-se de descomentar e configurar a configuração. `fs.s3a.endpoint.region`

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example-session-cluster
   spec:
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
       #fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
       state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
       state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH
     flinkVersion: v1_17
     executionRoleArn: JOB_EXECUTION_ROLE_ARN
     emrReleaseLabel: "emr-6.15.0-flink-latest"
     jobManager:
       storageDir: HIGH_AVAILABILITY_S3_STORAGE_PATH
       resource:
         memory: "2048m"
         cpu: 1
     taskManager:
       resource:
         memory: "2048m"
         cpu: 1
     monitoringConfiguration:    
       s3MonitoringConfiguration:
          logUri: 
       cloudWatchMonitoringConfiguration:
          logGroupName: LOG_GROUP_NAME
   ```

1. Envie a implantação do Flink com o comando apresentado a seguir. Isso também criará um objeto `FlinkDeployment` chamado `basic-example-session-cluster`.

   ```
   kubectl create -f basic-example-app-cluster.yaml -n NAMESPACE
   ```

1. Use o seguinte comando para confirmar se o cluster da sessão `LIFECYCLE` é `STABLE`:

   ```
   kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
   ```

   A saída deve ser semelhante ao seguinte exemplo:

   ```
   NAME                              JOB STATUS   LIFECYCLE STATE
   basic-example-session-cluster                          STABLE
   ```

1. Crie um arquivo `basic-session-job.yaml` de recursos de definição personalizado do `FlinkSessionJob` com o seguinte conteúdo de exemplo:

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkSessionJob
   metadata:
     name: basic-session-job
   spec:
     deploymentName: basic-session-deployment
     job:
       # If you have your job jar in an S3 bucket you can use that path.
       # To use jar in S3 bucket, set 
       # OPERATOR_EXECUTION_ROLE_ARN (--set emrContainers.operatorExecutionRoleArn=$OPERATOR_EXECUTION_ROLE_ARN)
       # when you install Spark operator
       jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar
       parallelism: 2
       upgradeMode: stateless
   ```

1. Envie o trabalho de sessão do Flink com o comando apresentado a seguir. Isso criará um objeto do `FlinkSessionJob` `basic-session-job`.

   ```
   kubectl apply -f basic-session-job.yaml -n $NAMESPACE
   ```

1. Use o seguinte comando para confirmar se o cluster da sessão `LIFECYCLE` é `STABLE` e o `JOB STATUS` é `RUNNING`:

   ```
   kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
   ```

   A saída deve ser semelhante ao seguinte exemplo:

   ```
   NAME                              JOB STATUS   LIFECYCLE STATE
   basic-example-session-cluster     RUNNING      STABLE
   ```

1. Acesse a interface do usuário do Flink.

   ```
   kubectl port-forward deployments/basic-example-session-cluster 8081 -n NAMESPACE
   ```

1. Abra `localhost:8081` para visualizar os trabalhos do Flink localmente.

1. Limpe o trabalho. Lembre-se de limpar os artefatos do S3 que foram criados para esse trabalho, como pontos de verificação, alta disponibilidade, metadados de pontos de salvamento e registros. CloudWatch

------

# Permissões do perfil de segurança para executar uma aplicação do Flink
<a name="jobruns-flink-kubernetes-security"></a>

Este tópico descreve os perfis de segurança para implantar e executar uma aplicação do Flink. Há dois perfis necessários para gerenciar uma implantação e criar e gerenciar trabalhos: o perfil de operador e o perfil de trabalho. Este tópico os apresenta e lista as permissões.

## Controle de acesso baseado em perfis
<a name="jobruns-flink-kubernetes-security-rbac"></a>

Para implantar o operador e executar os trabalhos do Flink, é necessário criar dois perfis do Kubernetes: um perfil de operador e um perfil de trabalho. Por padrão, o Amazon EMR cria os dois perfis ao instalar o operador.

## Perfil de operador
<a name="jobruns-flink-kubernetes-security-operator-role"></a>

Usamos a função de operador para gerenciar `flinkdeployments` a criação e o JobManager gerenciamento de cada tarefa do Flink e de outros recursos, como serviços.

O nome padrão do perfil de operador é `emr-containers-sa-flink-operator` e ele requer as permissões apresentadas a seguir.

```
rules:
- apiGroups:
  - ""
  resources:
  - pods
  - services
  - events
  - configmaps
  - secrets
  - serviceaccounts
  verbs:
  - '*'
- apiGroups:
  - rbac.authorization.k8s.io
  resources:
  - roles
  - rolebindings
  verbs:
  - '*'
- apiGroups:
  - apps
  resources:
  - deployments
  - deployments/finalizers
  - replicasets
  verbs:
  - '*'
- apiGroups:
  - extensions
  resources:
  - deployments
  - ingresses
  verbs:
  - '*'
- apiGroups:
  - flink.apache.org
  resources:
  - flinkdeployments
  - flinkdeployments/status
  - flinksessionjobs
  - flinksessionjobs/status
  verbs:
  - '*'
- apiGroups:
  - networking.k8s.io
  resources:
  - ingresses
  verbs:
  - '*'
- apiGroups:
  - coordination.k8s.io
  resources:
  - leases
  verbs:
  - '*'
```

## Perfil de trabalho
<a name="jobruns-flink-security-job-role"></a>

 JobManager Ele usa a função de trabalho para criar e gerenciar TaskManagers e ConfigMaps para cada trabalho.

```
rules:
- apiGroups:
  - ""
  resources:
  - pods
  - configmaps
  verbs:
  - '*'
- apiGroups:
  - apps
  resources:
  - deployments
  - deployments/finalizers
  verbs:
  - '*'
```

# Desinstalação do operador do Kubernetes para Flink para o Amazon EMR no EKS
<a name="jobruns-flink-kubernetes-operator-uninstall"></a>

Siga estas etapas para desinstalar o operador do Kubernetes para Flink.

1. Exclua o operador.

   ```
   helm uninstall flink-kubernetes-operator -n <NAMESPACE>
   ```

1. Exclua os recursos do Kubernetes que o Helm não desinstala.

   ```
   kubectl delete serviceaccounts, roles, rolebindings -l emr-containers.amazonaws.com/component=flink.operator --namespace <namespace>
   kubectl delete crd flinkdeployments.flink.apache.org flinksessionjobs.flink.apache.org
   ```

1. (Opcional) Exclua o cert-manager.

   ```
   kubectl delete -f https://github.com/jetstack/cert-manager/releases/download/v1.12.0/cert-manager.yaml
   ```

# Uso do Kubernetes nativo do Flink
<a name="jobruns-flink-native-kubernetes"></a>

As versões 6.13.0 e superiores do Amazon EMR oferecem suporte ao Kubernetes nativo para Flink como uma ferramenta de linha de comando que você pode usar para enviar e executar aplicações do Flink para um cluster do Amazon EMR no EKS.

**Topics**
+ [Configuração do Kubernetes nativo para Flink para o Amazon EMR no EKS](jobruns-flink-native-kubernetes-setup.md)
+ [Conceitos básicos do Kubernetes nativo para Flink para o Amazon EMR no EKS](jobruns-flink-native-kubernetes-getting-started.md)
+ [Requisitos de segurança da conta JobManager de serviço Flink para Kubernetes nativo](jobruns-flink-native-kubernetes-security-requirements.md)

# Configuração do Kubernetes nativo para Flink para o Amazon EMR no EKS
<a name="jobruns-flink-native-kubernetes-setup"></a>

Conclua as tarefas apresentadas a seguir para se preparar antes de executar uma aplicação com a CLI do Flink no Amazon EMR no EKS. Se você já se inscreveu na Amazon Web Services (AWS) e usou o Amazon EKS, está com quase tudo pronto para usar o Amazon EMR no EKS. Se você já completou algum dos pré-requisitos, pode ignorá-los e passar para os próximos.
+ **[Instale ou atualize para a versão mais recente do AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html)** — Se você já instalou o AWS CLI, confirme se você tem a versão mais recente.
+ **[Comece a usar o Amazon EKS (eksctl):](https://docs.aws.amazon.com/eks/latest/userguide/getting-started-eksctl.html) **siga as etapas para criar um cluster do Kubernetes com nós no Amazon EKS.
+ **[Selecione um URI de imagem base do Amazon EMR](docker-custom-images-tag.md) (versão 6.13.0 ou superiores)**: o comando do Kubernetes para Flink é compatível com as versões 6.13.0 e superiores do Amazon EMR.
+ Confirme se a conta JobManager de serviço tem as permissões apropriadas para criar e assistir TaskManager pods. Para obter mais informações, consulte [Requisitos de segurança da conta JobManager de serviço do Flink para o Kubernetes nativo](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/jobruns-flink-native-kubernetes-security-requirements.html).
+ Configure seu [perfil de credenciais locais da AWS](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html).
+ [Crie ou atualize um arquivo kubeconfig para um cluster do Amazon EKS](https://docs.aws.amazon.com/eks/latest/userguide/create-kubeconfig.html) no qual você deseja executar as aplicações do Flink.

# Conceitos básicos do Kubernetes nativo para Flink para o Amazon EMR no EKS
<a name="jobruns-flink-native-kubernetes-getting-started"></a>

Essas etapas mostram como configurar, definir uma conta de serviço e executar uma aplicação do Flink. O Kubernetes nativo para Flink é usado na implantação do Flink em um cluster do Kubernetes em execução.

## Como configurar e executar uma aplicação do Flink
<a name="jobruns-flink-native-kubernetes-getting-started-run-application"></a>

As versões 6.13.0 e posteriores do Amazon EMR oferecem suporte ao Kubernetes nativo para Flink para a execução de aplicações do Flink em um cluster do Amazon EKS. Para executar uma aplicação do Flink, siga estas etapas:

1. Antes de executar uma aplicação do Flink com o comando do Kubernetes nativo para Flink, conclua as etapas em [Configuração do Kubernetes nativo para Flink para o Amazon EMR no EKS](jobruns-flink-native-kubernetes-setup.md).

1. [Download e instalação do Flink](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/try-flink/local_installation).

1. Defina os valores para as variáveis ​​de ambiente a seguir.

   ```
   #Export the FLINK_HOME environment variable to your local installation of Flink
   export FLINK_HOME=/usr/local/bin/flink #Will vary depending on your installation
   export NAMESPACE=flink
   export CLUSTER_ID=flink-application-cluster
   export IMAGE=<123456789012.dkr.ecr.sample-Região da AWS-.amazonaws.com/flink/emr-6.13.0-flink:latest>
   export FLINK_SERVICE_ACCOUNT=emr-containers-sa-flink
   export FLINK_CLUSTER_ROLE_BINDING=emr-containers-crb-flink
   ```

1. Crie uma conta de serviço para gerenciar os recursos do Kubernetes.

   ```
   kubectl create serviceaccount $FLINK_SERVICE_ACCOUNT -n $NAMESPACE
   kubectl create clusterrolebinding $FLINK_CLUSTER_ROLE_BINDING --clusterrole=edit --serviceaccount=$NAMESPACE:$FLINK_SERVICE_ACCOUNT
   ```

1. Execute o comando `run-application` da CLI.

   ```
   $FLINK_HOME/bin/flink run-application \
       --target kubernetes-application \
       -Dkubernetes.namespace=$NAMESPACE \
       -Dkubernetes.cluster-id=$CLUSTER_ID \
       -Dkubernetes.container.image.ref=$IMAGE \
       -Dkubernetes.service-account=$FLINK_SERVICE_ACCOUNT \
       local:///opt/flink/examples/streaming/Iteration.jar
   2022-12-29 21:13:06,947 INFO  org.apache.flink.kubernetes.utils.KubernetesUtils            [] - Kubernetes deployment requires a fixed port. Configuration blob.server.port will be set to 6124
   2022-12-29 21:13:06,948 INFO  org.apache.flink.kubernetes.utils.KubernetesUtils            [] - Kubernetes deployment requires a fixed port. Configuration taskmanager.rpc.port will be set to 6122
   2022-12-29 21:13:07,861 WARN  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Please note that Flink client operations(e.g. cancel, list, stop, savepoint, etc.) won't work from outside the Kubernetes cluster since 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
   2022-12-29 21:13:07,868 INFO  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Create flink application cluster flink-application-cluster successfully, JobManager Web Interface: http://flink-application-cluster-rest.flink:8081
   ```

1. Examine os recursos do Kubernetes criados.

   ```
   kubectl get all -n <namespace>
   NAME READY STATUS RESTARTS AGE
   pod/flink-application-cluster-546687cb47-w2p2z 1/1 Running 0 3m37s
   pod/flink-application-cluster-taskmanager-1-1 1/1 Running 0 3m24s
   
   NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
   service/flink-application-cluster ClusterIP None <none> 6123/TCP,6124/TCP 3m38s
   service/flink-application-cluster-rest ClusterIP 10.100.132.158 <none> 8081/TCP 3m38s
   
   NAME READY UP-TO-DATE AVAILABLE AGE
   deployment.apps/flink-application-cluster 1/1 1 1 3m38s
   
   NAME DESIRED CURRENT READY AGE
   replicaset.apps/flink-application-cluster-546687cb47 1 1 1 3m38s
   ```

1. Encaminhe a porta para 8081.

   ```
   kubectl port-forward service/flink-application-cluster-rest 8081 -n <namespace>
   Forwarding from 127.0.0.1:8081 -> 8081
   ```

1. Acesse localmente a interface do usuário do Flink.  
![\[Acesse a interface do usuário do Flink.\]](http://docs.aws.amazon.com/pt_br/emr/latest/EMR-on-EKS-DevelopmentGuide/images/jobruns-flink-native-kubernetes-ui.png)

1. Exclua a aplicação do Flink.

   ```
   kubectl delete deployment.apps/flink-application-cluster -n <namespace>
   deployment.apps "flink-application-cluster" deleted
   ```

Para obter mais informações sobre o envio de aplicações para o Flink, consulte [Native Kubernetes](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/) na documentação do Apache Flink.

# Requisitos de segurança da conta JobManager de serviço Flink para Kubernetes nativo
<a name="jobruns-flink-native-kubernetes-security-requirements"></a>

O JobManager pod Flink usa uma conta de serviço do Kubernetes para acessar o servidor da API Kubernetes para criar e assistir pods. TaskManager A conta JobManager de serviço deve ter as permissões apropriadas para create/delete TaskManager os pods e permitir que o TaskManager líder do ConfigMaps to watch recupere o endereço de JobManager e ResourceManager no seu cluster.

As regras apresentadas a seguir se aplicam a esta conta de serviço.

```
rules:
- apiGroups:
  - ""
  resources:
  - pods
  verbs:
  - "*"
- apiGroups:
  - ""
  resources:
  - services
  verbs:
  - "*"
- apiGroups:
  - ""
  resources:
  - configmaps
  verbs:
  - "*"
- apiGroups:
  - "apps"
  resources:
  - deployments
  verbs:
  - "*"
```

# Personalização de imagens do Docker para o Flink e o FluentD
<a name="jobruns-flink-docker-flink-fluentd"></a>

Siga as etapas a seguir para personalizar as imagens do Docker para o Amazon EMR no EKS com imagens do Apache Flink ou do FluentD. Isso inclui orientação técnica para obter uma imagem base, personalizá-la, publicá-la e enviar uma workload.

**Topics**
+ [Pré-requisitos](#jobruns-flink-docker-flink-fluentd-prereqs)
+ [Etapa 1: recuperar uma imagem base do Amazon Elastic Container Registry](#jobruns-flink-docker-flink-fluentd-retrieve-base)
+ [Etapa 2: personalizar uma imagem base](#jobruns-flink-docker-flink-fluentd-customize-image)
+ [Etapa 3: publicar uma imagem personalizada](#jobruns-flink-docker-flink-fluentd-publish-image)
+ [Etapa 4: enviar uma workload do Flink no Amazon EMR usando uma imagem personalizada](#jobruns-flink-docker-flink-fluentd-submit-workload)

## Pré-requisitos
<a name="jobruns-flink-docker-flink-fluentd-prereqs"></a>

Antes de personalizar a imagem do Docker, certifique-se de ter atendido aos seguintes pré-requisitos:
+ Concluir as etapas em [Setting up the Flink Kubernetes operator for Amazon EMR on EKS](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/jobruns-flink-kubernetes-operator-setup.html).
+ Instalar o Docker em seu ambiente. Para obter mais informações, consulte [Get Docker](https://docs.docker.com/get-docker/).

## Etapa 1: recuperar uma imagem base do Amazon Elastic Container Registry
<a name="jobruns-flink-docker-flink-fluentd-retrieve-base"></a>

A imagem base contém o runtime do Amazon EMR e os conectores necessários ​​para acessar outros Serviços da AWS. Se estiver usando o Amazon EMR no EKS com a versão 6.14.0 ou posterior do Flink, você pode obter as imagens base na galeria pública do Amazon ECR. Navegue pela galeria para encontrar o link da imagem e extraia-a para seu Workspace local. Por exemplo, para a versão 6.14.0 do Amazon EMR, o comando `docker pull` a seguir retorna a imagem base padrão mais recente. Substitua `emr-6.14.0:latest` pela versão de lançamento desejada.

```
docker pull public.ecr.aws/emr-on-eks/flink/emr-6.14.0-flink:latest
```

Estes são os links para a imagem da galeria do Flink e a imagem da galeria do FluentD:
+ [emr-on-eks/flink/emr-6.14.0-flink](https://gallery.ecr.aws/emr-on-eks/flink/emr-6.14.0-flink)
+ [emr-on-eks/fluente/emr-6.14.0 (](https://gallery.ecr.aws/emr-on-eks/fluentd/emr-6.14.0)

## Etapa 2: personalizar uma imagem base
<a name="jobruns-flink-docker-flink-fluentd-customize-image"></a>

As etapas a seguir descrevem como personalizar a imagem base extraída do Amazon ECR.

1. Crie um novo `Dockerfile` em seu Workspace local.

1. Edite o `Dockerfile` e adicione o conteúdo a seguir. Este `Dockerfile` usa a imagem de contêiner que você extraiu de `public.ecr.aws/emr-on-eks/flink/emr-7.12.0-flink:latest`.

   ```
   FROM public.ecr.aws/emr-on-eks/flink/emr-7.12.0-flink:latest
   USER root
   ### Add customization commands here ####
   USER hadoop:hadoop
   ```

   Use a configuração a seguir se estiver utilizando o `Fluentd`.

   ```
   FROM public.ecr.aws/emr-on-eks/fluentd/emr-7.12.0:latest
   USER root
   ### Add customization commands here ####
   USER hadoop:hadoop
   ```

1. Adicione comandos no `Dockerfile` para personalizar a imagem base. O comando a seguir demonstra como instalar bibliotecas Python.

   ```
   FROM public.ecr.aws/emr-on-eks/flink/emr-7.12.0-flink:latest
   USER root
   RUN pip3 install --upgrade boto3 pandas numpy // For python 3
   USER hadoop:hadoop
   ```

1. No mesmo diretório em que o `DockerFile` foi criado, execute o comando a seguir para criar a imagem do Docker. O campo que você fornece após o sinalizador `-t` é o nome personalizado da imagem.

   ```
   docker build -t <YOUR_ACCOUNT_ID>.dkr.ecr.<YOUR_ECR_REGION>.amazonaws.com/<ECR_REPO>:<ECR_TAG>
   ```

## Etapa 3: publicar uma imagem personalizada
<a name="jobruns-flink-docker-flink-fluentd-publish-image"></a>

Agora você pode publicar a nova imagem do Docker no Amazon ECR Registry.

1. Execute o comando a seguir para criar um repositório do Amazon ECR e armazenar a imagem do Docker. Forneça um nome para o repositório, como `emr_custom_repo.`. Para obter mais informações, consulte [Create a repository](https://docs.aws.amazon.com/AmazonECR/latest/userguide/getting-started-cli.html#cli-create-repository) no Guia do usuário do Amazon Elastic Container Registry.

   ```
   aws ecr create-repository \
          --repository-name emr_custom_repo \
          --image-scanning-configuration scanOnPush=true \
          --region <AWS_REGION>
   ```

1. Execute o comando apresentado a seguir para realizar a autenticação em seu registro padrão. Para obter mais informações, consulte [Authenticate to your default registry](https://docs.aws.amazon.com/AmazonECR/latest/userguide/getting-started-cli.html#cli-authenticate-registry) no Guia do usuário do Amazon Elastic Container Registry.

   ```
   aws ecr get-login-password --region <AWS_REGION> | docker login --username AWS --password-stdin <AWS_ACCOUNT_ID>.dkr.ecr.<YOUR_ECR_REGION>.amazonaws.com
   ```

1. Envie a imagem. Para obter mais informações, consulte [Push an image to Amazon ECR](https://docs.aws.amazon.com/AmazonECR/latest/userguide/getting-started-cli.html#cli-push-image) no Guia do usuário do Amazon Elastic Container Registry.

   ```
   docker push <YOUR_ACCOUNT_ID>.dkr.ecr.<YOUR_ECR_REGION>.amazonaws.com/<ECR_REPO>:<ECR_TAG>
   ```

## Etapa 4: enviar uma workload do Flink no Amazon EMR usando uma imagem personalizada
<a name="jobruns-flink-docker-flink-fluentd-submit-workload"></a>

Faça as alterações a seguir na especificação `FlinkDeployment` para usar uma imagem personalizada. Para fazer isso, insira sua própria imagem na linha `spec.image` da especificação de implantação.

```
apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example
   spec:
     flinkVersion: v1_18
     image: <YOUR_ACCOUNT_ID>.dkr.ecr.<YOUR_ECR_REGION>.amazonaws.com/<ECR_REPO>:<ECR_TAG>
     imagePullPolicy: Always
     flinkConfiguration:
           taskmanager.numberOfTaskSlots: "1"
```

Para usar uma imagem personalizada no trabalho do FluentD, insira sua própria imagem na linha `monitoringConfiguration.image` da especificação de implantação.

```
  monitoringConfiguration:
       image: <YOUR_ACCOUNT_ID>.dkr.ecr.<YOUR_ECR_REGION>.amazonaws.com/<ECR_REPO>:<ECR_TAG>
       cloudWatchMonitoringConfiguration:
         logGroupName: flink-log-group
         logStreamNamePrefix: custom-fluentd
```

# Monitoramento do operador do Kubernetes para Flink e dos trabalhos do Flink
<a name="jobruns-flink-monitoring"></a>

Esta seção descreve diversas maneiras para monitorar trabalhos do Flink com o Amazon EMR no EKS. Isso inclui a integração do Flink com o Amazon Managed Service for Prometheus, o uso do *Flink Web Dashboard*, que fornece o status e as métricas do trabalho, ou o uso de uma configuração de monitoramento para enviar dados de log ao Amazon S3 e ao Amazon CloudWatch.

**Topics**
+ [Uso do Amazon Managed Service para Prometheus no monitoramento de trabalhos do Flink](jobruns-flink-monitoring-prometheus.md)
+ [Uso da interface do usuário do Flink para monitorar trabalhos do Flink](jobruns-flink-monitoring-ui.md)
+ [Uso da configuração de monitoramento para monitorar o operador do Kubernetes do Flink e os trabalhos do Flink](jobruns-flink-monitoring-configuration.md)

# Uso do Amazon Managed Service para Prometheus no monitoramento de trabalhos do Flink
<a name="jobruns-flink-monitoring-prometheus"></a>

Você pode integrar o Apache Flink ao Amazon Managed Service for Prometheus (portal de gerenciamento). O Amazon Managed Service for Prometheus oferece suporte à ingestão de métricas de servidores do Amazon Managed Service for Prometheus em clusters em execução no Amazon EKS. O Amazon Managed Service for Prometheus funciona em conjunto com um servidor do Prometheus que já está em execução no cluster do Amazon EKS. A execução da integração do Amazon Managed Service for Prometheus com o operador do Flink do Amazon EMR implantará e configurará automaticamente um servidor do Prometheus para a integração com o Amazon Managed Service for Prometheus.

1. [Crie um Workspace do Amazon Managed Service for Prometheus](https://docs.aws.amazon.com/prometheus/latest/userguide/AMP-onboard-create-workspace.html). Este Workspace serve como um endpoint de ingestão. Você precisará do URL de gravação remota posteriormente.

1. Configure perfis do IAM para as contas de serviço.

   Para esse método de integração, use perfis do IAM para as contas de serviço no cluster do Amazon EKS em que o servidor do Prometheus está em execução. Esses perfis também são chamados de *perfis de serviço*.

   Se você ainda não tiver os perfis, [configure perfis de serviço para a ingestão de métricas de clusters do Amazon EKS.](https://docs.aws.amazon.com/prometheus/latest/userguide/set-up-irsa.html)

   Antes de continuar, crie um perfil do IAM chamado `amp-iamproxy-ingest-role`.

1. Instale o operador do Flink do Amazon EMR com o Amazon Managed Service for Prometheus.

Agora que você tem um Workspace do Amazon Managed Service for Prometheus, um perfil do IAM dedicado para o Amazon Managed Service for Prometheus e as permissões necessárias, é possível instalar o operador do Flink do Amazon EMR.

Criar um arquivo `enable-amp.yaml` Esse arquivo permite usar uma configuração personalizada para substituir as configurações do Amazon Managed Service for Prometheus. Use seus próprios perfis.

```
kube-prometheus-stack:
    prometheus:
    serviceAccount:
        create: true
        name: "amp-iamproxy-ingest-service-account"
        annotations:
            eks.amazonaws.com/role-arn: "arn:aws:iam::<AWS_ACCOUNT_ID>:role/amp-iamproxy-ingest-role"
    remoteWrite:
        - url: <AMAZON_MANAGED_PROMETHEUS_REMOTE_WRITE_URL>
        sigv4:
            region: <AWS_REGION>
        queueConfig:
            maxSamplesPerSend: 1000
            maxShards: 200
            capacity: 2500
```

Use o comando [https://helm.sh/docs/helm/helm_install/](https://helm.sh/docs/helm/helm_install/) para transferir as substituições para o chart `flink-kubernetes-operator`.

```
helm upgrade -n <namespace> flink-kubernetes-operator \
   oci://public.ecr.aws/emr-on-eks/flink-kubernetes-operator \
   --set prometheus.enabled=true
   -f enable-amp.yaml
```

Esse comando instala automaticamente um relator do Prometheus no operador na porta 9999. Qualquer `FlinkDeployment` futura também expõe uma porta para `metrics` em 9249.
+ As métricas do operador do Flink aparecem no Prometheus sob o rótulo `flink_k8soperator_`.
+ As métricas Task Manager do Flink aparecem no Prometheus sob o rótulo `flink_taskmanager_`.
+ As métricas Job Manager do Flink aparecem no Prometheus sob o rótulo `flink_jobmanager_`.

# Uso da interface do usuário do Flink para monitorar trabalhos do Flink
<a name="jobruns-flink-monitoring-ui"></a>

Para monitorar a integridade e a performance de uma aplicação do Flink em execução, use o *painel Web do Flink*. Esse painel fornece informações sobre o status do trabalho, o número TaskManagers e as métricas e registros do trabalho. Ele também permite visualizar e modificar a configuração do trabalho do Flink e interagir com o cluster do Flink para enviar ou cancelar trabalhos.

Para acessar o painel Web do Flink para uma aplicação do Flink em execução no Kubernetes:

1. Use o `kubectl port-forward` comando para encaminhar uma porta local para a porta na qual o Flink Web Dashboard está sendo executado nos pods do TaskManager aplicativo Flink. Por padrão, esta porta é 8081. *deployment-name*Substitua pelo nome da implantação do aplicativo Flink acima.

   ```
   kubectl get deployments -n namespace
   ```

   Resultado do exemplo:

   ```
   kubectl get deployments -n flink-namespace
   NAME                        READY   UP-TO-DATE   AVAILABLE  AGE
   basic-example               1/1       1            1           11m
   flink-kubernetes-operator   1/1       1            1           21h
   ```

   ```
   kubectl port-forward deployments/deployment-name 8081 -n namespace
   ```

1. Se você quiser usar uma porta diferente localmente, use o parâmetro:8081*local-port*.

   ```
   kubectl port-forward -n flink deployments/basic-example 8080:8081
   ```

1. Em um navegador da Web, vá até `http://localhost:8081` (ou `http://localhost:local-port`, se você usou uma porta local personalizada) para acessar o painel Web do Flink. Esse painel mostra informações sobre o aplicativo Flink em execução, como o status do trabalho, o número e as métricas e registros do trabalho. TaskManagers  
![\[Exemplo de interface do usuário do painel do Flink\]](http://docs.aws.amazon.com/pt_br/emr/latest/EMR-on-EKS-DevelopmentGuide/images/sample-flink-dashboard-ui.png)

# Uso da configuração de monitoramento para monitorar o operador do Kubernetes do Flink e os trabalhos do Flink
<a name="jobruns-flink-monitoring-configuration"></a>

A configuração de monitoramento permite que você configure facilmente o arquivamento de registros do seu aplicativo Flink e dos registros do operador no S3 and/or CloudWatch (você pode escolher um ou ambos). Isso adiciona um sidecar FluentD aos JobManager seus pods TaskManager e, posteriormente, encaminha os registros desses componentes para os coletores configurados.

**nota**  
Você deve configurar perfis do IAM para a conta de serviço do seu operador do Flink e seu trabalho do Flink (contas de serviço) para poder usar esse recurso, pois ele requer interação com outros Serviços da AWS. Você deve configurar isso usando IRSA em [Configuração do operador do Kubernetes para Flink para o Amazon EMR no EKS](jobruns-flink-kubernetes-operator-setup.md).

## Logs da aplicação do Flink
<a name="jobruns-flink-monitoring-configuration-application-logs"></a>

Você pode definir essa configuração da maneira apresentada a seguir.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: FLINK IMAGE TAG
  imagePullPolicy: Always
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  executionRoleArn: JOB EXECUTION ROLE
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
  monitoringConfiguration:
    s3MonitoringConfiguration:
      logUri: S3 BUCKET
    cloudWatchMonitoringConfiguration:
      logGroupName: LOG GROUP NAME
      logStreamNamePrefix: LOG GROUP STREAM PREFIX
    sideCarResources:
      limits:
        cpuLimit: 500m
        memoryLimit: 250Mi
    containerLogRotationConfiguration:
        rotationSize: 2GB
        maxFilesToKeep: 10
```

A seguir, são apresentadas as opções de configuração.
+ `s3MonitoringConfiguration`: a chave de configuração para configurar o encaminhamento para o S3.
  + `logUri` (obrigatório): o caminho do bucket do S3 em que você deseja armazenar seus logs.
  + O caminho no S3 depois que os logs forem carregados será semelhante ao apresentado a seguir.
    + Nenhuma alternância de log habilitada:

      ```
      s3://${logUri}/${POD NAME}/STDOUT or STDERR.gz
      ```
    + A alternância de log está habilitada. Você pode usar um arquivo rotacionado e um arquivo atual (um que não tenha carimbo de data).

      ```
      s3://${logUri}/${POD NAME}/STDOUT or STDERR.gz
      ```

      O formato a seguir é um número incremental.

      ```
      s3://${logUri}/${POD NAME}/stdout_YYYYMMDD_index.gz
      ```
  + As permissões do IAM apresentadas a seguir são obrigatórias para usar este encaminhador.

    ```
    {
        "Effect": "Allow",
        "Action": [
            "s3:PutObject"
        ],
        "Resource": [
           "S3_BUCKET_URI/*",
           "S3_BUCKET_URI"
        ]
    }
    ```
+ `cloudWatchMonitoringConfiguration`— chave de configuração para a qual configurar o encaminhamento. CloudWatch
  + `logGroupName`(obrigatório) — nome do grupo de CloudWatch registros para o qual você deseja enviar registros (cria automaticamente o grupo se ele não existir).
  + `logStreamNamePrefix` (opcional): o nome do fluxo de logs para o qual você deseja enviar os logs. O valor padrão é uma string vazia. O formato é o seguinte:

    ```
    ${logStreamNamePrefix}/${POD NAME}/STDOUT or STDERR
    ```
  + As permissões do IAM apresentadas a seguir são obrigatórias para usar este encaminhador.

    ```
    {
        "Effect": "Allow",
        "Action": [
            "logs:CreateLogStream",
            "logs:CreateLogGroup",
            "logs:PutLogEvents"
        ],
        "Resource": [
            "arn:aws:logs:REGION:ACCOUNT-ID:log-group:{YOUR_LOG_GROUP_NAME}:*",
            "arn:aws:logs:REGION:ACCOUNT-ID:log-group:{YOUR_LOG_GROUP_NAME}"
        ]
    }
    ```
+ `sideCarResources` (opcional): a chave de configuração para definir limites de recursos no contêiner sidecar do Fluent Bit iniciado.
  + `memoryLimit` (opcional): o valor padrão é 512 Mi. Ajuste de acordo com suas necessidades.
  + `cpuLimit` (opcional): esta opção não tem um padrão. Ajuste de acordo com suas necessidades.
+ `containerLogRotationConfiguration` (opcional): controla o comportamento de alternância de log do contêiner. Ele é habilitado por padrão.
  + `rotationSize` (obrigatório): especifica o tamanho do arquivo para a alternância de log. O intervalo de valores possíveis é de 2 KB a 2 GB. A parcela numérica da unidade do parâmetro rotationSize é transferida como um número inteiro. Como não há suporte para valores decimais, você pode especificar um tamanho de rotação de 1,5 GB, por exemplo, com o valor 1.500 MB. O padrão é 2 GB.
  + `maxFilesToKeep` (obrigatório): especifica o número máximo de arquivos a serem retidos no contêiner após a alternância ter ocorrido. O valor mínimo é de 1 e o valor máximo é de 50. O padrão é 10.

## Logs do operador do Flink
<a name="jobruns-flink-monitoring-configuration-operator-logs"></a>

Também podemos habilitar o arquivamento de logs para o operador ao usar as opções apresentadas a seguir no arquivo `values.yaml` da instalação do chart do Helm. Você pode ativar o S3 ou ambos. CloudWatch

```
monitoringConfiguration: 
  s3MonitoringConfiguration:
    logUri: "S3-BUCKET"
    totalFileSize: "1G"
    uploadTimeout: "1m"
  cloudWatchMonitoringConfiguration:
    logGroupName: "flink-log-group"
    logStreamNamePrefix: "example-job-prefix-test-2"
  sideCarResources:
    limits:
      cpuLimit: 1
      memoryLimit: 800Mi
  memoryBufferLimit: 700M
```

A seguir, estão apresentadas as opções de configuração disponíveis em `monitoringConfiguration`.
+ `s3MonitoringConfiguration`: defina esta opção para realizar o arquivamento no S3.
+ `logUri` (obrigatório): o caminho do bucket do S3 em que você deseja armazenar seus logs.
+ A seguir, estão apresentados os formatos de como os caminhos do bucket do S3 podem parecer depois que os logs são carregados.
  + Nenhuma alternância de log habilitada.

    ```
    s3://${logUri}/${POD NAME}/OPERATOR or WEBHOOK/STDOUT or STDERR.gz
    ```
  + A alternância de log está habilitada. Você pode usar um arquivo rotacionado e um arquivo atual (um que não tenha carimbo de data).

    ```
    s3://${logUri}/${POD NAME}/OPERATOR or WEBHOOK/STDOUT or STDERR.gz
    ```

    O índice de formato a seguir é um número incremental.

    ```
    s3://${logUri}/${POD NAME}/OPERATOR or WEBHOOK/stdout_YYYYMMDD_index.gz
    ```
+ `cloudWatchMonitoringConfiguration`— a chave de configuração para a qual configurar o encaminhamento. CloudWatch
  + `logGroupName`(obrigatório) — nome do grupo de CloudWatch registros para o qual você deseja enviar registros. O grupo é criado automaticamente, se não existir.
  + `logStreamNamePrefix` (opcional): o nome do fluxo de logs para o qual você deseja enviar os logs. O valor padrão é uma string vazia. O formato em CloudWatch é o seguinte:

    ```
    ${logStreamNamePrefix}/${POD NAME}/STDOUT or STDERR
    ```
+ `sideCarResources` (opcional): a chave de configuração para definir limites de recursos no contêiner sidecar do Fluent Bit iniciado.
  + `memoryLimit` (opcional): o limite de memória. Ajuste de acordo com suas necessidades. O padrão é de 512Mi.
  + `cpuLimit`: o limite de CPU. Ajuste de acordo com suas necessidades. Nenhum valor padrão.
+ `containerLogRotationConfiguration` (opcional): controla o comportamento de alternância de log do contêiner. Ele é habilitado por padrão.
  + `rotationSize` (obrigatório): especifica o tamanho do arquivo para a alternância de log. O intervalo de valores possíveis é de 2 KB a 2 GB. A parcela numérica da unidade do parâmetro rotationSize é transferida como um número inteiro. Como não há suporte para valores decimais, você pode especificar um tamanho de rotação de 1,5 GB, por exemplo, com o valor 1.500 MB. O padrão é 2 GB.
  + `maxFilesToKeep` (obrigatório): especifica o número máximo de arquivos a serem retidos no contêiner após a alternância ter ocorrido. O valor mínimo é de 1 e o valor máximo é de 50. O padrão é 10.

# Como o Flink oferece suporte à alta disponibilidade e resiliência do trabalho
<a name="jobruns-flink-resiliency"></a>

As seções a seguir descrevem como o Flink torna os trabalhos mais confiáveis ​​e altamente disponíveis. Ele faz isso por meio de capacidades integradas, como a alta disponibilidade do Flink e vários recursos de recuperação, caso ocorram falhas.

**Topics**
+ [Uso da alta disponibilidade (HA) para operadores e aplicações do Flink](jobruns-flink-using-ha.md)
+ [Otimização dos tempos de reinicialização de trabalhos do Flink para operações de recuperação e ajuste de escala de tarefas com o Amazon EMR no EKS](jobruns-flink-restart.md)
+ [Desativação tranquila de instâncias spot com Flink no Amazon EMR no EKS](jobruns-flink-decommission.md)

# Uso da alta disponibilidade (HA) para operadores e aplicações do Flink
<a name="jobruns-flink-using-ha"></a>

Este tópico mostra como configurar a alta disponibilidade e descreve como ela funciona em alguns casos de uso diferentes. Isso inclui o uso do gerenciador de trabalhos e do Kubernetes nativo para Flink.

## Alta disponibilidade do operador do Flink
<a name="jobruns-flink-ha-operator"></a>

Habilitamos a *alta disponibilidade* do operador do Flink para que possamos fazer failover para um operador do Flink em espera a fim de minimizar o tempo de inatividade no loop de controle do operador se ocorrerem falhas. A alta disponibilidade é habilitada por padrão e o número padrão de réplicas iniciais para o operador é dois. É possível configurar o campo de réplicas em seu arquivo `values.yaml` para o chart do Helm.

Os seguintes campos são personalizáveis:
+ `replicas` (opcional, o padrão é dois): definir esse número como maior que um cria outros operadores em espera e permite uma recuperação mais rápida do trabalho.
+ `highAvailabilityEnabled` (opcional, o padrão é “true”): controla se você deseja habilitar a HA. Especificar esse parâmetro como “true” habilita o suporte à implantação multi-AZ e define os parâmetros `flink-conf.yaml` corretos.

Você pode desativar a HA para seu operador ao definir a configuração apresentada a seguir em seu arquivo `values.yaml`.

```
...
imagePullSecrets: []

replicas: 1

# set this to false if you don't want HA
highAvailabilityEnabled: false
...
```

**Implantação multi-AZ**

Criamos os pods do operador em várias zonas de disponibilidade. Esta é uma restrição leve, e seus pods do operador serão programados na mesma AZ, se você não tiver recursos suficientes em uma AZ diferente.

**Determinação da réplica líder**

 Se o HA estiver habilitado, as réplicas usarão uma concessão para determinar qual delas JMs é a líder e usarão uma locação K8s para a eleição do líder. Você pode descrever a concessão e consultar o campo .Spec.Holder Identity para determinar o líder atual.

```
kubectl describe lease <Helm Install Release Name>-<NAMESPACE>-lease -n <NAMESPACE> | grep "Holder Identity"
```

**Interação entre o Flink e o S3**

**Configuração de credenciais de acesso**

Certifique-se de ter configurado o IRSA com as permissões do IAM apropriadas para acessar o bucket do S3.

**Busca por trabalhos em JARs do modo de aplicação do S3**

O operador do Flink também oferece suporte à busca de aplicações do S3 em JARs. Você acabou de fornecer a localização S3 para o JARuri em sua FlinkDeployment especificação.

Você também pode usar esse recurso para baixar outros artefatos, como PyFlink scripts. O script do Python resultante é descartado no caminho `/opt/flink/usrlib/`.

O exemplo a seguir demonstra como usar esse recurso para um PyFlink trabalho. Observe os campos jarURI e args.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: python-example
spec:
  image: <YOUR CUSTOM PYFLINK IMAGE>
  emrReleaseLabel: "emr-6.12.0-flink-latest"
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    highAvailabilityEnabled: false
    replicas: 1
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: "s3://<S3-BUCKET>/scripts/pyflink.py" # Note, this will trigger the artifact download process
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pyflink.py"]
    parallelism: 1
    upgradeMode: stateless
```

**Conectores do S3 para Flink**

O Flink vem com dois conectores do S3 (listados abaixo). As seções a seguir debatem sobre o momento de usar cada conector.

**Ponto de verificação: conector do S3 para Presto**
+ Defina o esquema do S3 como s3p://.
+ O conector recomendado a ser usado como ponto de verificação para o s3. Para obter mais informações, consulte [S3-specific](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#s3-specific) na documentação do Apache Flink.

Exemplo de FlinkDeployment especificação:

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.checkpoints.dir: s3p://<BUCKET-NAME>/flink-checkpoint/
```

**Leitura e gravação no S3: conector Hadoop S3**
+ Defina o esquema do S3 como `s3://` ou (`s3a://`).
+ O conector recomendado para a leitura e a gravação de arquivos do S3 (somente um conector do S3 que implementa a [interface Flink Filesystem](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/)).
+ Por padrão, definimos `fs.s3a.aws.credentials.provider` no arquivo `flink-conf.yaml`, que é `com.amazonaws.auth.WebIdentityTokenCredentialsProvider`. Se você substituir completamente o `flink-conf` padrão e estiver interagindo com o S3, certifique-se de usar este provedor.

Exemplo de FlinkDeployment especificação

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  job:
    jarURI: local:///opt/flink/examples/streaming/WordCount.jar
    args: [ "--input", "s3a://<INPUT BUCKET>/PATH", "--output", "s3a://<OUTPUT BUCKET>/PATH" ]
    parallelism: 2
    upgradeMode: stateless
```

## JobManager do Flink
<a name="jobruns-flink-ha-manager"></a>

A alta disponibilidade (HA) para implantações do Flink permite que os trabalhos continuem progredindo mesmo que um erro transitório seja encontrado e você falhe. JobManager Com a HA habilitada, os trabalhos serão reiniciados, mas a partir do último ponto de verificação com êxito. Sem o HA ativado, o Kubernetes reiniciará o seu JobManager, mas seu trabalho começará como um novo trabalho e perderá o progresso. Depois de configurar o HA, podemos pedir ao Kubernetes que armazene os metadados de HA em um armazenamento persistente para referência no caso de uma falha transitória no JobManager e, em seguida, retome nossos trabalhos a partir do último ponto de verificação bem-sucedido.

A HA é habilitada, por padrão, para os trabalhos do Flink (a contagem de réplicas é definida como dois, o que exigirá que você forneça um local de armazenamento do S3 para que os metadados de HA persistam).

**Configurações de HA**

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  executionRoleArn: "<JOB EXECUTION ROLE ARN>"
  emrReleaseLabel: "emr-6.13.0-flink-latest"
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    replicas: 2
    highAvailabilityEnabled: true
    storageDir: "s3://<S3 PERSISTENT STORAGE DIR>"
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
```

A seguir estão as descrições das configurações de HA apresentadas acima no Job Manager (definidas em .spec.jobManager):
+ `highAvailabilityEnabled` (opcional, o padrão é “true”): defina como `false ` se você não desejar que a HA seja habilitada e não quiser usar as configurações de HA fornecidas. Você ainda pode manipular o campo “réplicas” para configurar manualmente a HA.
+ `replicas`(opcional, o padrão é 2): definir esse número como maior que 1 cria outro modo de espera JobManagers e permite uma recuperação mais rápida do seu trabalho. Se você desabilitar a HA, deverá definir a contagem de réplicas como um ou continuará recebendo erros de validação (somente uma réplica tem suporte se a HA não estiver habilitada).
+ `storageDir` (obrigatório): por usar a contagem de réplicas como dois, por padrão, é necessário fornecer um storageDir persistente. No momento, este campo aceita somente caminhos do S3 como local de armazenamento.

**Localidade de pod**

 Se você habilitar o HA, também tentaremos colocar pods na mesma AZ, o que leva a um melhor desempenho (latência de rede reduzida ao ter pods no mesmo). AZs Este é um processo de melhor esforço, ou seja, se você não tiver recursos suficientes na AZ em que a maioria dos seus pods está programada, os pods restantes ainda serão programados, mas poderão acabar em um nó externo a esta AZ.

**Determinação da réplica líder**

Se o HA estiver habilitado, as réplicas usarão uma concessão para determinar qual delas JMs é a líder e usam um K8s Configmap como armazenamento de dados para armazenar esses metadados. Se quiser determinar o líder, você pode consultar o conteúdo do Configmap e a chave `org.apache.flink.k8s.leader.restserver` nos dados para encontrar o pod do K8s com o endereço IP. Você também pode usar os comandos bash apresentados a seguir.

```
ip=$(kubectl get configmap -n <NAMESPACE> <JOB-NAME>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')
kubectl get pods -n NAMESPACE  -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

## Trabalho do Flink: Kubernetes nativo
<a name="jobruns-flink-ha-kubernetes"></a>

As versões 6.13.0 e superiores do Amazon EMR oferecem suporte ao Kubernetes nativo para Flink para a execução de aplicações do Flink no modo de alta disponibilidade em um cluster do Amazon EKS. 

**nota**  
Você deve ter um bucket do Amazon S3 criado para armazenar os metadados de alta disponibilidade ao enviar o trabalho do Flink. Se não desejar esse atributo, você poderá desativá-lo. Por padrão, ele é habilitado.

Para ativar o recurso de alta disponibilidade do Flink, forneça os parâmetros do Flink descritos a seguir ao [executar o comando da CLI `run-application`](jobruns-flink-native-kubernetes-getting-started.md#jobruns-flink-native-kubernetes-getting-started-run-application). Os parâmetros são definidos abaixo do exemplo.

```
-Dhigh-availability.type=kubernetes \
-Dhigh-availability.storageDir=S3://DOC-EXAMPLE-STORAGE-BUCKET \
-Dfs.s3a.aws.credentials.provider="com.amazonaws.auth.WebIdentityTokenCredentialsProvider" \
-Dkubernetes.jobmanager.replicas=3 \
-Dkubernetes.cluster-id=example-cluster
```
+ **`Dhigh-availability.storageDir`**: o bucket do Amazon S3 onde você deseja armazenar os metadados de alta disponibilidade para o trabalho.

  **`Dkubernetes.jobmanager.replicas`**: o número de pods do gerenciador de trabalhos a serem criados como um número inteiro maior que `1`.

  **`Dkubernetes.cluster-id`**: um ID exclusivo que identifica o cluster do Flink.

# Otimização dos tempos de reinicialização de trabalhos do Flink para operações de recuperação e ajuste de escala de tarefas com o Amazon EMR no EKS
<a name="jobruns-flink-restart"></a>

Quando uma tarefa falha ou ocorre uma operação de ajuste de escala, o Flink tenta reexecutar a tarefa com base no último ponto de verificação concluído. O processo de reinicialização pode levar um minuto ou mais para ser executado, dependendo do tamanho do estado do ponto de verificação e do número de tarefas paralelas. Durante o período de reinicialização, as tarefas de backlog podem se acumular para o trabalho. No entanto, existem algumas maneiras de o Flink otimizar a velocidade de recuperação e reinicialização dos gráficos de execução para melhorar a estabilidade do trabalho.

Esta página descreve algumas maneiras que o Flink no Amazon EMR pode usar para melhorar o tempo de reinício de trabalhos durante as operações de recuperação ou ajuste de escala de tarefas em instâncias spot. As instâncias spot são capacidades computacionais não utilizadas que estão disponíveis com desconto. Elas têm comportamentos únicos, incluindo interrupções ocasionais, por isso é importante entender como o Amazon EMR no EKS lida com elas e como o Amazon EMR no EKS realiza a desativação e a reinicialização do trabalho.

**Topics**
+ [Recuperação local de tarefas](#flink-restart-task-local)
+ [Recuperação local de tarefas por meio da montagem de volume do Amazon EBS](#flink-restart-task-local-ebs)
+ [Ponto de verificação incremental genérico baseado em logs](#flink-restart-log-check)
+ [Recuperação refinada](#flink-restart-fine-grained)
+ [Mecanismo de reinício combinado no agendador adaptável](#flink-restart-combined)

## Recuperação local de tarefas
<a name="flink-restart-task-local"></a>

**nota**  
A recuperação local de tarefas é compatível com o Flink nas versões 6.14.0 e superiores do Amazon EMR no EKS.

Com os pontos de verificação do Flink, cada tarefa produz um snapshot do próprio estado que o Flink grava em um armazenamento distribuído, como o Amazon S3. Em casos de recuperação, as tarefas restauram o estado no armazenamento distribuído. O armazenamento distribuído oferece tolerância a falhas e pode redistribuir o estado durante o reajuste de escala por ser acessível a todos os nós.

No entanto, um armazenamento distribuído remoto também tem uma desvantagem: todas as tarefas devem ler seu estado de um local remoto na rede. Isso pode resultar em tempos de recuperação longos para estados grandes durante operações de recuperação ou ajuste de escala de tarefas.

Esse problema de tempo de recuperação longo é resolvido pela *recuperação local de tarefas*. As tarefas gravam seu estado no ponto de verificação em um armazenamento secundário local para a tarefa, como em um disco local. Elas também armazenam seu estado no armazenamento principal, ou no Amazon S3, no nosso caso. Durante a recuperação, o agendador programa as tarefas no mesmo gerenciador de tarefas em que elas foram executadas anteriormente, para que possam se recuperar do armazenamento de estado local em vez de ler do armazenamento de estado remoto. Para obter mais informações, consulte [Task-Local Recovery](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/large_state_tuning/#task-local-recovery) na *Documentação do Apache Flink*.

Nossos testes de benchmark com exemplos de trabalhos mostraram que o tempo de recuperação foi reduzido de minutos para alguns segundos com a recuperação local da tarefa ativada.

Para habilitar a recuperação local de tarefas, defina as configurações a seguir no seu arquivo `flink-conf.yaml`. Especifique o valor do intervalo do ponto de verificação em milissegundos.

```
    state.backend.local-recovery: true
    state.backend: hasmap or rocksdb
    state.checkpoints.dir: s3://STORAGE-BUCKET-PATH/checkpoint
    execution.checkpointing.interval: 15000
```

## Recuperação local de tarefas por meio da montagem de volume do Amazon EBS
<a name="flink-restart-task-local-ebs"></a>

**nota**  
A recuperação local de tarefas por meio do Amazon EBS é compatível com o Flink nas versões 6.15.0 e superiores do Amazon EMR no EKS.

Com o Flink para Amazon EMR no EKS, você pode provisionar automaticamente os volumes do Amazon EBS nos pods do TaskManager para a recuperação local de tarefas. A montagem de sobreposição padrão vem com um volume de 10 GB, o que é suficiente para trabalhos com um estado inferior. Trabalhos com estados grandes podem habilitar a opção de *montagem automática de volume do EBS*. Os pods do TaskManager são criados e montados automaticamente durante a criação do pod e removidos durante a exclusão do pod.

Use as etapas a seguir para habilitar a montagem automática de volume do EBS para o Flink no Amazon EMR no EKS.

1. Exporte os valores das seguintes variáveis que você usará nas próximas etapas:

   ```
   export AWS_REGION=aa-example-1 
   export FLINK_EKS_CLUSTER_NAME=my-cluster
   export AWS_ACCOUNT_ID=111122223333
   ```

1. Crie ou atualize um arquivo `kubeconfig` YAML para o cluster.

   ```
   aws eks update-kubeconfig --name $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
   ```

1. Crie uma conta de serviço do IAM para o driver de interface de armazenamento de contêiner (CSI) do Amazon EBS no cluster do Amazon EKS. 

   ```
   eksctl create iamserviceaccount \
      --name ebs-csi-controller-sa \
      --namespace kube-system \
      --region $AWS_REGION \
      --cluster $FLINK_EKS_CLUSTER_NAME\
      --role-name TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} \
      --role-only \
      --attach-policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy \
      --approve
   ```

1. Crie o driver CSI do Amazon EBS com o seguinte comando:

   ```
   eksctl create addon \
      --name aws-ebs-csi-driver \
      --region $AWS_REGION \
      --cluster $FLINK_EKS_CLUSTER_NAME \
      --service-account-role-arn arn:aws:iam::${AWS_ACCOUNT_ID}:role/TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
   ```

1. Crie a classe de armazenamento do Amazon EBS com o seguinte comando:

   ```
   cat ≪ EOF ≫ storage-class.yaml
   apiVersion: storage.k8s.io/v1
   kind: StorageClass
   metadata:
     name: ebs-sc
   provisioner: ebs.csi.aws.com
   volumeBindingMode: WaitForFirstConsumer
   EOF
   ```

   Depois, aplique a classe:

   ```
   kubectl apply -f storage-class.yaml
   ```

1. O Helm instala o operador do Kubernetes para Flink do Amazon EMR com opções para criar uma conta de serviço. Isso cria o `emr-containers-sa-flink` para ser usado na implantação do Flink.

   ```
   helm install flink-kubernetes-operator flink-kubernetes-operator/ \
      --set jobServiceAccount.create=true \
      --set rbac.jobRole.create=true \
      --set rbac.jobRoleBinding.create=true
   ```

1. Para enviar o trabalho do Flink e habilitar o provisionamento automático de volumes do EBS para a recuperação local de tarefas, defina as configurações a seguir no seu arquivo `flink-conf.yaml`. Ajuste o limite de tamanho para o tamanho do estado do trabalho. Defina `serviceAccount` como `emr-containers-sa-flink`. Especifique o valor do intervalo do ponto de verificação em milissegundos. Omita o `executionRoleArn`.

   ```
   flinkConfiguration:
       task.local-recovery.ebs.enable: true
       kubernetes.taskmanager.local-recovery.persistentVolumeClaim.sizeLimit: 10Gi
       state.checkpoints.dir: s3://BUCKET-PATH/checkpoint
       state.backend.local-recovery: true
       state.backend: hasmap or rocksdb
       state.backend.incremental: "true"
       execution.checkpointing.interval: 15000
     serviceAccount: emr-containers-sa-flink
   ```

Quando estiver tudo pronto para excluir o plug-in do driver CSI do Amazon EBS, use os seguintes comandos:

```
  # Detach Attached Policy
  aws iam detach-role-policy --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} --policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy
  # Delete the created Role
  aws iam delete-role --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
  # Delete the created service account
  eksctl delete iamserviceaccount --name ebs-csi-controller-sa --namespace kube-system --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  # Delete Addon
  eksctl delete addon --name aws-ebs-csi-driver --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  # Delete the EBS storage class
  kubectl delete -f storage-class.yaml
```

## Ponto de verificação incremental genérico baseado em logs
<a name="flink-restart-log-check"></a>

**nota**  
O ponto de verificação incremental genérico baseado em logs é compatível com o Flink para as versões 6.14.0 e superiores do Amazon EMR no EKS.

O recurso de ponto de verificação incremental genérico baseado em logs foi adicionado ao Flink 1.16 para melhorar a velocidade dos pontos de verificação. Um intervalo de ponto de verificação mais rápido costuma resultar em uma redução do trabalho de recuperação porque menos eventos precisam ser reprocessados após a recuperação. Para obter mais informações, consulte [Improving speed and stability of checkpointing with generic log-based incremental checkpoints](https://flink.apache.org/2022/05/30/improving-speed-and-stability-of-checkpointing-with-generic-log-based-incremental-checkpoints/) no *blog do Apache Flink*.

Com exemplos de trabalhos, nossos testes de comparação mostraram que o tempo do ponto de verificação foi reduzido de minutos para alguns segundos com o ponto de verificação incremental genérico baseado em logs.

Para habilitar os pontos de verificação incrementais genéricos baseados em logs, defina as configurações a seguir no seu arquivo `flink-conf.yaml`. Especifique o valor do intervalo do ponto de verificação em milissegundos.

```
    state.backend.changelog.enabled: true 
    state.backend.changelog.storage: filesystem
    dstl.dfs.base-path: s3://bucket-path/changelog
    state.backend.local-recovery: true
    state.backend: rocksdb
    state.checkpoints.dir: s3://bucket-path/checkpoint
    execution.checkpointing.interval: 15000
```

## Recuperação refinada
<a name="flink-restart-fine-grained"></a>

**nota**  
O suporte de recuperação refinada para o agendador padrão é compatível com o Flink nas versões 6.14.0 e superiores do Amazon EMR no EKS. O suporte de recuperação refinada no agendador adaptável está disponível com o Flink nas versões 6.15.0 e superiores do Amazon EMR no EKS.

Quando uma tarefa falha durante a execução, o Flink redefine todo o gráfico de execução e aciona a reexecução completa com base no último ponto de verificação concluído. Isso é mais caro do que apenas reexecutar as tarefas que falharam. A recuperação refinada reinicia somente o componente conectado ao pipeline da tarefa que falhou. No exemplo a seguir, o gráfico do trabalho tem cinco vértices (de `A` a `E`). É feito o pipeline de todas as conexões entre os vértices com distribuição pontual, e o `parallelism.default` do trabalho é definido como `2`. 

```
A → B → C → D → E
```

Neste exemplo, há um total de dez tarefas em execução. O primeiro pipeline (de `a1` a `e1`) é executado em um TaskManager (`TM1`) e o segundo pipeline (de `a2` a `e2`) é executado em outro TaskManager (`TM2`).

```
a1 → b1 → c1 → d1 → e1
a2 → b2 → c2 → d2 → e2
```

Há dois componentes conectados por pipeline: `a1 → e1` e `a2 → e2`. Se `TM1` ou `TM2` falhar, a falha afetará somente as cinco tarefas no pipeline em que TaskManager estava em execução. A estratégia de reinicialização só inicia o componente do pipeline afetado. 

A recuperação refinada funciona somente com trabalhos perfeitamente paralelos do Flink. Não é compatível com operações de `keyBy()` ou `redistribute()`. Para obter mais informações, consulte [FLIP-1: Fine Grained Recovery from Task Failures](https://cwiki.apache.org/confluence/display/FLINK/FLIP-1%3A+Fine+Grained+Recovery+from+Task+Failures) no projeto Jira *Flink Improvement Proposal*.

Para habilitar a recuperação refinada, defina as configurações a seguir no seu arquivo `flink-conf.yaml`.

```
jobmanager.execution.failover-strategy: region 
restart-strategy: exponential-delay or fixed-delay
```

## Mecanismo de reinício combinado no agendador adaptável
<a name="flink-restart-combined"></a>

**nota**  
O mecanismo de reinicialização combinado no agendador adaptável é compatível com o Flink nas versões 6.15.0 e superiores do Amazon EMR no EKS.

O agendador adaptável pode ajustar o paralelismo do trabalho com base nos slots disponíveis. Ele reduz automaticamente o paralelismo se não houver slots suficientes disponíveis para atender ao paralelismo do trabalho configurado. Se novos slots ficarem disponíveis, o trabalho aumentará a escala verticalmente mais uma vez para o paralelismo do trabalho configurado. Um agendador adaptável evita o tempo de inatividade no trabalho quando não há recursos suficientes disponíveis. Esse é o agendador compatível com o escalador automático do Flink. Recomendamos o agendador adaptável com o Flink no Amazon EMR por esses motivos. No entanto, os agendadores adaptáveis podem fazer várias reinicializações em um curto período; uma reinicialização para cada novo recurso adicionado. Isso pode levar a uma queda na performance do trabalho.

Com o Amazon EMR 6.15.0 e versões superiores, o Flink tem um mecanismo de reinício combinado no agendador adaptável que abre uma janela de reinicialização quando o primeiro recurso é adicionado e aguarda pelo intervalo configurado da janela do padrão de um minuto. Ele executa uma única reinicialização quando há recursos suficientes disponíveis para executar o trabalho com o paralelismo configurado ou quando o intervalo expira.

Com os exemplos de trabalhos, nossos testes de comparação mostraram que esse recurso processa 10% a mais dos registros do que o comportamento padrão ao usar o agendador adaptável e o escalador automático do Flink.

Para habilitar o mecanismo de reinício combinado, defina as configurações a seguir no seu arquivo `flink-conf.yaml`.

```
jobmanager.adaptive-scheduler.combined-restart.enabled: true 
jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m
```

# Desativação tranquila de instâncias spot com Flink no Amazon EMR no EKS
<a name="jobruns-flink-decommission"></a>

O Flink com o Amazon EMR no EKS pode melhorar o tempo de reinicialização de trabalhos durante as operações de recuperação e ajuste de escala de tarefas.

## Visão geral do
<a name="jobruns-flink-decommission-overview"></a>

As versões 6.15.0 e superiores do Amazon EMR no EKS oferecem suporte à desativação tranquila de gerenciadores de tarefas em instâncias spot no Amazon EMR no EKS com o Flink para Apache. Como parte desse recurso, o Amazon EMR no EKS com o Flink fornece as funcionalidades a seguir.
+ Just-in-time ponto **de verificação** — As tarefas de streaming do Flink podem responder à interrupção da Instância Spot, realizar o ponto de verificação just-in-time (JIT) das tarefas em execução e impedir o agendamento de tarefas adicionais nessas Instâncias Spot. O ponto de verificação de JIT é compatível com agendadores padrão e adaptáveis.
+ **Mecanismo de reinicialização combinado**: um mecanismo de reinicialização combinado faz a melhor tentativa de reiniciar o trabalho depois que ele atinge o paralelismo de recursos de destino ou o final da janela configurada atual. Isso também evita reinicializações consecutivas de trabalhos que podem ser causadas por vários encerramentos de instâncias spot. O mecanismo de reinicialização combinado está disponível somente com o agendador adaptável.

Esses recursos fornecem os seguintes benefícios:
+ Você pode aproveitar as instâncias spot para executar gerenciadores de tarefas e reduzir os gastos com clusters.
+ O aprimoramento de liveness do gerenciador de tarefas da instância spot resulta em maior resiliência e agendamento de trabalhos mais eficiente.
+ Seus trabalhos do Flink terão mais tempo de atividade porque haverá menos reinicializações após o encerramento da instância spot.

## Como funciona a desativação normal
<a name="jobruns-flink-decommission-howitworks"></a>

Considere o seguinte exemplo: você provisiona um cluster do Amazon EMR no EKS executando o Flink para Apache e especifica nós sob demanda para o gerenciador de trabalhos e nós de instância spot para o gerenciador de tarefas. Dois minutos antes do encerramento, o gerenciador de tarefas recebe um aviso de interrupção.

Nesse cenário, o gerenciador de trabalhos manipularia o sinal de interrupção da instância spot, bloquearia o agendamento de tarefas adicionais na instância spot e iniciaria o ponto de verificação de JIT do trabalho de streaming.

Em seguida, o gerenciador de trabalhos reiniciaria o gráfico do trabalho somente depois que houvesse disponibilidade suficiente de novos recursos para satisfazer o paralelismo atual de trabalhos na janela atual do intervalo de reinicialização. O intervalo da janela de reinicialização é decidido com base na duração da substituição da instância spot, na criação de novos pods do gerenciador de tarefas e no registro no gerenciador de trabalhos.

## Pré-requisitos
<a name="jobruns-flink-decommission-prereqs"></a>

Para usar a desativação normal, crie e execute um trabalho de streaming em um cluster do Amazon EMR no EKS executando o Apache Flink. Habilite o agendador adaptável e os gerenciadores de tarefas agendados em, pelo menos, uma instância spot, conforme mostrado no exemplo a seguir. Você deve usar nós sob demanda para o gerenciador de trabalhos e pode usar nós sob demanda para os gerenciadores de tarefas, desde que também haja pelo menos uma instância spot.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: deployment_name
spec:
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    cluster.taskmanager.graceful-decommission.enabled: "true"
    execution.checkpointing.interval: "240s"
    jobmanager.adaptive-scheduler.combined-restart.enabled: "true"
    jobmanager.adaptive-scheduler.combined-restart.window-interval : "1m"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    nodeSelector:
      'eks.amazonaws.com/capacityType': 'ON_DEMAND'
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
    nodeSelector:
      'eks.amazonaws.com/capacityType': 'SPOT'
  job:
    jarURI: flink_job_jar_path
```

## Configuração
<a name="jobruns-flink-decommission-config"></a>

Esta seção aborda a maioria das configurações que você pode especificar para as suas necessidades de desativação. 


| Chave | Description | Valor padrão  | Valores aceitos | 
| --- | --- | --- | --- | 
|  cluster.taskmanager.graceful-decommission.enabled  |  Habilita a desativação tranquila do gerenciador de tarefas.  |  true  |  true, false  | 
|  jobmanager.adaptive-scheduler.combined-restart.enabled  |  Habilita o mecanismo de reinicialização combinado no agendador adaptável.  |  false  |  true, false  | 
|  jobmanager.adaptive-scheduler.combined-restart.window-interval  |  O intervalo combinado da janela de reinicialização para efetuar reinicializações mescladas do trabalho. Um número inteiro sem uma unidade é interpretado como milissegundos.  |  1m  |  Exemplos: 30, 60s, 3m, 1h  | 

# Uso do Autoscaler para aplicações do Flink
<a name="jobruns-flink-autoscaler"></a>

O Autoscaler do operador pode ajudar a aliviar a contrapressão ao coletar métricas de trabalhos do Flink e ajustar automaticamente o paralelismo em nível de vértice do trabalho. Confira o seguinte exemplo de como a configuração pode se parecer:

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  ...
spec:
  ...
  flinkVersion: v1_18
  flinkConfiguration:
    job.autoscaler.enabled: "true"
    job.autoscaler.stabilization.interval: 1m
    job.autoscaler.metrics.window: 5m
    job.autoscaler.target.utilization: "0.6"
    job.autoscaler.target.utilization.boundary: "0.2"
    job.autoscaler.restart.time: 2m
    job.autoscaler.catch-up.duration: 5m
    pipeline.max-parallelism: "720"
  ...
```

Essa configuração usa valores padrão para a versão mais recente do Amazon EMR. Se você usar outras versões, é possível que tenha valores diferentes.

**nota**  
A partir do Amazon EMR 7.2.0, você não precisa incluir o prefixo `kubernetes.operator` na sua configuração. Se você usa a versão 7.1.0 ou inferior, deve usar o prefixo antes de cada configuração. Por exemplo, você deve especificar `kubernetes.operator.job.autoscaler.scaling.enabled`.

A seguir, são apresentadas as opções de configuração para o Autoscaler.
+ `job.autoscaler.scaling.enabled`: especifica se a execução do ajuste de escala de vértices deve ser habilitada pelo escalador automático. O padrão é `true`. Se você desabilitar essa configuração, o escalador automático coletará apenas métricas e avaliará o paralelismo sugerido para cada vértice, mas não atualizará os trabalhos.
+ `job.autoscaler.stabilization.interval`: o período de estabilização no qual nenhuma nova escalabilidade será executada. O padrão é de cinco minutos.
+ `job.autoscaler.metrics.window`: o tamanho da janela de agregação de métricas de escalabilidade. Quanto mais ampla for a janela, mais harmoniosa e estável ela será, mas o Autoscaler pode demorar mais para reagir a alterações repentinas de carga. O padrão é de 15 minutos. Recomendamos que você experimente usando um valor entre 3 e 60 minutos.
+ `job.autoscaler.target.utilization`: a utilização desejada para o vértice para fornecer uma performance de trabalho estável e algum buffer para flutuações de carga. O padrão é `0.7` atingir 70% utilization/load para os vértices do trabalho.
+ `job.autoscaler.target.utilization.boundary`: o limite da utilização desejada para o vértice, que serve como buffer extra para evitar uma escalabilidade imediata em flutuações de carga. O padrão é `0.3`, o que significa que é permitido um desvio de 30% da utilização desejada antes do acionamento de uma ação de ajuste de escala.
+ `ob.autoscaler.restart.time`: o tempo esperado para reiniciar a aplicação. O padrão é de cinco minutos.
+ `job.autoscaler.catch-up.duration`: o tempo esperado para a recuperação, ou seja, o processamento total de qualquer backlog após a conclusão de uma operação de escalabilidade. O padrão é de cinco minutos. Ao reduzir a duração da recuperação, é necessário que o Autoscaler reserve uma capacidade extra para as ações de escalabilidade.
+ `pipeline.max-parallelism`: o paralelismo máximo que o Autoscaler pode usar. O Autoscaler ignora esse limite se ele for maior que o paralelismo máximo configurado na configuração do Flink ou diretamente em cada operador. O padrão é -1. Observe que o Autoscaler calcula o paralelismo como um divisor do número de paralelismo máximo, portanto, é recomendado escolher configurações de paralelismo máximo com muitos divisores em vez de confiar nos padrões fornecidos pelo Flink. Recomendamos usar múltiplos de 60 para esta configuração, por exemplo, 120, 180, 240, 360, 720 etc.

Para obter uma página de referência de configurações mais detalhada, consulte [Autoscaler Configuration](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/#autoscaler-configuration).

# Ajuste automático dos parâmetros do escalador automático
<a name="jobruns-flink-autoscaler-parameter-autotuning"></a>

Esta seção descreve o comportamento de ajuste automático para várias versões do Amazon EMR. Ela também detalha as diferentes configurações de ajuste de escala automático.

**nota**  
O Amazon EMR 7.2.0 e versões posteriores usam a configuração de código aberto `job.autoscaler.restart.time-tracking.enabled` para habilitar a **estimativa do tempo de redimensionamento**. A estimativa do tempo de redimensionamento tem a mesma funcionalidade do ajuste automático do Amazon EMR, então você não precisa atribuir manualmente valores empíricos ao horário de reinicialização.  
Você ainda pode usar o ajuste automático do Amazon EMR se estiver utilizando o Amazon EMR 7.1.0 ou inferior.

------
#### [ 7.2.0 and higher ]

O Amazon EMR 7.2.0 e versões posteriores medem o tempo real de reinicialização necessário para aplicar decisões de ajuste de escala automático. Nas versões 7.1.0 e inferiores, você precisava usar a configuração `job.autoscaler.restart.time` para configurar manualmente o tempo máximo estimado de reinicialização. Ao usar a configuração `job.autoscaler.restart.time-tracking.enabled`, você só precisa inserir um horário de reinicialização para o primeiro ajuste de escala. Depois, o operador registra o tempo real de reinicialização e o usará para ajustes de escala subsequentes.

Para habilitar esse rastreamento, use o seguinte comando:

```
job.autoscaler.restart.time-tracking.enabled: true
```

A seguir estão as configurações relacionadas para a estimativa do tempo de redimensionamento.


| Configuração | Obrigatório | Padrão | Description | 
| --- | --- | --- | --- | 
| job.autoscaler.restart.time-tracking.enabled | Não | Falso | Indica se o escalador automático do Flink deve ajustar automaticamente as configurações ao longo do tempo para otimizar as decisões de escalabilidade. Observe que o escalador automático só pode ajustar automaticamente o parâmetro restart.time do escalador automático. | 
| job.autoscaler.restart.time | Não | 5 minutos | O tempo de reinicialização esperado que o Amazon EMR no EKS usa até que o operador possa determinar o tempo real de reinicialização com base nos ajustes de escala anteriores. | 
| job.autoscaler.restart.time-tracking.limit | Não | 15 minutos | O tempo máximo de reinicialização observado quando job.autoscaler.restart.time-tracking.enabled está definido como true. | 

Confira este exemplo de especificação de implantação que você pode usar para testar a estimativa do tempo de redimensionamento:

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: autoscaling-example
spec:
  flinkVersion: v1_18
  flinkConfiguration:

    # Autoscaler parameters
    job.autoscaler.enabled: "true"
    job.autoscaler.scaling.enabled: "true"
    job.autoscaler.stabilization.interval: "5s"
    job.autoscaler.metrics.window: "1m"
    
    job.autoscaler.restart.time-tracking.enabled: "true"
    job.autoscaler.restart.time: "2m"
    job.autoscaler.restart.time-tracking.limit: "10m"
    
    jobmanager.scheduler: adaptive
    taskmanager.numberOfTaskSlots: "1"
    pipeline.max-parallelism: "12"

  executionRoleArn: <JOB ARN>
  emrReleaseLabel: emr-7.12.0-flink-latest
  jobManager:
    highAvailabilityEnabled: false
    storageDir: s3://<s3_bucket>/flink/autoscaling/ha/
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: s3://<s3_bucket>/some-job-with-back-pressure
    parallelism: 1
    upgradeMode: stateless
```

Para simular a contrapressão, use a especificação de implantação a seguir.

```
job:
    jarURI: s3://<s3_bucket>/pyflink-script.py
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-py", "/opt/flink/usrlib/pyflink-script.py"]
    parallelism: 1
    upgradeMode: stateless
```

Faça upload do script Python a seguir no bucket do S3.

```
import logging
import sys
import time
import random

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

TABLE_NAME="orders"
QUERY=f"""
CREATE TABLE {TABLE_NAME} (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);
"""

def create_backpressure(i):
    time.sleep(2)
    return i

def autoscaling_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    t_env.execute_sql(QUERY)
    res_table = t_env.from_path(TABLE_NAME)

    stream =  t_env.to_data_stream(res_table) \
      .shuffle().map(lambda x: create_backpressure(x))\
      .print()
    env.execute("Autoscaling demo")

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    autoscaling_demo()
```

Para verificar se a estimativa do tempo de redimensionamento está funcionando, confira se o registro em log em nível de `DEBUG` do operador do Flink está habilitado. O exemplo abaixo demonstra como atualizar o arquivo `values.yaml` do chart do Helm. Em seguida, reinstale o chart do Helm atualizado e execute o trabalho do Flink novamente.

```
log4j-operator.properties: |+
  # Flink Operator Logging Overrides
  rootLogger.level = DEBUG
```

Obtenha o nome do pod líder.

```
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')

kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

Execute o comando a seguir para obter o tempo real de reinicialização usado nas avaliações de métricas.

```
kubectl logs <FLINK-OPERATOR-POD-NAME> -c flink-kubernetes-operator -n <OPERATOR-NAMESPACE> -f | grep "Restart time used in scaling summary computation"
```

Você deve ver logs semelhantes aos mostrados a seguir. Observe que somente o primeiro ajuste de escala usa ` job.autoscaler.restart.time`. Os ajustes de escala subsequentes usam o tempo de reinicialização observado.

```
2024-05-16 17:17:32,590 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT2M
2024-05-16 17:19:03,787 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:19:18,976 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:20:50,283 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:22:21,691 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
```

------
#### [ 7.0.0 and 7.1.0 ]

O escalador automático integrado do Flink e de código aberto usa várias métricas para tomar as melhores decisões de ajuste de escala. No entanto, os valores padrão que ele usa para seus cálculos devem ser aplicáveis à maioria das workloads e podem não ser ideais para um determinado trabalho. O recurso de ajuste automático adicionado à versão do Amazon EMR no EKS do operador do Flink analisa as tendências históricas observadas em métricas específicas capturadas e, em seguida, tenta calcular o valor mais adequado para o trabalho em questão.


| Configuração | Obrigatório | Padrão | Description | 
| --- | --- | --- | --- | 
| kubernetes.operator.job.autoscaler.autotune.enable | Não | Falso | Indica se o escalador automático do Flink deve ajustar automaticamente as configurações ao longo do tempo para otimizar as decisões de escalabilidade do escalador automático. Atualmente, o escalador automático só pode ajustar automaticamente seu parâmetro restart.time. | 
| kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count | Não | 3 | Indica quantas métricas históricas do Amazon EMR no EKS o escalador automático mantém no mapa de configuração de métricas do Amazon EMR no EKS. | 
| kubernetes.operator.job.autoscaler.autotune.metrics.restart.count | Não | 3 | Indica quantas reinicializações o escalador automático executa antes de começar a calcular o tempo médio de reinicialização de um determinado trabalho. | 

Para habilitar o ajuste automático, você deve concluir as seguintes etapas:
+ Defina `kubernetes.operator.job.autoscaler.autotune.enable:` como `true`
+ Defina `metrics.job.status.enable:` como `TOTAL_TIME`
+ Seguir a configuração em [Using Autoscaler for Flink applications](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/jobruns-flink-autoscaler.html) para habilitar o ajuste de escala automático.

Confira a seguir um exemplo de especificação de implantação que você pode usar para experimentar o ajuste automático.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: autoscaling-example
spec:
  flinkVersion: v1_18
  flinkConfiguration:

    # Autotuning parameters
    kubernetes.operator.job.autoscaler.autotune.enable: "true"
    kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count: "2"
    kubernetes.operator.job.autoscaler.autotune.metrics.restart.count: "1"
    metrics.job.status.enable: TOTAL_TIME

    # Autoscaler parameters
    kubernetes.operator.job.autoscaler.enabled: "true"
    kubernetes.operator.job.autoscaler.scaling.enabled: "true"
    kubernetes.operator.job.autoscaler.stabilization.interval: "5s"
    kubernetes.operator.job.autoscaler.metrics.window: "1m"

    jobmanager.scheduler: adaptive

    taskmanager.numberOfTaskSlots: "1"
    state.savepoints.dir: s3://<S3_bucket>/autoscaling/savepoint/
    state.checkpoints.dir: s3://<S3_bucket>/flink/autoscaling/checkpoint/
    pipeline.max-parallelism: "4"

  executionRoleArn: <JOB ARN>
  emrReleaseLabel: emr-6.14.0-flink-latest
  jobManager:
    highAvailabilityEnabled: true
    storageDir: s3://<S3_bucket>/flink/autoscaling/ha/
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: s3://<S3_bucket>/some-job-with-back-pressure
    parallelism: 1
    upgradeMode: last-state
```

Para simular a contrapressão, use a especificação de implantação a seguir.

```
  job:
    jarURI: s3://<S3_bucket>/pyflink-script.py
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-py", "/opt/flink/usrlib/pyflink-script.py"]
    parallelism: 1
    upgradeMode: last-state
```

Faça upload do script Python a seguir no bucket do S3.

```
import logging
import sys
import time
import random

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

TABLE_NAME="orders"
QUERY=f"""
CREATE TABLE {TABLE_NAME} (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);
"""

def create_backpressure(i):
    time.sleep(2)
    return i

def autoscaling_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    t_env.execute_sql(QUERY)
    res_table = t_env.from_path(TABLE_NAME)

    stream =  t_env.to_data_stream(res_table) \
      .shuffle().map(lambda x: create_backpressure(x))\
      .print()
    env.execute("Autoscaling demo")

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    autoscaling_demo()
```

Para verificar se o escalador automático está funcionando, use os comandos a seguir. Observe que você deve usar as informações do seu próprio pod líder no operador do Flink.

Primeiro, obtenha o nome do seu pod líder.

```
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')

kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

Depois de ter o nome do pod líder, será possível executar o comando a seguir.

```
kubectl logs -n $NAMESPACE  -c flink-kubernetes-operator --follow <YOUR-FLINK-OPERATOR-POD-NAME>  | grep -E 'EmrEks|autotun|calculating|restart|autoscaler'
```

Você deve ver logs semelhantes aos mostrados a seguir.

```
[m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [36m[DEBUG][flink/autoscaling-example] Using the latest Emr Eks Metric for calculating restart.time for autotuning: EmrEksMetrics(restartMetric=RestartMetric(restartingTime=65, numRestarts=1))

[m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [32m[INFO ][flink/autoscaling-example] Calculated average restart.time metric via autotuning to be: PT0.065S
```

------

# Manutenção e solução de problemas para trabalhos do Flink no Amazon EMR no EKS
<a name="jobruns-flink-troubleshooting"></a>

As seções a seguir descrevem como manter seus trabalhos de execução prolongada do Flink e fornecem orientações sobre como solucionar alguns problemas comuns em trabalhos do Flink.

# Manutenção de aplicações do Flink
<a name="jobruns-flink-maintain"></a>

**Topics**
+ [Modos de atualização](#jobruns-flink-upgrademode)

As aplicações do Flink costumam ser projetadas para execuções por longos períodos, como semanas, meses ou até anos. Como acontece com todos os serviços de execução prolongada, as aplicações de streaming do Flink precisam de manutenção. Isso inclui correções de erros, melhorias e migração para um cluster do Flink de uma versão mais recente.

Quando o arquivo spec muda para os recursos `FlinkDeployment` e `FlinkSessionJob`, você precisa atualizar a aplicação em execução. Para isso, o operador interrompe o trabalho em execução (a menos que já esteja suspenso) e o reimplanta com o arquivo spec mais recente e, em aplicações com estado, o estado da execução anterior.

Os usuários controlam como gerenciar o estado quando aplicações com estado param e são restauradas com a configuração `upgradeMode` do `JobSpec`.

## Modos de atualização
<a name="jobruns-flink-upgrademode"></a>

Introdução opcional

**Sem estado**  
Atualizações de aplicações sem estado começando do estado vazio.

**Último estado**  
Atualizações rápidas em qualquer estado da aplicação (mesmo em trabalhos com falha) não exigem um trabalho íntegro, pois sempre usam o ponto de verificação bem-sucedido mais recente. A recuperação manual pode ser necessária se os metadados de HA forem perdidos. Para limitar o tempo em que o trabalho pode efetuar fallback ao escolher o ponto de verificação mais recente, você pode configurar `kubernetes.operator.job.upgrade.last-state.max.allowed.checkpoint.age`. Se o ponto de verificação for mais antigo que o valor configurado, um ponto de salvamento será usado para trabalhos íntegros. Isso não é compatível com o modo Sessão. 

**Ponto de salvamento**  
Use o ponto de salvamento para atualização, fornecendo máxima segurança e possibilidade de servir como ponto. backup/fork O ponto de salvamento será criado durante o processo de atualização. Observe que o trabalho do Flink precisa estar em execução para permitir que o ponto de salvamento seja criado. Se o trabalho não estiver íntegro, o último ponto de verificação será usado (a menos que kubernetes.operator.job.upgrade). last-state-fallback.enabled está definido como false). Se o ponto de verificação mais recente não estiver disponível, a atualização do trabalho falhará.

# Solução de problemas
<a name="jobruns-flink-troubleshoot"></a>

Esta seção descreve como solucionar problemas com o Amazon EMR no EKS. Para obter informações sobre como solucionar problemas gerais com o Amazon EMR, consulte [Troubleshoot a cluster](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-troubleshoot.html) no *Guia de gerenciamento do Amazon EMR*.
+ [Solução de problemas de trabalhos que usam PersistentVolumeClaims (PVC)](permissions-for-pvc.md)
+ [Solução de problemas de escalabilidade automática vertical do Amazon EMR no EKS](troubleshooting-vas.md)
+ [Solução de problemas do operador do Spark do Amazon EMR no EKS](troubleshooting-sparkop.md)

## Solução de problemas do Apache Flink para Amazon EMR no EKS
<a name="jobruns-flink-troubleshooting-apache-flink"></a>

### Mapeamento de recursos não encontrado ao instalar o chart do Helm
<a name="w2aac21c21b7b7b3"></a>

Você pode encontrar a mensagem de erro apresentada a seguir ao instalar o chart do Helm.

```
Error: INSTALLATION FAILED: pulling from host 1234567890.dkr.ecr.us-west-2.amazonaws.com failed with status code [manifests 6.13.0]: 403 Forbidden Error: INSTALLATION FAILED: unable to build kubernetes objects from release manifest: [resource mapping not found for name: "flink-operator-serving-cert" namespace: "<the namespace to install your operator>" from "": no matches for kind "Certificate" in version "cert-manager.io/v1"

ensure CRDs are installed first, resource mapping not found for name: "flink-operator-selfsigned-issuer" namespace: "<the namespace to install your operator>" " from "": no matches for kind "Issuer" in version "cert-manager.io/v1"

ensure CRDs are installed first].
```

Para resolver esse erro, instale o cert-manager para habilitar a adição do componente webhook. Você deve instalar o cert-manager em cada cluster do Amazon EKS que usa.

```
kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.12.0
```

### AWS service (Serviço da AWS) erro de acesso negado
<a name="jobruns-flink-troubleshooting-access-denied"></a>

Se você ver um erro*access denied* , confirme se o perfil do IAM para `operatorExecutionRoleArn` no arquivo `values.yaml` do chart do Helm tem as permissões corretas. Além disso, certifique-se de que o perfil do IAM em `executionRoleArn` na sua especificação de `FlinkDeployment` tenha as permissões corretas.

### `FlinkDeployment` está preso
<a name="jobruns-flink-troubleshooting-stuck"></a>

Se seu `FlinkDeployment` ficar em um estado preso, use as etapas a seguir para forçar a exclusão da implantação:

1. Edite a execução da implantação.

   ```
   kubectl edit -n Flink Namespace flinkdeployments/App Name
   ```

1. Remova este finalizador.

   ```
   finalizers:
     - flinkdeployments.flink.apache.org/finalizer
   ```

1. Exclua a implantação.

   ```
   kubectl delete -n Flink Namespace flinkdeployments/App Name
   ```

### AWSBadRequestException problema s3a ao executar um aplicativo Flink em um opt-in Região da AWS
<a name="jobruns-flink-troubleshooting-optin-region"></a>

Se você executar um aplicativo Flink em um [opt-in Região da AWS](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html), poderá ver os seguintes erros:

```
Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: getFileStatus on 
s3://flink.txt: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request 
(Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: ABCDEFGHIJKL; S3 Extended Request ID:
ABCDEFGHIJKLMNOP=; Proxy: null), S3 Extended Request ID: ABCDEFGHIJKLMNOP=:400 Bad Request: Bad Request 
(Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: ABCDEFGHIJKL; S3 Extended Request ID: ABCDEFGHIJKLMNOP=; Proxy: null)
```

```
Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: getS3Region on flink-application: software.amazon.awssdk.services.s3.model.S3Exception: null 
(Service: S3, Status Code: 400, Request ID: ABCDEFGHIJKLMNOP, Extended Request ID: ABCDEFGHIJKLMNOPQRST==):null: null 
(Service: S3, Status Code: 400, Request ID: ABCDEFGHIJKLMNOP, Extended Request ID: AHl42uDNaTUFOus/5IIVNvSakBcMjMCH7dd37ky0vE6jhABCDEFGHIJKLMNOPQRST==)
```

Para corrigir esses erros, use a configuração a seguir no arquivo `FlinkDeployment` de definição.

```
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
```

Também recomendamos que você use o provedor de SDKv2 credenciais:

```
fs.s3a.aws.credentials.provider: software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider
```

Se você quiser usar o provedor de SDKv1 credenciais, certifique-se de que seu SDK seja compatível com sua região de inscrição. Para obter mais informações, consulte o [aws-sdk-java GitHub repositório.](https://github.com/aws/aws-sdk-java)

Se você obtiver o erro `S3 AWSBadRequestException` ao executar instruções SQL do Flink em uma região de ingresso, certifique-se de definir a configuração `fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME` na especificação de configuração do Flink.

### S3A AWSBad RequestException ao executar um trabalho de sessão do Flink em regiões CN
<a name="jobruns-flink-troubleshooting-optin-region"></a>

Nas versões 6.15.0 à 7.2.0 do Amazon EMR, você pode encontrar as mensagens de erro a seguir ao executar um trabalho de sessão do Flink nas regiões da China. Estes incluem China (Pequim) e China (Ningxia):

```
Error:  {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.hadoop.fs.s3a.AWSBadRequestException: 
                    getFileStatus on s3://ABCDPath: software.amazon.awssdk.services.s3.model.S3Exception: null (Service: S3, Status Code: 400, Request ID: ABCDEFGH, Extended Request ID: 
                    ABCDEFGH:null: null (Service: S3, Status Code: 400, Request ID: ABCDEFGH, Extended Request ID: ABCDEFGH","additionalMetadata":{},"throwableList":
                    [{"type":"org.apache.hadoop.fs.s3a.AWSBadRequestException","message":"getFileStatus on s3://ABCDPath: software.amazon.awssdk.services.s3.model.S3Exception: 
                    null (Service: S3, Status Code: 400, Request ID: ABCDEFGH, Extended Request ID: ABCDEFGH:null: null (Service: S3, Status Code: 400, Request ID: ABCDEFGH, 
                    Extended Request ID: ABCDEFGH","additionalMetadata":{}},{"type":"software.amazon.awssdk.services.s3.model.S3Exception","message":"null (Service: S3, Status Code: 400, 
                    Request ID: ABCDEFGH, Extended Request ID: ABCDEFGH","additionalMetadata":{}}]}
```

Esse problema é conhecido. A equipe está trabalhando na correção dos operadores do Flink em todas essas versões de lançamento. No entanto, antes de concluirmos o patch, para corrigir esse erro, você precisa baixar o chart do Helm do operador do Flink, descompactá-lo (extrair o arquivo compactado) e fazer alterações na configuração do chart do Helm.

As etapas especificas são as seguintes:

1. Altere, especificamente, os diretórios da sua pasta local do chart do Helm e execute a linha de comando a seguir para extrair o chart do Helm e descompactá-lo (extraí-lo).

   ```
   helm pull oci://public.ecr.aws/emr-on-eks/flink-kubernetes-operator \
   --version $VERSION \
   --namespace $NAMESPACE
   ```

   ```
   tar -zxvf flink-kubernetes-operator-$VERSION.tgz
   ```

1. Vá até a pasta do chart do Helm e encontre o arquivo `templates/flink-operator.yaml`.

1. Localize `flink-operator-config` ConfigMap e adicione a seguinte `fs.s3a.endpoint.region` configuração no`flink-conf.yaml`. Por exemplo:

   ```
   {{- if .Values.defaultConfiguration.create }}
   apiVersion: v1
   kind: ConfigMap
   metadata:
     name: flink-operator-config
     namespace: {{ .Release.Namespace }}
     labels:
       {{- include "flink-operator.labels" . | nindent 4 }}
   data:
     flink-conf.yaml: |+
   fs.s3a.endpoint.region: {{ .Values.emrContainers.awsRegion }}
   ```

1. Instale o chart do Helm local e execute o trabalho.

# Versões compatíveis para Amazon EMR no EKS com Apache Flink
<a name="jobruns-flink-security-release-versions"></a>

O Apache Flink está disponível com as versões do Amazon EMR no EKS apresentadas a seguir. Para obter informações sobre todas as versões disponíveis, consulte [Versões do Amazon EMR no EKS](emr-eks-releases.md).


| Rótulo da versão | Java | Flink | Operador do Flink | 
| --- | --- | --- | --- | 
|  **emr-7.2.0-flink-latest**  |  17  |  1.18.1  |  -  | 
|  **emr-7.2.0-flink-k8s-operator-latest**  |  11  |  -  |  1.8.0  | 
|  **emr-7.1.0-flink-latest**  |  17  |  1.18.1  |  -  | 
|  **emr-7.1.0-flink-k8s-operator-latest**  |  11  |  -  |  1.6.1  | 
|  **emr-7.0.0-flink-latest**  |  11  |  1.18.0  |  -  | 
|  **emr-7.0.0-flink-k8s-operator-latest**  |  11  |  -  |  1.6.1  | 
|  **emr-6.15.0-flink-latest**  |  11  |  1.17.1  |  -  | 
|  **emr-6.15.0-flink-k8s-operator-latest**  |  11  |  -  |  1.6.0  | 
|  **emr-6.14.0-flink-latest**  |  11  |  1.17.1  |  -  | 
|  **emr-6.14.0-flink-k8s-operator-latest**  |  11  |  -  |  1.6.0  | 
|  **emr-6.13.0-flink-latest**  |  11  |  1.17.0  |  -  | 
|  **emr-6.13.0-flink-k8s-operator-latest**  |  11  |  -  |  1.5.0  | 