

慎重に検討した結果、Amazon Kinesis Data Analytics for SQL アプリケーションを中止することにしました。

1. **2025 年 9 月 1** 日以降、Amazon Kinesis Data Analytics for SQL アプリケーションのバグ修正は提供されません。これは、今後の廃止によりサポートが制限されるためです。

2. **2025 年 10 月 15** 日以降、新しい Kinesis Data Analytics for SQL アプリケーションを作成することはできません。

3. **2026 年 1 月 27 日**以降、アプリケーションは削除されます。Amazon Kinesis Data Analytics for SQL アプリケーションを起動することも操作することもできなくなります。これ以降、Amazon Kinesis Data Analytics for SQL のサポートは終了します。詳細については、「[Amazon Kinesis Data Analytics for SQL アプリケーションのサポート終了](discontinuation.md)」を参照してください。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# 例 : ストリーム上のホットスポットの検出 (HOTSPOTS 関数)
<a name="app-hotspots-detection"></a>

Amazon Kinesis Data Analytics は、データの相対的に高密度なリージョンを検索してその情報を返す `HOTSPOTS` 関数を提供しています。詳細については、「Amazon Managed Service for Apache Flink SQL リファレンス」の「[HOTSPOTS](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sqlrf-hotspots.html)」を参照してください。

この実習では、アプリケーションのストリーミングソースのホットスポットを見つけるアプリケーションコードを作成します。アプリケーションをセットアップするには、以下のステップを実行します。

1. **ストリーミングソースのセットアップ** – Kinesis ストリームをセットアップして、次のようにサンプル座標データを書き込みます。

   ```
   {"x": 7.921782426109737, "y": 8.746265312709893, "is_hot": "N"}
   {"x": 0.722248626528026, "y": 4.648868803193405, "is_hot": "Y"}
   ```

   この例では、ストリームに入力するための Python スクリプトを提供しています。`x` および `y` 値はランダムに生成され、一部のレコードは特定の場所の周りにクラスター化されます。

   スクリプトがホットスポットの一部として意図的に値を生成した場合、`is_hot` フィールドはインジケータとして提供されます。これは、ホットスポット検出関数が正常に動作しているかどうかを評価するのに役立ちます。

1. **アプリケーションの作成** – AWS マネジメントコンソールを使用して Kinesis Data Analytics アプリケーションを作成します。ストリーミングソースをアプリケーション内ストリーム (`SOURCE_SQL_STREAM_001`) にマッピングして、アプリケーション入力を設定します。アプリケーションが起動すると、Kinesis Data Analytics は継続的にストリーミングソースを読み取り、アプリケーション内ストリームにレコードを挿入します。

   この演習では、アプリケーションに次のコードを使用します。

   ```
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
       "x" DOUBLE, 
       "y" DOUBLE, 
       "is_hot" VARCHAR(4),
       HOTSPOTS_RESULT VARCHAR(10000)
   ); 
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
       INSERT INTO "DESTINATION_SQL_STREAM" 
       SELECT "x", "y", "is_hot", "HOTSPOTS_RESULT" 
       FROM TABLE (
           HOTSPOTS(   
               CURSOR(SELECT STREAM "x", "y", "is_hot" FROM "SOURCE_SQL_STREAM_001"), 
               1000, 
               0.2, 
               17)
       );
   ```

   コードは `SOURCE_SQL_STREAM_001` の行を読み取り、重要なホットスポットを分析し、結果のデータを別のアプリケーション内ストリーム (`DESTINATION_SQL_STREAM`) に書き込みます。ポンプを使用して、アプリケーション内ストリームに行を挿入します。詳細については、「[アプリケーション内ストリームとポンプ](streams-pumps.md)」を参照してください。

1. **出力の設定** – アプリケーションから別の Kinesis データストリームである外部送信先にデータを送信するように、アプリケーション出力を設定します。ホットスポットのスコアを確認し、ホットスポットが発生したことを示すスコア (およびアラートが必要なスコア) を判断します。 AWS Lambda 関数を使用して、ホットスポット情報をさらに処理し、アラートを設定できます。

1. **出力を確認する** – この例には、出力ストリームからデータを読み込んでグラフィカルに表示する JavaScript アプリケーションが含まれているので、アプリケーションが生成するホットスポットをリアルタイムで表示できます。



この実習では、米国西部 (オレゴン) (`us-west-2`) を使用して、これらのストリームとアプリケーションを作成します。他のリージョンも使用する場合は、それに応じてコードを更新してください。

**Topics**
+ [ステップ 1: 入力ストリームと出力ストリームを作成する](app-hotspots-prepare.md)
+ [ステップ 2: Kinesis Data Analytics アプリケーションを作成する](app-hotspot-create-app.md)
+ [ステップ 3: アプリケーション出力を設定する](app-hotspots-create-ka-app-config-destination.md)
+ [ステップ 4: アプリケーション出力を検証する](app-hotspots-verify-output.md)

# ステップ 1: 入力ストリームと出力ストリームを作成する
<a name="app-hotspots-prepare"></a>

[ホットスポット例](app-hotspots-detection.md)用の Amazon Kinesis Data Analytics アプリケーションを作成する前に、2 つの Kinesis データストリームを作成する必要があります。ストリームの 1 つはアプリケーションのストリーミングソースとして設定し、もう 1 つのストリームは Kinesis Data Analytics がアプリケーション出力を永続化する宛先として設定します。

**Topics**
+ [ステップ 1.1: Kinesis データストリームを作成する](#app-hotspots-create-two-streams)
+ [ステップ 1.2: 入力ストリームにサンプルレコードを書き込みます](#app-hotspots-write-sample-records-inputstream)

## ステップ 1.1: Kinesis データストリームを作成する
<a name="app-hotspots-create-two-streams"></a>

このセクションでは、2 つの Kinesis データストリーム (`ExampleInputStream` および `ExampleOutputStream`) を作成します。

コンソールまたは AWS CLIを使用してこれらのデータストリームを作成します。
+ コンソールを使用してデータストリームを作成するには

  1. にサインイン AWS マネジメントコンソール し、[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) で Kinesis コンソールを開きます。

  1. ナビゲーションペインで、[**データストリーム**] を選択します。

  1. [**Kinesis ストリームの作成**] を選択し、`ExampleInputStream` という名前の 1 つのシャードを持つストリームを作成します。

  1. 前のステップを繰り返し、`ExampleOutputStream` という名前の 1 つのシャードを持つストリームを作成します。
+  AWS CLIを使用してデータストリームを作成するには
  + 次の Kinesis `create-stream` AWS CLI コマンドを使用してストリーム (`ExampleInputStream` および `ExampleOutputStream`) を作成します。アプリケーションが出力の書き込みに使用する 2 つめのストリームを作成するには、ストリーム名を `ExampleOutputStream` に変更して同じコマンドを実行します。

    ```
    $ aws kinesis create-stream \
    --stream-name ExampleInputStream \
    --shard-count 1 \
    --region us-west-2 \
    --profile adminuser
                             
    $ aws kinesis create-stream \
    --stream-name ExampleOutputStream \
    --shard-count 1 \
    --region us-west-2 \
    --profile adminuser
    ```

## ステップ 1.2: 入力ストリームにサンプルレコードを書き込みます
<a name="app-hotspots-write-sample-records-inputstream"></a>

このステップでは、Python コードを実行してサンプルレコードを連続生成し、`ExampleInputStream` ストリームに書き込みます。

```
{"x": 7.921782426109737, "y": 8.746265312709893, "is_hot": "N"}
{"x": 0.722248626580026, "y": 4.648868803193405, "is_hot": "Y"}
```

1. Python および `pip` をインストールします。

   Python のインストールについては、[Python](https://www.python.org/) ウェブサイトをご覧ください。

   pip を使用して依存関係をインストールできます。pip のインストールについては、pip ウェブサイトの「[Installation](https://pip.pypa.io/en/stable/installing/)」を参照してください。

1. 以下の Python コードを実行します。このコードは以下の処理を実行します。
   + 潜在的なホットスポットを (X、Y) 平面のどこかに生成します。
   + ホットスポットごとに 1000 ポイントのセットを生成します。これらのポイントのうち、20 パーセントがホットスポットの周囲にクラスター化されています。残りはスペース全体でランダムに生成されます。
   + `put-record` コマンドは、ストリームに JSON レコードを書き込みます。
**重要**  
このファイルには AWS 認証情報が含まれているため、このファイルをウェブサーバーにアップロードしないでください。

   ```
    
   import json
   from pprint import pprint
   import random
   import time
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_hotspot(field, spot_size):
       hotspot = {
           "left": field["left"] + random.random() * (field["width"] - spot_size),
           "width": spot_size,
           "top": field["top"] + random.random() * (field["height"] - spot_size),
           "height": spot_size,
       }
       return hotspot
   
   
   def get_record(field, hotspot, hotspot_weight):
       rectangle = hotspot if random.random() < hotspot_weight else field
       point = {
           "x": rectangle["left"] + random.random() * rectangle["width"],
           "y": rectangle["top"] + random.random() * rectangle["height"],
           "is_hot": "Y" if rectangle is hotspot else "N",
       }
       return {"Data": json.dumps(point), "PartitionKey": "partition_key"}
   
   
   def generate(
       stream_name, field, hotspot_size, hotspot_weight, batch_size, kinesis_client
   ):
       """
       Generates points used as input to a hotspot detection algorithm.
       With probability hotspot_weight (20%), a point is drawn from the hotspot;
       otherwise, it is drawn from the base field. The location of the hotspot
       changes for every 1000 points generated.
       """
       points_generated = 0
       hotspot = None
       while True:
           if points_generated % 1000 == 0:
               hotspot = get_hotspot(field, hotspot_size)
           records = [
               get_record(field, hotspot, hotspot_weight) for _ in range(batch_size)
           ]
           points_generated += len(records)
           pprint(records)
           kinesis_client.put_records(StreamName=stream_name, Records=records)
   
           time.sleep(0.1)
   
   
   if __name__ == "__main__":
       generate(
           stream_name=STREAM_NAME,
           field={"left": 0, "width": 10, "top": 0, "height": 10},
           hotspot_size=1,
           hotspot_weight=0.2,
           batch_size=10,
           kinesis_client=boto3.client("kinesis"),
       )
   ```



**次のステップ**  
[ステップ 2: Kinesis Data Analytics アプリケーションを作成する](app-hotspot-create-app.md)

# ステップ 2: Kinesis Data Analytics アプリケーションを作成する
<a name="app-hotspot-create-app"></a>

[ホットスポット例](app-hotspots-detection.md)のこのセクションでは、Kinesis Data Analytics アプリケーションを次のように作成します。
+ [ステップ 1](app-hotspots-prepare.md) で作成した Kinesis データストリームをストリーミングソースとして使用するように、アプリケーション入力を設定します。
+ 提供されているアプリケーションコードを AWS マネジメントコンソールで使用します。

**アプリケーションを作成するには**

1. 「[使用開始](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/get-started-exercise.html)」実習のステップ 1、2、および 3 (「[ステップ 3.1: アプリケーションの作成](get-started-create-app.md)」を参照) に従って Kinesis Data Analytics アプリケーションを作成します。

   ソース設定で、以下を実行します。
   + 作成したストリーミングソースを、[ステップ 1: 入力ストリームと出力ストリームを作成する](app-hotspots-prepare.md) で指定します。
   + コンソールがスキーマを推測した後、スキーマを編集します。`x` 列および `y` 列のタイプが `DOUBLE` に設定され、`IS_HOT` 列のタイプが `VARCHAR` に設定されていることを確認します。

1. 次のアプリケーションコードを使用します (このコードを SQL エディタに貼り付けることができます)。

   ```
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
       "x" DOUBLE, 
       "y" DOUBLE, 
       "is_hot" VARCHAR(4),
       HOTSPOTS_RESULT VARCHAR(10000)
   ); 
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
       INSERT INTO "DESTINATION_SQL_STREAM" 
       SELECT "x", "y", "is_hot", "HOTSPOTS_RESULT" 
       FROM TABLE (
           HOTSPOTS(   
               CURSOR(SELECT STREAM "x", "y", "is_hot" FROM "SOURCE_SQL_STREAM_001"), 
               1000, 
               0.2, 
               17)
       );
   ```

   

1. SQL コードを実行して結果を確認します。  
![\[rowtime、hotspot、hotspot_heat が表示された SQL コードの結果\]](http://docs.aws.amazon.com/ja_jp/kinesisanalytics/latest/dev/images/hotspot-v2-40.png)





**次のステップ**  
[ステップ 3: アプリケーション出力を設定する](app-hotspots-create-ka-app-config-destination.md)

# ステップ 3: アプリケーション出力を設定する
<a name="app-hotspots-create-ka-app-config-destination"></a>

[ホットスポット例](app-hotspots-detection.md)のこの時点で、Amazon Kinesis Data Analytics アプリケーションコードでストリーミングソースから重要なホットスポットを検出し、それぞれにヒートスコアを割り当てることができます。

これで、アプリケーションの結果をアプリケーション内ストリームから外部宛先である別の Kinesis データストリーム (`ExampleOutputStream`) に送信できます。その後、ホットスポットのスコアを分析して、ホットスポットのヒートに適したしきい値を決定することができます。このアプリケーションを拡張してアラートを生成できます。

**アプリケーション出力を設定するには**

1. Kinesis Data Analytics コンソール ([https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics)) を開きます。

1. SQL エディタのアプリケーションダッシュボードで、[**Destination**] または [**Add a destination**] を選択します。

1. [**送信先の追加**] ページで、[**ストリームから選択**] を選択します。前のセクションで作成した `ExampleOutputStream` ストリームを選択します。

   こうしてできた外部宛先に、アプリケーションがアプリケーション内ストリーム `DESTINATION_SQL_STREAM` に書き込むレコードを Amazon Kinesis Data Analytics が永続化できます。

1. オプションで`ExampleOutputStream`、ストリームをモニタリングしてアラートを送信する AWS Lambda ように を設定できます。詳細については、「[出力としての Lambda 関数の使用](how-it-works-output-lambda.md)」を参照してください。[ステップ 4: アプリケーション出力を検証する](app-hotspots-verify-output.md) で説明するように、`ExampleOutputStream` が外部宛先である Kinesis ストリームに書き込んだレコードを確認することもできます。

**次のステップ**  
[ステップ 4: アプリケーション出力を検証する](app-hotspots-verify-output.md)

# ステップ 4: アプリケーション出力を検証する
<a name="app-hotspots-verify-output"></a>

[ ホットスポット例](app-hotspots-detection.md)のこのセクションで、ホットスポット情報を Scalable Vector Graphics (SVG) コントロールに表示するウェブアプリケーションを設定します。

1. 次の内容で、`index.html` という名前のファイルを作成します。

   ```
   <!doctype html>
   <html lang=en>
   <head>
       <meta charset=utf-8>
       <title>hotspots viewer</title>
   
       <style>
       #visualization {
         display: block;
         margin: auto;
       }
   
       .point {
         opacity: 0.2;
       }
   
       .hot {
         fill: red;
       }
   
       .cold {
         fill: blue;
       }
   
       .hotspot {
         stroke: black;
         stroke-opacity: 0.8;
         stroke-width: 1;
         fill: none;
       }
       </style>
       <script src="https://sdk.amazonaws.com/js/aws-sdk-2.202.0.min.js"></script>
       <script src="https://d3js.org/d3.v4.min.js"></script>
   </head>
   <body>
   <svg id="visualization" width="600" height="600"></svg>
   <script src="hotspots_viewer.js"></script>
   </body>
   </html>
   ```

1. 次の内容の `hotspots_viewer.js` というファイルを、同じディレクトリに作成します。認証情報、および出力ストリーム名を指定された変数に設定します。

   ```
   // Visualize example output from the Kinesis Analytics hotspot detection algorithm.
   // This script assumes that the output stream has a single shard.
   
   // Modify this section to reflect your AWS configuration
   var awsRegion = "",        // The  where your Kinesis Analytics application is configured.
       accessKeyId = "",      // Your Access Key ID
       secretAccessKey = "",  // Your Secret Access Key
       outputStream = "";     // The name of the Kinesis Stream where the output from the HOTSPOTS function is being written
   
   // The variables in this section should reflect way input data was generated and the parameters that the HOTSPOTS
   // function was called with.
   var windowSize = 1000, // The window size used for hotspot detection
       minimumDensity = 40,  // A filter applied to returned hotspots before visualization
       xRange = [0, 10],  // The range of values to display on the x-axis
       yRange = [0, 10];  // The range of values to display on the y-axis
   
   ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
   // D3 setup
   ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
   
   var svg = d3.select("svg"),
       margin = {"top": 20, "right": 20, "bottom": 20, "left": 20},
       graphWidth = +svg.attr("width") - margin.left - margin.right,
       graphHeight = +svg.attr("height") - margin.top - margin.bottom;
   
   // Return the linear function that maps the segment [a, b] to the segment [c, d].
   function linearScale(a, b, c, d) {
       var m = (d - c) / (b - a);
       return function(x) {
           return c + m * (x - a);
       };
   }
   
   // helper functions to extract the x-value from a stream record and scale it for output
   var xValue = function(r) { return r.x; },
       xScale = linearScale(xRange[0], xRange[1], 0, graphWidth),
       xMap = function(r) { return xScale(xValue(r)); };
   
   // helper functions to extract the y-value from a stream record and scale it for output
   var yValue = function(r) { return r.y; },
       yScale = linearScale(yRange[0], yRange[1], 0, graphHeight),
       yMap = function(r) { return yScale(yValue(r)); };
   
   // a helper function that assigns a CSS class to a point based on whether it was generated as part of a hotspot
   var classMap = function(r) { return r.is_hot == "Y" ? "point hot" : "point cold"; };
   
   var g = svg.append("g")
       .attr("transform", "translate(" + margin.left + "," + margin.top + ")");
   
   function update(records, hotspots) {
   
       var points = g.selectAll("circle")
           .data(records, function(r) { return r.dataIndex; });
   
       points.enter().append("circle")
           .attr("class", classMap)
           .attr("r", 3)
           .attr("cx", xMap)
           .attr("cy", yMap);
   
       points.exit().remove();
   
       if (hotspots) {
           var boxes = g.selectAll("rect").data(hotspots);
   
           boxes.enter().append("rect")
               .merge(boxes)
               .attr("class", "hotspot")
               .attr("x", function(h) { return xScale(h.minValues[0]); })
               .attr("y", function(h) { return yScale(h.minValues[1]); })
               .attr("width", function(h) { return xScale(h.maxValues[0]) - xScale(h.minValues[0]); })
               .attr("height", function(h) { return yScale(h.maxValues[1]) - yScale(h.minValues[1]); });
   
           boxes.exit().remove();
       }
   }
   
   ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
   // Use the AWS SDK to pull output records from Kinesis and update the visualization
   ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
   
   var kinesis = new AWS.Kinesis({
       "region": awsRegion,
       "accessKeyId": accessKeyId,
       "secretAccessKey": secretAccessKey
   });
   
   var textDecoder = new TextDecoder("utf-8");
   
   // Decode an output record into an object and assign it an index value
   function decodeRecord(record, recordIndex) {
       var record = JSON.parse(textDecoder.decode(record.Data));
       var hotspots_result = JSON.parse(record.HOTSPOTS_RESULT);
       record.hotspots = hotspots_result.hotspots
           .filter(function(hotspot) { return hotspot.density >= minimumDensity});
       record.index = recordIndex
       return record;
   }
   
   // Fetch a new records from the shard iterator, append them to records, and update the visualization
   function getRecordsAndUpdateVisualization(shardIterator, records, lastRecordIndex) {
       kinesis.getRecords({
           "ShardIterator": shardIterator
       }, function(err, data) {
           if (err) {
               console.log(err, err.stack);
               return;
           }
   
           var newRecords = data.Records.map(function(raw) { return decodeRecord(raw, ++lastRecordIndex); });
           newRecords.forEach(function(record) { records.push(record); });
   
           var hotspots = null;
           if (newRecords.length > 0) {
               hotspots = newRecords[newRecords.length - 1].hotspots;
           }
   
           while (records.length > windowSize) {
               records.shift();
           }
   
           update(records, hotspots);
   
           getRecordsAndUpdateVisualization(data.NextShardIterator, records, lastRecordIndex);
       });
   }
   
   // Get a shard iterator for the output stream and begin updating the visualization. Note that this script will only
   // read records from the first shard in the stream.
   function init() {
       kinesis.describeStream({
           "StreamName": outputStream
       }, function(err, data) {
           if (err) {
               console.log(err, err.stack);
               return;
           }
   
           var shardId = data.StreamDescription.Shards[0].ShardId;
   
           kinesis.getShardIterator({
               "StreamName": outputStream,
               "ShardId": shardId,
               "ShardIteratorType": "LATEST"
           }, function(err, data) {
               if (err) {
                   console.log(err, err.stack);
                   return;
               }
               getRecordsAndUpdateVisualization(data.ShardIterator, [], 0);
           })
       });
   }
   
   // Start the visualization
   init();
   ```

1. 最初のセクションの Python コードを実行し、ウェブブラウザで `index.html` を開きます。ホットスポット情報がページに次のように表示されます。

     
![\[ホットスポット情報を表示するスケーラブルなベクトルグラフィックスの図。\]](http://docs.aws.amazon.com/ja_jp/kinesisanalytics/latest/dev/images/hotspots_visualizer.png)