

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Eine Flink-Anwendung ausführen
<a name="jobruns-flink-kubernetes-operator-run-application"></a>

Mit Amazon EMR 6.13.0 und höher können Sie eine Flink-Anwendung mit dem Flink-Kubernetes-Operator im Anwendungsmodus in Amazon EMR in EKS ausführen. Mit Amazon EMR 6.15.0 und höher können Sie eine Flink-Anwendung auch im Sitzungsmodus ausführen. Auf dieser Seite werden beide Methoden beschrieben, mit denen Sie eine Flink-Anwendung mit Amazon EMR in EKS ausführen können.

**Topics**

**Anmerkung**  
Sie müssen einen Amazon-S3-Bucket erstellt haben, um die Hochverfügbarkeitsmetadaten zu speichern, wenn Sie Ihren Flink-Auftrag einreichen. Wenn Sie dieses Feature nicht verwenden möchten, können Sie sie deaktivieren. Sie ist standardmäßig aktiviert.

**Voraussetzung** – Bevor Sie eine Flink-Anwendung mit dem Flink-Kubernetes-Operator ausführen können, führen Sie die Schritte in [Flink-Kubernetes-Operator für Amazon EMR in EKS einrichten](jobruns-flink-kubernetes-operator-setup.md) und [Installieren Sie den Kubernetes-Operator](jobruns-flink-kubernetes-operator-getting-started.md#jobruns-flink-kubernetes-operator-getting-started-install-operator) aus.

------
#### [ Application mode ]

Mit Amazon EMR 6.13.0 und höher können Sie eine Flink-Anwendung mit dem Flink-Kubernetes-Operator im Anwendungsmodus in Amazon EMR in EKS ausführen.

1. Erstellen Sie eine `FlinkDeployment` Definitionsdatei `basic-example-app-cluster.yaml` wie im folgenden Beispiel. Wenn Sie eines der [Opt-ins](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html) aktiviert und verwendet haben, stellen Sie sicher AWS-Regionen, dass Sie das Kommentarfeld entfernen und die Konfiguration konfigurieren. `fs.s3a.endpoint.region`

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example-app-cluster
   spec:
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
       #fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
       state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
       state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH 
     flinkVersion: v1_17
     executionRoleArn: JOB_EXECUTION_ROLE_ARN
     emrReleaseLabel: "emr-6.13.0-flink-latest" # 6.13 or higher
     jobManager:
       storageDir: HIGH_AVAILABILITY_STORAGE_PATH
       resource:
         memory: "2048m"
         cpu: 1
     taskManager:
       resource:
         memory: "2048m"
         cpu: 1
     job:
       # if you have your job jar in S3 bucket you can use that path as well
       jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
       parallelism: 2
       upgradeMode: savepoint
       savepointTriggerNonce: 0
     monitoringConfiguration:    
       cloudWatchMonitoringConfiguration:
          logGroupName: LOG_GROUP_NAME
   ```

1. Senden Sie die Flink-Bereitstellung mit dem folgenden Befehl. Dadurch wird auch ein `FlinkDeployment`-Objekt mit dem `basic-example-app-cluster`-Namen erstellt.

   ```
   kubectl create -f basic-example-app-cluster.yaml -n <NAMESPACE>
   ```

1. Greifen Sie auf die Flink-Benutzeroberfläche zu.

   ```
   kubectl port-forward deployments/basic-example-app-cluster 8081 -n NAMESPACE
   ```

1. Öffnen Sie `localhost:8081`, um Ihre Flink-Aufträge lokal anzusehen.

1. Bereinigen Sie den Auftrag. Denken Sie daran, die S3-Artefakte zu bereinigen, die für diesen Job erstellt wurden, z. B. Checkpointing-, Hochverfügbarkeits-, Savepointing-Metadaten und Protokolle. CloudWatch

[Weitere Informationen zum Einreichen von Bewerbungen über den Flink-Kubernetes-Operator an Flink finden Sie unter Beispiele für Flink-Kubernetes-Operatoren im Ordner unter.](https://github.com/apache/flink-kubernetes-operator/tree/main/examples) `apache/flink-kubernetes-operator` GitHub

------
#### [ Session mode ]

Mit Amazon EMR 6.15.0 und höher können Sie eine Flink-Anwendung mit dem Flink-Kubernetes-Operator im Sitzungsmodus in Amazon EMR in EKS ausführen.

1. Erstellen Sie eine `FlinkDeployment` Definitionsdatei mit dem Namen im folgenden Beispiel. `basic-example-app-cluster.yaml` Wenn Sie eines der [Opt-ins](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html) aktiviert und verwendet haben, stellen Sie sicher AWS-Regionen, dass Sie das Kommentarfeld entfernen und die Konfiguration konfigurieren. `fs.s3a.endpoint.region`

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example-session-cluster
   spec:
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
       #fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
       state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
       state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH
     flinkVersion: v1_17
     executionRoleArn: JOB_EXECUTION_ROLE_ARN
     emrReleaseLabel: "emr-6.15.0-flink-latest"
     jobManager:
       storageDir: HIGH_AVAILABILITY_S3_STORAGE_PATH
       resource:
         memory: "2048m"
         cpu: 1
     taskManager:
       resource:
         memory: "2048m"
         cpu: 1
     monitoringConfiguration:    
       s3MonitoringConfiguration:
          logUri: 
       cloudWatchMonitoringConfiguration:
          logGroupName: LOG_GROUP_NAME
   ```

1. Senden Sie die Flink-Bereitstellung mit dem folgenden Befehl. Dadurch wird auch ein `FlinkDeployment`-Objekt mit dem `basic-example-session-cluster`-Namen erstellt.

   ```
   kubectl create -f basic-example-app-cluster.yaml -n NAMESPACE
   ```

1. Verwenden Sie den folgenden Befehl, um zu bestätigen, dass der Sitzungscluster `LIFECYCLE` `STABLE` ist:

   ```
   kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
   ```

   Die Ausgabe sollte ähnlich wie im folgenden Beispiel aussehen:

   ```
   NAME                              JOB STATUS   LIFECYCLE STATE
   basic-example-session-cluster                          STABLE
   ```

1. Erstellen Sie eine `FlinkSessionJob`-Aufgabendefinitions-Datei `basic-session-job.yaml` mit dem folgenden Beispielinhalt:

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkSessionJob
   metadata:
     name: basic-session-job
   spec:
     deploymentName: basic-session-deployment
     job:
       # If you have your job jar in an S3 bucket you can use that path.
       # To use jar in S3 bucket, set 
       # OPERATOR_EXECUTION_ROLE_ARN (--set emrContainers.operatorExecutionRoleArn=$OPERATOR_EXECUTION_ROLE_ARN)
       # when you install Spark operator
       jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar
       parallelism: 2
       upgradeMode: stateless
   ```

1. Reichen Sie den Flink-Sitzungsauftrag mit dem folgenden Befehl ein. Dadurch wird auch ein `FlinkSessionJob`-Objekt `basic-session-job` erstellt.

   ```
   kubectl apply -f basic-session-job.yaml -n $NAMESPACE
   ```

1. Verwenden Sie den folgenden Befehl, um zu bestätigen, dass der Sitzungscluster `LIFECYCLE` `STABLE` und dass der `JOB STATUS` `RUNNING` ist:

   ```
   kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
   ```

   Die Ausgabe sollte ähnlich wie im folgenden Beispiel aussehen:

   ```
   NAME                              JOB STATUS   LIFECYCLE STATE
   basic-example-session-cluster     RUNNING      STABLE
   ```

1. Greifen Sie auf die Flink-Benutzeroberfläche zu.

   ```
   kubectl port-forward deployments/basic-example-session-cluster 8081 -n NAMESPACE
   ```

1. Öffnen Sie `localhost:8081`, um Ihre Flink-Aufträge lokal anzusehen.

1. Bereinigen Sie den Auftrag. Denken Sie daran, die S3-Artefakte zu bereinigen, die für diesen Job erstellt wurden, z. B. Checkpointing-, Hochverfügbarkeits-, Savepointing-Metadaten und Protokolle. CloudWatch

------