

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Verwenden der Hochverfügbarkeit (HA) für Flink-Operatoren und Flink-Anwendungen
<a name="jobruns-flink-using-ha"></a>

In diesem Thema wird gezeigt, wie Hochverfügbarkeit konfiguriert wird, und es wird beschrieben, wie dies für verschiedene Anwendungsfälle funktioniert. Dazu gehören, wenn Sie den Job Manager verwenden und wenn Sie native Kubernetes von Flink verwenden.

## Hochverfügbarkeit des Flink-Operators
<a name="jobruns-flink-ha-operator"></a>

Wir ermöglichen *Hochverfügbarkeit* für den Flink-Operator, sodass wir auf einen Standby-Flink-Operator umschalten können, um Ausfallzeiten im Regelkreis des Bedieners zu minimieren, falls Ausfälle auftreten. Hochverfügbarkeit ist standardmäßig aktiviert, und die Standardanzahl der Startoperatorreplikate ist 2. Sie können das Feld Replicas in Ihrer `values.yaml`-Datei für das Helm-Chart konfigurieren.

Die folgenden Felder sind anpassbar:
+ `replicas` (optional, Standardeinstellung ist 2): Wenn Sie diese Zahl auf einen Wert über 1 setzen, werden weitere Standby-Operatoren erstellt und Ihr Auftrag kann schneller wiederhergestellt werden.
+ `highAvailabilityEnabled` (optional, der Standardwert ist true): Steuert, ob Sie HA aktivieren möchten. Wenn Sie diesen Parameter auf true angeben, wird die Unterstützung für Multi-AZ-Bereitstellungen aktiviert und die richtigen `flink-conf.yaml`-Parameter festgelegt.

Sie können HA für Ihren Betreiber deaktivieren, indem Sie die folgende Konfiguration in Ihrer `values.yaml`-Datei festlegen.

```
...
imagePullSecrets: []

replicas: 1

# set this to false if you don't want HA
highAvailabilityEnabled: false
...
```

**Multi-AZ-Bereitstellung**

Wir erstellen die Operator-Pods in mehreren Availability Zones. Dabei handelt es sich um eine weiche Einschränkung, und Ihre Operator-Pods werden in derselben AZ geplant, falls Sie nicht über genügend Ressourcen in einer anderen AZ verfügen.

**Bestimmung des Leader-Replikats**

 Wenn HA aktiviert ist, verwenden die Replikate ein Leasing, um zu ermitteln, welcher der Leader JMs ist, und verwenden einen K8s-Lease für die Auswahl des Leaders. Sie können den Lease beschreiben und anhand des Felds .Spec.Holder Identity den aktuellen Leader ermitteln

```
kubectl describe lease <Helm Install Release Name>-<NAMESPACE>-lease -n <NAMESPACE> | grep "Holder Identity"
```

**Flink-S3-Interaktion**

**Konfigurieren von Zugriffs-Anmeldeinformationen**

Bitte stellen Sie sicher, dass Sie IRSA mit den entsprechenden IAM-Berechtigungen für den Zugriff auf den S3-Bucket konfiguriert haben.

**Auftrag-Jars werden aus dem S3-Anwendungsmodus abgerufen**

Der Flink-Operator unterstützt auch das Abrufen von Anwendungs-Jars aus S3. Sie geben einfach den S3-Speicherort für die JarURI in Ihrer Spezifikation an. FlinkDeployment 

Sie können diese Funktion auch verwenden, um andere Artefakte wie PyFlink Skripte herunterzuladen. Das resultierende Python-Skript wird unter dem Pfad `/opt/flink/usrlib/` abgelegt.

Das folgende Beispiel zeigt, wie Sie diese Funktion für einen PyFlink Job verwenden. Beachten Sie die Felder jarURI und args.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: python-example
spec:
  image: <YOUR CUSTOM PYFLINK IMAGE>
  emrReleaseLabel: "emr-6.12.0-flink-latest"
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    highAvailabilityEnabled: false
    replicas: 1
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: "s3://<S3-BUCKET>/scripts/pyflink.py" # Note, this will trigger the artifact download process
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pyflink.py"]
    parallelism: 1
    upgradeMode: stateless
```

**Flink-S3-Konnektoren**

Flink wird mit zwei S3-Konnektoren geliefert (unten aufgeführt). In den folgenden Abschnitten wird erläutert, wann welcher Anschluss verwendet werden sollte.

**Checkpointing: Presto-S3-Konnektoren**
+ S3-Schema auf s3p://setzen
+ Der empfohlene Konnektor für den Checkpoint zu s3. Weitere Informationen finden Sie unter [S3-specific](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#s3-specific) in der Apache Flink-Dokumentation.

Beispielspezifikation: FlinkDeployment 

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.checkpoints.dir: s3p://<BUCKET-NAME>/flink-checkpoint/
```

**Lesen und Schreiben auf S3: Hadoop S3-Konnektor**
+ S3-Schema auf `s3://` oder (`s3a://`) setzen
+ Der empfohlene Konnektor zum Lesen und Schreiben von Dateien aus S3 (einziger S3-Konnektor, der die [Flinks-Dateisystemschnittstelle](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/) implementiert).
+ Standardmäßig legen wir `fs.s3a.aws.credentials.provider` in der `flink-conf.yaml` Datei fest, was ist`com.amazonaws.auth.WebIdentityTokenCredentialsProvider`. Wenn Sie die Standardeinstellung `flink-conf` vollständig überschreiben und mit S3 interagieren, stellen Sie sicher, dass Sie diesen Anbieter verwenden.

 FlinkDeployment Beispielspezifikation

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  job:
    jarURI: local:///opt/flink/examples/streaming/WordCount.jar
    args: [ "--input", "s3a://<INPUT BUCKET>/PATH", "--output", "s3a://<OUTPUT BUCKET>/PATH" ]
    parallelism: 2
    upgradeMode: stateless
```

## Flink Job Manager
<a name="jobruns-flink-ha-manager"></a>

Mit High Availability (HA) für Flink-Bereitstellungen können Jobs auch dann weiter bearbeitet werden, wenn ein vorübergehender Fehler auftritt und Ihr System abstürzt. JobManager Die Aufträge werden neu gestartet, jedoch ab dem letzten erfolgreichen Checkpoint mit aktivierter HA. Wenn HA nicht aktiviert ist, startet Kubernetes Ihren Job neu JobManager, aber Ihr Job wird als neuer Job gestartet und verliert seinen Fortschritt. Nach der Konfiguration von HA können wir Kubernetes anweisen, die HA-Metadaten in einem persistenten Speicher zu speichern, um im Falle eines vorübergehenden Fehlers darauf zurückgreifen zu können, JobManager und dann unsere Jobs vom letzten erfolgreichen Checkpoint aus fortsetzen.

HA ist standardmäßig für Ihre Flink-Aufträge aktiviert (die Anzahl der Replikate ist auf 2 festgelegt, sodass Sie einen S3-Speicherort angeben müssen, damit HA-Metadaten bestehen bleiben).

**HA-Konfigurationen**

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  executionRoleArn: "<JOB EXECUTION ROLE ARN>"
  emrReleaseLabel: "emr-6.13.0-flink-latest"
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    replicas: 2
    highAvailabilityEnabled: true
    storageDir: "s3://<S3 PERSISTENT STORAGE DIR>"
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
```

Im Folgenden finden Sie Beschreibungen für die oben genannten HA-Konfigurationen in JobManager (definiert unter .spec.JobManager):
+ `highAvailabilityEnabled` (optional, der Standardwert ist true): Setzen Sie diesen Wert auf `false `, wenn Sie HA nicht aktivieren und die bereitgestellten HA-Konfigurationen nicht verwenden möchten. Sie können das Feld „Replicas“ immer noch bearbeiten, um HA manuell zu konfigurieren.
+ `replicas`(optional, Standard ist 2): Wenn Sie diese Zahl auf einen Wert über 1 setzen, wird ein anderer Standby-Modus erstellt JobManagers und Ihr Job kann schneller wiederhergestellt werden. Wenn Sie HA deaktivieren, müssen Sie die Anzahl der Replikate auf 1 setzen, sonst erhalten Sie weiterhin Validierungsfehler (nur 1 Replikat wird unterstützt, wenn HA nicht aktiviert ist).
+ `storageDir` (erforderlich): Da wir die Anzahl der Replikate standardmäßig auf 2 setzen, müssen wir ein persistentes StorageDir bereitstellen. Derzeit akzeptiert dieses Feld nur S3-Pfade als Speicherort.

**Pod-Lokalität**

 Wenn Sie HA aktivieren, versuchen wir auch, Pods in derselben AZ zusammenzufassen, was zu einer verbesserten Leistung führt (geringere Netzwerklatenz, da sich Pods in derselben befinden AZs). Dabei handelt es sich um einen bestmöglichen Prozess. Wenn Sie also nicht über genügend Ressourcen in der AZ verfügen, in der die meisten Ihrer Pods geplant sind, werden die verbleibenden Pods zwar geplant, landen aber möglicherweise auf einem Knoten außerhalb dieser AZ.

**Bestimmung des Leader-Replikats**

Wenn HA aktiviert ist, ermitteln die Replikate anhand eines Leases, welches der Leader JMs ist, und verwenden eine K8s-Configmap als Datenspeicher zum Speichern dieser Metadaten. Wenn Sie den Leader ermitteln möchten, können Sie sich den Inhalt der Configmap und den Schlüssel `org.apache.flink.k8s.leader.restserver` unter Daten ansehen, um den K8s-Pod mit der IP-Adresse zu finden. Sie können auch die folgenden Bash-Befehle verwenden.

```
ip=$(kubectl get configmap -n <NAMESPACE> <JOB-NAME>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')
kubectl get pods -n NAMESPACE  -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

## Flink-Auftrag – natives Kubernetes
<a name="jobruns-flink-ha-kubernetes"></a>

Amazon EMR 6.13.0 und höher unterstützt Flink Native Kubernetes für die Ausführung von Flink-Anwendungen im Hochverfügbarkeitsmodus auf einem Amazon-EKS-Cluster. 

**Anmerkung**  
Sie müssen einen Amazon-S3-Bucket erstellt haben, um die Hochverfügbarkeitsmetadaten zu speichern, wenn Sie Ihren Flink-Auftrag einreichen. Wenn Sie dieses Feature nicht verwenden möchten, können Sie sie deaktivieren. Sie ist standardmäßig aktiviert.

Um die Flink-Hochverfügbarkeitsfunktion zu aktivieren, geben Sie die folgenden Flink-Parameter an, wenn Sie [den `run-application`-CLI-Befehl ausführen](jobruns-flink-native-kubernetes-getting-started.md#jobruns-flink-native-kubernetes-getting-started-run-application). Die Parameter sind unter dem Beispiel definiert.

```
-Dhigh-availability.type=kubernetes \
-Dhigh-availability.storageDir=S3://DOC-EXAMPLE-STORAGE-BUCKET \
-Dfs.s3a.aws.credentials.provider="com.amazonaws.auth.WebIdentityTokenCredentialsProvider" \
-Dkubernetes.jobmanager.replicas=3 \
-Dkubernetes.cluster-id=example-cluster
```
+ **`Dhigh-availability.storageDir`** – Ein S3-Bucket, in dem Sie die Ergebnisse dieser Anforderung speichern möchten.

  **`Dkubernetes.jobmanager.replicas`** – Die Anzahl der Job-Manager-Pods, die als Ganzzahl größer als `1` erstellt werden sollen.

  **`Dkubernetes.cluster-id`** – Eine eindeutige ID, die den Flink-Cluster identifiziert.