

经过仔细考虑，我们决定停用适用于 SQL 应用程序的 Amazon Kinesis Data Analytics：

1. 从 **2025年9月1日起，**我们将不再为适用于SQL应用程序的Amazon Kinesis Data Analytics Data Analytics提供任何错误修复，因为鉴于即将停产，我们对其的支持将有限。

2. 从 **2025 年 10 月 15 日**起，您将无法为 SQL 应用程序创建新的 Kinesis Data Analytics。

3. 从 **2026 年 1 月 27 日**起，我们将删除您的应用程序。您将无法启动或操作 Amazon Kinesis Data Analytics for SQL 应用程序。从那时起，将不再提供对 Amazon Kinesis Data Analytics for SQL 的支持。有关更多信息，请参阅 [Amazon Kinesis Data Analytics for SQL 应用程序停用](discontinuation.md)。

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 示例：在流中检测数据异常情况 (RANDOM\_CUT\_FOREST 函数)
<a name="app-anomaly-detection"></a>

Amazon Kinesis Data Analytics 提供了一个函数 (`RANDOM_CUT_FOREST`)，它可以根据数值列中的值将异常分数分配给每个记录。有关更多信息，请参阅 *Amazon Managed Service for Apache Flink SQL 参考*中的[`RANDOM_CUT_FOREST` 函数](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/analytics-sql-reference.html)。

在本练习中，您将编写应用程序代码以将异常分数分配给应用程序的流式传输源中的记录。要设置应用程序，请执行以下操作：

1. **设置流式源** - 您设置 Kinesis 数据流并编写示例 `heartRate` 数据，如下所示：

   ```
   {"heartRate": 60, "rateType":"NORMAL"}
   ...
   {"heartRate": 180, "rateType":"HIGH"}
   ```

   此过程提供用于填充流的 Python 脚本。`heartRate` 值将随机生成，99% 的记录具有的 `heartRate` 值介于 60 和 100 之间，仅 1% 的记录具有的 `heartRate` 值介于 150 和 200 之间。因此，`heartRate` 值介于 150 和 200 之间的记录是异常情况。

1. **配置输入** - 通过使用控制台，您创建 Kinesis Data Analytics 应用程序，并通过将流式源映射到应用程序内部流 (`SOURCE_SQL_STREAM_001`) 来配置应用程序输入。在应用程序启动时，Kinesis Data Analytics 持续读取流式传输源，并将记录插入到应用程序内部流中。

1. **指定应用程序代码** - 示例使用以下应用程序代码：

   ```
   --Creates a temporary stream.
   CREATE OR REPLACE STREAM "TEMP_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   --Creates another stream for application output.	        
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   -- Compute an anomaly score for each record in the input stream
   -- using Random Cut Forest
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
      INSERT INTO "TEMP_STREAM"
         SELECT STREAM "heartRate", "rateType", ANOMALY_SCORE 
         FROM TABLE(RANDOM_CUT_FOREST(
                 CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001")));
   
   -- Sort records by descending anomaly score, insert into output stream
   CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM * FROM "TEMP_STREAM"
         ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;
   ```

   此代码读取 `SOURCE_SQL_STREAM_001` 中的行，分配异常分数，并将结果行写入另一个应用程序内部流 (`TEMP_STREAM`)。随后，应用程序代码将对 `TEMP_STREAM` 中的记录进行排序，并将结果保存到另一个应用程序内部流 (`DESTINATION_SQL_STREAM`)。您使用数据泵将流插入到应用程序内部流。有关更多信息，请参阅 [In-Application 直播和泵](streams-pumps.md)。

1. **配置输出** - 您配置应用程序输出以将 `DESTINATION_SQL_STREAM` 中的数据保存到外部目标 (另一个 Kinesis 数据流)。查看分配给每条记录的异常分数并确定哪个分数指示应用程序外部的异常情况 (您需要收到这些异常情况的警报)。您可以使用 AWS Lambda 函数来处理这些异常分数并配置警报。

此练习使用美国东部 (弗吉尼亚州北部) (`us-east-1`) 来创建这些流和您的应用程序。如果您使用任何其他区域，则必须相应地更新代码。

**Topics**
+ [步骤 1：准备](app-anomaly-prepare.md)
+ [步骤 2：创建应用程序](app-anom-score-create-app.md)
+ [步骤 3：配置应用程序输出](app-anomaly-create-ka-app-config-destination.md)
+ [步骤 4：验证输出](app-anomaly-verify-output.md)

**下一个步骤**  
[步骤 1：准备](app-anomaly-prepare.md)