

支援終止通知：2026 年 5 月 20 日， AWS 將終止對 的支援 AWS IoT Events。2026 年 5 月 20 日之後，您將無法再存取 AWS IoT Events 主控台或 AWS IoT Events 資源。如需詳細資訊，請參閱[AWS IoT Events 終止支援](https://docs.aws.amazon.com/iotevents/latest/developerguide/iotevents-end-of-support.html)。

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# AWS IoT Events 終止支援
<a name="iotevents-end-of-support"></a>

在仔細考慮之後，我們決定終止 AWS IoT Events 對服務的支援，自 2026 年 5 月 20 日起。 AWS IoT Events 自 2025 年 5 月 20 日起將不再接受新客戶。身為在 2025 年 5 月 20 日之前註冊服務的現有客戶，您可以繼續使用 AWS IoT Events 功能。2026 年 5 月 20 日之後，您將無法再使用 AWS IoT Events。

此頁面提供客戶轉換至替代解決方案 AWS IoT Events 的說明和考量，以滿足您的業務需求。

**注意**  
這些指南中提供的解決方案旨在做為說明性範例，而不是做為 AWS IoT Events 功能的生產就緒替代。根據您的業務需求自訂程式碼、工作流程和相關 AWS 資源。

**Topics**
+ [從 遷移時的考量事項 AWS IoT Events](#eos-considerations)
+ [中的偵測器模型遷移程序 AWS IoT Events](eos-procedure-detector-models.md)
+ [中的 AWS IoT SiteWise 警示遷移程序 AWS IoT Events](eos-procedure-alarms.md)

## 從 遷移時的考量事項 AWS IoT Events
<a name="eos-considerations"></a>
+ 實作安全最佳實務，包括為每個元件使用具有最低權限的 IAM 角色，以及加密靜態和傳輸中的資料。如需詳細資訊，請參閱《IAM 使用者指南》**中的 [IAM 中的安全性最佳實務](https://docs.aws.amazon.com/IAM/latest/UserGuide/best-practices.html)。
+ 根據您的資料擷取需求，考慮 Kinesis 串流的碎片數量。如需 Kinesis 碎片的詳細資訊，請參閱[《Amazon Kinesis Data Streams 開發人員指南》中的 Amazon Kinesis Data Streams 術語和概念](https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html)。 *Amazon Kinesis *
+ 使用 CloudWatch 為指標和日誌設定全面的監控和偵錯。如需詳細資訊，請參閱《Amazon [ CloudWatch 使用者指南》中的什麼是](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/WhatIsCloudWatch.html) CloudWatch？。 *Amazon CloudWatch *
+ 考慮錯誤處理的結構，包括如何管理重複處理失敗的訊息、實作重試政策，以及設定程序來隔離和分析有問題的訊息。
+ 使用 [AWS 定價計算器](https://calculator.aws)來預估特定使用案例的成本。

# 中的偵測器模型遷移程序 AWS IoT Events
<a name="eos-procedure-detector-models"></a>

本節說明替代解決方案，可在您遷移時提供類似的偵測器模型功能 AWS IoT Events。

您可以透過 AWS IoT Core 規則將資料擷取遷移至其他服務 AWS 的組合。資料可以路由到 AWS IoT Core MQTT 主題，而不是透過 [BatchPutMessage](https://docs.aws.amazon.com/iotevents/latest/apireference/API_iotevents-data_BatchPutMessage.html) API 擷取資料。

此遷移方法利用 AWS IoT Core MQTT 主題做為 IoT 資料的進入點，取代 的直接輸入 AWS IoT Events。MQTT 主題的選擇有幾個主要原因。由於 MQTT 在業界的廣泛使用，它們提供了與 IoT 裝置的廣泛相容性。這些主題可以處理來自許多裝置的大量訊息，以確保可擴展性。它們也提供根據內容或裝置類型路由和篩選訊息的彈性。此外， AWS IoT Core MQTT 主題與其他 AWS 服務無縫整合，促進遷移程序。

資料從 MQTT 主題流向結合 Amazon Kinesis Data Streams、 AWS Lambda 函數、Amazon DynamoDB 資料表和 Amazon EventBridge 排程的架構。這種服務組合會複寫並增強 先前提供的功能 AWS IoT Events，讓您更靈活地控制 IoT 資料處理管道。

## 比較架構
<a name="eos-architecture-comparison-detector-model"></a>

目前的 AWS IoT Events 架構會透過 AWS IoT Core 規則和 `BatchPutMessage` API 擷取資料。此架構使用 AWS IoT Core 進行資料擷取和事件發佈，並將訊息透過 AWS IoT Events 輸入路由至定義狀態邏輯的偵測器模型。IAM 角色會管理必要的許可。

新的解決方案 AWS IoT Core 會維護資料擷取 （現在具有專用輸入和輸出 MQTT 主題）。它引入用於資料分割的 Kinesis Data Streams 和用於狀態邏輯的評估器 Lambda 函數。裝置狀態現在存放在 DynamoDB 資料表中，增強型 IAM 角色會管理這些服務的許可。


| 用途 | 解決方案 | 差異 | 
| --- | --- | --- | 
|  **資料擷取** – 從 IoT 裝置接收資料  |  AWS IoT Core  |  現在需要兩個不同的 MQTT 主題：一個用於擷取裝置資料，另一個用於發佈輸出事件  | 
|  **訊息方向** – 將傳入的訊息路由到適當的服務  |  AWS IoT Core 訊息路由規則  |  維持相同的路由功能，但現在會將訊息導向 Kinesis Data Streams，而不是 AWS IoT Events  | 
|  **資料處理** – 處理和組織傳入的資料串流  |  Kinesis Data Streams  |  取代 AWS IoT Events 輸入功能，使用裝置 ID 分割提供資料擷取以進行訊息處理  | 
|  **邏輯評估** – 處理狀態變更並觸發動作  |  評估器 Lambda  |  取代 AWS IoT Events 偵測器模型，透過程式碼而非視覺化工作流程提供可自訂的狀態邏輯評估  | 
|  **狀態管理** – 維護裝置狀態  |  DynamoDB 表  |  提供裝置狀態持久性儲存的新元件，取代內部 AWS IoT Events 狀態管理  | 
|  **安全性** – 管理服務許可  |  IAM 角色  |  已更新的許可現在除了現有 AWS IoT Core 許可之外，還包括對 Kinesis Data Streams、DynamoDB 和 EventBridge 的存取  | 

## 步驟 1：（選用） 匯出 AWS IoT Events 偵測器模型組態
<a name="eos-detector-model-export-events-data"></a>

建立新資源之前，請先匯出 AWS IoT Events 偵測器模型定義。這些包含您的事件處理邏輯，可以做為實作新解決方案的歷史參考。

------
#### [ Console ]

使用 匯出偵測器模型組態時 AWS IoT Events AWS 管理主控台，請執行下列步驟：

**使用 匯出偵測器模型 AWS 管理主控台**

1. 登入 [AWS IoT Events 主控台](https://console.aws.amazon.com/iotevents/)。

1. 在左側導覽窗格中，選擇 **Detector models (偵測器模型)**。

1. 選取要匯出的偵測器模型。

1. 選擇 **Export** (匯出)。閱讀有關輸出的資訊訊息，然後再次選擇**匯出**。

1. 針對您要匯出的每個偵測器模型重複此程序。

包含偵測器模型 JSON 輸出的檔案會新增至瀏覽器的下載資料夾。您可以選擇性地儲存每個偵測器模型組態，以保留歷史資料。

------
#### [ AWS CLI ]

使用 AWS CLI執行下列命令來匯出偵測器模型組態：

**使用 匯出偵測器模型 AWS CLI**

1. 列出您帳戶中的所有偵測器模型：

   ```
   aws iotevents list-detector-models
   ```

1. 針對每個偵測器模型，執行下列動作來匯出其組態：

   ```
   aws iotevents describe-detector-model \
      --detector-model-name your-detector-model-name
   ```

1. 儲存每個偵測器模型的輸出。

------

## 步驟 2：建立 IAM 角色
<a name="eos-detector-model-create-iam-role"></a>

建立 IAM 角色以提供複寫 功能的許可 AWS IoT Events。此範例中的角色會授予 DynamoDB 的存取權以進行狀態管理、EventBridge 用於排程、Kinesis Data Streams 用於資料擷取、 AWS IoT Core 用於發佈訊息，以及 CloudWatch 用於記錄。這些服務共同用來取代 AWS IoT Events。

1. 建立具備下列許可的 IAM 角色。如需建立 IAM 角色的詳細說明，請參閱《*IAM 使用者指南*》中的[建立角色以將許可委派給 AWS 服務](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-service.html)。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "DynamoDBAccess",
               "Effect": "Allow",
               "Action": [
                   "dynamodb:GetItem",
                   "dynamodb:PutItem",
                   "dynamodb:UpdateItem",
                   "dynamodb:DeleteItem",
                   "dynamodb:Query",
                   "dynamodb:Scan"
               ],
               "Resource": "arn:aws:dynamodb:us-east-1:123456789012:table/EventsStateTable"
           },
           {
               "Sid": "SchedulerAccess",
               "Effect": "Allow",
               "Action": [
                   "scheduler:CreateSchedule",
                   "scheduler:DeleteSchedule"
               ],
               "Resource": "arn:aws:scheduler:us-east-1:123456789012:schedule/*"
           },
           {
               "Sid": "KinesisAccess",
               "Effect": "Allow",
               "Action": [
                   "kinesis:GetRecords",
                   "kinesis:GetShardIterator",
                   "kinesis:DescribeStream",
                   "kinesis:ListStreams"
               ],
               "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/*"
           },
           {
               "Sid": "IoTPublishAccess",
               "Effect": "Allow",
               "Action": "iot:Publish",
               "Resource": "arn:aws:iot:us-east-1:123456789012:topic/*"
           },
           {
               "Effect": "Allow",
               "Action": "logs:CreateLogGroup",
               "Resource": "arn:aws:logs:us-east-1:123456789012:*"
           },
           {
               "Effect": "Allow",
               "Action": [
                   "logs:CreateLogStream",
                   "logs:PutLogEvents"
               ],
               "Resource": [
               "arn:aws:logs:us-east-1:123456789012:log-group:/aws/lambda/your-lambda:*"
               ]
           }
       ]
   }
   ```

------

1. 新增下列 IAM 角色信任政策。信任政策允許指定的 AWS 服務擔任 IAM 角色，以便他們可以執行必要的動作。如需建立 IAM 信任政策的詳細說明，請參閱《*IAM 使用者指南*》中的[使用自訂信任政策建立角色](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-custom.html)。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Effect": "Allow",
               "Principal": {
                   "Service": [
                       "scheduler.amazonaws.com",
                       "lambda.amazonaws.com",
                       "iot.amazonaws.com"
                   ]
               },
               "Action": "sts:AssumeRole"
           }
       ]
   }
   ```

------

## 步驟 3：建立 Amazon Kinesis Data Streams
<a name="eos-detector-model-create-kinesis-stream"></a>

使用 AWS 管理主控台 或 建立 Amazon Kinesis Data Streams AWS CLI。

------
#### [ Console ]

若要使用 建立 Kinesis 資料串流 AWS 管理主控台，請遵循 *Amazon Kinesis Data Streams 開發人員指南*中的[建立資料串流](https://docs.aws.amazon.com/streams/latest/dev/tutorial-stock-data-kplkcl-create-stream.html)頁面上的程序。

根據裝置計數和訊息承載大小調整碎片計數。

------
#### [ AWS CLI ]

使用 AWS CLI建立 Amazon Kinesis Data Streams，從您的裝置擷取和分割資料。

Kinesis Data Streams 用於此遷移，以取代 的資料擷取功能 AWS IoT Events。它提供可擴展且有效率的方式來從 IoT 裝置收集、處理和分析即時串流資料，同時提供靈活的資料處理和與其他 AWS 服務的整合。

```
aws kinesis create-stream --stream-name your-kinesis-stream-name --shard-count 4 --region your-region
```

根據裝置計數和訊息承載大小調整碎片計數。

------

## 步驟 4：建立或更新 MQTT 訊息路由規則
<a name="eos-detector-model-mqtt-rule"></a>

您可以建立新的 MQTT 訊息路由規則或更新現有的規則。

------
#### [ Console ]

1. 判斷您是否需要新的 MQTT 訊息路由規則，或是是否可以更新現有的規則。

1. 開啟 [AWS IoT Core 主控台](https://console.aws.amazon.com/iot/)。

1. 在導覽窗格中，選擇**訊息路由**，然後選擇**規則**。

1. 在**管理**區段中，選擇**訊息路由**，然後選擇**規則**。

1. 選擇**建立規則**。

1. 在**指定規則屬性**頁面上，輸入 AWS IoT Core 規則名稱的**規則名稱**。針對**規則描述 - *選用*，輸入描述以識別您正在處理事件並將其轉送至 Kinesis Data Streams。**

1. 在**設定 SQL 陳述**式頁面上，輸入 SQL **陳述式**的下列項目：**SELECT \$1 FROM 'your-database'**，然後選擇**下一步**。

1. 在**連接規則動作**頁面上的**規則動作**下，選擇 **kinesis**。

1. 選擇串流的 Kinesis 串流。針對分割區索引鍵，輸入 **your-instance-id**。選取 IAM 角色的適當角色，然後選擇**新增規則動作**。

如需詳細資訊，請參閱[建立 AWS IoT 規則以將裝置資料路由至其他 服務](https://docs.aws.amazon.com/iot/latest/developerguide/iot-rules-tutorial.html)。

------
#### [ AWS CLI ]

1. 使用下列內容建立 JSON 檔案 。此 JSON 組態檔案會定義 AWS IoT Core 規則，從主題中選取所有訊息，並使用執行個體 ID 做為分割區索引鍵，將其轉送至指定的 Kinesis 串流。

   ```
   {
       "sql": "SELECT * FROM 'your-config-file'",
       "description": "Rule to process events and forward to Kinesis Data Streams",
       "actions": [
           {
               "kinesis": {
                   "streamName": "your-kinesis-stream-name",
                   "roleArn": "arn:aws:iam::your-account-id:role/service-role/your-iam-role",
                   "partitionKey": "${your-instance-id}"
               }
           }
       ],
       "ruleDisabled": false,
       "awsIotSqlVersion": "2016-03-23"
   }
   ```

1. 使用 建立 MQTT 主題規則 AWS CLI。此步驟使用 AWS CLI ，使用 `events_rule.json` 檔案中定義的組態來建立 AWS IoT Core 主題規則。

   ```
   aws iot create-topic-rule \
       --rule-name "your-iot-core-rule" \
       --topic-rule-payload file://your-file-name.json
   ```

------

## 步驟 5：取得目的地 MQTT 主題的端點
<a name="eos-detector-model-get-mqtt-endpoint"></a>

使用目的地 MQTT 主題來設定主題發佈傳出訊息的位置，取代先前處理的功能 AWS IoT Events。端點對 AWS 您的帳戶和區域是唯一的。

------
#### [ Console ]

1. 開啟 [AWS IoT Core 主控台](https://console.aws.amazon.com/iot/)。

1. 在左側導覽面板的**連線**區段中，選擇**網域組態**。

1. 選擇 **iot：Data-ATS** 網域組態，以開啟組態的詳細資訊頁面。

1. 複製**網域名稱**值。此值是端點。儲存端點值，因為您會在後續步驟中需要它。

------
#### [ AWS CLI ]

執行下列命令以取得 AWS IoT Core 端點，以發佈您帳戶的傳出訊息。

```
aws iot describe-endpoint --endpoint-type iot:Data-ATS --region your-region
```

------

## 步驟 6：建立 Amazon DynamoDB 資料表
<a name="eos-detector-model-create-dynamodb-table"></a>

 Amazon DynamoDB 資料表會取代 的狀態管理功能 AWS IoT Events，提供可擴展且彈性的方式，在新的解決方案架構中保留和管理裝置狀態和偵測器模型邏輯。

------
#### [ Console ]

建立 Amazon DynamoDB 資料表以保留偵測器模型的狀態。如需詳細資訊，請參閱《Amazon [ DynamoDB 開發人員指南》中的在 DynamoDB 中建立資料表](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-1.html)。 * DynamoDB *

使用下列表格詳細資訊：
+ 針對**資料表名稱**，輸入您選擇的資料表名稱。
+ 針對**分割區索引鍵**，輸入您自己的執行個體 ID。
+ 您可以使用**資料表****設定的預設設定** 

------
#### [ AWS CLI ]

執行下列命令來建立 DynamoDB 資料表。

```
aws dynamodb create-table \
                        --table-name your-table-name \
                        --attribute-definitions AttributeName=your-instance-id,AttributeType=S \
                        --key-schema AttributeName=your-instance-id,KeyType=HASH \
```

------

## 步驟 7：建立 AWS Lambda 函數 （主控台）
<a name="eos-detector-model-create-lambda-function"></a>

 Lambda 函數做為核心處理引擎，取代 的偵測器模型評估邏輯 AWS IoT Events。在此範例中，我們與其他 AWS 服務整合，以根據您定義的規則處理傳入資料、管理狀態和觸發動作。

建立具有NodeJS執行時間的 Lambda 函數。使用下列程式碼片段，取代硬式編碼常數：

1. 開啟 [AWS Lambda console](https://console.aws.amazon.com/lambda/)。

1. 選擇**建立函數**。

1. 輸入**函數名稱的名稱**。

1. 選取 **NodeJS 22.x **作為**執行時間**。

1. 在**變更預設執行角色**下拉式清單中，選擇**使用現有角色**，然後選取您在先前步驟中建立的 IAM 角色。

1. 選擇**建立函數**。

1. 在取代硬式編碼常數後，貼上下列程式碼片段。

1. 函數建立之後，請在**程式碼**索引標籤下貼上下列程式碼範例，以您自己的程式碼取代**your-destination-endpoint**端點。

```
import { DynamoDBClient, GetItemCommand } from '@aws-sdk/client-dynamodb';
import { PutItemCommand } from '@aws-sdk/client-dynamodb';
import { IoTDataPlaneClient, PublishCommand } from "@aws-sdk/client-iot-data-plane";
import { SchedulerClient, CreateScheduleCommand, DeleteScheduleCommand } from "@aws-sdk/client-scheduler"; // ES Modules import


//// External Clients and Constants
const scheduler = new SchedulerClient({});
const iot = new IoTDataPlaneClient({
    endpoint: 'https://your-destination-endpoint-ats.iot.your-region.amazonaws.com/'
});
const ddb = new DynamoDBClient({});


//// Lambda Handler function
export const handler = async (event) => {
    console.log('Incoming event:', JSON.stringify(event, null, 2));

    if (!event.Records) {
        throw new Error('No records found in event');
    }

    const processedRecords = [];

    for (const record of event.Records) {
        try {
            if (record.eventSource !== 'aws:kinesis') {
                console.log(`Skipping non-Kinesis record from ${record.eventSource}`);
                continue;
            }

            // Assumes that we are processing records from Kinesis
            const payload = record.kinesis.data;
            const decodedData = Buffer.from(payload, 'base64').toString();
            console.log("decoded payload is ", decodedData);

            const output = await handleDecodedData(decodedData);

            // Add additional processing logic here
            const processedData = {
                output,
                sequenceNumber: record.kinesis.sequenceNumber,
                partitionKey: record.kinesis.partitionKey,
                timestamp: record.kinesis.approximateArrivalTimestamp
            };

            processedRecords.push(processedData);

        } catch (error) {
            console.error('Error processing record:', error);
            console.error('Failed record:', record);
            // Decide whether to throw error or continue processing other records
            // throw error; // Uncomment to stop processing on first error
        }
    }

    return {
        statusCode: 200,
        body: JSON.stringify({
            message: 'Processing complete',
            processedCount: processedRecords.length,
            records: processedRecords
        })
    };
};

// Helper function to handle decoded data
async function handleDecodedData(payload) {
    try {
        // Parse the decoded data
        const parsedData = JSON.parse(payload);

        // Extract instanceId
        const instanceId = parsedData.instanceId;
        // Parse the input field
        const inputData = JSON.parse(parsedData.payload);
        const temperature = inputData.temperature;
        console.log('For InstanceId: ', instanceId, ' the temperature is:', temperature);

        await iotEvents.process(instanceId, inputData)

        return {
            instanceId,
            temperature,
            // Add any other fields you want to return
            rawInput: inputData
        };
    } catch (error) {
        console.error('Error handling decoded data:', error);
        throw error;
    }
}


//// Classes for declaring/defining the state machine
class CurrentState {
    constructor(instanceId, stateName, variables, inputs) {
        this.stateName = stateName;
        this.variables = variables;
        this.inputs = inputs;
        this.instanceId = instanceId
    }

    static async load(instanceId) {
        console.log(`Loading state for id ${instanceId}`);
        try {
            const { Item: { state: { S: stateContent } } } = await ddb.send(new GetItemCommand({
                TableName: 'EventsStateTable',
                Key: {
                    'InstanceId': { S: `${instanceId}` }
                }
            }));

            const { stateName, variables, inputs } = JSON.parse(stateContent);

            return new CurrentState(instanceId, stateName, variables, inputs);
        } catch (e) {
            console.log(`No state for id ${instanceId}: ${e}`);
            return undefined;
        }
    }

    static async save(instanceId, state) {
        console.log(`Saving state for id ${instanceId}`);
        await ddb.send(new PutItemCommand({
            TableName: 'your-events-state-table-name',
            Item: {
                'InstanceId': { S: `${instanceId}` },
                'state': { S: state }
            }
        }));
    }

    setVariable(name, value) {
        this.variables[name] = value;
    }

    changeState(stateName) {
        console.log(`Changing state from ${this.stateName} to ${stateName}`);
        this.stateName = stateName;
    }

    async setTimer(instanceId, frequencyInMinutes, payload) {
        console.log(`Setting timer ${instanceId} for frequency of ${frequencyInMinutes} minutes`);

        const base64Payload = Buffer.from(JSON.stringify(payload)).toString();
        console.log(base64Payload);

        const scheduleName = `your-schedule-name-${instanceId}-schedule`;
        const scheduleParams = {
            Name: scheduleName,
            FlexibleTimeWindow: {
                Mode: 'OFF'
            },
            ScheduleExpression: `rate(${frequencyInMinutes} minutes)`,
            Target: {
                Arn: "arn:aws::kinesis:your-region:your-account-id:stream/your-kinesis-stream-name",
                RoleArn: "arn:aws::iam::your-account-id:role/service-role/your-iam-role",
                Input: base64Payload,
                KinesisParameters: {
                    PartitionKey: instanceId,
                },
                RetryPolicy: {
                    MaximumRetryAttempts: 3
                }
            },

        };

        const command = new CreateScheduleCommand(scheduleParams);
        console.log(`Sending command to set timer ${JSON.stringify(command)}`);
        await scheduler.send(command);
    }

    async clearTimer(instanceId) {
        console.log(`Cleaning timer ${instanceId}`);

        const scheduleName = `your-schedule-name-${instanceId}-schedule`;
        const command = new DeleteScheduleCommand({
            Name: scheduleName
        });
        await scheduler.send(command);
    }

    async executeAction(actionType, actionPayload) {
        console.log(`Will execute the ${actionType} with payload ${actionPayload}`);
        await iot.send(new PublishCommand({
            topic: `${this.instanceId}`,
            payload: actionPayload,
            qos: 0
        }));
    }

    setInput(value) {
        this.inputs = { ...this.inputs, ...value };
    }

    input(name) {
        return this.inputs[name];
    }
}


class IoTEvents {

    constructor(initialState) {
        this.initialState = initialState;
        this.states = {};
    }

    state(name) {
        const state = new IoTEventsState();
        this.states[name] = state;
        return state;
    }

    async process(instanceId, input) {
        let currentState = await CurrentState.load(instanceId) || new CurrentState(instanceId, this.initialState, {}, {});
        currentState.setInput(input);

        console.log(`With inputs as: ${JSON.stringify(currentState)}`);
        const state = this.states[currentState.stateName];

        currentState = await state.evaluate(currentState);
        console.log(`With output as: ${JSON.stringify(currentState)}`);

        await CurrentState.save(instanceId, JSON.stringify(currentState));
    }
}

class Event {
    constructor(condition, action) {
        this.condition = condition;
        this.action = action;
    }
}

class IoTEventsState {
    constructor() {
        this.eventsList = []
    }

    events(eventListArg) {
        this.eventsList.push(...eventListArg);
        return this;
    }

    async evaluate(currentState) {
        for (const e of this.eventsList) {
            console.log(`Evaluating event ${e.condition}`);
            if (e.condition(currentState)) {
                console.log(`Event condition met`);
                // Execute any action as defined in iotEvents DM Definition
                await e.action(currentState);
            }
        }

        return currentState;
    }
}

////// DetectorModel Definitions - replace with your own defintions
let processAlarmStateEvent = new Event(
    (currentState) => {
        const source = currentState.input('source');
        return (
            currentState.input('temperature') < 70
        );
    },
    async (currentState) => {
        currentState.changeState('normal');
        await currentState.clearTimer(currentState.instanceId)
        await currentState.executeAction('MQTT', `{"state": "alarm cleared, timer deleted" }`);
    }
);

let processTimerEvent = new Event(
    (currentState) => {
        const source = currentState.input('source');
        console.log(`Evaluating timer event with source ${source}`);
        const booleanOutput = (source !== undefined && source !== null &&
            typeof source === 'string' &&
            source.toLowerCase() === 'timer' &&
            // check if the currentState == state from the timer payload
            currentState.input('currentState') !== undefined &&
            currentState.input('currentState') !== null &&
            currentState.input('currentState').toLowerCase !== 'normal');
        console.log(`Timer event evaluated as ${booleanOutput}`);
        return booleanOutput;
    },
    async (currentState) => {
        await currentState.executeAction('MQTT', `{"state": "timer timed out in Alarming state" }`);
    }
);

let processNormalEvent = new Event(
    (currentState) => currentState.input('temperature') > 70,
    async (currentState) => {
        currentState.changeState('alarm');
        await currentState.executeAction('MQTT', `{"state": "alarm detected, timer started" }`);
        await currentState.setTimer(currentState.instanceId, 5, {
            "instanceId": currentState.instanceId,
            "payload":"{\"currentState\": \"alarm\", \"source\": \"timer\"}"
        });
    }
);
const iotEvents = new IoTEvents('normal');
iotEvents
    .state('normal')
    .events(
        [
            processNormalEvent
        ]);
iotEvents
    .state('alarm')
    .events([
            processAlarmStateEvent,
            processTimerEvent
        ]
    );
```

## 步驟 8：新增 Amazon Kinesis Data Streams 觸發條件
<a name="eos-detector-model-add-kinesis-trigger"></a>

使用 AWS 管理主控台 或 將 Kinesis Data Streams 觸發新增至 Lambda 函數 AWS CLI。

將 Kinesis Data Streams 觸發條件新增至 Lambda 函數會建立資料擷取管道與處理邏輯之間的連線，讓它自動評估傳入的 IoT 資料串流，並即時對事件做出反應，類似於 AWS IoT Events 處理輸入的方式。

------
#### [ Console ]

如需詳細資訊，請參閱《 *AWS Lambda 開發人員指南*》中的[建立事件來源映射以叫用 Lambda 函數](https://docs.aws.amazon.com/lambda/latest/dg/services-kinesis-create.html#services-kinesis-eventsourcemapping)。

針對事件來源映射詳細資訊，請使用下列項目：
+ 針對**函數名稱**，輸入 中使用的 lambda 名稱[步驟 7：建立 AWS Lambda 函數 （主控台）](#eos-detector-model-create-lambda-function)。
+ 針對**消費者 - 選用**，輸入 Kinesis 串流的 ARN。
+ **批次大小**請輸入 **10**。

------
#### [ AWS CLI ]

執行下列命令來建立 Lambda 函數觸發。

```
aws lambda create-event-source-mapping \
    --function-name your-lambda-name \
    --event-source arn:aws:kinesis:your-region:your-account-id:stream/your-kinesis-stream-name \
    --batch-size 10 \
    --starting-position LATEST \
    --region your-region
```

------

## 步驟 9：測試資料擷取和輸出功能 (AWS CLI)
<a name="eos-detector-model-data-ingestion-and-output"></a>

根據您在偵測器模型中定義的內容，將承載發佈至 MQTT 主題。以下是 MQTT 主題的範例承載，`your-topic-name`用於測試實作。

```
{
  "instanceId": "your-instance-id",
  "payload": "{\"temperature\":78}"
}
```

您應該會看到發佈至主題的 MQTT 訊息，其中包含下列 （或類似內容）：

```
{
    "state": "alarm detected, timer started"
}
```

# 中的 AWS IoT SiteWise 警示遷移程序 AWS IoT Events
<a name="eos-procedure-alarms"></a>

本節說明替代解決方案，可在您遷移時提供類似的警示功能 AWS IoT Events。

對於使用 AWS IoT Events 警示的 AWS IoT SiteWise 屬性，您可以使用 CloudWatch 警示遷移至解決方案。此方法透過已建立的 SLAs 和其他功能提供強大的監控功能，例如異常偵測和分組警示。

## 比較架構
<a name="eos-architecture-comparison-alarms"></a>

屬性目前的 AWS IoT Events 警示組態 AWS IoT SiteWise 需要在資產模型`AssetModelCompositeModels`中建立 ，如*AWS IoT SiteWise 《 使用者指南*》中的在 [中定義外部警示 AWS IoT SiteWise](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/define-external-alarms.html)中所述。對新解決方案的修改通常透過 主控台進行 AWS IoT Events 管理。

新解決方案利用 CloudWatch 警示提供警示管理。此方法使用 AWS IoT SiteWise 通知將屬性資料點發佈至 AWS IoT Core MQTT 主題，然後由 Lambda 函數處理。函數會將這些通知轉換為 CloudWatch 指標，透過 CloudWatch 的警示架構啟用警示監控。


| 用途 | 解決方案 | 差異 | 
| --- | --- | --- | 
|  **資料來源** – 來自 的屬性資料 AWS IoT SiteWise  |  AWS IoT SiteWise MQTT 通知  |  使用 AWS IoT SiteWise 屬性的 MQTT 通知取代直接 IoT Events 整合  | 
|  **資料處理** – 轉換屬性資料  |  Lambda 函式  |  處理 AWS IoT SiteWise 屬性通知並將其轉換為 CloudWatch 指標  | 
|  **警示評估** – 監控指標並觸發警示  |  Amazon CloudWatch 警示  |  以 CloudWatch AWS IoT Events 警示取代警示，提供異常偵測等其他功能  | 
|  **整合** – 與 的連線 AWS IoT SiteWise  |  AWS IoT SiteWise 外部警示  |  將 CloudWatch 警示匯入回 AWS IoT SiteWise 做為外部警示的選用功能  | 

## 步驟 1：在資產屬性上啟用 MQTT 通知
<a name="eos-alarms-mqtt-asset-property"></a>

如果您使用 AWS IoT SiteWise 警示的 AWS IoT Events 整合，您可以為要監控的每個屬性開啟 MQTT 通知。

1. 遵循[程序中的資產設定警示 AWS IoT SiteWise](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/configure-alarms.html#configure-alarm-threshold-value-console)，直到您完成**編輯**資產模型屬性的每個步驟。

1. 對於每個要遷移的屬性，請將 **MQTT 通知狀態**變更為 **ACTIVE**。  
![\[螢幕擷取畫面，顯示 AWS IoT SiteWise 主控台中 MQTT 通知狀態下拉式清單的位置。\]](http://docs.aws.amazon.com/zh_tw/iotevents/latest/developerguide/images/events-eos-sw-asset-mqtt.png)

1. 請注意警示針對每個修改的警示屬性發佈的主題路徑。

如需詳細資訊，請參閱下列文件資源：
+ *AWS IoT SiteWise 《 使用者指南*[》中的了解 MQTT 主題中的資產屬性](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/mqtt-topics.html)。
+ 《 *AWS IoT 開發人員指南*》中的 [MQTT 主題](https://docs.aws.amazon.com/iot/latest/developerguide/topics.html)。

## 步驟 2：建立 AWS Lambda 函數
<a name="eos-alarms-lambda-function"></a>

建立 Lambda 函數以讀取 MQTT 主題發佈的 TQV 陣列，並將個別值發佈至 CloudWatch。我們將使用此 Lambda 函數作為在 AWS IoT Core 訊息規則中觸發的目的地動作。

1. 開啟 [AWS Lambda console](https://console.aws.amazon.com/lambda)。

1. 選擇**建立函數**。

1. 輸入**函數名稱的名稱**。

1. 選取 **NodeJS 22.x **做為**執行時間**。

1. 在**變更預設執行角色**下拉式清單中，選擇**使用現有角色**，然後選取您在先前步驟中建立的 IAM 角色。
**注意**  
此程序假設您已遷移偵測器模型。如果您沒有 IAM 角色，請參閱 [](eos-procedure-detector-models.md#eos-detector-model-create-iam-role)。

1. 選擇**建立函數**。

1. 在取代硬式編碼常數後，貼上下列程式碼片段。

   ```
   import json
   import boto3
   from datetime import datetime
   
   # Initialize CloudWatch client
   cloudwatch = boto3.client('cloudwatch')
   
   def lambda_handler(message, context):
       try:
           # Parse the incoming IoT message
           # Extract relevant information
           asset_id = message['payload']['assetId']
           property_id = message['payload']['propertyId']
           
           # Process each value in the values array
           for value in message['payload']['values']:
               # Extract timestamp and value
               timestamp = datetime.fromtimestamp(value['timestamp']['timeInSeconds'])
               metric_value = value['value']['doubleValue']
               quality = value.get('quality', 'UNKNOWN')
               
               # Publish to CloudWatch
               response = cloudwatch.put_metric_data(
                   Namespace='IoTSiteWise/AssetMetrics',
                   MetricData=[
                       {
                           'MetricName': f'Property_your-property-id',
                           'Value': metric_value,
                           'Timestamp': timestamp,
                           'Dimensions': [
                               {
                                   'Name': 'AssetId',
                                   'Value': 'your-asset-id'
                               },
                               {
                                   'Name': 'Quality',
                                   'Value': quality
                               }
                           ]
                       }
                   ]
               )
               
           return {
               'statusCode': 200,
               'body': json.dumps('Successfully published metrics to CloudWatch')
           }
           
       except Exception as e:
           print(f'Error processing message: {str(e)}')
           return {
               'statusCode': 500,
               'body': json.dumps(f'Error: {str(e)}')
           }
   ```

## 步驟 3：建立 AWS IoT Core 訊息路由規則
<a name="eos-alarms-message-routing"></a>
+ 遵循[教學課程：重新發佈 MQTT 訊息程序，](https://docs.aws.amazon.com/iot/latest/developerguide/iot-repub-rule.html)在出現提示時輸入下列資訊：

  1. 命名訊息路由規則 `SiteWiseToCloudwatchAlarms`。

  1. 對於查詢，您可以使用下列項目：

     ```
     SELECT * FROM '$aws/sitewise/asset-models/your-asset-model-id/assets/your-asset-id/properties/your-property-id'
     ```

  1. 在**規則動作**中，選取 **Lambda** 動作，將產生的資料 AWS IoT SiteWise 從 傳送至 CloudWatch。例如：  
![\[螢幕擷取畫面，顯示 Lambda 函數的規則動作。\]](http://docs.aws.amazon.com/zh_tw/iotevents/latest/developerguide/images/events-eos-lambda-rule-action.png)

## 步驟 4：檢視 CloudWatch 指標
<a name="eos-alarms-metrics"></a>

當您將資料擷取至 中稍早選取的 AWS IoT SiteWise屬性[步驟 1：在資產屬性上啟用 MQTT 通知](#eos-alarms-mqtt-asset-property)時， 會將資料路由至我們在 中建立的 Lambda 函數[步驟 2：建立 AWS Lambda 函數](#eos-alarms-lambda-function)。在此步驟中，您可以檢查 以查看將指標傳送至 CloudWatch 的 Lambda。

1. 開啟 [CloudWatch AWS 管理主控台](https://console.aws.amazon.com/cloudwatch/)。

1. 在左側導覽中，選擇**指標**，然後選擇**所有指標**。

1. 選擇警示的 URL 來開啟它。

1. 在**來源**索引標籤下，CloudWatch 輸出看起來與此範例類似。此來源資訊會確認指標資料正在饋送至 CloudWatch。

   ```
   {
       "view": "timeSeries",
       "stacked": false,
       "metrics": [
           [ "IoTSiteWise/AssetMetrics", "Property_your-property-id-hash", "Quality", "GOOD", "AssetId", "your-asset-id-hash", { "id": "m1" } ]
       ],
       "region": "your-region"
   }
   ```

## 步驟 5：建立 CloudWatch 警示
<a name="eos-create-cw-alarm"></a>

遵循《Amazon [ CloudWatch 使用者指南》中的根據靜態閾值程序建立](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/AlarmThatSendsEmail.html) CloudWatch 警示，為每個相關指標建立警示。 *Amazon CloudWatch * 

**注意**  
Amazon CloudWatch 中有許多警示組態選項。如需 CloudWatch 警示的詳細資訊，請參閱[Amazon CloudWatch 使用者指南》中的使用 Amazon CloudWatch 警示](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/AlarmThatSendsEmail.html)。 *Amazon CloudWatch *

## 步驟 6：（選用） 將 CloudWatch 警示匯入至 AWS IoT SiteWise
<a name="eos-import-cw-alarm-sw"></a>

您可以設定 CloudWatch 警示， AWS IoT SiteWise 使用 CloudWatch 警示動作和 Lambda 將資料傳回至 。此整合可讓您在 SiteWise Monitor 入口網站中檢視警示狀態和屬性。

1. 將外部警示設定為資產模型中的屬性。如需詳細資訊，請參閱*AWS IoT SiteWise 《 使用者指南》*中的在 [中定義外部警示 AWS IoT SiteWise](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/define-external-alarms.html)。

1. 建立 Lambda 函數，使用*AWS IoT SiteWise 《 使用者指南*》中的 [BatchPutAssetPropertyValue](https://docs.aws.amazon.com/iot-sitewise/latest/APIReference/API_BatchPutAssetPropertyValue.html) API 將警示資料傳送至 AWS IoT SiteWise。

1. 設定 CloudWatch 警示動作，以在警示狀態變更時叫用 Lambda 函數。如需詳細資訊，請參閱《*Amazon CloudWatch 使用者指南*》中的[警示動作](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/AlarmThatSendsEmail.html#alarms-and-actions.html)一節。