

慎重に検討した結果、Amazon Kinesis Data Analytics for SQL アプリケーションを中止することにしました。

1. **2025 年 9 月 1** 日以降、Amazon Kinesis Data Analytics for SQL アプリケーションのバグ修正は提供されません。これは、今後の廃止によりサポートが制限されるためです。

2. **2025 年 10 月 15** 日以降、新しい Kinesis Data Analytics for SQL アプリケーションを作成することはできません。

3. **2026 年 1 月 27 日**以降、アプリケーションは削除されます。Amazon Kinesis Data Analytics for SQL アプリケーションを起動することも操作することもできなくなります。これ以降、Amazon Kinesis Data Analytics for SQL のサポートは終了します。詳細については、「[Amazon Kinesis Data Analytics for SQL アプリケーションのサポート終了](discontinuation.md)」を参照してください。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# 例: ストリームでデータの異常を検出する (RANDOM\$1CUT\$1FOREST 関数)
<a name="app-anomaly-detection"></a>

Amazon Kinesis Data Analytics では、数値列の値に基づいて異常スコアを各レコードに割り当てる関数 (`RANDOM_CUT_FOREST`) を提供しています。詳細については、「Amazon Managed Service for Apache Flink SQL リファレンス」の「[`RANDOM_CUT_FOREST` Function](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/analytics-sql-reference.html)」を参照してください。

この実習では、アプリケーションのストリーミングソースのレコードに異常スコアを割り当てるアプリケーションコードを作成します。アプリケーションをセットアップするには、以下を実行します。

1. **ストリーミングソースのセットアップ** – Kinesis データストリームをセットアップして、次のようにサンプル `heartRate` データを書き込みます。

   ```
   {"heartRate": 60, "rateType":"NORMAL"}
   ...
   {"heartRate": 180, "rateType":"HIGH"}
   ```

   この手順では、ストリームに入力するための Python スクリプトを提供しています。`heartRate` 値はランダムに生成されます。レコードの 99 パーセントは `heartRate` 値が 60 から 100 の間で、`heartRate` 値の 1 パーセントのみが 150 から 200 の間です。したがって、`heartRate` 値が 150 から 200 の間のレコードは異常です。

1. **入力の設定** – コンソールを使用して、Kinesis Data Analytics アプリケーションを作成し、ストリーミングソースをアプリケーション内ストリーム (`SOURCE_SQL_STREAM_001`) にマッピングすることでアプリケーション入力を設定します。アプリケーションが起動すると、Kinesis Data Analytics は継続的にストリーミングソースを読み取り、アプリケーション内ストリームにレコードを挿入します。

1. **アプリケーションコードの指定** – この例では、次のアプリケーションコードを使用します。

   ```
   --Creates a temporary stream.
   CREATE OR REPLACE STREAM "TEMP_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   --Creates another stream for application output.	        
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   -- Compute an anomaly score for each record in the input stream
   -- using Random Cut Forest
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
      INSERT INTO "TEMP_STREAM"
         SELECT STREAM "heartRate", "rateType", ANOMALY_SCORE 
         FROM TABLE(RANDOM_CUT_FOREST(
                 CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001")));
   
   -- Sort records by descending anomaly score, insert into output stream
   CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM * FROM "TEMP_STREAM"
         ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;
   ```

   コードは `SOURCE_SQL_STREAM_001` の行を読み取り、異常スコアを割り当て、結果の行を別のアプリケーション内ストリーム (`TEMP_STREAM`) に書き込みます。次に、アプリケーションコードは`TEMP_STREAM` のレコードをソートして、結果を別のアプリケーション内ストリーム (`DESTINATION_SQL_STREAM`) に保存します。ポンプを使用して、アプリケーション内ストリームに行を挿入します。詳細については、「[アプリケーション内ストリームとポンプ](streams-pumps.md)」を参照してください。

1. **出力の設定** – `DESTINATION_SQL_STREAM` のデータを永続化して、別の Kinesis データストリームの外部宛先に書き込むようにアプリケーション出力を設定できます。各レコードに割り当てられた異常スコアを確認して、どのスコアが異常を示しているか (また、アラートが必要か) を調べるのは、アプリケーションの範囲外です。 AWS Lambda 関数を使用して、これらの異常スコアを処理し、アラートを設定できます。

この実習では、米国東部 (バージニア北部) (`us-east-1`) を使用して、これらのストリームとアプリケーションを作成します。他のリージョンも使用する場合は、それに応じてコードを更新する必要があります。

**Topics**
+ [ステップ 1: 準備](app-anomaly-prepare.md)
+ [ステップ 2: アプリケーションの作成](app-anom-score-create-app.md)
+ [ステップ 3: アプリケーション出力を設定する](app-anomaly-create-ka-app-config-destination.md)
+ [ステップ 4: 出力の確認](app-anomaly-verify-output.md)

**次のステップ**  
[ステップ 1: 準備](app-anomaly-prepare.md)

# ステップ 1: 準備
<a name="app-anomaly-prepare"></a>

この実習用の Amazon Kinesis Data Analytics アプリケーションを作成する前に、2 つの Kinesis データストリームを作成する必要があります。ストリームの 1 つはアプリケーションのストリーミングソースとして設定し、もう 1 つのストリームは Kinesis Data Analytics がアプリケーション出力を永続化する宛先として設定します。

**Topics**
+ [ステップ 1.1: 入力ストリームと出力データストリームを作成する](#app-anomaly-create-two-streams)
+ [ステップ 1.2: 入力ストリームにサンプルレコードを書き込みます](#app-anomaly-write-sample-records-inputstream)

## ステップ 1.1: 入力ストリームと出力データストリームを作成する
<a name="app-anomaly-create-two-streams"></a>

このセクションでは、2 つの Kinesis ストリーム (`ExampleInputStream` および `ExampleOutputStream`) を作成します。 AWS マネジメントコンソール または AWS CLIを使用してこれらのストリームを作成できます。
+ 

**コンソールを使用するには**

  1. にサインイン AWS マネジメントコンソール し、[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) で Kinesis コンソールを開きます。

  1. [**データストリームの作成**] を選択します。`ExampleInputStream` という名前の 1 つのシャードがあるストリームを作成します。詳細については、「Amazon Kinesis Data Streams デベロッパーガイド」の「[Create a Stream](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)」を参照してください。

  1. 前のステップを繰り返し、`ExampleOutputStream` という名前の 1 つのシャードを持つストリームを作成します。
+ 

**を使用するには AWS CLI**

  1. 次の Kinesis `create-stream` AWS CLI コマンドを使用して、最初のストリーム () を作成します`ExampleInputStream`。

     ```
     $ aws kinesis create-stream \
     --stream-name ExampleInputStream \
     --shard-count 1 \
     --region us-east-1 \
     --profile adminuser
     ```

  1. 同じコマンドを実行し、ストリーム名を `ExampleOutputStream` に変更します。このコマンドは、アプリケーションが出力の書き込みに使用する 2 つ目のストリームを作成します。

## ステップ 1.2: 入力ストリームにサンプルレコードを書き込みます
<a name="app-anomaly-write-sample-records-inputstream"></a>

このステップでは、Python コードを実行してサンプルレコードを連続生成し、それらのレコードを `ExampleInputStream` ストリームに書き込みます。

```
{"heartRate": 60, "rateType":"NORMAL"} 
...
{"heartRate": 180, "rateType":"HIGH"}
```

1. Python および `pip` をインストールします。

   Python のインストールについては、[Python](https://www.python.org/) ウェブサイトをご覧ください。

   pip を使用して依存関係をインストールできます。pip のインストールについては、pip ウェブサイトの「[Installation](https://pip.pypa.io/en/stable/installing/)」を参照してください。

1. 以下の Python コードを実行します。コードの `put-record` コマンドは、ストリームに JSON レコードを書き込みます。

   ```
    
   from enum import Enum
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   class RateType(Enum):
       normal = "NORMAL"
       high = "HIGH"
   
   
   def get_heart_rate(rate_type):
       if rate_type == RateType.normal:
           rate = random.randint(60, 100)
       elif rate_type == RateType.high:
           rate = random.randint(150, 200)
       else:
           raise TypeError
       return {"heartRate": rate, "rateType": rate_type.value}
   
   
   def generate(stream_name, kinesis_client, output=True):
       while True:
           rnd = random.random()
           rate_type = RateType.high if rnd < 0.01 else RateType.normal
           heart_rate = get_heart_rate(rate_type)
           if output:
               print(heart_rate)
           kinesis_client.put_record(
               StreamName=stream_name,
               Data=json.dumps(heart_rate),
               PartitionKey="partitionkey",
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```



**次のステップ**  
[ステップ 2: アプリケーションの作成](app-anom-score-create-app.md)

# ステップ 2: アプリケーションの作成
<a name="app-anom-score-create-app"></a>

このセクションでは、次のように Amazon Kinesis Data Analytics アプリケーションを作成します。
+ [ステップ 1: 準備](app-anomaly-prepare.md) で作成した Kinesis データストリームをストリーミングソースとして使用するように、アプリケーション入力を設定します。
+ コンソールで [**異常検出**] テンプレートを使用します。

**アプリケーションを作成するには**

1. Kinesis Data Analytics の**開始方法**の実習のステップ 1、2、および 3 (「[ステップ 3.1: アプリケーションの作成](get-started-create-app.md)」を参照) に従います。
   + ソース設定で、以下を実行します。
     + 前のセクションで作成したストリーミングソースを指定します。
     + コンソールがスキーマを推測した後、スキーマを編集し、`heartRate` 列の型を `INTEGER` に設定します。

       心拍値のほとんどは正常で、検出プロセスはこの列に `TINYINT` 型を割り当てることがほとんどです。ただし、低い割合で値が高い心拍数を示します。これらの高い値が `TINYINT` 型に合わない場合、Kinesis Data Analytics はそれらの列をエラーストリームに送ります。データ型を `INTEGER` に更新して、生成された心拍数データのすべてに対応できるようにします。
   + コンソールで [**異常検出**] テンプレートを使用します。次にテンプレートコードを更新して適切な列名を指定します。

1. 列名を指定してアプリケーションコードを更新します。その結果アプリケーションコードは次のようになります (このコードを SQL エディタに貼り付けます)。

   ```
   --Creates a temporary stream.
   CREATE OR REPLACE STREAM "TEMP_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   --Creates another stream for application output.	        
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   -- Compute an anomaly score for each record in the input stream
   -- using Random Cut Forest
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
      INSERT INTO "TEMP_STREAM"
         SELECT STREAM "heartRate", "rateType", ANOMALY_SCORE 
         FROM TABLE(RANDOM_CUT_FOREST(
                 CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001")));
   
   -- Sort records by descending anomaly score, insert into output stream
   CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM * FROM "TEMP_STREAM"
         ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;
   ```

   

1. SQL コードを実行して Kinesis Data Analytics コンソールで結果を確認します。  
![\[アプリケーション内ストリームに結果のデータを含んだリアルタイム分析タブを表示するコンソールのスクリーンショットです。\]](http://docs.aws.amazon.com/ja_jp/kinesisanalytics/latest/dev/images/anom-v2-40.png)





**次のステップ**  
[ステップ 3: アプリケーション出力を設定する](app-anomaly-create-ka-app-config-destination.md)

# ステップ 3: アプリケーション出力を設定する
<a name="app-anomaly-create-ka-app-config-destination"></a>

「[ステップ 2: アプリケーションの作成](app-anom-score-create-app.md)」を完了すると、ストリーミングソースから心拍数データを読み取って、異常データをそれぞれに割り当てるアプリケーションコードが作成されます。

これで、アプリケーションの結果をアプリケーション内ストリームから外部宛先である別のデータストリーム (`OutputStreamTestingAnomalyScores`) に送信できます。異常スコアを分析し、どの心拍数が異常であるか判断できます。次に、このアプリケーションを拡張してアラートを生成できます。

以下のステップに従って、アプリケーション出力を設定します。



1. Amazon Kinesis Data Analytics コンソールを開きます。SQL エディタのアプリケーションダッシュボードで、[**Destination**] または [**Add a destination**] を選択します。

1. [**送信先への接続**] ページで、前のセクションで作成した `OutputStreamTestingAnomalyScores` ストリームを選択します。

   こうしてできた外部宛先に、アプリケーションがアプリケーション内ストリーム `DESTINATION_SQL_STREAM` に書き込むレコードを Amazon Kinesis Data Analytics が永続化できます。

1. オプションで`OutputStreamTestingAnomalyScores`、ストリームをモニタリングしてアラートを送信する AWS Lambda ように を設定できます。手順については、「[Lambda 関数を使用したデータの事前処理](lambda-preprocessing.md)」を参照してください。アラートを設定しない場合は、`OutputStreamTestingAnomalyScores` が外部宛先に書き込むレコードを確認できます。[ステップ 4: 出力の確認](app-anomaly-verify-output.md) で説明する Kinesis データストリームがこれにあたります。

**次のステップ**  
[ステップ 4: 出力の確認](app-anomaly-verify-output.md)

# ステップ 4: 出力の確認
<a name="app-anomaly-verify-output"></a>

「[ステップ 3: アプリケーション出力を設定する](app-anomaly-create-ka-app-config-destination.md)」でアプリケーション出力を設定した後、次の AWS CLI コマンドを使用して、アプリケーションによって書き込まれた宛先ストリームのレコードを読み取ります。

1. `get-shard-iterator` コマンドを実行して出力ストリームのデータへのポインタを取得します。

   ```
   aws kinesis get-shard-iterator \
   --shard-id shardId-000000000000 \
   --shard-iterator-type TRIM_HORIZON \
   --stream-name OutputStreamTestingAnomalyScores \
   --region us-east-1 \
   --profile adminuser
   ```

   次のレスポンス例に示すように、シャードイテレーター値を含むレスポンスを受け取ります。

   ```
     {      
         "ShardIterator":
         "shard-iterator-value"   }
   ```

   シャードイテレーター値をコピーします。

1.  AWS CLI `get-records` コマンドを実行します。

   ```
   aws kinesis get-records \
   --shard-iterator shared-iterator-value \
   --region us-east-1 \
   --profile adminuser
   ```

   コマンドはレコードのページと、別のシャードイテレーターを返します。これは後続の `get-records` コマンドで次のレコードのセットを取得するために使用できます。