

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

# Uso da alta disponibilidade (HA) para operadores e aplicações do Flink
<a name="jobruns-flink-using-ha"></a>

Este tópico mostra como configurar a alta disponibilidade e descreve como ela funciona em alguns casos de uso diferentes. Isso inclui o uso do gerenciador de trabalhos e do Kubernetes nativo para Flink.

## Alta disponibilidade do operador do Flink
<a name="jobruns-flink-ha-operator"></a>

Habilitamos a *alta disponibilidade* do operador do Flink para que possamos fazer failover para um operador do Flink em espera a fim de minimizar o tempo de inatividade no loop de controle do operador se ocorrerem falhas. A alta disponibilidade é habilitada por padrão e o número padrão de réplicas iniciais para o operador é dois. É possível configurar o campo de réplicas em seu arquivo `values.yaml` para o chart do Helm.

Os seguintes campos são personalizáveis:
+ `replicas` (opcional, o padrão é dois): definir esse número como maior que um cria outros operadores em espera e permite uma recuperação mais rápida do trabalho.
+ `highAvailabilityEnabled` (opcional, o padrão é “true”): controla se você deseja habilitar a HA. Especificar esse parâmetro como “true” habilita o suporte à implantação multi-AZ e define os parâmetros `flink-conf.yaml` corretos.

Você pode desativar a HA para seu operador ao definir a configuração apresentada a seguir em seu arquivo `values.yaml`.

```
...
imagePullSecrets: []

replicas: 1

# set this to false if you don't want HA
highAvailabilityEnabled: false
...
```

**Implantação multi-AZ**

Criamos os pods do operador em várias zonas de disponibilidade. Esta é uma restrição leve, e seus pods do operador serão programados na mesma AZ, se você não tiver recursos suficientes em uma AZ diferente.

**Determinação da réplica líder**

 Se o HA estiver habilitado, as réplicas usarão uma concessão para determinar qual delas JMs é a líder e usarão uma locação K8s para a eleição do líder. Você pode descrever a concessão e consultar o campo .Spec.Holder Identity para determinar o líder atual.

```
kubectl describe lease <Helm Install Release Name>-<NAMESPACE>-lease -n <NAMESPACE> | grep "Holder Identity"
```

**Interação entre o Flink e o S3**

**Configuração de credenciais de acesso**

Certifique-se de ter configurado o IRSA com as permissões do IAM apropriadas para acessar o bucket do S3.

**Busca por trabalhos em JARs do modo de aplicação do S3**

O operador do Flink também oferece suporte à busca de aplicações do S3 em JARs. Você acabou de fornecer a localização S3 para o JARuri em sua FlinkDeployment especificação.

Você também pode usar esse recurso para baixar outros artefatos, como PyFlink scripts. O script do Python resultante é descartado no caminho `/opt/flink/usrlib/`.

O exemplo a seguir demonstra como usar esse recurso para um PyFlink trabalho. Observe os campos jarURI e args.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: python-example
spec:
  image: <YOUR CUSTOM PYFLINK IMAGE>
  emrReleaseLabel: "emr-6.12.0-flink-latest"
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    highAvailabilityEnabled: false
    replicas: 1
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: "s3://<S3-BUCKET>/scripts/pyflink.py" # Note, this will trigger the artifact download process
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pyflink.py"]
    parallelism: 1
    upgradeMode: stateless
```

**Conectores do S3 para Flink**

O Flink vem com dois conectores do S3 (listados abaixo). As seções a seguir debatem sobre o momento de usar cada conector.

**Ponto de verificação: conector do S3 para Presto**
+ Defina o esquema do S3 como s3p://.
+ O conector recomendado a ser usado como ponto de verificação para o s3. Para obter mais informações, consulte [S3-specific](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#s3-specific) na documentação do Apache Flink.

Exemplo de FlinkDeployment especificação:

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.checkpoints.dir: s3p://<BUCKET-NAME>/flink-checkpoint/
```

**Leitura e gravação no S3: conector Hadoop S3**
+ Defina o esquema do S3 como `s3://` ou (`s3a://`).
+ O conector recomendado para a leitura e a gravação de arquivos do S3 (somente um conector do S3 que implementa a [interface Flink Filesystem](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/)).
+ Por padrão, definimos `fs.s3a.aws.credentials.provider` no arquivo `flink-conf.yaml`, que é `com.amazonaws.auth.WebIdentityTokenCredentialsProvider`. Se você substituir completamente o `flink-conf` padrão e estiver interagindo com o S3, certifique-se de usar este provedor.

Exemplo de FlinkDeployment especificação

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  job:
    jarURI: local:///opt/flink/examples/streaming/WordCount.jar
    args: [ "--input", "s3a://<INPUT BUCKET>/PATH", "--output", "s3a://<OUTPUT BUCKET>/PATH" ]
    parallelism: 2
    upgradeMode: stateless
```

## JobManager do Flink
<a name="jobruns-flink-ha-manager"></a>

A alta disponibilidade (HA) para implantações do Flink permite que os trabalhos continuem progredindo mesmo que um erro transitório seja encontrado e você falhe. JobManager Com a HA habilitada, os trabalhos serão reiniciados, mas a partir do último ponto de verificação com êxito. Sem o HA ativado, o Kubernetes reiniciará o seu JobManager, mas seu trabalho começará como um novo trabalho e perderá o progresso. Depois de configurar o HA, podemos pedir ao Kubernetes que armazene os metadados de HA em um armazenamento persistente para referência no caso de uma falha transitória no JobManager e, em seguida, retome nossos trabalhos a partir do último ponto de verificação bem-sucedido.

A HA é habilitada, por padrão, para os trabalhos do Flink (a contagem de réplicas é definida como dois, o que exigirá que você forneça um local de armazenamento do S3 para que os metadados de HA persistam).

**Configurações de HA**

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  executionRoleArn: "<JOB EXECUTION ROLE ARN>"
  emrReleaseLabel: "emr-6.13.0-flink-latest"
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    replicas: 2
    highAvailabilityEnabled: true
    storageDir: "s3://<S3 PERSISTENT STORAGE DIR>"
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
```

A seguir estão as descrições das configurações de HA apresentadas acima no Job Manager (definidas em .spec.jobManager):
+ `highAvailabilityEnabled` (opcional, o padrão é “true”): defina como `false ` se você não desejar que a HA seja habilitada e não quiser usar as configurações de HA fornecidas. Você ainda pode manipular o campo “réplicas” para configurar manualmente a HA.
+ `replicas`(opcional, o padrão é 2): definir esse número como maior que 1 cria outro modo de espera JobManagers e permite uma recuperação mais rápida do seu trabalho. Se você desabilitar a HA, deverá definir a contagem de réplicas como um ou continuará recebendo erros de validação (somente uma réplica tem suporte se a HA não estiver habilitada).
+ `storageDir` (obrigatório): por usar a contagem de réplicas como dois, por padrão, é necessário fornecer um storageDir persistente. No momento, este campo aceita somente caminhos do S3 como local de armazenamento.

**Localidade de pod**

 Se você habilitar o HA, também tentaremos colocar pods na mesma AZ, o que leva a um melhor desempenho (latência de rede reduzida ao ter pods no mesmo). AZs Este é um processo de melhor esforço, ou seja, se você não tiver recursos suficientes na AZ em que a maioria dos seus pods está programada, os pods restantes ainda serão programados, mas poderão acabar em um nó externo a esta AZ.

**Determinação da réplica líder**

Se o HA estiver habilitado, as réplicas usarão uma concessão para determinar qual delas JMs é a líder e usam um K8s Configmap como armazenamento de dados para armazenar esses metadados. Se quiser determinar o líder, você pode consultar o conteúdo do Configmap e a chave `org.apache.flink.k8s.leader.restserver` nos dados para encontrar o pod do K8s com o endereço IP. Você também pode usar os comandos bash apresentados a seguir.

```
ip=$(kubectl get configmap -n <NAMESPACE> <JOB-NAME>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')
kubectl get pods -n NAMESPACE  -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

## Trabalho do Flink: Kubernetes nativo
<a name="jobruns-flink-ha-kubernetes"></a>

As versões 6.13.0 e superiores do Amazon EMR oferecem suporte ao Kubernetes nativo para Flink para a execução de aplicações do Flink no modo de alta disponibilidade em um cluster do Amazon EKS. 

**nota**  
Você deve ter um bucket do Amazon S3 criado para armazenar os metadados de alta disponibilidade ao enviar o trabalho do Flink. Se não desejar esse atributo, você poderá desativá-lo. Por padrão, ele é habilitado.

Para ativar o recurso de alta disponibilidade do Flink, forneça os parâmetros do Flink descritos a seguir ao [executar o comando da CLI `run-application`](jobruns-flink-native-kubernetes-getting-started.md#jobruns-flink-native-kubernetes-getting-started-run-application). Os parâmetros são definidos abaixo do exemplo.

```
-Dhigh-availability.type=kubernetes \
-Dhigh-availability.storageDir=S3://DOC-EXAMPLE-STORAGE-BUCKET \
-Dfs.s3a.aws.credentials.provider="com.amazonaws.auth.WebIdentityTokenCredentialsProvider" \
-Dkubernetes.jobmanager.replicas=3 \
-Dkubernetes.cluster-id=example-cluster
```
+ **`Dhigh-availability.storageDir`**: o bucket do Amazon S3 onde você deseja armazenar os metadados de alta disponibilidade para o trabalho.

  **`Dkubernetes.jobmanager.replicas`**: o número de pods do gerenciador de trabalhos a serem criados como um número inteiro maior que `1`.

  **`Dkubernetes.cluster-id`**: um ID exclusivo que identifica o cluster do Flink.