

经过仔细考虑，我们决定停用适用于 SQL 应用程序的 Amazon Kinesis Data Analytics：

1. 从 **2025年9月1日起，**我们将不再为适用于SQL应用程序的Amazon Kinesis Data Analytics Data Analytics提供任何错误修复，因为鉴于即将停产，我们对其的支持将有限。

2. 从 **2025 年 10 月 15 日**起，您将无法为 SQL 应用程序创建新的 Kinesis Data Analytics。

3. 从 **2026 年 1 月 27 日**起，我们将删除您的应用程序。您将无法启动或操作 Amazon Kinesis Data Analytics for SQL 应用程序。从那时起，将不再提供对 Amazon Kinesis Data Analytics for SQL 的支持。有关更多信息，请参阅 [Amazon Kinesis Data Analytics for SQL 应用程序停用](discontinuation.md)。

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 步骤 1：创建输入和输出流
<a name="app-hotspots-prepare"></a>

在为[热点示例](app-hotspots-detection.md)创建 Amazon Kinesis Data Analytics 应用程序之前，您必须创建两个 Kinesis 数据流。将一个流配置为应用程序的流式传输源，并将另一个流配置为目标（Kinesis Data Analytics 在其中永久保存应用程序输出）。

**Topics**
+ [步骤 1.1：创建 Kinesis 数据流](#app-hotspots-create-two-streams)
+ [步骤 1.2：将示例记录写入输入流](#app-hotspots-write-sample-records-inputstream)

## 步骤 1.1：创建 Kinesis 数据流
<a name="app-hotspots-create-two-streams"></a>

在此部分中，您创建两个 Kinesis 数据流：`ExampleInputStream` 和 `ExampleOutputStream`。

使用控制台或 AWS CLI创建这些数据流。
+ 使用控制台创建数据流：

  1. [登录 AWS 管理控制台 并在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

  1. 在导航窗格中，选择 **数据流**。

  1. 选择**创建 Kinesis 流**，然后创建带有一个名为 `ExampleInputStream` 的分片的流。

  1. 重复上一步骤以创建带有一个名为 `ExampleOutputStream` 的分片的流。
+ 要使用 AWS CLI创建数据流，请执行以下操作：
  + 使用以下 Kinesis `create-stream` AWS CLI 命令创建直播（`ExampleInputStream`和`ExampleOutputStream`）。要创建另一个流 (应用程序将用于写入输出)，请运行同一命令以将流名称更改为 `ExampleOutputStream`。

    ```
    $ aws kinesis create-stream \
    --stream-name ExampleInputStream \
    --shard-count 1 \
    --region us-west-2 \
    --profile adminuser
                             
    $ aws kinesis create-stream \
    --stream-name ExampleOutputStream \
    --shard-count 1 \
    --region us-west-2 \
    --profile adminuser
    ```

## 步骤 1.2：将示例记录写入输入流
<a name="app-hotspots-write-sample-records-inputstream"></a>

在此步骤中，您运行 Python 代码以持续生成示例记录并将其写入 `ExampleInputStream` 流。

```
{"x": 7.921782426109737, "y": 8.746265312709893, "is_hot": "N"}
{"x": 0.722248626580026, "y": 4.648868803193405, "is_hot": "Y"}
```

1. 安装 Python 和 `pip`。

   有关安装 Python 的信息，请访问 [Python](https://www.python.org/) 网站。

   您可以使用 pip 安装依赖项。有关安装 pip 的信息，请参阅 pip 网站上的[安装](https://pip.pypa.io/en/stable/installing/)。

1. 运行以下 Python 代码。此代码将执行以下操作：
   + 在 (X, Y) 平面上的某个位置生成潜在热点。
   + 为每个热点生成一系列点 (1000 个)。这些点中有 20% 集中在热点周围。其余的点在整个空间内随机生成。
   + `put-record` 命令将 JSON 记录写入到流。
**重要**  
请勿将此文件上传到 Web 服务器，因为它包含您的 AWS 凭证。

   ```
    
   import json
   from pprint import pprint
   import random
   import time
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_hotspot(field, spot_size):
       hotspot = {
           "left": field["left"] + random.random() * (field["width"] - spot_size),
           "width": spot_size,
           "top": field["top"] + random.random() * (field["height"] - spot_size),
           "height": spot_size,
       }
       return hotspot
   
   
   def get_record(field, hotspot, hotspot_weight):
       rectangle = hotspot if random.random() < hotspot_weight else field
       point = {
           "x": rectangle["left"] + random.random() * rectangle["width"],
           "y": rectangle["top"] + random.random() * rectangle["height"],
           "is_hot": "Y" if rectangle is hotspot else "N",
       }
       return {"Data": json.dumps(point), "PartitionKey": "partition_key"}
   
   
   def generate(
       stream_name, field, hotspot_size, hotspot_weight, batch_size, kinesis_client
   ):
       """
       Generates points used as input to a hotspot detection algorithm.
       With probability hotspot_weight (20%), a point is drawn from the hotspot;
       otherwise, it is drawn from the base field. The location of the hotspot
       changes for every 1000 points generated.
       """
       points_generated = 0
       hotspot = None
       while True:
           if points_generated % 1000 == 0:
               hotspot = get_hotspot(field, hotspot_size)
           records = [
               get_record(field, hotspot, hotspot_weight) for _ in range(batch_size)
           ]
           points_generated += len(records)
           pprint(records)
           kinesis_client.put_records(StreamName=stream_name, Records=records)
   
           time.sleep(0.1)
   
   
   if __name__ == "__main__":
       generate(
           stream_name=STREAM_NAME,
           field={"left": 0, "width": 10, "top": 0, "height": 10},
           hotspot_size=1,
           hotspot_weight=0.2,
           batch_size=10,
           kinesis_client=boto3.client("kinesis"),
       )
   ```



**下一个步骤**  
[步骤 2：创建 Kinesis Data Analytics 应用程序](app-hotspot-create-app.md)