

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Tutorial: Memproses data stok real-time menggunakan KPL dan KCL 2.x
<a name="tutorial-stock-data-kplkcl2"></a>

Skenario untuk tutorial ini melibatkan menelan perdagangan saham ke dalam aliran data dan menulis aplikasi Amazon Kinesis Data Streams dasar yang melakukan perhitungan pada aliran. Anda akan belajar cara mengirim aliran catatan ke Kinesis Data Streams dan mengimplementasikan aplikasi yang mengkonsumsi dan memproses catatan dalam waktu dekat.

**penting**  
Setelah Anda membuat stream, akun Anda akan dikenakan biaya nominal untuk penggunaan Kinesis Data Streams karena Kinesis Data Streams tidak memenuhi syarat untuk Tingkat Gratis. AWS Setelah aplikasi konsumen dimulai, itu juga menimbulkan biaya nominal untuk penggunaan Amazon DynamoDB. Aplikasi konsumen menggunakan DynamoDB untuk melacak status pemrosesan. Setelah Anda selesai dengan aplikasi ini, hapus AWS sumber daya Anda untuk menghentikan biaya. Untuk informasi selengkapnya, lihat [Pembersihan sumber daya](tutorial-stock-data-kplkcl2-finish.md).

Kode tersebut tidak mengakses data pasar saham yang sebenarnya, melainkan mensimulasikan aliran perdagangan saham. Ia melakukannya dengan menggunakan generator perdagangan saham acak yang memiliki titik awal data pasar riil untuk 25 saham teratas berdasarkan kapitalisasi pasar pada Februari 2015. Jika Anda memiliki akses ke aliran perdagangan saham secara real-time, Anda mungkin tertarik untuk mendapatkan statistik yang berguna dan tepat waktu dari aliran itu. Misalnya, Anda mungkin ingin melakukan analisis jendela geser di mana Anda menentukan saham paling populer yang dibeli dalam 5 menit terakhir. Atau Anda mungkin ingin pemberitahuan setiap kali ada pesanan jual yang terlalu besar (yaitu, memiliki terlalu banyak saham). Anda dapat memperluas kode dalam seri ini untuk menyediakan fungsionalitas tersebut.

Anda dapat mengerjakan langkah-langkah dalam tutorial ini di komputer desktop atau laptop Anda dan menjalankan kode produsen dan konsumen pada mesin yang sama atau platform apa pun yang mendukung persyaratan yang ditentukan.

Contoh yang ditampilkan menggunakan Wilayah Barat AS (Oregon), tetapi mereka bekerja di salah satu [AWS Wilayah](https://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region) yang mendukung Kinesis Data Streams.

**Topics**
+ [

# Prasyarat lengkap
](tutorial-stock-data-kplkcl2-begin.md)
+ [

# Buat aliran data
](tutorial-stock-data-kplkcl2-create-stream.md)
+ [

# Buat kebijakan dan pengguna IAM
](tutorial-stock-data-kplkcl2-iam.md)
+ [

# Unduh dan buat kodenya
](tutorial-stock-data-kplkcl2-download.md)
+ [

# Implementasikan produsen
](tutorial-stock-data-kplkcl2-producer.md)
+ [

# Implementasikan konsumen
](tutorial-stock-data-kplkcl2-consumer.md)
+ [

# (Opsional) Memperluas konsumen
](tutorial-stock-data-kplkcl2-consumer-extension.md)
+ [

# Pembersihan sumber daya
](tutorial-stock-data-kplkcl2-finish.md)

# Prasyarat lengkap
<a name="tutorial-stock-data-kplkcl2-begin"></a>

Anda harus memenuhi persyaratan berikut untuk menyelesaikan tutorial ini:

## Membuat dan menggunakan Akun Amazon Web Services
<a name="tutorial-stock-data-kplkcl2-begin-aws"></a>

Sebelum Anda mulai, pastikan Anda terbiasa dengan konsep yang dibahas[Terminologi dan konsep Amazon Kinesis Data Streams](key-concepts.md), terutama dengan aliran, pecahan, produsen, dan konsumen. Hal ini juga membantu untuk menyelesaikan langkah-langkah dalam panduan berikut:[Tutorial: Instal dan konfigurasikan AWS CLI untuk Kinesis Data Streams](kinesis-tutorial-cli-installation.md).

Anda harus memiliki AWS akun dan browser web untuk mengakses file Konsol Manajemen AWS.

Untuk akses konsol, gunakan nama pengguna dan kata sandi IAM Anda untuk masuk ke halaman masuk [Konsol Manajemen AWS](https://console.aws.amazon.com/console/home)dari IAM. *Untuk informasi tentang kredenal AWS keamanan, termasuk akses terprogram dan alternatif untuk kredensial jangka panjang, lihat kredenal [AWS keamanan di Panduan Pengguna IAM](https://docs.aws.amazon.com/IAM/latest/UserGuide/security-creds.html).* Untuk detail tentang masuk ke Anda Akun AWS, lihat [Cara masuk AWS di](https://docs.aws.amazon.com/signin/latest/userguide/how-to-sign-in.html) *Panduan AWS Sign-In Pengguna*.

Untuk informasi selengkapnya tentang IAM dan petunjuk penyiapan kunci keamanan, lihat [Membuat Pengguna IAM](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/get-set-up-for-amazon-ec2.html#create-an-iam-user).

## Memenuhi persyaratan perangkat lunak sistem
<a name="tutorial-stock-data-kplkcl2-begin-sys"></a>

Sistem yang Anda gunakan untuk menjalankan aplikasi harus memiliki Java 7 atau lebih tinggi diinstal. Untuk mengunduh dan menginstal Java Development Kit (JDK) terbaru, buka situs instalasi [Java SE Oracle](http://www.oracle.com/technetwork/java/javase/downloads/index.html).

Anda membutuhkan [AWS SDK untuk Java](https://aws.amazon.com/sdk-for-java/)versi terbaru. 

[Aplikasi konsumen memerlukan Kinesis Client Library (KCL) versi 2.2.9 atau lebih tinggi, yang dapat Anda peroleh dari di /tree/master. GitHub https://github.com/awslabs/ amazon-kinesis-client](https://github.com/awslabs/amazon-kinesis-client/tree/master)

## Langkah selanjutnya
<a name="tutorial-stock-data-kplkcl2-begin-next"></a>

[Buat aliran data](tutorial-stock-data-kplkcl2-create-stream.md)

# Buat aliran data
<a name="tutorial-stock-data-kplkcl2-create-stream"></a>

Pertama, Anda harus membuat aliran data yang akan Anda gunakan dalam langkah-langkah selanjutnya dari tutorial ini.

**Untuk membuat aliran**

1. [Masuk ke Konsol Manajemen AWS dan buka konsol Kinesis di /kinesis. https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. Pilih **Data Streams** (Aliran Data) di panel navigasi.

1. Di bilah navigasi, perluas pemilih Wilayah dan pilih Wilayah.

1. Pilih **Buat aliran Kinesis**.

1. Masukkan nama untuk aliran data Anda (misalnya,**StockTradeStream**).

1. Masukkan **1** untuk jumlah pecahan, tetapi tetap **Perkirakan jumlah pecahan yang Anda perlukan** untuk diciutkan.

1. Pilih **Buat aliran Kinesis**.

Pada halaman daftar **aliran Kinesis**, status aliran Anda muncul `CREATING` saat aliran sedang dibuat. Saat aliran siap digunakan, statusnya berubah menjadi`ACTIVE`. 

Jika Anda memilih nama aliran Anda, di halaman yang muncul, tab **Detail** menampilkan ringkasan konfigurasi aliran data Anda. Bagian **Pemantauan** menampilkan informasi pemantauan untuk aliran.

## Langkah selanjutnya
<a name="tutorial-stock-data-kplkcl2-create-stream-next"></a>

[Buat kebijakan dan pengguna IAM](tutorial-stock-data-kplkcl2-iam.md)

# Buat kebijakan dan pengguna IAM
<a name="tutorial-stock-data-kplkcl2-iam"></a>

Praktik terbaik keamanan untuk AWS mendikte penggunaan izin halus untuk mengontrol akses ke sumber daya yang berbeda. AWS Identity and Access Management (IAM) memungkinkan Anda untuk mengelola pengguna dan izin pengguna di. AWS[Kebijakan IAM](https://docs.aws.amazon.com/IAM/latest/UserGuide/PoliciesOverview.html) secara eksplisit mencantumkan tindakan yang diizinkan dan sumber daya tempat tindakan tersebut berlaku.

Berikut ini adalah izin minimum yang umumnya diperlukan untuk produsen dan konsumen Kinesis Data Streams.


**Produser**  

| Tindakan | Sumber daya | Tujuan | 
| --- | --- | --- | 
| DescribeStream, DescribeStreamSummary, DescribeStreamConsumer | Aliran data Kinesis | Sebelum mencoba membaca catatan, konsumen memeriksa apakah aliran data ada, apakah aktif, dan apakah pecahan terkandung dalam aliran data. | 
| SubscribeToShard, RegisterStreamConsumer | Aliran data Kinesis | Berlangganan dan mendaftarkan konsumen ke pecahan. | 
| PutRecord, PutRecords | Aliran data Kinesis | Menulis catatan ke Kinesis Data Streams. | 


**Konsumen**  

| **Tindakan** | **Sumber Daya** | **Tujuan** | 
| --- | --- | --- | 
| DescribeStream | Aliran data Kinesis | Sebelum mencoba membaca catatan, konsumen memeriksa apakah aliran data ada, apakah aktif, dan apakah pecahan terkandung dalam aliran data. | 
| GetRecords, GetShardIterator  | Aliran data Kinesis | Membaca catatan dari pecahan. | 
| CreateTable, DescribeTable, GetItem, PutItem, Scan, UpdateItem | Tabel Amazon DynamoDB | Jika konsumen dikembangkan menggunakan Kinesis Client Library (KCL) (baik versi 1.x atau 2.x), ia memerlukan izin ke tabel DynamoDB untuk melacak status pemrosesan aplikasi. | 
| DeleteItem | Tabel Amazon DynamoDB | Untuk saat konsumen melakukan split/merge operasi pada pecahan Data Streams Kinesis. | 
| PutMetricData |  CloudWatch Log Amazon | KCL juga mengunggah metrik ke CloudWatch, yang berguna untuk memantau aplikasi. | 

Untuk tutorial ini, Anda akan membuat kebijakan IAM tunggal yang memberikan semua izin sebelumnya. Dalam produksi, Anda mungkin ingin membuat dua kebijakan, satu untuk produsen dan satu untuk konsumen.

**Untuk membuat kebijakan IAM**

1. Temukan Nama Sumber Daya Amazon (ARN) untuk aliran data baru yang Anda buat di langkah sebelumnya. **Anda dapat menemukan ARN ini terdaftar sebagai Stream **ARN** di bagian atas tab Detail.** Format ARN adalah sebagai berikut:

   ```
   arn:aws:kinesis:region:account:stream/name
   ```  
*region*  
Kode AWS Wilayah; misalnya,`us-west-2`. Untuk informasi selengkapnya, lihat [Konsep Wilayah dan Zona Ketersediaan](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-regions-availability-zones).  
*akun*  
ID AWS akun, seperti yang ditunjukkan pada [Pengaturan Akun](https://console.aws.amazon.com/billing/home?#/account).  
*name*  
Nama aliran data yang Anda buat pada langkah sebelumnya, yaitu. `StockTradeStream`

1. Tentukan ARN untuk tabel DynamoDB yang akan digunakan oleh konsumen (dan akan dibuat oleh contoh konsumen pertama). Itu harus dalam format berikut:

   ```
   arn:aws:dynamodb:region:account:table/name
   ```

   Region dan ID akun identik dengan nilai dalam ARN aliran data yang Anda gunakan untuk tutorial ini, tetapi *namanya adalah nama* tabel DynamoDB yang dibuat dan digunakan oleh aplikasi konsumen. KCL menggunakan nama aplikasi sebagai nama tabel. Pada langkah ini, gunakan `StockTradesProcessor` untuk nama tabel DynamoDB, karena itu adalah nama aplikasi yang digunakan dalam langkah-langkah selanjutnya dalam tutorial ini.

1. Di konsol IAM, di **Kebijakan** ([https://console.aws.amazon.com/iam/home \$1policies](https://console.aws.amazon.com/iam/home#policies)), pilih **Buat kebijakan**. Jika ini adalah pertama kalinya Anda bekerja dengan kebijakan IAM, pilih **Mulai**, **Buat Kebijakan**.

1. Pilih **Pilih** di samping **Policy Generator**.

1. Pilih **Amazon Kinesis sebagai layanannya**. AWS 

1. Pilih`DescribeStream`,`GetShardIterator`,`GetRecords`,`PutRecord`, dan `PutRecords` sebagai tindakan yang diizinkan.

1. Masukkan ARN dari aliran data yang Anda gunakan dalam tutorial ini.

1. Gunakan **Tambah Pernyataan** untuk masing-masing hal berikut:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/id_id/streams/latest/dev/tutorial-stock-data-kplkcl2-iam.html)

   Tanda bintang (`*`) yang digunakan saat menentukan ARN tidak diperlukan. Dalam hal ini, itu karena tidak ada sumber daya khusus CloudWatch di mana `PutMetricData` tindakan dipanggil.

1. Pilih **Langkah Selanjutnya.**

1. Ubah **Nama Kebijakan** menjadi`StockTradeStreamPolicy`, tinjau kode, dan pilih **Buat Kebijakan**.

Dokumen kebijakan yang dihasilkan akan terlihat seperti ini:

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "Stmt123",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:PutRecord",
                "kinesis:PutRecords",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:ListShards",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream"
            ]
        },
        {
            "Sid": "Stmt234",
            "Effect": "Allow",
            "Action": [
                "kinesis:SubscribeToShard",
                "kinesis:DescribeStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream/*"
            ]
        },
        {
            "Sid": "Stmt456",
            "Effect": "Allow",
            "Action": [
                "dynamodb:*"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-west-2:111122223333:table/StockTradesProcessor"
            ]
        },
        {
            "Sid": "Stmt789",
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}
```

------

**Untuk membuat pengguna IAM**

1. Buka konsol IAM di [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. Pada halaman **Pengguna**, pilih **Tambah pengguna**.

1. Untuk **Nama pengguna**, ketik `StockTradeStreamUser`.

1. Untuk **jenis Access**, pilih **Akses terprogram**, lalu pilih **Berikutnya: Izin**.

1. Pilih **Lampirkan kebijakan yang sudah ada secara langsung**.

1. Cari berdasarkan nama untuk kebijakan yang Anda buat dalam prosedur sebelumnya (. `StockTradeStreamPolicy` Pilih kotak di sebelah kiri nama kebijakan, lalu pilih **Berikutnya: Tinjau**.

1. Tinjau detail dan ringkasan, lalu pilih **Buat pengguna**.

1. Salin **ID kunci Akses**, dan simpan secara pribadi. Di bawah **Kunci akses rahasia**, pilih **Tampilkan**, dan simpan kunci itu secara pribadi juga.

1. Rekatkan akses dan kunci rahasia ke file lokal di tempat aman yang hanya dapat Anda akses. Untuk aplikasi ini, buat file bernama ` ~/.aws/credentials` (dengan izin ketat). File harus dalam format berikut:

   ```
   [default]
   aws_access_key_id=access key
   aws_secret_access_key=secret access key
   ```

**Untuk melampirkan kebijakan IAM ke pengguna**

1. Di konsol IAM, buka [Kebijakan](https://console.aws.amazon.com/iam/home?#policies) dan pilih **Tindakan Kebijakan**. 

1. Pilih `StockTradeStreamPolicy` dan **Lampirkan**.

1. Pilih `StockTradeStreamUser` dan **Lampirkan Kebijakan**.

## Langkah selanjutnya
<a name="tutorial-stock-data-kplkcl2-iam-next"></a>

[Unduh dan buat kodenya](tutorial-stock-data-kplkcl2-download.md)

# Unduh dan buat kodenya
<a name="tutorial-stock-data-kplkcl2-download"></a>

Topik ini memberikan contoh kode implementasi untuk konsumsi perdagangan saham sampel ke dalam aliran data (*produsen*) dan pemrosesan data ini (*konsumen*).

**Untuk mengunduh dan membangun kode**

1. Unduh kode sumber dari [https://github.com/aws-samples/amazon-kinesis-learning](https://github.com/aws-samples/amazon-kinesis-learning) GitHub repo ke komputer Anda.

1. Buat proyek di IDE Anda dengan kode sumber, mengikuti struktur direktori yang disediakan.

1. Tambahkan pustaka berikut ke proyek:
   + Perpustakaan Klien Amazon Kinesis (KCL)
   + AWS SDK
   + Apache HttpCore
   + Apache HttpClient
   + Apache Commons Lang
   + Pencatatan Apache Commons
   + Jambu Biji (Perpustakaan Google Core Untuk Java)
   + Anotasi Jackson
   + Inti Jackson
   + Databind Jackson
   + Jackson Format Data: CBOR
   + Waktu Joda

1. Tergantung pada IDE Anda, proyek mungkin dibangun secara otomatis. Jika tidak, bangun proyek menggunakan langkah-langkah yang sesuai untuk IDE Anda.

Jika Anda berhasil menyelesaikan langkah-langkah ini, Anda sekarang siap untuk pindah ke bagian berikutnya,[Implementasikan produsen](tutorial-stock-data-kplkcl2-producer.md). 

## Langkah selanjutnya
<a name="tutorial-stock-data-kplkcl2-download-next"></a>

[[Implementasikan produsen](tutorial-stock-data-kplkcl2-producer.md)Implementasikan produsen](tutorial-stock-data-kplkcl2-producer.md)

# Implementasikan produsen
<a name="tutorial-stock-data-kplkcl2-producer"></a>

Tutorial ini menggunakan skenario dunia nyata pemantauan perdagangan pasar saham. Prinsip-prinsip berikut menjelaskan secara singkat bagaimana skenario ini memetakan ke produsen dan struktur kode pendukungnya.

Lihat [kode sumber](https://github.com/aws-samples/amazon-kinesis-learning ) dan tinjau informasi berikut.

**StockTrade kelas**  
Perdagangan saham individu diwakili oleh contoh StockTrade kelas. Contoh ini berisi atribut seperti simbol ticker, harga, jumlah saham, jenis perdagangan (beli atau jual), dan ID yang secara unik mengidentifikasi perdagangan. Kelas ini diterapkan untuk Anda. 

**Rekam aliran**  
Aliran adalah urutan catatan. Rekaman adalah serialisasi `StockTrade` instance dalam format JSON. Contoh:   

```
{
  "tickerSymbol": "AMZN", 
  "tradeType": "BUY", 
  "price": 395.87,
  "quantity": 16, 
  "id": 3567129045
}
```

**StockTradeGenerator kelas**  
StockTradeGenerator memiliki metode `getRandomTrade()` yang disebut yang mengembalikan perdagangan saham baru yang dihasilkan secara acak setiap kali dipanggil. Kelas ini diterapkan untuk Anda.

**StockTradesWriter kelas**  
`main`Metode produsen, StockTradesWriter terus mengambil perdagangan acak dan kemudian mengirimkannya ke Kinesis Data Streams dengan melakukan tugas-tugas berikut:  

1. Membaca nama aliran data dan nama Wilayah sebagai input.

1. Menggunakan `KinesisAsyncClientBuilder` untuk mengatur Region, kredensial, dan konfigurasi klien. 

1. Memeriksa apakah aliran ada dan aktif (jika tidak, ia keluar dengan kesalahan). 

1. Dalam loop kontinu, panggil `StockTradeGenerator.getRandomTrade()` metode dan kemudian `sendStockTrade` metode untuk mengirim perdagangan ke aliran setiap 100 milidetik. 
`sendStockTrade`Metode `StockTradesWriter` kelas memiliki kode berikut:   

```
private static void sendStockTrade(StockTrade trade, KinesisAsyncClient kinesisClient,
            String streamName) {
        byte[] bytes = trade.toJsonAsBytes();
        // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library.
        if (bytes == null) {
            LOG.warn("Could not get JSON bytes for stock trade");
            return;
        }

        LOG.info("Putting trade: " + trade.toString());
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(bytes))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            LOG.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }
```

Lihat rincian kode berikut:
+ `PutRecord`API mengharapkan array byte, dan Anda harus mengonversi perdagangan ke format JSON. Baris kode tunggal ini melakukan operasi itu: 

  ```
  byte[] bytes = trade.toJsonAsBytes();
  ```
+ Sebelum Anda dapat mengirim perdagangan, Anda membuat `PutRecordRequest` instance baru (disebut permintaan dalam kasus ini). Masing-masing `request` membutuhkan nama aliran, kunci partisi, dan gumpalan data. 

  ```
  PutPutRecordRequest request = PutRecordRequest.builder()
      .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
      .streamName(streamName)
      .data(SdkBytes.fromByteArray(bytes))
      .build();
  ```

  Contoh menggunakan ticker saham sebagai kunci partisi, yang memetakan catatan ke pecahan tertentu. Dalam praktiknya, Anda harus memiliki ratusan atau ribuan kunci partisi per pecahan sehingga catatan tersebar merata di seluruh aliran Anda. Untuk informasi selengkapnya tentang cara menambahkan data ke aliran, lihat[Menulis data ke Amazon Kinesis Data Streams](building-producers.md).

  Sekarang `request` siap untuk mengirim ke klien (operasi put): 

  ```
     kinesisClient.putRecord(request).get();
  ```
+ Pemeriksaan kesalahan dan pencatatan selalu merupakan tambahan yang berguna. Kode ini mencatat kondisi kesalahan: 

  ```
  if (bytes == null) {
      LOG.warn("Could not get JSON bytes for stock trade");
      return;
  }
  ```

  Tambahkan try/catch blok di sekitar `put` operasi: 

  ```
  try {
   	kinesisClient.putRecord(request).get();
  } catch (InterruptedException e) {
              LOG.info("Interrupted, assuming shutdown.");
  } catch (ExecutionException e) {
              LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
  }
  ```

  Ini karena operasi put Kinesis Data Streams dapat gagal karena kesalahan jaringan, atau karena aliran data mencapai batas throughputnya dan terhambat. Disarankan agar Anda mempertimbangkan dengan cermat kebijakan coba ulang Anda untuk `put` operasi untuk menghindari kehilangan data, seperti menggunakan coba lagi. 
+ Pencatatan status sangat membantu tetapi opsional:

  ```
  LOG.info("Putting trade: " + trade.toString());
  ```
Produser yang ditampilkan di sini menggunakan fungsionalitas rekam tunggal Kinesis Data Streams API,. `PutRecord` Dalam prakteknya, jika produsen individu menghasilkan banyak catatan, seringkali lebih efisien untuk menggunakan fungsi beberapa catatan `PutRecords` dan mengirim batch catatan pada suatu waktu. Untuk informasi selengkapnya, lihat [Menulis data ke Amazon Kinesis Data Streams](building-producers.md).

**Untuk menjalankan produser**

1. Verifikasi bahwa kunci akses dan key pair rahasia yang diambil [Buat kebijakan dan pengguna IAM](tutorial-stock-data-kplkcl2-iam.md) disimpan dalam file`~/.aws/credentials`. 

1. Jalankan `StockTradeWriter` kelas dengan argumen berikut:

   ```
   StockTradeStream us-west-2
   ```

   Jika Anda membuat streaming di wilayah selain`us-west-2`, Anda harus menentukan wilayah tersebut di sini.

Anda akan melihat output yang serupa dengan yang berikut:

```
Feb 16, 2015 3:53:00 PM  
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
```

Perdagangan saham Anda sekarang sedang dicerna oleh Kinesis Data Streams.

## Langkah selanjutnya
<a name="tutorial-stock-data-kplkcl2-producer-next"></a>

[Implementasikan konsumen](tutorial-stock-data-kplkcl2-consumer.md)

# Implementasikan konsumen
<a name="tutorial-stock-data-kplkcl2-consumer"></a>

Aplikasi konsumen dalam tutorial ini terus memproses perdagangan saham dalam aliran data Anda. Ini kemudian menghasilkan saham paling populer yang dibeli dan dijual setiap menit. Aplikasi ini dibangun di atas Kinesis Client Library (KCL), yang melakukan banyak pekerjaan berat yang umum untuk aplikasi konsumen. Untuk informasi selengkapnya, lihat [Informasi KCL 1.x dan 2.x](shared-throughput-kcl-consumers.md). 

Lihat kode sumber dan tinjau informasi berikut.

**StockTradesProcessor kelas**  
Kelas utama konsumen, disediakan untuk Anda, yang melakukan tugas-tugas berikut:  
+ Membaca aplikasi, aliran data, dan nama Wilayah, diteruskan sebagai argumen.
+ Membuat `KinesisAsyncClient` instance dengan nama Region.
+ Membuat `StockTradeRecordProcessorFactory` instance yang melayani instance`ShardRecordProcessor`, diimplementasikan oleh sebuah `StockTradeRecordProcessor` instance. 
+ Menciptakan sebuah `ConfigsBuilder` instance dengan`KinesisAsyncClient`,`StreamName`,`ApplicationName`, dan `StockTradeRecordProcessorFactory` instance. Ini berguna untuk membuat semua konfigurasi dengan nilai default.
+ Membuat penjadwal KCL (sebelumnya, dalam KCL versi 1.x itu dikenal sebagai pekerja KCL) dengan instance. `ConfigsBuilder` 
+ Scheduler membuat thread baru untuk setiap shard (ditugaskan ke instance konsumen ini), yang terus-menerus melakukan loop untuk membaca catatan dari aliran data. Kemudian memanggil `StockTradeRecordProcessor` instance untuk memproses setiap batch catatan yang diterima. 

**StockTradeRecordProcessor kelas**  
Implementasi `StockTradeRecordProcessor` instance, yang pada gilirannya mengimplementasikan lima metode yang diperlukan:`initialize`,`processRecords`,`leaseLost`,`shardEnded`, dan`shutdownRequested`.   
`shutdownRequested`Metode `initialize` dan digunakan oleh KCL untuk memberi tahu prosesor rekaman kapan harus siap untuk mulai menerima catatan dan kapan harus berhenti menerima catatan, masing-masing, sehingga dapat melakukan tugas pengaturan dan penghentian khusus aplikasi apa pun. `leaseLost`dan `shardEnded` digunakan untuk menerapkan logika apa pun untuk apa yang harus dilakukan ketika sewa hilang atau pemrosesan telah mencapai akhir pecahan. Dalam contoh ini, kami cukup mencatat pesan yang menunjukkan peristiwa ini.   
Kode untuk metode ini disediakan untuk Anda. Pemrosesan utama terjadi dalam `processRecords` metode, yang pada gilirannya digunakan `processRecord` untuk setiap catatan. Metode terakhir ini disediakan sebagai kode kerangka yang sebagian besar kosong untuk Anda terapkan pada langkah berikutnya, di mana dijelaskan secara lebih rinci.   
Yang juga perlu diperhatikan adalah penerapan metode dukungan untuk`processRecord`:`reportStats`, dan`resetStats`, yang kosong dalam kode sumber asli.   
`processRecords`Metode ini diterapkan untuk Anda, dan melakukan langkah-langkah berikut:  
+ Untuk setiap catatan yang masuk, ia `processRecord` memanggilnya. 
+ Jika setidaknya 1 menit telah berlalu sejak laporan terakhir, panggilan`reportStats()`, yang mencetak statistik terbaru, dan kemudian `resetStats()` yang menghapus statistik sehingga interval berikutnya hanya mencakup catatan baru.
+ Menetapkan waktu pelaporan berikutnya.
+ Jika setidaknya 1 menit telah berlalu sejak pos pemeriksaan terakhir, hubungi. `checkpoint()` 
+ Menetapkan waktu checkpointing berikutnya.
Metode ini menggunakan interval 60 detik untuk tingkat pelaporan dan pos pemeriksaan. Untuk informasi selengkapnya tentang checkpointing, lihat [Menggunakan Perpustakaan Klien Kinesis](https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html). 

**StockStats kelas**  
Kelas ini menyediakan retensi data dan pelacakan statistik untuk saham paling populer dari waktu ke waktu. Kode ini disediakan untuk Anda dan berisi metode berikut:  
+ `addStockTrade(StockTrade)`: menyuntikkan yang diberikan `StockTrade` ke dalam statistik yang sedang berjalan.
+ `toString()`: mengembalikan statistik dalam string diformat.
Kelas ini melacak saham paling populer dengan menjaga hitungan berjalan dari jumlah total perdagangan untuk setiap saham dan jumlah maksimum. Ini memperbarui jumlah ini setiap kali perdagangan saham tiba.

Tambahkan kode ke metode `StockTradeRecordProcessor` kelas, seperti yang ditunjukkan pada langkah-langkah berikut. 

**Untuk mengimplementasikan konsumen**

1. Terapkan `processRecord` metode dengan membuat instance `StockTrade` objek berukuran benar dan menambahkan data catatan ke dalamnya, mencatat peringatan jika ada masalah. 

   ```
   byte[] arr = new byte[record.data().remaining()];
   record.data().get(arr);
   StockTrade trade = StockTrade.fromJsonAsBytes(arr);
       if (trade == null) {
           log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey());
           return;
           }
   stockStats.addStockTrade(trade);
   ```

1. Menerapkan `reportStats` metode. Ubah format output agar sesuai dengan preferensi Anda. 

   ```
   System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" +
   stockStats + "\n" +
   "****************************************************************\n");
   ```

1. Menerapkan `resetStats` metode, yang menciptakan `stockStats` instance baru. 

   ```
   stockStats = new StockStats();
   ```

1. Menerapkan metode berikut yang diperlukan oleh `ShardRecordProcessor` antarmuka:

   ```
   @Override
   public void leaseLost(LeaseLostInput leaseLostInput) {
       log.info("Lost lease, so terminating.");
   }
   
   @Override
   public void shardEnded(ShardEndedInput shardEndedInput) {
       try {
           log.info("Reached shard end checkpointing.");
           shardEndedInput.checkpointer().checkpoint();
       } catch (ShutdownException | InvalidStateException e) {
           log.error("Exception while checkpointing at shard end. Giving up.", e);
       }
   }
   
   @Override
   public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
       log.info("Scheduler is shutting down, checkpointing.");
       checkpoint(shutdownRequestedInput.checkpointer());
   }
   
   private void checkpoint(RecordProcessorCheckpointer checkpointer) {
       log.info("Checkpointing shard " + kinesisShardId);
       try {
           checkpointer.checkpoint();
       } catch (ShutdownException se) {
           // Ignore checkpoint if the processor instance has been shutdown (fail over).
           log.info("Caught shutdown exception, skipping checkpoint.", se);
       } catch (ThrottlingException e) {
           // Skip checkpoint when throttled. In practice, consider a backoff and retry policy.
           log.error("Caught throttling exception, skipping checkpoint.", e);
       } catch (InvalidStateException e) {
           // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS).
           log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
       }
   }
   ```

**Untuk menjalankan konsumen**

1. Jalankan produser yang Anda tulis [[Implementasikan produsen](tutorial-stock-data-kplkcl2-producer.md)Implementasikan produsen](tutorial-stock-data-kplkcl2-producer.md) untuk menyuntikkan catatan perdagangan saham simulasi ke aliran Anda.

1. Verifikasi bahwa kunci akses dan key pair rahasia yang diambil sebelumnya (saat membuat pengguna IAM) disimpan dalam file. `~/.aws/credentials` 

1. Jalankan `StockTradesProcessor` kelas dengan argumen berikut:

   ```
   StockTradesProcessor StockTradeStream us-west-2
   ```

   Perhatikan bahwa jika Anda membuat streaming di Wilayah selain`us-west-2`, Anda harus menentukan Wilayah tersebut di sini.

Setelah satu menit, Anda akan melihat output seperti berikut, disegarkan setiap menit setelahnya:

```
  
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  ****************************************************************
```

## Langkah selanjutnya
<a name="tutorial-stock-data-kplkcl2-consumer-next"></a>

[(Opsional) Memperluas konsumen](tutorial-stock-data-kplkcl2-consumer-extension.md)

# (Opsional) Memperluas konsumen
<a name="tutorial-stock-data-kplkcl2-consumer-extension"></a>

Bagian opsional ini menunjukkan bagaimana Anda dapat memperluas kode konsumen untuk skenario yang sedikit lebih rumit.

Jika Anda ingin tahu tentang pesanan jual terbesar setiap menit, Anda dapat memodifikasi `StockStats` kelas di tiga tempat untuk mengakomodasi prioritas baru ini.

**Untuk memperluas konsumen**

1. Tambahkan variabel instance baru:

   ```
    // Ticker symbol of the stock that had the largest quantity of shares sold 
    private String largestSellOrderStock;
    // Quantity of shares for the largest sell order trade
    private long largestSellOrderQuantity;
   ```

1. Tambahkan kode berikut ke `addStockTrade`:

   ```
   if (type == TradeType.SELL) {
        if (largestSellOrderStock == null || trade.getQuantity() > largestSellOrderQuantity) {
            largestSellOrderStock = trade.getTickerSymbol();
            largestSellOrderQuantity = trade.getQuantity();
        }
    }
   ```

1. Ubah `toString` metode untuk mencetak informasi tambahan:

   ```
    
   public String toString() {
       return String.format(
           "Most popular stock being bought: %s, %d buys.%n" +
           "Most popular stock being sold: %s, %d sells.%n" +
           "Largest sell order: %d shares of %s.",
           getMostPopularStock(TradeType.BUY), getMostPopularStockCount(TradeType.BUY),
           getMostPopularStock(TradeType.SELL), getMostPopularStockCount(TradeType.SELL),
           largestSellOrderQuantity, largestSellOrderStock);
   }
   ```

Jika Anda menjalankan konsumen sekarang (ingat untuk menjalankan produser juga), Anda akan melihat output yang mirip dengan ini:

```
 
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  Largest sell order: 996 shares of BUD.
  ****************************************************************
```

## Langkah selanjutnya
<a name="tutorial-stock-data-kplkcl2-consumer-extension-next"></a>

[Pembersihan sumber daya](tutorial-stock-data-kplkcl2-finish.md)

# Pembersihan sumber daya
<a name="tutorial-stock-data-kplkcl2-finish"></a>

Karena Anda membayar untuk menggunakan aliran data Kinesis, pastikan Anda menghapusnya dan tabel Amazon DynamoDB yang sesuai ketika Anda selesai dengannya. Biaya nominal terjadi pada aliran aktif bahkan ketika Anda tidak mengirim dan mendapatkan catatan. Ini karena aliran aktif menggunakan sumber daya dengan terus “mendengarkan” catatan masuk dan permintaan untuk mendapatkan catatan.

**Untuk menghapus aliran dan tabel**

1. Matikan produsen dan konsumen yang mungkin masih Anda jalankan.

1. [Buka konsol Kinesis di /kinesis. https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. Pilih aliran yang Anda buat untuk aplikasi ini (`StockTradeStream`).

1. Pilih **Hapus Stream**.

1. Buka konsol DynamoDB di. [https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/)

1. Hapus tabel `StockTradesProcessor`.

## Ringkasan
<a name="tutorial-stock-data-kplkcl2-summary"></a>

Memproses sejumlah besar data dalam waktu dekat tidak memerlukan penulisan kode yang rumit atau mengembangkan infrastruktur yang sangat besar. Ini sama dasarnya dengan menulis logika untuk memproses sejumlah kecil data (seperti menulis`processRecord(Record)`) tetapi menggunakan Kinesis Data Streams untuk skala sehingga berfungsi untuk sejumlah besar data streaming. Anda tidak perlu khawatir tentang bagaimana skala pemrosesan Anda karena Kinesis Data Streams menanganinya untuk Anda. Yang harus Anda lakukan adalah mengirim catatan streaming Anda ke Kinesis Data Streams dan menulis logika untuk memproses setiap rekaman baru yang diterima. 

Berikut adalah beberapa peningkatan potensial untuk aplikasi ini.

**Agregat di semua pecahan**  
Saat ini, Anda mendapatkan statistik yang dihasilkan dari agregasi catatan data yang diterima oleh satu pekerja dari satu pecahan. (Pecahan tidak dapat diproses oleh lebih dari satu pekerja dalam satu aplikasi pada saat yang bersamaan.) Tentu saja, ketika Anda menskalakan dan memiliki lebih dari satu pecahan, Anda mungkin ingin menggabungkan semua pecahan. Anda dapat melakukan ini dengan memiliki arsitektur pipeline di mana output dari setiap pekerja dimasukkan ke aliran lain dengan pecahan tunggal, yang diproses oleh pekerja yang mengumpulkan output dari tahap pertama. Karena data dari tahap pertama terbatas (satu sampel per menit per pecahan), maka akan dengan mudah ditangani oleh satu pecahan.

**Pemrosesan skala**  
Ketika aliran meningkat untuk memiliki banyak pecahan (karena banyak produsen mengirim data), cara untuk menskalakan pemrosesan adalah dengan menambahkan lebih banyak pekerja. Anda dapat menjalankan pekerja di instans Amazon EC2 dan menggunakan grup Auto Scaling.

**Gunakan konektor ke Amazon S3/DynamoDB/Amazon Redshift/Storm**  
Karena aliran terus diproses, outputnya dapat dikirim ke tujuan lain. AWS menyediakan [konektor](https://github.com/awslabs/amazon-kinesis-connectors) untuk mengintegrasikan Kinesis Data Streams AWS dengan layanan lain dan alat pihak ketiga.