

终止支持通知：2026 年 5 月 20 日， AWS 将终止对的支持。 AWS IoT Events 2026 年 5 月 20 日之后，您将无法再访问 AWS IoT Events 控制台或 AWS IoT Events 资源。有关更多信息，请参阅[AWS IoT Events 终止支持](https://docs.aws.amazon.com/iotevents/latest/developerguide/iotevents-end-of-support.html)。

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# AWS IoT Events 支持终止
<a name="iotevents-end-of-support"></a>

经过深思熟虑，我们决定终止对该 AWS IoT Events 服务的支持，自 2026 年 5 月 20 日起生效。 AWS IoT Events 从 2025 年 5 月 20 日起，将不再接受新客户。作为拥有账户在 2025 年 5 月 20 日之前注册该服务的现有客户，您可以继续使用这些 AWS IoT Events 功能。2026 年 5 月 20 日之后，您将无法再使用 AWS IoT Events。

本页为 AWS IoT Events 客户提供了过渡到替代解决方案以满足您的业务需求的说明和注意事项。

**注意**  
这些指南中介绍的解决方案旨在作为说明性示例，而不是用于生产的功能替代品。 AWS IoT Events 根据您的业务需求自定义代码、工作流程和相关 AWS 资源。

**Topics**
+ [迁出时的注意事项 AWS IoT Events](#eos-considerations)
+ [中探测器模型的迁移程序 AWS IoT Events](eos-procedure-detector-models.md)
+ [中 AWS IoT SiteWise 警报的迁移程序 AWS IoT Events](eos-procedure-alarms.md)

## 迁出时的注意事项 AWS IoT Events
<a name="eos-considerations"></a>
+ 实施安全最佳实践，包括使用对每个组件具有最低权限的 IAM 角色以及对静态和传输中的数据进行加密。有关更多信息，请参阅《[IAM 用户指南](https://docs.aws.amazon.com/IAM/latest/UserGuide/best-practices.html)》中的 *IAM 安全最佳实践*。
+ 根据您的数据摄取要求考虑 Kinesis 流的分片数量。有关 Kinesis 分片的更多信息，请参阅[亚马逊 Kinesis Data Streams 开发者指南中的亚马逊 Kinesis Dat *a Stre* ams 术语和](https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html)概念。
+ 使用指标和日志设置全面 CloudWatch 的监控和调试。有关更多信息，请参阅[什么是 CloudWatch？](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/WhatIsCloudWatch.html) 在《*亚马逊 CloudWatch 用户指南》*中。
+ 考虑一下错误处理的结构，包括如何管理重复处理失败的消息、实施重试策略以及如何设置隔离和分析有问题的消息的流程。
+ 使用定[AWS 价计算器](https://calculator.aws)估算特定用例的成本。

# 中探测器模型的迁移程序 AWS IoT Events
<a name="eos-procedure-detector-models"></a>

本节介绍替代解决方案，这些解决方案可在您迁移到其他探测器模型时提供类似的探测器模型功能 AWS IoT Events。

您可以通过 AWS IoT Core 规则将数据摄取迁移到其他 AWS 服务的组合。可以将数据路由到 MQT [BatchPutMessage](https://docs.aws.amazon.com/iotevents/latest/apireference/API_iotevents-data_BatchPutMessage.html)T 主题，而不是通过 API 获取数据。 AWS IoT Core 

这种迁移方法利用 AWS IoT Core MQTT 主题作为物联网数据的入口点，取代了直接输入。 AWS IoT Events选择 MQTT 主题有几个关键原因。由于 MQTT 在行业中的广泛使用，它们与物联网设备具有广泛的兼容性。这些主题可以处理来自众多设备的大量消息，从而确保可扩展性。它们还允许根据内容或设备类型灵活地路由和筛选消息。此外， AWS IoT Core MQTT 主题与其他 AWS 服务无缝集成，从而简化了迁移过程。

数据从 MQTT 主题流入一个架构，该架构结合了 Amazon Kinesis Data Streams、 AWS Lambda 一个函数、一个亚马逊 DynamoDB 表和亚马逊计划。 EventBridge 这种服务组合复制并增强了以前提供的功能 AWS IoT Events，使您可以更加灵活地控制物联网数据处理管道。

## 比较架构
<a name="eos-architecture-comparison-detector-model"></a>

当前 AWS IoT Events 架构通过 AWS IoT Core 规则和 `BatchPutMessage` API 提取数据。该架构 AWS IoT Core 用于数据摄取和事件发布，消息通过 AWS IoT Events 输入路由到定义状态逻辑的探测器模型。IAM 角色管理必要的权限。

新的解决方案 AWS IoT Core 用于数据摄取（现在有专门的输入和输出 MQTT 主题）。它引入了用于数据分区的 Kinesis Data Streams 和用于状态逻辑的评估器 Lambda 函数。设备状态现在存储在 DynamoDB 表中，增强的 IAM 角色管理这些服务的权限。


| 用途 | 解决方案 | 差异 | 
| --- | --- | --- | 
|  **数据摄取** — 从物联网设备接收数据  |  AWS IoT Core  |  现在需要两个不同的 MQTT 主题：一个用于摄取设备数据，另一个用于发布输出事件  | 
|  **消息方向**-将传入的消息路由到相应的服务  |  AWS IoT Core 邮件路由规则  |  保持相同的路由功能，但现在将消息定向到 Kinesis Data Streams 而不是 AWS IoT Events  | 
|  **数据处理**-处理和组织传入的数据流  |  Kinesis Data Streams  |  取代 AWS IoT Events 输入功能，使用设备 ID 分区提供数据摄取，用于消息处理  | 
|  **逻辑评估**-处理状态变化并触发操作  |  评估者 Lambda  |  取代 AWS IoT Events 探测器模型，通过代码而不是可视化工作流程提供可自定义的状态逻辑评估  | 
|  **状态管理**-维护设备状态  |  DynamoDB 表  |  提供设备状态永久存储的新组件，取代了内部 AWS IoT Events 状态管理  | 
|  **安全**-管理服务权限  |  IAM 角色  |  更新后的权限现在包括访问 Kinesis Data Streams、DynamoDB 以及 EventBridge 现有权限 AWS IoT Core   | 

## 步骤 1：（可选）导出 AWS IoT Events 探测器型号配置
<a name="eos-detector-model-export-events-data"></a>

在创建新资源之前，请导出您的 AWS IoT Events 探测器模型定义。它们包含您的事件处理逻辑，可以作为实施新解决方案的历史参考。

------
#### [ Console ]

使用执行以下步骤以导出您的探测器模型配置： AWS IoT Events AWS 管理控制台

**要使用导出探测器模型 AWS 管理控制台**

1. 登录 [AWS IoT Events 控制台](https://console.aws.amazon.com/iotevents/)。

1. 在左侧导航窗格中，选择**探测器模型**。

1. 选择要导出的探测器型号。

1. 选择**导出**。阅读有关输出的信息消息，然后再次选择 “**导**出”。

1. 对要导出的每个探测器模型重复该过程。

包含探测器模型的 JSON 输出的文件已添加到浏览器的下载文件夹中。您可以选择保存每个探测器模型配置以保留历史数据。

------
#### [ AWS CLI ]

使用运行以下命令导出您的探测器模型配置： AWS CLI

**要使用导出探测器模型 AWS CLI**

1. 列出您账户中的所有探测器型号：

   ```
   aws iotevents list-detector-models
   ```

1. 对于每个探测器型号，通过运行以下命令导出其配置：

   ```
   aws iotevents describe-detector-model \
      --detector-model-name your-detector-model-name
   ```

1. 保存每个探测器型号的输出。

------

## 步骤 2：创建 IAM 角色
<a name="eos-detector-model-create-iam-role"></a>

创建 IAM 角色以提供复制功能的权限 AWS IoT Events。本示例中的角色授予访问 DynamoDB 以进行状态管理 EventBridge、调度、Kinesis Data Streams 用于数据 AWS IoT Core 摄取、发布消息和记录的权限。 CloudWatch 这些服务共同起到替代作用 AWS IoT Events。

1. 创建具有以下权限的 IAM 角色。有关创建 IAM 角色的更多详细说明，请参阅 I [A *M 用户指南*中的创建角色以向 AWS 服务委派权限](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-service.html)。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "DynamoDBAccess",
               "Effect": "Allow",
               "Action": [
                   "dynamodb:GetItem",
                   "dynamodb:PutItem",
                   "dynamodb:UpdateItem",
                   "dynamodb:DeleteItem",
                   "dynamodb:Query",
                   "dynamodb:Scan"
               ],
               "Resource": "arn:aws:dynamodb:us-east-1:123456789012:table/EventsStateTable"
           },
           {
               "Sid": "SchedulerAccess",
               "Effect": "Allow",
               "Action": [
                   "scheduler:CreateSchedule",
                   "scheduler:DeleteSchedule"
               ],
               "Resource": "arn:aws:scheduler:us-east-1:123456789012:schedule/*"
           },
           {
               "Sid": "KinesisAccess",
               "Effect": "Allow",
               "Action": [
                   "kinesis:GetRecords",
                   "kinesis:GetShardIterator",
                   "kinesis:DescribeStream",
                   "kinesis:ListStreams"
               ],
               "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/*"
           },
           {
               "Sid": "IoTPublishAccess",
               "Effect": "Allow",
               "Action": "iot:Publish",
               "Resource": "arn:aws:iot:us-east-1:123456789012:topic/*"
           },
           {
               "Effect": "Allow",
               "Action": "logs:CreateLogGroup",
               "Resource": "arn:aws:logs:us-east-1:123456789012:*"
           },
           {
               "Effect": "Allow",
               "Action": [
                   "logs:CreateLogStream",
                   "logs:PutLogEvents"
               ],
               "Resource": [
               "arn:aws:logs:us-east-1:123456789012:log-group:/aws/lambda/your-lambda:*"
               ]
           }
       ]
   }
   ```

------

1. 添加以下 IAM 角色信任策略。信任策略允许指定的 AWS 服务担任 IAM 角色，以便它们可以执行必要的操作。有关创建 IAM 信任策略的更多详细说明，请参阅 I *AM 用户指南*中的[使用自定义信任策略创建角色](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-custom.html)。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Effect": "Allow",
               "Principal": {
                   "Service": [
                       "scheduler.amazonaws.com",
                       "lambda.amazonaws.com",
                       "iot.amazonaws.com"
                   ]
               },
               "Action": "sts:AssumeRole"
           }
       ]
   }
   ```

------

## 第 3 步：创建 Amazon Kinesis Data Streams
<a name="eos-detector-model-create-kinesis-stream"></a>

使用或创建 Amazon Kinesis Data Streams AWS 管理控制台 。 AWS CLI

------
#### [ Console ]

要使用创建 Kinesis 数据流 AWS 管理控制台，请按照《*Amazon Kinesis Data Streams 开发者指南》中创建数据*[流](https://docs.aws.amazon.com/streams/latest/dev/tutorial-stock-data-kplkcl-create-stream.html)页面上的步骤进行操作。

根据您的设备数量和消息负载大小调整分片计数。

------
#### [ AWS CLI ]

使用 AWS CLI创建 Amazon Kinesis Data Streams，从您的设备中提取和分区数据。

在本次迁移中使用 Kinesis Data Streams 来取代的数据提取功能。 AWS IoT Events它提供了一种可扩展且高效的方法来收集、处理和分析来自物联网设备的实时流数据，同时还提供灵活的数据处理以及与其他 AWS 服务的集成。

```
aws kinesis create-stream --stream-name your-kinesis-stream-name --shard-count 4 --region your-region
```

根据您的设备数量和消息负载大小调整分片计数。

------

## 步骤 4：创建或更新 MQTT 消息路由规则
<a name="eos-detector-model-mqtt-rule"></a>

您可以创建新的 MQTT 消息路由规则或更新现有规则。

------
#### [ Console ]

1. 确定是否需要新的 MQTT 消息路由规则，或者是否可以更新现有规则。

1. 打开 [AWS IoT Core 控制台](https://console.aws.amazon.com/iot/)。

1. 在导航窗格中，选择 “**消息路由**”，然后选择 “**规则**”。

1. 在**管理**部分，选择**邮件路由**，然后选择**规则**。

1. 选择 **Create rule**（创建规则）。

1. 在**指定规则属性**页面上，输入 AWS IoT Core 规则名称的**规则名称**。在 “**规则描述-*可选*” 中，输入描述以标识您正在处理事件并将其转发到 Kinesis Data Stre** ams。

1. 在 “**配置 SQL 语句**” 页上，为 **SQL 语句**输入以下内容：**SELECT \$1 FROM 'your-database'**，然后选择**下一步**。

1. 在**附加规则操作**页面和**规则操作下，选择 k** **inesis**。

1. 为直播选择你的 Kinesis 直播。对于分区键，输入 **your-instance-id**。为 IAM 角色选择相应的角色，然后选择**添加规则操作**。

有关更多信息，请参阅[创建 AWS IoT 规则以将设备数据路由到其他服务](https://docs.aws.amazon.com/iot/latest/developerguide/iot-rules-tutorial.html)。

------
#### [ AWS CLI ]

1. 使用以下内容创建 JSON 文件 。此 JSON 配置文件定义了一 AWS IoT Core 条规则，该规则使用实例 ID 作为分区键，从主题中选择所有消息并将其转发到指定的 Kinesis 流。

   ```
   {
       "sql": "SELECT * FROM 'your-config-file'",
       "description": "Rule to process events and forward to Kinesis Data Streams",
       "actions": [
           {
               "kinesis": {
                   "streamName": "your-kinesis-stream-name",
                   "roleArn": "arn:aws:iam::your-account-id:role/service-role/your-iam-role",
                   "partitionKey": "${your-instance-id}"
               }
           }
       ],
       "ruleDisabled": false,
       "awsIotSqlVersion": "2016-03-23"
   }
   ```

1. 使用创建 MQTT 主题规则。 AWS CLI此步骤使用使用`events_rule.json`文件中定义的配置创建 AWS IoT Core 主题规则。 AWS CLI 

   ```
   aws iot create-topic-rule \
       --rule-name "your-iot-core-rule" \
       --topic-rule-payload file://your-file-name.json
   ```

------

## 步骤 5：获取目标 MQTT 主题的终端节点
<a name="eos-detector-model-get-mqtt-endpoint"></a>

使用目标 MQTT 主题配置您的主题发布传出消息的位置，取代之前由 AWS IoT Events处理的功能。终端节点是您的 AWS 账户和地区所独有的。

------
#### [ Console ]

1. 打开 [AWS IoT Core 控制台](https://console.aws.amazon.com/iot/)。

1. 在左侧导航面板的 **Connect** 部分，选择**域配置**。

1. 选择 **IOT: data-ATS** 域配置以打开配置的详细信息页面。

1. 复制**域名**值。此值是终端节点。保存终端节点值，因为您将在以后的步骤中使用它。

------
#### [ AWS CLI ]

运行以下命令以获取用于发布账户传出消息的 AWS IoT Core 终端节点。

```
aws iot describe-endpoint --endpoint-type iot:Data-ATS --region your-region
```

------

## 第 6 步：创建亚马逊 DynamoDB 表
<a name="eos-detector-model-create-dynamodb-table"></a>

 Amazon DynamoDB 表取代了 AWS IoT Events的状态管理功能，为在新解决方案架构中保留和管理设备状态和探测器模型逻辑提供了一种可扩展且灵活的方式。

------
#### [ Console ]

创建 Amazon DynamoDB 表以保留探测器模型的状态。*有关更多信息，请参阅亚马逊 [DynamoDB 开发者指南中的在 DynamoDB 中创建表](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-1.html)。*

使用以下内容获取表格的详细信息：
+ 在**表名中**，输入您选择的表名。
+ 对于**分区密钥**，请输入您自己的实例 ID。
+ 您可以使用 “**表格**” 设置的 “默认**” 设置**

------
#### [ AWS CLI ]

运行以下命令创建 DynamoDB 表。

```
aws dynamodb create-table \
                        --table-name your-table-name \
                        --attribute-definitions AttributeName=your-instance-id,AttributeType=S \
                        --key-schema AttributeName=your-instance-id,KeyType=HASH \
```

------

## 步骤 7：创建 AWS Lambda 函数（控制台）
<a name="eos-detector-model-create-lambda-function"></a>

 Lambda 函数用作核心处理引擎，取代了的探测器模型评估逻辑。 AWS IoT Events在示例中，我们与其他 AWS 服务集成，以根据您定义的规则处理传入数据、管理状态和触发操作。

使用运行时创建一个 Lambda 函数。NodeJS使用以下代码片段，替换硬编码的常量：

1. 打开 [AWS Lambda console](https://console.aws.amazon.com/lambda/)。

1. 选择**创建函数**。

1. 输入**函数名称的名称**。

1. **选择 **NodeJS** 22.x 作为运行时间。**

1. 在**更改默认执行角色**下拉列表中，选择**使用现有角色**，然后选择您在之前的步骤中创建的 IAM 角色。

1. 选择**创建函数**。

1. 替换硬编码常量后，粘贴以下代码片段。

1. 函数创建后，在 “**代码**” 选项卡下，粘贴以下代码示例，将**your-destination-endpoint**端点替换为自己的端点。

```
import { DynamoDBClient, GetItemCommand } from '@aws-sdk/client-dynamodb';
import { PutItemCommand } from '@aws-sdk/client-dynamodb';
import { IoTDataPlaneClient, PublishCommand } from "@aws-sdk/client-iot-data-plane";
import { SchedulerClient, CreateScheduleCommand, DeleteScheduleCommand } from "@aws-sdk/client-scheduler"; // ES Modules import


//// External Clients and Constants
const scheduler = new SchedulerClient({});
const iot = new IoTDataPlaneClient({
    endpoint: 'https://your-destination-endpoint-ats.iot.your-region.amazonaws.com/'
});
const ddb = new DynamoDBClient({});


//// Lambda Handler function
export const handler = async (event) => {
    console.log('Incoming event:', JSON.stringify(event, null, 2));

    if (!event.Records) {
        throw new Error('No records found in event');
    }

    const processedRecords = [];

    for (const record of event.Records) {
        try {
            if (record.eventSource !== 'aws:kinesis') {
                console.log(`Skipping non-Kinesis record from ${record.eventSource}`);
                continue;
            }

            // Assumes that we are processing records from Kinesis
            const payload = record.kinesis.data;
            const decodedData = Buffer.from(payload, 'base64').toString();
            console.log("decoded payload is ", decodedData);

            const output = await handleDecodedData(decodedData);

            // Add additional processing logic here
            const processedData = {
                output,
                sequenceNumber: record.kinesis.sequenceNumber,
                partitionKey: record.kinesis.partitionKey,
                timestamp: record.kinesis.approximateArrivalTimestamp
            };

            processedRecords.push(processedData);

        } catch (error) {
            console.error('Error processing record:', error);
            console.error('Failed record:', record);
            // Decide whether to throw error or continue processing other records
            // throw error; // Uncomment to stop processing on first error
        }
    }

    return {
        statusCode: 200,
        body: JSON.stringify({
            message: 'Processing complete',
            processedCount: processedRecords.length,
            records: processedRecords
        })
    };
};

// Helper function to handle decoded data
async function handleDecodedData(payload) {
    try {
        // Parse the decoded data
        const parsedData = JSON.parse(payload);

        // Extract instanceId
        const instanceId = parsedData.instanceId;
        // Parse the input field
        const inputData = JSON.parse(parsedData.payload);
        const temperature = inputData.temperature;
        console.log('For InstanceId: ', instanceId, ' the temperature is:', temperature);

        await iotEvents.process(instanceId, inputData)

        return {
            instanceId,
            temperature,
            // Add any other fields you want to return
            rawInput: inputData
        };
    } catch (error) {
        console.error('Error handling decoded data:', error);
        throw error;
    }
}


//// Classes for declaring/defining the state machine
class CurrentState {
    constructor(instanceId, stateName, variables, inputs) {
        this.stateName = stateName;
        this.variables = variables;
        this.inputs = inputs;
        this.instanceId = instanceId
    }

    static async load(instanceId) {
        console.log(`Loading state for id ${instanceId}`);
        try {
            const { Item: { state: { S: stateContent } } } = await ddb.send(new GetItemCommand({
                TableName: 'EventsStateTable',
                Key: {
                    'InstanceId': { S: `${instanceId}` }
                }
            }));

            const { stateName, variables, inputs } = JSON.parse(stateContent);

            return new CurrentState(instanceId, stateName, variables, inputs);
        } catch (e) {
            console.log(`No state for id ${instanceId}: ${e}`);
            return undefined;
        }
    }

    static async save(instanceId, state) {
        console.log(`Saving state for id ${instanceId}`);
        await ddb.send(new PutItemCommand({
            TableName: 'your-events-state-table-name',
            Item: {
                'InstanceId': { S: `${instanceId}` },
                'state': { S: state }
            }
        }));
    }

    setVariable(name, value) {
        this.variables[name] = value;
    }

    changeState(stateName) {
        console.log(`Changing state from ${this.stateName} to ${stateName}`);
        this.stateName = stateName;
    }

    async setTimer(instanceId, frequencyInMinutes, payload) {
        console.log(`Setting timer ${instanceId} for frequency of ${frequencyInMinutes} minutes`);

        const base64Payload = Buffer.from(JSON.stringify(payload)).toString();
        console.log(base64Payload);

        const scheduleName = `your-schedule-name-${instanceId}-schedule`;
        const scheduleParams = {
            Name: scheduleName,
            FlexibleTimeWindow: {
                Mode: 'OFF'
            },
            ScheduleExpression: `rate(${frequencyInMinutes} minutes)`,
            Target: {
                Arn: "arn:aws::kinesis:your-region:your-account-id:stream/your-kinesis-stream-name",
                RoleArn: "arn:aws::iam::your-account-id:role/service-role/your-iam-role",
                Input: base64Payload,
                KinesisParameters: {
                    PartitionKey: instanceId,
                },
                RetryPolicy: {
                    MaximumRetryAttempts: 3
                }
            },

        };

        const command = new CreateScheduleCommand(scheduleParams);
        console.log(`Sending command to set timer ${JSON.stringify(command)}`);
        await scheduler.send(command);
    }

    async clearTimer(instanceId) {
        console.log(`Cleaning timer ${instanceId}`);

        const scheduleName = `your-schedule-name-${instanceId}-schedule`;
        const command = new DeleteScheduleCommand({
            Name: scheduleName
        });
        await scheduler.send(command);
    }

    async executeAction(actionType, actionPayload) {
        console.log(`Will execute the ${actionType} with payload ${actionPayload}`);
        await iot.send(new PublishCommand({
            topic: `${this.instanceId}`,
            payload: actionPayload,
            qos: 0
        }));
    }

    setInput(value) {
        this.inputs = { ...this.inputs, ...value };
    }

    input(name) {
        return this.inputs[name];
    }
}


class IoTEvents {

    constructor(initialState) {
        this.initialState = initialState;
        this.states = {};
    }

    state(name) {
        const state = new IoTEventsState();
        this.states[name] = state;
        return state;
    }

    async process(instanceId, input) {
        let currentState = await CurrentState.load(instanceId) || new CurrentState(instanceId, this.initialState, {}, {});
        currentState.setInput(input);

        console.log(`With inputs as: ${JSON.stringify(currentState)}`);
        const state = this.states[currentState.stateName];

        currentState = await state.evaluate(currentState);
        console.log(`With output as: ${JSON.stringify(currentState)}`);

        await CurrentState.save(instanceId, JSON.stringify(currentState));
    }
}

class Event {
    constructor(condition, action) {
        this.condition = condition;
        this.action = action;
    }
}

class IoTEventsState {
    constructor() {
        this.eventsList = []
    }

    events(eventListArg) {
        this.eventsList.push(...eventListArg);
        return this;
    }

    async evaluate(currentState) {
        for (const e of this.eventsList) {
            console.log(`Evaluating event ${e.condition}`);
            if (e.condition(currentState)) {
                console.log(`Event condition met`);
                // Execute any action as defined in iotEvents DM Definition
                await e.action(currentState);
            }
        }

        return currentState;
    }
}

////// DetectorModel Definitions - replace with your own defintions
let processAlarmStateEvent = new Event(
    (currentState) => {
        const source = currentState.input('source');
        return (
            currentState.input('temperature') < 70
        );
    },
    async (currentState) => {
        currentState.changeState('normal');
        await currentState.clearTimer(currentState.instanceId)
        await currentState.executeAction('MQTT', `{"state": "alarm cleared, timer deleted" }`);
    }
);

let processTimerEvent = new Event(
    (currentState) => {
        const source = currentState.input('source');
        console.log(`Evaluating timer event with source ${source}`);
        const booleanOutput = (source !== undefined && source !== null &&
            typeof source === 'string' &&
            source.toLowerCase() === 'timer' &&
            // check if the currentState == state from the timer payload
            currentState.input('currentState') !== undefined &&
            currentState.input('currentState') !== null &&
            currentState.input('currentState').toLowerCase !== 'normal');
        console.log(`Timer event evaluated as ${booleanOutput}`);
        return booleanOutput;
    },
    async (currentState) => {
        await currentState.executeAction('MQTT', `{"state": "timer timed out in Alarming state" }`);
    }
);

let processNormalEvent = new Event(
    (currentState) => currentState.input('temperature') > 70,
    async (currentState) => {
        currentState.changeState('alarm');
        await currentState.executeAction('MQTT', `{"state": "alarm detected, timer started" }`);
        await currentState.setTimer(currentState.instanceId, 5, {
            "instanceId": currentState.instanceId,
            "payload":"{\"currentState\": \"alarm\", \"source\": \"timer\"}"
        });
    }
);
const iotEvents = new IoTEvents('normal');
iotEvents
    .state('normal')
    .events(
        [
            processNormalEvent
        ]);
iotEvents
    .state('alarm')
    .events([
            processAlarmStateEvent,
            processTimerEvent
        ]
    );
```

## 第 8 步：添加 Amazon Kinesis Data Streams 触发器
<a name="eos-detector-model-add-kinesis-trigger"></a>

使用或将 Kinesis Data Streams 触发器添加到 Lambda 函数中。 AWS 管理控制台 AWS CLI

在您的 Lambda 函数中添加 Kinesis Data Streams 触发器可在您的数据摄取管道和处理逻辑之间建立连接，使其能够自动评估传入的物联网数据流并实时对事件做出反应，类似于处理输入的方式。 AWS IoT Events 

------
#### [ Console ]

有关更多信息，请参阅*AWS Lambda 开发人员*指南中的[创建事件源映射以调用 Lambda 函数](https://docs.aws.amazon.com/lambda/latest/dg/services-kinesis-create.html#services-kinesis-eventsourcemapping)。

使用以下内容了解事件源映射的详细信息：
+ 在**函数名称**中，输入中使用的 lambda 名称。[步骤 7：创建 AWS Lambda 函数（控制台）](#eos-detector-model-create-lambda-function)
+ 对于**消费者-可选**，请输入您的 Kinesis 直播的 ARN。
+ 对于**批处理大小**，输入 **10**。

------
#### [ AWS CLI ]

运行以下命令创建 Lambda 函数触发器。

```
aws lambda create-event-source-mapping \
    --function-name your-lambda-name \
    --event-source arn:aws:kinesis:your-region:your-account-id:stream/your-kinesis-stream-name \
    --batch-size 10 \
    --starting-position LATEST \
    --region your-region
```

------

## 步骤 9：测试数据摄取和输出功能 ()AWS CLI
<a name="eos-detector-model-data-ingestion-and-output"></a>

根据您在探测器模型中定义的内容向 MQTT 主题发布有效负载。以下是 MQTT 主题的示例负载`your-topic-name`，用于测试实现。

```
{
  "instanceId": "your-instance-id",
  "payload": "{\"temperature\":78}"
}
```

您应该会看到一条发布到主题的 MQTT 消息，其中包含以下（或类似）内容：

```
{
    "state": "alarm detected, timer started"
}
```

# 中 AWS IoT SiteWise 警报的迁移程序 AWS IoT Events
<a name="eos-procedure-alarms"></a>

本节介绍替代解决方案，这些解决方案在您迁移后可提供类似的警报功能 AWS IoT Events。

对于使用 AWS IoT Events 警报的 AWS IoT SiteWise 属性，您可以使用警 CloudWatch报迁移到解决方案。这种方法提供了强大的监控功能，以及诸如异常检测 SLAs 和分组警报之类的既定功能和其他功能。

## 比较架构
<a name="eos-architecture-comparison-alarms"></a>

当前 AWS IoT SiteWise 属性的 AWS IoT Events 警报配置需要`AssetModelCompositeModels`在资产模型中创建，如*AWS IoT SiteWise 用户指南*中的[定义外部警报](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/define-external-alarms.html)中所述。 AWS IoT SiteWise对新解决方案的修改通常通过 AWS IoT Events 控制台进行管理。

新的解决方案利用警报提供 CloudWatch 警报管理。此方法使用 AWS IoT SiteWise 通知将属性数据点发布到 AWS IoT Core MQTT 主题，然后由 Lambda 函数进行处理。该功能将这些通知转换为 CloudWatch 指标，从而通过 CloudWatch的警报框架实现警报监控。


| 用途 | 解决方案 | 差异 | 
| --- | --- | --- | 
|  **数据源**-属性数据来自 AWS IoT SiteWise  |  AWS IoT SiteWise MQTT 通知  |  将直接的 IoT Events 集成替换为来自 AWS IoT SiteWise 属性的 MQTT 通知  | 
|  **数据处理**-转换属性数据  |  Lambda 函数  |  处理 AWS IoT SiteWise 属性通知并将其转换为 CloudWatch 指标  | 
|  **警报评估**-监控指标并触发警报  |  亚马逊 CloudWatch 警报  |  用 AWS IoT Events 警报取代 CloudWatch 警报，提供其他功能，例如异常检测  | 
|  **集成** — 连接 AWS IoT SiteWise  |  AWS IoT SiteWise 外部警报  |  可选功能，可将 CloudWatch 警报 AWS IoT SiteWise 作为外部警报导回去  | 

## 步骤 1：在资产属性上启用 MQTT 通知
<a name="eos-alarms-mqtt-asset-property"></a>

如果您使用 AWS IoT Events 集成的 AWS IoT SiteWise 警报，则可以为要监控的每个属性启用 MQTT 通知。

1. 按照[中的配置资产警报 AWS IoT SiteWise](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/configure-alarms.html#configure-alarm-threshold-value-console)程序进行操作，直到完成**编辑**资产模型属性的步骤。

1. 对于要迁移的每个属性，将 **MQTT 通知状态**更改为 “**活动**”。  
![\[屏幕截图显示 MQTT 通知状态下拉列表在 AWS IoT SiteWise 控制台中的位置。\]](http://docs.aws.amazon.com/zh_cn/iotevents/latest/developerguide/images/events-eos-sw-asset-mqtt.png)

1. 记下每个修改后的警报属性的警报发布到的主题路径。

有关更多信息，请参阅以下文档资源：
+ 在《*AWS IoT SiteWise 用户指南*[》中了解 MQTT 主题中的资产属性](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/mqtt-topics.html)。
+ 《*AWS IoT 开发人员指南》*中的 [MQTT 主题](https://docs.aws.amazon.com/iot/latest/developerguide/topics.html)。

## 步骤 2：创建 AWS Lambda 函数
<a name="eos-alarms-lambda-function"></a>

创建一个 Lambda 函数，用于读取 MQTT 主题发布的 TQV 数组，并将单个值发布到。 CloudWatch我们将使用此 Lambda 函数作为在 AWS IoT Core 消息规则中触发的目标操作。

1. 打开 [AWS Lambda console](https://console.aws.amazon.com/lambda)。

1. 选择**创建函数**。

1. 输入**函数名称的名称**。

1. **选择 **NodeJS** 22.x 作为运行时间。**

1. 在**更改默认执行角色**下拉列表中，选择**使用现有角色**，然后选择您在之前的步骤中创建的 IAM 角色。
**注意**  
此过程假设您已经迁移了探测器模型。如果您没有 IAM 角色，请参阅[](eos-procedure-detector-models.md#eos-detector-model-create-iam-role)。

1. 选择**创建函数**。

1. 替换硬编码常量后，粘贴以下代码片段。

   ```
   import json
   import boto3
   from datetime import datetime
   
   # Initialize CloudWatch client
   cloudwatch = boto3.client('cloudwatch')
   
   def lambda_handler(message, context):
       try:
           # Parse the incoming IoT message
           # Extract relevant information
           asset_id = message['payload']['assetId']
           property_id = message['payload']['propertyId']
           
           # Process each value in the values array
           for value in message['payload']['values']:
               # Extract timestamp and value
               timestamp = datetime.fromtimestamp(value['timestamp']['timeInSeconds'])
               metric_value = value['value']['doubleValue']
               quality = value.get('quality', 'UNKNOWN')
               
               # Publish to CloudWatch
               response = cloudwatch.put_metric_data(
                   Namespace='IoTSiteWise/AssetMetrics',
                   MetricData=[
                       {
                           'MetricName': f'Property_your-property-id',
                           'Value': metric_value,
                           'Timestamp': timestamp,
                           'Dimensions': [
                               {
                                   'Name': 'AssetId',
                                   'Value': 'your-asset-id'
                               },
                               {
                                   'Name': 'Quality',
                                   'Value': quality
                               }
                           ]
                       }
                   ]
               )
               
           return {
               'statusCode': 200,
               'body': json.dumps('Successfully published metrics to CloudWatch')
           }
           
       except Exception as e:
           print(f'Error processing message: {str(e)}')
           return {
               'statusCode': 500,
               'body': json.dumps(f'Error: {str(e)}')
           }
   ```

## 步骤 3：创建 AWS IoT Core 邮件路由规则
<a name="eos-alarms-message-routing"></a>
+ 按照[教程：重新发布 MQTT 消息过程进行](https://docs.aws.amazon.com/iot/latest/developerguide/iot-repub-rule.html)操作，在出现提示时输入以下信息：

  1. 命名消息路由规则`SiteWiseToCloudwatchAlarms`。

  1. 对于查询，您可以使用以下内容：

     ```
     SELECT * FROM '$aws/sitewise/asset-models/your-asset-model-id/assets/your-asset-id/properties/your-property-id'
     ```

  1. 在**规则操作**中，选择 **Lambda** 操作以将生成的数据发送到 AWS IoT SiteWise 。 CloudWatch例如：  
![\[显示了 Lambda 函数的规则操作的屏幕截图。\]](http://docs.aws.amazon.com/zh_cn/iotevents/latest/developerguide/images/events-eos-lambda-rule-action.png)

## 步骤 4：查看 CloudWatch 指标
<a name="eos-alarms-metrics"></a>

当您将数据提取到之前选择的属性时 AWS IoT SiteWise，会将数据路由到我们在中[步骤 1：在资产属性上启用 MQTT 通知](#eos-alarms-mqtt-asset-property)创建的 Lambda 函数。[步骤 2：创建 AWS Lambda 函数](#eos-alarms-lambda-function)在此步骤中，您可以查看 Lambda 是否将您的指标发送到。 CloudWatch

1. 打开 [CloudWatch AWS 管理控制台](https://console.aws.amazon.com/cloudwatch/)。

1. 在左侧导航栏中，选择**指标**，然后选择**所有指标**。

1. 选择警报的 URL 将其打开。

1. 在 S **ourc** e 选项卡下， CloudWatch 输出类似于此示例。此源信息确认指标数据正在输入到 CloudWatch。

   ```
   {
       "view": "timeSeries",
       "stacked": false,
       "metrics": [
           [ "IoTSiteWise/AssetMetrics", "Property_your-property-id-hash", "Quality", "GOOD", "AssetId", "your-asset-id-hash", { "id": "m1" } ]
       ],
       "region": "your-region"
   }
   ```

## 步骤 5：创建 CloudWatch 警报
<a name="eos-create-cw-alarm"></a>

按照 *Amazon CloudWatch 用户指南*中的[基于静态阈值创建 CloudWatch警](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/AlarmThatSendsEmail.html)报程序，为每个相关指标创建警报。

**注意**  
Amazon 中有许多警报配置选项。有关 CloudWatch 警报 CloudWatch 的更多信息，请参阅《[亚马逊* CloudWatch 用户指南》中的 “使用亚马逊 CloudWatch *警报](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/AlarmThatSendsEmail.html)”。

## 步骤 6：（可选）将 CloudWatch 警报导入 AWS IoT SiteWise
<a name="eos-import-cw-alarm-sw"></a>

您可以将 CloudWatch 警报配置为 AWS IoT SiteWise 使用 CloudWatch 警报操作和 Lambda 将数据发送回去。这种集成使您能够在 Monito SiteWise r 门户中查看警报状态和属性。

1. 将外部警报配置为资产模型中的一个属性。有关更多信息，请参阅《*AWS IoT SiteWise 用户指南》 AWS IoT SiteWise*[中的定义外部警报](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/define-external-alarms.html)。

1. 创建一个 Lambda 函数，该函数使用*AWS IoT SiteWise 用户指南*中的 [BatchPutAssetPropertyValue](https://docs.aws.amazon.com/iot-sitewise/latest/APIReference/API_BatchPutAssetPropertyValue.html)API 向其发送警报数据。 AWS IoT SiteWise

1. 设置 CloudWatch 警报操作，以便在警报状态发生变化时调用 Lambda 函数。有关更多信息，请参阅《*Amazon CloudWatch 用户指南》*中的 “[警报操作](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/AlarmThatSendsEmail.html#alarms-and-actions.html)” 部分。