

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Menjalankan pekerjaan Flink dengan Amazon EMR di EKS
<a name="run-flink-jobs"></a>

Amazon EMR merilis 6.13.0 dan lebih tinggi mendukung Amazon EMR di EKS dengan Apache Flink, atau operator Flink Kubernetes, sebagai model pengiriman pekerjaan untuk Amazon EMR di EKS. Dengan Amazon EMR di EKS dengan Apache Flink, Anda dapat menerapkan dan mengelola aplikasi Flink dengan runtime rilis Amazon EMR di kluster Amazon EKS Anda sendiri. Setelah Anda menerapkan operator Flink Kubernetes di cluster Amazon EKS Anda, Anda dapat langsung mengirimkan aplikasi Flink dengan operator. Operator mengelola siklus hidup aplikasi Flink.

**Topics**
+ [Menyiapkan dan menggunakan operator Flink Kubernetes](jobruns-flink-kubernetes-operator.md)
+ [Menggunakan Flink Native Kubernetes](jobruns-flink-native-kubernetes.md)
+ [Menyesuaikan gambar Docker untuk Flink dan FluentD](jobruns-flink-docker-flink-fluentd.md)
+ [Memantau pekerjaan operator Flink Kubernetes dan Flink](jobruns-flink-monitoring.md)
+ [Bagaimana Flink mendukung ketersediaan tinggi dan ketahanan kerja](jobruns-flink-resiliency.md)
+ [Menggunakan Autoscaler untuk aplikasi Flink](jobruns-flink-autoscaler.md)
+ [Pemeliharaan dan pemecahan masalah untuk pekerjaan Flink di Amazon EMR di EKS](jobruns-flink-troubleshooting.md)
+ [Rilis yang didukung untuk Amazon EMR di EKS dengan Apache Flink](jobruns-flink-security-release-versions.md)

# Menyiapkan dan menggunakan operator Flink Kubernetes
<a name="jobruns-flink-kubernetes-operator"></a>

Halaman-halaman berikut menjelaskan cara mengatur dan menggunakan operator Flink Kubernetes untuk menjalankan pekerjaan Flink dengan Amazon EMR di EKS. Topik yang tersedia mencakup prasyarat yang diperlukan, cara mengatur lingkungan Anda, dan menjalankan aplikasi Flink di Amazon EMR di EKS.

**Topics**
+ [Menyiapkan operator Flink Kubernetes untuk Amazon EMR di EKS](jobruns-flink-kubernetes-operator-setup.md)
+ [Menginstal operator Flink Kubernetes untuk Amazon EMR di EKS](jobruns-flink-kubernetes-operator-getting-started.md)
+ [Jalankan aplikasi Flink](jobruns-flink-kubernetes-operator-run-application.md)
+ [Izin peran keamanan untuk menjalankan aplikasi Flink](jobruns-flink-kubernetes-security.md)
+ [Menghapus instalasi operator Flink Kubernetes untuk Amazon EMR di EKS](jobruns-flink-kubernetes-operator-uninstall.md)

# Menyiapkan operator Flink Kubernetes untuk Amazon EMR di EKS
<a name="jobruns-flink-kubernetes-operator-setup"></a>

Selesaikan tugas-tugas berikut untuk menyiapkan sebelum Anda menginstal operator Flink Kubernetes di Amazon EKS. Jika Anda sudah mendaftar untuk Amazon Web Services (AWS) dan telah menggunakan Amazon EKS, Anda hampir siap untuk menggunakan Amazon EMR di EKS. Selesaikan tugas-tugas berikut untuk menyiapkan operator Flink di Amazon EKS. Jika Anda telah menyelesaikan salah satu prasyarat, Anda dapat melewatinya dan melanjutkan ke yang berikutnya.
+ **[Instal atau perbarui ke versi terbaru AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html)** - Jika Anda sudah menginstal AWS CLI, konfirmasikan bahwa Anda memiliki versi terbaru.
+ **[Siapkan kubectl dan eksctl - eksctl](https://docs.aws.amazon.com/eks/latest/userguide/install-kubectl.html)** adalah alat baris perintah yang Anda gunakan untuk berkomunikasi dengan Amazon EKS.
+ **[Instal Helm](https://docs.aws.amazon.com/eks/latest/userguide/helm.html)** — Manajer paket Helm untuk Kubernetes membantu Anda menginstal dan mengelola aplikasi di klaster Kubernetes Anda. 
+ **[Memulai Amazon EKS — eksctl](https://docs.aws.amazon.com/eks/latest/userguide/getting-started-eksctl.html)** — Ikuti langkah-langkah untuk membuat cluster Kubernetes baru dengan node di Amazon EKS.
+ **[Pilih label rilis EMR Amazon (rilis](jobruns-flink-security-release-versions.md) 6.13.0 atau lebih tinggi)** — operator Flink Kubernetes didukung dengan Amazon EMR rilis 6.13.0 dan yang lebih tinggi.
+ **[Aktifkan Peran IAM untuk Akun Layanan (IRSA) di klaster Amazon EKS](setting-up-enable-IAM.md)**.
+ **[Buat peran eksekusi pekerjaan](creating-job-execution-role.md)**.
+ **[Perbarui kebijakan kepercayaan dari peran eksekusi pekerjaan](setting-up-trust-policy.md)**.
+ Buat peran eksekusi operator. Langkah ini bersifat opsional. Anda dapat menggunakan peran yang sama untuk pekerjaan dan operator Flink. Jika Anda ingin memiliki peran IAM yang berbeda untuk operator Anda, Anda dapat membuat peran terpisah.
+ Perbarui kebijakan kepercayaan dari peran eksekusi operator. Anda harus secara eksplisit menambahkan satu entri kebijakan kepercayaan untuk peran yang ingin Anda gunakan untuk akun layanan operator Amazon EMR Flink Kubernetes. Anda dapat mengikuti format contoh ini:

------
#### [ JSON ]

****  

  ```
  {
    "Version":"2012-10-17",		 	 	 
    "Statement": [
      {
        "Effect": "Allow",
        "Action": [
          "sts:AssumeRoleWithWebIdentity"
        ],
        "Resource": [
          "*"
        ],
        "Condition": {
          "StringLike": {
            "aws:userid": "system:serviceaccount:emr:emr-containers-sa-flink-operator"
          }
        },
        "Sid": "AllowSTSAssumerolewithwebidentity"
      }
    ]
  }
  ```

------

# Menginstal operator Flink Kubernetes untuk Amazon EMR di EKS
<a name="jobruns-flink-kubernetes-operator-getting-started"></a>

Topik ini membantu Anda mulai menggunakan operator Flink Kubernetes di Amazon EKS dengan menyiapkan penerapan Flink.

## Instal operator Kubernetes
<a name="jobruns-flink-kubernetes-operator-getting-started-install-operator"></a>

Gunakan langkah-langkah berikut untuk menginstal operator Kubernetes untuk Apache Flink.

1. Jika Anda belum melakukannya, selesaikan langkah-langkahnya[Menyiapkan operator Flink Kubernetes untuk Amazon EMR di EKS](jobruns-flink-kubernetes-operator-setup.md).

1. Instal *cert-manager* (sekali per kluster Amazon EKS) untuk mengaktifkan penambahan komponen webhook.

   ```
   kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.12.0/cert-manager.yaml
   ```

1. Instal bagan Helm.

   ```
   export VERSION=7.12.0 # The Amazon EMR release version
   export NAMESPACE=The Kubernetes namespace to deploy the operator
   
   helm install flink-kubernetes-operator \
   oci://public.ecr.aws/emr-on-eks/flink-kubernetes-operator \
   --version $VERSION \
   --namespace $NAMESPACE
   ```

   Contoh output:

   ```
   NAME: flink-kubernetes-operator
   LAST DEPLOYED: Tue May 31 17:38:56 2022
   NAMESPACE: $NAMESPACE
   STATUS: deployed
   REVISION: 1
   TEST SUITE: None
   ```

1. Tunggu hingga penerapan selesai dan verifikasi instalasi bagan.

   ```
   kubectl wait deployment flink-kubernetes-operator --namespace $NAMESPACE --for condition=Available=True --timeout=30s
   ```

1. Anda akan melihat pesan berikut saat penerapan selesai.

   ```
   deployment.apps/flink-kubernetes-operator condition met
   ```

1. Gunakan perintah berikut untuk melihat operator yang dikerahkan.

   ```
   helm list --namespace $NAMESPACE
   ```

   Berikut ini menunjukkan contoh keluaran, di mana versi aplikasi `x.y.z-amzn-n` akan sesuai dengan versi operator Flink untuk EMR Amazon Anda pada rilis EKS. Untuk informasi selengkapnya, lihat [Rilis yang didukung untuk Amazon EMR di EKS dengan Apache Flink](jobruns-flink-security-release-versions.md).

   ```
   NAME                              NAMESPACE    REVISION    UPDATED                                STATUS      CHART                                   APP VERSION          
   flink-kubernetes-operator    $NAMESPACE   1           2023-02-22 16:43:45.24148 -0500 EST    deployed    flink-kubernetes-operator-emr-7.12.0    x.y.z-amzn-n
   ```

### Tingkatkan operator Kubernetes
<a name="jobruns-flink-kubernetes-operator-upgrade"></a>

Untuk memutakhirkan versi operator Flink, ikuti langkah-langkah ini:

1. Copot pemasangan yang lama`flink-kubernetes-operator`:`helm uninstall flink-kubernetes-operator -n <NAMESPACE>`.

1. Hapus CRD (karena helm tidak akan secara otomatis menghapus CRD lama):. `kubectl delete crd flinkdeployments.flink.apache.org flinksessionjobs.flink.apache.org`

1. Instal ulang `flink-kubernetes-operator` dengan versi yang lebih baru.

# Jalankan aplikasi Flink
<a name="jobruns-flink-kubernetes-operator-run-application"></a>

Dengan Amazon EMR 6.13.0 dan yang lebih tinggi, Anda dapat menjalankan aplikasi Flink dengan operator Flink Kubernetes dalam mode Aplikasi di Amazon EMR di EKS. Dengan Amazon EMR 6.15.0 dan yang lebih tinggi, Anda juga dapat menjalankan aplikasi Flink dalam mode Sesi. Halaman ini menjelaskan kedua metode yang dapat Anda gunakan untuk menjalankan aplikasi Flink dengan Amazon EMR di EKS.

**Topics**

**catatan**  
Anda harus memiliki bucket Amazon S3 yang dibuat untuk menyimpan metadata ketersediaan tinggi saat mengirimkan pekerjaan Flink Anda. Jika Anda tidak ingin menggunakan fitur ini, Anda dapat menonaktifkannya. Ini diaktifkan secara default.

**Prasyarat** — Sebelum Anda dapat menjalankan aplikasi Flink dengan operator Flink Kubernetes, selesaikan langkah-langkah di dan. [Menyiapkan operator Flink Kubernetes untuk Amazon EMR di EKS](jobruns-flink-kubernetes-operator-setup.md) [Instal operator Kubernetes](jobruns-flink-kubernetes-operator-getting-started.md#jobruns-flink-kubernetes-operator-getting-started-install-operator)

------
#### [ Application mode ]

Dengan Amazon EMR 6.13.0 dan yang lebih tinggi, Anda dapat menjalankan aplikasi Flink dengan operator Flink Kubernetes dalam mode Aplikasi di Amazon EMR di EKS.

1. Buat file `FlinkDeployment` definisi `basic-example-app-cluster.yaml` seperti pada contoh berikut. Jika Anda mengaktifkan dan menggunakan salah satu [opt-in Wilayah AWS](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html), pastikan Anda membatalkan komentar dan mengonfigurasi konfigurasi. `fs.s3a.endpoint.region`

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example-app-cluster
   spec:
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
       #fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
       state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
       state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH 
     flinkVersion: v1_17
     executionRoleArn: JOB_EXECUTION_ROLE_ARN
     emrReleaseLabel: "emr-6.13.0-flink-latest" # 6.13 or higher
     jobManager:
       storageDir: HIGH_AVAILABILITY_STORAGE_PATH
       resource:
         memory: "2048m"
         cpu: 1
     taskManager:
       resource:
         memory: "2048m"
         cpu: 1
     job:
       # if you have your job jar in S3 bucket you can use that path as well
       jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
       parallelism: 2
       upgradeMode: savepoint
       savepointTriggerNonce: 0
     monitoringConfiguration:    
       cloudWatchMonitoringConfiguration:
          logGroupName: LOG_GROUP_NAME
   ```

1. Kirim penyebaran Flink dengan perintah berikut. Ini juga akan membuat `FlinkDeployment` objek bernama`basic-example-app-cluster`.

   ```
   kubectl create -f basic-example-app-cluster.yaml -n <NAMESPACE>
   ```

1. Akses UI Flink.

   ```
   kubectl port-forward deployments/basic-example-app-cluster 8081 -n NAMESPACE
   ```

1. Buka `localhost:8081` untuk melihat pekerjaan Flink Anda secara lokal.

1. Bersihkan pekerjaan. Ingatlah untuk membersihkan artefak S3 yang dibuat untuk pekerjaan ini, seperti checkpointing, ketersediaan tinggi, metadata savepointing, dan log. CloudWatch

Untuk informasi selengkapnya tentang mengirimkan aplikasi ke Flink melalui operator Flink Kubernetes, lihat contoh operator Flink Kubernetes di [folder](https://github.com/apache/flink-kubernetes-operator/tree/main/examples) di. `apache/flink-kubernetes-operator` GitHub

------
#### [ Session mode ]

Dengan Amazon EMR 6.15.0 dan yang lebih tinggi, Anda dapat menjalankan aplikasi Flink dengan operator Flink Kubernetes dalam mode Session di Amazon EMR di EKS.

1. Buat file `FlinkDeployment` definisi bernama `basic-example-app-cluster.yaml` seperti pada contoh berikut. Jika Anda mengaktifkan dan menggunakan salah satu [opt-in Wilayah AWS](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html), pastikan Anda membatalkan komentar dan mengonfigurasi konfigurasi. `fs.s3a.endpoint.region`

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example-session-cluster
   spec:
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
       #fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
       state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
       state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH
     flinkVersion: v1_17
     executionRoleArn: JOB_EXECUTION_ROLE_ARN
     emrReleaseLabel: "emr-6.15.0-flink-latest"
     jobManager:
       storageDir: HIGH_AVAILABILITY_S3_STORAGE_PATH
       resource:
         memory: "2048m"
         cpu: 1
     taskManager:
       resource:
         memory: "2048m"
         cpu: 1
     monitoringConfiguration:    
       s3MonitoringConfiguration:
          logUri: 
       cloudWatchMonitoringConfiguration:
          logGroupName: LOG_GROUP_NAME
   ```

1. Kirim penyebaran Flink dengan perintah berikut. Ini juga akan membuat `FlinkDeployment` objek bernama`basic-example-session-cluster`.

   ```
   kubectl create -f basic-example-app-cluster.yaml -n NAMESPACE
   ```

1. Gunakan perintah berikut untuk mengonfirmasi bahwa cluster sesi `LIFECYCLE` adalah`STABLE`:

   ```
   kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
   ```

   Outputnya harus mirip dengan contoh berikut:

   ```
   NAME                              JOB STATUS   LIFECYCLE STATE
   basic-example-session-cluster                          STABLE
   ```

1. Buat file sumber daya definisi `FlinkSessionJob` khusus `basic-session-job.yaml` dengan contoh konten berikut:

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkSessionJob
   metadata:
     name: basic-session-job
   spec:
     deploymentName: basic-session-deployment
     job:
       # If you have your job jar in an S3 bucket you can use that path.
       # To use jar in S3 bucket, set 
       # OPERATOR_EXECUTION_ROLE_ARN (--set emrContainers.operatorExecutionRoleArn=$OPERATOR_EXECUTION_ROLE_ARN)
       # when you install Spark operator
       jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar
       parallelism: 2
       upgradeMode: stateless
   ```

1. Kirim pekerjaan sesi Flink dengan perintah berikut. Ini akan membuat `FlinkSessionJob` objek`basic-session-job`.

   ```
   kubectl apply -f basic-session-job.yaml -n $NAMESPACE
   ```

1. Gunakan perintah berikut untuk mengonfirmasi bahwa cluster sesi `LIFECYCLE` adalah`STABLE`, dan `JOB STATUS` adalah`RUNNING`:

   ```
   kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
   ```

   Outputnya harus mirip dengan contoh berikut:

   ```
   NAME                              JOB STATUS   LIFECYCLE STATE
   basic-example-session-cluster     RUNNING      STABLE
   ```

1. Akses UI Flink.

   ```
   kubectl port-forward deployments/basic-example-session-cluster 8081 -n NAMESPACE
   ```

1. Buka `localhost:8081` untuk melihat pekerjaan Flink Anda secara lokal.

1. Bersihkan pekerjaan. Ingatlah untuk membersihkan artefak S3 yang dibuat untuk pekerjaan ini, seperti checkpointing, ketersediaan tinggi, metadata savepointing, dan log. CloudWatch

------

# Izin peran keamanan untuk menjalankan aplikasi Flink
<a name="jobruns-flink-kubernetes-security"></a>

Topik ini menjelaskan peran keamanan untuk menerapkan dan menjalankan aplikasi Flink. Ada dua peran yang diperlukan untuk mengelola penyebaran dan untuk membuat dan mengelola pekerjaan, peran operator dan peran pekerjaan. Topik ini memperkenalkan mereka dan mencantumkan izin mereka.

## Kontrol akses berbasis peran
<a name="jobruns-flink-kubernetes-security-rbac"></a>

Untuk menerapkan operator dan menjalankan pekerjaan Flink, kita harus membuat dua peran Kubernetes: satu operator dan satu peran pekerjaan. Amazon EMR membuat dua peran secara default saat Anda menginstal operator.

## Peran operator
<a name="jobruns-flink-kubernetes-security-operator-role"></a>

Kami menggunakan peran operator untuk mengelola `flinkdeployments` untuk membuat dan mengelola JobManager untuk setiap pekerjaan Flink dan sumber daya lainnya, seperti layanan.

Nama default peran operator adalah `emr-containers-sa-flink-operator` dan memerlukan izin berikut.

```
rules:
- apiGroups:
  - ""
  resources:
  - pods
  - services
  - events
  - configmaps
  - secrets
  - serviceaccounts
  verbs:
  - '*'
- apiGroups:
  - rbac.authorization.k8s.io
  resources:
  - roles
  - rolebindings
  verbs:
  - '*'
- apiGroups:
  - apps
  resources:
  - deployments
  - deployments/finalizers
  - replicasets
  verbs:
  - '*'
- apiGroups:
  - extensions
  resources:
  - deployments
  - ingresses
  verbs:
  - '*'
- apiGroups:
  - flink.apache.org
  resources:
  - flinkdeployments
  - flinkdeployments/status
  - flinksessionjobs
  - flinksessionjobs/status
  verbs:
  - '*'
- apiGroups:
  - networking.k8s.io
  resources:
  - ingresses
  verbs:
  - '*'
- apiGroups:
  - coordination.k8s.io
  resources:
  - leases
  verbs:
  - '*'
```

## Peran Job
<a name="jobruns-flink-security-job-role"></a>

 JobManager Menggunakan peran pekerjaan untuk membuat dan mengelola TaskManagers dan ConfigMaps untuk setiap pekerjaan.

```
rules:
- apiGroups:
  - ""
  resources:
  - pods
  - configmaps
  verbs:
  - '*'
- apiGroups:
  - apps
  resources:
  - deployments
  - deployments/finalizers
  verbs:
  - '*'
```

# Menghapus instalasi operator Flink Kubernetes untuk Amazon EMR di EKS
<a name="jobruns-flink-kubernetes-operator-uninstall"></a>

Ikuti langkah-langkah berikut untuk menghapus instalasi operator Flink Kubernetes.

1. Hapus operator.

   ```
   helm uninstall flink-kubernetes-operator -n <NAMESPACE>
   ```

1. Hapus sumber daya Kubernetes yang tidak dihapus Helm.

   ```
   kubectl delete serviceaccounts, roles, rolebindings -l emr-containers.amazonaws.com/component=flink.operator --namespace <namespace>
   kubectl delete crd flinkdeployments.flink.apache.org flinksessionjobs.flink.apache.org
   ```

1. (Opsional) Hapus cert-manager.

   ```
   kubectl delete -f https://github.com/jetstack/cert-manager/releases/download/v1.12.0/cert-manager.yaml
   ```

# Menggunakan Flink Native Kubernetes
<a name="jobruns-flink-native-kubernetes"></a>

Amazon EMR merilis 6.13.0 dan dukungan yang lebih tinggi Flink Native Kubernetes sebagai alat baris perintah yang dapat Anda gunakan untuk mengirimkan dan menjalankan aplikasi Flink ke EMR Amazon di klaster EKS.

**Topics**
+ [Menyiapkan Flink Native Kubernetes untuk Amazon EMR di EKS](jobruns-flink-native-kubernetes-setup.md)
+ [Memulai dengan Kubernetes asli Flink untuk Amazon EMR di EKS](jobruns-flink-native-kubernetes-getting-started.md)
+ [Persyaratan keamanan akun JobManager layanan Flink untuk Native Kubernetes](jobruns-flink-native-kubernetes-security-requirements.md)

# Menyiapkan Flink Native Kubernetes untuk Amazon EMR di EKS
<a name="jobruns-flink-native-kubernetes-setup"></a>

Selesaikan tugas-tugas berikut untuk menyiapkan sebelum Anda dapat menjalankan aplikasi dengan Flink CLI di Amazon EMR di EKS. Jika Anda sudah mendaftar untuk Amazon Web Services (AWS) dan telah menggunakan Amazon EKS, Anda hampir siap untuk menggunakan Amazon EMR di EKS. Jika Anda telah menyelesaikan salah satu prasyarat, Anda dapat melewatinya dan melanjutkan ke yang berikutnya.
+ **[Instal atau perbarui ke versi terbaru AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html)** - Jika Anda sudah menginstal AWS CLI, konfirmasikan bahwa Anda memiliki versi terbaru.
+ **[Memulai Amazon EKS — eksctl](https://docs.aws.amazon.com/eks/latest/userguide/getting-started-eksctl.html)** — Ikuti langkah-langkah untuk membuat cluster Kubernetes baru dengan node di Amazon EKS.
+ **[Pilih URI image dasar EMR Amazon](docker-custom-images-tag.md) (rilis 6.13.0 atau lebih tinggi)** — perintah Flink Kubernetes didukung dengan Amazon EMR rilis 6.13.0 dan yang lebih tinggi.
+ Konfirmasikan bahwa akun JobManager layanan memiliki izin yang sesuai untuk membuat dan menonton TaskManager pod. Untuk informasi selengkapnya, lihat [persyaratan keamanan akun JobManager layanan Flink untuk Kubernetes Asli](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/jobruns-flink-native-kubernetes-security-requirements.html).
+ Siapkan profil [AWS kredensial](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html) lokal Anda.
+ [Buat atau perbarui file kubeconfig untuk cluster Amazon EKS](https://docs.aws.amazon.com/eks/latest/userguide/create-kubeconfig.html) tempat Anda ingin menjalankan aplikasi Flink.

# Memulai dengan Kubernetes asli Flink untuk Amazon EMR di EKS
<a name="jobruns-flink-native-kubernetes-getting-started"></a>

Langkah-langkah ini menunjukkan cara mengonfigurasi, menyiapkan akun layanan, dan menjalankan aplikasi Flink. Flink Native Kubernetes digunakan untuk menyebarkan Flink pada klaster Kubernetes yang sedang berjalan.

## Konfigurasikan dan jalankan aplikasi Flink
<a name="jobruns-flink-native-kubernetes-getting-started-run-application"></a>

Amazon EMR 6.13.0 dan yang lebih tinggi mendukung Flink Native Kubernetes untuk menjalankan aplikasi Flink di cluster Amazon EKS. Untuk menjalankan aplikasi Flink, ikuti langkah-langkah berikut:

1. Sebelum Anda dapat menjalankan aplikasi Flink dengan perintah Flink Native Kubernetes, selesaikan langkah-langkahnya. [Menyiapkan Flink Native Kubernetes untuk Amazon EMR di EKS](jobruns-flink-native-kubernetes-setup.md)

1. [Unduh dan instal Flink](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/try-flink/local_installation).

1. Tetapkan nilai untuk variabel lingkungan berikut.

   ```
   #Export the FLINK_HOME environment variable to your local installation of Flink
   export FLINK_HOME=/usr/local/bin/flink #Will vary depending on your installation
   export NAMESPACE=flink
   export CLUSTER_ID=flink-application-cluster
   export IMAGE=<123456789012.dkr.ecr.sample-Wilayah AWS-.amazonaws.com/flink/emr-6.13.0-flink:latest>
   export FLINK_SERVICE_ACCOUNT=emr-containers-sa-flink
   export FLINK_CLUSTER_ROLE_BINDING=emr-containers-crb-flink
   ```

1. Buat akun layanan untuk mengelola sumber daya Kubernetes.

   ```
   kubectl create serviceaccount $FLINK_SERVICE_ACCOUNT -n $NAMESPACE
   kubectl create clusterrolebinding $FLINK_CLUSTER_ROLE_BINDING --clusterrole=edit --serviceaccount=$NAMESPACE:$FLINK_SERVICE_ACCOUNT
   ```

1. Jalankan `run-application` perintah CLI.

   ```
   $FLINK_HOME/bin/flink run-application \
       --target kubernetes-application \
       -Dkubernetes.namespace=$NAMESPACE \
       -Dkubernetes.cluster-id=$CLUSTER_ID \
       -Dkubernetes.container.image.ref=$IMAGE \
       -Dkubernetes.service-account=$FLINK_SERVICE_ACCOUNT \
       local:///opt/flink/examples/streaming/Iteration.jar
   2022-12-29 21:13:06,947 INFO  org.apache.flink.kubernetes.utils.KubernetesUtils            [] - Kubernetes deployment requires a fixed port. Configuration blob.server.port will be set to 6124
   2022-12-29 21:13:06,948 INFO  org.apache.flink.kubernetes.utils.KubernetesUtils            [] - Kubernetes deployment requires a fixed port. Configuration taskmanager.rpc.port will be set to 6122
   2022-12-29 21:13:07,861 WARN  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Please note that Flink client operations(e.g. cancel, list, stop, savepoint, etc.) won't work from outside the Kubernetes cluster since 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
   2022-12-29 21:13:07,868 INFO  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Create flink application cluster flink-application-cluster successfully, JobManager Web Interface: http://flink-application-cluster-rest.flink:8081
   ```

1. Periksa sumber daya Kubernetes yang dibuat.

   ```
   kubectl get all -n <namespace>
   NAME READY STATUS RESTARTS AGE
   pod/flink-application-cluster-546687cb47-w2p2z 1/1 Running 0 3m37s
   pod/flink-application-cluster-taskmanager-1-1 1/1 Running 0 3m24s
   
   NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
   service/flink-application-cluster ClusterIP None <none> 6123/TCP,6124/TCP 3m38s
   service/flink-application-cluster-rest ClusterIP 10.100.132.158 <none> 8081/TCP 3m38s
   
   NAME READY UP-TO-DATE AVAILABLE AGE
   deployment.apps/flink-application-cluster 1/1 1 1 3m38s
   
   NAME DESIRED CURRENT READY AGE
   replicaset.apps/flink-application-cluster-546687cb47 1 1 1 3m38s
   ```

1. Port maju ke 8081.

   ```
   kubectl port-forward service/flink-application-cluster-rest 8081 -n <namespace>
   Forwarding from 127.0.0.1:8081 -> 8081
   ```

1. Akses UI Flink secara lokal.  
![\[Akses UI Flink.\]](http://docs.aws.amazon.com/id_id/emr/latest/EMR-on-EKS-DevelopmentGuide/images/jobruns-flink-native-kubernetes-ui.png)

1. Hapus aplikasi Flink.

   ```
   kubectl delete deployment.apps/flink-application-cluster -n <namespace>
   deployment.apps "flink-application-cluster" deleted
   ```

Untuk informasi selengkapnya tentang mengirimkan aplikasi ke Flink, lihat [Native Kubernetes](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/) di dokumentasi Apache Flink.

# Persyaratan keamanan akun JobManager layanan Flink untuk Native Kubernetes
<a name="jobruns-flink-native-kubernetes-security-requirements"></a>

 JobManager Pod Flink menggunakan akun layanan Kubernetes untuk mengakses server API Kubernetes untuk membuat dan menonton pod. TaskManager Akun JobManager layanan harus memiliki izin yang sesuai untuk create/delete TaskManager pod dan memungkinkan pemimpin TaskManager to watch ConfigMaps untuk mengambil alamat JobManager dan ResourceManager di klaster Anda.

Aturan berikut berlaku untuk akun layanan ini.

```
rules:
- apiGroups:
  - ""
  resources:
  - pods
  verbs:
  - "*"
- apiGroups:
  - ""
  resources:
  - services
  verbs:
  - "*"
- apiGroups:
  - ""
  resources:
  - configmaps
  verbs:
  - "*"
- apiGroups:
  - "apps"
  resources:
  - deployments
  verbs:
  - "*"
```

# Menyesuaikan gambar Docker untuk Flink dan FluentD
<a name="jobruns-flink-docker-flink-fluentd"></a>

Ambil langkah-langkah berikut untuk menyesuaikan gambar Docker untuk Amazon EMR di EKS dengan gambar Apache Flink atau FluentD. Ini termasuk panduan teknis untuk mendapatkan gambar dasar, menyesuaikannya, menerbitkannya, dan mengirimkan beban kerja.

**Topics**
+ [Prasyarat](#jobruns-flink-docker-flink-fluentd-prereqs)
+ [Langkah 1: Ambil gambar dasar dari Amazon Elastic Container Registry](#jobruns-flink-docker-flink-fluentd-retrieve-base)
+ [Langkah 2: Sesuaikan gambar dasar](#jobruns-flink-docker-flink-fluentd-customize-image)
+ [Langkah 3: Publikasikan gambar kustom Anda](#jobruns-flink-docker-flink-fluentd-publish-image)
+ [Langkah 4: Kirim beban kerja Flink di Amazon EMR menggunakan gambar khusus](#jobruns-flink-docker-flink-fluentd-submit-workload)

## Prasyarat
<a name="jobruns-flink-docker-flink-fluentd-prereqs"></a>

Sebelum Anda menyesuaikan image Docker Anda, pastikan bahwa Anda telah menyelesaikan prasyarat berikut:
+ Menyelesaikan [Menyiapkan operator Flink Kubernetes untuk Amazon EMR](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/jobruns-flink-kubernetes-operator-setup.html) pada langkah-langkah EKS.
+ Menginstal Docker di lingkungan Anda. Untuk informasi lebih lanjut, lihat [Get Docker](https://docs.docker.com/get-docker/).

## Langkah 1: Ambil gambar dasar dari Amazon Elastic Container Registry
<a name="jobruns-flink-docker-flink-fluentd-retrieve-base"></a>

Gambar dasar berisi runtime Amazon EMR dan konektor yang perlu Anda akses lainnya. Layanan AWS Jika Anda menggunakan Amazon EMR di EKS dengan Flink versi 6.14.0 atau lebih tinggi, Anda bisa mendapatkan gambar dasar dari Galeri Publik Amazon ECR. Jelajahi galeri untuk menemukan tautan gambar dan tarik gambar ke ruang kerja lokal Anda. Misalnya, untuk rilis Amazon EMR 6.14.0, `docker pull` perintah berikut mengembalikan gambar dasar standar terbaru. Ganti `emr-6.14.0:latest` dengan versi rilis yang Anda inginkan.

```
docker pull public.ecr.aws/emr-on-eks/flink/emr-6.14.0-flink:latest
```

Berikut ini adalah tautan ke gambar galeri Flink dan gambar galeri Fluentd:
+ [emr-on-eks/flink/emr-6.14.0-flink](https://gallery.ecr.aws/emr-on-eks/flink/emr-6.14.0-flink)
+ [emr-on-eks/fasih/emr-6.14.0 (](https://gallery.ecr.aws/emr-on-eks/fluentd/emr-6.14.0)

## Langkah 2: Sesuaikan gambar dasar
<a name="jobruns-flink-docker-flink-fluentd-customize-image"></a>

Langkah-langkah berikut menjelaskan cara menyesuaikan gambar dasar yang Anda tarik dari Amazon ECR.

1. Buat `Dockerfile` baru di Workspace lokal Anda.

1. Edit `Dockerfile` dan tambahkan konten berikut. Ini `Dockerfile` menggunakan gambar kontainer yang Anda tarik`public.ecr.aws/emr-on-eks/flink/emr-7.12.0-flink:latest`.

   ```
   FROM public.ecr.aws/emr-on-eks/flink/emr-7.12.0-flink:latest
   USER root
   ### Add customization commands here ####
   USER hadoop:hadoop
   ```

   Gunakan konfigurasi berikut jika Anda menggunakan`Fluentd`.

   ```
   FROM public.ecr.aws/emr-on-eks/fluentd/emr-7.12.0:latest
   USER root
   ### Add customization commands here ####
   USER hadoop:hadoop
   ```

1. Tambahkan perintah di `Dockerfile` untuk menyesuaikan gambar dasar. Perintah berikut menunjukkan cara menginstal pustaka Python.

   ```
   FROM public.ecr.aws/emr-on-eks/flink/emr-7.12.0-flink:latest
   USER root
   RUN pip3 install --upgrade boto3 pandas numpy // For python 3
   USER hadoop:hadoop
   ```

1. Di direktori yang sama dengan tempat Anda membuat`DockerFile`, jalankan perintah berikut untuk membangun image Docker. Bidang yang Anda berikan mengikuti `-t` bendera adalah nama kustom Anda untuk gambar.

   ```
   docker build -t <YOUR_ACCOUNT_ID>.dkr.ecr.<YOUR_ECR_REGION>.amazonaws.com/<ECR_REPO>:<ECR_TAG>
   ```

## Langkah 3: Publikasikan gambar kustom Anda
<a name="jobruns-flink-docker-flink-fluentd-publish-image"></a>

Anda sekarang dapat mempublikasikan gambar Docker baru ke registri Amazon ECR Anda.

1. Jalankan perintah berikut untuk membuat repositori Amazon ECR untuk menyimpan image Docker Anda. Berikan nama untuk repositori Anda, seperti `emr_custom_repo.` Untuk informasi selengkapnya, lihat [Membuat repositori di Panduan Pengguna](https://docs.aws.amazon.com/AmazonECR/latest/userguide/getting-started-cli.html#cli-create-repository) Amazon Elastic Container Registry.

   ```
   aws ecr create-repository \
          --repository-name emr_custom_repo \
          --image-scanning-configuration scanOnPush=true \
          --region <AWS_REGION>
   ```

1. Jalankan perintah berikut untuk mengautentikasi ke registri default Anda. Untuk informasi selengkapnya, lihat [Mengautentikasi ke registri default Anda](https://docs.aws.amazon.com/AmazonECR/latest/userguide/getting-started-cli.html#cli-authenticate-registry) di Panduan Pengguna Amazon Elastic Container Registry.

   ```
   aws ecr get-login-password --region <AWS_REGION> | docker login --username AWS --password-stdin <AWS_ACCOUNT_ID>.dkr.ecr.<YOUR_ECR_REGION>.amazonaws.com
   ```

1. Tekan gambar. Untuk informasi selengkapnya, lihat [Mendorong gambar ke Amazon ECR](https://docs.aws.amazon.com/AmazonECR/latest/userguide/getting-started-cli.html#cli-push-image) di Panduan Pengguna Amazon Elastic Container Registry.

   ```
   docker push <YOUR_ACCOUNT_ID>.dkr.ecr.<YOUR_ECR_REGION>.amazonaws.com/<ECR_REPO>:<ECR_TAG>
   ```

## Langkah 4: Kirim beban kerja Flink di Amazon EMR menggunakan gambar khusus
<a name="jobruns-flink-docker-flink-fluentd-submit-workload"></a>

Buat perubahan berikut pada `FlinkDeployment` spesifikasi Anda untuk menggunakan gambar kustom. Untuk melakukannya, masukkan gambar Anda sendiri di `spec.image` baris spesifikasi penerapan Anda.

```
apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example
   spec:
     flinkVersion: v1_18
     image: <YOUR_ACCOUNT_ID>.dkr.ecr.<YOUR_ECR_REGION>.amazonaws.com/<ECR_REPO>:<ECR_TAG>
     imagePullPolicy: Always
     flinkConfiguration:
           taskmanager.numberOfTaskSlots: "1"
```

Untuk menggunakan gambar kustom untuk pekerjaan Fluentd Anda, masukkan gambar Anda sendiri di `monitoringConfiguration.image` baris spesifikasi penerapan Anda.

```
  monitoringConfiguration:
       image: <YOUR_ACCOUNT_ID>.dkr.ecr.<YOUR_ECR_REGION>.amazonaws.com/<ECR_REPO>:<ECR_TAG>
       cloudWatchMonitoringConfiguration:
         logGroupName: flink-log-group
         logStreamNamePrefix: custom-fluentd
```

# Memantau pekerjaan operator Flink Kubernetes dan Flink
<a name="jobruns-flink-monitoring"></a>

Bagian ini menjelaskan beberapa cara agar Anda dapat memantau pekerjaan Flink Anda dengan Amazon EMR di EKS. Ini termasuk mengintegrasikan Flink dengan Amazon Managed Service untuk Prometheus, menggunakan *Dasbor Web Flink*, yang menyediakan status pekerjaan dan metrik, atau menggunakan konfigurasi pemantauan untuk mengirim data log ke Amazon S3 dan. Amazon CloudWatch

**Topics**
+ [Menggunakan Amazon Managed Service untuk Prometheus untuk memantau pekerjaan Flink](jobruns-flink-monitoring-prometheus.md)
+ [Gunakan UI Flink untuk memantau pekerjaan Flink](jobruns-flink-monitoring-ui.md)
+ [Gunakan konfigurasi pemantauan untuk memantau pekerjaan operator Flink Kubernetes dan Flink](jobruns-flink-monitoring-configuration.md)

# Menggunakan Amazon Managed Service untuk Prometheus untuk memantau pekerjaan Flink
<a name="jobruns-flink-monitoring-prometheus"></a>

Anda dapat mengintegrasikan Apache Flink dengan Amazon Managed Service untuk Prometheus (portal manajemen). Layanan Terkelola Amazon untuk Prometheus mendukung pengambilan metrik dari Amazon Managed Service untuk server Prometheus dalam cluster yang berjalan di Amazon EKS. Layanan Terkelola Amazon untuk Prometheus bekerja sama dengan server Prometheus yang sudah berjalan di cluster Amazon EKS Anda. Menjalankan Amazon Managed Service untuk integrasi Prometheus dengan operator Amazon EMR Flink akan secara otomatis menerapkan dan mengonfigurasi server Prometheus untuk diintegrasikan dengan Amazon Managed Service untuk Prometheus.

1. [Buat Layanan Terkelola Amazon untuk Prometheus](https://docs.aws.amazon.com/prometheus/latest/userguide/AMP-onboard-create-workspace.html) Workspace. Ruang kerja ini berfungsi sebagai titik akhir konsumsi. Anda akan memerlukan URL tulis jarak jauh nanti.

1. Siapkan peran IAM untuk akun layanan.

   Untuk metode orientasi ini, gunakan peran IAM untuk akun layanan di klaster Amazon EKS tempat server Prometheus berjalan. Peran ini juga disebut *peran layanan*.

   Jika Anda belum memiliki peran, [siapkan peran layanan untuk menelan metrik dari kluster Amazon EKS](https://docs.aws.amazon.com/prometheus/latest/userguide/set-up-irsa.html).

   Sebelum Anda melanjutkan, buat peran IAM yang disebut`amp-iamproxy-ingest-role`.

1. Instal Operator Flink EMR Amazon dengan Amazon Managed Service untuk Prometheus.

Sekarang setelah Anda memiliki Layanan Terkelola Amazon untuk ruang kerja Prometheus, peran IAM khusus untuk Layanan Terkelola Amazon untuk Prometheus, dan izin yang diperlukan, Anda dapat menginstal operator Amazon EMR Flink.

Buat file `enable-amp.yaml`. File ini memungkinkan Anda menggunakan konfigurasi khusus untuk mengganti Layanan Terkelola Amazon untuk pengaturan Prometheus. Pastikan untuk menggunakan peran Anda sendiri.

```
kube-prometheus-stack:
    prometheus:
    serviceAccount:
        create: true
        name: "amp-iamproxy-ingest-service-account"
        annotations:
            eks.amazonaws.com/role-arn: "arn:aws:iam::<AWS_ACCOUNT_ID>:role/amp-iamproxy-ingest-role"
    remoteWrite:
        - url: <AMAZON_MANAGED_PROMETHEUS_REMOTE_WRITE_URL>
        sigv4:
            region: <AWS_REGION>
        queueConfig:
            maxSamplesPerSend: 1000
            maxShards: 200
            capacity: 2500
```

Gunakan [https://helm.sh/docs/helm/helm_install/](https://helm.sh/docs/helm/helm_install/)perintah untuk meneruskan penggantian ke bagan. `flink-kubernetes-operator`

```
helm upgrade -n <namespace> flink-kubernetes-operator \
   oci://public.ecr.aws/emr-on-eks/flink-kubernetes-operator \
   --set prometheus.enabled=true
   -f enable-amp.yaml
```

Perintah ini secara otomatis menginstal reporter Prometheus di operator pada port 9999. Any future `FlinkDeployment` juga mengekspos `metrics` port di 9249.
+ Metrik operator Flink muncul di Prometheus di bawah label. `flink_k8soperator_`
+ Metrik Flink Task Manager muncul di Prometheus di bawah label. `flink_taskmanager_`
+ Metrik Manajer Job Flink muncul di Prometheus di bawah label. `flink_jobmanager_`

# Gunakan UI Flink untuk memantau pekerjaan Flink
<a name="jobruns-flink-monitoring-ui"></a>

Untuk memantau kesehatan dan kinerja aplikasi Flink yang sedang berjalan, gunakan *Flink Web* Dashboard. Dasbor ini memberikan informasi tentang status pekerjaan, jumlah TaskManagers, dan metrik serta log untuk pekerjaan itu. Ini juga memungkinkan Anda melihat dan memodifikasi konfigurasi pekerjaan Flink, dan berinteraksi dengan cluster Flink untuk mengirimkan atau membatalkan pekerjaan.

Untuk mengakses Flink Web Dashboard untuk aplikasi Flink yang sedang berjalan di Kubernetes:

1. Gunakan `kubectl port-forward` perintah untuk meneruskan port lokal ke port tempat Flink Web Dashboard berjalan di pod aplikasi TaskManager Flink. Secara default, port ini adalah 8081. Ganti *deployment-name* dengan nama penerapan aplikasi Flink dari atas.

   ```
   kubectl get deployments -n namespace
   ```

   Contoh output:

   ```
   kubectl get deployments -n flink-namespace
   NAME                        READY   UP-TO-DATE   AVAILABLE  AGE
   basic-example               1/1       1            1           11m
   flink-kubernetes-operator   1/1       1            1           21h
   ```

   ```
   kubectl port-forward deployments/deployment-name 8081 -n namespace
   ```

1. Jika Anda ingin menggunakan port yang berbeda secara lokal, gunakan *local-port* parameter:8081.

   ```
   kubectl port-forward -n flink deployments/basic-example 8080:8081
   ```

1. Di browser web, navigasikan ke `http://localhost:8081` (atau `http://localhost:local-port` jika Anda menggunakan port lokal khusus) untuk mengakses Dasbor Web Flink. Dasbor ini menampilkan informasi tentang aplikasi Flink yang sedang berjalan, seperti status pekerjaan, jumlah TaskManagers, dan metrik serta log untuk pekerjaan tersebut.  
![\[Contoh UI Dasbor Flink\]](http://docs.aws.amazon.com/id_id/emr/latest/EMR-on-EKS-DevelopmentGuide/images/sample-flink-dashboard-ui.png)

# Gunakan konfigurasi pemantauan untuk memantau pekerjaan operator Flink Kubernetes dan Flink
<a name="jobruns-flink-monitoring-configuration"></a>

Konfigurasi pemantauan memungkinkan Anda dengan mudah mengatur pengarsipan log aplikasi Flink dan log operator Anda ke S3 and/or CloudWatch (Anda dapat memilih salah satu atau keduanya). Melakukan hal itu menambahkan sidecar FluentD ke JobManager Anda TaskManager dan pod dan selanjutnya meneruskan log komponen ini ke sink yang dikonfigurasi.

**catatan**  
Anda harus mengatur Peran IAM untuk akun layanan untuk operator Flink Anda dan pekerjaan Flink Anda (Akun Layanan) untuk dapat menggunakan fitur ini, karena memerlukan interaksi dengan yang lain. Layanan AWS Anda harus mengatur ini menggunakan IRSA di[Menyiapkan operator Flink Kubernetes untuk Amazon EMR di EKS](jobruns-flink-kubernetes-operator-setup.md).

## Log aplikasi Flink
<a name="jobruns-flink-monitoring-configuration-application-logs"></a>

Anda dapat menentukan konfigurasi ini dengan cara berikut.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: FLINK IMAGE TAG
  imagePullPolicy: Always
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  executionRoleArn: JOB EXECUTION ROLE
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
  monitoringConfiguration:
    s3MonitoringConfiguration:
      logUri: S3 BUCKET
    cloudWatchMonitoringConfiguration:
      logGroupName: LOG GROUP NAME
      logStreamNamePrefix: LOG GROUP STREAM PREFIX
    sideCarResources:
      limits:
        cpuLimit: 500m
        memoryLimit: 250Mi
    containerLogRotationConfiguration:
        rotationSize: 2GB
        maxFilesToKeep: 10
```

Berikut ini adalah opsi konfigurasi.
+ `s3MonitoringConfiguration`— Kunci konfigurasi untuk mengatur penerusan ke S3
  + `logUri`(wajib) — jalur bucket S3 tempat Anda ingin menyimpan log Anda.
  + Jalur di S3 setelah log diunggah akan terlihat seperti berikut.
    + Tidak ada rotasi log yang diaktifkan:

      ```
      s3://${logUri}/${POD NAME}/STDOUT or STDERR.gz
      ```
    + Rotasi log diaktifkan. Anda dapat menggunakan file yang diputar dan file saat ini (satu tanpa cap tanggal).

      ```
      s3://${logUri}/${POD NAME}/STDOUT or STDERR.gz
      ```

      Format berikut adalah angka yang bertambah.

      ```
      s3://${logUri}/${POD NAME}/stdout_YYYYMMDD_index.gz
      ```
  + Izin IAM berikut diperlukan untuk menggunakan forwarder ini.

    ```
    {
        "Effect": "Allow",
        "Action": [
            "s3:PutObject"
        ],
        "Resource": [
           "S3_BUCKET_URI/*",
           "S3_BUCKET_URI"
        ]
    }
    ```
+ `cloudWatchMonitoringConfiguration`— kunci konfigurasi untuk mengatur penerusan ke. CloudWatch
  + `logGroupName`(wajib) — nameof grup CloudWatch log yang ingin Anda kirimi log (secara otomatis membuat grup jika tidak ada).
  + `logStreamNamePrefix`(opsional) — nama aliran log yang ingin Anda kirim log ke. Nilai default adalah string kosong. Formatnya adalah sebagai berikut:

    ```
    ${logStreamNamePrefix}/${POD NAME}/STDOUT or STDERR
    ```
  + Izin IAM berikut diperlukan untuk menggunakan forwarder ini.

    ```
    {
        "Effect": "Allow",
        "Action": [
            "logs:CreateLogStream",
            "logs:CreateLogGroup",
            "logs:PutLogEvents"
        ],
        "Resource": [
            "arn:aws:logs:REGION:ACCOUNT-ID:log-group:{YOUR_LOG_GROUP_NAME}:*",
            "arn:aws:logs:REGION:ACCOUNT-ID:log-group:{YOUR_LOG_GROUP_NAME}"
        ]
    }
    ```
+ `sideCarResources`(opsional) — kunci konfigurasi untuk menetapkan batas sumber daya pada wadah sidecar Fluentbit yang diluncurkan.
  + `memoryLimit`(opsional) - nilai default adalah 512Mi. Sesuaikan sesuai dengan kebutuhan Anda.
  + `cpuLimit`(opsional) — opsi ini tidak memiliki default. Sesuaikan sesuai dengan kebutuhan Anda.
+ `containerLogRotationConfiguration`(opsional) — mengontrol perilaku rotasi log kontainer. Agen tidak diaktifkan secara default.
  + `rotationSize`(wajib) - menentukan ukuran file untuk rotasi log. Kisaran nilai yang mungkin adalah dari 2KB hingga 2GB. Bagian unit numerik dari parameter RotationSize dilewatkan sebagai bilangan bulat. Karena nilai desimal tidak didukung, Anda dapat menentukan ukuran rotasi 1,5GB, misalnya, dengan nilai 1500MB. Defaultnya adalah 2GB.
  + `maxFilesToKeep`(wajib) — menentukan jumlah maksimum file untuk disimpan dalam wadah setelah rotasi telah terjadi. Nilai minimum adalah 1, dan nilai maksimum adalah 50. Default-nya adalah 10.

## Log operator Flink
<a name="jobruns-flink-monitoring-configuration-operator-logs"></a>

Kami juga dapat mengaktifkan pengarsipan log untuk operator dengan menggunakan opsi berikut dalam `values.yaml` file di instalasi bagan helm Anda. Anda dapat mengaktifkan S3, CloudWatch, atau keduanya.

```
monitoringConfiguration: 
  s3MonitoringConfiguration:
    logUri: "S3-BUCKET"
    totalFileSize: "1G"
    uploadTimeout: "1m"
  cloudWatchMonitoringConfiguration:
    logGroupName: "flink-log-group"
    logStreamNamePrefix: "example-job-prefix-test-2"
  sideCarResources:
    limits:
      cpuLimit: 1
      memoryLimit: 800Mi
  memoryBufferLimit: 700M
```

Berikut ini adalah opsi konfigurasi yang tersedia di bawah`monitoringConfiguration`.
+ `s3MonitoringConfiguration`— atur opsi ini untuk mengarsipkan ke S3.
+ `logUri`(wajib) - Jalur bucket S3 tempat Anda ingin menyimpan log Anda.
+ Berikut ini adalah format seperti apa jalur bucket S3 setelah log diunggah.
  + Tidak ada rotasi log yang diaktifkan.

    ```
    s3://${logUri}/${POD NAME}/OPERATOR or WEBHOOK/STDOUT or STDERR.gz
    ```
  + Rotasi log diaktifkan. Anda dapat menggunakan file yang diputar dan file saat ini (satu tanpa cap tanggal).

    ```
    s3://${logUri}/${POD NAME}/OPERATOR or WEBHOOK/STDOUT or STDERR.gz
    ```

    Indeks format berikut adalah angka yang bertambah.

    ```
    s3://${logUri}/${POD NAME}/OPERATOR or WEBHOOK/stdout_YYYYMMDD_index.gz
    ```
+ `cloudWatchMonitoringConfiguration`— kunci konfigurasi untuk mengatur penerusan ke. CloudWatch
  + `logGroupName`(wajib) — nama grup CloudWatch log yang ingin Anda kirimi log. Grup secara otomatis akan dibuat jika tidak ada.
  + `logStreamNamePrefix`(opsional) — nama aliran log yang ingin Anda kirim log ke. Nilai default adalah string kosong. Formatnya CloudWatch adalah sebagai berikut:

    ```
    ${logStreamNamePrefix}/${POD NAME}/STDOUT or STDERR
    ```
+ `sideCarResources`(opsional) — kunci konfigurasi untuk menetapkan batas sumber daya pada wadah sidecar Fluentbit yang diluncurkan.
  + `memoryLimit`(opsional) — batas memori. Sesuaikan sesuai dengan kebutuhan Anda. Defaultnya adalah 512Mi.
  + `cpuLimit`— batas CPU. Sesuaikan sesuai dengan kebutuhan Anda. Tidak ada nilai default.
+ `containerLogRotationConfiguration`(opsional): — mengontrol perilaku rotasi log kontainer. Agen tidak diaktifkan secara default.
  + `rotationSize`(wajib) - menentukan ukuran file untuk rotasi log. Kisaran nilai yang mungkin adalah dari 2KB hingga 2GB. Bagian unit numerik dari parameter RotationSize dilewatkan sebagai bilangan bulat. Karena nilai desimal tidak didukung, Anda dapat menentukan ukuran rotasi 1,5GB, misalnya, dengan nilai 1500MB. Defaultnya adalah 2GB.
  + `maxFilesToKeep`(wajib) — menentukan jumlah maksimum file untuk disimpan dalam wadah setelah rotasi telah terjadi. Nilai minimum adalah 1, dan nilai maksimum adalah 50. Default-nya adalah 10.

# Bagaimana Flink mendukung ketersediaan tinggi dan ketahanan kerja
<a name="jobruns-flink-resiliency"></a>

Bagian berikut menguraikan bagaimana Flink membuat pekerjaan lebih andal dan sangat tersedia. Ia melakukan ini melalui kemampuan bawaan seperti ketersediaan tinggi Flink dan berbagai kemampuan pemulihan jika terjadi kegagalan.

**Topics**
+ [Menggunakan ketersediaan tinggi (HA) untuk Operator Flink dan Aplikasi Flink](jobruns-flink-using-ha.md)
+ [Mengoptimalkan waktu restart pekerjaan Flink untuk pemulihan tugas dan operasi penskalaan dengan Amazon EMR di EKS](jobruns-flink-restart.md)
+ [Penonaktifan Instans Spot yang anggun dengan Flink di Amazon EMR di EKS](jobruns-flink-decommission.md)

# Menggunakan ketersediaan tinggi (HA) untuk Operator Flink dan Aplikasi Flink
<a name="jobruns-flink-using-ha"></a>

Topik ini menunjukkan cara mengonfigurasi ketersediaan tinggi dan menjelaskan cara kerjanya untuk beberapa kasus penggunaan yang berbeda. Ini termasuk ketika Anda menggunakan Job manager dan ketika Anda menggunakan kubernetes asli Flink.

## Operator Flink ketersediaan tinggi
<a name="jobruns-flink-ha-operator"></a>

Kami mengaktifkan *ketersediaan tinggi* untuk Operator Flink sehingga kami dapat gagal ke Operator Flink siaga untuk meminimalkan waktu henti di loop kontrol operator jika terjadi kegagalan. Ketersediaan tinggi diaktifkan secara default dan jumlah default replika operator awal adalah 2. Anda dapat mengonfigurasi bidang replika di `values.yaml` file Anda untuk bagan helm.

Bidang berikut dapat disesuaikan:
+ `replicas`(opsional, defaultnya adalah 2): Menyetel nomor ini menjadi lebih besar dari 1 membuat Operator siaga lainnya dan memungkinkan pemulihan pekerjaan Anda lebih cepat.
+ `highAvailabilityEnabled`(opsional, defaultnya benar): Mengontrol apakah Anda ingin mengaktifkan HA. Menentukan parameter ini sebagai true memungkinkan dukungan penyebaran multi AZ, serta menetapkan parameter yang benar`flink-conf.yaml`.

Anda dapat menonaktifkan HA untuk operator Anda dengan mengatur konfigurasi berikut di `values.yaml` file Anda.

```
...
imagePullSecrets: []

replicas: 1

# set this to false if you don't want HA
highAvailabilityEnabled: false
...
```

**Penyebaran multi AZ**

Kami membuat pod operator di beberapa Availability Zone. Ini adalah kendala lunak, dan pod operator Anda akan dijadwalkan di AZ yang sama jika Anda tidak memiliki cukup sumber daya di AZ yang berbeda.

**Menentukan replika pemimpin**

 Jika HA diaktifkan, replika menggunakan sewa untuk menentukan pemimpin mana dan menggunakan Sewa K8 untuk pemilihan pemimpin. JMs Anda dapat menjelaskan Sewa dan melihat bidang Identitas .Spec.Holder untuk menentukan pemimpin saat ini

```
kubectl describe lease <Helm Install Release Name>-<NAMESPACE>-lease -n <NAMESPACE> | grep "Holder Identity"
```

**Interaksi Flink-S3**

**Mengkonfigurasi kredenal akses**

Pastikan Anda telah mengonfigurasi IRSA dengan izin IAM yang sesuai untuk mengakses bucket S3.

**Mengambil stoples pekerjaan dari mode Aplikasi S3**

Operator Flink juga mendukung pengambilan stoples aplikasi dari S3. Anda hanya menyediakan lokasi S3 untuk JarUri dalam spesifikasi Anda FlinkDeployment .

Anda juga dapat menggunakan fitur ini untuk mengunduh artefak lain seperti PyFlink skrip. Skrip Python yang dihasilkan dijatuhkan di bawah jalur. `/opt/flink/usrlib/`

Contoh berikut menunjukkan bagaimana menggunakan fitur ini untuk PyFlink pekerjaan. Perhatikan bidang JarUri dan args.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: python-example
spec:
  image: <YOUR CUSTOM PYFLINK IMAGE>
  emrReleaseLabel: "emr-6.12.0-flink-latest"
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    highAvailabilityEnabled: false
    replicas: 1
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: "s3://<S3-BUCKET>/scripts/pyflink.py" # Note, this will trigger the artifact download process
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pyflink.py"]
    parallelism: 1
    upgradeMode: stateless
```

**Konektor Flink S3**

Flink dikemas dengan dua konektor S3 (tercantum di bawah). Bagian berikut membahas kapan harus menggunakan konektor mana.

**Checkpointing: Konektor Presto S3**
+ Setel skema S3 ke s3p://
+ Konektor yang disarankan untuk digunakan ke pos pemeriksaan ke s3. Untuk informasi selengkapnya, lihat [khusus S3 dalam dokumentasi](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#s3-specific) Apache Flink.

Contoh FlinkDeployment spesifikasi:

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.checkpoints.dir: s3p://<BUCKET-NAME>/flink-checkpoint/
```

**Membaca dan menulis ke S3: konektor Hadoop S3**
+ Atur skema S3 ke `s3://` atau () `s3a://`
+ Konektor yang direkomendasikan untuk membaca dan menulis file dari S3 (hanya konektor S3 yang mengimplementasikan antarmuka [Flinks](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/) Filesystem).
+ Secara default, kami mengatur `fs.s3a.aws.credentials.provider` dalam `flink-conf.yaml` file, yaitu`com.amazonaws.auth.WebIdentityTokenCredentialsProvider`. Jika Anda mengganti d efault `flink-conf` sepenuhnya dan Anda berinteraksi dengan S3, pastikan untuk menggunakan penyedia ini.

Contoh FlinkDeployment spesifikasi

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  job:
    jarURI: local:///opt/flink/examples/streaming/WordCount.jar
    args: [ "--input", "s3a://<INPUT BUCKET>/PATH", "--output", "s3a://<OUTPUT BUCKET>/PATH" ]
    parallelism: 2
    upgradeMode: stateless
```

## Manajer Pekerjaan Flink
<a name="jobruns-flink-ha-manager"></a>

Ketersediaan Tinggi (HA) untuk Penerapan Flink memungkinkan pekerjaan terus membuat kemajuan bahkan jika kesalahan sementara ditemukan dan crash Anda. JobManager Pekerjaan akan dimulai ulang tetapi dari pos pemeriksaan terakhir yang berhasil dengan HA diaktifkan. Tanpa HA diaktifkan, Kubernetes akan memulai ulang pekerjaan Anda JobManager, tetapi pekerjaan Anda akan dimulai sebagai pekerjaan baru dan akan kehilangan kemajuannya. Setelah mengonfigurasi HA, kami dapat memberi tahu Kubernetes untuk menyimpan metadata HA dalam penyimpanan persisten untuk referensi jika terjadi kegagalan sementara di JobManager dan kemudian melanjutkan pekerjaan kami dari pos pemeriksaan terakhir yang berhasil.

HA diaktifkan secara default untuk pekerjaan Flink Anda (jumlah replika disetel ke 2, yang mengharuskan Anda menyediakan lokasi penyimpanan S3 agar metadata HA tetap ada).

**Konfigurasi HA**

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  executionRoleArn: "<JOB EXECUTION ROLE ARN>"
  emrReleaseLabel: "emr-6.13.0-flink-latest"
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    replicas: 2
    highAvailabilityEnabled: true
    storageDir: "s3://<S3 PERSISTENT STORAGE DIR>"
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
```

Berikut ini adalah deskripsi untuk konfigurasi HA di atas di Job Manager (didefinisikan di bawah.spec.jobManager):
+ `highAvailabilityEnabled`(opsional, default adalah true): Setel ini ke `false ` jika Anda tidak ingin HA diaktifkan dan tidak ingin menggunakan konfigurasi HA yang disediakan. Anda masih dapat memanipulasi bidang “replika” untuk mengonfigurasi HA secara manual.
+ `replicas`(opsional, defaultnya adalah 2): Menyetel nomor ini menjadi lebih besar dari 1 membuat siaga lainnya JobManagers dan memungkinkan pemulihan pekerjaan Anda lebih cepat. Jika Anda menonaktifkan HA, Anda harus mengatur jumlah replika ke 1, atau Anda akan terus mendapatkan kesalahan validasi (hanya 1 replika yang didukung jika HA tidak diaktifkan).
+ `storageDir`(required): Karena kami menggunakan jumlah replika sebagai 2 secara default, kami harus menyediakan StorageDir persisten. Saat ini bidang ini hanya menerima jalur S3 sebagai lokasi penyimpanan.

**Lokalitas pod**

 Jika Anda mengaktifkan HA, kami juga mencoba mengkolokasi pod di AZ yang sama, yang mengarah pada peningkatan kinerja (mengurangi latensi jaringan dengan memiliki pod yang sama AZs). Ini adalah proses upaya terbaik, artinya jika Anda tidak memiliki cukup sumber daya di AZ di mana sebagian besar Pod Anda dijadwalkan, Pod yang tersisa masih akan dijadwalkan tetapi mungkin berakhir pada node di luar AZ ini.

**Menentukan replika pemimpin**

Jika HA diaktifkan, replika menggunakan sewa untuk menentukan pemimpin mana dan menggunakan Configmap K8s sebagai datastore untuk menyimpan metadata ini. JMs Jika Anda ingin menentukan pemimpin, Anda dapat melihat konten Configmap dan melihat kunci di `org.apache.flink.k8s.leader.restserver` bawah data untuk menemukan pod K8s dengan alamat IP. Anda juga dapat menggunakan perintah bash berikut.

```
ip=$(kubectl get configmap -n <NAMESPACE> <JOB-NAME>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')
kubectl get pods -n NAMESPACE  -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

## Pekerjaan Flink - Kubernetes asli
<a name="jobruns-flink-ha-kubernetes"></a>

Amazon EMR 6.13.0 dan yang lebih tinggi mendukung Kubernetes asli Flink untuk menjalankan aplikasi Flink dalam mode ketersediaan tinggi pada cluster Amazon EKS. 

**catatan**  
Anda harus memiliki bucket Amazon S3 yang dibuat untuk menyimpan metadata ketersediaan tinggi saat mengirimkan pekerjaan Flink Anda. Jika Anda tidak ingin menggunakan fitur ini, Anda dapat menonaktifkannya. Ini diaktifkan secara default.

Untuk mengaktifkan fitur ketersediaan tinggi Flink, berikan parameter Flink berikut saat Anda menjalankan perintah [CLI `run-application`](jobruns-flink-native-kubernetes-getting-started.md#jobruns-flink-native-kubernetes-getting-started-run-application). Parameter didefinisikan di bawah contoh.

```
-Dhigh-availability.type=kubernetes \
-Dhigh-availability.storageDir=S3://DOC-EXAMPLE-STORAGE-BUCKET \
-Dfs.s3a.aws.credentials.provider="com.amazonaws.auth.WebIdentityTokenCredentialsProvider" \
-Dkubernetes.jobmanager.replicas=3 \
-Dkubernetes.cluster-id=example-cluster
```
+ **`Dhigh-availability.storageDir`**- Bucket Amazon S3 tempat Anda ingin menyimpan metadata ketersediaan tinggi untuk pekerjaan Anda.

  **`Dkubernetes.jobmanager.replicas`**— Jumlah pod Job Manager yang akan dibuat sebagai bilangan bulat lebih besar dari`1`.

  **`Dkubernetes.cluster-id`**— ID unik yang mengidentifikasi cluster Flink.

# Mengoptimalkan waktu restart pekerjaan Flink untuk pemulihan tugas dan operasi penskalaan dengan Amazon EMR di EKS
<a name="jobruns-flink-restart"></a>

Ketika tugas gagal atau ketika operasi penskalaan terjadi, Flink mencoba untuk mengeksekusi kembali tugas dari pos pemeriksaan terakhir selesai. Proses restart bisa memakan waktu satu menit atau lebih lama untuk dijalankan, tergantung pada ukuran status pos pemeriksaan dan jumlah tugas paralel. Selama periode restart, tugas backlog dapat menumpuk untuk pekerjaan itu. Ada beberapa cara, bahwa Flink mengoptimalkan kecepatan pemulihan dan memulai ulang grafik eksekusi untuk meningkatkan stabilitas pekerjaan.

Halaman ini menjelaskan beberapa cara Amazon EMR Flink dapat meningkatkan waktu restart pekerjaan selama pemulihan tugas atau operasi penskalaan pada instance spot. Instans spot adalah kapasitas komputasi yang tidak terpakai yang tersedia dengan diskon. Ini memiliki perilaku unik, termasuk gangguan sesekali, jadi penting untuk memahami bagaimana Amazon EMR di EKS menangani ini, termasuk bagaimana Amazon EMR di EKS melakukan penonaktifan dan memulai kembali pekerjaan.

**Topics**
+ [Pemulihan tugas-lokal](#flink-restart-task-local)
+ [Pemulihan tugas-lokal dengan pemasangan volume Amazon EBS](#flink-restart-task-local-ebs)
+ [Pos pemeriksaan inkremental berbasis log generik](#flink-restart-log-check)
+ [Pemulihan berbutir halus](#flink-restart-fine-grained)
+ [Mekanisme restart gabungan dalam penjadwal adaptif](#flink-restart-combined)

## Pemulihan tugas-lokal
<a name="flink-restart-task-local"></a>

**catatan**  
Pemulihan tugas-lokal didukung dengan Flink di Amazon EMR di EKS 6.14.0 dan lebih tinggi.

Dengan pos pemeriksaan Flink, setiap tugas menghasilkan snapshot statusnya yang ditulis Flink ke penyimpanan terdistribusi seperti Amazon S3. Dalam kasus pemulihan, tugas mengembalikan keadaan mereka dari penyimpanan terdistribusi. Penyimpanan terdistribusi memberikan toleransi kesalahan dan dapat mendistribusikan kembali status selama penskalaan ulang karena dapat diakses oleh semua node.

Namun, toko terdistribusi jarak jauh juga memiliki kelemahan: semua tugas harus membaca statusnya dari lokasi terpencil melalui jaringan. Hal ini dapat mengakibatkan waktu pemulihan yang lama untuk negara bagian besar selama pemulihan tugas atau operasi penskalaan.

Masalah waktu pemulihan yang lama ini diselesaikan dengan pemulihan *tugas-lokal*. Tugas menulis status mereka di pos pemeriksaan ke dalam penyimpanan sekunder yang bersifat lokal untuk tugas, seperti pada disk lokal. Mereka juga menyimpan status mereka di penyimpanan utama, atau Amazon S3 dalam kasus kami. Selama pemulihan, penjadwal menjadwalkan tugas pada Task Manager yang sama di mana tugas berjalan lebih awal sehingga mereka dapat pulih dari penyimpanan status lokal alih-alih membaca dari penyimpanan status jarak jauh. Untuk informasi selengkapnya, lihat [Task-Local Recovery di Dokumentasi](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/large_state_tuning/#task-local-recovery) *Apache Flink*.

Tes benchmark kami dengan pekerjaan sampel telah menunjukkan bahwa waktu pemulihan telah dikurangi dari menit menjadi beberapa detik dengan pemulihan tugas-lokal diaktifkan.

Untuk mengaktifkan pemulihan tugas-lokal, atur konfigurasi berikut di file Anda. `flink-conf.yaml` Tentukan nilai interval checkpointing dalam milidetik.

```
    state.backend.local-recovery: true
    state.backend: hasmap or rocksdb
    state.checkpoints.dir: s3://STORAGE-BUCKET-PATH/checkpoint
    execution.checkpointing.interval: 15000
```

## Pemulihan tugas-lokal dengan pemasangan volume Amazon EBS
<a name="flink-restart-task-local-ebs"></a>

**catatan**  
Pemulihan tugas-lokal oleh Amazon EBS didukung dengan Flink di Amazon EMR di EKS 6.15.0 dan lebih tinggi.

Dengan Flink di Amazon EMR di EKS, Anda dapat secara otomatis menyediakan volume Amazon EBS ke pod untuk pemulihan TaskManager tugas lokal. Mount overlay default dilengkapi dengan volume 10 GB, yang cukup untuk pekerjaan dengan status lebih rendah. Pekerjaan dengan status besar dapat mengaktifkan opsi *pemasangan volume EBS otomatis*. TaskManagerPod secara otomatis dibuat dan dipasang selama pembuatan pod dan dihapus selama penghapusan pod.

Gunakan langkah-langkah berikut untuk mengaktifkan pemasangan volume EBS otomatis untuk Flink di Amazon EMR di EKS:

1. Ekspor nilai untuk variabel berikut yang akan Anda gunakan dalam langkah mendatang.

   ```
   export AWS_REGION=aa-example-1 
   export FLINK_EKS_CLUSTER_NAME=my-cluster
   export AWS_ACCOUNT_ID=111122223333
   ```

1. Buat atau perbarui file `kubeconfig` YAMM untuk klaster Anda.

   ```
   aws eks update-kubeconfig --name $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
   ```

1. Buat akun layanan IAM untuk driver Amazon EBS Container Storage Interface (CSI) di cluster Amazon EKS Anda. 

   ```
   eksctl create iamserviceaccount \
      --name ebs-csi-controller-sa \
      --namespace kube-system \
      --region $AWS_REGION \
      --cluster $FLINK_EKS_CLUSTER_NAME\
      --role-name TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} \
      --role-only \
      --attach-policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy \
      --approve
   ```

1. Buat driver Amazon EBS CSI dengan perintah berikut:

   ```
   eksctl create addon \
      --name aws-ebs-csi-driver \
      --region $AWS_REGION \
      --cluster $FLINK_EKS_CLUSTER_NAME \
      --service-account-role-arn arn:aws:iam::${AWS_ACCOUNT_ID}:role/TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
   ```

1. Buat kelas penyimpanan Amazon EBS dengan perintah berikut:

   ```
   cat ≪ EOF ≫ storage-class.yaml
   apiVersion: storage.k8s.io/v1
   kind: StorageClass
   metadata:
     name: ebs-sc
   provisioner: ebs.csi.aws.com
   volumeBindingMode: WaitForFirstConsumer
   EOF
   ```

   Dan kemudian terapkan kelas:

   ```
   kubectl apply -f storage-class.yaml
   ```

1. Helm instal operator Amazon EMR Flink Kubernetes dengan opsi untuk membuat akun layanan. Ini menciptakan `emr-containers-sa-flink` untuk digunakan dalam penerapan Flink.

   ```
   helm install flink-kubernetes-operator flink-kubernetes-operator/ \
      --set jobServiceAccount.create=true \
      --set rbac.jobRole.create=true \
      --set rbac.jobRoleBinding.create=true
   ```

1. Untuk mengirimkan pekerjaan Flink dan mengaktifkan penyediaan otomatis volume EBS untuk pemulihan tugas-lokal, atur konfigurasi berikut di file Anda. `flink-conf.yaml` Sesuaikan batas ukuran untuk ukuran status pekerjaan. Atur `serviceAccount` ke `emr-containers-sa-flink`. Tentukan nilai interval checkpointing dalam milidetik. Dan hilangkan. `executionRoleArn`

   ```
   flinkConfiguration:
       task.local-recovery.ebs.enable: true
       kubernetes.taskmanager.local-recovery.persistentVolumeClaim.sizeLimit: 10Gi
       state.checkpoints.dir: s3://BUCKET-PATH/checkpoint
       state.backend.local-recovery: true
       state.backend: hasmap or rocksdb
       state.backend.incremental: "true"
       execution.checkpointing.interval: 15000
     serviceAccount: emr-containers-sa-flink
   ```

Saat Anda siap untuk menghapus plugin driver Amazon EBS CSI, gunakan perintah berikut:

```
  # Detach Attached Policy
  aws iam detach-role-policy --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} --policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy
  # Delete the created Role
  aws iam delete-role --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
  # Delete the created service account
  eksctl delete iamserviceaccount --name ebs-csi-controller-sa --namespace kube-system --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  # Delete Addon
  eksctl delete addon --name aws-ebs-csi-driver --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  # Delete the EBS storage class
  kubectl delete -f storage-class.yaml
```

## Pos pemeriksaan inkremental berbasis log generik
<a name="flink-restart-log-check"></a>

**catatan**  
Pemeriksaan inkremental berbasis log generik didukung dengan Flink di Amazon EMR di EKS 6.14.0 dan yang lebih tinggi.

Checkpointing inkremental berbasis log generik ditambahkan di Flink 1.16 untuk meningkatkan kecepatan pos pemeriksaan. Interval pos pemeriksaan yang lebih cepat sering mengakibatkan pengurangan pekerjaan pemulihan karena lebih sedikit peristiwa yang perlu diproses ulang setelah pemulihan. *Untuk informasi selengkapnya, lihat [Meningkatkan kecepatan dan stabilitas pos pemeriksaan dengan pos pemeriksaan inkremental berbasis log generik di Blog Apache Flink](https://flink.apache.org/2022/05/30/improving-speed-and-stability-of-checkpointing-with-generic-log-based-incremental-checkpoints/).*

Dengan pekerjaan sampel, tes benchmark kami telah menunjukkan bahwa waktu pos pemeriksaan berkurang dari menit menjadi beberapa detik dengan pos pemeriksaan inkremental berbasis log generik.

Untuk mengaktifkan pos pemeriksaan inkremental berbasis log generik, atur konfigurasi berikut di file Anda. `flink-conf.yaml` Tentukan nilai interval checkpointing dalam milidetik.

```
    state.backend.changelog.enabled: true 
    state.backend.changelog.storage: filesystem
    dstl.dfs.base-path: s3://bucket-path/changelog
    state.backend.local-recovery: true
    state.backend: rocksdb
    state.checkpoints.dir: s3://bucket-path/checkpoint
    execution.checkpointing.interval: 15000
```

## Pemulihan berbutir halus
<a name="flink-restart-fine-grained"></a>

**catatan**  
Dukungan pemulihan berbutir halus untuk penjadwal default didukung dengan Flink di Amazon EMR di EKS 6.14.0 dan yang lebih tinggi. Dukungan pemulihan berbutir halus dalam penjadwal adaptif tersedia dengan Flink di Amazon EMR di EKS 6.15.0 dan lebih tinggi.

Ketika tugas gagal selama eksekusi, Flink mengatur ulang seluruh grafik eksekusi dan memicu eksekusi ulang lengkap dari pos pemeriksaan terakhir yang diselesaikan. Ini lebih mahal daripada hanya menjalankan kembali tugas yang gagal. Pemulihan berbutir halus hanya memulai kembali komponen yang terhubung dengan pipa dari tugas yang gagal. Dalam contoh berikut, grafik pekerjaan memiliki 5 simpul (`A`ke`E`). Semua koneksi antara simpul disalurkan dengan distribusi pointwise, dan `parallelism.default` untuk pekerjaan diatur ke. `2` 

```
A → B → C → D → E
```

Untuk contoh ini, ada total 10 tugas yang berjalan. Pipeline pertama (`a1`to`e1`) berjalan pada a TaskManager (`TM1`), dan pipeline kedua (`a2`to`e2`) berjalan pada yang lain TaskManager (`TM2`).

```
a1 → b1 → c1 → d1 → e1
a2 → b2 → c2 → d2 → e2
```

Ada dua komponen yang terhubung dengan pipa:`a1 → e1`, dan`a2 → e2`. Jika salah satu `TM1` atau `TM2` gagal, kegagalan hanya berdampak pada 5 tugas dalam pipeline tempat TaskManager sedang berjalan. Strategi restart hanya memulai komponen pipelined yang terpengaruh. 

Pemulihan berbutir halus hanya berfungsi dengan pekerjaan Flink paralel sempurna. Ini tidak didukung dengan `keyBy()` atau `redistribute()` operasi. Untuk informasi selengkapnya, lihat [FLIP-1: Pemulihan Berbutir Halus dari Kegagalan Tugas](https://cwiki.apache.org/confluence/display/FLINK/FLIP-1%3A+Fine+Grained+Recovery+from+Task+Failures) dalam proyek Jira Proposal *Peningkatan Flink*.

Untuk mengaktifkan pemulihan berbutir halus, atur konfigurasi berikut di file Anda. `flink-conf.yaml`

```
jobmanager.execution.failover-strategy: region 
restart-strategy: exponential-delay or fixed-delay
```

## Mekanisme restart gabungan dalam penjadwal adaptif
<a name="flink-restart-combined"></a>

**catatan**  
Mekanisme restart gabungan dalam penjadwal adaptif didukung dengan Flink di Amazon EMR di EKS 6.15.0 dan lebih tinggi.

Penjadwal adaptif dapat menyesuaikan paralelisme pekerjaan berdasarkan slot yang tersedia. Ini secara otomatis mengurangi paralelisme jika tidak cukup slot yang tersedia agar sesuai dengan paralelisme pekerjaan yang dikonfigurasi. Jika slot baru tersedia, pekerjaan ditingkatkan lagi ke paralelisme pekerjaan yang dikonfigurasi. Penjadwal adaptif menghindari waktu henti di tempat kerja ketika tidak ada cukup sumber daya yang tersedia. Ini adalah penjadwal yang didukung untuk Flink Autoscaler. Kami merekomendasikan penjadwal adaptif dengan Amazon EMR Flink karena alasan ini. Namun, penjadwal adaptif mungkin melakukan beberapa restart dalam waktu singkat, satu restart untuk setiap sumber daya baru yang ditambahkan. Hal ini dapat menyebabkan penurunan kinerja dalam pekerjaan.

Dengan Amazon EMR 6.15.0 dan yang lebih tinggi, Flink memiliki mekanisme restart gabungan dalam penjadwal adaptif yang membuka jendela restart ketika sumber daya pertama ditambahkan, dan kemudian menunggu hingga interval jendela yang dikonfigurasi dari default 1 menit. Ini melakukan restart tunggal ketika ada sumber daya yang cukup tersedia untuk menjalankan pekerjaan dengan paralelisme yang dikonfigurasi atau ketika interval waktu habis.

Dengan contoh pekerjaan, pengujian benchmark kami menunjukkan bahwa fitur ini memproses 10% rekaman lebih banyak daripada perilaku default saat Anda menggunakan adaptive scheduler dan Flink autoscaler.

Untuk mengaktifkan mekanisme restart gabungan, atur konfigurasi berikut di `flink-conf.yaml` file Anda.

```
jobmanager.adaptive-scheduler.combined-restart.enabled: true 
jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m
```

# Penonaktifan Instans Spot yang anggun dengan Flink di Amazon EMR di EKS
<a name="jobruns-flink-decommission"></a>

Flink dengan Amazon EMR di EKS dapat meningkatkan waktu restart pekerjaan selama pemulihan tugas atau operasi penskalaan.

## Ikhtisar
<a name="jobruns-flink-decommission-overview"></a>

Amazon EMR di EKS merilis 6.15.0 dan yang lebih tinggi mendukung penonaktifan Manajer Tugas di Instans Spot di Amazon EMR di EKS dengan Apache Flink. Sebagai bagian dari fitur ini, Amazon EMR di EKS dengan Flink menyediakan kemampuan berikut:
+ **Just-in-time checkpointing** — Pekerjaan streaming Flink dapat merespons interupsi Instans Spot, melakukan pos pemeriksaan just-in-time (JIT) dari pekerjaan yang sedang berjalan, dan mencegah penjadwalan tugas tambahan pada Instans Spot ini. Pos pemeriksaan JIT didukung dengan penjadwal default dan adaptif.
+ **Mekanisme restart gabungan** — Mekanisme restart gabungan melakukan upaya terbaik untuk memulai kembali pekerjaan setelah mencapai paralelisme sumber daya target atau akhir jendela yang dikonfigurasi saat ini. Ini juga mencegah restart pekerjaan berturut-turut yang mungkin disebabkan oleh beberapa penghentian Instans Spot. Mekanisme restart gabungan hanya tersedia dengan penjadwal adaptif.

Kemampuan ini memberikan manfaat sebagai berikut:
+ Anda dapat memanfaatkan Instans Spot untuk menjalankan Manajer Tugas dan mengurangi pengeluaran klaster.
+ Peningkatan keaktifan untuk Spot Instance Task Manager menghasilkan ketahanan yang lebih tinggi dan penjadwalan pekerjaan yang lebih efisien.
+ Pekerjaan Flink Anda akan memiliki lebih banyak uptime karena akan ada lebih sedikit restart dari penghentian Instans Spot.

## Cara kerja dekomisioning yang anggun
<a name="jobruns-flink-decommission-howitworks"></a>

Pertimbangkan contoh berikut: Anda menyediakan EMR Amazon di klaster EKS yang menjalankan Apache Flink, dan Anda menentukan node Sesuai Permintaan untuk Job Manager, dan node Instans Spot untuk Task Manager. Dua menit sebelum penghentian, Task Manager menerima pemberitahuan interupsi.

Dalam skenario ini, Job Manager akan menangani sinyal interupsi Instans Spot, memblokir penjadwalan tugas tambahan pada Instans Spot, dan memulai pemeriksaan JIT untuk pekerjaan streaming.

Kemudian, Job Manager akan memulai ulang grafik pekerjaan hanya setelah ada ketersediaan sumber daya baru yang cukup untuk memenuhi paralelisme pekerjaan saat ini di jendela interval restart saat ini. Interval jendela restart ditentukan berdasarkan durasi penggantian Instans Spot, pembuatan pod Task Manager baru, dan pendaftaran dengan Job Manager.

## Prasyarat
<a name="jobruns-flink-decommission-prereqs"></a>

Untuk menggunakan dekomisioning yang anggun, buat dan jalankan pekerjaan streaming di Amazon EMR di kluster EKS yang menjalankan Apache Flink. Aktifkan Penjadwal Adaptif dan Manajer Tugas yang dijadwalkan pada setidaknya satu Instance Spot, seperti yang ditunjukkan pada contoh berikut. Anda harus menggunakan node On-Demand untuk Job Manager, dan Anda dapat menggunakan node On-Demand untuk Task Manager selama setidaknya ada satu Instance Spot juga.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: deployment_name
spec:
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    cluster.taskmanager.graceful-decommission.enabled: "true"
    execution.checkpointing.interval: "240s"
    jobmanager.adaptive-scheduler.combined-restart.enabled: "true"
    jobmanager.adaptive-scheduler.combined-restart.window-interval : "1m"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    nodeSelector:
      'eks.amazonaws.com/capacityType': 'ON_DEMAND'
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
    nodeSelector:
      'eks.amazonaws.com/capacityType': 'SPOT'
  job:
    jarURI: flink_job_jar_path
```

## Konfigurasi
<a name="jobruns-flink-decommission-config"></a>

Bagian ini mencakup sebagian besar konfigurasi yang dapat Anda tentukan untuk kebutuhan penonaktifan Anda. 


| Key | Deskripsi | Nilai default | Nilai yang dapat diterima | 
| --- | --- | --- | --- | 
|  cluster.taskmanager.graceful-decommission.enabled  |  Aktifkan penonaktifan Task Manager yang anggun.  |  true  |  true, false  | 
|  jobmanager.adaptive-scheduler.combined-restart.enabled  |  Aktifkan mekanisme restart gabungan di Adaptive Scheduler.  |  false  |  true, false  | 
|  jobmanager.adaptive-scheduler.combined-restart.window-interval  |  Interval jendela restart gabungan untuk melakukan restart gabungan untuk pekerjaan tersebut. Sebuah integer tanpa unit ditafsirkan sebagai milidetik.  |  1m  |  Contoh:30,60s,3m, 1h  | 

# Menggunakan Autoscaler untuk aplikasi Flink
<a name="jobruns-flink-autoscaler"></a>

Autoscaler operator dapat membantu meringankan tekanan balik dengan mengumpulkan metrik dari pekerjaan Flink dan secara otomatis menyesuaikan paralelisme pada tingkat titik pekerjaan. Berikut ini adalah contoh seperti apa konfigurasi Anda:

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  ...
spec:
  ...
  flinkVersion: v1_18
  flinkConfiguration:
    job.autoscaler.enabled: "true"
    job.autoscaler.stabilization.interval: 1m
    job.autoscaler.metrics.window: 5m
    job.autoscaler.target.utilization: "0.6"
    job.autoscaler.target.utilization.boundary: "0.2"
    job.autoscaler.restart.time: 2m
    job.autoscaler.catch-up.duration: 5m
    pipeline.max-parallelism: "720"
  ...
```

Konfigurasi ini menggunakan nilai default untuk rilis terbaru Amazon EMR. Jika Anda menggunakan versi lain, Anda mungkin memiliki nilai yang berbeda.

**catatan**  
Mulai Amazon EMR 7.2.0, Anda tidak perlu menyertakan awalan `kubernetes.operator` dalam konfigurasi Anda. Jika Anda menggunakan 7.1.0 atau lebih rendah, Anda harus menggunakan awalan sebelum setiap konfigurasi. Misalnya, Anda harus menentukan`kubernetes.operator.job.autoscaler.scaling.enabled`.

Berikut ini adalah opsi konfigurasi untuk autoscaler.
+ `job.autoscaler.scaling.enabled`— menentukan apakah akan mengaktifkan eksekusi penskalaan vertex oleh autoscaler. Nilai default-nya `true`. Jika Anda menonaktifkan konfigurasi ini, penskalaan otomatis hanya mengumpulkan metrik dan mengevaluasi paralelisme yang disarankan untuk setiap simpul tetapi tidak meningkatkan pekerjaan.
+ `job.autoscaler.stabilization.interval`— periode stabilisasi di mana tidak ada penskalaan baru yang akan dieksekusi. Default adalah 5 menit.
+ `job.autoscaler.metrics.window`— ukuran jendela agregasi metrik penskalaan. Semakin besar jendela, semakin halus dan stabil, tetapi autoscaler mungkin lebih lambat untuk bereaksi terhadap perubahan beban mendadak. Default adalah 15 menit. Kami menyarankan Anda bereksperimen dengan menggunakan nilai antara 3 hingga 60 menit.
+ `job.autoscaler.target.utilization`— pemanfaatan simpul target untuk memberikan kinerja pekerjaan yang stabil dan beberapa buffer untuk fluktuasi beban. Defaultnya `0.7` menargetkan 70% utilization/load untuk verteks pekerjaan.
+ `job.autoscaler.target.utilization.boundary`— batas pemanfaatan simpul target yang berfungsi sebagai buffer ekstra untuk menghindari penskalaan langsung pada fluktuasi beban. Defaultnya adalah`0.3`, yang berarti 30% deviasi dari pemanfaatan target diperbolehkan sebelum memicu tindakan penskalaan.
+ `ob.autoscaler.restart.time`— waktu yang diharapkan untuk me-restart aplikasi. Default adalah 5 menit.
+ `job.autoscaler.catch-up.duration`— waktu yang diharapkan untuk catch up, yang berarti sepenuhnya memproses setiap backlog setelah operasi penskalaan selesai. Default adalah 5 menit. Dengan menurunkan durasi catch-up, autoscaler harus memesan lebih banyak kapasitas ekstra untuk tindakan penskalaan.
+ `pipeline.max-parallelism`— paralelisme maksimum yang dapat digunakan autoscaler. Autoscaler mengabaikan batas ini jika lebih tinggi dari paralelisme maks yang dikonfigurasi dalam konfigurasi Flink atau langsung pada setiap operator. Defaultnya adalah -1. Perhatikan bahwa autoscaler menghitung paralelisme sebagai pembagi bilangan paralelisme maks oleh karena itu disarankan untuk memilih pengaturan paralelisme maks yang memiliki banyak pembagi daripada mengandalkan default yang disediakan Flink. Kami merekomendasikan penggunaan kelipatan 60 untuk konfigurasi ini, seperti 120, 180, 240, 360, 720 dll.

Untuk halaman referensi konfigurasi yang lebih detail, lihat Konfigurasi [Autoscaler](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/#autoscaler-configuration).

# Autotuning parameter Autoscaler
<a name="jobruns-flink-autoscaler-parameter-autotuning"></a>

Bagian ini menjelaskan perilaku penyetelan otomatis untuk berbagai versi EMR Amazon. Ini juga masuk ke detail mengenai konfigurasi auto-scaling yang berbeda.

**catatan**  
Amazon EMR 7.2.0 dan yang lebih tinggi menggunakan konfigurasi open source `job.autoscaler.restart.time-tracking.enabled` untuk mengaktifkan estimasi waktu penskalaan **ulang**. Estimasi waktu skala ulang memiliki fungsionalitas yang sama dengan autotuning Amazon EMR, jadi Anda tidak perlu menetapkan nilai empiris secara manual ke waktu restart.  
Anda masih dapat menggunakan autotuning Amazon EMR jika Anda menggunakan Amazon EMR 7.1.0 atau lebih rendah.

------
#### [ 7.2.0 and higher ]

Amazon EMR 7.2.0 dan yang lebih tinggi mengukur waktu restart aktual yang diperlukan untuk menerapkan keputusan penskalaan otomatis. Dalam rilis 7.1.0 dan yang lebih rendah, Anda harus menggunakan konfigurasi `job.autoscaler.restart.time` untuk mengonfigurasi perkiraan waktu restart maksimum secara manual. Dengan menggunakan konfigurasi`job.autoscaler.restart.time-tracking.enabled`, Anda hanya perlu memasukkan waktu restart untuk penskalaan pertama. Setelah itu, operator mencatat waktu restart aktual dan akan menggunakannya untuk penskalaan berikutnya.

Untuk mengaktifkan pelacakan ini, gunakan perintah berikut:

```
job.autoscaler.restart.time-tracking.enabled: true
```

Berikut ini adalah konfigurasi terkait untuk estimasi waktu penskalaan ulang.


| Konfigurasi | Diperlukan | Default | Deskripsi | 
| --- | --- | --- | --- | 
| job.autoscaler.restart.time-tracking.enabled | Tidak | False | Menunjukkan apakah Flink Autoscaler harus secara otomatis menyetel konfigurasi dari waktu ke waktu untuk mengoptimalkan dessisi penskalaan. Perhatikan bahwa Autoscaler hanya dapat melakukan autotune parameter Autoscaler. restart.time | 
| job.autoscaler.restart.time | Tidak | 5m | Waktu restart yang diharapkan yang digunakan Amazon EMR di EKS hingga operator dapat menentukan waktu restart aktual dari penskalaan sebelumnya. | 
| job.autoscaler.restart.time-tracking.limit | Tidak | 15m | Waktu restart maksimum yang diamati saat job.autoscaler.restart.time-tracking.enabled diatur ketrue. | 

Berikut ini adalah contoh spesifikasi penerapan yang dapat Anda gunakan untuk mencoba estimasi waktu penskalaan ulang:

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: autoscaling-example
spec:
  flinkVersion: v1_18
  flinkConfiguration:

    # Autoscaler parameters
    job.autoscaler.enabled: "true"
    job.autoscaler.scaling.enabled: "true"
    job.autoscaler.stabilization.interval: "5s"
    job.autoscaler.metrics.window: "1m"
    
    job.autoscaler.restart.time-tracking.enabled: "true"
    job.autoscaler.restart.time: "2m"
    job.autoscaler.restart.time-tracking.limit: "10m"
    
    jobmanager.scheduler: adaptive
    taskmanager.numberOfTaskSlots: "1"
    pipeline.max-parallelism: "12"

  executionRoleArn: <JOB ARN>
  emrReleaseLabel: emr-7.12.0-flink-latest
  jobManager:
    highAvailabilityEnabled: false
    storageDir: s3://<s3_bucket>/flink/autoscaling/ha/
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: s3://<s3_bucket>/some-job-with-back-pressure
    parallelism: 1
    upgradeMode: stateless
```

Untuk mensimulasikan tekanan balik, gunakan spesifikasi penerapan berikut.

```
job:
    jarURI: s3://<s3_bucket>/pyflink-script.py
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-py", "/opt/flink/usrlib/pyflink-script.py"]
    parallelism: 1
    upgradeMode: stateless
```

Unggah skrip Python berikut ke bucket S3 Anda.

```
import logging
import sys
import time
import random

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

TABLE_NAME="orders"
QUERY=f"""
CREATE TABLE {TABLE_NAME} (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);
"""

def create_backpressure(i):
    time.sleep(2)
    return i

def autoscaling_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    t_env.execute_sql(QUERY)
    res_table = t_env.from_path(TABLE_NAME)

    stream =  t_env.to_data_stream(res_table) \
      .shuffle().map(lambda x: create_backpressure(x))\
      .print()
    env.execute("Autoscaling demo")

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    autoscaling_demo()
```

Untuk memverifikasi bahwa estimasi waktu penskalaan ulang berfungsi, pastikan pencatatan `DEBUG` level operator Flink diaktifkan. Contoh di bawah ini menunjukkan cara memperbarui file bagan helm. `values.yaml` Kemudian instal ulang bagan helm yang diperbarui dan jalankan pekerjaan Flink Anda lagi.

```
log4j-operator.properties: |+
  # Flink Operator Logging Overrides
  rootLogger.level = DEBUG
```

Dapatkan nama pod pemimpin Anda.

```
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')

kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

Jalankan perintah berikut untuk mendapatkan waktu restart aktual yang digunakan dalam evaluasi metrik.

```
kubectl logs <FLINK-OPERATOR-POD-NAME> -c flink-kubernetes-operator -n <OPERATOR-NAMESPACE> -f | grep "Restart time used in scaling summary computation"
```

Anda akan melihat log yang mirip dengan yang berikut ini. Perhatikan bahwa hanya penskalaan pertama yang digunakan` job.autoscaler.restart.time`. Penskalaan selanjutnya menggunakan waktu restart yang diamati.

```
2024-05-16 17:17:32,590 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT2M
2024-05-16 17:19:03,787 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:19:18,976 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:20:50,283 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:22:21,691 o.a.f.a.ScalingExecutor        [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
```

------
#### [ 7.0.0 and 7.1.0 ]

Flink Autoscaler bawaan open source menggunakan banyak metrik untuk membuat keputusan penskalaan terbaik. Namun, nilai default yang digunakan untuk perhitungannya dimaksudkan untuk dapat diterapkan pada sebagian besar beban kerja dan mungkin tidak optimal untuk pekerjaan tertentu. Fitur autotuning yang ditambahkan ke EMR Amazon pada versi EKS dari Operator Flink melihat tren historis yang diamati pada metrik tertentu yang ditangkap dan kemudian mencoba menghitung nilai paling optimal yang disesuaikan untuk pekerjaan yang diberikan.


| Konfigurasi | Diperlukan | Default | Deskripsi | 
| --- | --- | --- | --- | 
| kubernetes.operator.job.autoscaler.autotune.enable | Tidak | False | Menunjukkan apakah Flink Autoscaler harus secara otomatis menyetel konfigurasi dari waktu ke waktu untuk mengoptimalkan dessisi penskalaan penskalaan otomatis. Saat ini, Autoscaler hanya dapat melakukan autotune parameter Autoscaler. restart.time | 
| kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count | Tidak | 3 | Menunjukkan berapa banyak historis Amazon EMR pada metrik EKS yang disimpan Autoscaler di Amazon EMR di peta konfigurasi metrik EKS. | 
| kubernetes.operator.job.autoscaler.autotune.metrics.restart.count | Tidak | 3 | Menunjukkan berapa banyak jumlah restart yang dilakukan Autoscaler sebelum mulai menghitung waktu restart rata-rata untuk pekerjaan tertentu. | 

Untuk mengaktifkan autotuning, Anda harus menyelesaikan yang berikut ini:
+ Setel `kubernetes.operator.job.autoscaler.autotune.enable:` ke `true`
+ Setel `metrics.job.status.enable:` ke `TOTAL_TIME`
+ Mengikuti pengaturan [Menggunakan Autoscaler untuk aplikasi Flink untuk mengaktifkan](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/jobruns-flink-autoscaler.html) Autoscaling.

Berikut ini adalah contoh spesifikasi penerapan yang dapat Anda gunakan untuk mencoba autotuning.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: autoscaling-example
spec:
  flinkVersion: v1_18
  flinkConfiguration:

    # Autotuning parameters
    kubernetes.operator.job.autoscaler.autotune.enable: "true"
    kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count: "2"
    kubernetes.operator.job.autoscaler.autotune.metrics.restart.count: "1"
    metrics.job.status.enable: TOTAL_TIME

    # Autoscaler parameters
    kubernetes.operator.job.autoscaler.enabled: "true"
    kubernetes.operator.job.autoscaler.scaling.enabled: "true"
    kubernetes.operator.job.autoscaler.stabilization.interval: "5s"
    kubernetes.operator.job.autoscaler.metrics.window: "1m"

    jobmanager.scheduler: adaptive

    taskmanager.numberOfTaskSlots: "1"
    state.savepoints.dir: s3://<S3_bucket>/autoscaling/savepoint/
    state.checkpoints.dir: s3://<S3_bucket>/flink/autoscaling/checkpoint/
    pipeline.max-parallelism: "4"

  executionRoleArn: <JOB ARN>
  emrReleaseLabel: emr-6.14.0-flink-latest
  jobManager:
    highAvailabilityEnabled: true
    storageDir: s3://<S3_bucket>/flink/autoscaling/ha/
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: s3://<S3_bucket>/some-job-with-back-pressure
    parallelism: 1
    upgradeMode: last-state
```

Untuk mensimulasikan tekanan balik, gunakan spesifikasi penerapan berikut.

```
  job:
    jarURI: s3://<S3_bucket>/pyflink-script.py
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-py", "/opt/flink/usrlib/pyflink-script.py"]
    parallelism: 1
    upgradeMode: last-state
```

Unggah skrip Python berikut ke bucket S3 Anda.

```
import logging
import sys
import time
import random

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

TABLE_NAME="orders"
QUERY=f"""
CREATE TABLE {TABLE_NAME} (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);
"""

def create_backpressure(i):
    time.sleep(2)
    return i

def autoscaling_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    t_env.execute_sql(QUERY)
    res_table = t_env.from_path(TABLE_NAME)

    stream =  t_env.to_data_stream(res_table) \
      .shuffle().map(lambda x: create_backpressure(x))\
      .print()
    env.execute("Autoscaling demo")

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    autoscaling_demo()
```

Untuk memverifikasi bahwa autotuner Anda berfungsi, gunakan perintah berikut. Perhatikan bahwa Anda harus menggunakan informasi pod pemimpin Anda sendiri untuk Operator Flink.

Pertama dapatkan nama pod pemimpin Anda.

```
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')

kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

Setelah Anda memiliki nama pod pemimpin Anda, Anda dapat menjalankan perintah berikut.

```
kubectl logs -n $NAMESPACE  -c flink-kubernetes-operator --follow <YOUR-FLINK-OPERATOR-POD-NAME>  | grep -E 'EmrEks|autotun|calculating|restart|autoscaler'
```

Anda akan melihat log yang mirip dengan yang berikut ini.

```
[m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [36m[DEBUG][flink/autoscaling-example] Using the latest Emr Eks Metric for calculating restart.time for autotuning: EmrEksMetrics(restartMetric=RestartMetric(restartingTime=65, numRestarts=1))

[m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [32m[INFO ][flink/autoscaling-example] Calculated average restart.time metric via autotuning to be: PT0.065S
```

------

# Pemeliharaan dan pemecahan masalah untuk pekerjaan Flink di Amazon EMR di EKS
<a name="jobruns-flink-troubleshooting"></a>

Bagian berikut menguraikan cara mempertahankan pekerjaan Flink Anda yang sudah berjalan lama, dan memberikan panduan tentang cara memecahkan masalah umum dengan pekerjaan Flink.

# Mempertahankan aplikasi Flink
<a name="jobruns-flink-maintain"></a>

**Topics**
+ [Mode upgrade](#jobruns-flink-upgrademode)

Aplikasi Flink biasanya dirancang untuk berjalan dalam jangka waktu yang lama seperti minggu, bulan, atau bahkan bertahun-tahun. Seperti semua layanan yang berjalan lama, aplikasi streaming Flink perlu dipertahankan. Ini termasuk perbaikan bug, peningkatan, dan migrasi ke cluster Flink versi yang lebih baru.

Ketika spesifikasi berubah `FlinkDeployment` dan `FlinkSessionJob` sumber daya, Anda perlu memutakhirkan aplikasi yang sedang berjalan. Untuk melakukan ini, operator menghentikan pekerjaan yang sedang berjalan (kecuali sudah ditangguhkan) dan menerapkannya kembali dengan spesifikasi terbaru dan, untuk aplikasi stateful, status dari proses sebelumnya.

Pengguna mengontrol cara mengelola status saat aplikasi stateful berhenti dan memulihkan dengan `upgradeMode` pengaturan. `JobSpec`

## Mode upgrade
<a name="jobruns-flink-upgrademode"></a>

Pengenalan opsional

**Tanpa kewarganegaraan**  
Upgrade aplikasi stateless dari status kosong.

**Keadaan terakhir**  
Peningkatan cepat dalam keadaan aplikasi apa pun (bahkan untuk pekerjaan yang gagal), tidak memerlukan pekerjaan yang sehat karena selalu menggunakan pos pemeriksaan terbaru yang berhasil. Pemulihan manual mungkin diperlukan jika metadata HA hilang. Untuk membatasi waktu pekerjaan mungkin mundur saat mengambil pos pemeriksaan terbaru yang dapat Anda konfigurasikan`kubernetes.operator.job.upgrade.last-state.max.allowed.checkpoint.age`. Jika pos pemeriksaan lebih tua dari nilai yang dikonfigurasi, savepoint akan diambil sebagai gantinya untuk pekerjaan yang sehat. Ini tidak didukung dalam mode Sesi. 

**Savepoint**  
Gunakan savepoint untuk peningkatan, memberikan keamanan maksimal dan kemungkinan untuk berfungsi sebagai titik. backup/fork Savepoint akan dibuat selama proses upgrade. Perhatikan bahwa pekerjaan Flink harus dijalankan untuk memungkinkan savepoint dibuat. Jika pekerjaan dalam keadaan tidak sehat, pos pemeriksaan terakhir akan digunakan (kecuali kubernetes.operator.job.upgrade. last-state-fallback.enabled disetel ke false). Jika pos pemeriksaan terakhir tidak tersedia, peningkatan pekerjaan akan gagal.

# Pemecahan masalah
<a name="jobruns-flink-troubleshoot"></a>

Bagian ini menjelaskan cara memecahkan masalah dengan Amazon EMR di EKS. *Untuk informasi tentang cara memecahkan masalah umum dengan Amazon EMR, lihat [Memecahkan masalah klaster di](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-troubleshoot.html) Panduan Manajemen EMR Amazon.*
+ [Memecahkan masalah pekerjaan yang menggunakan PersistentVolumeClaims (PVC)](permissions-for-pvc.md)
+ [Memecahkan masalah Amazon EMR pada penskalaan otomatis vertikal EKS](troubleshooting-vas.md)
+ [Memecahkan masalah Amazon EMR pada operator EKS Spark](troubleshooting-sparkop.md)

## Memecahkan masalah Apache Flink di Amazon EMR di EKS
<a name="jobruns-flink-troubleshooting-apache-flink"></a>

### Pemetaan sumber daya tidak ditemukan saat menginstal bagan Helm
<a name="w2aac21c21b7b7b3"></a>

Anda mungkin menemukan pesan galat berikut saat menginstal bagan Helm.

```
Error: INSTALLATION FAILED: pulling from host 1234567890.dkr.ecr.us-west-2.amazonaws.com failed with status code [manifests 6.13.0]: 403 Forbidden Error: INSTALLATION FAILED: unable to build kubernetes objects from release manifest: [resource mapping not found for name: "flink-operator-serving-cert" namespace: "<the namespace to install your operator>" from "": no matches for kind "Certificate" in version "cert-manager.io/v1"

ensure CRDs are installed first, resource mapping not found for name: "flink-operator-selfsigned-issuer" namespace: "<the namespace to install your operator>" " from "": no matches for kind "Issuer" in version "cert-manager.io/v1"

ensure CRDs are installed first].
```

Untuk mengatasi kesalahan ini, instal cert-manager untuk mengaktifkan penambahan komponen webhook. Anda harus menginstal cert-manager ke setiap cluster Amazon EKS yang Anda gunakan.

```
kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.12.0
```

### Layanan AWS akses ditolak kesalahan
<a name="jobruns-flink-troubleshooting-access-denied"></a>

Jika Anda melihat *access denied* kesalahan, konfirmasikan bahwa peran IAM untuk `operatorExecutionRoleArn` dalam `values.yaml` file bagan Helm memiliki izin yang benar. Pastikan juga peran IAM `executionRoleArn` di bawah `FlinkDeployment` spesifikasi Anda memiliki izin yang benar.

### `FlinkDeployment`macet
<a name="jobruns-flink-troubleshooting-stuck"></a>

Jika Anda `FlinkDeployment` terhenti dalam keadaan tertangkap, gunakan langkah-langkah berikut untuk menghapus paksa penyebaran:

1. Edit proses penerapan.

   ```
   kubectl edit -n Flink Namespace flinkdeployments/App Name
   ```

1. Hapus finalizer ini.

   ```
   finalizers:
     - flinkdeployments.flink.apache.org/finalizer
   ```

1. Hapus penyebaran.

   ```
   kubectl delete -n Flink Namespace flinkdeployments/App Name
   ```

### AWSBadRequestException masalah s3a saat menjalankan aplikasi Flink dalam keikutsertaan Wilayah AWS
<a name="jobruns-flink-troubleshooting-optin-region"></a>

Jika Anda menjalankan aplikasi Flink dalam [opt-in Wilayah AWS](https://docs.aws.amazon.com/controltower/latest/userguide/opt-in-region-considerations.html), Anda mungkin melihat kesalahan berikut:

```
Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: getFileStatus on 
s3://flink.txt: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request 
(Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: ABCDEFGHIJKL; S3 Extended Request ID:
ABCDEFGHIJKLMNOP=; Proxy: null), S3 Extended Request ID: ABCDEFGHIJKLMNOP=:400 Bad Request: Bad Request 
(Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: ABCDEFGHIJKL; S3 Extended Request ID: ABCDEFGHIJKLMNOP=; Proxy: null)
```

```
Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: getS3Region on flink-application: software.amazon.awssdk.services.s3.model.S3Exception: null 
(Service: S3, Status Code: 400, Request ID: ABCDEFGHIJKLMNOP, Extended Request ID: ABCDEFGHIJKLMNOPQRST==):null: null 
(Service: S3, Status Code: 400, Request ID: ABCDEFGHIJKLMNOP, Extended Request ID: AHl42uDNaTUFOus/5IIVNvSakBcMjMCH7dd37ky0vE6jhABCDEFGHIJKLMNOPQRST==)
```

Untuk memperbaiki kesalahan ini, gunakan konfigurasi berikut dalam file `FlinkDeployment` definisi Anda.

```
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
```

Kami juga menyarankan Anda menggunakan penyedia SDKv2 kredensi:

```
fs.s3a.aws.credentials.provider: software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider
```

Jika Anda ingin menggunakan penyedia SDKv1 kredensial, pastikan SDK mendukung Wilayah keikutsertaan Anda. Untuk informasi lebih lanjut, lihat [aws-sdk-java GitHub repositori](https://github.com/aws/aws-sdk-java).

Jika Anda mendapatkan `S3 AWSBadRequestException` ketika Anda menjalankan pernyataan SQL Flink di Region opt-in, pastikan bahwa Anda mengatur konfigurasi `fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME` dalam spesifikasi konfigurasi flink Anda.

### S3A AWSBad RequestException saat menjalankan pekerjaan sesi Flink di wilayah CN
<a name="jobruns-flink-troubleshooting-optin-region"></a>

Untuk Amazon EMR rilis 6.15.0 - 7.2.0, Anda mungkin menemukan pesan galat berikut saat menjalankan tugas sesi Flink di wilayah CN. Ini termasuk Tiongkok (Beijing) dan Tiongkok (Ningxia):

```
Error:  {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.hadoop.fs.s3a.AWSBadRequestException: 
                    getFileStatus on s3://ABCDPath: software.amazon.awssdk.services.s3.model.S3Exception: null (Service: S3, Status Code: 400, Request ID: ABCDEFGH, Extended Request ID: 
                    ABCDEFGH:null: null (Service: S3, Status Code: 400, Request ID: ABCDEFGH, Extended Request ID: ABCDEFGH","additionalMetadata":{},"throwableList":
                    [{"type":"org.apache.hadoop.fs.s3a.AWSBadRequestException","message":"getFileStatus on s3://ABCDPath: software.amazon.awssdk.services.s3.model.S3Exception: 
                    null (Service: S3, Status Code: 400, Request ID: ABCDEFGH, Extended Request ID: ABCDEFGH:null: null (Service: S3, Status Code: 400, Request ID: ABCDEFGH, 
                    Extended Request ID: ABCDEFGH","additionalMetadata":{}},{"type":"software.amazon.awssdk.services.s3.model.S3Exception","message":"null (Service: S3, Status Code: 400, 
                    Request ID: ABCDEFGH, Extended Request ID: ABCDEFGH","additionalMetadata":{}}]}
```

Ada kesadaran akan masalah ini. Tim sedang bekerja untuk menambal operator flink untuk semua versi rilis ini. Namun, sebelum kita menyelesaikan tambalan, untuk memperbaiki kesalahan ini, Anda perlu mengunduh bagan helm operator flink, menghapusnya (ekstrak file terkompresi) dan membuat perubahan konfigurasi di bagan helm.

Langkah-langkah spesifiknya adalah sebagai berikut:

1. Ubah ke, secara khusus mengubah direktori ke, folder lokal Anda untuk bagan helm, dan jalankan baris perintah berikut untuk menarik bagan helm dan untar (ekstrak) itu.

   ```
   helm pull oci://public.ecr.aws/emr-on-eks/flink-kubernetes-operator \
   --version $VERSION \
   --namespace $NAMESPACE
   ```

   ```
   tar -zxvf flink-kubernetes-operator-$VERSION.tgz
   ```

1. Masuk ke folder bagan helm dan temukan `templates/flink-operator.yaml` filenya.

1. Temukan `flink-operator-config` ConfigMap dan tambahkan `fs.s3a.endpoint.region` konfigurasi berikut di file`flink-conf.yaml`. Contoh:

   ```
   {{- if .Values.defaultConfiguration.create }}
   apiVersion: v1
   kind: ConfigMap
   metadata:
     name: flink-operator-config
     namespace: {{ .Release.Namespace }}
     labels:
       {{- include "flink-operator.labels" . | nindent 4 }}
   data:
     flink-conf.yaml: |+
   fs.s3a.endpoint.region: {{ .Values.emrContainers.awsRegion }}
   ```

1. Instal bagan helm lokal dan jalankan pekerjaan Anda.

# Rilis yang didukung untuk Amazon EMR di EKS dengan Apache Flink
<a name="jobruns-flink-security-release-versions"></a>

Apache Flink tersedia dengan EMR Amazon berikut pada rilis EKS. Untuk informasi tentang semua rilis yang tersedia, lihat[Amazon EMR pada rilis EKS](emr-eks-releases.md).


| Label rilis | Java | Flink | Operator Flink | 
| --- | --- | --- | --- | 
|  **emr-7.2.0-flink-latest**  |  17  |  1.18.1  |  -  | 
|  **emr-7.2.0-flink-k8s-operator-latest**  |  11  |  -  |  1.8.0  | 
|  **emr-7.1.0-flink-latest**  |  17  |  1.18.1  |  -  | 
|  **emr-7.1.0-flink-k8s-operator-latest**  |  11  |  -  |  1.6.1  | 
|  **emr-7.0.0-flink-latest**  |  11  |  1.18.0  |  -  | 
|  **emr-7.0.0-flink-k8s-operator-latest**  |  11  |  -  |  1.6.1  | 
|  **emr-6.15.0-flink-latest**  |  11  |  1.17.1  |  -  | 
|  **emr-6.15.0-flink-k8s-operator-latest**  |  11  |  -  |  1.6.0  | 
|  **emr-6.14.0-flink-latest**  |  11  |  1.17.1  |  -  | 
|  **emr-6.14.0-flink-k8s-operator-latest**  |  11  |  -  |  1.6.0  | 
|  **emr-6.13.0-flink-latest**  |  11  |  1.17.0  |  -  | 
|  **emr-6.13.0-flink-k8s-operator-latest**  |  11  |  -  |  1.5.0  | 