

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# プロデューサーを実装する
<a name="tutorial-stock-data-kplkcl2-producer"></a>

このチュートリアルでは、株式市場取引をモニタリングする実際のシナリオを使用しています。以下の原理によって、このシナリオをプロデューサーおよびサポートコード構造にマッピングできます。

[ソースコード](https://github.com/aws-samples/amazon-kinesis-learning )を参照し、以下の情報を確認します。

**StockTrade クラス**  
株式取引は、StockTrade クラスのインスタンスによって個別に表されます。このインスタンスには、ティッカーシンボル、株価、株数、取引のタイプ (買いまたは売り)、取引を一意に識別する ID などの属性が含まれます。このクラスは、既に実装されています。

**ストリームレコード**  
ストリームとは、一連のレコードのことです。レコードとは、JSON 形式による連続する `StockTrade` インスタンスの 1 つを表しています。例:   

```
{
  "tickerSymbol": "AMZN", 
  "tradeType": "BUY", 
  "price": 395.87,
  "quantity": 16, 
  "id": 3567129045
}
```

**StockTradeGenerator クラス**  
StockTradeGenerator には、呼び出されるたびにランダムに生成された新しい株式取引を返す、`getRandomTrade()` という名前のメソッドが含まれています。このクラスは、既に実装されています。

**StockTradesWriter クラス**  
プロデューサーの `main` メソッドである StockTradesWriter は、継続的にランダム取引を取得し、以下のタスクを実行してその取引を Kinesis Data Streams に送信します。  

1. ストリーム名とリージョン名を入力として読み取ります。

1. `KinesisAsyncClientBuilder` を使用してリージョン、認証情報、クライアント構成を設定します。

1. ストリームが存在し、アクティブであることを確認します (そうでない場合は、エラーで終了します)。

1. 連続ループで、`StockTradeGenerator.getRandomTrade()` メソッドに続き `sendStockTrade` メソッドを呼び出して、100 ミリ秒ごとに取引をストリームに送信します。
`sendStockTrade` クラスの `StockTradesWriter` メソッドには次のコードがあります。  

```
private static void sendStockTrade(StockTrade trade, KinesisAsyncClient kinesisClient,
            String streamName) {
        byte[] bytes = trade.toJsonAsBytes();
        // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library.
        if (bytes == null) {
            LOG.warn("Could not get JSON bytes for stock trade");
            return;
        }

        LOG.info("Putting trade: " + trade.toString());
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(bytes))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            LOG.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }
```

次のコードの詳細を参照してください。
+ `PutRecord` API はバイト配列を想定するため、その取引を JSON 形式に変換する必要があります。この操作は、次の 1 行のコードによって行われます。

  ```
  byte[] bytes = trade.toJsonAsBytes();
  ```
+ 取引を送信する前に、新しい `PutRecordRequest` インスタンス (この場合は request) を作成する必要があります。各 `request` には、ストリーム名、パーティションキー、データ BLOB が必要です。

  ```
  PutPutRecordRequest request = PutRecordRequest.builder()
      .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
      .streamName(streamName)
      .data(SdkBytes.fromByteArray(bytes))
      .build();
  ```

  この例では、株式ティッカーをパーティションキーとして使用することで、レコードを特定のシャードにマッピングしています。実際には、レコードがストリーム全体に均等に分散するように、シャード 1 つあたりに数百個または数千個のパーティションキーを用意する必要があります。ストリームにデータを追加する方法の詳細については、[Amazon Kinesis Data Streams にデータを書き込む](building-producers.md)を参照してください。

  次に、`request` をクライアントに送信できます (put オペレーション)。

  ```
     kinesisClient.putRecord(request).get();
  ```
+ エラーチェックとログ記録は、いつでも追加して損はありません。次のコードによって、エラー状態を記録します。

  ```
  if (bytes == null) {
      LOG.warn("Could not get JSON bytes for stock trade");
      return;
  }
  ```

  `put` オペレーションの前後に try/catch ブロックを追加します。

  ```
  try {
   	kinesisClient.putRecord(request).get();
  } catch (InterruptedException e) {
              LOG.info("Interrupted, assuming shutdown.");
  } catch (ExecutionException e) {
              LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
  }
  ```

  これは、ネットワークエラーや、ストリームがスループット制限を超えて抑制されたことが原因で、Kinesis Data Streams の put オペレーションが失敗することがあるためです。データが失われることがないように、再試行を使用するなど、`put` オペレーションの再試行ポリシーを慎重に検討することをお勧めします。
+ ステータスのログ記録は有益ですが、オプションです。

  ```
  LOG.info("Putting trade: " + trade.toString());
  ```
ここに示されているプロデューサーでは、Kinesis Data Streams API のシングルレコード機能 `PutRecord` が使用されています。実際には、個々のプロデューサーで大量のレコードが生成される場合があります。その場合、`PutRecords` のマルチレコード機能を使用して、レコードのバッチを一度に送信する方が効率的です。詳細については、[Amazon Kinesis Data Streams にデータを書き込む](building-producers.md)を参照してください。

**プロデューサーを実行するには**

1. [IAM ユーザーとポリシーを作成する](tutorial-stock-data-kplkcl2-iam.md) で取得したアクセスキーとシークレットキーのペアがファイル `~/.aws/credentials` に保存されていることを確認します。

1. 次の引数を指定して `StockTradeWriter` クラスを実行します。

   ```
   StockTradeStream us-west-2
   ```

   `us-west-2` 以外のリージョンにストリームを作成した場合は、代わりにそのリージョンをここで指定する必要があります。

次のような出力が表示されます:

```
Feb 16, 2015 3:53:00 PM  
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
```

Kinesis Data Streams によって株式取引が取り込まれます。

## 次の手順
<a name="tutorial-stock-data-kplkcl2-producer-next"></a>

[コンシューマーを実装する](tutorial-stock-data-kplkcl2-consumer.md)