

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Esecuzione di un'applicazione Flink
<a name="jobruns-flink-kubernetes-operator-run-application"></a>

Con Amazon EMR 6.13.0 e rilasci successivi, puoi eseguire un'applicazione Flink con l'operatore Flink Kubernetes in modalità Applicazione su Amazon EMR su EKS. Con Amazon EMR 6.15.0 e rilasci successivi, puoi anche eseguire un'applicazione Flink in modalità Sessione. Questa pagina descrive entrambi i metodi che puoi utilizzare per eseguire un'applicazione Flink con Amazon EMR su EKS.

**Topics**

**Nota**  
Devi disporre di un bucket Amazon S3 creato per archiviare i metadati ad alta disponibilità del processo quando invii il processo Flink. Se non desideri utilizzare questa funzionalità, puoi disattivarla. È abilitata per impostazione predefinita.

**Prerequisito**: per poter eseguire un'applicazione Flink con l'operatore Flink Kubernetes, completa le fasi indicate in [Configurazione dell'operatore Flink Kubernetes per Amazon EMR su EKS](jobruns-flink-kubernetes-operator-setup.md) e [Installa l'operatore Kubernetes](jobruns-flink-kubernetes-operator-getting-started.md#jobruns-flink-kubernetes-operator-getting-started-install-operator).

------
#### [ Application mode ]

Con Amazon EMR 6.13.0 e rilasci successivi, puoi eseguire un'applicazione Flink con l'operatore Flink Kubernetes in modalità Applicazione su Amazon EMR su EKS.

1. Crea un file di `FlinkDeployment` definizione `basic-example-app-cluster.yaml` come nell'esempio seguente. Se hai attivato e utilizzi uno degli [opt-in Regioni AWS](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html), assicurati di decommentare e configurare la configurazione. `fs.s3a.endpoint.region`

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example-app-cluster
   spec:
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
       #fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
       state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
       state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH 
     flinkVersion: v1_17
     executionRoleArn: JOB_EXECUTION_ROLE_ARN
     emrReleaseLabel: "emr-6.13.0-flink-latest" # 6.13 or higher
     jobManager:
       storageDir: HIGH_AVAILABILITY_STORAGE_PATH
       resource:
         memory: "2048m"
         cpu: 1
     taskManager:
       resource:
         memory: "2048m"
         cpu: 1
     job:
       # if you have your job jar in S3 bucket you can use that path as well
       jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
       parallelism: 2
       upgradeMode: savepoint
       savepointTriggerNonce: 0
     monitoringConfiguration:    
       cloudWatchMonitoringConfiguration:
          logGroupName: LOG_GROUP_NAME
   ```

1. Invia l'implementazione Flink con il comando seguente. L'operazione creerà anche un oggetto `FlinkDeployment` denominato `basic-example-app-cluster`.

   ```
   kubectl create -f basic-example-app-cluster.yaml -n <NAMESPACE>
   ```

1. Accedi all'interfaccia utente Flink.

   ```
   kubectl port-forward deployments/basic-example-app-cluster 8081 -n NAMESPACE
   ```

1. Apri `localhost:8081` per visualizzare localmente i tuoi processi Flink.

1. Ripulisci il processo. Ricordati di ripulire gli artefatti di S3 che sono stati creati per questo lavoro, come i metadati di checkpoint, l'alta disponibilità, i savepointing e i log. CloudWatch

[Per ulteriori informazioni sull'invio di applicazioni a Flink tramite l'operatore Flink Kubernetes, consulta Esempi di operatori Flink Kubernetes nella cartella on.](https://github.com/apache/flink-kubernetes-operator/tree/main/examples) `apache/flink-kubernetes-operator` GitHub

------
#### [ Session mode ]

Con Amazon EMR 6.15.0 e rilasci successivi, puoi eseguire un'applicazione Flink con l'operatore Flink Kubernetes in modalità Sessione su Amazon EMR su EKS.

1. Create un file di definizione denominato come nell'esempio seguente. `FlinkDeployment` `basic-example-app-cluster.yaml` Se hai attivato e utilizzi uno degli [opt-in Regioni AWS](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html), assicurati di decommentare e configurare la configurazione. `fs.s3a.endpoint.region`

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example-session-cluster
   spec:
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
       #fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
       state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
       state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH
     flinkVersion: v1_17
     executionRoleArn: JOB_EXECUTION_ROLE_ARN
     emrReleaseLabel: "emr-6.15.0-flink-latest"
     jobManager:
       storageDir: HIGH_AVAILABILITY_S3_STORAGE_PATH
       resource:
         memory: "2048m"
         cpu: 1
     taskManager:
       resource:
         memory: "2048m"
         cpu: 1
     monitoringConfiguration:    
       s3MonitoringConfiguration:
          logUri: 
       cloudWatchMonitoringConfiguration:
          logGroupName: LOG_GROUP_NAME
   ```

1. Invia l'implementazione Flink con il comando seguente. L'operazione creerà anche un oggetto `FlinkDeployment` denominato `basic-example-session-cluster`.

   ```
   kubectl create -f basic-example-app-cluster.yaml -n NAMESPACE
   ```

1. Usa il seguente comando per confermare che il cluster di sessione `LIFECYCLE` è `STABLE`:

   ```
   kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
   ```

   L'output visualizzato dovrebbe essere come il seguente esempio:

   ```
   NAME                              JOB STATUS   LIFECYCLE STATE
   basic-example-session-cluster                          STABLE
   ```

1. Crea un file di risorse di definizione personalizzato `FlinkSessionJob` `basic-session-job.yaml` con il seguente contenuto di esempio:

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkSessionJob
   metadata:
     name: basic-session-job
   spec:
     deploymentName: basic-session-deployment
     job:
       # If you have your job jar in an S3 bucket you can use that path.
       # To use jar in S3 bucket, set 
       # OPERATOR_EXECUTION_ROLE_ARN (--set emrContainers.operatorExecutionRoleArn=$OPERATOR_EXECUTION_ROLE_ARN)
       # when you install Spark operator
       jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar
       parallelism: 2
       upgradeMode: stateless
   ```

1. Invia il processo della sessione Flink con il comando seguente. L'operazione creerà un oggetto `FlinkSessionJob` denominato `basic-session-job`.

   ```
   kubectl apply -f basic-session-job.yaml -n $NAMESPACE
   ```

1. Utilizza il comando seguente per confermare che il cluster di sessione `LIFECYCLE` è `STABLE` e `JOB STATUS` è `RUNNING`:

   ```
   kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
   ```

   L'output visualizzato dovrebbe essere come il seguente esempio:

   ```
   NAME                              JOB STATUS   LIFECYCLE STATE
   basic-example-session-cluster     RUNNING      STABLE
   ```

1. Accedi all'interfaccia utente Flink.

   ```
   kubectl port-forward deployments/basic-example-session-cluster 8081 -n NAMESPACE
   ```

1. Apri `localhost:8081` per visualizzare localmente i tuoi processi Flink.

1. Ripulisci il processo. Ricordati di ripulire gli artefatti di S3 che sono stati creati per questo lavoro, come i metadati di checkpoint, l'alta disponibilità, i savepointing e i log. CloudWatch

------