

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Menggunakan ketersediaan tinggi (HA) untuk Operator Flink dan Aplikasi Flink
<a name="jobruns-flink-using-ha"></a>

Topik ini menunjukkan cara mengonfigurasi ketersediaan tinggi dan menjelaskan cara kerjanya untuk beberapa kasus penggunaan yang berbeda. Ini termasuk ketika Anda menggunakan Job manager dan ketika Anda menggunakan kubernetes asli Flink.

## Operator Flink ketersediaan tinggi
<a name="jobruns-flink-ha-operator"></a>

Kami mengaktifkan *ketersediaan tinggi* untuk Operator Flink sehingga kami dapat gagal ke Operator Flink siaga untuk meminimalkan waktu henti di loop kontrol operator jika terjadi kegagalan. Ketersediaan tinggi diaktifkan secara default dan jumlah default replika operator awal adalah 2. Anda dapat mengonfigurasi bidang replika di `values.yaml` file Anda untuk bagan helm.

Bidang berikut dapat disesuaikan:
+ `replicas`(opsional, defaultnya adalah 2): Menyetel nomor ini menjadi lebih besar dari 1 membuat Operator siaga lainnya dan memungkinkan pemulihan pekerjaan Anda lebih cepat.
+ `highAvailabilityEnabled`(opsional, defaultnya benar): Mengontrol apakah Anda ingin mengaktifkan HA. Menentukan parameter ini sebagai true memungkinkan dukungan penyebaran multi AZ, serta menetapkan parameter yang benar`flink-conf.yaml`.

Anda dapat menonaktifkan HA untuk operator Anda dengan mengatur konfigurasi berikut di `values.yaml` file Anda.

```
...
imagePullSecrets: []

replicas: 1

# set this to false if you don't want HA
highAvailabilityEnabled: false
...
```

**Penyebaran multi AZ**

Kami membuat pod operator di beberapa Availability Zone. Ini adalah kendala lunak, dan pod operator Anda akan dijadwalkan di AZ yang sama jika Anda tidak memiliki cukup sumber daya di AZ yang berbeda.

**Menentukan replika pemimpin**

 Jika HA diaktifkan, replika menggunakan sewa untuk menentukan pemimpin mana dan menggunakan Sewa K8 untuk pemilihan pemimpin. JMs Anda dapat menjelaskan Sewa dan melihat bidang Identitas .Spec.Holder untuk menentukan pemimpin saat ini

```
kubectl describe lease <Helm Install Release Name>-<NAMESPACE>-lease -n <NAMESPACE> | grep "Holder Identity"
```

**Interaksi Flink-S3**

**Mengkonfigurasi kredenal akses**

Pastikan Anda telah mengonfigurasi IRSA dengan izin IAM yang sesuai untuk mengakses bucket S3.

**Mengambil stoples pekerjaan dari mode Aplikasi S3**

Operator Flink juga mendukung pengambilan stoples aplikasi dari S3. Anda hanya menyediakan lokasi S3 untuk JarUri dalam spesifikasi Anda FlinkDeployment .

Anda juga dapat menggunakan fitur ini untuk mengunduh artefak lain seperti PyFlink skrip. Skrip Python yang dihasilkan dijatuhkan di bawah jalur. `/opt/flink/usrlib/`

Contoh berikut menunjukkan bagaimana menggunakan fitur ini untuk PyFlink pekerjaan. Perhatikan bidang JarUri dan args.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: python-example
spec:
  image: <YOUR CUSTOM PYFLINK IMAGE>
  emrReleaseLabel: "emr-6.12.0-flink-latest"
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    highAvailabilityEnabled: false
    replicas: 1
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: "s3://<S3-BUCKET>/scripts/pyflink.py" # Note, this will trigger the artifact download process
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pyflink.py"]
    parallelism: 1
    upgradeMode: stateless
```

**Konektor Flink S3**

Flink dikemas dengan dua konektor S3 (tercantum di bawah). Bagian berikut membahas kapan harus menggunakan konektor mana.

**Checkpointing: Konektor Presto S3**
+ Setel skema S3 ke s3p://
+ Konektor yang disarankan untuk digunakan ke pos pemeriksaan ke s3. Untuk informasi selengkapnya, lihat [khusus S3 dalam dokumentasi](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#s3-specific) Apache Flink.

Contoh FlinkDeployment spesifikasi:

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.checkpoints.dir: s3p://<BUCKET-NAME>/flink-checkpoint/
```

**Membaca dan menulis ke S3: konektor Hadoop S3**
+ Atur skema S3 ke `s3://` atau () `s3a://`
+ Konektor yang direkomendasikan untuk membaca dan menulis file dari S3 (hanya konektor S3 yang mengimplementasikan antarmuka [Flinks](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/) Filesystem).
+ Secara default, kami mengatur `fs.s3a.aws.credentials.provider` dalam `flink-conf.yaml` file, yaitu`com.amazonaws.auth.WebIdentityTokenCredentialsProvider`. Jika Anda mengganti d efault `flink-conf` sepenuhnya dan Anda berinteraksi dengan S3, pastikan untuk menggunakan penyedia ini.

Contoh FlinkDeployment spesifikasi

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  job:
    jarURI: local:///opt/flink/examples/streaming/WordCount.jar
    args: [ "--input", "s3a://<INPUT BUCKET>/PATH", "--output", "s3a://<OUTPUT BUCKET>/PATH" ]
    parallelism: 2
    upgradeMode: stateless
```

## Manajer Pekerjaan Flink
<a name="jobruns-flink-ha-manager"></a>

Ketersediaan Tinggi (HA) untuk Penerapan Flink memungkinkan pekerjaan terus membuat kemajuan bahkan jika kesalahan sementara ditemukan dan crash Anda. JobManager Pekerjaan akan dimulai ulang tetapi dari pos pemeriksaan terakhir yang berhasil dengan HA diaktifkan. Tanpa HA diaktifkan, Kubernetes akan memulai ulang pekerjaan Anda JobManager, tetapi pekerjaan Anda akan dimulai sebagai pekerjaan baru dan akan kehilangan kemajuannya. Setelah mengonfigurasi HA, kami dapat memberi tahu Kubernetes untuk menyimpan metadata HA dalam penyimpanan persisten untuk referensi jika terjadi kegagalan sementara di JobManager dan kemudian melanjutkan pekerjaan kami dari pos pemeriksaan terakhir yang berhasil.

HA diaktifkan secara default untuk pekerjaan Flink Anda (jumlah replika disetel ke 2, yang mengharuskan Anda menyediakan lokasi penyimpanan S3 agar metadata HA tetap ada).

**Konfigurasi HA**

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  executionRoleArn: "<JOB EXECUTION ROLE ARN>"
  emrReleaseLabel: "emr-6.13.0-flink-latest"
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    replicas: 2
    highAvailabilityEnabled: true
    storageDir: "s3://<S3 PERSISTENT STORAGE DIR>"
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
```

Berikut ini adalah deskripsi untuk konfigurasi HA di atas di Job Manager (didefinisikan di bawah.spec.jobManager):
+ `highAvailabilityEnabled`(opsional, default adalah true): Setel ini ke `false ` jika Anda tidak ingin HA diaktifkan dan tidak ingin menggunakan konfigurasi HA yang disediakan. Anda masih dapat memanipulasi bidang “replika” untuk mengonfigurasi HA secara manual.
+ `replicas`(opsional, defaultnya adalah 2): Menyetel nomor ini menjadi lebih besar dari 1 membuat siaga lainnya JobManagers dan memungkinkan pemulihan pekerjaan Anda lebih cepat. Jika Anda menonaktifkan HA, Anda harus mengatur jumlah replika ke 1, atau Anda akan terus mendapatkan kesalahan validasi (hanya 1 replika yang didukung jika HA tidak diaktifkan).
+ `storageDir`(required): Karena kami menggunakan jumlah replika sebagai 2 secara default, kami harus menyediakan StorageDir persisten. Saat ini bidang ini hanya menerima jalur S3 sebagai lokasi penyimpanan.

**Lokalitas pod**

 Jika Anda mengaktifkan HA, kami juga mencoba mengkolokasi pod di AZ yang sama, yang mengarah pada peningkatan kinerja (mengurangi latensi jaringan dengan memiliki pod yang sama AZs). Ini adalah proses upaya terbaik, artinya jika Anda tidak memiliki cukup sumber daya di AZ di mana sebagian besar Pod Anda dijadwalkan, Pod yang tersisa masih akan dijadwalkan tetapi mungkin berakhir pada node di luar AZ ini.

**Menentukan replika pemimpin**

Jika HA diaktifkan, replika menggunakan sewa untuk menentukan pemimpin mana dan menggunakan Configmap K8s sebagai datastore untuk menyimpan metadata ini. JMs Jika Anda ingin menentukan pemimpin, Anda dapat melihat konten Configmap dan melihat kunci di `org.apache.flink.k8s.leader.restserver` bawah data untuk menemukan pod K8s dengan alamat IP. Anda juga dapat menggunakan perintah bash berikut.

```
ip=$(kubectl get configmap -n <NAMESPACE> <JOB-NAME>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')
kubectl get pods -n NAMESPACE  -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```

## Pekerjaan Flink - Kubernetes asli
<a name="jobruns-flink-ha-kubernetes"></a>

Amazon EMR 6.13.0 dan yang lebih tinggi mendukung Kubernetes asli Flink untuk menjalankan aplikasi Flink dalam mode ketersediaan tinggi pada cluster Amazon EKS. 

**catatan**  
Anda harus memiliki bucket Amazon S3 yang dibuat untuk menyimpan metadata ketersediaan tinggi saat mengirimkan pekerjaan Flink Anda. Jika Anda tidak ingin menggunakan fitur ini, Anda dapat menonaktifkannya. Ini diaktifkan secara default.

Untuk mengaktifkan fitur ketersediaan tinggi Flink, berikan parameter Flink berikut saat Anda menjalankan perintah [CLI `run-application`](jobruns-flink-native-kubernetes-getting-started.md#jobruns-flink-native-kubernetes-getting-started-run-application). Parameter didefinisikan di bawah contoh.

```
-Dhigh-availability.type=kubernetes \
-Dhigh-availability.storageDir=S3://DOC-EXAMPLE-STORAGE-BUCKET \
-Dfs.s3a.aws.credentials.provider="com.amazonaws.auth.WebIdentityTokenCredentialsProvider" \
-Dkubernetes.jobmanager.replicas=3 \
-Dkubernetes.cluster-id=example-cluster
```
+ **`Dhigh-availability.storageDir`**- Bucket Amazon S3 tempat Anda ingin menyimpan metadata ketersediaan tinggi untuk pekerjaan Anda.

  **`Dkubernetes.jobmanager.replicas`**— Jumlah pod Job Manager yang akan dibuat sebagai bilangan bulat lebih besar dari`1`.

  **`Dkubernetes.cluster-id`**— ID unik yang mengidentifikasi cluster Flink.