

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Kembangkan konsumen khusus dengan throughput bersama
<a name="shared-throughput-consumers"></a>

**penting**  
Perpustakaan Klien Amazon Kinesis (KCL) versi 1.x dan 2.x sudah usang. KCL 1.x akan mencapai end-of-support pada 30 Januari 2026. Kami **sangat menyarankan** Anda memigrasikan aplikasi KCL Anda menggunakan versi 1.x ke versi KCL terbaru sebelum 30 Januari 2026. Untuk menemukan versi KCL terbaru, lihat halaman [Perpustakaan Klien Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) di. GitHub Untuk informasi tentang versi KCL terbaru, lihat[Gunakan Perpustakaan Klien Kinesis](kcl.md). Untuk informasi tentang migrasi dari KCL 1.x ke KCL 3.x, lihat. [Migrasi dari KCL 1.x ke KCL 3.x](kcl-migration-1-3.md)

Jika Anda tidak memerlukan throughput khusus saat menerima data dari Kinesis Data Streams, dan jika Anda tidak memerlukan penundaan propagasi baca di bawah 200 ms, Anda dapat membuat aplikasi konsumen seperti yang dijelaskan dalam topik berikut. Anda dapat menggunakan Kinesis Client Library (KCL) atau. AWS SDK untuk Java

**Topics**
+ [

# Kembangkan konsumen kustom dengan throughput bersama menggunakan KCL
](custom-kcl-consumers.md)

Untuk informasi tentang membangun konsumen yang dapat menerima catatan dari aliran data Kinesis dengan throughput khusus, lihat. [Kembangkan konsumen fan-out yang ditingkatkan dengan throughput khusus](enhanced-consumers.md)

# Kembangkan konsumen kustom dengan throughput bersama menggunakan KCL
<a name="custom-kcl-consumers"></a>

**penting**  
Perpustakaan Klien Amazon Kinesis (KCL) versi 1.x dan 2.x sudah usang. KCL 1.x akan mencapai end-of-support pada 30 Januari 2026. Kami **sangat menyarankan** Anda memigrasikan aplikasi KCL Anda menggunakan versi 1.x ke versi KCL terbaru sebelum 30 Januari 2026. Untuk menemukan versi KCL terbaru, lihat halaman [Perpustakaan Klien Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) di. GitHub Untuk informasi tentang versi KCL terbaru, lihat[Gunakan Perpustakaan Klien Kinesis](kcl.md). Untuk informasi tentang migrasi dari KCL 1.x ke KCL 3.x, lihat. [Migrasi dari KCL 1.x ke KCL 3.x](kcl-migration-1-3.md)

Salah satu metode pengembangan aplikasi konsumen khusus dengan throughput bersama adalah dengan menggunakan Kinesis Client Library (KCL). 

Pilih dari topik berikut untuk versi KCL yang Anda gunakan.

**Topics**
+ [

# Kembangkan konsumen KCL 1.x
](developing-consumers-with-kcl.md)
+ [

# Kembangkan Konsumen KCL 2.x
](developing-consumers-with-kcl-v2.md)

# Kembangkan konsumen KCL 1.x
<a name="developing-consumers-with-kcl"></a>

**penting**  
Perpustakaan Klien Amazon Kinesis (KCL) versi 1.x dan 2.x sudah usang. KCL 1.x akan mencapai end-of-support pada 30 Januari 2026. Kami **sangat menyarankan** Anda memigrasikan aplikasi KCL Anda menggunakan versi 1.x ke versi KCL terbaru sebelum 30 Januari 2026. Untuk menemukan versi KCL terbaru, lihat halaman [Perpustakaan Klien Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) di. GitHub Untuk informasi tentang versi KCL terbaru, lihat[Gunakan Perpustakaan Klien Kinesis](kcl.md). Untuk informasi tentang migrasi dari KCL 1.x ke KCL 3.x, lihat. [Migrasi dari KCL 1.x ke KCL 3.x](kcl-migration-1-3.md)

Anda dapat mengembangkan aplikasi konsumen untuk Amazon Kinesis Data Streams menggunakan Kinesis Client Library (KCL). 

Untuk informasi lebih lanjut tentang KCL, lihat[Tentang KCL (versi sebelumnya)](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-overview).

Pilih dari topik berikut tergantung pada opsi yang ingin Anda gunakan.

**Topics**
+ [

# Mengembangkan Konsumen Perpustakaan Klien Kinesis di Jawa
](kinesis-record-processor-implementation-app-java.md)
+ [

# Mengembangkan konsumen Perpustakaan Klien Kinesis di Node.js
](kinesis-record-processor-implementation-app-nodejs.md)
+ [

# Mengembangkan konsumen Perpustakaan Klien Kinesis di .NET
](kinesis-record-processor-implementation-app-dotnet.md)
+ [

# Kembangkan konsumen Perpustakaan Klien Kinesis dengan Python
](kinesis-record-processor-implementation-app-py.md)
+ [

# Mengembangkan Konsumen Perpustakaan Klien Kinesis di Ruby
](kinesis-record-processor-implementation-app-ruby.md)

# Mengembangkan Konsumen Perpustakaan Klien Kinesis di Jawa
<a name="kinesis-record-processor-implementation-app-java"></a>

**penting**  
Perpustakaan Klien Amazon Kinesis (KCL) versi 1.x dan 2.x sudah usang. KCL 1.x akan mencapai end-of-support pada 30 Januari 2026. Kami **sangat menyarankan** Anda memigrasikan aplikasi KCL Anda menggunakan versi 1.x ke versi KCL terbaru sebelum 30 Januari 2026. Untuk menemukan versi KCL terbaru, lihat halaman [Perpustakaan Klien Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) di. GitHub Untuk informasi tentang versi KCL terbaru, lihat[Gunakan Perpustakaan Klien Kinesis](kcl.md). Untuk informasi tentang migrasi dari KCL 1.x ke KCL 3.x, lihat. [Migrasi dari KCL 1.x ke KCL 3.x](kcl-migration-1-3.md)

Anda dapat menggunakan Kinesis Client Library (KCL) untuk membangun aplikasi yang memproses data dari aliran data Kinesis Anda. Perpustakaan Klien Kinesis tersedia dalam berbagai bahasa. Topik ini membahas Java. Untuk melihat referensi Javadoc, lihat topik [AWS Javadoc](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html) untuk Kelas. AmazonKinesisClient

Untuk mengunduh Java KCL dari GitHub, buka [Perpustakaan Klien Kinesis (](https://github.com/awslabs/amazon-kinesis-client)Java). Untuk menemukan Java KCL di Apache Maven, buka halaman hasil pencarian [KCL](https://search.maven.org/#search|ga|1|amazon-kinesis-client). Untuk mengunduh kode sampel untuk aplikasi konsumen Java KCL dari GitHub, buka halaman [proyek sampel KCL untuk Java](https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis). GitHub 

Aplikasi sampel menggunakan [Apache Commons Logging.](http://commons.apache.org/proper/commons-logging/guide.html) Anda dapat mengubah konfigurasi logging dalam `configure` metode statis yang ditentukan dalam `AmazonKinesisApplicationSample.java` file. *Untuk informasi selengkapnya tentang cara menggunakan Apache Commons Logging dengan aplikasi Log4j dan AWS Java, lihat [Logging with Log4j](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/java-dg-logging.html) di Panduan Pengembang.AWS SDK untuk Java *

Anda harus menyelesaikan tugas-tugas berikut saat menerapkan aplikasi konsumen KCL di Jawa:

**Topics**
+ [

## Menerapkan metode IRecord Processor
](#kinesis-record-processor-implementation-interface-java)
+ [

## Menerapkan pabrik kelas untuk antarmuka IRecord Processor
](#kinesis-record-processor-implementation-factory-java)
+ [

## Buat pekerja
](#kcl-java-worker)
+ [

## Ubah properti konfigurasi
](#kinesis-record-processor-initialization-java)
+ [

## Migrasi ke Versi 2 dari antarmuka prosesor rekaman
](#kcl-java-v2-migration)

## Menerapkan metode IRecord Processor
<a name="kinesis-record-processor-implementation-interface-java"></a>

KCL saat ini mendukung dua versi antarmuka`IRecordProcessor`: Antarmuka asli tersedia dengan versi pertama KCL, dan versi 2 tersedia dimulai dengan KCL versi 1.5.0. Kedua antarmuka didukung penuh. Pilihan Anda tergantung pada persyaratan skenario spesifik Anda. Lihat Javadocs yang dibuat secara lokal atau kode sumber untuk melihat semua perbedaannya. Bagian berikut menguraikan implementasi minimal untuk memulai.

**Topics**
+ [

### Antarmuka Asli (Versi 1)
](#kcl-java-interface-original)
+ [

### Antarmuka yang diperbarui (Versi 2)
](#kcl-java-interface-v2)

### Antarmuka Asli (Versi 1)
<a name="kcl-java-interface-original"></a>

`IRecordProcessor`Antarmuka asli (`package com.amazonaws.services.kinesis.clientlibrary.interfaces`) memperlihatkan metode prosesor rekaman berikut yang harus diterapkan konsumen Anda. Sampel menyediakan implementasi yang dapat Anda gunakan sebagai titik awal (lihat`AmazonKinesisApplicationSampleRecordProcessor.java`).

```
public void initialize(String shardId)
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

**menginisialisasi**  
KCL memanggil `initialize` metode ketika prosesor rekaman dipakai, melewati ID pecahan tertentu sebagai parameter. Prosesor rekaman ini hanya memproses pecahan ini dan biasanya, kebalikannya juga benar (pecahan ini hanya diproses oleh prosesor rekaman ini). Namun, konsumen Anda harus memperhitungkan kemungkinan bahwa catatan data dapat diproses lebih dari satu kali. Kinesis Data *Streams memiliki semantik setidaknya* sekali, artinya setiap catatan data dari pecahan diproses setidaknya satu kali oleh pekerja di konsumen Anda. Untuk informasi lebih lanjut tentang kasus di mana pecahan tertentu dapat diproses oleh lebih dari satu pekerja, lihat[Gunakan resharding, scaling, dan parallel processing untuk mengubah jumlah pecahan](kinesis-record-processor-scaling.md).

```
public void initialize(String shardId)
```

**processRecords**  
KCL memanggil `processRecords` metode, melewati daftar catatan data dari pecahan yang ditentukan oleh metode. `initialize(shardId)` Prosesor rekaman memproses data dalam catatan ini sesuai dengan semantik konsumen. Misalnya, pekerja mungkin melakukan transformasi pada data dan kemudian menyimpan hasilnya di bucket Amazon Simple Storage Service (Amazon S3).

```
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) 
```

Selain data itu sendiri, catatan juga berisi nomor urut dan kunci partisi. Pekerja dapat menggunakan nilai-nilai ini saat memproses data. Misalnya, pekerja dapat memilih bucket S3 untuk menyimpan data berdasarkan nilai kunci partisi. `Record`Kelas mengekspos metode berikut yang menyediakan akses ke data catatan, nomor urut, dan kunci partisi. 

```
record.getData()  
record.getSequenceNumber() 
record.getPartitionKey()
```

Dalam sampel, metode privat `processRecordsWithRetries` memiliki kode yang menunjukkan bagaimana seorang pekerja dapat mengakses data rekaman, nomor urut, dan kunci partisi.

Kinesis Data Streams membutuhkan prosesor rekaman untuk melacak catatan yang telah diproses dalam pecahan. KCL menangani pelacakan ini untuk Anda dengan meneruskan checkpointer () `IRecordProcessorCheckpointer` ke. `processRecords` Prosesor rekaman memanggil `checkpoint` metode pada antarmuka ini untuk menginformasikan KCL tentang seberapa jauh perkembangannya dalam memproses catatan di pecahan. Jika pekerja gagal, KCL menggunakan informasi ini untuk memulai kembali pemrosesan pecahan pada catatan diproses terakhir yang diketahui.

Untuk operasi split atau merge, KCL tidak akan mulai memproses pecahan baru sampai prosesor untuk pecahan asli dipanggil `checkpoint` untuk memberi sinyal bahwa semua pemrosesan pada pecahan asli selesai.

Jika Anda tidak melewati parameter, KCL mengasumsikan bahwa panggilan ke `checkpoint` berarti bahwa semua catatan telah diproses, hingga catatan terakhir yang diteruskan ke prosesor rekaman. Oleh karena itu, prosesor rekaman harus memanggil `checkpoint` hanya setelah memproses semua catatan dalam daftar yang diteruskan ke sana. Prosesor rekaman tidak perlu memanggil `checkpoint` setiap panggilan ke`processRecords`. Prosesor dapat, misalnya, memanggil `checkpoint` setiap panggilan ketiga ke`processRecords`. Anda dapat secara opsional menentukan nomor urut yang tepat dari catatan sebagai parameter untuk`checkpoint`. Dalam hal ini, KCL mengasumsikan bahwa semua catatan telah diproses hingga catatan itu saja.

Dalam sampel, metode pribadi `checkpoint` menunjukkan cara memanggil `IRecordProcessorCheckpointer.checkpoint` menggunakan penanganan pengecualian yang sesuai dan logika coba lagi.

KCL bergantung pada `processRecords` untuk menangani pengecualian apa pun yang timbul dari pemrosesan catatan data. Jika pengecualian dilemparkan`processRecords`, KCL melompati catatan data yang diteruskan sebelum pengecualian. Artinya, catatan ini tidak dikirim kembali ke prosesor rekaman yang melemparkan pengecualian atau ke prosesor rekaman lainnya di konsumen.

**penonaktifan**  
KCL memanggil `shutdown` metode baik saat pemrosesan berakhir (alasan shutdown adalah`TERMINATE`) atau pekerja tidak lagi merespons (alasan shutdown adalah). `ZOMBIE`

```
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

Pemrosesan berakhir ketika prosesor rekaman tidak menerima catatan lebih lanjut dari pecahan, karena pecahan dipecah atau digabungkan, atau aliran dihapus.

KCL juga meneruskan `IRecordProcessorCheckpointer` antarmuka ke`shutdown`. Jika alasan shutdown adalah`TERMINATE`, prosesor rekaman harus menyelesaikan pemrosesan catatan data apa pun, dan kemudian memanggil `checkpoint` metode pada antarmuka ini.

### Antarmuka yang diperbarui (Versi 2)
<a name="kcl-java-interface-v2"></a>

`IRecordProcessor`Antarmuka (`package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`) yang diperbarui memperlihatkan metode prosesor rekaman berikut yang harus diterapkan konsumen Anda: 

```
void initialize(InitializationInput initializationInput)
void processRecords(ProcessRecordsInput processRecordsInput)
void shutdown(ShutdownInput shutdownInput)
```

Semua argumen dari versi asli antarmuka dapat diakses melalui metode get pada objek kontainer. Misalnya, untuk mengambil daftar catatan di`processRecords()`, Anda dapat menggunakan`processRecordsInput.getRecords()`.

Pada versi 2 antarmuka ini (KCL 1.5.0 dan yang lebih baru), input baru berikut tersedia selain input yang disediakan oleh antarmuka asli:

nomor urut awal  
Dalam `InitializationInput` objek yang diteruskan ke `initialize()` operasi, nomor urut awal dari mana catatan akan diberikan ke instance prosesor rekaman. Ini adalah nomor urut yang terakhir diperiksa oleh instance prosesor rekaman yang sebelumnya memproses pecahan yang sama. Ini disediakan jika aplikasi Anda membutuhkan informasi ini. 

nomor urut pos pemeriksaan yang tertunda  
Dalam `InitializationInput` objek yang diteruskan ke `initialize()` operasi, nomor urutan pos pemeriksaan yang tertunda (jika ada) yang tidak dapat dilakukan sebelum instance prosesor rekaman sebelumnya berhenti. 

## Menerapkan pabrik kelas untuk antarmuka IRecord Processor
<a name="kinesis-record-processor-implementation-factory-java"></a>

Anda juga perlu mengimplementasikan pabrik untuk kelas yang mengimplementasikan metode prosesor rekaman. Ketika konsumen Anda membuat instance pekerja, ia meneruskan referensi ke pabrik ini.

Sampel mengimplementasikan kelas pabrik dalam file `AmazonKinesisApplicationSampleRecordProcessorFactory.java` menggunakan antarmuka prosesor rekaman asli. Jika Anda ingin pabrik kelas membuat prosesor rekaman versi 2, gunakan nama paket`com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`.

```
  public class SampleRecordProcessorFactory implements IRecordProcessorFactory { 
      /**
      * Constructor.
      */
      public SampleRecordProcessorFactory() {
          super();
      }
      /**
      * {@inheritDoc}
      */
      @Override
      public IRecordProcessor createProcessor() {
          return new SampleRecordProcessor();
      }
  }
```

## Buat pekerja
<a name="kcl-java-worker"></a>

Seperti dibahas dalam[Menerapkan metode IRecord Processor](#kinesis-record-processor-implementation-interface-java), ada dua versi antarmuka prosesor rekaman KCL untuk dipilih, yang memengaruhi cara Anda membuat pekerja. Antarmuka prosesor rekaman asli menggunakan struktur kode berikut untuk membuat pekerja:

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker(recordProcessorFactory, config);
```

Dengan versi 2 dari antarmuka prosesor rekaman, Anda dapat menggunakan `Worker.Builder` untuk membuat pekerja tanpa perlu khawatir tentang konstruktor mana yang akan digunakan dan urutan argumen. Antarmuka prosesor rekaman yang diperbarui menggunakan struktur kode berikut untuk membuat pekerja:

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

## Ubah properti konfigurasi
<a name="kinesis-record-processor-initialization-java"></a>

Sampel memberikan nilai default untuk properti konfigurasi. Data konfigurasi untuk pekerja ini kemudian dikonsolidasikan dalam sebuah `KinesisClientLibConfiguration` objek. Objek ini dan referensi ke pabrik kelas untuk `IRecordProcessor` diteruskan dalam panggilan yang membuat instance pekerja. Anda dapat mengganti salah satu properti ini dengan nilai Anda sendiri menggunakan file properti Java (lihat`AmazonKinesisApplicationSample.java`).

### Nama aplikasi
<a name="configuration-property-application-name"></a>

KCL memerlukan nama aplikasi yang unik di seluruh aplikasi Anda, dan di seluruh tabel Amazon DynamoDB di Wilayah yang sama. Ini menggunakan nilai konfigurasi nama aplikasi dengan cara berikut:
+ Semua pekerja yang terkait dengan nama aplikasi ini diasumsikan bekerja sama pada aliran yang sama. Pekerja ini dapat didistribusikan pada beberapa contoh. Jika Anda menjalankan instance tambahan dari kode aplikasi yang sama, tetapi dengan nama aplikasi yang berbeda, KCL memperlakukan instance kedua sebagai aplikasi yang sepenuhnya terpisah yang juga beroperasi pada aliran yang sama.
+ KCL membuat tabel DynamoDB dengan nama aplikasi dan menggunakan tabel untuk mempertahankan informasi status (seperti pos pemeriksaan dan pemetaan pecahan pekerja) untuk aplikasi. Setiap aplikasi memiliki tabel DynamoDB sendiri. Untuk informasi selengkapnya, lihat [Gunakan tabel sewa untuk melacak pecahan yang diproses oleh aplikasi konsumen KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Siapkan kredensil
<a name="kinesis-record-processor-cred-java"></a>

Anda harus membuat AWS kredensil Anda tersedia untuk salah satu penyedia kredensi dalam rantai penyedia kredensi default. Misalnya, jika Anda menjalankan konsumen pada instans EC2, sebaiknya Anda meluncurkan instans dengan peran IAM. AWS kredensil yang mencerminkan izin yang terkait dengan peran IAM ini tersedia untuk aplikasi pada instance melalui metadata instance-nya. Ini adalah cara paling aman untuk mengelola kredensil bagi konsumen yang berjalan pada instans EC2.

Aplikasi sampel pertama kali mencoba untuk mengambil kredenal IAM dari metadata instance: 

```
credentialsProvider = new InstanceProfileCredentialsProvider(); 
```

Jika aplikasi sampel tidak dapat memperoleh kredensil dari metadata instance, aplikasi tersebut mencoba mengambil kredensil dari file properti:

```
credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();
```

*Untuk informasi selengkapnya tentang metadata instans, lihat [Metadata Instans di Panduan Pengguna](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html) Amazon EC2.*

### Gunakan ID pekerja untuk beberapa instance
<a name="kinesis-record-processor-workerid-java"></a>

Kode inisialisasi sampel membuat ID untuk pekerja`workerId`, menggunakan nama komputer lokal dan menambahkan pengidentifikasi unik global seperti yang ditunjukkan dalam cuplikan kode berikut. Pendekatan ini mendukung skenario beberapa contoh aplikasi konsumen yang berjalan pada satu komputer.

```
String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
```

## Migrasi ke Versi 2 dari antarmuka prosesor rekaman
<a name="kcl-java-v2-migration"></a>

Jika Anda ingin memigrasikan kode yang menggunakan antarmuka asli, selain langkah-langkah yang dijelaskan sebelumnya, langkah-langkah berikut diperlukan:

1. Ubah kelas prosesor rekaman Anda untuk mengimpor antarmuka prosesor rekaman versi 2:

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   ```

1. Ubah referensi ke input untuk menggunakan `get` metode pada objek kontainer. Misalnya, dalam `shutdown()` operasi, ubah "`checkpointer`" menjadi "`shutdownInput.getCheckpointer()`”.

1. Ubah kelas pabrik prosesor rekaman Anda untuk mengimpor antarmuka pabrik prosesor rekaman versi 2:

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   ```

1. Ubah konstruksi pekerja yang akan digunakan`Worker.Builder`. Contoh:

   ```
   final Worker worker = new Worker.Builder()
       .recordProcessorFactory(recordProcessorFactory)
       .config(config)
       .build();
   ```

# Mengembangkan konsumen Perpustakaan Klien Kinesis di Node.js
<a name="kinesis-record-processor-implementation-app-nodejs"></a>

**penting**  
Perpustakaan Klien Amazon Kinesis (KCL) versi 1.x dan 2.x sudah usang. KCL 1.x akan mencapai end-of-support pada 30 Januari 2026. Kami **sangat menyarankan** Anda memigrasikan aplikasi KCL Anda menggunakan versi 1.x ke versi KCL terbaru sebelum 30 Januari 2026. Untuk menemukan versi KCL terbaru, lihat halaman [Perpustakaan Klien Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) di. GitHub Untuk informasi tentang versi KCL terbaru, lihat[Gunakan Perpustakaan Klien Kinesis](kcl.md). Untuk informasi tentang migrasi dari KCL 1.x ke KCL 3.x, lihat. [Migrasi dari KCL 1.x ke KCL 3.x](kcl-migration-1-3.md)

Anda dapat menggunakan Kinesis Client Library (KCL) untuk membangun aplikasi yang memproses data dari aliran data Kinesis Anda. Perpustakaan Klien Kinesis tersedia dalam berbagai bahasa. Topik ini membahas Node.js.

KCL adalah perpustakaan Java; dukungan untuk bahasa selain Java disediakan menggunakan antarmuka multi-bahasa yang disebut. *MultiLangDaemon* Daemon ini berbasis Java dan berjalan di latar belakang saat Anda menggunakan bahasa KCL selain Java. Oleh karena itu, jika Anda menginstal KCL untuk Node.js dan menulis aplikasi konsumen Anda sepenuhnya di Node.js, Anda masih memerlukan Java diinstal pada sistem Anda karena. MultiLangDaemon Selanjutnya, MultiLangDaemon memiliki beberapa pengaturan default yang mungkin perlu Anda sesuaikan untuk kasus penggunaan Anda, misalnya, AWS Wilayah yang terhubung dengannya. Untuk informasi lebih lanjut tentang MultiLangDaemon on GitHub, buka halaman [ MultiLangDaemon proyek KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Untuk mengunduh Node.js KCL dari GitHub, buka [Perpustakaan Klien Kinesis (](https://github.com/awslabs/amazon-kinesis-client-nodejs)Node.js).

**Unduhan Kode Sampel**

Ada dua contoh kode yang tersedia untuk KCL di Node.js:
+ [sampel dasar](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/basic_sample)

  Digunakan di bagian berikut untuk menggambarkan dasar-dasar membangun aplikasi konsumen KCL di Node.js.
+ [click-stream-sample](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/click_stream_sample)

   Sedikit lebih maju dan menggunakan skenario dunia nyata, setelah Anda membiasakan diri dengan kode sampel dasar. Sampel ini tidak dibahas di sini tetapi memiliki file README dengan informasi lebih lanjut.

Anda harus menyelesaikan tugas-tugas berikut saat menerapkan aplikasi konsumen KCL di Node.js:

**Topics**
+ [

## Implementasikan prosesor rekaman
](#kinesis-record-processor-implementation-interface-nodejs)
+ [

## Ubah properti konfigurasi
](#kinesis-record-processor-initialization-nodejs)

## Implementasikan prosesor rekaman
<a name="kinesis-record-processor-implementation-interface-nodejs"></a>

Konsumen paling sederhana yang mungkin menggunakan KCL untuk Node.js harus mengimplementasikan `recordProcessor` fungsi, yang pada gilirannya berisi fungsi`initialize`,`processRecords`, dan`shutdown`. Sampel menyediakan implementasi yang dapat Anda gunakan sebagai titik awal (lihat`sample_kcl_app.js`).

```
function recordProcessor() {
  // return an object that implements initialize, processRecords and shutdown functions.}
```

**menginisialisasi**  
KCL memanggil `initialize` fungsi ketika prosesor rekaman dimulai. Prosesor rekaman ini hanya memproses ID pecahan yang diteruskan sebagai`initializeInput.shardId`, dan biasanya, kebalikannya juga benar (pecahan ini hanya diproses oleh prosesor rekaman ini). Namun, konsumen Anda harus memperhitungkan kemungkinan bahwa catatan data dapat diproses lebih dari satu kali. Ini karena Kinesis Data *Streams memiliki setidaknya* sekali semantik, artinya setiap catatan data dari pecahan diproses setidaknya satu kali oleh pekerja di konsumen Anda. Untuk informasi lebih lanjut tentang kasus di mana pecahan tertentu dapat diproses oleh lebih dari satu pekerja, lihat[Gunakan resharding, scaling, dan parallel processing untuk mengubah jumlah pecahan](kinesis-record-processor-scaling.md).

```
initialize: function(initializeInput, completeCallback)
```

**processRecords**  
 KCL memanggil fungsi ini dengan input yang berisi daftar catatan data dari pecahan yang ditentukan ke fungsi tersebut`initialize`. Prosesor rekaman yang Anda terapkan memproses data dalam catatan ini sesuai dengan semantik konsumen Anda. Misalnya, pekerja mungkin melakukan transformasi pada data dan kemudian menyimpan hasilnya di bucket Amazon Simple Storage Service (Amazon S3). 

```
processRecords: function(processRecordsInput, completeCallback)
```

Selain data itu sendiri, catatan juga berisi nomor urut dan kunci partisi, yang dapat digunakan pekerja saat memproses data. Misalnya, pekerja dapat memilih bucket S3 untuk menyimpan data berdasarkan nilai kunci partisi. `record`Kamus mengekspos pasangan kunci-nilai berikut untuk mengakses data catatan, nomor urut, dan kunci partisi:

```
record.data
record.sequenceNumber
record.partitionKey
```

Perhatikan bahwa data tersebut dikodekan oleh Base64.

Dalam sampel dasar, fungsi `processRecords` memiliki kode yang menunjukkan bagaimana seorang pekerja dapat mengakses data rekaman, nomor urut, dan kunci partisi.

Kinesis Data Streams membutuhkan prosesor rekaman untuk melacak catatan yang telah diproses dalam pecahan. KCL menangani pelacakan ini dengan `checkpointer` objek yang dilewatkan sebagai`processRecordsInput.checkpointer`. Prosesor rekaman Anda memanggil `checkpointer.checkpoint` fungsi untuk memberi tahu KCL seberapa jauh perkembangannya dalam memproses catatan di pecahan. Jika pekerja gagal, KCL menggunakan informasi ini saat Anda memulai ulang pemrosesan pecahan sehingga berlanjut dari catatan olahan terakhir yang diketahui.

Untuk operasi split atau penggabungan, KCL tidak mulai memproses pecahan baru sampai prosesor untuk pecahan asli dipanggil `checkpoint` untuk memberi sinyal bahwa semua pemrosesan pada pecahan asli selesai.

Jika Anda tidak meneruskan nomor urut ke `checkpoint` fungsi, KCL mengasumsikan bahwa panggilan ke `checkpoint` berarti bahwa semua catatan telah diproses, hingga catatan terakhir yang diteruskan ke prosesor rekaman. Oleh karena itu, prosesor rekaman harus memanggil `checkpoint` **hanya** setelah memproses semua catatan dalam daftar yang diteruskan ke sana. Prosesor rekaman tidak perlu memanggil `checkpoint` setiap panggilan ke`processRecords`. Prosesor dapat, misalnya, memanggil setiap panggilan ketiga, atau beberapa peristiwa `checkpoint` di luar prosesor rekaman Anda, seperti verification/validation layanan khusus yang telah Anda terapkan. 

Anda dapat secara opsional menentukan nomor urut yang tepat dari catatan sebagai parameter untuk`checkpoint`. Dalam hal ini, KCL mengasumsikan bahwa semua catatan telah diproses hingga catatan itu saja.

Aplikasi sampel dasar menunjukkan panggilan sesederhana mungkin ke `checkpointer.checkpoint` fungsi tersebut. Anda dapat menambahkan logika checkpointing lain yang Anda butuhkan untuk konsumen Anda pada titik ini dalam fungsi.

**penonaktifan**  
KCL memanggil `shutdown` fungsi baik saat pemrosesan berakhir (`shutdownInput.reason`is`TERMINATE`) atau pekerja tidak lagi merespons (`shutdownInput.reason`is`ZOMBIE`).

```
shutdown: function(shutdownInput, completeCallback)
```

Pemrosesan berakhir ketika prosesor rekaman tidak menerima catatan lebih lanjut dari pecahan, karena pecahan dipecah atau digabungkan, atau aliran dihapus.

KCL juga meneruskan `shutdownInput.checkpointer` objek ke`shutdown`. Jika alasan shutdown adalah`TERMINATE`, Anda harus memastikan bahwa prosesor rekaman telah selesai memproses catatan data apa pun, dan kemudian memanggil `checkpoint` fungsi pada antarmuka ini.

## Ubah properti konfigurasi
<a name="kinesis-record-processor-initialization-nodejs"></a>

Sampel memberikan nilai default untuk properti konfigurasi. Anda dapat mengganti salah satu properti ini dengan nilai Anda sendiri (lihat `sample.properties` di sampel dasar).

### Nama aplikasi
<a name="kinesis-record-processor-application-name-nodejs"></a>

KCL memerlukan aplikasi yang unik di antara aplikasi Anda, dan di antara tabel Amazon DynamoDB di Wilayah yang sama. Ini menggunakan nilai konfigurasi nama aplikasi dengan cara berikut:
+ Semua pekerja yang terkait dengan nama aplikasi ini diasumsikan bekerja sama pada aliran yang sama. Pekerja ini dapat didistribusikan pada beberapa contoh. Jika Anda menjalankan instance tambahan dari kode aplikasi yang sama, tetapi dengan nama aplikasi yang berbeda, KCL memperlakukan instance kedua sebagai aplikasi yang sepenuhnya terpisah yang juga beroperasi pada aliran yang sama.
+ KCL membuat tabel DynamoDB dengan nama aplikasi dan menggunakan tabel untuk mempertahankan informasi status (seperti pos pemeriksaan dan pemetaan pecahan pekerja) untuk aplikasi. Setiap aplikasi memiliki tabel DynamoDB sendiri. Untuk informasi selengkapnya, lihat [Gunakan tabel sewa untuk melacak pecahan yang diproses oleh aplikasi konsumen KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Siapkan kredensil
<a name="kinesis-record-processor-credentials-nodejs"></a>

Anda harus membuat AWS kredensil Anda tersedia untuk salah satu penyedia kredensi dalam rantai penyedia kredensi default. Anda dapat menggunakan `AWSCredentialsProvider` properti untuk menetapkan penyedia kredensial. `sample.properties`File harus membuat kredensyal Anda tersedia untuk salah satu penyedia kredensyal dalam rantai penyedia kredensi [default](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). Jika Anda menjalankan konsumen di instans Amazon EC2, sebaiknya Anda mengonfigurasi instans dengan peran IAM. AWS kredensil yang mencerminkan izin yang terkait dengan peran IAM ini tersedia untuk aplikasi pada instance melalui metadata instance-nya. Ini adalah cara paling aman untuk mengelola kredensil untuk aplikasi konsumen yang berjalan pada instans EC2.

Contoh berikut mengkonfigurasi KCL untuk memproses aliran data Kinesis bernama `kclnodejssample` menggunakan prosesor rekaman yang disediakan di: `sample_kcl_app.js`

```
# The Node.js executable script
executableName = node sample_kcl_app.js
# The name of an Amazon Kinesis stream to process
streamName = kclnodejssample
# Unique KCL application name
applicationName = kclnodejssample
# Use default AWS credentials provider chain
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
# Read from the beginning of the stream
initialPositionInStream = TRIM_HORIZON
```

# Mengembangkan konsumen Perpustakaan Klien Kinesis di .NET
<a name="kinesis-record-processor-implementation-app-dotnet"></a>

**penting**  
Perpustakaan Klien Amazon Kinesis (KCL) versi 1.x dan 2.x sudah usang. KCL 1.x akan mencapai end-of-support pada 30 Januari 2026. Kami **sangat menyarankan** Anda memigrasikan aplikasi KCL Anda menggunakan versi 1.x ke versi KCL terbaru sebelum 30 Januari 2026. Untuk menemukan versi KCL terbaru, lihat halaman [Perpustakaan Klien Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) di. GitHub Untuk informasi tentang versi KCL terbaru, lihat[Gunakan Perpustakaan Klien Kinesis](kcl.md). Untuk informasi tentang migrasi dari KCL 1.x ke KCL 3.x, lihat. [Migrasi dari KCL 1.x ke KCL 3.x](kcl-migration-1-3.md)

Anda dapat menggunakan Kinesis Client Library (KCL) untuk membangun aplikasi yang memproses data dari aliran data Kinesis Anda. Perpustakaan Klien Kinesis tersedia dalam berbagai bahasa. Topik ini membahas .NET.

KCL adalah perpustakaan Java; dukungan untuk bahasa selain Java disediakan menggunakan antarmuka multi-bahasa yang disebut. *MultiLangDaemon* Daemon ini berbasis Java dan berjalan di latar belakang saat Anda menggunakan bahasa KCL selain Java. Oleh karena itu, jika Anda menginstal KCL untuk.NET dan menulis aplikasi konsumen Anda sepenuhnya di .NET, Anda masih perlu Java diinstal pada sistem Anda karena itu MultiLangDaemon. Selanjutnya, MultiLangDaemon memiliki beberapa pengaturan default yang mungkin perlu Anda sesuaikan untuk kasus penggunaan Anda, misalnya, AWS Wilayah yang terhubung dengannya. Untuk informasi lebih lanjut tentang MultiLangDaemon on GitHub, buka halaman [ MultiLangDaemon proyek KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Untuk mengunduh .NET KCL dari GitHub, buka [Perpustakaan Klien Kinesis (](https://github.com/awslabs/amazon-kinesis-client-net).NET). Untuk mengunduh kode sampel untuk aplikasi konsumen.NET KCL, buka halaman [proyek konsumen sampel KCL untuk .NET](https://github.com/awslabs/amazon-kinesis-client-net/tree/master/SampleConsumer) di. GitHub

Anda harus menyelesaikan tugas-tugas berikut saat menerapkan aplikasi konsumen KCL di .NET:

**Topics**
+ [

## Menerapkan metode kelas IRecord Processor
](#kinesis-record-processor-implementation-interface-dotnet)
+ [

## Ubah properti konfigurasi
](#kinesis-record-processor-initialization-dotnet)

## Menerapkan metode kelas IRecord Processor
<a name="kinesis-record-processor-implementation-interface-dotnet"></a>

Konsumen harus menerapkan metode berikut untuk`IRecordProcessor`. Konsumen sampel menyediakan implementasi yang dapat Anda gunakan sebagai titik awal (lihat `SampleRecordProcessor` kelas di`SampleConsumer/AmazonKinesisSampleConsumer.cs`).

```
public void Initialize(InitializationInput input)
public void ProcessRecords(ProcessRecordsInput input)
public void Shutdown(ShutdownInput input)
```

**Inisialisasi**  
KCL memanggil metode ini ketika prosesor rekaman dipakai, melewati ID pecahan tertentu dalam parameter (). `input` `input.ShardId` Prosesor rekaman ini hanya memproses pecahan ini, dan biasanya, kebalikannya juga benar (pecahan ini hanya diproses oleh prosesor rekaman ini). Namun, konsumen Anda harus memperhitungkan kemungkinan bahwa catatan data dapat diproses lebih dari satu kali. Ini karena Kinesis Data *Streams memiliki setidaknya* sekali semantik, artinya setiap catatan data dari pecahan diproses setidaknya satu kali oleh pekerja di konsumen Anda. Untuk informasi lebih lanjut tentang kasus di mana pecahan tertentu dapat diproses oleh lebih dari satu pekerja, lihat[Gunakan resharding, scaling, dan parallel processing untuk mengubah jumlah pecahan](kinesis-record-processor-scaling.md).

```
public void Initialize(InitializationInput input)
```

**ProcessRecords**  
KCL memanggil metode ini, melewati daftar catatan data dalam `input` parameter (`input.Records`) dari pecahan yang ditentukan oleh metode. `Initialize` Prosesor rekaman yang Anda terapkan memproses data dalam catatan ini sesuai dengan semantik konsumen Anda. Misalnya, pekerja mungkin melakukan transformasi pada data dan kemudian menyimpan hasilnya di bucket Amazon Simple Storage Service (Amazon S3).

```
public void ProcessRecords(ProcessRecordsInput input)
```

Selain data itu sendiri, catatan juga berisi nomor urut dan kunci partisi. Pekerja dapat menggunakan nilai-nilai ini saat memproses data. Misalnya, pekerja dapat memilih bucket S3 untuk menyimpan data berdasarkan nilai kunci partisi. `Record`Kelas mengekspos berikut ini untuk mengakses data catatan, nomor urut, dan kunci partisi:

```
byte[] Record.Data 
string Record.SequenceNumber
string Record.PartitionKey
```

Dalam sampel, metode ini `ProcessRecordsWithRetries` memiliki kode yang menunjukkan bagaimana seorang pekerja dapat mengakses data rekaman, nomor urut, dan kunci partisi.

Kinesis Data Streams membutuhkan prosesor rekaman untuk melacak catatan yang telah diproses dalam pecahan. KCL menangani pelacakan ini untuk Anda dengan meneruskan `Checkpointer` objek ke `ProcessRecords` (`input.Checkpointer`). Prosesor rekaman memanggil `Checkpointer.Checkpoint` metode untuk menginformasikan KCL tentang seberapa jauh perkembangannya dalam memproses catatan di pecahan. Jika pekerja gagal, KCL menggunakan informasi ini untuk memulai kembali pemrosesan pecahan pada catatan diproses terakhir yang diketahui.

Untuk operasi split atau penggabungan, KCL tidak mulai memproses pecahan baru sampai prosesor untuk pecahan asli dipanggil `Checkpointer.Checkpoint` untuk memberi sinyal bahwa semua pemrosesan pada pecahan asli selesai.

Jika Anda tidak melewati parameter, KCL mengasumsikan bahwa panggilan untuk `Checkpointer.Checkpoint` menandakan bahwa semua catatan telah diproses, hingga catatan terakhir yang diteruskan ke prosesor rekaman. Oleh karena itu, prosesor rekaman harus memanggil `Checkpointer.Checkpoint` hanya setelah memproses semua catatan dalam daftar yang diteruskan ke sana. Prosesor rekaman tidak perlu memanggil `Checkpointer.Checkpoint` setiap panggilan ke`ProcessRecords`. Prosesor dapat, misalnya, memanggil `Checkpointer.Checkpoint` setiap panggilan ketiga atau keempat. Anda dapat secara opsional menentukan nomor urut yang tepat dari catatan sebagai parameter untuk`Checkpointer.Checkpoint`. Dalam hal ini, KCL mengasumsikan bahwa catatan telah diproses hanya hingga catatan itu.

Dalam sampel, metode pribadi `Checkpoint(Checkpointer checkpointer)` menunjukkan cara memanggil `Checkpointer.Checkpoint` metode menggunakan penanganan pengecualian yang sesuai dan logika coba lagi.

KCL untuk.NET menangani pengecualian secara berbeda dari pustaka bahasa KCL lainnya karena tidak menangani pengecualian apa pun yang muncul dari pemrosesan catatan data. Setiap pengecualian yang tidak tertangkap dari kode pengguna akan merusak program.

**Matikan**  
KCL memanggil `Shutdown` metode baik saat pemrosesan berakhir (alasan shutdown adalah`TERMINATE`) atau pekerja tidak lagi merespons (nilai shutdown `input.Reason` adalah). `ZOMBIE`

```
public void Shutdown(ShutdownInput input)
```

Pemrosesan berakhir ketika prosesor rekaman tidak menerima catatan lebih lanjut dari pecahan, karena pecahan dipecah atau digabungkan, atau aliran dihapus.

KCL juga meneruskan `Checkpointer` objek ke`shutdown`. Jika alasan shutdown adalah`TERMINATE`, prosesor rekaman harus menyelesaikan pemrosesan catatan data apa pun, dan kemudian memanggil `checkpoint` metode pada antarmuka ini.

## Ubah properti konfigurasi
<a name="kinesis-record-processor-initialization-dotnet"></a>

Konsumen sampel memberikan nilai default untuk properti konfigurasi. Anda dapat mengganti salah satu properti ini dengan nilai Anda sendiri (lihat`SampleConsumer/kcl.properties`).

### Nama aplikasi
<a name="modify-kinesis-record-processor-application-name"></a>

KCL memerlukan aplikasi yang unik di antara aplikasi Anda, dan di antara tabel Amazon DynamoDB di Wilayah yang sama. Ini menggunakan nilai konfigurasi nama aplikasi dengan cara berikut:
+ Semua pekerja yang terkait dengan nama aplikasi ini diasumsikan bekerja sama pada aliran yang sama. Pekerja ini dapat didistribusikan pada beberapa contoh. Jika Anda menjalankan instance tambahan dari kode aplikasi yang sama, tetapi dengan nama aplikasi yang berbeda, KCL memperlakukan instance kedua sebagai aplikasi yang sepenuhnya terpisah yang juga beroperasi pada aliran yang sama.
+ KCL membuat tabel DynamoDB dengan nama aplikasi dan menggunakan tabel untuk mempertahankan informasi status (seperti pos pemeriksaan dan pemetaan pecahan pekerja) untuk aplikasi. Setiap aplikasi memiliki tabel DynamoDB sendiri. Untuk informasi selengkapnya, lihat [Gunakan tabel sewa untuk melacak pecahan yang diproses oleh aplikasi konsumen KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Siapkan kredensil
<a name="kinesis-record-processor-creds-dotnet"></a>

Anda harus membuat AWS kredensil Anda tersedia untuk salah satu penyedia kredensi dalam rantai penyedia kredensi default. Anda dapat menggunakan `AWSCredentialsProvider` properti untuk menetapkan penyedia kredensial. [Sample.properties](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties) [harus membuat kredensil Anda tersedia untuk salah satu penyedia kredensional dalam rantai penyedia kredensi default.](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html) Jika Anda menjalankan aplikasi konsumen pada instans EC2, sebaiknya Anda mengonfigurasi instans dengan peran IAM. AWS kredensil yang mencerminkan izin yang terkait dengan peran IAM ini tersedia untuk aplikasi pada instance melalui metadata instance-nya. Ini adalah cara paling aman untuk mengelola kredensil bagi konsumen yang berjalan pada instans EC2.

File properti sampel mengonfigurasi KCL untuk memproses aliran data Kinesis yang disebut “kata-kata” menggunakan prosesor rekaman yang disertakan. `AmazonKinesisSampleConsumer.cs` 

# Kembangkan konsumen Perpustakaan Klien Kinesis dengan Python
<a name="kinesis-record-processor-implementation-app-py"></a>

**penting**  
Perpustakaan Klien Amazon Kinesis (KCL) versi 1.x dan 2.x sudah usang. KCL 1.x akan mencapai end-of-support pada 30 Januari 2026. Kami **sangat menyarankan** Anda memigrasikan aplikasi KCL Anda menggunakan versi 1.x ke versi KCL terbaru sebelum 30 Januari 2026. Untuk menemukan versi KCL terbaru, lihat halaman [Perpustakaan Klien Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) di. GitHub Untuk informasi tentang versi KCL terbaru, lihat[Gunakan Perpustakaan Klien Kinesis](kcl.md). Untuk informasi tentang migrasi dari KCL 1.x ke KCL 3.x, lihat. [Migrasi dari KCL 1.x ke KCL 3.x](kcl-migration-1-3.md)

Anda dapat menggunakan Kinesis Client Library (KCL) untuk membangun aplikasi yang memproses data dari aliran data Kinesis Anda. Perpustakaan Klien Kinesis tersedia dalam berbagai bahasa. Topik ini membahas Python.

KCL adalah perpustakaan Java; dukungan untuk bahasa selain Java disediakan menggunakan antarmuka multi-bahasa yang disebut. *MultiLangDaemon* Daemon ini berbasis Java dan berjalan di latar belakang saat Anda menggunakan bahasa KCL selain Java. Oleh karena itu, jika Anda menginstal KCL untuk Python dan menulis aplikasi konsumen Anda sepenuhnya dengan Python, Anda masih memerlukan Java diinstal pada sistem Anda karena itu. MultiLangDaemon Selanjutnya, MultiLangDaemon memiliki beberapa pengaturan default yang mungkin perlu Anda sesuaikan untuk kasus penggunaan Anda, misalnya, AWS Wilayah yang terhubung dengannya. Untuk informasi lebih lanjut tentang MultiLangDaemon on GitHub, buka halaman [ MultiLangDaemon proyek KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Untuk mengunduh Python KCL dari GitHub, pergi ke [Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client-python) (Python). Untuk mengunduh kode sampel untuk aplikasi konsumen Python KCL, buka halaman proyek sampel [KCL untuk Python](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples). GitHub

Anda harus menyelesaikan tugas-tugas berikut saat menerapkan aplikasi konsumen KCL dengan Python:

**Topics**
+ [

## Menerapkan metode RecordProcessor kelas
](#kinesis-record-processor-implementation-interface-py)
+ [

## Ubah properti konfigurasi
](#kinesis-record-processor-initialization-py)

## Menerapkan metode RecordProcessor kelas
<a name="kinesis-record-processor-implementation-interface-py"></a>

`RecordProcess`Kelas harus memperluas `RecordProcessorBase` untuk mengimplementasikan metode berikut. Sampel menyediakan implementasi yang dapat Anda gunakan sebagai titik awal (lihat`sample_kclpy_app.py`).

```
def initialize(self, shard_id)
def process_records(self, records, checkpointer)
def shutdown(self, checkpointer, reason)
```

**menginisialisasi**  
KCL memanggil `initialize` metode ketika prosesor rekaman dipakai, melewati ID pecahan tertentu sebagai parameter. Prosesor rekaman ini hanya memproses pecahan ini, dan biasanya, kebalikannya juga benar (pecahan ini hanya diproses oleh prosesor rekaman ini). Namun, konsumen Anda harus memperhitungkan kemungkinan bahwa catatan data dapat diproses lebih dari satu kali. Ini karena Kinesis Data *Streams memiliki setidaknya* sekali semantik, artinya setiap catatan data dari pecahan diproses setidaknya satu kali oleh pekerja di konsumen Anda. Untuk informasi lebih lanjut tentang kasus di mana pecahan tertentu dapat diproses oleh lebih dari satu pekerja, lihat[Gunakan resharding, scaling, dan parallel processing untuk mengubah jumlah pecahan](kinesis-record-processor-scaling.md).

```
def initialize(self, shard_id)
```

**process\$1records**  
 KCL memanggil metode ini, melewati daftar catatan data dari pecahan yang ditentukan oleh metode. `initialize` Prosesor rekaman yang Anda terapkan memproses data dalam catatan ini sesuai dengan semantik konsumen Anda. Misalnya, pekerja mungkin melakukan transformasi pada data dan kemudian menyimpan hasilnya di bucket Amazon Simple Storage Service (Amazon S3).

```
def process_records(self, records, checkpointer) 
```

Selain data itu sendiri, catatan juga berisi nomor urut dan kunci partisi. Pekerja dapat menggunakan nilai-nilai ini saat memproses data. Misalnya, pekerja dapat memilih bucket S3 untuk menyimpan data berdasarkan nilai kunci partisi. `record`Kamus mengekspos pasangan kunci-nilai berikut untuk mengakses data catatan, nomor urut, dan kunci partisi:

```
record.get('data')
record.get('sequenceNumber')
record.get('partitionKey')
```

Perhatikan bahwa data tersebut dikodekan oleh Base64.

Dalam sampel, metode ini `process_records` memiliki kode yang menunjukkan bagaimana seorang pekerja dapat mengakses data rekaman, nomor urut, dan kunci partisi.

Kinesis Data Streams membutuhkan prosesor rekaman untuk melacak catatan yang telah diproses dalam pecahan. KCL menangani pelacakan ini untuk Anda dengan mengirimkan `Checkpointer` objek ke`process_records`. Prosesor rekaman memanggil `checkpoint` metode pada objek ini untuk menginformasikan KCL tentang seberapa jauh perkembangannya dalam memproses catatan di pecahan. Jika pekerja gagal, KCL menggunakan informasi ini untuk memulai kembali pemrosesan pecahan pada catatan diproses terakhir yang diketahui.

Untuk operasi split atau penggabungan, KCL tidak mulai memproses pecahan baru sampai prosesor untuk pecahan asli dipanggil `checkpoint` untuk memberi sinyal bahwa semua pemrosesan pada pecahan asli selesai.

Jika Anda tidak melewati parameter, KCL mengasumsikan bahwa panggilan ke `checkpoint` berarti bahwa semua catatan telah diproses, hingga catatan terakhir yang diteruskan ke prosesor rekaman. Oleh karena itu, prosesor rekaman harus memanggil `checkpoint` hanya setelah memproses semua catatan dalam daftar yang diteruskan ke sana. Prosesor rekaman tidak perlu memanggil `checkpoint` setiap panggilan ke`process_records`. Prosesor dapat, misalnya, memanggil `checkpoint` setiap panggilan ketiga. Anda dapat secara opsional menentukan nomor urut yang tepat dari catatan sebagai parameter untuk`checkpoint`. Dalam hal ini, KCL mengasumsikan bahwa semua catatan telah diproses hingga catatan itu saja.

Dalam sampel, metode pribadi `checkpoint` menunjukkan cara memanggil `Checkpointer.checkpoint` metode menggunakan penanganan pengecualian yang sesuai dan logika coba lagi.

KCL bergantung pada `process_records` untuk menangani pengecualian apa pun yang timbul dari pemrosesan catatan data. Jika pengecualian dilemparkan`process_records`, KCL melompati catatan data yang diteruskan `process_records` sebelum pengecualian. Artinya, catatan ini tidak dikirim kembali ke prosesor rekaman yang melemparkan pengecualian atau ke prosesor rekaman lainnya di konsumen.

**penonaktifan**  
 KCL memanggil `shutdown` metode baik saat pemrosesan berakhir (alasan shutdown adalah`TERMINATE`) atau pekerja tidak lagi merespons (shutdown `reason` adalah). `ZOMBIE`

```
def shutdown(self, checkpointer, reason)
```

Pemrosesan berakhir ketika prosesor rekaman tidak menerima catatan lebih lanjut dari pecahan, karena pecahan dipecah atau digabungkan, atau aliran dihapus.

 KCL juga meneruskan `Checkpointer` objek ke`shutdown`. Jika shutdown `reason``TERMINATE`, prosesor rekaman harus menyelesaikan pemrosesan catatan data apa pun, dan kemudian memanggil `checkpoint` metode pada antarmuka ini.

## Ubah properti konfigurasi
<a name="kinesis-record-processor-initialization-py"></a>

Sampel memberikan nilai default untuk properti konfigurasi. Anda dapat mengganti salah satu properti ini dengan nilai Anda sendiri (lihat`sample.properties`).

### Nama aplikasi
<a name="kinesis-record-processor-application-name-py"></a>

KCL memerlukan nama aplikasi yang unik di antara aplikasi Anda, dan di antara tabel Amazon DynamoDB di Wilayah yang sama. Ini menggunakan nilai konfigurasi nama aplikasi dengan cara berikut:
+ Semua pekerja yang terkait dengan nama aplikasi ini diasumsikan bekerja sama pada aliran yang sama. Pekerja ini dapat didistribusikan pada beberapa contoh. Jika Anda menjalankan instance tambahan dari kode aplikasi yang sama, tetapi dengan nama aplikasi yang berbeda, KCL memperlakukan instance kedua sebagai aplikasi yang sepenuhnya terpisah yang juga beroperasi pada aliran yang sama.
+ KCL membuat tabel DynamoDB dengan nama aplikasi dan menggunakan tabel untuk mempertahankan informasi status (seperti pos pemeriksaan dan pemetaan pecahan pekerja) untuk aplikasi. Setiap aplikasi memiliki tabel DynamoDB sendiri. Untuk informasi selengkapnya, lihat [Gunakan tabel sewa untuk melacak pecahan yang diproses oleh aplikasi konsumen KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Siapkan kredensil
<a name="kinesis-record-processor-creds-py"></a>

Anda harus membuat AWS kredensil Anda tersedia untuk salah satu penyedia kredensi dalam rantai penyedia kredensi default. Anda dapat menggunakan `AWSCredentialsProvider` properti untuk menetapkan penyedia kredensial. [Sample.properties](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties) [harus membuat kredensil Anda tersedia untuk salah satu penyedia kredensional dalam rantai penyedia kredensi default.](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html) Jika Anda menjalankan aplikasi konsumen di instans Amazon EC2, sebaiknya Anda mengonfigurasi instans dengan peran IAM. AWS kredensil yang mencerminkan izin yang terkait dengan peran IAM ini tersedia untuk aplikasi pada instance melalui metadata instance-nya. Ini adalah cara paling aman untuk mengelola kredensil untuk aplikasi konsumen yang berjalan pada instans EC2.

File properti sampel mengonfigurasi KCL untuk memproses aliran data Kinesis yang disebut “kata-kata” menggunakan prosesor rekaman yang disertakan. `sample_kclpy_app.py` 

# Mengembangkan Konsumen Perpustakaan Klien Kinesis di Ruby
<a name="kinesis-record-processor-implementation-app-ruby"></a>

**penting**  
Perpustakaan Klien Amazon Kinesis (KCL) versi 1.x dan 2.x sudah usang. KCL 1.x akan mencapai end-of-support pada 30 Januari 2026. Kami **sangat menyarankan** Anda memigrasikan aplikasi KCL Anda menggunakan versi 1.x ke versi KCL terbaru sebelum 30 Januari 2026. Untuk menemukan versi KCL terbaru, lihat halaman [Perpustakaan Klien Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) di. GitHub Untuk informasi tentang versi KCL terbaru, lihat[Gunakan Perpustakaan Klien Kinesis](kcl.md). Untuk informasi tentang migrasi dari KCL 1.x ke KCL 3.x, lihat. [Migrasi dari KCL 1.x ke KCL 3.x](kcl-migration-1-3.md)

Anda dapat menggunakan Kinesis Client Library (KCL) untuk membangun aplikasi yang memproses data dari aliran data Kinesis Anda. Perpustakaan Klien Kinesis tersedia dalam berbagai bahasa. Topik ini membahas Ruby.

KCL adalah perpustakaan Java; dukungan untuk bahasa selain Java disediakan menggunakan antarmuka multi-bahasa yang disebut. *MultiLangDaemon* Daemon ini berbasis Java dan berjalan di latar belakang saat Anda menggunakan bahasa KCL selain Java. Oleh karena itu, jika Anda menginstal KCL untuk Ruby dan menulis aplikasi konsumen Anda sepenuhnya di Ruby, Anda masih memerlukan Java diinstal pada sistem Anda karena itu. MultiLangDaemon Selanjutnya, MultiLangDaemon memiliki beberapa pengaturan default yang mungkin perlu Anda sesuaikan untuk kasus penggunaan Anda, misalnya, AWS Wilayah yang terhubung dengannya. Untuk informasi lebih lanjut tentang MultiLangDaemon on GitHub, buka halaman [ MultiLangDaemon proyek KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Untuk mengunduh Ruby KCL dari GitHub, buka [Perpustakaan Klien Kinesis](https://github.com/awslabs/amazon-kinesis-client-ruby) (Ruby). Untuk mengunduh kode sampel untuk aplikasi konsumen Ruby KCL, buka halaman proyek [sampel KCL untuk Ruby](https://github.com/awslabs/amazon-kinesis-client-ruby/tree/master/samples) di. GitHub

Untuk informasi selengkapnya tentang pustaka dukungan KCL Ruby, lihat Dokumentasi Permata [Ruby KCL](http://www.rubydoc.info/gems/aws-kclrb).

# Kembangkan Konsumen KCL 2.x
<a name="developing-consumers-with-kcl-v2"></a>

**penting**  
Perpustakaan Klien Amazon Kinesis (KCL) versi 1.x dan 2.x sudah usang. KCL 1.x akan mencapai end-of-support pada 30 Januari 2026. Kami **sangat menyarankan** Anda memigrasikan aplikasi KCL Anda menggunakan versi 1.x ke versi KCL terbaru sebelum 30 Januari 2026. Untuk menemukan versi KCL terbaru, lihat halaman [Perpustakaan Klien Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) di. GitHub Untuk informasi tentang versi KCL terbaru, lihat[Gunakan Perpustakaan Klien Kinesis](kcl.md). Untuk informasi tentang migrasi dari KCL 1.x ke KCL 3.x, lihat. [Migrasi dari KCL 1.x ke KCL 3.x](kcl-migration-1-3.md)

Topik ini menunjukkan cara menggunakan versi 2.0 dari Kinesis Client Library (KCL). 

Untuk informasi lebih lanjut tentang KCL, lihat ikhtisar yang disediakan dalam [Mengembangkan Konsumen Menggunakan Perpustakaan Klien Kinesis 1.x](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html).

Pilih dari topik berikut tergantung pada opsi yang ingin Anda gunakan.

**Topics**
+ [

# Mengembangkan Konsumen Perpustakaan Klien Kinesis di Jawa
](kcl2-standard-consumer-java-example.md)
+ [

# Kembangkan konsumen Perpustakaan Klien Kinesis dengan Python
](kcl2-standard-consumer-python-example.md)
+ [

# Kembangkan konsumen fan-out yang disempurnakan dengan KCL 2.x
](building-enhanced-consumers-kcl-retired.md)

# Mengembangkan Konsumen Perpustakaan Klien Kinesis di Jawa
<a name="kcl2-standard-consumer-java-example"></a>

**penting**  
Perpustakaan Klien Amazon Kinesis (KCL) versi 1.x dan 2.x sudah usang. KCL 1.x akan mencapai end-of-support pada 30 Januari 2026. Kami **sangat menyarankan** Anda memigrasikan aplikasi KCL Anda menggunakan versi 1.x ke versi KCL terbaru sebelum 30 Januari 2026. Untuk menemukan versi KCL terbaru, lihat halaman [Perpustakaan Klien Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) di. GitHub Untuk informasi tentang versi KCL terbaru, lihat[Gunakan Perpustakaan Klien Kinesis](kcl.md). Untuk informasi tentang migrasi dari KCL 1.x ke KCL 3.x, lihat. [Migrasi dari KCL 1.x ke KCL 3.x](kcl-migration-1-3.md)

Kode berikut menunjukkan contoh implementasi di Java dari `ProcessorFactory` dan`RecordProcessor`. Jika Anda ingin memanfaatkan fitur fan-out yang disempurnakan, lihat [Menggunakan Konsumen dengan Enhanced](https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-kcl-java.html) Fan-Out.

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License.
 */


/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

/**
 * This class will run a simple app that uses the KCL to read data and uses the AWS SDK to publish data.
 * Before running this program you must first create a Kinesis stream through the AWS console or AWS SDK.
 */
public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    /**
     * Invoke the main method with 2 args: the stream name and (optionally) the region.
     * Verifies valid inputs and then starts running the app.
     */
    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    /**
     * Constructor sets streamName and region. It also creates a KinesisClient object to send data to Kinesis.
     * This KinesisClient is used to send dummy data so that the consumer has something to read; it is also used
     * indirectly by the KCL to handle the consumption of the data.
     */
    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {

        /**
         * Sends dummy data to Kinesis. Not relevant to consuming the data with the KCL
         */
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        /**
         * Sets up configuration for the KCL, including DynamoDB and CloudWatch dependencies. The final argument, a
         * ShardRecordProcessorFactory, is where the logic for record processing lives, and is located in a private
         * class below.
         */
        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        /**
         * The Scheduler (also called Worker in earlier versions of the KCL) is the entry point to the KCL. This
         * instance is configured with defaults provided by the ConfigsBuilder.
         */
        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
        );

        /**
         * Kickoff the Scheduler. Record processing of the stream of dummy data will continue indefinitely
         * until an exit is triggered.
         */
        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        /**
         * Allows termination of app by pressing Enter.
         */
        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        /**
         * Stops sending dummy data.
         */
        log.info("Cancelling producer and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        /**
         * Stops consuming data. Finishes processing the current batch of data already received from Kinesis
         * before shutting down.
         */
        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown.  Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    /**
     * Sends a single record of dummy data to Kinesis.
     */
    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }

    /**
     * The implementation of the ShardRecordProcessor interface is where the heart of the record processing logic lives.
     * In this example all we do to 'process' is log info about the records.
     */
    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        /**
         * Invoked by the KCL before data records are delivered to the ShardRecordProcessor instance (via
         * processRecords). In this example we do nothing except some logging.
         *
         * @param initializationInput Provides information related to initialization.
         */
        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Handles record processing logic. The Amazon Kinesis Client Library will invoke this method to deliver
         * data records to the application. In this example we simply log our records.
         *
         * @param processRecordsInput Provides the records to be processed as well as information and capabilities
         *                            related to them (e.g. checkpointing).
         */
        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.");
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /** Called when the lease tied to this record processor has been lost. Once the lease has been lost,
         * the record processor can no longer checkpoint.
         *
         * @param leaseLostInput Provides access to functions and data related to the loss of the lease.
         */
        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Called when all data on this shard has been processed. Checkpointing must occur in the method for record
         * processing to be considered complete; an exception will be thrown otherwise.
         *
         * @param shardEndedInput Provides access to a checkpointer method for completing processing of the shard.
         */
        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Invoked when Scheduler has been requested to shut down (i.e. we decide to stop running the app by pressing
         * Enter). Checkpoints and logs the data a final time.
         *
         * @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint
         *                               before the shutdown is completed.
         */
        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```

# Kembangkan konsumen Perpustakaan Klien Kinesis dengan Python
<a name="kcl2-standard-consumer-python-example"></a>

**penting**  
Perpustakaan Klien Amazon Kinesis (KCL) versi 1.x dan 2.x sudah usang. KCL 1.x akan mencapai end-of-support pada 30 Januari 2026. Kami **sangat menyarankan** Anda memigrasikan aplikasi KCL Anda menggunakan versi 1.x ke versi KCL terbaru sebelum 30 Januari 2026. Untuk menemukan versi KCL terbaru, lihat halaman [Perpustakaan Klien Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) di. GitHub Untuk informasi tentang versi KCL terbaru, lihat[Gunakan Perpustakaan Klien Kinesis](kcl.md). Untuk informasi tentang migrasi dari KCL 1.x ke KCL 3.x, lihat. [Migrasi dari KCL 1.x ke KCL 3.x](kcl-migration-1-3.md)

Anda dapat menggunakan Kinesis Client Library (KCL) untuk membangun aplikasi yang memproses data dari aliran data Kinesis Anda. Perpustakaan Klien Kinesis tersedia dalam berbagai bahasa. Topik ini membahas Python.

KCL adalah perpustakaan Java; dukungan untuk bahasa selain Java disediakan menggunakan antarmuka multi-bahasa yang disebut. *MultiLangDaemon* Daemon ini berbasis Java dan berjalan di latar belakang saat Anda menggunakan bahasa KCL selain Java. Oleh karena itu, jika Anda menginstal KCL untuk Python dan menulis aplikasi konsumen Anda sepenuhnya dengan Python, Anda masih memerlukan Java diinstal pada sistem Anda karena itu. MultiLangDaemon Selanjutnya, MultiLangDaemon memiliki beberapa pengaturan default yang mungkin perlu Anda sesuaikan untuk kasus penggunaan Anda, misalnya, AWS Wilayah yang terhubung dengannya. Untuk informasi lebih lanjut tentang MultiLangDaemon on GitHub, buka halaman [ MultiLangDaemon proyek KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Untuk mengunduh Python KCL dari GitHub, pergi ke [Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client-python) (Python). Untuk mengunduh kode sampel untuk aplikasi konsumen Python KCL, buka halaman proyek sampel [KCL untuk Python](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples). GitHub

Anda harus menyelesaikan tugas-tugas berikut saat menerapkan aplikasi konsumen KCL dengan Python:

**Topics**
+ [

## Menerapkan metode RecordProcessor kelas
](#kinesis-record-processor-implementation-interface-py)
+ [

## Ubah properti konfigurasi
](#kinesis-record-processor-initialization-py)

## Menerapkan metode RecordProcessor kelas
<a name="kinesis-record-processor-implementation-interface-py"></a>

`RecordProcess`Kelas harus memperluas `RecordProcessorBase` kelas untuk menerapkan metode berikut:

```
initialize
process_records
shutdown_requested
```

Contoh ini menyediakan implementasi yang dapat Anda gunakan sebagai titik awal.

```
#!/usr/bin/env python

# Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Amazon Software License (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://aws.amazon.com/asl/
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

from __future__ import print_function

import sys
import time

from amazon_kclpy import kcl
from amazon_kclpy.v3 import processor


class RecordProcessor(processor.RecordProcessorBase):
    """
    A RecordProcessor processes data from a shard in a stream. Its methods will be called with this pattern:

    * initialize will be called once
    * process_records will be called zero or more times
    * shutdown will be called if this MultiLangDaemon instance loses the lease to this shard, or the shard ends due
        a scaling change.
    """
    def __init__(self):
        self._SLEEP_SECONDS = 5
        self._CHECKPOINT_RETRIES = 5
        self._CHECKPOINT_FREQ_SECONDS = 60
        self._largest_seq = (None, None)
        self._largest_sub_seq = None
        self._last_checkpoint_time = None

    def log(self, message):
        sys.stderr.write(message)

    def initialize(self, initialize_input):
        """
        Called once by a KCLProcess before any calls to process_records

        :param amazon_kclpy.messages.InitializeInput initialize_input: Information about the lease that this record
            processor has been assigned.
        """
        self._largest_seq = (None, None)
        self._last_checkpoint_time = time.time()

    def checkpoint(self, checkpointer, sequence_number=None, sub_sequence_number=None):
        """
        Checkpoints with retries on retryable exceptions.

        :param amazon_kclpy.kcl.Checkpointer checkpointer: the checkpointer provided to either process_records
            or shutdown
        :param str or None sequence_number: the sequence number to checkpoint at.
        :param int or None sub_sequence_number: the sub sequence number to checkpoint at.
        """
        for n in range(0, self._CHECKPOINT_RETRIES):
            try:
                checkpointer.checkpoint(sequence_number, sub_sequence_number)
                return
            except kcl.CheckpointError as e:
                if 'ShutdownException' == e.value:
                    #
                    # A ShutdownException indicates that this record processor should be shutdown. This is due to
                    # some failover event, e.g. another MultiLangDaemon has taken the lease for this shard.
                    #
                    print('Encountered shutdown exception, skipping checkpoint')
                    return
                elif 'ThrottlingException' == e.value:
                    #
                    # A ThrottlingException indicates that one of our dependencies is is over burdened, e.g. too many
                    # dynamo writes. We will sleep temporarily to let it recover.
                    #
                    if self._CHECKPOINT_RETRIES - 1 == n:
                        sys.stderr.write('Failed to checkpoint after {n} attempts, giving up.\n'.format(n=n))
                        return
                    else:
                        print('Was throttled while checkpointing, will attempt again in {s} seconds'
                              .format(s=self._SLEEP_SECONDS))
                elif 'InvalidStateException' == e.value:
                    sys.stderr.write('MultiLangDaemon reported an invalid state while checkpointing.\n')
                else:  # Some other error
                    sys.stderr.write('Encountered an error while checkpointing, error was {e}.\n'.format(e=e))
            time.sleep(self._SLEEP_SECONDS)

    def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
        """
        Called for each record that is passed to process_records.

        :param str data: The blob of data that was contained in the record.
        :param str partition_key: The key associated with this recod.
        :param int sequence_number: The sequence number associated with this record.
        :param int sub_sequence_number: the sub sequence number associated with this record.
        """
        ####################################
        # Insert your processing logic here
        ####################################
        self.log("Record (Partition Key: {pk}, Sequence Number: {seq}, Subsequence Number: {sseq}, Data Size: {ds}"
                 .format(pk=partition_key, seq=sequence_number, sseq=sub_sequence_number, ds=len(data)))

    def should_update_sequence(self, sequence_number, sub_sequence_number):
        """
        Determines whether a new larger sequence number is available

        :param int sequence_number: the sequence number from the current record
        :param int sub_sequence_number: the sub sequence number from the current record
        :return boolean: true if the largest sequence should be updated, false otherwise
        """
        return self._largest_seq == (None, None) or sequence_number > self._largest_seq[0] or \
            (sequence_number == self._largest_seq[0] and sub_sequence_number > self._largest_seq[1])

    def process_records(self, process_records_input):
        """
        Called by a KCLProcess with a list of records to be processed and a checkpointer which accepts sequence numbers
        from the records to indicate where in the stream to checkpoint.

        :param amazon_kclpy.messages.ProcessRecordsInput process_records_input: the records, and metadata about the
            records.
        """
        try:
            for record in process_records_input.records:
                data = record.binary_data
                seq = int(record.sequence_number)
                sub_seq = record.sub_sequence_number
                key = record.partition_key
                self.process_record(data, key, seq, sub_seq)
                if self.should_update_sequence(seq, sub_seq):
                    self._largest_seq = (seq, sub_seq)

            #
            # Checkpoints every self._CHECKPOINT_FREQ_SECONDS seconds
            #
            if time.time() - self._last_checkpoint_time > self._CHECKPOINT_FREQ_SECONDS:
                self.checkpoint(process_records_input.checkpointer, str(self._largest_seq[0]), self._largest_seq[1])
                self._last_checkpoint_time = time.time()

        except Exception as e:
            self.log("Encountered an exception while processing records. Exception was {e}\n".format(e=e))

    def lease_lost(self, lease_lost_input):
        self.log("Lease has been lost")

    def shard_ended(self, shard_ended_input):
        self.log("Shard has ended checkpointing")
        shard_ended_input.checkpointer.checkpoint()

    def shutdown_requested(self, shutdown_requested_input):
        self.log("Shutdown has been requested, checkpointing.")
        shutdown_requested_input.checkpointer.checkpoint()


if __name__ == "__main__":
    kcl_process = kcl.KCLProcess(RecordProcessor())
    kcl_process.run()
```

## Ubah properti konfigurasi
<a name="kinesis-record-processor-initialization-py"></a>

Sampel memberikan nilai default untuk properti konfigurasi, seperti yang ditunjukkan pada skrip berikut. Anda dapat mengganti salah satu properti ini dengan nilai Anda sendiri.

```
# The script that abides by the multi-language protocol. This script will
# be executed by the MultiLangDaemon, which will communicate with this script
# over STDIN and STDOUT according to the multi-language protocol.
executableName = sample_kclpy_app.py

# The name of an Amazon Kinesis stream to process.
streamName = words

# Used by the KCL as the name of this application. Will be used as the name
# of an Amazon DynamoDB table which will store the lease and checkpoint
# information for workers with this application name
applicationName = PythonKCLSample

# Users can change the credentials provider the KCL will use to retrieve credentials.
# The DefaultAWSCredentialsProviderChain checks several other providers, which is
# described here:
# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain

# Appended to the user agent of the KCL. Does not impact the functionality of the
# KCL in any other way.
processingLanguage = python/2.7

# Valid options at TRIM_HORIZON or LATEST.
# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
initialPositionInStream = TRIM_HORIZON

# The following properties are also available for configuring the KCL Worker that is created
# by the MultiLangDaemon.

# The KCL defaults to us-east-1
#regionName = us-east-1

# Fail over time in milliseconds. A worker which does not renew it's lease within this time interval
# will be regarded as having problems and it's shards will be assigned to other workers.
# For applications that have a large number of shards, this msy be set to a higher number to reduce
# the number of DynamoDB IOPS required for tracking leases
#failoverTimeMillis = 10000

# A worker id that uniquely identifies this worker among all workers using the same applicationName
# If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself.
#workerId = 

# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
#shardSyncIntervalMillis = 60000

# Max records to fetch from Kinesis in a single GetRecords call.
#maxRecords = 10000

# Idle time between record reads in milliseconds.
#idleTimeBetweenReadsInMillis = 1000

# Enables applications flush/checkpoint (if they have some data "in progress", but don't get new data for while)
#callProcessRecordsEvenForEmptyRecordList = false

# Interval in milliseconds between polling to check for parent shard completion.
# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
# completion of parent shards).
#parentShardPollIntervalMillis = 10000

# Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try
# to delete the ones we don't need any longer.
#cleanupLeasesUponShardCompletion = true

# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
#taskBackoffTimeMillis = 500

# Buffer metrics for at most this long before publishing to CloudWatch.
#metricsBufferTimeMillis = 10000

# Buffer at most this many metrics before publishing to CloudWatch.
#metricsMaxQueueSize = 10000

# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
# to RecordProcessorCheckpointer#checkpoint(String) by default.
#validateSequenceNumberBeforeCheckpointing = true

# The maximum number of active threads for the MultiLangDaemon to permit.
# If a value is provided then a FixedThreadPool is used with the maximum
# active threads set to the provided value. If a non-positive integer or no
# value is provided a CachedThreadPool is used.
#maxActiveThreads = 0
```

### Nama aplikasi
<a name="kinesis-record-processor-application-name-py"></a>

KCL memerlukan nama aplikasi yang unik di antara aplikasi Anda dan di antara tabel Amazon DynamoDB di Wilayah yang sama. Ini menggunakan nilai konfigurasi nama aplikasi dengan cara berikut:
+ Semua pekerja yang terkait dengan nama aplikasi ini diasumsikan bekerja sama pada aliran yang sama. Pekerja ini dapat didistribusikan di beberapa instance. Jika Anda menjalankan instance tambahan dari kode aplikasi yang sama, tetapi dengan nama aplikasi yang berbeda, KCL memperlakukan instance kedua sebagai aplikasi yang sepenuhnya terpisah yang juga beroperasi pada aliran yang sama.
+ KCL membuat tabel DynamoDB dengan nama aplikasi dan menggunakan tabel untuk mempertahankan informasi status (seperti pos pemeriksaan dan pemetaan pecahan pekerja) untuk aplikasi. Setiap aplikasi memiliki tabel DynamoDB sendiri. Untuk informasi selengkapnya, lihat [Gunakan tabel sewa untuk melacak pecahan yang diproses oleh aplikasi konsumen KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Kredensial
<a name="kinesis-record-processor-creds-py"></a>

Anda harus membuat AWS kredensil Anda tersedia untuk salah satu penyedia kredensi dalam rantai penyedia [kredensi default](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). Anda dapat menggunakan `AWSCredentialsProvider` properti untuk menetapkan penyedia kredensial. Jika Anda menjalankan aplikasi konsumen di instans Amazon EC2, sebaiknya Anda mengonfigurasi instans dengan peran IAM. AWS kredensil yang mencerminkan izin yang terkait dengan peran IAM ini tersedia untuk aplikasi pada instance melalui metadata instance-nya. Ini adalah cara paling aman untuk mengelola kredensil untuk aplikasi konsumen yang berjalan pada instans EC2.

# Kembangkan konsumen fan-out yang disempurnakan dengan KCL 2.x
<a name="building-enhanced-consumers-kcl-retired"></a>

**penting**  
Perpustakaan Klien Amazon Kinesis (KCL) versi 1.x dan 2.x sudah usang. KCL 1.x akan mencapai end-of-support pada 30 Januari 2026. Kami **sangat menyarankan** Anda memigrasikan aplikasi KCL Anda menggunakan versi 1.x ke versi KCL terbaru sebelum 30 Januari 2026. Untuk menemukan versi KCL terbaru, lihat halaman [Perpustakaan Klien Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) di. GitHub Untuk informasi tentang versi KCL terbaru, lihat[Gunakan Perpustakaan Klien Kinesis](kcl.md). Untuk informasi tentang migrasi dari KCL 1.x ke KCL 3.x, lihat. [Migrasi dari KCL 1.x ke KCL 3.x](kcl-migration-1-3.md)

Konsumen yang menggunakan *fan-out yang disempurnakan* di Amazon Kinesis Data Streams dapat menerima catatan dari aliran data dengan throughput khusus hingga 2 MB data per detik per pecahan. Konsumen jenis ini tidak harus bersaing dengan konsumen lain yang menerima data dari aliran. Untuk informasi selengkapnya, lihat [Kembangkan konsumen fan-out yang ditingkatkan dengan throughput khusus](enhanced-consumers.md).

Anda dapat menggunakan versi 2.0 atau yang lebih baru dari Kinesis Client Library (KCL) untuk mengembangkan aplikasi yang menggunakan fan-out yang disempurnakan untuk menerima data dari aliran. KCL secara otomatis berlangganan aplikasi Anda ke semua pecahan aliran, dan memastikan bahwa aplikasi konsumen Anda dapat membaca dengan nilai throughput 2 per pecahan. MB/sec Jika Anda ingin menggunakan KCL tanpa mengaktifkan fan-out yang disempurnakan, lihat [Mengembangkan Konsumen Menggunakan Kinesis Client](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl-v2.html) Library 2.0.

**Topics**
+ [

# Kembangkan konsumen fan-out yang disempurnakan menggunakan KCL 2.x di Jawa
](building-enhanced-consumers-kcl-java.md)

# Kembangkan konsumen fan-out yang disempurnakan menggunakan KCL 2.x di Jawa
<a name="building-enhanced-consumers-kcl-java"></a>

**penting**  
Perpustakaan Klien Amazon Kinesis (KCL) versi 1.x dan 2.x sudah usang. KCL 1.x akan mencapai end-of-support pada 30 Januari 2026. Kami **sangat menyarankan** Anda memigrasikan aplikasi KCL Anda menggunakan versi 1.x ke versi KCL terbaru sebelum 30 Januari 2026. Untuk menemukan versi KCL terbaru, lihat halaman [Perpustakaan Klien Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) di. GitHub Untuk informasi tentang versi KCL terbaru, lihat[Gunakan Perpustakaan Klien Kinesis](kcl.md). Untuk informasi tentang migrasi dari KCL 1.x ke KCL 3.x, lihat. [Migrasi dari KCL 1.x ke KCL 3.x](kcl-migration-1-3.md)

Anda dapat menggunakan Kinesis Client Library (KCL) versi 2.0 atau yang lebih baru untuk mengembangkan aplikasi di Amazon Kinesis Data Streams untuk menerima data dari stream menggunakan fan-out yang disempurnakan. Kode berikut menunjukkan contoh implementasi di Java dari `ProcessorFactory` dan`RecordProcessor`.

Disarankan agar Anda menggunakan `KinesisClientUtil` untuk membuat `KinesisAsyncClient` dan mengkonfigurasi `maxConcurrency``KinesisAsyncClient`.

**penting**  
Klien Amazon Kinesis mungkin melihat peningkatan latensi yang signifikan, kecuali jika Anda mengonfigurasi `KinesisAsyncClient` untuk memiliki cukup `maxConcurrency` tinggi untuk memungkinkan semua sewa ditambah penggunaan tambahan. `KinesisAsyncClient`

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License. 
 */

/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig()
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        log.info("Cancelling producer, and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown. Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }


    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.");
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```