

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Flink アプリケーションを実行する
<a name="jobruns-flink-kubernetes-operator-run-application"></a>

Amazon EMR 6.13.0 以降では、EKS 上の Amazon EMR のアプリケーションモードで Flink Kubernetes オペレーターを使用して Flink アプリケーションを実行できます。Amazon EMR 6.15.0 以降では、Flink アプリケーションをセッションモードで実行することもできます。このページでは、EKS 上の Amazon EMR で Flink アプリケーションを実行するために使用できる両方の方法について説明します。

**Topics**

**注記**  
Flink ジョブを送信する際に高可用性メタデータを保存するには、Amazon S3 バケットを作成する必要があります。この機能を使用しない場合には無効にできます。これは、デフォルトでは有効になっています。

**前提条件** - Flink Kubernetes オペレータを使用して Flink アプリケーションを実行する前に、[Amazon EMR on EKS での Flink Kubernetes オペレータのセットアップ](jobruns-flink-kubernetes-operator-setup.md) と [Kubernetes オペレータをインストールする](jobruns-flink-kubernetes-operator-getting-started.md#jobruns-flink-kubernetes-operator-getting-started-install-operator) のステップを完了してください。

------
#### [ Application mode ]

Amazon EMR 6.13.0 以降では、EKS 上の Amazon EMR のアプリケーションモードで Flink Kubernetes オペレーターを使用して Flink アプリケーションを実行できます。

1. 以下の例のように `FlinkDeployment` 定義ファイル `basic-example-app-cluster.yaml` を作成します。いずれかのオ[プトイン AWS リージョン](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html)を有効にして使用する場合は、必ずコメントを解除して設定してください`fs.s3a.endpoint.region`。

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example-app-cluster
   spec:
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
       #fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
       state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
       state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH 
     flinkVersion: v1_17
     executionRoleArn: JOB_EXECUTION_ROLE_ARN
     emrReleaseLabel: "emr-6.13.0-flink-latest" # 6.13 or higher
     jobManager:
       storageDir: HIGH_AVAILABILITY_STORAGE_PATH
       resource:
         memory: "2048m"
         cpu: 1
     taskManager:
       resource:
         memory: "2048m"
         cpu: 1
     job:
       # if you have your job jar in S3 bucket you can use that path as well
       jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
       parallelism: 2
       upgradeMode: savepoint
       savepointTriggerNonce: 0
     monitoringConfiguration:    
       cloudWatchMonitoringConfiguration:
          logGroupName: LOG_GROUP_NAME
   ```

1. 次のコマンドで Flink デプロイを送信します。これにより、`basic-example-app-cluster` という名前で `FlinkDeployment` オブジェクトも作成されます。

   ```
   kubectl create -f basic-example-app-cluster.yaml -n <NAMESPACE>
   ```

1. Flink UI にアクセスします。

   ```
   kubectl port-forward deployments/basic-example-app-cluster 8081 -n NAMESPACE
   ```

1. `localhost:8081` を開いて、Flink ジョブをローカルに表示します。

1. ジョブをクリーンアップします。チェックポイント、高可用性、セーブポイントメタデータ、CloudWatch ログなど、このジョブ用に作成された S3 アーティファクトを忘れずにクリーンアップしてください。

Flink Kubernetes オペレータを利用して Flink にアプリケーションを送信する方法の詳細については、GitHub の `apache/flink-kubernetes-operator` フォルダにある「[Flink Kubernetes operator examples](https://github.com/apache/flink-kubernetes-operator/tree/main/examples)」を参照してください。

------
#### [ Session mode ]

Amazon EMR 6.15.0 以降では、EKS 上の Amazon EMR のセッションモードで Flink Kubernetes オペレーターを使用して Flink アプリケーションを実行できます。

1. 以下の例のように `basic-example-app-cluster.yaml` という名前の `FlinkDeployment` 定義ファイルを作成します。いずれかのオ[プトイン AWS リージョン](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html)を有効にして使用する場合は、必ずコメントを解除して設定してください`fs.s3a.endpoint.region`。

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example-session-cluster
   spec:
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
       #fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
       state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
       state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH
     flinkVersion: v1_17
     executionRoleArn: JOB_EXECUTION_ROLE_ARN
     emrReleaseLabel: "emr-6.15.0-flink-latest"
     jobManager:
       storageDir: HIGH_AVAILABILITY_S3_STORAGE_PATH
       resource:
         memory: "2048m"
         cpu: 1
     taskManager:
       resource:
         memory: "2048m"
         cpu: 1
     monitoringConfiguration:    
       s3MonitoringConfiguration:
          logUri: 
       cloudWatchMonitoringConfiguration:
          logGroupName: LOG_GROUP_NAME
   ```

1. 次のコマンドで Flink デプロイを送信します。これにより、`basic-example-session-cluster` という名前で `FlinkDeployment` オブジェクトも作成されます。

   ```
   kubectl create -f basic-example-app-cluster.yaml -n NAMESPACE
   ```

1. 次のコマンドを使用して、セッションクラスター `LIFECYCLE` が `STABLE` であることを確認します。

   ```
   kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
   ```

   出力は次の例と類似したものになります。

   ```
   NAME                              JOB STATUS   LIFECYCLE STATE
   basic-example-session-cluster                          STABLE
   ```

1. 次のサンプルコンテンツを含む `FlinkSessionJob` カスタム定義リソースファイル `basic-session-job.yaml` を作成します。

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkSessionJob
   metadata:
     name: basic-session-job
   spec:
     deploymentName: basic-session-deployment
     job:
       # If you have your job jar in an S3 bucket you can use that path.
       # To use jar in S3 bucket, set 
       # OPERATOR_EXECUTION_ROLE_ARN (--set emrContainers.operatorExecutionRoleArn=$OPERATOR_EXECUTION_ROLE_ARN)
       # when you install Spark operator
       jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar
       parallelism: 2
       upgradeMode: stateless
   ```

1. 次のコマンドで Flink セッションジョブを送信します。これにより、`FlinkSessionJob` オブジェクト `basic-session-job` が作成されます。

   ```
   kubectl apply -f basic-session-job.yaml -n $NAMESPACE
   ```

1. 次のコマンドを使用して、セッションクラスター `LIFECYCLE` が `STABLE`、`JOB STATUS` が `RUNNING` であることを確認します。

   ```
   kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
   ```

   出力は次の例と類似したものになります。

   ```
   NAME                              JOB STATUS   LIFECYCLE STATE
   basic-example-session-cluster     RUNNING      STABLE
   ```

1. Flink UI にアクセスします。

   ```
   kubectl port-forward deployments/basic-example-session-cluster 8081 -n NAMESPACE
   ```

1. `localhost:8081` を開いて、Flink ジョブをローカルに表示します。

1. ジョブをクリーンアップします。チェックポイント、高可用性、セーブポイントメタデータ、CloudWatch ログなど、このジョブ用に作成された S3 アーティファクトを忘れずにクリーンアップしてください。

------