

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# プロデューサーを実装する
<a name="tutorial-stock-data-kplkcl-producer"></a>



[チュートリアル: KPL と KCL 1.x を使用して株式データをリアルタイム処理する[チュートリアル: KPL と KCL 1.x を使用して株式データをリアルタイム処理する](tutorial-stock-data-kplkcl.md)](tutorial-stock-data-kplkcl.md) のアプリケーションでは、株式市場取引をモニタリングする実際のシナリオが使用されます。次の原理によって、このシナリオをプロデューサーおよびサポートコード構造にマッピングすることができます。

ソースコードを参照し、次の情報を確認してください。

**StockTrade クラス**  
株式取引は、`StockTrade` クラスのインスタンスによって個別に表されます。このインスタンスには、ティッカーシンボル、株価、株数、取引のタイプ (買いまたは売り)、取引を一意に識別する ID などの属性が含まれます。このクラスは、既に実装されています。

**ストリームレコード**  
ストリームとは、一連のレコードのことです。レコードとは、JSON 形式による連続する `StockTrade` インスタンスの 1 つを表しています。例:   

```
{
  "tickerSymbol": "AMZN", 
  "tradeType": "BUY", 
  "price": 395.87,
  "quantity": 16, 
  "id": 3567129045
}
```

**StockTradeGenerator クラス**  
`StockTradeGenerator` には、呼び出されるたびにランダムに生成された新しい株式取引を返す、`getRandomTrade()` と呼ばれるメソッドが含まれています。このクラスは、既に実装されています。

**StockTradesWriter クラス**  
プロデューサーの `main` メソッドである `StockTradesWriter` は、継続的にランダム取引を取得し、以下のタスクを実行してそれらを Kinesis Data Streams に送信します。  

1. ストリーム名とリージョン名を入力として読み取ります。

1. `AmazonKinesisClientBuilder` を作成します。

1. クライアントビルダーを使用してリージョン、認証情報、およびクライアント構成を設定します。

1. クライアントビルダーを使用して `AmazonKinesis` クライアントを構成します。

1. ストリームが存在し、アクティブであることを確認します (そうでない場合は、エラーで終了します)。

1. 連続ループで、`StockTradeGenerator.getRandomTrade()` メソッドに続き `sendStockTrade` メソッドを呼び出して、100 ミリ秒ごとに取引をストリームに送信します。
`sendStockTrade` クラスの `StockTradesWriter` メソッドには次のコードがあります。  

```
private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient, String streamName) {
    byte[] bytes = trade.toJsonAsBytes();
    // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library.
    if (bytes == null) {
        LOG.warn("Could not get JSON bytes for stock trade");
        return;
    }
    
    LOG.info("Putting trade: " + trade.toString());
    PutRecordRequest putRecord = new PutRecordRequest();
    putRecord.setStreamName(streamName);
    // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
    putRecord.setPartitionKey(trade.getTickerSymbol());
    putRecord.setData(ByteBuffer.wrap(bytes));

    try {
        kinesisClient.putRecord(putRecord);
    } catch (AmazonClientException ex) {
        LOG.warn("Error sending record to Amazon Kinesis.", ex);
    }
}
```

次のコードの詳細を参照してください。
+ `PutRecord` API はバイト配列を想定するため、`trade` を JSON 形式に変換する必要があります。この操作は、次の 1 行のコードによって行われます。

  ```
  byte[] bytes = trade.toJsonAsBytes();
  ```
+ 取引を送信する前に、新しい `PutRecordRequest` インスタンス (この場合、`putRecord` と呼ばれる) を作成する必要があります。

  ```
  PutRecordRequest putRecord = new PutRecordRequest();
  ```

  各 `PutRecord` の呼び出しには、ストリーム名、パーティションキー、およびデータ BLOB が必要です。次のコードによって、`putRecord` メソッドを使用して、これらのフィールドを `setXxxx()` オブジェクトに追加します。

  ```
  putRecord.setStreamName(streamName);
  putRecord.setPartitionKey(trade.getTickerSymbol());
  putRecord.setData(ByteBuffer.wrap(bytes));
  ```

  この例では、株式チケットをパーティションキーとして使用することで、レコードを特定のシャードにマッピングしています。実際には、レコードがストリーム全体に均等に分散するように、シャード 1 つあたりに数百個または数千個のパーティションキーを用意する必要があります。ストリームにデータを追加する方法の詳細については、[ストリームにデータを追加する](developing-producers-with-sdk.md#kinesis-using-sdk-java-add-data-to-stream)を参照してください。

  次に、`putRecord` をクライアントに送信 (`put` オペレーション) することができます。

  ```
  kinesisClient.putRecord(putRecord);
  ```
+ エラーチェックとログ記録は、いつでも追加して損はありません。次のコードによって、エラー状態を記録します。

  ```
  if (bytes == null) {
      LOG.warn("Could not get JSON bytes for stock trade");
      return;
  }
  ```

  `put` オペレーションの前後に try/catch ブロックを追加します。

  ```
  try {
         kinesisClient.putRecord(putRecord);
  } catch (AmazonClientException ex) {
         LOG.warn("Error sending record to Amazon Kinesis.", ex);
  }
  ```

  これは、ネットワークエラーや、ストリームがスループット制限を超えて抑制されたことが原因で、Kinesis Data Streams の `put` オペレーションが失敗することがあるためです。データが失われることがないように、単純な再試行を使用するなど、`put` オペレーションの再試行ポリシーを慎重に検討することをお勧めします。
+ ステータスのログ記録は有益ですが、オプションです。

  ```
  LOG.info("Putting trade: " + trade.toString());
  ```
ここに示されているプロデューサーでは、Kinesis Data Streams API のシングルレコード機能 `PutRecord` が使用されています。実際には、個々のプロデューサーで大量のレコードが生成される場合があります。その場合、`PutRecords` のマルチレコード機能を使用して、レコードのバッチを一度に送信する方が効率的です。詳細については、[ストリームにデータを追加する](developing-producers-with-sdk.md#kinesis-using-sdk-java-add-data-to-stream)を参照してください。

**プロデューサーを実行するには**

1. 前のステップ (IAM ユーザーを作成したとき) で取得したアクセスキーとシークレットキーのペアがファイル `~/.aws/credentials` に保存されていることを確認します。

1. 次の引数を指定して `StockTradeWriter` クラスを実行します。

   ```
   StockTradeStream us-west-2
   ```

   `us-west-2` 以外のリージョンにストリームを作成した場合は、代わりにそのリージョンをここで指定する必要があります。

次のような出力が表示されます:

```
Feb 16, 2015 3:53:00 PM  
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
```

Kinesis Data Streams によって株式取引ストリームが取り込まれます。

## 次の手順
<a name="tutorial-stock-data-kplkcl-producer-next"></a>

[コンシューマーを実装する](tutorial-stock-data-kplkcl-consumer.md)