

End of support notice: On May 20, 2026, AWS will end support for AWS IoT Events. After May 20, 2026, you will no longer be able to access the AWS IoT Events console or AWS IoT Events resources. For more information, see [AWS IoT Events end of support](https://docs.aws.amazon.com/iotevents/latest/developerguide/iotevents-end-of-support.html).

# AWS IoT Events end of support
<a name="iotevents-end-of-support"></a>

After careful consideration, we decided to end support for the AWS IoT Events service, effective May 20, 2026. AWS IoT Events will no longer accept new customers beginning May 20, 2025. As an existing customer with an account signed up for the service before May 20, 2025, you can continue to use AWS IoT Events features. After May 20, 2026, you will no longer be able to use AWS IoT Events.

This page provides instructions and considerations for AWS IoT Events customers to transition to an alternate solution to meet your business needs.

**Note**  
The solutions presented in these guides are meant to serve as illustrative examples, not as production-ready replacements for AWS IoT Events functionality. Customize the code, workflow, and related AWS resources to your business needs.

**Topics**
+ [Considerations when migrating away from AWS IoT Events](#eos-considerations)
+ [Migration procedure for detector models in AWS IoT Events](eos-procedure-detector-models.md)
+ [Migration procedure for AWS IoT SiteWise alarms in AWS IoT Events](eos-procedure-alarms.md)

## Considerations when migrating away from AWS IoT Events
<a name="eos-considerations"></a>
+ Implement security best practices, including using IAM roles with least privilege for each component and encrypting data at rest and in transit. For more information, see [Security best practices in IAM](https://docs.aws.amazon.com/IAM/latest/UserGuide/best-practices.html) in the *IAM User Guide*.
+ Consider the number of shards for the Kinesis stream based on your data ingestion requirements. For more information on Kinesis shards, see [Amazon Kinesis Data Streams terminology and concepts](https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html) in the *Amazon Kinesis Data Streams Developer Guide*.
+ Set up comprehensive monitoring and debugging using CloudWatch for metrics and logs. For more information, see [What is CloudWatch?](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/WhatIsCloudWatch.html) in the *Amazon CloudWatch User Guide*.
+ Consider the structure of your error handling, including how to manage messages that fail processing repeatedly, implementing retry policies, and setting up a process to isolate and analyze problematic messages.
+ Use the [AWS Pricing Calculator](https://calculator.aws) to estimate costs for your specific use case.
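
As an example of the error-handling consideration above, the following sketch (Node.js, synchronous for brevity) retries a processing function a fixed number of times and isolates messages that keep failing so you can analyze them later. The function and names are illustrative, not part of any AWS SDK:

```javascript
// Hypothetical retry-and-isolate helper. Messages that fail every attempt
// are collected in a dead-letter list instead of blocking the batch.
function processWithRetry(messages, handler, maxAttempts = 3) {
    const succeeded = [];
    const deadLetter = []; // problematic messages, kept for later analysis

    for (const message of messages) {
        let lastError;
        let done = false;
        for (let attempt = 1; attempt <= maxAttempts && !done; attempt++) {
            try {
                succeeded.push(handler(message));
                done = true;
            } catch (err) {
                lastError = err; // retry until attempts are exhausted
            }
        }
        if (!done) {
            deadLetter.push({ message, error: String(lastError) });
        }
    }
    return { succeeded, deadLetter };
}
```

In a production pipeline, the dead-letter list would typically be sent to a queue or table for inspection rather than kept in memory.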

# Migration procedure for detector models in AWS IoT Events
<a name="eos-procedure-detector-models"></a>

This section describes alternative solutions that deliver similar detector model functionality as you migrate away from AWS IoT Events.

You can migrate data ingestion from AWS IoT Core rules to a combination of other AWS services. Instead of ingesting data through the [BatchPutMessage](https://docs.aws.amazon.com/iotevents/latest/apireference/API_iotevents-data_BatchPutMessage.html) API, you can route the data to an AWS IoT Core MQTT topic.

This migration approach leverages AWS IoT Core MQTT topics as the entry point for your IoT data, replacing the direct input to AWS IoT Events. MQTT topics are chosen for several key reasons. They offer broad compatibility with IoT devices due to MQTT's widespread use in the industry. These topics can handle high volumes of messages from numerous devices, ensuring scalability. They also provide flexibility in routing and filtering messages based on content or device type. Additionally, AWS IoT Core MQTT topics integrate seamlessly with other AWS services, facilitating the migration process.

Data flows from MQTT topics into an architecture combining Amazon Kinesis Data Streams, an AWS Lambda function, an Amazon DynamoDB table, and Amazon EventBridge schedules. This combination of services replicates and enhances the functionality previously provided by AWS IoT Events, offering you more flexibility and control over your IoT data processing pipeline.

## Comparing architectures
<a name="eos-architecture-comparison-detector-model"></a>

The current AWS IoT Events architecture ingests data through an AWS IoT Core rule and the `BatchPutMessage` API. This architecture uses AWS IoT Core for data ingestion and event publishing, with messages routed through AWS IoT Events inputs to detector models that define the state logic. An IAM role manages the necessary permissions.

The new solution maintains AWS IoT Core for data ingestion (now with dedicated input and output MQTT topics). It introduces Kinesis Data Streams for data partitioning and an evaluator Lambda function for state logic. Device states are now stored in a DynamoDB table, and an enhanced IAM role manages permissions across these services.


| Purpose | Solution | Differences | 
| --- | --- | --- | 
|  **Data ingestion** – Receives data from IoT devices  |  AWS IoT Core  |  Now requires two distinct MQTT topics: one for ingesting device data and another for publishing output events  | 
|  **Message direction** – Routes incoming messages to appropriate services  |  AWS IoT Core message routing rule  |  Maintains same routing functionality but now directs messages to Kinesis Data Streams instead of AWS IoT Events  | 
|  **Data processing** – Handles and organizes incoming data streams  |  Kinesis Data Streams  |  Replaces AWS IoT Events input functionality, providing data ingestion with device ID partitioning for message processing  | 
|  **Logic evaluation** – Processes state changes and triggers actions  |  Evaluator Lambda  |  Replaces AWS IoT Events detector model, providing customizable state logic evaluation through code instead of visual workflow  | 
|  **State management** – Maintains device states  |  DynamoDB table  |  New component that provides persistent storage of device states, replacing internal AWS IoT Events state management  | 
|  **Security** – Manages service permissions  |  IAM role  |  Updated permissions now include access to Kinesis Data Streams, DynamoDB, and EventBridge in addition to existing AWS IoT Core permissions  | 

## Step 1: (Optional) export AWS IoT Events detector model configurations
<a name="eos-detector-model-export-events-data"></a>

Before creating new resources, export your AWS IoT Events detector model definitions. These contain your event processing logic and can serve as a historical reference for implementing your new solution.

------
#### [ Console ]

Using the AWS Management Console, perform the following steps to export your detector model configurations:

**To export detector models using the AWS Management Console**

1. Sign in to the [AWS IoT Events console](https://console.aws.amazon.com/iotevents/).

1. In the left navigation pane, choose **Detector models**.

1. Select the detector model to export.

1. Choose **Export**. Read the information message regarding the output and then choose **Export** again.

1. Repeat the process for each detector model that you want to export.

A file containing a JSON output of your detector model is added to your browser's download folder. You can optionally save each detector model configuration to preserve historical data.

------
#### [ AWS CLI ]

Using the AWS CLI, run the following commands to export your detector model configurations:

**To export detector models using AWS CLI**

1. List all detector models in your account:

   ```
   aws iotevents list-detector-models
   ```

1. For each detector model, export its configuration by running:

   ```
   aws iotevents describe-detector-model \
      --detector-model-name your-detector-model-name
   ```

1. Save the output for each detector model.

------

## Step 2: Create an IAM role
<a name="eos-detector-model-create-iam-role"></a>

Create an IAM role to provide permissions to replicate the functionality of AWS IoT Events. The role in this example grants access to DynamoDB for state management, EventBridge for scheduling, Kinesis Data Streams for data ingestion, AWS IoT Core for publishing messages, and CloudWatch for logging. Together, these services work as a replacement for AWS IoT Events.

1. Create an IAM role with the following permissions. For more detailed instructions on creating an IAM role, see [Create a role to delegate permissions to an AWS service](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-service.html) in the *IAM User Guide*.

------
#### [ JSON ]


   ```
   {
       "Version": "2012-10-17",
       "Statement": [
           {
               "Sid": "DynamoDBAccess",
               "Effect": "Allow",
               "Action": [
                   "dynamodb:GetItem",
                   "dynamodb:PutItem",
                   "dynamodb:UpdateItem",
                   "dynamodb:DeleteItem",
                   "dynamodb:Query",
                   "dynamodb:Scan"
               ],
               "Resource": "arn:aws:dynamodb:us-east-1:123456789012:table/EventsStateTable"
           },
           {
               "Sid": "SchedulerAccess",
               "Effect": "Allow",
               "Action": [
                   "scheduler:CreateSchedule",
                   "scheduler:DeleteSchedule"
               ],
               "Resource": "arn:aws:scheduler:us-east-1:123456789012:schedule/*"
           },
           {
               "Sid": "KinesisAccess",
               "Effect": "Allow",
               "Action": [
                   "kinesis:GetRecords",
                   "kinesis:GetShardIterator",
                   "kinesis:DescribeStream",
                   "kinesis:ListStreams"
               ],
               "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/*"
           },
           {
               "Sid": "IoTPublishAccess",
               "Effect": "Allow",
               "Action": "iot:Publish",
               "Resource": "arn:aws:iot:us-east-1:123456789012:topic/*"
           },
           {
               "Effect": "Allow",
               "Action": "logs:CreateLogGroup",
               "Resource": "arn:aws:logs:us-east-1:123456789012:*"
           },
           {
               "Effect": "Allow",
               "Action": [
                   "logs:CreateLogStream",
                   "logs:PutLogEvents"
               ],
               "Resource": [
               "arn:aws:logs:us-east-1:123456789012:log-group:/aws/lambda/your-lambda:*"
               ]
           }
       ]
   }
   ```

------

1. Add the following IAM role trust policy. A trust policy allows the specified AWS services to assume the IAM role so that they can perform the necessary actions. For more detailed instructions on creating an IAM trust policy, see [Create a role using custom trust policies](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-custom.html) in the *IAM User Guide*.

------
#### [ JSON ]


   ```
   {
       "Version": "2012-10-17",
       "Statement": [
           {
               "Effect": "Allow",
               "Principal": {
                   "Service": [
                       "scheduler.amazonaws.com",
                       "lambda.amazonaws.com",
                       "iot.amazonaws.com"
                   ]
               },
               "Action": "sts:AssumeRole"
           }
       ]
   }
   ```

------

## Step 3: Create Amazon Kinesis Data Streams
<a name="eos-detector-model-create-kinesis-stream"></a>

Create Amazon Kinesis Data Streams using the AWS Management Console or AWS CLI.

------
#### [ Console ]

To create a Kinesis data stream using the AWS Management Console, follow the procedure found on the [Create a data stream](https://docs.aws.amazon.com/streams/latest/dev/tutorial-stock-data-kplkcl-create-stream.html) page in the *Amazon Kinesis Data Streams Developer Guide*.

Adjust the shard count based on your device count and message payload size.

------
#### [ AWS CLI ]

Using AWS CLI, create Amazon Kinesis Data Streams to ingest and partition the data from your devices.

Kinesis Data Streams is used in this migration to replace the data ingestion functionality of AWS IoT Events. It provides a scalable and efficient way to collect, process, and analyze real-time streaming data from your IoT devices, while offering flexible data handling and integration with other AWS services.

```
aws kinesis create-stream --stream-name your-kinesis-stream-name --shard-count 4 --region your-region
```

Adjust the shard count based on your device count and message payload size.
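
As a rough starting point, each Kinesis shard accepts roughly 1 MiB of writes per second or 1,000 records per second, whichever limit is hit first. The following sketch (an illustrative helper, not an AWS API) estimates a shard count from your expected message rate and average payload size:

```javascript
// Illustrative shard-count estimate based on per-shard write limits.
function estimateShardCount(messagesPerSecond, avgPayloadBytes) {
    const RECORDS_PER_SHARD = 1000;        // per-shard record write limit
    const BYTES_PER_SHARD = 1024 * 1024;   // ~1 MiB/s per-shard write limit

    const byThroughput = Math.ceil((messagesPerSecond * avgPayloadBytes) / BYTES_PER_SHARD);
    const byRecordRate = Math.ceil(messagesPerSecond / RECORDS_PER_SHARD);
    return Math.max(1, byThroughput, byRecordRate);
}

// e.g. 3,000 devices reporting once per second with ~500-byte payloads
console.log(estimateShardCount(3000, 500)); // prints 3
```

Leave headroom above this estimate for traffic spikes, and confirm against the current service quotas for your Region.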

------

## Step 4: Create or update the MQTT message routing rule
<a name="eos-detector-model-mqtt-rule"></a>

You can create a new MQTT message routing rule or update an existing rule.

------
#### [ Console ]

1. Determine if you need a new MQTT message routing rule or if you can update an existing rule.

1. Open the [AWS IoT Core console](https://console.aws.amazon.com/iot/).

1. In the navigation pane, choose **Message routing**, and then choose **Rules**.

1. Choose **Create rule**.

1. On the **Specify rule properties** page, enter the AWS IoT Core rule name for **Rule name**. For **Rule description** - *optional*, enter a description to identify that you're processing events and forwarding them to Kinesis Data Streams.

1. On the **Configure SQL statement** page, enter the following for the **SQL statement**: **SELECT \* FROM 'your-topic-name'**, then choose **Next**.

1. On the **Attach rules actions** page, and under **Rule actions**, choose **kinesis**.

1. Choose your Kinesis stream for the stream. For the partition key, enter **${your-instance-id}**. Select the appropriate role for the IAM role, and then choose **Add rule action**.

For more information, see [Creating AWS IoT rules to route device data to other services](https://docs.aws.amazon.com/iot/latest/developerguide/iot-rules-tutorial.html).

------
#### [ AWS CLI ]

1. Create a JSON file with the following contents. This JSON configuration file defines an AWS IoT Core rule that selects all messages from a topic and forwards them to the specified Kinesis stream, using the instance ID as the partition key.

   ```
   {
       "sql": "SELECT * FROM 'your-topic-name'",
       "description": "Rule to process events and forward to Kinesis Data Streams",
       "actions": [
           {
               "kinesis": {
                   "streamName": "your-kinesis-stream-name",
                   "roleArn": "arn:aws:iam::your-account-id:role/service-role/your-iam-role",
                   "partitionKey": "${your-instance-id}"
               }
           }
       ],
       "ruleDisabled": false,
       "awsIotSqlVersion": "2016-03-23"
   }
   ```

1. Create the MQTT topic rule using the AWS CLI. This step uses the configuration defined in the JSON file that you created (`your-file-name.json`) to create an AWS IoT Core topic rule.

   ```
   aws iot create-topic-rule \
       --rule-name "your-iot-core-rule" \
       --topic-rule-payload file://your-file-name.json
   ```

------

## Step 5: Get the endpoint for the destination MQTT topic
<a name="eos-detector-model-get-mqtt-endpoint"></a>

Use the destination MQTT topic endpoint to configure where your solution publishes outgoing messages, replacing the functionality previously handled by AWS IoT Events. The endpoint is unique to your AWS account and Region.

------
#### [ Console ]

1. Open the [AWS IoT Core console](https://console.aws.amazon.com/iot/).

1. In the **Connect** section of the left navigation pane, choose **Domain configuration**.

1. Choose the **iot:Data-ATS** domain configuration to open the configuration's detail page.

1. Copy the **Domain name** value. This value is the endpoint. Save the endpoint value because you'll need it in later steps.

------
#### [ AWS CLI ]

Run the following command to get the AWS IoT Core endpoint for publishing outgoing messages for your account.

```
aws iot describe-endpoint --endpoint-type iot:Data-ATS --region your-region
```

------

## Step 6: Create an Amazon DynamoDB table
<a name="eos-detector-model-create-dynamodb-table"></a>

An Amazon DynamoDB table replaces the state management functionality of AWS IoT Events, providing a scalable and flexible way to persist and manage the state of your devices and the detector model logic in your new solution architecture.

------
#### [ Console ]

Create an Amazon DynamoDB table to persist the state of the detector models. For more information, see [Create a table in DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-1.html) in the *Amazon DynamoDB Developer Guide*.

Use the following for the table details:
+ For **Table name**, enter a table name of your choosing.
+ For **Partition key**, enter your own instance ID.
+ You can use the **Default settings** for the **Table settings**.

------
#### [ AWS CLI ]

Run the following command to create a DynamoDB table.

```
aws dynamodb create-table \
    --table-name your-table-name \
    --attribute-definitions AttributeName=your-instance-id,AttributeType=S \
    --key-schema AttributeName=your-instance-id,KeyType=HASH \
    --billing-mode PAY_PER_REQUEST
```
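
For reference, the evaluator Lambda function in Step 7 persists each detector instance's state as a JSON string in a single item keyed by instance ID. A stored item might look like the following (the attribute names match the code in Step 7; the state contents are illustrative):

```json
{
    "InstanceId": { "S": "your-instance-id" },
    "state": { "S": "{\"stateName\":\"alarm\",\"variables\":{},\"inputs\":{\"temperature\":78}}" }
}
```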

------

## Step 7: Create an AWS Lambda function (console)
<a name="eos-detector-model-create-lambda-function"></a>

The Lambda function serves as the core processing engine, replacing the detector model evaluation logic of AWS IoT Events. In this example, it integrates with other AWS services to handle incoming data, manage state, and trigger actions based on your defined rules.

Create a Lambda function with the Node.js runtime by following these steps:

1. Open the [AWS Lambda console](https://console.aws.amazon.com/lambda/).

1. Choose **Create function**.

1. Enter a name for the **Function name**.

1. Select **Node.js 22.x** as the **Runtime**.

1. In the **Change default execution role** dropdown, choose **Use existing role**, and then select the IAM role that you created in earlier steps.

1. Choose **Create function**.

1. After your function is created, under the **Code** tab, paste the following code example, replacing the hard-coded constants, such as the **your-destination-endpoint** endpoint, with your own values.

```
import { DynamoDBClient, GetItemCommand } from '@aws-sdk/client-dynamodb';
import { PutItemCommand } from '@aws-sdk/client-dynamodb';
import { IoTDataPlaneClient, PublishCommand } from "@aws-sdk/client-iot-data-plane";
import { SchedulerClient, CreateScheduleCommand, DeleteScheduleCommand } from "@aws-sdk/client-scheduler"; // ES Modules import


//// External Clients and Constants
const scheduler = new SchedulerClient({});
const iot = new IoTDataPlaneClient({
    endpoint: 'https://your-destination-endpoint-ats.iot.your-region.amazonaws.com/'
});
const ddb = new DynamoDBClient({});


//// Lambda Handler function
export const handler = async (event) => {
    console.log('Incoming event:', JSON.stringify(event, null, 2));

    if (!event.Records) {
        throw new Error('No records found in event');
    }

    const processedRecords = [];

    for (const record of event.Records) {
        try {
            if (record.eventSource !== 'aws:kinesis') {
                console.log(`Skipping non-Kinesis record from ${record.eventSource}`);
                continue;
            }

            // Assumes that we are processing records from Kinesis
            const payload = record.kinesis.data;
            const decodedData = Buffer.from(payload, 'base64').toString();
            console.log("decoded payload is ", decodedData);

            const output = await handleDecodedData(decodedData);

            // Add additional processing logic here
            const processedData = {
                output,
                sequenceNumber: record.kinesis.sequenceNumber,
                partitionKey: record.kinesis.partitionKey,
                timestamp: record.kinesis.approximateArrivalTimestamp
            };

            processedRecords.push(processedData);

        } catch (error) {
            console.error('Error processing record:', error);
            console.error('Failed record:', record);
            // Decide whether to throw error or continue processing other records
            // throw error; // Uncomment to stop processing on first error
        }
    }

    return {
        statusCode: 200,
        body: JSON.stringify({
            message: 'Processing complete',
            processedCount: processedRecords.length,
            records: processedRecords
        })
    };
};

// Helper function to handle decoded data
async function handleDecodedData(payload) {
    try {
        // Parse the decoded data
        const parsedData = JSON.parse(payload);

        // Extract instanceId
        const instanceId = parsedData.instanceId;
        // Parse the input field
        const inputData = JSON.parse(parsedData.payload);
        const temperature = inputData.temperature;
        console.log('For InstanceId: ', instanceId, ' the temperature is:', temperature);

        await iotEvents.process(instanceId, inputData)

        return {
            instanceId,
            temperature,
            // Add any other fields you want to return
            rawInput: inputData
        };
    } catch (error) {
        console.error('Error handling decoded data:', error);
        throw error;
    }
}


//// Classes for declaring/defining the state machine
class CurrentState {
    constructor(instanceId, stateName, variables, inputs) {
        this.stateName = stateName;
        this.variables = variables;
        this.inputs = inputs;
        this.instanceId = instanceId
    }

    static async load(instanceId) {
        console.log(`Loading state for id ${instanceId}`);
        try {
            const { Item: { state: { S: stateContent } } } = await ddb.send(new GetItemCommand({
                TableName: 'your-events-state-table-name',
                Key: {
                    'InstanceId': { S: `${instanceId}` }
                }
            }));

            const { stateName, variables, inputs } = JSON.parse(stateContent);

            return new CurrentState(instanceId, stateName, variables, inputs);
        } catch (e) {
            console.log(`No state for id ${instanceId}: ${e}`);
            return undefined;
        }
    }

    static async save(instanceId, state) {
        console.log(`Saving state for id ${instanceId}`);
        await ddb.send(new PutItemCommand({
            TableName: 'your-events-state-table-name',
            Item: {
                'InstanceId': { S: `${instanceId}` },
                'state': { S: state }
            }
        }));
    }

    setVariable(name, value) {
        this.variables[name] = value;
    }

    changeState(stateName) {
        console.log(`Changing state from ${this.stateName} to ${stateName}`);
        this.stateName = stateName;
    }

    async setTimer(instanceId, frequencyInMinutes, payload) {
        console.log(`Setting timer ${instanceId} for frequency of ${frequencyInMinutes} minutes`);

        const schedulePayload = JSON.stringify(payload); // delivered to Kinesis as the record data
        console.log(schedulePayload);

        const scheduleName = `your-schedule-name-${instanceId}-schedule`;
        const scheduleParams = {
            Name: scheduleName,
            FlexibleTimeWindow: {
                Mode: 'OFF'
            },
            ScheduleExpression: `rate(${frequencyInMinutes} minutes)`,
            Target: {
                Arn: "arn:aws:kinesis:your-region:your-account-id:stream/your-kinesis-stream-name",
                RoleArn: "arn:aws:iam::your-account-id:role/service-role/your-iam-role",
                Input: schedulePayload,
                KinesisParameters: {
                    PartitionKey: instanceId,
                },
                RetryPolicy: {
                    MaximumRetryAttempts: 3
                }
            },

        };

        const command = new CreateScheduleCommand(scheduleParams);
        console.log(`Sending command to set timer ${JSON.stringify(command)}`);
        await scheduler.send(command);
    }

    async clearTimer(instanceId) {
        console.log(`Cleaning timer ${instanceId}`);

        const scheduleName = `your-schedule-name-${instanceId}-schedule`;
        const command = new DeleteScheduleCommand({
            Name: scheduleName
        });
        await scheduler.send(command);
    }

    async executeAction(actionType, actionPayload) {
        console.log(`Will execute the ${actionType} with payload ${actionPayload}`);
        await iot.send(new PublishCommand({
            topic: `${this.instanceId}`,
            payload: actionPayload,
            qos: 0
        }));
    }

    setInput(value) {
        this.inputs = { ...this.inputs, ...value };
    }

    input(name) {
        return this.inputs[name];
    }
}


class IoTEvents {

    constructor(initialState) {
        this.initialState = initialState;
        this.states = {};
    }

    state(name) {
        const state = new IoTEventsState();
        this.states[name] = state;
        return state;
    }

    async process(instanceId, input) {
        let currentState = await CurrentState.load(instanceId) || new CurrentState(instanceId, this.initialState, {}, {});
        currentState.setInput(input);

        console.log(`With inputs as: ${JSON.stringify(currentState)}`);
        const state = this.states[currentState.stateName];

        currentState = await state.evaluate(currentState);
        console.log(`With output as: ${JSON.stringify(currentState)}`);

        await CurrentState.save(instanceId, JSON.stringify(currentState));
    }
}

class Event {
    constructor(condition, action) {
        this.condition = condition;
        this.action = action;
    }
}

class IoTEventsState {
    constructor() {
        this.eventsList = []
    }

    events(eventListArg) {
        this.eventsList.push(...eventListArg);
        return this;
    }

    async evaluate(currentState) {
        for (const e of this.eventsList) {
            console.log(`Evaluating event ${e.condition}`);
            if (e.condition(currentState)) {
                console.log(`Event condition met`);
                // Execute any action as defined in iotEvents DM Definition
                await e.action(currentState);
            }
        }

        return currentState;
    }
}

////// DetectorModel Definitions - replace with your own definitions
let processAlarmStateEvent = new Event(
    (currentState) => currentState.input('temperature') < 70,
    async (currentState) => {
        currentState.changeState('normal');
        await currentState.clearTimer(currentState.instanceId)
        await currentState.executeAction('MQTT', `{"state": "alarm cleared, timer deleted" }`);
    }
);

let processTimerEvent = new Event(
    (currentState) => {
        const source = currentState.input('source');
        console.log(`Evaluating timer event with source ${source}`);
        const booleanOutput = (source !== undefined && source !== null &&
            typeof source === 'string' &&
            source.toLowerCase() === 'timer' &&
            // check if the currentState == state from the timer payload
            currentState.input('currentState') !== undefined &&
            currentState.input('currentState') !== null &&
            currentState.input('currentState').toLowerCase() !== 'normal');
        console.log(`Timer event evaluated as ${booleanOutput}`);
        return booleanOutput;
    },
    async (currentState) => {
        await currentState.executeAction('MQTT', `{"state": "timer timed out in Alarming state" }`);
    }
);

let processNormalEvent = new Event(
    (currentState) => currentState.input('temperature') > 70,
    async (currentState) => {
        currentState.changeState('alarm');
        await currentState.executeAction('MQTT', `{"state": "alarm detected, timer started" }`);
        await currentState.setTimer(currentState.instanceId, 5, {
            "instanceId": currentState.instanceId,
            "payload":"{\"currentState\": \"alarm\", \"source\": \"timer\"}"
        });
    }
);
const iotEvents = new IoTEvents('normal');
iotEvents
    .state('normal')
    .events(
        [
            processNormalEvent
        ]);
iotEvents
    .state('alarm')
    .events([
            processAlarmStateEvent,
            processTimerEvent
        ]
    );
```

## Step 8: Add an Amazon Kinesis Data Streams trigger
<a name="eos-detector-model-add-kinesis-trigger"></a>

Add a Kinesis Data Streams trigger to the Lambda function using the AWS Management Console or AWS CLI.

Adding a Kinesis Data Streams trigger to your Lambda function establishes the connection between your data ingestion pipeline and your processing logic, letting it automatically evaluate incoming IoT data streams and react to events in real time, similar to how AWS IoT Events processes inputs.
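
When the trigger fires, Lambda receives a batch of records in the standard Kinesis event format, which is what the handler in Step 7 iterates over. Each record carries the published message base64-encoded in `kinesis.data` (the sequence number and timestamp below are illustrative):

```json
{
    "Records": [
        {
            "eventSource": "aws:kinesis",
            "kinesis": {
                "partitionKey": "your-instance-id",
                "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                "approximateArrivalTimestamp": 1545084650.987,
                "data": "eyJpbnN0YW5jZUlkIjoi..."
            }
        }
    ]
}
```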

------
#### [ Console ]

For more information, see [Create an event source mapping to invoke a Lambda function](https://docs.aws.amazon.com/lambda/latest/dg/services-kinesis-create.html#services-kinesis-eventsourcemapping) in the *AWS Lambda Developer Guide*.

Use the following for the event source mapping details:
+ For **Function name**, enter the Lambda function name used in [Step 7: Create an AWS Lambda function (console)](#eos-detector-model-create-lambda-function).
+ For **Consumer - optional**, enter the ARN of your Kinesis stream.
+ For **Batch size**, enter **10**.

------
#### [ AWS CLI ]

Run the following command to create the Lambda function trigger.

```
aws lambda create-event-source-mapping \
    --function-name your-lambda-name \
    --event-source-arn arn:aws:kinesis:your-region:your-account-id:stream/your-kinesis-stream-name \
    --batch-size 10 \
    --starting-position LATEST \
    --region your-region
```

------

## Step 9: Test data ingestion and output functionality (AWS CLI)
<a name="eos-detector-model-data-ingestion-and-output"></a>

Publish a payload to the MQTT topic based on what you defined in your detector model. The following is an example payload published to the MQTT topic `your-topic-name` to test the implementation.

```
{
  "instanceId": "your-instance-id",
  "payload": "{\"temperature\":78}"
}
```

You should see an MQTT message published to a topic with the following (or similar) content:

```
{
    "state": "alarm detected, timer started"
}
```
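
As a quick sanity check of the encoding along the way, the following sketch reproduces what the evaluator Lambda does with this test payload once Kinesis delivers it base64 encoded (Node.js, no AWS calls):

```javascript
// The test payload from this step, as published to the MQTT topic.
const published = {
    instanceId: 'your-instance-id',
    payload: '{"temperature":78}'
};

// Kinesis delivers the record body base64-encoded in record.kinesis.data.
const data = Buffer.from(JSON.stringify(published)).toString('base64');

// The evaluator Lambda decodes and parses it (see handleDecodedData in Step 7).
const decoded = JSON.parse(Buffer.from(data, 'base64').toString());
const temperature = JSON.parse(decoded.payload).temperature;

// 78 is above the example threshold of 70, so the 'normal' state
// transitions to 'alarm' and publishes the MQTT message shown above.
console.log(decoded.instanceId, temperature); // prints: your-instance-id 78
```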

# Migration procedure for AWS IoT SiteWise alarms in AWS IoT Events
<a name="eos-procedure-alarms"></a>

This section describes alternative solutions that deliver similar alarm functionality as you migrate away from AWS IoT Events.

For AWS IoT SiteWise properties that use AWS IoT Events alarms, you can migrate to a solution using CloudWatch alarms. This approach provides robust monitoring capabilities with established SLAs and additional features like anomaly detection and grouped alarms.

## Comparing architectures
<a name="eos-architecture-comparison-alarms"></a>

The current AWS IoT Events alarm configuration for AWS IoT SiteWise properties requires creating `AssetModelCompositeModels` in the asset model, as described in [Define external alarms in AWS IoT SiteWise](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/define-external-alarms.html) in the *AWS IoT SiteWise User Guide*. Modifications to this configuration are typically managed through the AWS IoT Events console.

The new solution provides alarm management by using CloudWatch alarms. AWS IoT SiteWise notifications publish property data points to AWS IoT Core MQTT topics, and a Lambda function transforms those notifications into CloudWatch metrics, enabling alarm monitoring through the CloudWatch alarming framework.


| Purpose | Solution | Differences | 
| --- | --- | --- | 
|  **Data source** – Property data from AWS IoT SiteWise  |  AWS IoT SiteWise MQTT notifications  |  Replaces direct IoT Events integration with MQTT notifications from AWS IoT SiteWise properties  | 
|  **Data processing** – Transforms property data  |  Lambda function  |  Processes AWS IoT SiteWise property notifications and converts them to CloudWatch metrics  | 
|  **Alarm evaluation** – Monitors metrics and triggers alarms  |  Amazon CloudWatch alarms  |  Replaces AWS IoT Events alarms with CloudWatch alarms, offering additional features like anomaly detection  | 
|  **Integration** – Connection with AWS IoT SiteWise  |  AWS IoT SiteWise external alarms  |  Optional capability to import CloudWatch alarms back into AWS IoT SiteWise as external alarms  | 

## Step 1: Enable MQTT notifications on the asset property
<a name="eos-alarms-mqtt-asset-property"></a>

If you are using AWS IoT Events integrations for AWS IoT SiteWise alarms, turn on MQTT notifications for each property that you want to monitor.

1. Follow the [Configure alarms on assets in AWS IoT SiteWise](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/configure-alarms.html#configure-alarm-threshold-value-console) procedure until you reach the step to **Edit** the asset model's properties.

1. For each property to migrate, change the **MQTT Notification status** to **ACTIVE**.  
![\[A screenshot showing the placement of the MQTT notification status dropdown in the AWS IoT SiteWise console.\]](http://docs.aws.amazon.com/iotevents/latest/developerguide/images/events-eos-sw-asset-mqtt.png)

1. Note the MQTT topic to which notifications publish for each modified property. You use these topics in later steps.

For more information, see the following documentation resources:
+ [Understand asset properties in MQTT topics](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/mqtt-topics.html) in the *AWS IoT SiteWise User Guide*.
+ [MQTT topics](https://docs.aws.amazon.com/iot/latest/developerguide/topics.html) in the *AWS IoT Developer Guide*.
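AWS IoT SiteWise publishes property value notifications on a topic of the form `$aws/sitewise/asset-models/<assetModelId>/assets/<assetId>/properties/<propertyId>`. A small helper like the following (a sketch; the IDs shown are placeholders) can build the topic string that you noted above for use in later steps:

```python
def sitewise_property_topic(asset_model_id, asset_id, property_id):
    # Topic format used by AWS IoT SiteWise property value notifications
    return (
        f"$aws/sitewise/asset-models/{asset_model_id}"
        f"/assets/{asset_id}/properties/{property_id}"
    )

print(sitewise_property_topic("your-asset-model-id", "your-asset-id", "your-property-id"))
```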

## Step 2: Create an AWS Lambda function
<a name="eos-alarms-lambda-function"></a>

Create a Lambda function that reads the TQV (time, quality, value) array published to the MQTT topic and publishes the individual values to CloudWatch. You use this Lambda function as the destination action of an AWS IoT Core message routing rule.

1. Open the [AWS Lambda console](https://console.aws.amazon.com/lambda).

1. Choose **Create function**.

1. Enter a name for the **Function name**.

1. For **Runtime**, select a Python runtime, such as **Python 3.13**. The function code in this procedure is written in Python.

1. In the **Change default execution role** dropdown, choose **Use existing role**, and then select the IAM role that you created in earlier steps.
**Note**  
This procedure assumes that you've already migrated your detector model. If you don't have an IAM role, see [Create an IAM role](eos-procedure-detector-models.md#eos-detector-model-create-iam-role).

1. Choose **Create function**.

1. Paste in the following code snippet.

   ```
   import json
   import boto3
   from datetime import datetime
   
   # Initialize CloudWatch client
   cloudwatch = boto3.client('cloudwatch')
   
   def lambda_handler(message, context):
       try:
           # Parse the incoming IoT message and extract relevant information
           asset_id = message['payload']['assetId']
           property_id = message['payload']['propertyId']
           
           # Process each value in the values array
           for value in message['payload']['values']:
               # Extract timestamp, value, and quality
               timestamp = datetime.fromtimestamp(value['timestamp']['timeInSeconds'])
               metric_value = value['value']['doubleValue']
               quality = value.get('quality', 'UNKNOWN')
               
               # Publish to CloudWatch
               cloudwatch.put_metric_data(
                   Namespace='IoTSiteWise/AssetMetrics',
                   MetricData=[
                       {
                           'MetricName': f'Property_{property_id}',
                           'Value': metric_value,
                           'Timestamp': timestamp,
                           'Dimensions': [
                               {
                                   'Name': 'AssetId',
                                   'Value': asset_id
                               },
                               {
                                   'Name': 'Quality',
                                   'Value': quality
                               }
                           ]
                       }
                   ]
               )
               
           return {
               'statusCode': 200,
               'body': json.dumps('Successfully published metrics to CloudWatch')
           }
           
       except Exception as e:
           print(f'Error processing message: {str(e)}')
           return {
               'statusCode': 500,
               'body': json.dumps(f'Error: {str(e)}')
           }
   ```
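To exercise the function from the Lambda console's **Test** tab before wiring up the rule, you can use an event shaped like an AWS IoT SiteWise property value notification. The following is a sketch; the IDs, timestamp, and values are placeholders:

```python
# Hypothetical test event shaped like an AWS IoT SiteWise property
# value notification; IDs and values are placeholders
sample_event = {
    "payload": {
        "assetId": "your-asset-id",
        "propertyId": "your-property-id",
        "values": [
            {
                "timestamp": {"timeInSeconds": 1700000000, "offsetInNanos": 0},
                "quality": "GOOD",
                "value": {"doubleValue": 78.0}
            }
        ]
    }
}

# The same field accesses that the Lambda handler performs
for value in sample_event["payload"]["values"]:
    print(value["value"]["doubleValue"], value["quality"])  # prints: 78.0 GOOD
```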

## Step 3: Create AWS IoT Core message routing rule
<a name="eos-alarms-message-routing"></a>
+ Follow the [Tutorial: Republishing an MQTT message](https://docs.aws.amazon.com/iot/latest/developerguide/iot-repub-rule.html) procedure entering the following information when prompted:

  1. Name the message routing rule `SiteWiseToCloudwatchAlarms`.

  1. For the query, you can use the following:

     ```
     SELECT * FROM '$aws/sitewise/asset-models/your-asset-model-id/assets/your-asset-id/properties/your-property-id'
     ```

  1. In **Rule actions**, select the **Lambda** action to send the data generated from AWS IoT SiteWise to CloudWatch. For example:  
![\[A screenshot showing the rule action for the Lambda function.\]](http://docs.aws.amazon.com/iotevents/latest/developerguide/images/events-eos-lambda-rule-action.png)

## Step 4: View CloudWatch metrics
<a name="eos-alarms-metrics"></a>

As you ingest data into AWS IoT SiteWise, the property that you selected in [Step 1: Enable MQTT notifications on the asset property](#eos-alarms-mqtt-asset-property) routes data to the Lambda function that you created in [Step 2: Create an AWS Lambda function](#eos-alarms-lambda-function). In this step, verify that the Lambda function is sending your metrics to CloudWatch.

1. Open the [CloudWatch console](https://console.aws.amazon.com/cloudwatch/).

1. In the left navigation, choose **Metrics**, then **All metrics**.

1. Choose the **IoTSiteWise/AssetMetrics** namespace, and then select your metric to graph it.

1. Under the **Source** tab, the CloudWatch output looks similar to this example. This source information confirms that the metric data is feeding into CloudWatch.

   ```
   {
       "view": "timeSeries",
       "stacked": false,
       "metrics": [
           [ "IoTSiteWise/AssetMetrics", "Property_your-property-id-hash", "Quality", "GOOD", "AssetId", "your-asset-id-hash", { "id": "m1" } ]
       ],
       "region": "your-region"
   }
   ```

## Step 5: Create CloudWatch alarms
<a name="eos-create-cw-alarm"></a>

Follow the [Create a CloudWatch alarm based on a static threshold](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/AlarmThatSendsEmail.html) procedure in the *Amazon CloudWatch User Guide* to create alarms for each relevant metric.

**Note**  
There are many options for alarm configuration in Amazon CloudWatch. For more information on CloudWatch alarms, see [Using Amazon CloudWatch alarms](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/AlarmThatSendsEmail.html) in the *Amazon CloudWatch User Guide*.
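As a programmatic alternative (a sketch, not a definitive configuration), you can build the alarm with boto3. The namespace, metric name, and dimensions must exactly match what the Lambda function publishes, including the `Quality` dimension; the IDs and threshold below are placeholders:

```python
def alarm_params(asset_id, property_id, threshold):
    # Parameters for cloudwatch.put_metric_alarm; Namespace, MetricName,
    # and Dimensions must match the Lambda function's put_metric_data
    # call exactly, or the alarm never sees data
    return {
        "AlarmName": f"SiteWise-{asset_id}-{property_id}-high",
        "Namespace": "IoTSiteWise/AssetMetrics",
        "MetricName": f"Property_{property_id}",
        "Dimensions": [
            {"Name": "AssetId", "Value": asset_id},
            {"Name": "Quality", "Value": "GOOD"},
        ],
        "Statistic": "Average",
        "Period": 60,
        "EvaluationPeriods": 1,
        "Threshold": threshold,
        "ComparisonOperator": "GreaterThanThreshold",
        "TreatMissingData": "missing",
    }

# Usage (requires AWS credentials):
#   import boto3
#   boto3.client("cloudwatch").put_metric_alarm(
#       **alarm_params("your-asset-id", "your-property-id", 80.0))
```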

## Step 6: (Optional) Import the CloudWatch alarm into AWS IoT SiteWise
<a name="eos-import-cw-alarm-sw"></a>

You can configure CloudWatch alarms to send data back to AWS IoT SiteWise using CloudWatch alarm actions and Lambda. This integration enables you to view alarm states and properties in the SiteWise Monitor portal.

1. Configure the external alarm as a property in an asset model. For more information, see [Define external alarms in AWS IoT SiteWise](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/define-external-alarms.html) in the *AWS IoT SiteWise User Guide*.

1. Create a Lambda function that uses the [BatchPutAssetPropertyValue](https://docs.aws.amazon.com/iot-sitewise/latest/APIReference/API_BatchPutAssetPropertyValue.html) API in the *AWS IoT SiteWise API Reference* to send alarm data to AWS IoT SiteWise.

1. Set up CloudWatch alarm actions to invoke your Lambda function when alarm states change. For more information, see the [Alarm actions](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/AlarmThatSendsEmail.html#alarms-and-actions.html) section in the *Amazon CloudWatch User Guide*.
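The core logic of the Lambda function in step 2 of this list can be sketched as follows. The state mapping, helper name, and serialized-JSON state format are assumptions to confirm against your asset model's external alarm state property definition:

```python
import json
import time

# Hypothetical mapping from CloudWatch alarm states to AWS IoT SiteWise
# alarm state names; confirm the exact state strings against your
# asset model's alarm state property definition
STATE_MAP = {
    "ALARM": "Active",
    "OK": "Normal",
    "INSUFFICIENT_DATA": "Normal",
}

def alarm_state_entry(asset_id, property_id, cloudwatch_state):
    # Build one entry for iotsitewise.batch_put_asset_property_value;
    # the alarm state is ingested as a serialized JSON string
    state = json.dumps({"stateName": STATE_MAP[cloudwatch_state]})
    return {
        "entryId": f"{asset_id}-{int(time.time())}",
        "assetId": asset_id,
        "propertyId": property_id,
        "propertyValues": [
            {
                "value": {"stringValue": state},
                "timestamp": {"timeInSeconds": int(time.time())},
            }
        ],
    }

# Usage (requires AWS credentials):
#   import boto3
#   boto3.client("iotsitewise").batch_put_asset_property_value(
#       entries=[alarm_state_entry("your-asset-id", "your-property-id", "ALARM")])
```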