

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# コンシューマーを実装する


このチュートリアルのコンシューマーアプリケーションは、データストリームの株式取引を継続的に処理します。その後、1 分ごとに売買されている最も人気のある株式を出力します。このアプリケーションは、Kinesis Client Library (KCL) 上に構築されており、コンシューマーアプリケーションに共通する面倒な作業の多くを行います。詳細については、[KCL 1.x および 2.x の情報](shared-throughput-kcl-consumers.md)を参照してください。

ソースコードを参照し、次の情報を確認してください。

**StockTradesProcessor クラス**  
事前に用意されているコンシューマーのメインクラスで、次のタスクを実行します。  
+ 引数として渡されたアプリケーション、データストリーム、リージョン名を読み取ります。
+ リージョン名で `KinesisAsyncClient` インスタンスを作成します。
+ `ShardRecordProcessor` のインスタンスとして機能し、`StockTradeRecordProcessor` インスタンスによって実装される、`StockTradeRecordProcessorFactory` インスタンスを作成します。
+ `KinesisAsyncClient`、`StreamName`、`ApplicationName`、および `StockTradeRecordProcessorFactory` インスタンスを使用して `ConfigsBuilder` インスタンスを作成します。これは、デフォルト値ですべての設定を作成するのに役立ちます。
+ `ConfigsBuilder` インスタンスを使用して KCL スケジューラー (以前は KCL バージョン 1.x では KCL ワーカー) を作成します。
+ このスケジューラーは、(このコンシューマーインスタンスに割り当てられた) 各シャードに新しいスレッドを作成します。これにより、継続的にデータストリームからレコードが読み取られます。次に、`StockTradeRecordProcessor` インスタンスを呼び出して、受信したレコードのバッチを処理します。

**StockTradeRecordProcessor クラス**  
`StockTradeRecordProcessor` インスタンスを実装したら、次は `initialize`、`processRecords`、`leaseLost`、`shardEnded`、`shutdownRequested` の 5 つの必須メソッドを実装します。  
KCL は `initialize` および `shutdownRequested` メソッドを使用して、レコードの受信を開始できるタイミングと、レコードの受信を停止するタイミングをそれぞれレコードプロセッサに通知し、アプリケーション固有の設定および終了タスクを実行できるようにします。`leaseLost`および `shardEnded` は、リースが失われたとき、または処理がシャードの終わりに達したときの動作ロジックを実装するために使用します。この例では、これらのイベントを示すメッセージをログに記録するだけです。  
これらのメソッドのコードを示しています。主な処理は `processRecords` メソッドで行われ、そこでは各レコードの `processRecord` が使用されます。後者のメソッドは、ほとんどの場合、空のスケルトンコードとして提供されます。次のステップでは、これを実装する方法について説明します。詳細については、次のステップを参照してください。  
また、`processRecord` のサポートメソッドである `reportStats` および `resetStats` の実装にも注目してください。これらのメソッドは、元のソースコードでは空になっています。  
`processRecords` メソッドは既に実装されており、次のステップを実行します。  
+ 渡されたレコードごとに `processRecord` を呼び出します。
+ 最後のレポートから 1 分間以上経過した場合は、`reportStats()` を呼び出して最新の統計を出力し、次の間隔に新しいレコードのみ含まれるように `resetStats()` を呼び出して統計を消去します。
+ 次のレポート時間を設定します。
+ 最後のチェックポイントから 1 分間以上経過した場合は、`checkpoint()` を呼び出します。
+ 次のチェックポイント時間を設定します。
このメソッドでは、60 秒間間隔でレポートおよびチェックポイント時間が設定されています。チェックポイントの詳細については、[Kinesis Client Library の使用](https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html)を参照してください。

**StockStats クラス**  
このクラスでは、データを保持し、最も人気のある株式の経時的な統計を示すことができます。このコードは、事前に用意されており、次のメソッドが含まれています。  
+ `addStockTrade(StockTrade)`: 指定された `StockTrade` を実行中の統計に取り込みます。
+ `toString()`: 特定の形式の文字列として統計を返します。
このクラスは、各株式の合計取引数と最大取引数を継続的にカウントすることで、最も人気のある株式を追跡します。これらの数は、株式取引を受け取る度に更新されます。

次のステップに示されているコードを `StockTradeRecordProcessor` クラスのメソッドに追加します。

**コンシューマーを実装するには**

1. `processRecord` メソッドを実装するには、サイズの正しい `StockTrade` オブジェクトを開始し、それにレコードデータを追加します。また、問題が発生した場合に警告がログに記録されるようにします。

   ```
   byte[] arr = new byte[record.data().remaining()];
   record.data().get(arr);
   StockTrade trade = StockTrade.fromJsonAsBytes(arr);
       if (trade == null) {
           log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey());
           return;
           }
   stockStats.addStockTrade(trade);
   ```

1. `reportStats` メソッドを実装します。出力形式は必要に応じて変更できます。

   ```
   System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" +
   stockStats + "\n" +
   "****************************************************************\n");
   ```

1. 新しい `resetStats` インスタンスを作成する `stockStats` メソッドを実装します。

   ```
   stockStats = new StockStats();
   ```

1. `ShardRecordProcessor` インターフェイスに必要な以下のメソッドを実装します。

   ```
   @Override
   public void leaseLost(LeaseLostInput leaseLostInput) {
       log.info("Lost lease, so terminating.");
   }
   
   @Override
   public void shardEnded(ShardEndedInput shardEndedInput) {
       try {
           log.info("Reached shard end checkpointing.");
           shardEndedInput.checkpointer().checkpoint();
       } catch (ShutdownException | InvalidStateException e) {
           log.error("Exception while checkpointing at shard end. Giving up.", e);
       }
   }
   
   @Override
   public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
       log.info("Scheduler is shutting down, checkpointing.");
       checkpoint(shutdownRequestedInput.checkpointer());
   }
   
   private void checkpoint(RecordProcessorCheckpointer checkpointer) {
       log.info("Checkpointing shard " + kinesisShardId);
       try {
           checkpointer.checkpoint();
       } catch (ShutdownException se) {
           // Ignore checkpoint if the processor instance has been shutdown (fail over).
           log.info("Caught shutdown exception, skipping checkpoint.", se);
       } catch (ThrottlingException e) {
           // Skip checkpoint when throttled. In practice, consider a backoff and retry policy.
           log.error("Caught throttling exception, skipping checkpoint.", e);
       } catch (InvalidStateException e) {
           // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS).
           log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
       }
   }
   ```

**コンシューマーを実行するには**

1. [[プロデューサーを実装する](tutorial-stock-data-kplkcl2-producer.md)プロデューサーを実装する](tutorial-stock-data-kplkcl2-producer.md) で記述したプロデューサーを実行し、シミュレートした株式取引レコードをストリームに取り込みます。

1. 前のステップ (IAM ユーザーを作成したとき) で取得したアクセスキーとシークレットキーのペアがファイル `~/.aws/credentials` に保存されていることを確認します。

1. 次の引数を指定して `StockTradesProcessor` クラスを実行します。

   ```
   StockTradesProcessor StockTradeStream us-west-2
   ```

   `us-west-2` 以外のリージョンにストリームを作成した場合は、代わりにそのリージョンをここで指定する必要があります。

1 分後、次のような出力が表示されます。その後、1 分間ごとに出力が更新されます。

```
  
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  ****************************************************************
```

## 次の手順


[（オプション) コンシューマーを拡張する](tutorial-stock-data-kplkcl2-consumer-extension.md)