

在仔細考慮之後，我們決定停止 Amazon Kinesis Data Analytics for SQL 應用程式：

1. 從 **2025 年 9 月 1 日起，**我們不會為 Amazon Kinesis Data Analytics for SQL 應用程式提供任何錯誤修正，因為考慮到即將終止，我們將對其提供有限的支援。

2. 從 **2025 年 10 月 15 日起，**您將無法建立新的 Kinesis Data Analytics for SQL 應用程式。

3. 我們將自 **2026 年 1 月 27** 日起刪除您的應用程式。您將無法啟動或操作 Amazon Kinesis Data Analytics for SQL 應用程式。從那時起，Amazon Kinesis Data Analytics for SQL 將不再提供支援。如需詳細資訊，請參閱[Amazon Kinesis Data Analytics for SQL 應用程式終止](discontinuation.md)。

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 範例：偵測串流上的熱點 (熱點功能)
<a name="app-hotspots-detection"></a>

Amazon Kinesis Data Analytics 提供的 `HOTSPOTS` 功能可以找出並傳回資料中相對密集區域的相關資訊。如需詳細資訊，請參閱 *Amazon Managed Service for Apache Flink SQL 參考資料*中的[熱點](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sqlrf-hotspots.html)。

在本練習中撰寫應用程式碼，以便在應用程式的串流來源上尋找熱點。若要設定應用程式，請執行下列動作：

1. **設定串流來源**：設定 Kinesis 串流並寫入範例座標資料，如下所示：

   ```
   {"x": 7.921782426109737, "y": 8.746265312709893, "is_hot": "N"}
   {"x": 0.722248626528026, "y": 4.648868803193405, "is_hot": "Y"}
   ```

   此範例提供 Python 指令碼供您填入串流。`x` 和 `y` 值是隨機產生的，有些記錄會叢集在某些位置。

   如果指令碼刻意生成數值做為熱點的一部分，則會提供 `is_hot` 欄位做為指標。這可協助您評估熱點偵測功能是否正常運作。

1. **建立應用程式**：接著使用 AWS 管理主控台建立 Kinesis Data Analytics 應用程式。將串流來源對應至應用程式內串流 (`SOURCE_SQL_STREAM_001`)，以設定應用程式輸入。當應用程式啟動時，Kinesis Data Analytics 會持續讀取串流來源，並將記錄插入應用程式內串流。

   在本練習中，針對應用程式使用下列程式碼：

   ```
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
       "x" DOUBLE, 
       "y" DOUBLE, 
       "is_hot" VARCHAR(4),
       HOTSPOTS_RESULT VARCHAR(10000)
   ); 
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
       INSERT INTO "DESTINATION_SQL_STREAM" 
       SELECT "x", "y", "is_hot", "HOTSPOTS_RESULT" 
       FROM TABLE (
           HOTSPOTS(   
               CURSOR(SELECT STREAM "x", "y", "is_hot" FROM "SOURCE_SQL_STREAM_001"), 
               1000, 
               0.2, 
               17)
       );
   ```

   此程式碼會讀取 `SOURCE_SQL_STREAM_001` 中的資料列、分析找出顯著熱點，然後將產生的資料寫入另一個應用程式內串流 (`DESTINATION_SQL_STREAM`)。您可以使用幫浦將資料列插入應用程式內串流。如需詳細資訊，請參閱[應用程式內串流與幫浦](streams-pumps.md)。

1. **設定輸出**：設定應用程式輸出，將資料從應用程式傳送至外部目的地，即另一個 Kinesis 資料串流。查看熱點分數，並判斷哪些分數表示出現了熱點（且需要收到提醒）。您可以使用 AWS Lambda 函數進一步處理熱點資訊並設定提醒。

1. **驗證輸出**：此範例包含一個 JavaScript 應用程式，可從輸出串流讀取資料並以圖形方式顯示，以便您即時檢視應用程式產生的熱點。



本練習會使用美國西部 (奧勒岡) (`us-west-2`) 來建立這些串流及應用程式。如果您使用任何其他區域，請更新相應程式碼。

**Topics**
+ [步驟 1：建立輸入和輸出串流](app-hotspots-prepare.md)
+ [步驟 2：建立 Amazon Kinesis Data Analytics 應用程式](app-hotspot-create-app.md)
+ [步驟 3：設定應用程式輸出](app-hotspots-create-ka-app-config-destination.md)
+ [步驟 4：驗證應用程式輸出](app-hotspots-verify-output.md)

# 步驟 1：建立輸入和輸出串流
<a name="app-hotspots-prepare"></a>

為[熱點範例](app-hotspots-detection.md)建立 Amazon Kinesis Data Analytics 應用程式之前，您必須建立兩個 Kinesis 資料串流。將其中一個串流設定為應用程式的串流來源，另一個串流設定為 Kinesis Data Analytics 保留應用程式輸出的目的地。

**Topics**
+ [步驟 1.1：建立 Kinesis 資料串流](#app-hotspots-create-two-streams)
+ [步驟 1.2：將範例記錄寫入輸入串流](#app-hotspots-write-sample-records-inputstream)

## 步驟 1.1：建立 Kinesis 資料串流
<a name="app-hotspots-create-two-streams"></a>

在本節中，建立兩個 Kinesis 資料串流：`ExampleInputStream` 和 `ExampleOutputStream`。

使用主控台或 AWS CLI建立資料串流。
+ 如要使用主控台建立資料串流：

  1. 登入 AWS 管理主控台 並開啟位於 https：//[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) 的 Kinesis 主控台。

  1. 在導覽窗格中選擇**資料串流**。

  1. 選擇**建立 Kinesis 串流**，然後建立內含一個名為 `ExampleInputStream` 的碎片之串流。

  1. 重複上一個步驟，以名為 `ExampleOutputStream` 的碎片建立串流。
+ 使用 AWS CLI建立資料串流：
  + 使用下列 Kinesis `create-stream` AWS CLI 命令建立串流 (`ExampleInputStream` 和 `ExampleOutputStream`)。若要建立應用程式用來寫入輸出的第二個串流，請執行相同的命令，並將串流名稱變更為 `ExampleOutputStream`。

    ```
    $ aws kinesis create-stream \
    --stream-name ExampleInputStream \
    --shard-count 1 \
    --region us-west-2 \
    --profile adminuser
                             
    $ aws kinesis create-stream \
    --stream-name ExampleOutputStream \
    --shard-count 1 \
    --region us-west-2 \
    --profile adminuser
    ```

## 步驟 1.2：將範例記錄寫入輸入串流
<a name="app-hotspots-write-sample-records-inputstream"></a>

在此步驟中，執行 Python 程式碼以持續產生範例記錄，並將這些記錄寫入 `ExampleInputStream` 串流。

```
{"x": 7.921782426109737, "y": 8.746265312709893, "is_hot": "N"}
{"x": 0.722248626580026, "y": 4.648868803193405, "is_hot": "Y"}
```

1. 安裝 Python 與 `pip`。

   如需安裝 Python 的相關資訊，請參閱 [Python](https://www.python.org/) 網站。

   您可以使用 Pip 安裝相依性。如需安裝 Pip 的詳細資訊，請參閱 Pip 網站的[安裝](https://pip.pypa.io/en/stable/installing/)。

1. 執行以下 Python 程式碼。該程式碼會執行下列作業：
   + 在 (X, Y) 平面中的某個位置生成潛在熱點。
   + 為每個熱點產生一組 1,000 個點。在這些點中，20% 會叢集在熱點周圍。其餘部分會在整個空間內隨機產生。
   + `put-record` 命令會將 JSON 記錄寫入串流。
**重要**  
請勿將此檔案上傳到 Web 伺服器，因為它包含您的 AWS 憑證。

   ```
    
   import json
   from pprint import pprint
   import random
   import time
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_hotspot(field, spot_size):
       hotspot = {
           "left": field["left"] + random.random() * (field["width"] - spot_size),
           "width": spot_size,
           "top": field["top"] + random.random() * (field["height"] - spot_size),
           "height": spot_size,
       }
       return hotspot
   
   
   def get_record(field, hotspot, hotspot_weight):
       rectangle = hotspot if random.random() < hotspot_weight else field
       point = {
           "x": rectangle["left"] + random.random() * rectangle["width"],
           "y": rectangle["top"] + random.random() * rectangle["height"],
           "is_hot": "Y" if rectangle is hotspot else "N",
       }
       return {"Data": json.dumps(point), "PartitionKey": "partition_key"}
   
   
   def generate(
       stream_name, field, hotspot_size, hotspot_weight, batch_size, kinesis_client
   ):
       """
       Generates points used as input to a hotspot detection algorithm.
       With probability hotspot_weight (20%), a point is drawn from the hotspot;
       otherwise, it is drawn from the base field. The location of the hotspot
       changes for every 1000 points generated.
       """
       points_generated = 0
       hotspot = None
       while True:
           if points_generated % 1000 == 0:
               hotspot = get_hotspot(field, hotspot_size)
           records = [
               get_record(field, hotspot, hotspot_weight) for _ in range(batch_size)
           ]
           points_generated += len(records)
           pprint(records)
           kinesis_client.put_records(StreamName=stream_name, Records=records)
   
           time.sleep(0.1)
   
   
   if __name__ == "__main__":
       generate(
           stream_name=STREAM_NAME,
           field={"left": 0, "width": 10, "top": 0, "height": 10},
           hotspot_size=1,
           hotspot_weight=0.2,
           batch_size=10,
           kinesis_client=boto3.client("kinesis"),
       )
   ```



**後續步驟**  
[步驟 2：建立 Amazon Kinesis Data Analytics 應用程式](app-hotspot-create-app.md)

# 步驟 2：建立 Amazon Kinesis Data Analytics 應用程式
<a name="app-hotspot-create-app"></a>

在[熱點範例](app-hotspots-detection.md)的本節中，建立 Amazon Kinesis Data Analytics 應用程式，如下所示：
+ 設定應用程式輸入，以使用您在[步驟 1](app-hotspots-prepare.md) 中建立的 Kinesis 資料串流作為串流來源。
+ 使用 AWS 管理主控台中提供的應用程式碼。

**建立應用程式**

1. 按照[入門](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/get-started-exercise.html)練習中的步驟 1、2 和 3 建立 Kinesis Data Analytics 應用程式 (請參閱 [步驟 3.1：建立應用程式](get-started-create-app.md))。

   在來源設定中，執行下列動作：
   + 指定您在 [步驟 1：建立輸入和輸出串流](app-hotspots-prepare.md) 中建立的串流來源。
   + 主控台推斷結構描述後，請編輯結構描述。請確定 `x` 和 `y` 欄類型已設定為 `DOUBLE`，且 `IS_HOT` 欄類型設定為 `VARCHAR`。

1. 使用以下應用程式碼 (您可將此代碼貼到 SQL 編輯器中)：

   ```
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
       "x" DOUBLE, 
       "y" DOUBLE, 
       "is_hot" VARCHAR(4),
       HOTSPOTS_RESULT VARCHAR(10000)
   ); 
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
       INSERT INTO "DESTINATION_SQL_STREAM" 
       SELECT "x", "y", "is_hot", "HOTSPOTS_RESULT" 
       FROM TABLE (
           HOTSPOTS(   
               CURSOR(SELECT STREAM "x", "y", "is_hot" FROM "SOURCE_SQL_STREAM_001"), 
               1000, 
               0.2, 
               17)
       );
   ```

   

1. 執行 SQL 程式碼並檢閱結果。  
![\[顯示列時間，熱點和 hotspot_heat 的 SQL 代碼結果。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/hotspot-v2-40.png)





**後續步驟**  
[步驟 3：設定應用程式輸出](app-hotspots-create-ka-app-config-destination.md)

# 步驟 3：設定應用程式輸出
<a name="app-hotspots-create-ka-app-config-destination"></a>

在[熱點範例](app-hotspots-detection.md)中，可用 Amazon Kinesis Data Analytics 應用程式碼探索串流來源的重要熱點，並為每個熱點指派熱度分數。

您現在可以將應用程式結果從應用程式內串流傳送到外部目的地，即另一個 Kinesis 資料串流 (`ExampleOutputStream`)。然後，您可以分析熱點分數，並確定熱點熱度的適當閾值。您可以進一步擴充此應用程式以產生提醒。

**如要設定應用程式輸出**

1. 在 [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesisanalytics) 上開啟 Kinesis Data Analytics 主控台。

1. 在 SQL 編輯器中，選擇應用程式儀表板中的**目的地**或**新增目的地**。

1. 在**新增目的地**頁面，選擇**從串流中選取**。然後，選擇您在上一節建立的 `ExampleOutputStream` 串流。

   現在您有一個外部目的地，可讓 Amazon Kinesis Data Analytics 將應用程式寫入的任何紀錄保留在應用程式內 `DESTINATION_SQL_STREAM` 串流中。

1. 您可以選擇設定 `ExampleOutputStream` AWS Lambda 來監控串流並傳送提醒給您。如需詳細資訊，請參閱[使用 Lambda 函數作為輸出](how-it-works-output-lambda.md)。您也可以檢閱 Kinesis Data Analytics 寫入外部目的地 (Kinesis 串流 `ExampleOutputStream`) 的記錄，如 [步驟 4：驗證應用程式輸出](app-hotspots-verify-output.md) 中所述。

**後續步驟**  
[步驟 4：驗證應用程式輸出](app-hotspots-verify-output.md)

# 步驟 4：驗證應用程式輸出
<a name="app-hotspots-verify-output"></a>

在[熱點範例](app-hotspots-detection.md)的這節中，設定 Web 應用程式，以可擴展向量圖形 (SVG) 控制顯示熱點資訊。

1. 使用下列內容建立名為 `index.html` 的檔案：

   ```
   <!doctype html>
   <html lang=en>
   <head>
       <meta charset=utf-8>
       <title>hotspots viewer</title>
   
       <style>
       #visualization {
         display: block;
         margin: auto;
       }
   
       .point {
         opacity: 0.2;
       }
   
       .hot {
         fill: red;
       }
   
       .cold {
         fill: blue;
       }
   
       .hotspot {
         stroke: black;
         stroke-opacity: 0.8;
         stroke-width: 1;
         fill: none;
       }
       </style>
       <script src="https://sdk.amazonaws.com/js/aws-sdk-2.202.0.min.js"></script>
       <script src="https://d3js.org/d3.v4.min.js"></script>
   </head>
   <body>
   <svg id="visualization" width="600" height="600"></svg>
   <script src="hotspots_viewer.js"></script>
   </body>
   </html>
   ```

1. 在同一目錄中，建立名為 `hotspots_viewer.js` 的檔案，內含下列內容：在提供的變數中提供您的 、憑證和輸出串流名稱。

   ```
   // Visualize example output from the Kinesis Analytics hotspot detection algorithm.
   // This script assumes that the output stream has a single shard.
   
   // Modify this section to reflect your AWS configuration
   var awsRegion = "",        // The  where your Kinesis Analytics application is configured.
       accessKeyId = "",      // Your Access Key ID
       secretAccessKey = "",  // Your Secret Access Key
       outputStream = "";     // The name of the Kinesis Stream where the output from the HOTSPOTS function is being written
   
   // The variables in this section should reflect way input data was generated and the parameters that the HOTSPOTS
   // function was called with.
   var windowSize = 1000, // The window size used for hotspot detection
       minimumDensity = 40,  // A filter applied to returned hotspots before visualization
       xRange = [0, 10],  // The range of values to display on the x-axis
       yRange = [0, 10];  // The range of values to display on the y-axis
   
   ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
   // D3 setup
   ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
   
   var svg = d3.select("svg"),
       margin = {"top": 20, "right": 20, "bottom": 20, "left": 20},
       graphWidth = +svg.attr("width") - margin.left - margin.right,
       graphHeight = +svg.attr("height") - margin.top - margin.bottom;
   
   // Return the linear function that maps the segment [a, b] to the segment [c, d].
   function linearScale(a, b, c, d) {
       var m = (d - c) / (b - a);
       return function(x) {
           return c + m * (x - a);
       };
   }
   
   // helper functions to extract the x-value from a stream record and scale it for output
   var xValue = function(r) { return r.x; },
       xScale = linearScale(xRange[0], xRange[1], 0, graphWidth),
       xMap = function(r) { return xScale(xValue(r)); };
   
   // helper functions to extract the y-value from a stream record and scale it for output
   var yValue = function(r) { return r.y; },
       yScale = linearScale(yRange[0], yRange[1], 0, graphHeight),
       yMap = function(r) { return yScale(yValue(r)); };
   
   // a helper function that assigns a CSS class to a point based on whether it was generated as part of a hotspot
   var classMap = function(r) { return r.is_hot == "Y" ? "point hot" : "point cold"; };
   
   var g = svg.append("g")
       .attr("transform", "translate(" + margin.left + "," + margin.top + ")");
   
   function update(records, hotspots) {
   
       var points = g.selectAll("circle")
           .data(records, function(r) { return r.dataIndex; });
   
       points.enter().append("circle")
           .attr("class", classMap)
           .attr("r", 3)
           .attr("cx", xMap)
           .attr("cy", yMap);
   
       points.exit().remove();
   
       if (hotspots) {
           var boxes = g.selectAll("rect").data(hotspots);
   
           boxes.enter().append("rect")
               .merge(boxes)
               .attr("class", "hotspot")
               .attr("x", function(h) { return xScale(h.minValues[0]); })
               .attr("y", function(h) { return yScale(h.minValues[1]); })
               .attr("width", function(h) { return xScale(h.maxValues[0]) - xScale(h.minValues[0]); })
               .attr("height", function(h) { return yScale(h.maxValues[1]) - yScale(h.minValues[1]); });
   
           boxes.exit().remove();
       }
   }
   
   ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
   // Use the AWS SDK to pull output records from Kinesis and update the visualization
   ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
   
   var kinesis = new AWS.Kinesis({
       "region": awsRegion,
       "accessKeyId": accessKeyId,
       "secretAccessKey": secretAccessKey
   });
   
   var textDecoder = new TextDecoder("utf-8");
   
   // Decode an output record into an object and assign it an index value
   function decodeRecord(record, recordIndex) {
       var record = JSON.parse(textDecoder.decode(record.Data));
       var hotspots_result = JSON.parse(record.HOTSPOTS_RESULT);
       record.hotspots = hotspots_result.hotspots
           .filter(function(hotspot) { return hotspot.density >= minimumDensity});
       record.index = recordIndex
       return record;
   }
   
   // Fetch a new records from the shard iterator, append them to records, and update the visualization
   function getRecordsAndUpdateVisualization(shardIterator, records, lastRecordIndex) {
       kinesis.getRecords({
           "ShardIterator": shardIterator
       }, function(err, data) {
           if (err) {
               console.log(err, err.stack);
               return;
           }
   
           var newRecords = data.Records.map(function(raw) { return decodeRecord(raw, ++lastRecordIndex); });
           newRecords.forEach(function(record) { records.push(record); });
   
           var hotspots = null;
           if (newRecords.length > 0) {
               hotspots = newRecords[newRecords.length - 1].hotspots;
           }
   
           while (records.length > windowSize) {
               records.shift();
           }
   
           update(records, hotspots);
   
           getRecordsAndUpdateVisualization(data.NextShardIterator, records, lastRecordIndex);
       });
   }
   
   // Get a shard iterator for the output stream and begin updating the visualization. Note that this script will only
   // read records from the first shard in the stream.
   function init() {
       kinesis.describeStream({
           "StreamName": outputStream
       }, function(err, data) {
           if (err) {
               console.log(err, err.stack);
               return;
           }
   
           var shardId = data.StreamDescription.Shards[0].ShardId;
   
           kinesis.getShardIterator({
               "StreamName": outputStream,
               "ShardId": shardId,
               "ShardIteratorType": "LATEST"
           }, function(err, data) {
               if (err) {
                   console.log(err, err.stack);
                   return;
               }
               getRecordsAndUpdateVisualization(data.ShardIterator, [], 0);
           })
       });
   }
   
   // Start the visualization
   init();
   ```

1. 在執行第一部分的 Python 程式碼之後，在網頁瀏覽器中開啟 `index.html`。熱點資訊會顯示在頁面上，如下所示。

     
![\[顯示熱點資訊的可擴展向量圖形圖。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/hotspots_visualizer.png)