

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Amazon Kinesis Data Streams のチュートリアルの開始方法
<a name="examples"></a>

Amazon Kinesis Data Streams は、Kinesis データストリームからデータを取り込み、消費するためのさまざまなソリューションを提供します。このセクションのチュートリアルは、Amazon Kinesis Data Streams の概念と機能を理解し、ニーズを満たすソリューションを特定する上で役立つように設計されています。

**Topics**
+ [

# チュートリアル: KPL と KCL 2.x を使用して株式データをリアルタイム処理する
](tutorial-stock-data-kplkcl2.md)
+ [

# チュートリアル: KPL と KCL 1.x を使用して株式データをリアルタイム処理する
](tutorial-stock-data-kplkcl.md)
+ [

# チュートリアル: Amazon Managed Service for Apache Flink を使用してリアルタイムの株式データを分析する
](tutorial-stock-data.md)
+ [

# チュートリアル: Amazon Kinesis Data Streams AWS Lambda で を使用する
](tutorial-stock-data-lambda.md)
+ [

# Amazon Kinesis AWS のストリーミングデータソリューションを使用する
](examples-streaming-solution.md)

# チュートリアル: KPL と KCL 2.x を使用して株式データをリアルタイム処理する
<a name="tutorial-stock-data-kplkcl2"></a>

このチュートリアルのシナリオでは、株式取引をデータストリームに取り込み、ストリーム上で計算を実行する基本的な Amazon Kinesis Data Streams アプリケーションを記述する必要があります。レコードのストリームを Kinesis Data Streams に送信し、ほぼリアルタイムでレコードを消費および処理するアプリケーションを実装する方法を説明します。

**重要**  
ストリームを作成すると、Kinesis Data Streams は AWS 無料利用枠の対象ではないため、Kinesis Data Streams の使用に対して アカウントにわずかな料金が発生します。コンシューマーアプリケーションが起動すると、Amazon DynamoDB の使用に伴う料金がわずかに発生します。コンシューマーアプリケーションでは、処理状態を追跡する際に DynamoDB を使用します。このアプリケーションを終了したら、 AWS リソースを削除して料金が発生しないようにしてください。詳細については、[リソースをクリーンアップする](tutorial-stock-data-kplkcl2-finish.md)を参照してください。

このコードでは、実際の株式市場データにアクセスする代わりに、株式取引のストリームをシミュレートします。シミュレーションには、2015 年 2 月時点における時価総額上位 25 社の株式に関する実際の市場データを基にしたランダム株式取引ジェネレーターが使用されています。リアルタイムの株式取引のストリームにアクセスできたとしたら、そのときに必要としている有益な統計を入手したいと考えるかもしれません。たとえば、スライディングウィンドウ分析を実行して、過去 5 分間に購入された最も人気のある株式を調べたいと思われるかもしれません。または、大規模な売り注文 (膨大な株式が含まれる売り注文) が発生したときに通知を受けたいと思われるかもしれません。このシリーズのコードを拡張して、このような機能を使用することもできます。

このチュートリアルにある手順をデスクトップやノートパソコンで実行し、同じマシンまたは定義された要件を満たす任意のプラットフォームで、プロデューサーおよびコンシューマーのコードのいずれも実行できます。

この例では、米国西部 (オレゴン) リージョンが使用されていますが、[Kinesis Data Streams がサポートされるAWS リージョン](https://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region)であれば、いずれのリージョンでも動作します。

**Topics**
+ [

# 前提条件を満たす
](tutorial-stock-data-kplkcl2-begin.md)
+ [

# データストリームを作成する
](tutorial-stock-data-kplkcl2-create-stream.md)
+ [

# IAM ユーザーとポリシーを作成する
](tutorial-stock-data-kplkcl2-iam.md)
+ [

# コードをダウンロードしてビルドする
](tutorial-stock-data-kplkcl2-download.md)
+ [

# プロデューサーを実装する
](tutorial-stock-data-kplkcl2-producer.md)
+ [

# コンシューマーを実装する
](tutorial-stock-data-kplkcl2-consumer.md)
+ [

# （オプション) コンシューマーを拡張する
](tutorial-stock-data-kplkcl2-consumer-extension.md)
+ [

# リソースをクリーンアップする
](tutorial-stock-data-kplkcl2-finish.md)

# 前提条件を満たす
<a name="tutorial-stock-data-kplkcl2-begin"></a>

このチュートリアルを完了するには、以下の要件を満たす必要があります。

## Amazon Web Services アカウントを作成して使用する
<a name="tutorial-stock-data-kplkcl2-begin-aws"></a>

開始する前に、[Amazon Kinesis Data Streams の用語と概念](key-concepts.md) で説明されている概念、特にストリーム、シャード、プロデューサー、コンシューマーについて理解しておきます。また、ガイド[チュートリアル: AWS CLI for Kinesis Data Streams のインストールと設定](kinesis-tutorial-cli-installation.md)の手順を完了しておくと役立ちます。

にアクセスするには、 AWS アカウントとウェブブラウザが必要です AWS マネジメントコンソール。

コンソールアクセスの場合は、IAM ユーザー名とパスワードを使用して IAM サインインページから [AWS マネジメントコンソール](https://console.aws.amazon.com/console/home) にサインインします。プログラムによるアクセスや長期的な認証情報の代替方法など、 AWS セキュリティ認証情報の詳細については、*IAM ユーザーガイド*の[AWS 「セキュリティ認証情報](https://docs.aws.amazon.com/IAM/latest/UserGuide/security-creds.html)」を参照してください。へのサインインの詳細については AWS アカウント、 *AWS サインイン ユーザーガイド*の[「 へのサインイン方法 AWS](https://docs.aws.amazon.com/signin/latest/userguide/how-to-sign-in.html)」を参照してください。

IAM とセキュリティキーの設定手順の詳細については、[IAM ユーザーを作成する](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/get-set-up-for-amazon-ec2.html#create-an-iam-user)を参照してください。

## システムソフトウェア要件を満たす
<a name="tutorial-stock-data-kplkcl2-begin-sys"></a>

アプリケーションの実行に使用するシステムには、Java 7 以降がインストールされている必要があります。最新の Java Development Kit (JDK) をダウンロードおよびインストールするには、[Oracle 社の Java SE インストールサイト](http://www.oracle.com/technetwork/java/javase/downloads/index.html)を参照してください。

最新バージョンの [AWS SDK for Java](https://aws.amazon.com/sdk-for-java/) が必要です。

コンシューマーアプリケーションには、Kinesis Client Library (KCL) バージョン 2.2.9 以上が必要です。これは、[https://github.com/awslabs/amazon-kinesis-client/tree/master](https://github.com/awslabs/amazon-kinesis-client/tree/master) の GitHub から入手できます。

## 次の手順
<a name="tutorial-stock-data-kplkcl2-begin-next"></a>

[データストリームを作成する](tutorial-stock-data-kplkcl2-create-stream.md)

# データストリームを作成する
<a name="tutorial-stock-data-kplkcl2-create-stream"></a>

最初に、このチュートリアルの後の手順で使用するデータストリームを作成する必要があります。

**ストリームを作成するには**

1. にサインイン AWS マネジメントコンソール し、[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) で Kinesis コンソールを開きます。

1. ナビゲーションペインで、[**データストリーム**] を選択します。

1. ナビゲーションバーで、リージョンセレクターを展開し、リージョンを選択します。

1. [**Kinesis ストリームの作成**] を選択します。

1. ストリームの名前 (**StockTradeStream** など) を入力します。

1. シャードカウントは **1** と入力しますが、[**必要なシャードカウントの予想**] は折りたたんだままにします。

1. [**Kinesis ストリームの作成**] を選択します。

[**Kinesis streams**] リストページで、作成中のストリームのステータスは `CREATING` になります。ストリームを使用する準備ができると、ステータスは `ACTIVE` に変わります。

ストリームの名前を選択すると、表示されるページの [**Details (詳細)**] タブにデータストリーム設定の概要が表示されます。[**モニタリング**] セクションには、ストリームのモニタリング情報が表示されます。

## 次の手順
<a name="tutorial-stock-data-kplkcl2-create-stream-next"></a>

[IAM ユーザーとポリシーを作成する](tutorial-stock-data-kplkcl2-iam.md)

# IAM ユーザーとポリシーを作成する
<a name="tutorial-stock-data-kplkcl2-iam"></a>

のセキュリティのベストプラクティス AWS により、きめ細かなアクセス許可を使用してさまざまなリソースへのアクセスを制御できます。 AWS Identity and Access Management (IAM) を使用すると、 でユーザーとユーザーのアクセス許可を管理できます AWS。[IAM ポリシー](https://docs.aws.amazon.com/IAM/latest/UserGuide/PoliciesOverview.html)は、許可されるアクションとそのアクションが適用されるリソースを明示的にリストアップします。

一般的に、Kinesis Data Streams プロデューサーおよびコンシューマーには、次の最小許可が必要になります。


**プロデューサー**  

| アクション | [リソース]  | 目的 | 
| --- | --- | --- | 
| DescribeStream, DescribeStreamSummary, DescribeStreamConsumer | Kinesis Data Streams | レコードを読み取る前に、コンシューマーは、データストリームが存在すること、アクティブであること、シャードを含んでいることを確認します。 | 
| SubscribeToShard, RegisterStreamConsumer | Kinesis Data Streams | コンシューマーをシャードにサブスクライブして登録します。 | 
| PutRecord, PutRecords | Kinesis Data Streams | Kinesis Data Streams にレコードを書き込みます。 | 


**コンシューマー**  

| **アクション** | **[リソース]**  | **目的** | 
| --- | --- | --- | 
| DescribeStream | Kinesis Data Streams | レコードを読み取る前に、コンシューマーは、データストリームが存在すること、アクティブであること、シャードを含んでいることを確認します。 | 
| GetRecords, GetShardIterator  | Kinesis Data Streams | シャードからレコードを読み取ります。 | 
| CreateTable, DescribeTable, GetItem, PutItem, Scan, UpdateItem | Amazon DynamoDB テーブル | Kinesis クライアントライブラリ (KCL) (バージョン 1.x または 2.x) を使用してコンシューマーが開発されている場合は、アプリケーションの処理状態を追跡するときに DynamoDB テーブルにアクセスする必要があります。 | 
| DeleteItem | Amazon DynamoDB テーブル | コンシューマーが Kinesis Data Streams シャードで分割と結合のオペレーションを実行する場合。 | 
| PutMetricData | Amazon CloudWatch Logs | また、KCL は、アプリケーションをモニタリングするのに便利なメトリクスも CloudWatch にアップロードします。 | 

このチュートリアルでは、上記のすべての許可を付与する 1 つの IAM ポリシーを作成します。本番環境では、プロデューサー用とコンシューマー用の 2 つのポリシーを作成してもかまいません。

**IAM ポリシーを作成するには**

1. 上記の手順で作成した新しいデータストリームの Amazon リソースネーム (ARN) を見つけます。この ARN は、 [**ストリーム ARN**] として [**詳細**]タブの上部に表示されます。ARN 形式 は次のとおりです。

   ```
   arn:aws:kinesis:region:account:stream/name
   ```  
*リージョン*  
 AWS リージョンコード。例: `us-west-2`。詳細については、[リージョンとアベイラビリティーゾーンの概念](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-regions-availability-zones)を参照してください。  
*アカウント*  
 AWS アカウント[設定](https://console.aws.amazon.com/billing/home?#/account)に示されているアカウント ID。  
*名前*  
上記のステップで作成したデータストリームの名前 (`StockTradeStream`)。

1. コンシューマーによって使用される (および最初のコンシューマーインスタンスによって作成される) DynamoDB テーブルの ARN を決定します。次のような形式になります。

   ```
   arn:aws:dynamodb:region:account:table/name
   ```

   リージョンとアカウント ID は、このチュートリアルで使用しているデータストリームの ARN の値と同じですが、*name* は、コンシューマーアプリケーションによって作成および使用される DynamoDB テーブルの名前です。KCL では、アプリケーション名をテーブル名として使用します。このステップでは、DynamoDB テーブル名に `StockTradesProcessor` を使用します。これは、このチュートリアルの後の手順で使用するアプリケーション名であるためです。

1. IAM コンソールの**ポリシー** ([https://console.aws.amazon.com/iam/home\$1policies](https://console.aws.amazon.com/iam/home#policies)) で、[**ポリシーの作成**] を選択します。IAM ポリシーを初めて扱う場合には、[**今すぐ始める**]、[**ポリシーの作成**] を選択します。

1. [**ポリシージェネレーター**] の横の [**選択**] を選択します。

1.  AWS サービスとして **Amazon Kinesis** を選択します。

1. 許可されるアクションとして、`DescribeStream`、`GetShardIterator`、`GetRecords`、`PutRecord`、および `PutRecords` を選択します。

1. このチュートリアルで使用するデータストリームの ARN を入力します。

1. 以下の各項目について、[**ステートメントを追加**] を使用します。    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/streams/latest/dev/tutorial-stock-data-kplkcl2-iam.html)

   ARN を指定するときに使用されるアスタリスク (`*`) は必要ありません。`PutMetricData` アクションが呼び出される特定のリソースが CloudWatch に存在しない場合などがこれに該当します。

1. ［**Next Step**］ (次のステップ) をクリックします。

1. [**ポリシー名**] を `StockTradeStreamPolicy` に変更し、コードを確認して、[**ポリシーの作成**] を選択します。

最終的なポリシードキュメントは以下のようになります。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "Stmt123",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:PutRecord",
                "kinesis:PutRecords",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:ListShards",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream"
            ]
        },
        {
            "Sid": "Stmt234",
            "Effect": "Allow",
            "Action": [
                "kinesis:SubscribeToShard",
                "kinesis:DescribeStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream/*"
            ]
        },
        {
            "Sid": "Stmt456",
            "Effect": "Allow",
            "Action": [
                "dynamodb:*"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-west-2:111122223333:table/StockTradesProcessor"
            ]
        },
        {
            "Sid": "Stmt789",
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}
```

------

**IAM ユーザーを作成するには**

1. IAM コンソール ([https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/)) を開きます。

1. **[Users]** (ユーザー) ページで、**[Add user]** (ユーザーを追加) を選択します。

1. [**User name**] に、`StockTradeStreamUser` と入力します。

1. [**アクセスの種類**] で、[**プログラムによるアクセス**] を選択し、[**次の手順: アクセス許可**] を選択します。

1. **[Attach existing policies directly (既存のポリシーを直接アタッチする)**] を選択します。

1. 上記の手順で作成したポリシーを名前 (`StockTradeStreamPolicy`) で検索します。ポリシー名の左にあるボックスを選択し、[**次の手順: 確認**] を選択します。

1. 詳細と概要を確認し、[**ユーザーの作成**] を選択します。

1. [**アクセスキー ID**] をコピーし、プライベート用に保存します。[**シークレットアクセスキー**] で [**表示**] を選択し、このキーもプライベートに保存します。

1. アクセスキーとシークレットキーを自分しかアクセスできない安全な場所にあるローカルファイルに貼り付けます。このアプリケーションでは、アクセス権限を厳しく制限した ` ~/.aws/credentials` という名前のファイルを作成します。ファイル形式は次のようになります。

   ```
   [default]
   aws_access_key_id=access key
   aws_secret_access_key=secret access key
   ```

**IAM ポリシーをユーザーにアタッチするには**

1. IAM コンソールで、[[ポリシー](https://console.aws.amazon.com/iam/home?#policies)] を開いて [**ポリシーアクション**] を選択します。

1. [`StockTradeStreamPolicy`] および [**アタッチ**] を選択します。

1. [`StockTradeStreamUser`] および [**ポリシーのアタッチ**] を選択します。

## 次の手順
<a name="tutorial-stock-data-kplkcl2-iam-next"></a>

[コードをダウンロードしてビルドする](tutorial-stock-data-kplkcl2-download.md)

# コードをダウンロードしてビルドする
<a name="tutorial-stock-data-kplkcl2-download"></a>

このトピックでは、データストリームへのサンプル株式取引の取り込み (*プロデューサー*) とこのデータの処理 (*コンシューマー*) のサンプル実装コードを示します。

**コードをダウンロードしてビルドするには**

1. [https://github.com/aws-samples/amazon-kinesis-learning](https://github.com/aws-samples/amazon-kinesis-learning) GitHub リポジトリからコンピュータにソースコードをダウンロードします。

1. 提供されたディレクトリ構造に従って、IDE でソースコードを使用してプロジェクトを作成します。

1. プロジェクトに次のライブラリを追加します。
   + Amazon Kinesis クライアントライブラリ (KCL)
   + AWS SDK
   + Apache HttpCore
   + Apache HttpClient
   + Apache Commons Lang
   + Apache Commons Logging
   + Guava (Java 用の Google コアライブラリ)
   + Jackson Annotations
   + Jackson Core
   + Jackson Databind
   + Jackson Dataformat: CBOR
   + Joda Time

1. IDE によっては、プロジェクトが自動的にビルドされる場合があります。自動的にビルドされない場合は、IDE に適切なステップを使用してプロジェクトをビルドします。

上記のステップが正常に完了したら、次のセクション ([プロデューサーを実装する](tutorial-stock-data-kplkcl2-producer.md)) に進みます。

## 次の手順
<a name="tutorial-stock-data-kplkcl2-download-next"></a>

[[プロデューサーを実装する](tutorial-stock-data-kplkcl2-producer.md)プロデューサーを実装する](tutorial-stock-data-kplkcl2-producer.md)

# プロデューサーを実装する
<a name="tutorial-stock-data-kplkcl2-producer"></a>

このチュートリアルでは、株式市場取引をモニタリングする実際のシナリオを使用しています。以下の原理によって、このシナリオをプロデューサーおよびサポートコード構造にマッピングできます。

[ソースコード](https://github.com/aws-samples/amazon-kinesis-learning )を参照し、以下の情報を確認します。

**StockTrade クラス**  
株式取引は、StockTrade クラスのインスタンスによって個別に表されます。このインスタンスには、ティッカーシンボル、株価、株数、取引のタイプ (買いまたは売り)、取引を一意に識別する ID などの属性が含まれます。このクラスは、既に実装されています。

**ストリームレコード**  
ストリームとは、一連のレコードのことです。レコードとは、JSON 形式による連続する `StockTrade` インスタンスの 1 つを表しています。例:   

```
{
  "tickerSymbol": "AMZN", 
  "tradeType": "BUY", 
  "price": 395.87,
  "quantity": 16, 
  "id": 3567129045
}
```

**StockTradeGenerator クラス**  
StockTradeGenerator には、呼び出されるたびにランダムに生成された新しい株式取引を返す、`getRandomTrade()` という名前のメソッドが含まれています。このクラスは、既に実装されています。

**StockTradesWriter クラス**  
プロデューサーの `main` メソッドである StockTradesWriter は、継続的にランダム取引を取得し、以下のタスクを実行してその取引を Kinesis Data Streams に送信します。  

1. ストリーム名とリージョン名を入力として読み取ります。

1. `KinesisAsyncClientBuilder` を使用してリージョン、認証情報、クライアント構成を設定します。

1. ストリームが存在し、アクティブであることを確認します (そうでない場合は、エラーで終了します)。

1. 連続ループで、`StockTradeGenerator.getRandomTrade()` メソッドに続き `sendStockTrade` メソッドを呼び出して、100 ミリ秒ごとに取引をストリームに送信します。
`sendStockTrade` クラスの `StockTradesWriter` メソッドには次のコードがあります。  

```
private static void sendStockTrade(StockTrade trade, KinesisAsyncClient kinesisClient,
            String streamName) {
        byte[] bytes = trade.toJsonAsBytes();
        // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library.
        if (bytes == null) {
            LOG.warn("Could not get JSON bytes for stock trade");
            return;
        }

        LOG.info("Putting trade: " + trade.toString());
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(bytes))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            LOG.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }
```

次のコードの詳細を参照してください。
+ `PutRecord` API はバイト配列を想定するため、その取引を JSON 形式に変換する必要があります。この操作は、次の 1 行のコードによって行われます。

  ```
  byte[] bytes = trade.toJsonAsBytes();
  ```
+ 取引を送信する前に、新しい `PutRecordRequest` インスタンス (この場合は request) を作成する必要があります。各 `request` には、ストリーム名、パーティションキー、データ BLOB が必要です。

  ```
  PutPutRecordRequest request = PutRecordRequest.builder()
      .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
      .streamName(streamName)
      .data(SdkBytes.fromByteArray(bytes))
      .build();
  ```

  この例では、株式ティッカーをパーティションキーとして使用することで、レコードを特定のシャードにマッピングしています。実際には、レコードがストリーム全体に均等に分散するように、シャード 1 つあたりに数百個または数千個のパーティションキーを用意する必要があります。ストリームにデータを追加する方法の詳細については、[Amazon Kinesis Data Streams にデータを書き込む](building-producers.md)を参照してください。

  次に、`request` をクライアントに送信できます (put オペレーション)。

  ```
     kinesisClient.putRecord(request).get();
  ```
+ エラーチェックとログ記録は、いつでも追加して損はありません。次のコードによって、エラー状態を記録します。

  ```
  if (bytes == null) {
      LOG.warn("Could not get JSON bytes for stock trade");
      return;
  }
  ```

  `put` オペレーションの前後に try/catch ブロックを追加します。

  ```
  try {
   	kinesisClient.putRecord(request).get();
  } catch (InterruptedException e) {
              LOG.info("Interrupted, assuming shutdown.");
  } catch (ExecutionException e) {
              LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
  }
  ```

  これは、ネットワークエラーや、ストリームがスループット制限を超えて抑制されたことが原因で、Kinesis Data Streams の put オペレーションが失敗することがあるためです。データが失われることがないように、再試行を使用するなど、`put` オペレーションの再試行ポリシーを慎重に検討することをお勧めします。
+ ステータスのログ記録は有益ですが、オプションです。

  ```
  LOG.info("Putting trade: " + trade.toString());
  ```
ここに示されているプロデューサーでは、Kinesis Data Streams API のシングルレコード機能 `PutRecord` が使用されています。実際には、個々のプロデューサーで大量のレコードが生成される場合があります。その場合、`PutRecords` のマルチレコード機能を使用して、レコードのバッチを一度に送信する方が効率的です。詳細については、[Amazon Kinesis Data Streams にデータを書き込む](building-producers.md)を参照してください。

**プロデューサーを実行するには**

1. [IAM ユーザーとポリシーを作成する](tutorial-stock-data-kplkcl2-iam.md) で取得したアクセスキーとシークレットキーのペアがファイル `~/.aws/credentials` に保存されていることを確認します。

1. 次の引数を指定して `StockTradeWriter` クラスを実行します。

   ```
   StockTradeStream us-west-2
   ```

   `us-west-2` 以外のリージョンにストリームを作成した場合は、代わりにそのリージョンをここで指定する必要があります。

次のような出力が表示されます:

```
Feb 16, 2015 3:53:00 PM  
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
```

Kinesis Data Streams によって株式取引が取り込まれます。

## 次の手順
<a name="tutorial-stock-data-kplkcl2-producer-next"></a>

[コンシューマーを実装する](tutorial-stock-data-kplkcl2-consumer.md)

# コンシューマーを実装する
<a name="tutorial-stock-data-kplkcl2-consumer"></a>

このチュートリアルのコンシューマーアプリケーションは、データストリームの株式取引を継続的に処理します。その後、1 分ごとに売買されている最も人気のある株式を出力します。このアプリケーションは、Kinesis Client Library (KCL) 上に構築されており、コンシューマーアプリケーションに共通する面倒な作業の多くを行います。詳細については、[KCL 1.x および 2.x の情報](shared-throughput-kcl-consumers.md)を参照してください。

ソースコードを参照し、次の情報を確認してください。

**StockTradesProcessor クラス**  
事前に用意されているコンシューマーのメインクラスで、次のタスクを実行します。  
+ 引数として渡されたアプリケーション、データストリーム、リージョン名を読み取ります。
+ リージョン名で `KinesisAsyncClient` インスタンスを作成します。
+ `ShardRecordProcessor` のインスタンスとして機能し、`StockTradeRecordProcessor` インスタンスによって実装される、`StockTradeRecordProcessorFactory` インスタンスを作成します。
+ `KinesisAsyncClient`、`StreamName`、`ApplicationName`、および `StockTradeRecordProcessorFactory` インスタンスを使用して `ConfigsBuilder` インスタンスを作成します。これは、デフォルト値ですべての設定を作成するのに役立ちます。
+ `ConfigsBuilder` インスタンスを使用して KCL スケジューラー (以前は KCL バージョン 1.x では KCL ワーカー) を作成します。
+ このスケジューラーは、(このコンシューマーインスタンスに割り当てられた) 各シャードに新しいスレッドを作成します。これにより、継続的にデータストリームからレコードが読み取られます。次に、`StockTradeRecordProcessor` インスタンスを呼び出して、受信したレコードのバッチを処理します。

**StockTradeRecordProcessor クラス**  
`StockTradeRecordProcessor` インスタンスを実装したら、次は `initialize`、`processRecords`、`leaseLost`、`shardEnded`、`shutdownRequested` の 5 つの必須メソッドを実装します。  
KCL は `initialize` および `shutdownRequested` メソッドを使用して、レコードの受信を開始できるタイミングと、レコードの受信を停止するタイミングをそれぞれレコードプロセッサに通知し、アプリケーション固有の設定および終了タスクを実行できるようにします。`leaseLost`および `shardEnded` は、リースが失われたとき、または処理がシャードの終わりに達したときの動作ロジックを実装するために使用します。この例では、これらのイベントを示すメッセージをログに記録するだけです。  
これらのメソッドのコードを示しています。主な処理は `processRecords` メソッドで行われ、そこでは各レコードの `processRecord` が使用されます。後者のメソッドは、ほとんどの場合、空のスケルトンコードとして提供されます。次のステップでは、これを実装する方法について説明します。詳細については、次のステップを参照してください。  
また、`processRecord` のサポートメソッドである `reportStats` および `resetStats` の実装にも注目してください。これらのメソッドは、元のソースコードでは空になっています。  
`processRecords` メソッドは既に実装されており、次のステップを実行します。  
+ 渡されたレコードごとに `processRecord` を呼び出します。
+ 最後のレポートから 1 分間以上経過した場合は、`reportStats()` を呼び出して最新の統計を出力し、次の間隔に新しいレコードのみ含まれるように `resetStats()` を呼び出して統計を消去します。
+ 次のレポート時間を設定します。
+ 最後のチェックポイントから 1 分間以上経過した場合は、`checkpoint()` を呼び出します。
+ 次のチェックポイント時間を設定します。
このメソッドでは、60 秒間間隔でレポートおよびチェックポイント時間が設定されています。チェックポイントの詳細については、[Kinesis Client Library の使用](https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html)を参照してください。

**StockStats クラス**  
このクラスでは、データを保持し、最も人気のある株式の経時的な統計を示すことができます。このコードは、事前に用意されており、次のメソッドが含まれています。  
+ `addStockTrade(StockTrade)`: 指定された `StockTrade` を実行中の統計に取り込みます。
+ `toString()`: 特定の形式の文字列として統計を返します。
このクラスは、各株式の合計取引数と最大取引数を継続的にカウントすることで、最も人気のある株式を追跡します。これらの数は、株式取引を受け取る度に更新されます。

次のステップに示されているコードを `StockTradeRecordProcessor` クラスのメソッドに追加します。

**コンシューマーを実装するには**

1. `processRecord` メソッドを実装するには、サイズの正しい `StockTrade` オブジェクトを開始し、それにレコードデータを追加します。また、問題が発生した場合に警告がログに記録されるようにします。

   ```
   byte[] arr = new byte[record.data().remaining()];
   record.data().get(arr);
   StockTrade trade = StockTrade.fromJsonAsBytes(arr);
       if (trade == null) {
           log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey());
           return;
           }
   stockStats.addStockTrade(trade);
   ```

1. `reportStats` メソッドを実装します。出力形式は必要に応じて変更できます。

   ```
   System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" +
   stockStats + "\n" +
   "****************************************************************\n");
   ```

1. 新しい `resetStats` インスタンスを作成する `stockStats` メソッドを実装します。

   ```
   stockStats = new StockStats();
   ```

1. `ShardRecordProcessor` インターフェイスに必要な以下のメソッドを実装します。

   ```
   @Override
   public void leaseLost(LeaseLostInput leaseLostInput) {
       log.info("Lost lease, so terminating.");
   }
   
   @Override
   public void shardEnded(ShardEndedInput shardEndedInput) {
       try {
           log.info("Reached shard end checkpointing.");
           shardEndedInput.checkpointer().checkpoint();
       } catch (ShutdownException | InvalidStateException e) {
           log.error("Exception while checkpointing at shard end. Giving up.", e);
       }
   }
   
   @Override
   public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
       log.info("Scheduler is shutting down, checkpointing.");
       checkpoint(shutdownRequestedInput.checkpointer());
   }
   
   private void checkpoint(RecordProcessorCheckpointer checkpointer) {
       log.info("Checkpointing shard " + kinesisShardId);
       try {
           checkpointer.checkpoint();
       } catch (ShutdownException se) {
           // Ignore checkpoint if the processor instance has been shutdown (fail over).
           log.info("Caught shutdown exception, skipping checkpoint.", se);
       } catch (ThrottlingException e) {
           // Skip checkpoint when throttled. In practice, consider a backoff and retry policy.
           log.error("Caught throttling exception, skipping checkpoint.", e);
       } catch (InvalidStateException e) {
           // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS).
           log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
       }
   }
   ```

**コンシューマーを実行するには**

1. [[プロデューサーを実装する](tutorial-stock-data-kplkcl2-producer.md)プロデューサーを実装する](tutorial-stock-data-kplkcl2-producer.md) で記述したプロデューサーを実行し、シミュレートした株式取引レコードをストリームに取り込みます。

1. 前のステップ (IAM ユーザーを作成したとき) で取得したアクセスキーとシークレットキーのペアがファイル `~/.aws/credentials` に保存されていることを確認します。

1. 次の引数を指定して `StockTradesProcessor` クラスを実行します。

   ```
   StockTradesProcessor StockTradeStream us-west-2
   ```

   `us-west-2` 以外のリージョンにストリームを作成した場合は、代わりにそのリージョンをここで指定する必要があります。

1 分後、次のような出力が表示されます。その後、1 分間ごとに出力が更新されます。

```
  
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  ****************************************************************
```

## 次の手順
<a name="tutorial-stock-data-kplkcl2-consumer-next"></a>

[（オプション) コンシューマーを拡張する](tutorial-stock-data-kplkcl2-consumer-extension.md)

# （オプション) コンシューマーを拡張する
<a name="tutorial-stock-data-kplkcl2-consumer-extension"></a>

このオプションのセクションでは、さらに複雑なシナリオにも対応できるようにコンシューマーコードを拡張する方法について説明します。

1 分ごとに最大の売り注文を知るには、3 箇所の `StockStats` クラスを変更し、新しい優先順位を組み込みます。

**コンシューマーを拡張するには**

1. 新しいインスタンス変数を追加します。

   ```
    // Ticker symbol of the stock that had the largest quantity of shares sold 
    private String largestSellOrderStock;
    // Quantity of shares for the largest sell order trade
    private long largestSellOrderQuantity;
   ```

1. 次のコードを `addStockTrade` に追加します。

   ```
   if (type == TradeType.SELL) {
        if (largestSellOrderStock == null || trade.getQuantity() > largestSellOrderQuantity) {
            largestSellOrderStock = trade.getTickerSymbol();
            largestSellOrderQuantity = trade.getQuantity();
        }
    }
   ```

1. `toString` メソッドを変更し、追加情報を出力します。

   ```
    
   public String toString() {
       return String.format(
           "Most popular stock being bought: %s, %d buys.%n" +
           "Most popular stock being sold: %s, %d sells.%n" +
           "Largest sell order: %d shares of %s.",
           getMostPopularStock(TradeType.BUY), getMostPopularStockCount(TradeType.BUY),
           getMostPopularStock(TradeType.SELL), getMostPopularStockCount(TradeType.SELL),
           largestSellOrderQuantity, largestSellOrderStock);
   }
   ```

コンシューマーを今すぐ実行すると (プロデューサーも忘れずに実行してください)、次のような出力が表示されます。

```
 
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  Largest sell order: 996 shares of BUD.
  ****************************************************************
```

## 次の手順
<a name="tutorial-stock-data-kplkcl2-consumer-extension-next"></a>

[リソースをクリーンアップする](tutorial-stock-data-kplkcl2-finish.md)

# リソースをクリーンアップする
<a name="tutorial-stock-data-kplkcl2-finish"></a>

Kinesis Data Streams の使用には料金がかかるため、作業が終わったら、ストリームおよび対応する Amazon DynamoDB テーブルは必ず削除してください。レコードを送信したり取得したりしていなくても、ストリームがアクティブなだけでわずかな料金が発生します。その理由として、アクティブなストリームでは、受信レコードを継続的に "リッスン" し、レコードを取得するようにリクエストすることにリソースが使用されるためです。

**ストリームおよびテーブルを削除するには**

1. 実行しているプロデューサーおよびコンシューマーをすべてシャットダウンします。

1. Kinesis コンソール ([https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis)) を開きます。

1. このアプリケーション用に作成したストリーム (`StockTradeStream`) を選択します 。

1. [**ストリームの削除**] を選択します。

1. DynamoDB コンソール ([https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/)) を開きます。

1. `StockTradesProcessor` テーブルを削除します。

## 概要
<a name="tutorial-stock-data-kplkcl2-summary"></a>

ほぼリアルタイムで大量のデータを処理するために、複雑なコードを記述したり、大規模なインフラストラクチャを開発したりする必要はありません。Kinesis Data Streams を使用すれば、基本のデータを処理するロジックを記述する (`processRecord(Record)` を記述するなど) 場合と同じように簡単にスケールして、大量のストリーミングデータに対応できます。Kinesis Data Streams が代わりに処理してくれるため、処理を拡張する方法を心配しなくて済みます。することと言えば、ストリームレコードを Kinesis Data Streams に送信し、受信した新しい各レコードを処理するロジックを記述するだけです。

このアプリケーションについて考えられる拡張機能は、次のとおりです。

**すべてのシャードで集計する**  
現在は、単一のワーカーが単一のシャードから受け取ったデータレコードの集約に基づく統計が取得されます (複数のワーカーが同時に単一のアプリケーションからシャードを処理することはできません)。拡張するときに複数のシャードがある場合、すべてのシャードで集計しようと考えるかもしれません。そのためには、パイプラインアーキテクチャを用意します。パイプラインアーキテクチャでは、各ワーカーの出力が単一のシャードを持つ別のストリームに供給され、第 1 段階の出力を集計するワーカーによってそのストリームが処理されます。第 1 段階のデータが制限されるため (シャードごとに 1 分間あたり 1 つのサンプル)、シャードごとに処理しやすくなります。

**処理の拡張**  
多数のシャードが含まれるようにストリームを拡張する場合 (多数のプロデューサーがデータを送信している場合)、処理を拡張するには、より多くのワーカーを追加します。複数のワーカーを Amazon EC2 インスタンスで実行し、Auto Scaling グループを使用することができます。

**Amazon S3/DynamoDB/Amazon Redshift/Storm へのコネクタを使用する**  
ストリームが継続的に処理されると、その出力を他の送信先に送信できます。 は、Kinesis Data Streams を他の AWS サービスやサードパーティーのツールと統合するための[コネクタ](https://github.com/awslabs/amazon-kinesis-connectors) AWS を提供します。

# チュートリアル: KPL と KCL 1.x を使用して株式データをリアルタイム処理する
<a name="tutorial-stock-data-kplkcl"></a>

このチュートリアルのシナリオでは、株式取引をデータストリームに取り込み、ストリーム上で計算を実行するシンプルな Amazon Kinesis Data Streams アプリケーションを記述する必要があります。レコードのストリームを Kinesis Data Streams に送信し、ほぼリアルタイムでレコードを消費および処理するアプリケーションを実装する方法を説明します。

**重要**  
ストリームを作成すると、Kinesis Data Streams は AWS 無料利用枠の対象ではないため、Kinesis Data Streams の使用に対して アカウントにわずかな料金が発生します。コンシューマーアプリケーションが起動すると、Amazon DynamoDB の使用に伴う料金がわずかに発生します。コンシューマーアプリケーションでは、処理状態を追跡する際に DynamoDB を使用します。このアプリケーションを終了したら、 AWS リソースを削除して料金が発生しないようにしてください。詳細については、[リソースをクリーンアップする](tutorial-stock-data-kplkcl-finish.md)を参照してください。

このコードでは、実際の株式市場データにアクセスする代わりに、株式取引のストリームをシミュレートします。シミュレーションには、2015 年 2 月時点における時価総額上位 25 社の株式に関する実際の市場データを基にしたランダム株式取引ジェネレーターが使用されています。リアルタイムの株式取引のストリームにアクセスできたとしたら、そのときに必要としている有益な統計を入手したいと考えるかもしれません。たとえば、スライディングウィンドウ分析を実行して、過去 5 分間に購入された最も人気のある株式を調べたいと思われるかもしれません。または、大規模な売り注文 (膨大な株式が含まれる売り注文) が発生したときに通知を受けたいと思われるかもしれません。このシリーズのコードを拡張して、このような機能を使用することもできます。

このチュートリアルにある手順をデスクトップやノートパソコンで実行し、同じマシンまたは定義された要件を満たす任意のプラットフォーム (例: Amazon Elastic Compute Cloud (Amazon EC2)) で、プロデューサーおよびコンシューマーのコードのいずれも実行できます。

この例では、米国西部 (オレゴン) リージョンが使用されていますが、[Kinesis Data Streams がサポートされるAWS リージョン](https://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region)であれば、いずれのリージョンでも動作します。

**Topics**
+ [

# 前提条件を満たす
](tutorial-stock-data-kplkcl-begin.md)
+ [

# データストリームを作成する
](tutorial-stock-data-kplkcl-create-stream.md)
+ [

# IAM ユーザーとポリシーを作成する
](tutorial-stock-data-kplkcl-iam.md)
+ [

# 実装コードをダウンロードおよびビルドする
](tutorial-stock-data-kplkcl-download.md)
+ [

# プロデューサーを実装する
](tutorial-stock-data-kplkcl-producer.md)
+ [

# コンシューマーを実装する
](tutorial-stock-data-kplkcl-consumer.md)
+ [

# （オプション) コンシューマーを拡張する
](tutorial-stock-data-kplkcl-consumer-extension.md)
+ [

# リソースをクリーンアップする
](tutorial-stock-data-kplkcl-finish.md)

# 前提条件を満たす
<a name="tutorial-stock-data-kplkcl-begin"></a>

[チュートリアル: KPL と KCL 1.x を使用して株式データをリアルタイム処理する[チュートリアル: KPL と KCL 1.x を使用して株式データをリアルタイム処理する](tutorial-stock-data-kplkcl.md)](tutorial-stock-data-kplkcl.md) を作成するための要件を以下に示します。

## Amazon Web Services アカウントを作成して使用する
<a name="tutorial-stock-data-kplkcl-begin-aws"></a>

開始する前に、[Amazon Kinesis Data Streams の用語と概念](key-concepts.md)で説明されている概念、特にストリーム、シャード、プロデューサー、コンシューマーについて理解しておきます。また、[チュートリアル: AWS CLI for Kinesis Data Streams のインストールと設定](kinesis-tutorial-cli-installation.md)を完了していると役立ちます。

にアクセスするには、 AWS アカウントとウェブブラウザが必要です AWS マネジメントコンソール。

コンソールアクセスの場合は、IAM ユーザー名とパスワードを使用して IAM サインインページから [AWS マネジメントコンソール](https://console.aws.amazon.com/console/home) にサインインします。プログラムによるアクセスや長期的な認証情報の代替方法など、 AWS セキュリティ認証情報の詳細については、*IAM ユーザーガイド*の[AWS 「セキュリティ認証情報](https://docs.aws.amazon.com/IAM/latest/UserGuide/security-creds.html)」を参照してください。へのサインインの詳細については AWS アカウント、 *AWS サインイン ユーザーガイド*の[「 へのサインイン方法 AWS](https://docs.aws.amazon.com/signin/latest/userguide/how-to-sign-in.html)」を参照してください。

IAM とセキュリティキーの設定手順の詳細については、[IAM ユーザーを作成する](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/get-set-up-for-amazon-ec2.html#create-an-iam-user)を参照してください。

## システムソフトウェア要件を満たす
<a name="tutorial-stock-data-kplkcl-begin-sys"></a>

アプリケーションを実行するシステムには、Java 7 以上がインストールされている必要があります。最新の Java Development Kit (JDK) をダウンロードおよびインストールするには、[Oracle 社の Java SE インストールサイト](http://www.oracle.com/technetwork/java/javase/downloads/index.html)を参照してください。

[Eclipse](https://www.eclipse.org/downloads/) などの Java IDE をお持ちの場合は、ソースコードを開いて編集、ビルド、および実行できます。

最新バージョンの [AWS SDK for Java](https://aws.amazon.com/sdk-for-java/) が必要です。Eclipse を IDE として使用している場合は、[AWS Toolkit for Eclipse](https://aws.amazon.com/eclipse/) を代わりにインストールできます。

コンシューマーアプリケーションには、バージョン 1.2.1 以上の Kinesis クライアントライブラリ (KCL) が必要です。これは、[Kinesis クライアントライブラリ (Java)](https://github.com/awslabs/amazon-kinesis-client) の GitHub から入手することができます。

## 次のステップ
<a name="tutorial-stock-data-kplkcl-begin-next"></a>

[データストリームを作成する](tutorial-stock-data-kplkcl-create-stream.md)

# データストリームを作成する
<a name="tutorial-stock-data-kplkcl-create-stream"></a>

[チュートリアル: KPL と KCL 1.x を使用して株式データをリアルタイム処理する[チュートリアル: KPL と KCL 1.x を使用して株式データをリアルタイム処理する](tutorial-stock-data-kplkcl.md)](tutorial-stock-data-kplkcl.md) の最初のステップで、後のステップで使用するストリームを作成します。

**ストリームを作成するには**

1. にサインイン AWS マネジメントコンソール し、[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) で Kinesis コンソールを開きます。

1. ナビゲーションペインで、[**データストリーム**] を選択します。

1. ナビゲーションバーで、リージョンセレクターを展開し、リージョンを選択します。

1. [**Kinesis ストリームの作成**] を選択します。

1. ストリームの名前 (例: **StockTradeStream**) を入力します。

1. シャードカウントは **1** と入力しますが、[**必要なシャードカウントの予想**] は折りたたんだままにします。

1. [**Kinesis ストリームの作成**] を選択します。

[**Kinesis streams**] リストのページで、作成中のストリームのステータスは `CREATING` になります。ストリームを使用する準備ができると、ステータスは `ACTIVE` に変わります。ストリームの名前を選択します。表示されたページの [**詳細**] タブには、ストリーム設定の概要が示されます。[**モニタリング**] セクションには、ストリームのモニタリング情報が表示されます。

## シャードに関する追加情報
<a name="tutorial-stock-data-kplkcl-create-stream-info"></a>

このチュートリアルを除き、初めて Kinesis Data Streams を使用する場合は、より慎重にストリーム作成プロセスを計画する必要がある場合があります。シャードをプロビジョニングするときには、予想される最大需要を考慮する必要があります。このシナリオを例として使用すると、米国の株式市場の取引トラフィックは、昼間 (東部標準時) にピークを迎えます。その時刻をサンプルとして需要の予測を行う必要があります。その後、予想される最大需要に合わせてプロビジョニングするか、需要の変動に応じてストリームを拡大または縮小することができます。

*シャード*は、スループットキャパシティの単位です。[**Kinesis ストリームの作成**] ページで、[**必要なシャードカウントの予想**] を展開します。次のガイドラインに従って、平均レコードサイズ、1 秒間に書き込まれる最大レコード数、コンシューマーアプリケーションの数を入力します。

**平均レコードサイズ**  
計算される平均レコードサイズの予測。この値がわからない場合は、予測される最大レコードサイズを使用します。

**書き込まれる最大レコード数**  
データを提供するエンティティの数と各エンティティで 1 秒間に生成されるおよそのレコード数を考慮に入れます。たとえば、20 台の取引サーバーから株式取引データを取得し、各サーバーで 1 秒間に 250 個の取引が生成される場合、1 秒あたりの合計取引数 (レコード数) は 5,000 になります。

**コンシューマーアプリケーションの数**  
独立してストリームを読み取り、ストリームを固有の方法で処理し、固有の出力を生成するアプリケーションの数。各アプリケーションでは、複数のインスタンスを異なるマシン (つまり、クラスター) で実行することができます。このため、大規模なストリームでも遅延することなく処理できます。

表示された予測シャードカウントが現在のシャード制限を超えた場合は、その数のシャードカウントを含むストリームを作成する前に、制限を引き上げるリクエストの送信が必要な場合があります。シャード制限の引き上げをリクエストするには、[Kinesis Data Streams 制限フォーム](https://console.aws.amazon.com/support/home#/case/create?issueType=service-limit-increase&limitType=service-code-kinesis)を使用します。ストリームおよびシャードの詳細については、[Kinesis Data Streams を作成して管理する](working-with-streams.md)を参照してください。

## 次の手順
<a name="tutorial-stock-data-kplkcl-create-stream-next"></a>

[IAM ユーザーとポリシーを作成する](tutorial-stock-data-kplkcl-iam.md)

# IAM ユーザーとポリシーを作成する
<a name="tutorial-stock-data-kplkcl-iam"></a>

のセキュリティのベストプラクティス AWS により、きめ細かなアクセス許可を使用してさまざまなリソースへのアクセスを制御できます。 AWS Identity and Access Management (IAM) を使用すると、 でユーザーとユーザーのアクセス許可を管理できます AWS。[IAM ポリシー](https://docs.aws.amazon.com/IAM/latest/UserGuide/PoliciesOverview.html)は、許可されるアクションとそのアクションが適用されるリソースを明示的にリストアップします。

一般的に、Kinesis Data Streams プロデューサーおよびコンシューマーには、次の最小許可が必要になります。


**プロデューサー**  

| アクション | [リソース]  | 目的 | 
| --- | --- | --- | 
| DescribeStream, DescribeStreamSummary, DescribeStreamConsumer | Kinesis Data Streams | レコードを書き込む前に、プロデューサーは、ストリームが存在し、アクティブであること、シャードがストリームに含まれていること、およびストリームにコンシューマーがあることを確認します。 | 
| SubscribeToShard, RegisterStreamConsumer | Kinesis Data Streams | Kinesis Data Stream シャードにサブスクライブし、コンシューマーを登録します。 | 
| PutRecord, PutRecords | Kinesis Data Streams | Amazon Kinesis ストリームにレコードを書き込みます｡ | 


**コンシューマー**  

| **アクション** | **[リソース]**  | **目的** | 
| --- | --- | --- | 
| DescribeStream | Kinesis Data Streams | レコードを読み取る前に、コンシューマーは、ストリームが存在し、アクティブであることを確認し、ストリームにシャードが含まれることを確認します。 | 
| GetRecords, GetShardIterator  | Kinesis Data Streams | Kinesis Data Streams シャードからレコードを読み込みます。 | 
| CreateTable, DescribeTable, GetItem, PutItem, Scan, UpdateItem | Amazon DynamoDB テーブル | Kinesis クライアントライブラリ (KCL) を使用してコンシューマーが開発されている場合は、アプリケーションの処理状態を追跡するときに DynamoDB テーブルへの許可が必要です。テーブルは、最初に開始したコンシューマーによって作成されます。 | 
| DeleteItem | Amazon DynamoDB テーブル | コンシューマーが Kinesis Data Streams シャードで分割と結合のオペレーションを実行する場合。 | 
| PutMetricData | Amazon CloudWatch Logs | また、KCL は、アプリケーションをモニタリングするのに便利なメトリクスも CloudWatch にアップロードします。 | 

このアプリケーションでは、前述のすべての許可を付与する単一の IAM ポリシーを作成します。実際には、プロデューサーとコンシューマーに 1 つずつ、2 つのポリシーを作成することになるかもしれません。

**IAM ポリシーを作成するには**

1. 新しいストリームの Amazon リソースネーム (ARN) を見つけます。この ARN は、 [**ストリーム ARN**] として [**詳細**]タブの上部に表示されます。ARN 形式 は次のとおりです。

   ```
   arn:aws:kinesis:region:account:stream/name
   ```  
*リージョン*  
リージョンコード (`us-west-2` など)。詳細については、[リージョンとアベイラビリティーゾーンの概念](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-regions-availability-zones)を参照してください。  
*アカウント*  
 AWS アカウント[設定](https://console.aws.amazon.com/billing/home?#/account)に示されているアカウント ID。  
*名前*  
[データストリームを作成する](tutorial-stock-data-kplkcl-create-stream.md) からのストリームの名前 (`StockTradeStream`)。

1. コンシューマーによって使用される (最初のコンシューマーインスタンスによって作成された) DynamoDB テーブルの ARN を決定します。次のような形式になります。

   ```
   arn:aws:dynamodb:region:account:table/name
   ```

   リージョンとアカウントは前のステップと同じ場所のものですが、この場合の*名前*はコンシューマーアプリケーションによって作成および使用されるテーブルの名前となります。コンシューマーによって使用される KCL では、アプリケーション名がテーブル名として使用されます。後で使用されるアプリケーション名である `StockTradesProcessor` を使用します。

1. IAM コンソールの**ポリシー** ([https://console.aws.amazon.com/iam/home\$1policies](https://console.aws.amazon.com/iam/home#policies)) で、[**ポリシーの作成**] を選択します。IAM ポリシーを初めて扱う場合には、[**今すぐ始める**]、[**ポリシーの作成**] を選択します。

1. [**ポリシージェネレーター**] の横の [**選択**] を選択します。

1.  AWS サービスとして **Amazon Kinesis** を選択します。

1. 許可されるアクションとして、`DescribeStream`、`GetShardIterator`、`GetRecords`、`PutRecord`、および `PutRecords` を選択します。

1. ステップ 1 で作成した ARN を入力します。

1. 以下の各項目について、[**ステートメントを追加**] を使用します。    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/streams/latest/dev/tutorial-stock-data-kplkcl-iam.html)

   ARN を指定するときに使用されるアスタリスク (`*`) は必要ありません。`PutMetricData` アクションが呼び出される特定のリソースが CloudWatch に存在しない場合などがこれに該当します。

1. ［**Next Step**］ (次のステップ) をクリックします。

1. [**ポリシー名**] を `StockTradeStreamPolicy` に変更し、コードを確認して、[**ポリシーの作成**] を選択します。

取得されたポリシードキュメントには、次のような結果が表示されます

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "Stmt123",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:PutRecord",
                "kinesis:PutRecords",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:ListShards",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream"
            ]
        },
        {
            "Sid": "Stmt234",
            "Effect": "Allow",
            "Action": [
                "kinesis:SubscribeToShard",
                "kinesis:DescribeStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream/*"
            ]
        },
        {
            "Sid": "Stmt456",
            "Effect": "Allow",
            "Action": [
                "dynamodb:*"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-west-2:111122223333:table/StockTradesProcessor"
            ]
        },
        {
            "Sid": "Stmt789",
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}
```

------

**IAM ユーザーを作成するには**

1. IAM コンソール ([https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/)) を開きます。

1. **[Users]** (ユーザー) ページで、**[Add user]** (ユーザーを追加) を選択します。

1. [**User name**] に、`StockTradeStreamUser` と入力します。

1. [**アクセスの種類**] で、[**プログラムによるアクセス**] を選択し、[**次の手順: アクセス許可**] を選択します。

1. **[Attach existing policies directly (既存のポリシーを直接アタッチする)**] を選択します。

1. 作成したポリシーの名前で検索します。ポリシー名の左にあるボックスを選択し、[**次の手順: 確認**] を選択します。

1. 詳細と概要を確認し、[**ユーザーの作成**] を選択します。

1. [**アクセスキー ID**] をコピーし、プライベート用に保存します。[**シークレットアクセスキー**] で [**表示**] を選択し、このキーもプライベートに保存します。

1. アクセスキーとシークレットキーを自分しかアクセスできない安全な場所にあるローカルファイルに貼り付けます。このアプリケーションでは、アクセス権限を厳しく制限した ` ~/.aws/credentials` という名前のファイルを作成します。ファイル形式は次のようになります。

   ```
   [default]
   aws_access_key_id=access key
   aws_secret_access_key=secret access key
   ```

**IAM ポリシーをユーザーにアタッチするには**

1. IAM コンソールで、[[ポリシー](https://console.aws.amazon.com/iam/home?#policies)] を開いて [**ポリシーアクション**] を選択します。

1. [`StockTradeStreamPolicy`] および [**アタッチ**] を選択します。

1. [`StockTradeStreamUser`] および [**ポリシーのアタッチ**] を選択します。

## 次のステップ
<a name="tutorial-stock-data-kplkcl-iam-next"></a>

[実装コードをダウンロードおよびビルドする](tutorial-stock-data-kplkcl-download.md)

# 実装コードをダウンロードおよびビルドする
<a name="tutorial-stock-data-kplkcl-download"></a>

スケルトンコードは [チュートリアル: KPL と KCL 1.x を使用して株式データをリアルタイム処理する](tutorial-stock-data-kplkcl.md) 用に提供されています。このコードには、株式取引ストリームの取り込み (*プロデューサー*) およびデータの処理 (*コンシューマー*) のいずれにも使用できるスタブ実装が含まれています。次の手順は、実装を完了する方法を示しています。

**実装コードをダウンロードおよびビルドするには**

1. [ソースコード](https://github.com/awslabs/amazon-kinesis-learning/tree/learning-module-1)をコンピュータにダウンロードします。

1. 提供されたディレクトリ構造に従って、お好みの IDE でソースコードを使用してプロジェクトを作成します。

1. プロジェクトに次のライブラリを追加します。
   + Amazon Kinesis クライアントライブラリ (KCL)
   + AWS SDK
   + Apache HttpCore
   + Apache HttpClient
   + Apache Commons Lang
   + Apache Commons Logging
   + Guava (Java 用の Google コアライブラリ)
   + Jackson Annotations
   + Jackson Core
   + Jackson Databind
   + Jackson Dataformat: CBOR
   + Joda Time

1. IDE によっては、プロジェクトが自動的にビルドされる場合があります。自動的にビルドされない場合は、IDE に適切なステップを使用してプロジェクトをビルドします。

上記のステップが正常に完了したら、次のセクション ([プロデューサーを実装する](tutorial-stock-data-kplkcl-producer.md)) に進みます。ビルドのいずれかの段階でエラーが発生した場合は、先に進む前に、原因を調査の上、解決してください。

## 次の手順
<a name="tutorial-stock-data-kplkcl-download-next"></a>

[[プロデューサーを実装する](tutorial-stock-data-kplkcl-producer.md)プロデューサーを実装する](tutorial-stock-data-kplkcl-producer.md)

# プロデューサーを実装する
<a name="tutorial-stock-data-kplkcl-producer"></a>



[チュートリアル: KPL と KCL 1.x を使用して株式データをリアルタイム処理する[チュートリアル: KPL と KCL 1.x を使用して株式データをリアルタイム処理する](tutorial-stock-data-kplkcl.md)](tutorial-stock-data-kplkcl.md) のアプリケーションでは、株式市場取引をモニタリングする実際のシナリオが使用されます。次の原理によって、このシナリオをプロデューサーおよびサポートコード構造にマッピングすることができます。

ソースコードを参照し、次の情報を確認してください。

**StockTrade クラス**  
株式取引は、`StockTrade` クラスのインスタンスによって個別に表されます。このインスタンスには、ティッカーシンボル、株価、株数、取引のタイプ (買いまたは売り)、取引を一意に識別する ID などの属性が含まれます。このクラスは、既に実装されています。

**ストリームレコード**  
ストリームとは、一連のレコードのことです。レコードとは、JSON 形式による連続する `StockTrade` インスタンスの 1 つを表しています。例:   

```
{
  "tickerSymbol": "AMZN", 
  "tradeType": "BUY", 
  "price": 395.87,
  "quantity": 16, 
  "id": 3567129045
}
```

**StockTradeGenerator クラス**  
`StockTradeGenerator` には、呼び出されるたびにランダムに生成された新しい株式取引を返す、`getRandomTrade()` と呼ばれるメソッドが含まれています。このクラスは、既に実装されています。

**StockTradesWriter クラス**  
プロデューサーの `main` メソッドである `StockTradesWriter` は、継続的にランダム取引を取得し、以下のタスクを実行してそれらを Kinesis Data Streams に送信します。  

1. ストリーム名とリージョン名を入力として読み取ります。

1. `AmazonKinesisClientBuilder` を作成します。

1. クライアントビルダーを使用してリージョン、認証情報、およびクライアント構成を設定します。

1. クライアントビルダーを使用して `AmazonKinesis` クライアントを構成します。

1. ストリームが存在し、アクティブであることを確認します (そうでない場合は、エラーで終了します)。

1. 連続ループで、`StockTradeGenerator.getRandomTrade()` メソッドに続き `sendStockTrade` メソッドを呼び出して、100 ミリ秒ごとに取引をストリームに送信します。
`sendStockTrade` クラスの `StockTradesWriter` メソッドには次のコードがあります。  

```
private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient, String streamName) {
    byte[] bytes = trade.toJsonAsBytes();
    // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library.
    if (bytes == null) {
        LOG.warn("Could not get JSON bytes for stock trade");
        return;
    }
    
    LOG.info("Putting trade: " + trade.toString());
    PutRecordRequest putRecord = new PutRecordRequest();
    putRecord.setStreamName(streamName);
    // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
    putRecord.setPartitionKey(trade.getTickerSymbol());
    putRecord.setData(ByteBuffer.wrap(bytes));

    try {
        kinesisClient.putRecord(putRecord);
    } catch (AmazonClientException ex) {
        LOG.warn("Error sending record to Amazon Kinesis.", ex);
    }
}
```

次のコードの詳細を参照してください。
+ `PutRecord` API はバイト配列を想定するため、`trade` を JSON 形式に変換する必要があります。この操作は、次の 1 行のコードによって行われます。

  ```
  byte[] bytes = trade.toJsonAsBytes();
  ```
+ 取引を送信する前に、新しい `PutRecordRequest` インスタンス (この場合、`putRecord` と呼ばれる) を作成する必要があります。

  ```
  PutRecordRequest putRecord = new PutRecordRequest();
  ```

  各 `PutRecord` の呼び出しには、ストリーム名、パーティションキー、およびデータ BLOB が必要です。次のコードによって、`putRecord` メソッドを使用して、これらのフィールドを `setXxxx()` オブジェクトに追加します。

  ```
  putRecord.setStreamName(streamName);
  putRecord.setPartitionKey(trade.getTickerSymbol());
  putRecord.setData(ByteBuffer.wrap(bytes));
  ```

  この例では、株式チケットをパーティションキーとして使用することで、レコードを特定のシャードにマッピングしています。実際には、レコードがストリーム全体に均等に分散するように、シャード 1 つあたりに数百個または数千個のパーティションキーを用意する必要があります。ストリームにデータを追加する方法の詳細については、[ストリームにデータを追加する](developing-producers-with-sdk.md#kinesis-using-sdk-java-add-data-to-stream)を参照してください。

  次に、`putRecord` をクライアントに送信 (`put` オペレーション) することができます。

  ```
  kinesisClient.putRecord(putRecord);
  ```
+ エラーチェックとログ記録は、いつでも追加して損はありません。次のコードによって、エラー状態を記録します。

  ```
  if (bytes == null) {
      LOG.warn("Could not get JSON bytes for stock trade");
      return;
  }
  ```

  `put` オペレーションの前後に try/catch ブロックを追加します。

  ```
  try {
         kinesisClient.putRecord(putRecord);
  } catch (AmazonClientException ex) {
         LOG.warn("Error sending record to Amazon Kinesis.", ex);
  }
  ```

  これは、ネットワークエラーや、ストリームがスループット制限を超えて抑制されたことが原因で、Kinesis Data Streams の `put` オペレーションが失敗することがあるためです。データが失われることがないように、単純な再試行を使用するなど、`put` オペレーションの再試行ポリシーを慎重に検討することをお勧めします。
+ ステータスのログ記録は有益ですが、オプションです。

  ```
  LOG.info("Putting trade: " + trade.toString());
  ```
ここに示されているプロデューサーでは、Kinesis Data Streams API のシングルレコード機能 `PutRecord` が使用されています。実際には、個々のプロデューサーで大量のレコードが生成される場合があります。その場合、`PutRecords` のマルチレコード機能を使用して、レコードのバッチを一度に送信する方が効率的です。詳細については、[ストリームにデータを追加する](developing-producers-with-sdk.md#kinesis-using-sdk-java-add-data-to-stream)を参照してください。

**プロデューサーを実行するには**

1. 前のステップ (IAM ユーザーを作成したとき) で取得したアクセスキーとシークレットキーのペアがファイル `~/.aws/credentials` に保存されていることを確認します。

1. 次の引数を指定して `StockTradeWriter` クラスを実行します。

   ```
   StockTradeStream us-west-2
   ```

   `us-west-2` 以外のリージョンにストリームを作成した場合は、代わりにそのリージョンをここで指定する必要があります。

次のような出力が表示されます:

```
Feb 16, 2015 3:53:00 PM  
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
```

Kinesis Data Streams によって株式取引ストリームが取り込まれます。

## 次の手順
<a name="tutorial-stock-data-kplkcl-producer-next"></a>

[コンシューマーを実装する](tutorial-stock-data-kplkcl-consumer.md)

# コンシューマーを実装する
<a name="tutorial-stock-data-kplkcl-consumer"></a>

[チュートリアル: KPL と KCL 1.x を使用して株式データをリアルタイム処理する[チュートリアル: KPL と KCL 1.x を使用して株式データをリアルタイム処理する](tutorial-stock-data-kplkcl.md)](tutorial-stock-data-kplkcl.md) のコンシューマーアプリケーションでは、[[プロデューサーを実装する](tutorial-stock-data-kplkcl-producer.md)プロデューサーを実装する](tutorial-stock-data-kplkcl-producer.md)で作成した株式取引ストリームを継続的に処理します。その後、1 分ごとに売買されている最も人気のある株式を出力します。このアプリケーションは、Kinesis Client Library (KCL) 上に構築されており、コンシューマーアプリケーションに共通する面倒な作業の多くを行います。詳細については、[KCL 1.x コンシューマーを開発する](developing-consumers-with-kcl.md)を参照してください。

ソースコードを参照し、次の情報を確認してください。

**StockTradesProcessor クラス**  
事前に用意されているコンシューマーのメインクラスで、次のタスクを実行します。  
+ 引数として渡されたアプリケーション、ストリーム、およびリージョン名を読み取ります。
+ `~/.aws/credentials` から認証情報を読み取ります。
+ `RecordProcessor` のインスタンスとして機能し、`StockTradeRecordProcessor` インスタンスによって実装される、`RecordProcessorFactory` インスタンスを作成します。
+ `RecordProcessorFactory` インスタンスおよび標準設定 (例: ストリーム名、認証情報、アプリケーション名) が指定された KCL ワーカーを作成します。
+ このワーカーは、(このコンシューマーインスタンスに割り当てられた) 各シャードに新しいスレッドを作成します。これにより、継続的に Kinesis Data Streams からレコードが読み取られます。次に、`RecordProcessor` インスタンスを呼び出して、受信したレコードのバッチを処理します。

**StockTradeRecordProcessor クラス**  
`RecordProcessor` インスタンスを実装したら、次に `initialize`、`processRecords`、`shutdown` の 3 つの必須メソッドを実装します。  
Kinesis Client Library によって使用される `initialize` および `shutdown` は、名前が示すとおり、レコードの受信がいつ開始し、いつ終了するかをレコードプロセッサに知らせます。これにより、レコードプロセッサは、アプリケーションに固有の設定および終了タスクを行うことができます。これらのコードは事前に用意されています。主な処理は `processRecords` メソッドで行われ、そこでは各レコードの `processRecord` が使用されます。後者のメソッドは、ほとんどの場合、空のスケルトンコードとして提供されます。次のステップでは、これを実装する方法について説明します。詳細は、次のステップを参照してください。  
また、`processRecord` のサポートメソッドである `reportStats` および `resetStats` の実装にも注目してください。これらのメソッドは、元のソースコードでは空になっています。  
`processRecords` メソッドは既に実装されており、次のステップを実行します。  
+  渡された各レコードについて、レコード上で `processRecord` を呼び出します。
+ 最後のレポートから 1 分間以上経過した場合は、`reportStats()` を呼び出して最新の統計を出力し、次の間隔に新しいレコードのみ含まれるように `resetStats()` を呼び出して統計を消去します。
+ 次のレポート時間を設定します。
+ 最後のチェックポイントから 1 分間以上経過した場合は、`checkpoint()` を呼び出します。
+ 次のチェックポイント時間を設定します。
このメソッドでは、60 秒間間隔でレポートおよびチェックポイント時間が設定されています。チェックポイントの詳細については、[コンシューマーに関する追加情報](#tutorial-stock-data-kplkcl-consumer-supplement)を参照してください。

**StockStats クラス**  
このクラスでは、データを保持し、最も人気のある株式の経時的な統計を示すことができます。このコードは、事前に用意されており、次のメソッドが含まれています。  
+ `addStockTrade(StockTrade)`: 指定された `StockTrade` を実行中の統計に取り込みます。
+ `toString()`: 特定の形式の文字列として統計を返します。
このクラスは、各株式の合計取引数と最大取引数を継続的にカウントすることで、最も人気のある株式を追跡します。これらの数は、株式取引を受け取る度に更新されます。

次のステップに示されているコードを `StockTradeRecordProcessor` クラスのメソッドに追加します。

**コンシューマーを実装するには**

1. `processRecord` メソッドを実装するには、サイズの正しい `StockTrade` オブジェクトを開始し、それにレコードデータを追加します。また、問題が発生した場合に警告がログに記録されるようにします。

   ```
   StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array());
   if (trade == null) {
       LOG.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.getPartitionKey());
       return;
   }
   stockStats.addStockTrade(trade);
   ```

1. 簡単な `reportStats` メソッドを実装します。出力形式は好みに応じて自由に変更することができます。

   ```
   System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" +
                      stockStats + "\n" +
                      "****************************************************************\n");
   ```

1. 最後に、新しい `stockStats` インスタンスを作成する `resetStats` メソッドを実装します。

   ```
   stockStats = new StockStats();
   ```

**コンシューマーを実行するには**

1. [[プロデューサーを実装する](tutorial-stock-data-kplkcl-producer.md)プロデューサーを実装する](tutorial-stock-data-kplkcl-producer.md) で記述したプロデューサーを実行し、シミュレートした株式取引レコードをストリームに取り込みます。

1. 前のステップ (IAM ユーザーを作成したとき) で取得したアクセスキーとシークレットキーのペアがファイル `~/.aws/credentials` に保存されていることを確認します。

1. 次の引数を指定して `StockTradesProcessor` クラスを実行します。

   ```
   StockTradesProcessor StockTradeStream us-west-2
   ```

   `us-west-2` 以外のリージョンにストリームを作成した場合は、代わりにそのリージョンをここで指定する必要があります。

1 分後、次のような出力が表示されます。その後、1 分間ごとに出力が更新されます。

```
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  ****************************************************************
```

## コンシューマーに関する追加情報
<a name="tutorial-stock-data-kplkcl-consumer-supplement"></a>

[KCL 1.x コンシューマーを開発する](developing-consumers-with-kcl.md)などで説明されている Kinesis Client Library のメリットに詳しい方であれば、ここで使用することに疑問を感じるかもしれません。1 つのシャードストリームとそれを処理する 1 つのコンシューマーインスタンスしか使用しない場合でも、KCL を使用して簡単にコンシューマーを実装することができます。プロデューサーセクションとコンシューマーセクションのコードの実装手順を比較すると、コンシューマーの実装の方が比較的に簡単であることがわかります。これは、KCL で提供されているサービスが大きく関係しています。

このアプリケーションでは、個別のレコードを処理できるレコードプロセッサクラスの実装に焦点を合わせてきました。新しいレコードが使用可能になると、KCL がレコードを取得してレコードプロセッサを呼び出すため、Kinesis Data Streams からレコードを取得する方法を心配しなくて済みます。また、シャードカウントやコンシューマーインスタンス数についても心配しなくて済みます。ストリームがスケールアップされても、複数のシャードやコンシューマーインスタンスを処理するためにアプリケーションを書き直す必要はありません。

チェックポイントという用語は、ストリーム内のポイントを、これまで消費および処理されたデータレコードまで記録することを意味します。**アプリケーションがクラッシュすると、ストリームはストリームの先頭からではなく、その時点から読み取られます。チェックポイントやそのさまざまな設計パターン、およびベストプラクティスは、この章の範囲外です。ただし、本番環境ではこのような問題に直面することがあります。

[[プロデューサーを実装する](tutorial-stock-data-kplkcl-producer.md)プロデューサーを実装する](tutorial-stock-data-kplkcl-producer.md)で学習したように、Kinesis Data Streams API の `put` オペレーションは、*パーティションキー*を入力として受け取ります。Kinesis Data Streams は、レコードを複数のシャードに分割するメカニズムとしてパーティションキーを使用します (複数のシャードがストリームに含まれる場合)。同じパーティションキーは、常に同じシャードにルーティングされます。このため、同じパーティションキーを持つレコードはそのコンシューマーにのみ送信され、他のコンシューマーに送信されることはないと仮定して、特定のシャードを処理するコンシューマーを設計できます。したがって、コンシューマーのワーカーは、必要なデータが欠落しているかもしれないと心配することなく、同じパーティションキーを持つすべてのレコードを集計できます。

このアプリケーションでは、コンシューマーによるレコードの処理の負荷は高くないため、1 つのシャードを使用して、KCL スレッドと同じスレッドで処理することができます。ただし、実際には、まずシャードの数のスケールアップを検討します。レコードの処理が大変になることが予想される場合は、異なるスレッドに処理を切り替えたり、スレッドプールを使用したりする必要があるかもしれません。このように、その他のスレッドがレコードを並列処理していても、KCL は新しいレコードを迅速に取得できます。一般的に、マルチスレッド設計は簡単ではなく高度な技術が必要になるため、シャードの数を増やすことが最も効果的な拡張方法です。

## 次の手順
<a name="tutorial-stock-data-kplkcl-consumer-next"></a>

[（オプション) コンシューマーを拡張する](tutorial-stock-data-kplkcl-consumer-extension.md)

# （オプション) コンシューマーを拡張する
<a name="tutorial-stock-data-kplkcl-consumer-extension"></a>

[チュートリアル: KPL と KCL 1.x を使用して株式データをリアルタイム処理する[チュートリアル: KPL と KCL 1.x を使用して株式データをリアルタイム処理する](tutorial-stock-data-kplkcl.md)](tutorial-stock-data-kplkcl.md) のアプリケーションは、すでに目的を十分に果たしているかもしれません。このオプションのセクションでは、さらに複雑なシナリオにも対応できるようにコンシューマーコードを拡張する方法について説明します。

1 分ごとに最大の売り注文を知るには、3 箇所の `StockStats` クラスを変更し、新しい優先順位を組み込みます。

**コンシューマーを拡張するには**

1. 新しいインスタンス変数を追加します。

   ```
    // Ticker symbol of the stock that had the largest quantity of shares sold 
    private String largestSellOrderStock;
    // Quantity of shares for the largest sell order trade
    private long largestSellOrderQuantity;
   ```

1. 次のコードを `addStockTrade` に追加します。

   ```
    if (type == TradeType.SELL) {
        if (largestSellOrderStock == null || trade.getQuantity() > largestSellOrderQuantity) {
            largestSellOrderStock = trade.getTickerSymbol();
            largestSellOrderQuantity = trade.getQuantity();
        }
    }
   ```

1. `toString` メソッドを変更し、追加情報を出力します。

   ```
    public String toString() {
        return String.format(
                "Most popular stock being bought: %s, %d buys.%n" +
                "Most popular stock being sold: %s, %d sells.%n" +
                "Largest sell order: %d shares of %s.",
                getMostPopularStock(TradeType.BUY), getMostPopularStockCount(TradeType.BUY),
                getMostPopularStock(TradeType.SELL), getMostPopularStockCount(TradeType.SELL),
                largestSellOrderQuantity, largestSellOrderStock);
    }
   ```

コンシューマーを今すぐ実行すると (プロデューサーも忘れずに実行してください)、次のような出力が表示されます。

```
 ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  Largest sell order: 996 shares of BUD.
  ****************************************************************
```

## 次の手順
<a name="tutorial-stock-data-kplkcl-consumer-extension-next"></a>

[リソースをクリーンアップする](tutorial-stock-data-kplkcl-finish.md)

# リソースをクリーンアップする
<a name="tutorial-stock-data-kplkcl-finish"></a>

Kinesis Data Streams の使用には料金がかかるため、作業が終わったら、ストリームおよび対応する Amazon DynamoDB テーブルは必ず削除してください。レコードを送信したり取得したりしていなくても、ストリームがアクティブなだけでわずかな料金が発生します。その理由として、アクティブなストリームでは、受信レコードを継続的に "リッスン" し、レコードを取得するようにリクエストすることにリソースが使用されるためです。

**ストリームおよびテーブルを削除するには**

1. 実行しているプロデューサーおよびコンシューマーをすべてシャットダウンします。

1. Kinesis コンソール ([https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis)) を開きます。

1. このアプリケーション用に作成したストリーム (`StockTradeStream`) を選択します 。

1. [**ストリームの削除**] を選択します。

1. DynamoDB コンソール ([https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/)) を開きます。

1. `StockTradesProcessor` テーブルを削除します。

## 概要
<a name="tutorial-stock-data-kplkcl-summary"></a>

ほぼリアルタイムで大量のデータを処理するために、複雑なコードを記述したり、大規模なインフラストラクチャを開発したりする必要はありません。Kinesis Data Streams を使用すれば、基本のデータを処理するロジックを記述する (`processRecord(Record)` を記述するなど) 場合と同じように簡単にスケールして、大量のストリーミングデータに対応できます。Kinesis Data Streams が代わりに処理してくれるため、処理を拡張する方法を心配しなくて済みます。することと言えば、ストリームレコードを Kinesis Data Streams に送信し、受信した新しい各レコードを処理するロジックを記述するだけです。

このアプリケーションについて考えられる拡張機能は、次のとおりです。

**すべてのシャードで集計する**  
現在は、単一のワーカーが単一のシャードから受け取ったデータレコードの集約に基づく統計が取得されます (複数のワーカーが同時に単一のアプリケーションからシャードを処理することはできません)。拡張するときに複数のシャードがある場合、すべてのシャードで集計しようと考えるかもしれません。そのためには、パイプラインアーキテクチャを用意します。パイプラインアーキテクチャでは、各ワーカーの出力が単一のシャードを持つ別のストリームに供給され、第 1 段階の出力を集計するワーカーによってそのストリームが処理されます。第 1 段階のデータが制限されるため (シャードごとに 1 分間あたり 1 つのサンプル)、シャードごとに処理しやすくなります。

**処理の拡張**  
多数のシャードが含まれるようにストリームを拡張する場合 (多数のプロデューサーがデータを送信している場合)、処理を拡張するには、より多くのワーカーを追加します。複数のワーカーを Amazon EC2 インスタンスで実行し、Auto Scaling グループを使用することができます。

**Amazon S3/DynamoDB/Amazon Redshift/Storm へのコネクタを使用する**  
ストリームが継続的に処理されると、その出力を他の送信先に送信できます。 は、Kinesis Data Streams を他の AWS サービスやサードパーティーのツールと統合するための[コネクタ](https://github.com/awslabs/amazon-kinesis-connectors) AWS を提供します。

## 次の手順
<a name="tutorial-stock-data-kplkcl-next-steps"></a>
+ Kinesis Data Streams API オペレーションの使用の詳細については、[で Amazon Kinesis Data Streams API を使用してプロデューサーを開発する AWS SDK for Java](developing-producers-with-sdk.md)、[を使用して共有スループットコンシューマーを開発する AWS SDK for Java](developing-consumers-with-sdk.md)、および [Kinesis Data Streams を作成して管理する](working-with-streams.md) を参照してください。
+ Kinesis Client Library の詳細については、「[KCL 1.x コンシューマーを開発する](developing-consumers-with-kcl.md)」を参照してください。
+ アプリケーションを最適化する方法については、[Amazon Kinesis Data Streams コンシューマーを最適化するKinesis Data Streams コンシューマーを最適化する](advanced-consumers.md)を参照してください。

# チュートリアル: Amazon Managed Service for Apache Flink を使用してリアルタイムの株式データを分析する
<a name="tutorial-stock-data"></a>

このチュートリアルのシナリオには、株式取引のデータストリームへの取り込みと、ストリームで計算を実行するシンプルな [Amazon Managed Service for Apache Flink](https://docs.aws.amazon.com/kinesisanalytics/latest/java/what-is.html) アプリケーションの記述が含まれます。レコードのストリームを Kinesis Data Streams に送信し、ほぼリアルタイムでレコードを消費および処理するアプリケーションを実装する方法を説明します。

Amazon Managed Service for Apache Flink を使用すると、Java、または Scala のいずれかを使用してストリーミングデータを処理および分析できます。このサービスを使用すると、ストリーミングソースに対して Java または Scala コードを作成して実行し、時系列分析の実行、ダッシュボードへのリアルタイムフィード、メトリクスのリアルタイム作成を行うことができます。

Flink アプリケーションは、[Apache Flink](https://flink.apache.org/) に基づくオープンソースライブラリを使用して、Managed Service for Apache Flink で構築できます。Apache Flink は、データストリームを処理するための一般的なフレームワークおよびエンジンです。

**重要**  
2 つのデータストリームとアプリケーションを作成すると、 AWS 無料利用枠の対象とならないため、アカウントでは Kinesis Data Streams と Managed Service for Apache Flink の使用に対してわずかな料金が発生します。このアプリケーションが終了したら、 AWS リソースを削除して料金の発生を停止します。

このコードでは、実際の株式市場データにアクセスする代わりに、株式取引のストリームをシミュレートします。そのために、ランダム株式取引ジェネレーターが使用されます。リアルタイムの株式取引のストリームにアクセスできたとしたら、そのときに必要としている有益な統計を入手したいと考えるかもしれません。たとえば、スライディングウィンドウ分析を実行して、過去 5 分間に購入された最も人気のある株式を調べたいと思われるかもしれません。または、大規模な売り注文 (膨大な株式が含まれる売り注文) が発生したときに通知を受けたいと思われるかもしれません。このシリーズのコードを拡張して、このような機能を使用することもできます。

この例では米国西部 (オレゴン) リージョンが使用されていますが、[Managed Service for Apache Flink がサポートされるAWS リージョン](https://docs.aws.amazon.com/general/latest/gr/rande.html#ka_region)であれば、どのリージョンでも動作します。

**Topics**
+ [

## 演習を完了するための前提条件
](#setting-up-prerequisites)
+ [

# AWS アカウントをセットアップし、管理者ユーザーを作成する
](setting-up.md)
+ [

# AWS Command Line Interface (AWS CLI) のセットアップ
](setup-awscli.md)
+ [

# Apache Flink アプリケーション用 Managed Serviceを作成して実行する
](get-started-exercise.md)

## 演習を完了するための前提条件
<a name="setting-up-prerequisites"></a>

このガイドの手順を完了するには、以下が必要です。
+ [Java 開発キット](http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html) (JDK) バージョン 8。`JAVA_HOME` 環境変数を、JDK のインストール場所を指すように設定します。
+ 開発環境 ([Eclipse Java Neon](http://www.eclipse.org/downloads/packages/release/neon/3) や [IntelliJ Idea など](https://www.jetbrains.com/idea/)) を使用してアプリケーションを開発し、コンパイルすることをお勧めします。
+ [Git クライアント](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)。Git クライアントをまだインストールしていない場合は、インストールします。
+ [Apache Maven Compiler Plugin](https://maven.apache.org/plugins/maven-compiler-plugin/)。Maven が作業パスに含まれている必要があります。Apache Maven のインストールをテストするには、次のように入力します。

  ```
  $ mvn -version
  ```

開始するには、[AWS アカウントをセットアップし、管理者ユーザーを作成する](setting-up.md)に進みます。

# AWS アカウントをセットアップし、管理者ユーザーを作成する
<a name="setting-up"></a>

Amazon Managed Service for Apache Flink を初めて使用する前に、以下のタスクを完了してください：

1. [にサインアップする AWS](#setting-up-signup)

1. [IAM ユーザーの作成](#setting-up-iam)

## にサインアップする AWS
<a name="setting-up-signup"></a>

Amazon Web Services (AWS) にサインアップすると、Amazon Managed Service for Apache Flink を含む AWSのすべてのサービスに AWS アカウントが自動的にサインアップされます。請求されるのは、使用したサービスの料金のみです。

Managed Service for Apache Flink では、使用したリソースの料金のみを支払います。 AWS の新規のお客様の場合は、Managed Service for Apache Flink の使用を無料で開始できます。詳細については、[AWS 無料利用枠](https://aws.amazon.com/free/)を参照してください。

 AWS アカウントが既にある場合は、次のタスクに進んでください。 AWS アカウントをお持ちでない場合は、次のステップに従って作成します。

**AWS アカウントを作成するには**

1. [https://portal.aws.amazon.com/billing/signup](https://portal.aws.amazon.com/billing/signup) を開きます。

1. オンラインの手順に従います。

   サインアップ手順の一環として、電話またはテキストメッセージを受け取り、電話キーパッドで検証コードを入力します。

   にサインアップすると AWS アカウント、 *AWS アカウントのルートユーザー* が作成されます。ルートユーザーには、アカウントのすべての AWS のサービス とリソースへのアクセス権があります。セキュリティベストプラクティスとして、ユーザーに管理アクセス権を割り当て、[ルートユーザーアクセスが必要なタスク](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_root-user.html#root-user-tasks)の実行にはルートユーザーのみを使用するようにしてください。

次のタスクで必要になるため、 AWS アカウント ID を書き留めます。

## IAM ユーザーの作成
<a name="setting-up-iam"></a>

Amazon Managed Service for Apache Flink AWSなどの のサービスにアクセスするには、認証情報を指定する必要があります。これにより、サービスのリソースにアクセスする権限の有無が判定されます。 AWS マネジメントコンソール では、パスワードを入力する必要があります。

 AWS Command Line Interface (AWS CLI) または API にアクセスするための AWS アカウントのアクセスキーを作成できます。ただし、 AWS アカウントの認証情報 AWS を使用して にアクセスすることはお勧めしません。代わりに、 AWS Identity and Access Management (IAM) を使用することをお勧めします。IAM ユーザーを作成し、管理者アクセス許可を持つ IAM グループにユーザーを追加したら、作成した IAM ユーザーに管理者アクセス許可を付与します。その後、特別な URL とその IAM ユーザーの認証情報を使用して AWS にアクセスできます。

にサインアップしたが AWS、自分で IAM ユーザーを作成していない場合は、IAM コンソールを使用して作成できます。

このガイドの使用開始実習では、管理者権限を持つユーザー (`adminuser`) が存在すること想定しています。手順に従ってアカウントに `adminuser` を作成します。

**管理者グループを作成する**

1. にサインイン AWS マネジメントコンソール し、[https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) で IAM コンソールを開きます。

1. ナビゲーションペインで、**[Groups]** (グループ)、**[Create New Group]** (新しいグループの作成) の順に選択します。

1. [**Group Name**] にグループの名前 (例: **Administrators**) を入力し、[**Next Step**] を選択します。

1. ポリシーのリストで、**AdministratorAccess** ポリシーの横にあるチェックボックスを選択します。[**フィルタ**] メニューと [**検索**] ボックスを使用して、ポリシーのリストをフィルタリングできます。

1. [**次のステップ**]、[**グループの作成**] の順に選択します。

新しいグループは、[**Group Name**] の下に表示されます。

**自分用の IAM ユーザーを作成するには、管理者グループにユーザーを追加し、パスワードを作成します。**

1. ナビゲーションペインで **[Users]** (ユーザー)、**[Add user]** (ユーザーの追加) の順に選択します。

1. [**ユーザー名**] ボックスにユーザー名を入力します。

1. **プログラミングによるアクセス**と**AWS マネジメントコンソールへのアクセス**の両方を選択します。

1. **[Next: Permissions]** (次のステップ: 許可) を選択します。

1. [**管理者**] グループの横にあるチェックボックスを選択します。続いて、**[Next: Review (次へ: レビュー)]** を選択します。

1. **[ユーザーの作成]** を選択します。

**新しい IAM ユーザーとしてサインインするには**

1. からサインアウトします AWS マネジメントコンソール。

1. 次の URL 形式を使用してコンソールにサインインします。

   `https://aws_account_number.signin.aws.amazon.com/console/`

   *aws\$1account\$1number* は、ハイフンのない AWS アカウント ID です。たとえば、 AWS アカウント ID が 1234-5678-9012 の場合、*aws\$1account\$1number* を に置き換えます**123456789012**。アカウント番号を検索する方法については、*IAM ユーザーガイド*の[AWS 「アカウント ID とそのエイリアス](https://docs.aws.amazon.com/IAM/latest/UserGuide/console_account-alias.html)」を参照してください。

1. 作成した IAM ユーザー名とパスワードを入力します。サインインすると、ナビゲーションバーに*your\$1user\$1name* @ *your\$1aws\$1account\$1id*が表示されます。

**注記**  
サインインページの URL に AWS アカウント ID を含めない場合は、アカウントエイリアスを作成できます。

**アカウントエイリアスを作成または削除するには**

1. IAM コンソール ([https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/)) を開きます。

1. ナビゲーションペインで、**ダッシュボード** を選択します。

1. IAM ユーザーのサインインリンクを探します。

1. エイリアスを作成するには、[**カスタマイズ**] を選択します。エイリアスの名前を入力し、[**はい、作成する**] を選択します。

1. エイリアスを削除するには、**カスタマイズ** を選択してから、**はい、作成する** を選択します。サインイン URL は、 AWS アカウント ID を使用して に戻ります。

アカウントエイリアスを作成した後、サインインするには、次の URL を使用します。

`https://your_account_alias.signin.aws.amazon.com/console/`

アカウントの IAM ユーザーのサインインリンクを確認するには、IAM コンソールを開き、ダッシュボードの [**IAM users sign-in link**] の下を確認します。

IAM の詳細については、以下を参照してください。
+ [AWS Identity and Access Management (IAM)](https://aws.amazon.com/iam/)
+ [IAM の開始方法](https://docs.aws.amazon.com/IAM/latest/UserGuide/getting-started.html)
+ [IAM ユーザーガイド](https://docs.aws.amazon.com/IAM/latest/UserGuide/)

## 次のステップ
<a name="setting-up-next-step-2"></a>

[AWS Command Line Interface (AWS CLI) のセットアップ](setup-awscli.md)

# AWS Command Line Interface (AWS CLI) のセットアップ
<a name="setup-awscli"></a>

このステップでは、Amazon Managed Service for Apache Flink で使用する をダウンロードして設定 AWS CLI します。

**注記**  
このガイドの使用開始実習では、操作を実行するために、アカウントの管理者の認証情報 (`adminuser`) を使用していることが前提となっています。

**注記**  
が既に AWS CLI インストールされている場合は、アップグレードして最新の機能を取得する必要がある場合があります。詳細については、「 *AWS Command Line Interface ユーザーガイド*[」の AWS 「 コマンドラインインターフェイスのインストール](https://docs.aws.amazon.com/cli/latest/userguide/installing.html)」を参照してください。のバージョンを確認するには AWS CLI、次のコマンドを実行します。  

```
aws --version
```
このチュートリアルの演習では、次の AWS CLI バージョン 以降が必要です。  

```
aws-cli/1.16.63
```

**を設定するには AWS CLI**

1.  AWS CLIをダウンロードして設定します。手順については、*「AWS Command Line Interface ユーザーガイド」*の次のトピックを参照してください。
   + [AWS Command Line Interfaceのインストール](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-set-up.html)
   + [AWS CLIの設定](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html)

1.  AWS CLI 設定ファイルに管理者ユーザーの名前付きプロファイルを追加します。 AWS CLI コマンドを実行するときは、このプロファイルを使用します。名前付きプロファイルの詳細については、*AWS Command Line Interface ユーザーガイド*の「[名前付きプロファイル](https://docs.aws.amazon.com/cli/latest/userguide/cli-multiple-profiles.html)」を参照してください。

   ```
   [profile adminuser]
   aws_access_key_id = adminuser access key ID
   aws_secret_access_key = adminuser secret access key
   region = aws-region
   ```

   使用可能な AWS リージョンのリストについては、の[AWS 「リージョンとエンドポイント](https://docs.aws.amazon.com/general/latest/gr/rande.html)」を参照してください*Amazon Web Services 全般のリファレンス*。

1. コマンドプロンプトで以下のヘルプコマンドを入力して、セットアップを確認します。

   ```
   aws help
   ```

 AWS アカウントと を設定したら AWS CLI、次の演習を試すことができます。この演習では、サンプルアプリケーションを設定し、end-to-endのセットアップをテストします。

## 次のステップ
<a name="setting-up-next-step-3"></a>

[Apache Flink アプリケーション用 Managed Serviceを作成して実行する](get-started-exercise.md)

# Apache Flink アプリケーション用 Managed Serviceを作成して実行する
<a name="get-started-exercise"></a>

この演習では、データストリームをソースおよびシンクとして使用して、Managed Service for Apache Flink アプリケーションを作成します。

**Topics**
+ [

## 2 つの Amazon Kinesis Data Streams を作成する
](#get-started-exercise-1)
+ [

## 入力ストリームへのサンプルレコードの書き込み
](#get-started-exercise-2)
+ [

## Apache Flink Streaming Java Code のダウンロードと検証
](#get-started-exercise-5)
+ [

## アプリケーションコードのコンパイル
](#get-started-exercise-5.5)
+ [

## Apache Flink Streaming Java Code のアップロードしてください
](#get-started-exercise-6)
+ [

## Managed Service for Apache Flink アプリケーションを作成して実行する
](#get-started-exercise-7)

## 2 つの Amazon Kinesis Data Streams を作成する
<a name="get-started-exercise-1"></a>

この演習でAmazon Managed Service for Apache Flink を作成する前に、2 つの Kinesis データストリーム (`ExampleInputStream` と `ExampleOutputStream`) を作成する必要があります。アプリケーションでは、これらのストリームを使用してアプリケーションの送信元と送信先のストリームを選択します。

これらのストリームは Amazon Kinesis コンソールまたは次の AWS CLI コマンドを使用して作成できます。コンソールを使用した手順については、[データストリームの作成および更新](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html)を参照してください。

**データストリームを作成するには (AWS CLI)**

1. 最初のストリーム (`ExampleInputStream`) を作成するには、次の Amazon Kinesis `create-stream` AWS CLI コマンドを使用します。

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleInputStream \
   --shard-count 1 \
   --region us-west-2 \
   --profile adminuser
   ```

1. アプリケーションが出力の書き込みに使用する 2 つめのストリームを作成するには、ストリーム名を `ExampleOutputStream` に変更して同じコマンドを実行します。

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleOutputStream \
   --shard-count 1 \
   --region us-west-2 \
   --profile adminuser
   ```

## 入力ストリームへのサンプルレコードの書き込み
<a name="get-started-exercise-2"></a>

このセクションでは、Python スクリプトを使用して、アプリケーションが処理するサンプルレコードをストリームに書き込みます。

**注記**  
このセクションでは [AWS SDK for Python (Boto)](https://aws.amazon.com/developers/getting-started/python/) が必要です。

1. 次の内容で、`stock.py` という名前のファイルを作成します。

   ```
    
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "EVENT_TIME": datetime.datetime.now().isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
           "PRICE": round(random.random() * 100, 2),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

1. このチュートリアルの後半では、アプリケーションにデータを送信する `stock.py` スクリプトを実行します。

   ```
   $ python stock.py
   ```

## Apache Flink Streaming Java Code のダウンロードと検証
<a name="get-started-exercise-5"></a>

この例の Java アプリケーションコードは GitHub から入手できます。アプリケーションコードをダウンロードするには、次の操作を行います。

1. 次のコマンドを使用してリモートリポジトリのクローンを作成します。

   ```
   git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples.git
   ```

1. `GettingStarted` ディレクトリに移動します。

アプリケーションコードは `CustomSinkStreamingJob.java` ファイルと `CloudWatchLogSink.java` ファイルに含まれています。アプリケーションコードに関して、以下の点に注意してください。
+ アプリケーションは Kinesis ソースを使用して、ソースストリームから読み取りを行います。次のスニペットでは、Kinesis シンクが作成されます。

  ```
  return env.addSource(new FlinkKinesisConsumer<>(inputStreamName,
                  new SimpleStringSchema(), inputProperties));
  ```

## アプリケーションコードのコンパイル
<a name="get-started-exercise-5.5"></a>

このセクションでは、Apache Maven コンパイラを使用してアプリケーション用の Java コードを作成します。Apache Maven と Java 開発キット (JDK) をインストールする方法については、[演習を完了するための前提条件](tutorial-stock-data.md#setting-up-prerequisites)を参照してください。

Java アプリケーションには、次のコンポーネントが必要です。
+ [プロジェクトオブジェクトモデル (pom.xml)](https://maven.apache.org/guides/introduction/introduction-to-the-pom.html) ファイル。ファイルには、Amazon Managed Service for Apache Flink ライブラリなど、アプリケーションの設定と依存関係に関する情報が含まれています。
+ アプリケーションのロジックを含む `main` メソッド。

**注記**  
**次のアプリケーション用の Kinesis コネクタを使用するには、コネクタのソースコードをダウンロードして、[Apache Flink ドキュメント](https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/connectors/kinesis.html)で説明されているように構築する必要があります。**

**アプリケーションコードを作成してコンパイルするには**

1. Java/Maven アプリケーションを開発環境で作成します。アプリケーションを作成する方法については、開発環境のドキュメントを参照してください。
   + [ 最初の Java プロジェクトの作成 (Eclipse Java Neon)](https://help.eclipse.org/neon/index.jsp?topic=%2Forg.eclipse.jdt.doc.user%2FgettingStarted%2Fqs-3.htm)
   + [ 最初の Java アプリケーションの作成、実行、およびパッケージング (IntelliJ Idea)](https://www.jetbrains.com/help/idea/creating-and-running-your-first-java-application.html)

1. `StreamingJob.java` という名前のファイルに対して次のコードを使用します。

   ```
    
   package com.amazonaws.services.kinesisanalytics;
   
   import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
   import org.apache.flink.api.common.serialization.SimpleStringSchema;
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
   import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
   import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
   
   import java.io.IOException;
   import java.util.Map;
   import java.util.Properties;
   
   public class StreamingJob {
   
       private static final String region = "us-east-1";
       private static final String inputStreamName = "ExampleInputStream";
       private static final String outputStreamName = "ExampleOutputStream";
   
       private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
           Properties inputProperties = new Properties();
           inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
           inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
   
           return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
       }
   
       private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
               throws IOException {
           Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
           return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
                   applicationProperties.get("ConsumerConfigProperties")));
       }
   
       private static FlinkKinesisProducer<String> createSinkFromStaticConfig() {
           Properties outputProperties = new Properties();
           outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
           outputProperties.setProperty("AggregationEnabled", "false");
   
           FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties);
           sink.setDefaultStream(outputStreamName);
           sink.setDefaultPartition("0");
           return sink;
       }
   
       private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException {
           Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
           FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(),
                   applicationProperties.get("ProducerConfigProperties"));
   
           sink.setDefaultStream(outputStreamName);
           sink.setDefaultPartition("0");
           return sink;
       }
   
       public static void main(String[] args) throws Exception {
           // set up the streaming execution environment
           final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   
           /*
            * if you would like to use runtime configuration properties, uncomment the
            * lines below
            * DataStream<String> input = createSourceFromApplicationProperties(env);
            */
   
           DataStream<String> input = createSourceFromStaticConfig(env);
   
           /*
            * if you would like to use runtime configuration properties, uncomment the
            * lines below
            * input.addSink(createSinkFromApplicationProperties())
            */
   
           input.addSink(createSinkFromStaticConfig());
   
           env.execute("Flink Streaming Java API Skeleton");
       }
   }
   ```

   前述のコード例については、以下の点に注意してください。
   + このファイルには、アプリケーションの機能を定義する `main` メソッドが含まれています。
   + アプリケーションでは、ソースおよびシンクコネクタを作成し、`StreamExecutionEnvironment` オブジェクトを使用して外部リソースにアクセスします。
   + アプリケーションでは、静的プロパティを使用してソースおよびシンクコネクタを作成します。動的なアプリケーションプロパティを使用するには、`createSourceFromApplicationProperties` および `createSinkFromApplicationProperties` メソッドを使用してコネクタを作成します。これらのメソッドは、アプリケーションのプロパティを読み取ってコネクタを設定します。

1. アプリケーションコードを使用するには、コードをコンパイルして JAR ファイルにパッケージ化します。コードのコンパイルとパッケージ化には次の 2 通りの方法があります。
   + Maven コマンドラインツールを使用します。`pom.xml` ファイルが格納されているディレクトリで次のコマンドを実行して JAR ファイルを作成します。

     ```
     mvn package
     ```
   + 開発環境を使用します。詳細については、開発環境のドキュメントを参照してください。

   パッケージは JAR ファイルとしてアップロードすることも、圧縮して ZIP ファイルとしてアップロードすることもできします。を使用してアプリケーションを作成する場合は AWS CLI、コードコンテンツタイプ (JAR または ZIP) を指定します。

1. コンパイル中にエラーが発生した場合は、`JAVA_HOME` 環境変数が正しく設定されていることを確認します。

アプリケーションのコンパイルに成功すると、次のファイルが作成されます。

`target/java-getting-started-1.0.jar`

## Apache Flink Streaming Java Code のアップロードしてください
<a name="get-started-exercise-6"></a>

このセクションでは、Amazon Simple Storage Service (Amazon S3) バケットを作成し、アプリケーションコードをアップロードします。

**アプリケーションコードをアップロードするには**

1. Amazon S3 コンソール ([https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)) を開きます。

1. **[バケットを作成]** を選択します。

1. [**Bucket name (バケット名)**] フィールドに**ka-app-code-*<username>***と入力します。バケット名にユーザー名などのサフィックスを追加して、グローバルに一意にします。[**次へ**] を選択します。

1. **設定オプション**のステップでは、設定をそのままにし、[**次へ**] を選択します。

1. **アクセス許可の設定**のステップでは、設定をそのままにし、[**次へ**] を選択します。

1. **[バケットを作成]** を選択します。

1. Amazon S3 コンソールで **ka-app-code-*<username>*** バケットを選択し、[**アップロード**] を選択します。

1. **ファイルの選択**のステップで、[**ファイルを追加**] を選択します。前のステップで作成した `java-getting-started-1.0.jar` ファイルに移動します。[**次へ**] を選択します。

1. **アクセス許可の設定**のステップでは、設定をそのままにします。[**次へ**] を選択します。

1. **プロパティの設定**のステップでは、設定をそのままにします。**アップロード** を選択します。

アプリケーションコードが Amazon S3 バケットに保存され、アプリケーションからアクセスできるようになります。

## Managed Service for Apache Flink アプリケーションを作成して実行する
<a name="get-started-exercise-7"></a>

コンソールまたは AWS CLIのいずれかを使用してManaged Service for Apache Flink を作成し、実行することができます。

**注記**  
コンソールを使用してアプリケーションを作成すると、 AWS Identity and Access Management (IAM) リソースと Amazon CloudWatch Logs リソースが自動的に作成されます。を使用してアプリケーションを作成するときは AWS CLI、これらのリソースを個別に作成します。

**Topics**
+ [

### アプリケーションを作成して実行する (コンソール)
](#get-started-exercise-7-console)
+ [

### アプリケーションを作成して実行する (AWS CLI)
](#get-started-exercise-7-cli)

### アプリケーションを作成して実行する (コンソール)
<a name="get-started-exercise-7-console"></a>

以下の手順を実行し、コンソールを使用してアプリケーションを作成、設定、更新、および実行します。

#### アプリケーションの作成
<a name="get-started-exercise-7-console-create"></a>

1. Kinesis コンソール ([https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis)) を開きます。

1. Amazon Kinesis ダッシュボードで、[**分析アプリケーションを作成する**] を選択します。

1. [**Kinesis Analytics - アプリケーションの作成**] ページで、次のようにアプリケーションの詳細を指定します。
   + [**アプリケーション名**] には**MyApplication**と入力します。
   + [**Description (説明)**] に**My java test app**と入力します。
   + [**ランタイム**] には、[**Apache Flink 1.6**] を選択します。

1. [**アクセス許可**] には、[**IAM ロールの作成 / 更新`kinesis-analytics-MyApplication-us-west-2`**] を選択します。

1. [**アプリケーションを作成**] を選択します。

**注記**  
コンソールを使用して Amazon Managed Service for Apache Flink を作成する場合は、IAM ロールとポリシーをアプリケーションが自動的に作成するオプションを選択できます。アプリケーションではこのロールとポリシーを使用して、依存リソースにアクセスします。これらの IAM リソースは、次のようにアプリケーション名とリージョンを使用して命名されます。  
ポリシー: `kinesis-analytics-service-MyApplication-us-west-2`
ロール: `kinesis-analytics-MyApplication-us-west-2`

#### IAM ポリシーを編集する
<a name="get-started-exercise-7-console-iam"></a>

IAM ポリシーを編集し、Kinesis Data Streamsにアクセスするための許可を追加します。

1. IAM コンソール ([https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/)) を開きます。

1. **[ポリシー]** を選択します。前のセクションでコンソールによって作成された **`kinesis-analytics-service-MyApplication-us-west-2`** ポリシーを選択します。

1. [**概要**] ページで、[**ポリシーの編集**] を選択します。**JSON** タブを選択します。

1. 次のポリシー例で強調表示されているセクションをポリシーに追加します。サンプルのアカウント ID (*012345678901*) を自分のアカウント ID に置き換えます。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:s3:::ka-app-code-username/java-getting-started-1.0.jar"
               ]
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:*"
               ]
           },
           {
               "Sid": "ListCloudwatchLogStreams",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogStreams"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
               ]
           },
           {
               "Sid": "PutCloudwatchLogs",
               "Effect": "Allow",
               "Action": [
                   "logs:PutLogEvents"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
               ]
           },
           {
               "Sid": "ReadInputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream"
           },
           {
               "Sid": "WriteOutputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream"
           }
       ]
   }
   ```

------

#### アプリケーションを設定する
<a name="get-started-exercise-7-console-configure"></a>

1. [**MyApplication**] ページで、[**Congirue**] を選択します。

1. [**Configure application**] ページで、[**Code location**] を次のように指定します。
   + [**Amazon S3 バケット**] で、**ka-app-code-*<username>***と入力します。
   + [**Amazon S3 オブジェクトへのパス**] で、**java-getting-started-1.0.jar**と入力します。

1. [**Access to application resources**] の [**Access permissions**] では、[**Create / update IAM role`kinesis-analytics-MyApplication-us-west-2`**] を選択します。

1. [**Properties**] の [**Group ID**] には、**ProducerConfigProperties**と入力します。

1. 次のアプリケーションのプロパティと値を入力します。    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/streams/latest/dev/get-started-exercise.html)

1. [**Monitoring**] の [**Monitoring metrics level**] が [**Application**] に設定されていることを確認します。

1. [**CloudWatch logging**] では、[**Enable**] チェックボックスをオンにします。

1. **[更新]** を選択します。

**注記**  
CloudWatch ログ記録の有効化を選択すると、Managed Service for Apache Flink がユーザーに代わってロググループとログストリームを作成します。これらのリソースの名前は次のとおりです。  
ロググループ: `/aws/kinesis-analytics/MyApplication`
ログストリーム: `kinesis-analytics-log-stream`

#### アプリケーションを実行する
<a name="get-started-exercise-7-console-run"></a>

1. [**MyApplication**] ページで、[**Run**] を選択します。アクションを確認します。

1. アプリケーションが実行されたら、ページを更新します。コンソールには [**Application graph**] が示されます。

#### アプリケーションを停止する
<a name="get-started-exercise-7-console-stop"></a>

[**MyApplication**] ページで、[**Stop**] を選択します。アクションを確認します。

#### アプリケーションの更新
<a name="get-started-exercise-7-console-update"></a>

コンソールを使用して、アプリケーションのプロパティ、モニタリング設定、アプリケーション JAR の場所またはファイル名などのアプリケーション設定を更新できます。アプリケーションコードを更新する必要がある場合は、アプリケーション JAR を Amazon S3 バケットから再ロードすることもできます。

[**MyApplication**] ページで、[**Congirue**] を選択します。アプリケーションの設定を更新し、[**更新**] を選択します。

### アプリケーションを作成して実行する (AWS CLI)
<a name="get-started-exercise-7-cli"></a>

このセクションでは、 AWS CLI を使用して Managed Service for Apache Flink アプリケーションを作成して実行します。Managed Service for Apache Flink は、 `kinesisanalyticsv2` AWS CLI コマンドを使用して Managed Service for Apache Flink アプリケーションを作成して操作します。

#### アクセス許可ポリシーを作成する
<a name="get-started-exercise-7-cli-policy"></a>

まず、2 つのステートメントを含むアクセス許可ポリシーを作成します。1 つは、ソースストリームの `read` アクションに対するアクセス許可を付与し、もう 1 つはシンクストリームの `write` アクションに対するアクセス許可を付与します。次に、IAM ロール (次のセクションで作成) にポリシーをアタッチします。そのため、 Managed Service for Apache Flinkがこのロールを引き受けると、ソースストリームからの読み取りとシンクストリームへの書き込みを行うために必要なアクセス許可がサービスに付与されます。

次のコードを使用して `KAReadSourceStreamWriteSinkStream` アクセス許可ポリシーを作成します。`username` を Amazon S3 バケットの作成に使用したユーザー名に置き換え、アプリケーションコードを保存します。Amazon リソースネーム (ARN) のアカウント ID (`012345678901`) を自分のアカウント ID に置き換えます。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "S3",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:GetObjectVersion"
            ],
            "Resource": ["arn:aws:s3:::ka-app-code-username",
                "arn:aws:s3:::ka-app-code-username/*"
            ]
        },
        {
            "Sid": "ReadInputStream",
            "Effect": "Allow",
            "Action": "kinesis:*",
            "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream"
        },
        {
            "Sid": "WriteOutputStream",
            "Effect": "Allow",
            "Action": "kinesis:*",
            "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream"
        }
    ]
}
```

------

許可ポリシーを作成する詳しい手順については、*IAM ユーザーガイド*の[チュートリアル: はじめてのカスタマーマネージドポリシーの作成とアタッチ](https://docs.aws.amazon.com/IAM/latest/UserGuide/tutorial_managed-policies.html#part-two-create-policy)を参照してください。

**注記**  
他の AWS サービスにアクセスするには、 を使用できます AWS SDK for Java。Managed Service for Apache Flink は、アプリケーションに関連付けられているサービス実行 IAM ロールに、SDK が必要とする認証情報を自動的に設定します。追加の手順は必要ありません。

#### IAM ロールを作成する
<a name="get-started-exercise-7-cli-role"></a>

このセクションでは、Managed Service for Apache Flink がソースストリームを読み取り、シンクストリームに書き込むために想定できる IAM ロールを作成します。

Apache Flink 用 Managed Service は、許可なしにはストリームにアクセスできません。IAM ロールを介してこれらの許可を付与します。各 IAM ロールには、2 つのポリシーがアタッチされます。信頼ポリシーは、ロールを引き受けるための許可を Managed Service for Apache Flink 付与し、許可ポリシーは、ロールを引き受けた後に Managed Service for Apache Flink が実行できる事柄を決定します。

前のセクションで作成したアクセス許可ポリシーをこのロールにアタッチします。

**IAM ロールを作成するには**

1. IAM コンソール ([https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/)) を開きます。

1. ナビゲーションペインで [**Roles (ロール)**]、[**Create Role (ロールの作成)**] の順に選択します。

1. [**信頼されるエンティティの種類を選択**] で、[**AWS のサービス**] を選択します。[**このロールを使用するサービスを選択**] で、[**Kinesis Analytics**] を選択します。[**ユースケースの選択**] で、[**Kinesis Analytics **] を選択します。

   **[Next: Permissions]** (次のステップ: 許可) を選択します。

1. [**アクセス権限ポリシーをアタッチする**] ページで、[**Next: Review**] (次: 確認) を選択します。ロールを作成した後に、アクセス許可ポリシーをアタッチします。

1. [**Create role (ロールの作成)**] ページで、**ロールの名前**に **KA-stream-rw-role** を入力します。[**ロールの作成**] を選択してください。

   これで、`KA-stream-rw-role` と呼ばれる新しい IAM ロールが作成されます。次に、ロールの信頼ポリシーとアクセス許可ポリシーを更新します。

1.  アクセス許可ポリシーをロールにアタッチします。
**注記**  
この演習では、Managed Service for Apache Flink が、Kinesis データストリーム (ソース) からのデータの読み取りと、別の Kinesis データストリームへの出力の書き込みの両方を実行するためにこのロールを引き受けます。このため、前のステップで作成したポリシー、[アクセス許可ポリシーを作成する](#get-started-exercise-7-cli-policy) をアタッチします。

   1. [**概要**] ページで、[**アクセス許可**] タブを選択します。

   1. [**Attach Policies (ポリシーのアタッチ)**] を選択します。

   1. 検索ボックスに**KAReadSourceStreamWriteSinkStream**(前のセクションで作成したポリシー) と入力します。

   1. [**KAReadInputStreamWriteOutputStream**] ポリシーを選択し、[**ポリシーのアタッチ**] を選択します。

これで、アプリケーションがリソースにアクセスするために使用するサービスの実行ロールが作成されました。新しいロールの ARN を書き留めておきます。

ロールを作成する手順については、*IAM ユーザーガイド*の [IAM ロールの作成 (コンソール)](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user.html#roles-creatingrole-user-console)を参照してください。

#### Apache Flink アプリケーション用 Managed Serviceの作成
<a name="get-started-exercise-7-cli-create"></a>

1. 次の JSON コードを `create_request.json` という名前のファイルに保存します。サンプルロールの ARN を、前に作成したロールの ARN に置き換えます。バケット ARN のサフィックス (`username`) を、前のセクションで選択したサフィックスに置き換えます。サービス実行ロールのサンプルのアカウント ID (`012345678901`) を、自分のアカウント ID に置き換えます。

   ```
   {
       "ApplicationName": "test",
       "ApplicationDescription": "my java test app",
       "RuntimeEnvironment": "FLINK-1_6",
       "ServiceExecutionRole": "arn:aws:iam::012345678901:role/KA-stream-rw-role",
       "ApplicationConfiguration": {
           "ApplicationCodeConfiguration": {
               "CodeContent": {
                   "S3ContentLocation": {
                       "BucketARN": "arn:aws:s3:::ka-app-code-username",
                       "FileKey": "java-getting-started-1.0.jar"
                   }
               },
               "CodeContentType": "ZIPFILE"
           },
           "EnvironmentProperties":  { 
            "PropertyGroups": [ 
               { 
                  "PropertyGroupId": "ProducerConfigProperties",
                  "PropertyMap" : {
                       "flink.stream.initpos" : "LATEST",
                       "aws.region" : "us-west-2",
                       "AggregationEnabled" : "false"
                  }
               },
               { 
                  "PropertyGroupId": "ConsumerConfigProperties",
                  "PropertyMap" : {
                       "aws.region" : "us-west-2"
                  }
               }
            ]
         }
       }
   }
   ```

1. 前述のリクエストを指定して [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_CreateApplication.html) アクションを実行し、アプリケーションを作成します。

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json
   ```

これでアプリケーションが作成されました。次のステップでは、アプリケーションを起動します。

#### アプリケーションの起動
<a name="get-started-exercise-7-cli-start"></a>

このセクションでは、[https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html) アクションを使用してアプリケーションを起動します。

**アプリケーションを起動するには**

1. 次の JSON コードを `start_request.json` という名前のファイルに保存します。

   ```
   {
       "ApplicationName": "test",
       "RunConfiguration": {
           "ApplicationRestoreConfiguration": { 
            "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT"
            }
       }
   }
   ```

1. 前述のリクエストを指定して [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html) アクションを実行し、アプリケーションを起動します。

   ```
   aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json
   ```

アプリケーションが実行されます。Amazon CloudWatch コンソールで Managed Service for Apache Flink メトリクスをチェックして、アプリケーションが機能していることを確認できます。

#### アプリケーションの停止
<a name="get-started-exercise-7-cli-stop"></a>

このセクションでは、[https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html) アクションを使用してアプリケーションを停止します。

**アプリケーションを停止するには**

1. 次の JSON コードを `stop_request.json` という名前のファイルに保存します。

   ```
   {"ApplicationName": "test"
   }
   ```

1. 次のリクエストを指定して [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html) アクションを実行し、アプリケーションを停止します。

   ```
   aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json
   ```

アプリケーションが停止します。

# チュートリアル: Amazon Kinesis Data Streams AWS Lambda で を使用する
<a name="tutorial-stock-data-lambda"></a>

このチュートリアルでは、Kinesis データストリームのイベントを処理する Lambda 関数を作成します。このシナリオ例では、カスタムアプリケーションは Kinesis データストリームにレコードを書き込みます。 AWS Lambda はこのデータストリームをポーリングし、新しいデータレコードを検出すると、 は Lambda 関数を呼び出します。 AWS Lambda 次に、Lambda 関数の作成時に指定した実行ロールを引き受けて Lambda 関数を実行します。

詳細な手順については、[「チュートリアル: Amazon Kinesis での AWS Lambda の使用 Amazon Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis-example.html)」を参照してください。

**注記**  
このチュートリアルでは、基本的な Lambda オペレーションと AWS Lambda コンソールについてある程度の知識があることを前提としています。まだ作成していない場合は、[AWS 「Lambda の開始方法](https://docs.aws.amazon.com/lambda/latest/dg/getting-started.html)」の手順に従って、最初の Lambda 関数を作成します。

# Amazon Kinesis AWS のストリーミングデータソリューションを使用する
<a name="examples-streaming-solution"></a>

Amazon Kinesis AWS 用ストリーミングデータソリューションは、ストリーミングデータを簡単にキャプチャ、保存、処理、配信するために必要な AWS サービスを自動的に設定します。このソリューションは、Kinesis Data Streams、Amazon API Gateway AWS Lambda、Amazon Managed Service for Apache Flink など、複数の AWS サービスを使用するストリーミングデータのユースケースを解決するための複数のオプションを提供します。

各ソリューションには、以下のコンポーネントとが含まれています。
+ 完全な例をデプロイする CloudFormation パッケージ。
+ アプリケーションのメトリクスを表示するための CloudWatch ダッシュボード。
+ CloudWatch は、最も関連性の高いアプリケーションメトリクスでアラームを発します。
+ すべての必要な IAM ロールとポリシー

ソリューションはこちらでご覧いただけます。[Amazon Kinesis 向けストリーミングデータソリューション](https://aws.amazon.com/solutions/implementations/aws-streaming-data-solution-for-amazon-kinesis/)