

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Cómo apoya Flink la alta disponibilidad y la resiliencia al trabajo
<a name="jobruns-flink-resiliency"></a>

En las siguientes secciones, se explica cómo Flink hace que los trabajos sean más fiables y tengan una alta disponibilidad. Lo hace a través de capacidades integradas, como la alta disponibilidad de Flink y varias capacidades de recuperación en caso de que se produzcan fallos.

**Topics**
+ [Uso de la alta disponibilidad (HA) para Flink Operators y Flink Applications](jobruns-flink-using-ha.md)
+ [Optimización de los tiempos de reinicio de las tareas de Flink para las operaciones de escalado y de recuperación de tareas con Amazon EMR en EKS](jobruns-flink-restart.md)
+ [Desactivación rápida de las instancias de spot con Flink en Amazon EMR en EKS](jobruns-flink-decommission.md)

# Uso de la alta disponibilidad (HA) para Flink Operators y Flink Applications
<a name="jobruns-flink-using-ha"></a>

En este tema se muestra cómo configurar la alta disponibilidad y se describe cómo funciona en algunos casos de uso diferentes. Aquí se incluye cuando usa el administrador de trabajos y cuando usa Kubernetes nativo de Flink.

## Alta disponibilidad del Flink Operator
<a name="jobruns-flink-ha-operator"></a>

Habilitamos la *alta disponiblidad* para el Flink Operator para poder realizar un cambio a un Flink Operator en espera y minimizar el tiempo de inactividad en el bucle de control del operador en caso de fallos. La alta disponibilidad está habilitada de forma predeterminada y el número predeterminado de réplicas de operadores de inicio es 2. Puede configurar el campo de réplicas en su archivo `values.yaml` para el gráfico de Helm.

Los siguientes campos se pueden personalizar:
+ `replicas` (opcional, el valor predeterminado es 2): si se establece este número en uno mayor que 1, se crean otros operadores en espera y se puede recuperar el trabajo con mayor rapidez.
+ `highAvailabilityEnabled` (opcional, el valor predeterminado es verdadero): controla si desea habilitar la alta disponibilidad. Si se especifica este parámetro como verdadero, se habilita la compatibilidad con la implementación multi-AZ y se establecen los parámetros `flink-conf.yaml` correctos.

Puede deshabilitar la alta disponibilidad para su operador al establecer la siguiente configuración en su archivo `values.yaml`.

```
...
imagePullSecrets: []

replicas: 1

# set this to false if you don't want HA
highAvailabilityEnabled: false
...
```

**Implementación Multi-AZ**

Creamos los pods de los operadores en varias zonas de disponibilidad. Se trata de una limitación leve y, si no dispone de recursos suficientes en una zona de disponibilidad diferente, los pods de los operadores se programarán en la misma zona de disponibilidad.

**Determinar la réplica líder**

 Si HA está habilitada, las réplicas utilizan un arrendamiento para determinar cuál de ellos JMs es el líder y utilizan un arrendamiento de K8s para la elección del líder. Puede describir el arrendamiento y consultar el campo .Spec.Holder Identity para determinar el líder actual

```
kubectl describe lease <Helm Install Release Name>-<NAMESPACE>-lease -n <NAMESPACE> | grep "Holder Identity"
```

**Interacción entre Flink y S3**

**Configuración de las credenciales de acceso**

Asegúrese de haber configurado las IRSA con los permisos de IAM adecuados para acceder al bucket de S3.

**Búsqueda de archivos jar de trabajo desde el modo aplicación de S3**

El operador de Flink también admite la búsqueda de archivos jar de aplicaciones desde S3. Solo tiene que proporcionar la ubicación S3 del JaRuri en su especificación. FlinkDeployment 

También puedes usar esta función para descargar otros artefactos, como PyFlink scripts. El script de Python resultante se coloca en la ruta `/opt/flink/usrlib/`.

El siguiente ejemplo muestra cómo utilizar esta función para un PyFlink trabajo. Tenga en cuenta los campos jarURI y args.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: python-example
spec:
  image: <YOUR CUSTOM PYFLINK IMAGE>
  emrReleaseLabel: "emr-6.12.0-flink-latest"
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    highAvailabilityEnabled: false
    replicas: 1
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: "s3://<S3-BUCKET>/scripts/pyflink.py" # Note, this will trigger the artifact download process
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pyflink.py"]
    parallelism: 1
    upgradeMode: stateless
```

**Conectores de Flink de S3**

Flink viene empaquetado con dos conectores de S3 (enumerados a continuación). En las siguientes secciones se explica cuándo usar cada conector.

**Punto de control: conector de Presto de S3**
+ Establezca el esquema de S3 en s3p://
+ El conector recomendado para establecer el punto de control en s3. Para obtener más información, consulte [Específico de S3](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#s3-specific) en la documentación de Apache Flink.

Ejemplo de FlinkDeployment especificación:

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.checkpoints.dir: s3p://<BUCKET-NAME>/flink-checkpoint/
```

**Lectura y escritura en S3: conector Hadoop S3**
+ Establezca el esquema de S3 en `s3://` o en ( `s3a://` )
+ El conector recomendado para leer y escribir archivos desde S3 (solo el conector de S3 que implementa la interfaz del [sistema de archivos de Flink](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/)).
+ De forma predeterminada, establecemos `fs.s3a.aws.credentials.provider` en el archivo `flink-conf.yaml`, que es `com.amazonaws.auth.WebIdentityTokenCredentialsProvider`. Si anula completamente el valor predeterminado `flink-conf` y está interactuando con S3, asegúrese de usar este proveedor.

Ejemplo de FlinkDeployment especificación

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  job:
    jarURI: local:///opt/flink/examples/streaming/WordCount.jar
    args: [ "--input", "s3a://<INPUT BUCKET>/PATH", "--output", "s3a://<OUTPUT BUCKET>/PATH" ]
    parallelism: 2
    upgradeMode: stateless
```

## Flink JobManager
<a name="jobruns-flink-ha-manager"></a>

La alta disponibilidad (HA) de las implementaciones de Flink permite que los trabajos sigan progresando incluso si se produce un error transitorio y el usuario se bloquea. JobManager Los trabajos se reiniciarán, pero desde el último punto de control exitoso con la alta disponibilidad activada. Si la HA no está habilitada, Kubernetes lo reiniciará JobManager, pero su trabajo empezará como uno nuevo y perderá el progreso. Tras configurar HA, podemos indicarle a Kubernetes que almacene los metadatos de alta disponibilidad en un almacenamiento persistente como referencia en caso de que se produzca un fallo transitorio JobManager y que, después, reanude nuestras tareas desde el último punto de control exitoso.

La alta disponibilidad está habilitada de forma predeterminada para sus trabajos de Flink (el recuento de réplicas está establecido en 2, por lo que tendrá que proporcionar una ubicación de almacenamiento en S3 para que los metadatos de alta disponibilidad se conserven).

**Configuraciones de alta disponibilidad**

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  executionRoleArn: "<JOB EXECUTION ROLE ARN>"
  emrReleaseLabel: "emr-6.13.0-flink-latest"
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    replicas: 2
    highAvailabilityEnabled: true
    storageDir: "s3://<S3 PERSISTENT STORAGE DIR>"
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
```

Las siguientes son descripciones de las configuraciones de alta disponibilidad anteriores en Job Manager (definidas en .spec.JobManager):
+ `highAvailabilityEnabled` (opcional, el valor predeterminado es verdadero): configúrela en `false ` si no quiere activar la alta disponibilidad y no quiere utilizar las configuraciones de alta disponibilidad proporcionadas. Aún puede manipular el campo “réplicas” para configurar de forma manual la alta disponibilidad.
+ `replicas`(opcional, el valor predeterminado es 2): si se establece este número en mayor que 1, se crea otro modo de espera JobManagers y permite una recuperación más rápida del trabajo. Si deshabilita la alta disponibilidad, debe establecer el recuento de réplicas en 1 o seguirá recibiendo errores de validación (solo se admite 1 réplica si la alta disponibilidad no está habilitada).
+ `storageDir` (obligatorio): dado que utilizamos el recuento de réplicas como 2 de forma predeterminada, debemos proporcionar un storageDir persistente. Actualmente, este campo solo acepta rutas de S3 como ubicación de almacenamiento.

**Localidad del pod**

 Si habilitas la alta disponibilidad, también intentamos colocar los pods en la misma zona de disponibilidad, lo que mejora el rendimiento (se reduce la latencia de la red al tener los pods en la misma zona AZs). Se trata de un proceso de mejor esfuerzo, ya que, si no dispone de recursos suficientes en la zona de disponibilidad en la que están programados la mayoría de sus pods, los demás pods seguirán estando programados, pero es posible que acaben en un nodo fuera de esta zona de disponibilidad.

**Determinar la réplica líder**

Si la alta disponibilidad está habilitada, las réplicas utilizan un arrendamiento para determinar cuál de ellos JMs es el líder y utilizan un mapa de configuración del K8s como almacén de datos para almacenar estos metadatos. Si quiere determinar el líder, consulte el contenido del Configmap y busque en los datos la clave `org.apache.flink.k8s.leader.restserver` para encontrar los pod de K8 con la dirección IP. También puede utilizar los siguientes comandos bash.

```
ip=$(kubectl get configmap -n <NAMESPACE> <JOB-NAME>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')
kubectl get pods -n NAMESPACE  -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

## Trabajo de Flink: Kubernetes nativo
<a name="jobruns-flink-ha-kubernetes"></a>

A partir de la versión 6.13.0, Amazon EMR es compatible con Kubernetes nativo de Flink para ejecutar aplicaciones de Flink en un clúster de Amazon EKS. 

**nota**  
Debe tener un bucket de Amazon S3 creado para almacenar los metadatos de alta disponibilidad cuando envíe su trabajo de Flink. Si no desea usar esta característica, puede desactivarla. Está habilitada de forma predeterminada.

Para activar la característica de alta disponibilidad de Flink, utilice los siguientes parámetros de Flink al [ejecutar el comando de la CLI `run-application`](jobruns-flink-native-kubernetes-getting-started.md#jobruns-flink-native-kubernetes-getting-started-run-application). Los parámetros se definen debajo del ejemplo.

```
-Dhigh-availability.type=kubernetes \
-Dhigh-availability.storageDir=S3://DOC-EXAMPLE-STORAGE-BUCKET \
-Dfs.s3a.aws.credentials.provider="com.amazonaws.auth.WebIdentityTokenCredentialsProvider" \
-Dkubernetes.jobmanager.replicas=3 \
-Dkubernetes.cluster-id=example-cluster
```
+ **`Dhigh-availability.storageDir`**: el bucket de Amazon S3 en el que desea almacenar los metadatos de alta disponibilidad para su trabajo.

  **`Dkubernetes.jobmanager.replicas`**: el número de pods de Job Manager que se van a crear como un entero superior a `1`.

  **`Dkubernetes.cluster-id`**: un identificador único que identifica el clúster de Flink.

# Optimización de los tiempos de reinicio de las tareas de Flink para las operaciones de escalado y de recuperación de tareas con Amazon EMR en EKS
<a name="jobruns-flink-restart"></a>

Cuando se produce un error en una tarea o se produce una operación de escalado, Flink intenta volver a ejecutar la tarea desde el último punto de control completado. El proceso de reinicio puede tardar un minuto o más en ejecutarse, según el tamaño del estado del punto de control y el número de tareas paralelas. Durante el periodo de reinicio, es posible que se acumulen tareas pendientes para el trabajo. Sin embargo, hay algunas formas en las que Flink optimiza la velocidad de los gráficos de recuperación y reinicio de la ejecución para mejorar la estabilidad del trabajo.

En esta página, se describen algunas de las formas en que Amazon EMR Flink puede mejorar el tiempo de reinicio de un trabajo durante las operaciones de recuperación o escalado en instancias Spot. Las instancias Spot representan capacidad informática no utilizada que está disponible con descuento. Tiene comportamientos únicos, incluidas las interrupciones ocasionales, por lo que es importante entender cómo Amazon EMR en EKS los gestiona, incluida la forma en que Amazon EMR en EKS lleva a cabo el desmantelamiento y el reinicio de los trabajos.

**Topics**
+ [Recuperación local de tareas](#flink-restart-task-local)
+ [Recuperación local de tareas mediante el montaje de volúmenes de Amazon EBS](#flink-restart-task-local-ebs)
+ [Punto de control incremental genérico basado en registros](#flink-restart-log-check)
+ [Recuperación detallada](#flink-restart-fine-grained)
+ [Mecanismo de reinicio combinado en el programador adaptativo](#flink-restart-combined)

## Recuperación local de tareas
<a name="flink-restart-task-local"></a>

**nota**  
La recuperación local de tareas es compatible a partir de la versión 6.14.0 de Flink en Amazon EMR en EKS.

Con los puntos de control de Flink, cada tarea produce una instantánea de su estado que Flink graba en un almacenamiento distribuido como Amazon S3. En los casos de recuperación, las tareas restauran su estado desde el almacenamiento distribuido. El almacenamiento distribuido ofrece tolerancia a errores y puede redistribuir el estado durante el reescalado, ya que todos los nodos pueden acceder a él.

Sin embargo, un almacén distribuido remoto también tiene una desventaja: todas las tareas deben leer su estado desde una ubicación remota a través de la red. Esto puede provocar tiempos de recuperación prolongados para los estados de gran tamaño durante las operaciones de recuperación o escalado de tareas.

Este problema del tiempo de recuperación prolongado se resuelve mediante la *recuperación local de tareas*. Las tareas escriben su estado en el punto de control de un almacenamiento secundario que es local para la tarea, por ejemplo, en un disco local. También almacenan su estado en el almacenamiento principal, o Amazon S3 en nuestro caso. Durante la recuperación, el programador programa las tareas en el mismo administrador de tareas en el que se ejecutaron anteriormente, de modo que se puedan recuperar del almacén de estado local en lugar de leerlas del almacén de estado remoto. Para obtener más información, consulte [Task-Local Recovery](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/large_state_tuning/#task-local-recovery) en la *documentación de Apache Flink*.

Nuestras pruebas comparativas con trabajos de muestra han demostrado que el tiempo de recuperación se ha reducido de unos minutos a unos pocos segundos con la opción de recuperación local de tareas habilitada.

Para habilitar la recuperación local de tareas, defina las siguientes configuraciones en el archivo `flink-conf.yaml`. Especifique el valor del intervalo de puntos de control en milisegundos.

```
    state.backend.local-recovery: true
    state.backend: hasmap or rocksdb
    state.checkpoints.dir: s3://STORAGE-BUCKET-PATH/checkpoint
    execution.checkpointing.interval: 15000
```

## Recuperación local de tareas mediante el montaje de volúmenes de Amazon EBS
<a name="flink-restart-task-local-ebs"></a>

**nota**  
La recuperación local de tareas mediante Amazon EBS es compatible a partir de la versión 6.15.0 de Flink en Amazon EMR en EKS.

Con Flink en Amazon EMR en EKS, puede aprovisionar automáticamente volúmenes de Amazon EBS en los pods de TaskManager para la recuperación local de tareas. El soporte superpuesto predeterminado viene con un volumen de 10 GB, suficiente para trabajos con un estado inferior. Los trabajos con estados grandes pueden habilitar la opción de *montaje de volumen automático con EBS*. Los pods TaskManager se crean y montan automáticamente durante la creación del pod y se eliminan al eliminarlo.

Realice los siguientes pasos a fin de habilitar el montaje automático del volumen de EBS para Flink en Amazon EMR en EKS:

1. Exporte los valores de las siguientes variables que utilizará en los próximos pasos.

   ```
   export AWS_REGION=aa-example-1 
   export FLINK_EKS_CLUSTER_NAME=my-cluster
   export AWS_ACCOUNT_ID=111122223333
   ```

1. Cree o actualice un archivo de `kubeconfig` de YAML para el clúster.

   ```
   aws eks update-kubeconfig --name $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
   ```

1. Cree una cuenta de servicio de IAM para el controlador de Amazon EBS Container Storage Interface (CSI) en su clúster de Amazon EKS. 

   ```
   eksctl create iamserviceaccount \
      --name ebs-csi-controller-sa \
      --namespace kube-system \
      --region $AWS_REGION \
      --cluster $FLINK_EKS_CLUSTER_NAME\
      --role-name TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} \
      --role-only \
      --attach-policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy \
      --approve
   ```

1. Cree un controlador del Amazon EBS CSI con el siguiente comando:

   ```
   eksctl create addon \
      --name aws-ebs-csi-driver \
      --region $AWS_REGION \
      --cluster $FLINK_EKS_CLUSTER_NAME \
      --service-account-role-arn arn:aws:iam::${AWS_ACCOUNT_ID}:role/TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
   ```

1. Cree la clase de almacenamiento de Amazon EBS con el siguiente comando:

   ```
   cat ≪ EOF ≫ storage-class.yaml
   apiVersion: storage.k8s.io/v1
   kind: StorageClass
   metadata:
     name: ebs-sc
   provisioner: ebs.csi.aws.com
   volumeBindingMode: WaitForFirstConsumer
   EOF
   ```

   Y, a continuación, aplique la clase:

   ```
   kubectl apply -f storage-class.yaml
   ```

1. Helm instaló el operador Amazon EMR Flink Kubernetes con opciones para crear una cuenta de servicio. Esto crea el `emr-containers-sa-flink` que se usará en la implementación de Flink.

   ```
   helm install flink-kubernetes-operator flink-kubernetes-operator/ \
      --set jobServiceAccount.create=true \
      --set rbac.jobRole.create=true \
      --set rbac.jobRoleBinding.create=true
   ```

1. Para enviar el trabajo de Flink y habilitar el aprovisionamiento automático de volúmenes de EBS para la recuperación local de tareas, defina las siguientes configuraciones en el archivo `flink-conf.yaml`. Ajuste el límite de tamaño para el tamaño del estado del trabajo. Establece `serviceAccount` en `emr-containers-sa-flink`. Especifique el valor del intervalo de puntos de control en milisegundos. Y omita el `executionRoleArn`.

   ```
   flinkConfiguration:
       task.local-recovery.ebs.enable: true
       kubernetes.taskmanager.local-recovery.persistentVolumeClaim.sizeLimit: 10Gi
       state.checkpoints.dir: s3://BUCKET-PATH/checkpoint
       state.backend.local-recovery: true
       state.backend: hasmap or rocksdb
       state.backend.incremental: "true"
       execution.checkpointing.interval: 15000
     serviceAccount: emr-containers-sa-flink
   ```

Cuando esté listo para eliminar el complemento del controlador CSI de Amazon EBS, utilice los siguientes comandos:

```
  # Detach Attached Policy
  aws iam detach-role-policy --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} --policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy
  # Delete the created Role
  aws iam delete-role --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
  # Delete the created service account
  eksctl delete iamserviceaccount --name ebs-csi-controller-sa --namespace kube-system --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  # Delete Addon
  eksctl delete addon --name aws-ebs-csi-driver --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  # Delete the EBS storage class
  kubectl delete -f storage-class.yaml
```

## Punto de control incremental genérico basado en registros
<a name="flink-restart-log-check"></a>

**nota**  
Los puntos de control incrementales genéricos basados en registros son compatibles a partir de la versión 6.14.0 de Flink en Amazon EMR en EKS.

En Flink 1.16, se agregaron puntos de control incrementales genéricos basados en registros para mejorar la velocidad de los puntos de control. Un intervalo de puntos de control más rápido suele reducir el trabajo de recuperación, ya que es necesario volver a procesar menos eventos después de la recuperación. Para obtener más información, consulte [Improving speed and stability of checkpointing with generic log-based incremental checkpoints](https://flink.apache.org/2022/05/30/improving-speed-and-stability-of-checkpointing-with-generic-log-based-incremental-checkpoints/) en el *blog de Apache Flink*.

Con trabajos de muestra, nuestras pruebas comparativas han demostrado que el tiempo de los puntos de control se ha reducido de unos minutos a unos pocos segundos con el punto de control incremental genérico basado en registros.

Para habilitar los puntos de control incrementales genéricos basados en registros, defina las siguientes configuraciones en el archivo `flink-conf.yaml`. Especifique el valor del intervalo de puntos de control en milisegundos.

```
    state.backend.changelog.enabled: true 
    state.backend.changelog.storage: filesystem
    dstl.dfs.base-path: s3://bucket-path/changelog
    state.backend.local-recovery: true
    state.backend: rocksdb
    state.checkpoints.dir: s3://bucket-path/checkpoint
    execution.checkpointing.interval: 15000
```

## Recuperación detallada
<a name="flink-restart-fine-grained"></a>

**nota**  
El soporte de la recuperación detallada para el programador predeterminado es compatible a partir de la versión 6.14.0 de Flink en Amazon EMR en EKS. El soporte de la recuperación detallada para el programador predeterminado está disponible a partir de la versión 6.15.0 de Flink en Amazon EMR en EKS.

Cuando se produce un error en una tarea durante la ejecución, Flink restablece todo el gráfico de ejecución y activa una nueva ejecución completa desde el último punto de control completado. Esto es más caro que volver a ejecutar las tareas con errores. La recuperación detallada reinicia solo el componente conectado a la canalización de la tarea con el error. En el siguiente ejemplo, el gráfico de tareas tiene 5 vértices (de `A` a `E`). Todas las conexiones entre los vértices se canalizan con una distribución puntual y el valor `parallelism.default` para el trabajo se establece en `2`. 

```
A → B → C → D → E
```

En este ejemplo, hay un total de 10 tareas en ejecución. La primera canalización (de `a1` a `e1`) se ejecuta en un TaskManager (`TM1`) y la segunda canalización (de `a2` a `e2`) se ejecuta en otro TaskManager (`TM2`).

```
a1 → b1 → c1 → d1 → e1
a2 → b2 → c2 → d2 → e2
```

Hay dos componentes conectados por canalización: `a1 → e1` y `a2 → e2`. Si `TM1` o `TM2` falla, el error solo afecta a las 5 tareas de la canalización en la que el TaskManager se estaba ejecutando. La estrategia de reinicio solo inicia el componente canalizado afectado. 

La recuperación detallada solo funciona con trabajos de Flink perfectamente paralelos. No es compatible con las operaciones `keyBy()` o `redistribute()`. Para obtener más información, consulte [FLIP-1: Fine Grained Recovery from Task Failures](https://cwiki.apache.org/confluence/display/FLINK/FLIP-1%3A+Fine+Grained+Recovery+from+Task+Failures) en el proyecto de Jira *Flink Improvement Proposal*.

Para habilitar la recuperación detallada, establezca las siguientes configuraciones en el archivo `flink-conf.yaml`.

```
jobmanager.execution.failover-strategy: region 
restart-strategy: exponential-delay or fixed-delay
```

## Mecanismo de reinicio combinado en el programador adaptativo
<a name="flink-restart-combined"></a>

**nota**  
El mecanismo de reinicio combinado del programador adaptativo es compatible a partir de la versión 6.15.0 de Flink en Amazon EMR en EKS.

El programador adaptativo puede ajustar el paralelismo del trabajo en función de los espacios disponibles. Reduce automáticamente el paralelismo si no hay suficientes espacios disponibles para ajustarse al paralelismo del trabajo configurado. Si hay nuevos espacios disponibles, la tarea se escala verticalmente de nuevo al paralelismo del trabajo configurado. Un programador adaptativo evita el tiempo de inactividad del trabajo cuando no hay suficientes recursos disponibles. Este es el programador compatible con el escalador automático de Flink. Recomendamos el programador adaptativo con Amazon EMR Flink por estos motivos. Sin embargo, los programadores adaptativos pueden realizar varios reinicios en un periodo corto, uno por cada nuevo recurso agregado. Esto puede provocar una disminución en el rendimiento del trabajo.

Con Amazon EMR 6.15.0 y versiones posteriores, Flink cuenta con un mecanismo de reinicio combinado en el programador adaptativo que abre una ventana de reinicio cuando se agrega el primer recurso y, a continuación, espera hasta alcanzar el intervalo de ventana configurado del minuto predeterminado. Realiza un único reinicio cuando hay suficientes recursos disponibles para ejecutar el trabajo con el paralelismo configurado o cuando se agota el intervalo.

Con trabajos de muestra, nuestras pruebas comparativas han demostrado que esta característica procesa un 10 % de los registros más que el comportamiento predeterminado cuando se utiliza el programador adaptativo y el escalador automático de Flink.

Para habilitar el mecanismo de reinicio combinado, defina las siguientes configuraciones en el archivo `flink-conf.yaml`.

```
jobmanager.adaptive-scheduler.combined-restart.enabled: true 
jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m
```

# Desactivación rápida de las instancias de spot con Flink en Amazon EMR en EKS
<a name="jobruns-flink-decommission"></a>

Flink con Amazon EMR en EKS puede mejorar el tiempo de reinicio de un trabajo durante las operaciones de recuperación o escalado de tareas.

## Descripción general de
<a name="jobruns-flink-decommission-overview"></a>

A partir de las versiones 6.15.0, Amazon EMR en EKS es compatible con la desactivación rápida de los administradores de tareas en instancias de sport de Amazon EMR en EKS con Apache Flink. Como parte de esta característica, Amazon EMR en EKS con Flink ofrece las siguientes funciones:
+ **Just-in-time puntos de control**: los trabajos de streaming de Flink pueden responder a la interrupción de una instancia puntual, realizar un control just-in-time (JIT) de los trabajos en ejecución e impedir la programación de tareas adicionales en estas instancias puntuales. El punto de control JIT es compatible con el programador predeterminado y adaptativo.
+ **Mecanismo de reinicio combinado** : un mecanismo de reinicio combinado hace todo lo posible por reiniciar el trabajo una vez alcanzado el paralelismo de los recursos objetivo o al final de la ventana configurada actualmente. Esto también evita que los trabajos se reinicien de forma consecutiva, lo que podría deberse a la finalización de varias instancias de sport. El mecanismo de reinicio combinado solo está disponible con el programador adaptativo.

Estas capacidades brindan los siguientes beneficios:
+ Puede aprovechar las instancias de spot para ejecutar administradores de tareas y reducir el gasto en clústeres.
+ La mejora de la agilidad del administrador de tareas de las instancias de spot se traduce en una mayor resiliencia y en una programación de trabajos más eficiente.
+ Sus trabajos de Flink tendrán más tiempo de actividad porque se reiniciarán menos tras la finalización de una instancia de Spot.

## Cómo funciona una desactivación rápida
<a name="jobruns-flink-decommission-howitworks"></a>

Considere el siguiente ejemplo: aprovisiona un clúster de Amazon EMR en EKS que ejecuta Apache Flink y especifica los nodos bajo demanda para el administrador de trabajos y los nodos de las instancias de spot para el administrador de tareas. Dos minutos antes de la finalización, el administrador de tareas recibe un aviso de interrupción.

En este escenario, el administrador de trabajos gestionaría la señal de interrupción de la instancia de spot, bloquearía la programación de tareas adicionales en la instancia de spot e iniciaría los puntos de control JIT para el trabajo de transmisión.

A continuación, el administrador de trabajos reiniciaría el gráfico de tareas solo cuando haya suficiente disponibilidad de nuevos recursos para satisfacer el paralelismo de tareas actual en la ventana de intervalos de reinicio actual. El intervalo de reinicios se decide en función de la duración del reemplazo de la instancia de spot, la creación de nuevos pods del administrador de tareas y el registro en el administrador de trabajos.

## Requisitos previos
<a name="jobruns-flink-decommission-prereqs"></a>

Para utilizar una desactivación rápida, cree y ejecute un trabajo de streaming en un clúster de Amazon EMR en EKS que ejecute Apache Flink. Habilite el programador adaptativo y los administradores de tareas programados en al menos una instancia de spot, como se muestra en el siguiente ejemplo. Debe utilizar nodos bajo demanda para el administrador de trabajos y puede usar nodos bajo demanda para los administradores de tareas siempre que también haya al menos una instancia de spot.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: deployment_name
spec:
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    cluster.taskmanager.graceful-decommission.enabled: "true"
    execution.checkpointing.interval: "240s"
    jobmanager.adaptive-scheduler.combined-restart.enabled: "true"
    jobmanager.adaptive-scheduler.combined-restart.window-interval : "1m"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    nodeSelector:
      'eks.amazonaws.com/capacityType': 'ON_DEMAND'
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
    nodeSelector:
      'eks.amazonaws.com/capacityType': 'SPOT'
  job:
    jarURI: flink_job_jar_path
```

## Configuración
<a name="jobruns-flink-decommission-config"></a>

En esta sección, se describe la mayoría de las configuraciones que puede especificar para sus necesidades de desactivación. 


| Key | Description (Descripción) | Predeterminado | Valores aceptables | 
| --- | --- | --- | --- | 
|  cluster.taskmanager.graceful-decommission.enabled  |  Habilite la desactivación rápida del administrador de tareas.  |  true  |  true, false  | 
|  jobmanager.adaptive-scheduler.combined-restart.enabled  |  Habilite el mecanismo de reinicio combinado en el programador adaptativo.  |  false  |  true, false  | 
|  jobmanager.adaptive-scheduler.combined-restart.window-interval  |  El intervalo combinado de la ventana de reinicios para realizar los reinicios combinados del trabajo. Un entero sin unidad se interpreta como milisegundos.  |  1m  |  Ejemplos: 30, 60s, 3m, 1h  | 