

支援終止通知：2026 年 5 月 20 日， AWS 將終止對 的支援 AWS IoT Events。2026 年 5 月 20 日之後，您將無法再存取 AWS IoT Events 主控台或 AWS IoT Events 資源。如需詳細資訊，請參閱[AWS IoT Events 終止支援](https://docs.aws.amazon.com/iotevents/latest/developerguide/iotevents-end-of-support.html)。

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 中的偵測器模型遷移程序 AWS IoT Events
<a name="eos-procedure-detector-models"></a>

本節說明替代解決方案，可在您遷移時提供類似的偵測器模型功能 AWS IoT Events。

您可以透過 AWS IoT Core 規則將資料擷取遷移至其他服務 AWS 的組合。資料可以路由到 AWS IoT Core MQTT 主題，而不是透過 [BatchPutMessage](https://docs.aws.amazon.com/iotevents/latest/apireference/API_iotevents-data_BatchPutMessage.html) API 擷取資料。

此遷移方法利用 AWS IoT Core MQTT 主題做為 IoT 資料的進入點，取代 的直接輸入 AWS IoT Events。MQTT 主題的選擇有幾個主要原因。由於 MQTT 在業界的廣泛使用，它們提供了與 IoT 裝置的廣泛相容性。這些主題可以處理來自許多裝置的大量訊息，以確保可擴展性。它們也提供根據內容或裝置類型路由和篩選訊息的彈性。此外， AWS IoT Core MQTT 主題與其他 AWS 服務無縫整合，促進遷移程序。

資料從 MQTT 主題流向結合 Amazon Kinesis Data Streams、 AWS Lambda 函數、Amazon DynamoDB 資料表和 Amazon EventBridge 排程的架構。這種服務組合會複寫並增強 先前提供的功能 AWS IoT Events，讓您更靈活地控制 IoT 資料處理管道。

## 比較架構
<a name="eos-architecture-comparison-detector-model"></a>

目前的 AWS IoT Events 架構會透過 AWS IoT Core 規則和 `BatchPutMessage` API 擷取資料。此架構使用 AWS IoT Core 進行資料擷取和事件發佈，並將訊息透過 AWS IoT Events 輸入路由至定義狀態邏輯的偵測器模型。IAM 角色會管理必要的許可。

新的解決方案 AWS IoT Core 會維護資料擷取 （現在具有專用輸入和輸出 MQTT 主題）。它引入用於資料分割的 Kinesis Data Streams 和用於狀態邏輯的評估器 Lambda 函數。裝置狀態現在存放在 DynamoDB 資料表中，增強型 IAM 角色會管理這些服務的許可。


| 用途 | 解決方案 | 差異 | 
| --- | --- | --- | 
|  **資料擷取** – 從 IoT 裝置接收資料  |  AWS IoT Core  |  現在需要兩個不同的 MQTT 主題：一個用於擷取裝置資料，另一個用於發佈輸出事件  | 
|  **訊息方向** – 將傳入的訊息路由到適當的服務  |  AWS IoT Core 訊息路由規則  |  維持相同的路由功能，但現在會將訊息導向 Kinesis Data Streams，而不是 AWS IoT Events  | 
|  **資料處理** – 處理和組織傳入的資料串流  |  Kinesis Data Streams  |  取代 AWS IoT Events 輸入功能，使用裝置 ID 分割提供資料擷取以進行訊息處理  | 
|  **邏輯評估** – 處理狀態變更並觸發動作  |  評估器 Lambda  |  取代 AWS IoT Events 偵測器模型，透過程式碼而非視覺化工作流程提供可自訂的狀態邏輯評估  | 
|  **狀態管理** – 維護裝置狀態  |  DynamoDB 表  |  提供裝置狀態持久性儲存的新元件，取代內部 AWS IoT Events 狀態管理  | 
|  **安全性** – 管理服務許可  |  IAM 角色  |  已更新的許可現在除了現有 AWS IoT Core 許可之外，還包括對 Kinesis Data Streams、DynamoDB 和 EventBridge 的存取  | 

## 步驟 1：（選用） 匯出 AWS IoT Events 偵測器模型組態
<a name="eos-detector-model-export-events-data"></a>

建立新資源之前，請先匯出 AWS IoT Events 偵測器模型定義。這些包含您的事件處理邏輯，可以做為實作新解決方案的歷史參考。

------
#### [ Console ]

使用 匯出偵測器模型組態時 AWS IoT Events AWS 管理主控台，請執行下列步驟：

**使用 匯出偵測器模型 AWS 管理主控台**

1. 登入 [AWS IoT Events 主控台](https://console.aws.amazon.com/iotevents/)。

1. 在左側導覽窗格中，選擇 **Detector models (偵測器模型)**。

1. 選取要匯出的偵測器模型。

1. 選擇 **Export** (匯出)。閱讀有關輸出的資訊訊息，然後再次選擇**匯出**。

1. 針對您要匯出的每個偵測器模型重複此程序。

包含偵測器模型 JSON 輸出的檔案會新增至瀏覽器的下載資料夾。您可以選擇性地儲存每個偵測器模型組態，以保留歷史資料。

------
#### [ AWS CLI ]

使用 AWS CLI執行下列命令來匯出偵測器模型組態：

**使用 匯出偵測器模型 AWS CLI**

1. 列出您帳戶中的所有偵測器模型：

   ```
   aws iotevents list-detector-models
   ```

1. 針對每個偵測器模型，執行下列動作來匯出其組態：

   ```
   aws iotevents describe-detector-model \
      --detector-model-name your-detector-model-name
   ```

1. 儲存每個偵測器模型的輸出。

------

## 步驟 2：建立 IAM 角色
<a name="eos-detector-model-create-iam-role"></a>

建立 IAM 角色以提供複寫 功能的許可 AWS IoT Events。此範例中的角色會授予 DynamoDB 的存取權以進行狀態管理、EventBridge 用於排程、Kinesis Data Streams 用於資料擷取、 AWS IoT Core 用於發佈訊息，以及 CloudWatch 用於記錄。這些服務共同用來取代 AWS IoT Events。

1. 建立具備下列許可的 IAM 角色。如需建立 IAM 角色的詳細說明，請參閱《*IAM 使用者指南*》中的[建立角色以將許可委派給 AWS 服務](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-service.html)。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "DynamoDBAccess",
               "Effect": "Allow",
               "Action": [
                   "dynamodb:GetItem",
                   "dynamodb:PutItem",
                   "dynamodb:UpdateItem",
                   "dynamodb:DeleteItem",
                   "dynamodb:Query",
                   "dynamodb:Scan"
               ],
               "Resource": "arn:aws:dynamodb:us-east-1:123456789012:table/EventsStateTable"
           },
           {
               "Sid": "SchedulerAccess",
               "Effect": "Allow",
               "Action": [
                   "scheduler:CreateSchedule",
                   "scheduler:DeleteSchedule"
               ],
               "Resource": "arn:aws:scheduler:us-east-1:123456789012:schedule/*"
           },
           {
               "Sid": "KinesisAccess",
               "Effect": "Allow",
               "Action": [
                   "kinesis:GetRecords",
                   "kinesis:GetShardIterator",
                   "kinesis:DescribeStream",
                   "kinesis:ListStreams"
               ],
               "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/*"
           },
           {
               "Sid": "IoTPublishAccess",
               "Effect": "Allow",
               "Action": "iot:Publish",
               "Resource": "arn:aws:iot:us-east-1:123456789012:topic/*"
           },
           {
               "Effect": "Allow",
               "Action": "logs:CreateLogGroup",
               "Resource": "arn:aws:logs:us-east-1:123456789012:*"
           },
           {
               "Effect": "Allow",
               "Action": [
                   "logs:CreateLogStream",
                   "logs:PutLogEvents"
               ],
               "Resource": [
               "arn:aws:logs:us-east-1:123456789012:log-group:/aws/lambda/your-lambda:*"
               ]
           }
       ]
   }
   ```

------

1. 新增下列 IAM 角色信任政策。信任政策允許指定的 AWS 服務擔任 IAM 角色，以便他們可以執行必要的動作。如需建立 IAM 信任政策的詳細說明，請參閱《*IAM 使用者指南*》中的[使用自訂信任政策建立角色](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-custom.html)。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Effect": "Allow",
               "Principal": {
                   "Service": [
                       "scheduler.amazonaws.com",
                       "lambda.amazonaws.com",
                       "iot.amazonaws.com"
                   ]
               },
               "Action": "sts:AssumeRole"
           }
       ]
   }
   ```

------

## 步驟 3：建立 Amazon Kinesis Data Streams
<a name="eos-detector-model-create-kinesis-stream"></a>

使用 AWS 管理主控台 或 建立 Amazon Kinesis Data Streams AWS CLI。

------
#### [ Console ]

若要使用 建立 Kinesis 資料串流 AWS 管理主控台，請遵循 *Amazon Kinesis Data Streams 開發人員指南*中的[建立資料串流](https://docs.aws.amazon.com/streams/latest/dev/tutorial-stock-data-kplkcl-create-stream.html)頁面上的程序。

根據裝置計數和訊息承載大小調整碎片計數。

------
#### [ AWS CLI ]

使用 AWS CLI建立 Amazon Kinesis Data Streams，從您的裝置擷取和分割資料。

Kinesis Data Streams 用於此遷移，以取代 的資料擷取功能 AWS IoT Events。它提供可擴展且有效率的方式來從 IoT 裝置收集、處理和分析即時串流資料，同時提供靈活的資料處理和與其他 AWS 服務的整合。

```
aws kinesis create-stream --stream-name your-kinesis-stream-name --shard-count 4 --region your-region
```

根據裝置計數和訊息承載大小調整碎片計數。

------

## 步驟 4：建立或更新 MQTT 訊息路由規則
<a name="eos-detector-model-mqtt-rule"></a>

您可以建立新的 MQTT 訊息路由規則或更新現有的規則。

------
#### [ Console ]

1. 判斷您是否需要新的 MQTT 訊息路由規則，或是是否可以更新現有的規則。

1. 開啟 [AWS IoT Core 主控台](https://console.aws.amazon.com/iot/)。

1. 在導覽窗格中，選擇**訊息路由**，然後選擇**規則**。

1. 在**管理**區段中，選擇**訊息路由**，然後選擇**規則**。

1. 選擇**建立規則**。

1. 在**指定規則屬性**頁面上，輸入 AWS IoT Core 規則名稱的**規則名稱**。針對**規則描述 - *選用*，輸入描述以識別您正在處理事件並將其轉送至 Kinesis Data Streams。**

1. 在**設定 SQL 陳述**式頁面上，輸入 SQL **陳述式**的下列項目：**SELECT \$1 FROM 'your-database'**，然後選擇**下一步**。

1. 在**連接規則動作**頁面上的**規則動作**下，選擇 **kinesis**。

1. 選擇串流的 Kinesis 串流。針對分割區索引鍵，輸入 **your-instance-id**。選取 IAM 角色的適當角色，然後選擇**新增規則動作**。

如需詳細資訊，請參閱[建立 AWS IoT 規則以將裝置資料路由至其他 服務](https://docs.aws.amazon.com/iot/latest/developerguide/iot-rules-tutorial.html)。

------
#### [ AWS CLI ]

1. 使用下列內容建立 JSON 檔案 。此 JSON 組態檔案會定義 AWS IoT Core 規則，從主題中選取所有訊息，並使用執行個體 ID 做為分割區索引鍵，將其轉送至指定的 Kinesis 串流。

   ```
   {
       "sql": "SELECT * FROM 'your-config-file'",
       "description": "Rule to process events and forward to Kinesis Data Streams",
       "actions": [
           {
               "kinesis": {
                   "streamName": "your-kinesis-stream-name",
                   "roleArn": "arn:aws:iam::your-account-id:role/service-role/your-iam-role",
                   "partitionKey": "${your-instance-id}"
               }
           }
       ],
       "ruleDisabled": false,
       "awsIotSqlVersion": "2016-03-23"
   }
   ```

1. 使用 建立 MQTT 主題規則 AWS CLI。此步驟使用 AWS CLI ，使用 `events_rule.json` 檔案中定義的組態來建立 AWS IoT Core 主題規則。

   ```
   aws iot create-topic-rule \
       --rule-name "your-iot-core-rule" \
       --topic-rule-payload file://your-file-name.json
   ```

------

## 步驟 5：取得目的地 MQTT 主題的端點
<a name="eos-detector-model-get-mqtt-endpoint"></a>

使用目的地 MQTT 主題來設定主題發佈傳出訊息的位置，取代先前處理的功能 AWS IoT Events。端點對 AWS 您的帳戶和區域是唯一的。

------
#### [ Console ]

1. 開啟 [AWS IoT Core 主控台](https://console.aws.amazon.com/iot/)。

1. 在左側導覽面板的**連線**區段中，選擇**網域組態**。

1. 選擇 **iot：Data-ATS** 網域組態，以開啟組態的詳細資訊頁面。

1. 複製**網域名稱**值。此值是端點。儲存端點值，因為您會在後續步驟中需要它。

------
#### [ AWS CLI ]

執行下列命令以取得 AWS IoT Core 端點，以發佈您帳戶的傳出訊息。

```
aws iot describe-endpoint --endpoint-type iot:Data-ATS --region your-region
```

------

## 步驟 6：建立 Amazon DynamoDB 資料表
<a name="eos-detector-model-create-dynamodb-table"></a>

 Amazon DynamoDB 資料表會取代 的狀態管理功能 AWS IoT Events，提供可擴展且彈性的方式，在新的解決方案架構中保留和管理裝置狀態和偵測器模型邏輯。

------
#### [ Console ]

建立 Amazon DynamoDB 資料表以保留偵測器模型的狀態。如需詳細資訊，請參閱《Amazon [ DynamoDB 開發人員指南》中的在 DynamoDB 中建立資料表](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-1.html)。 * DynamoDB *

使用下列表格詳細資訊：
+ 針對**資料表名稱**，輸入您選擇的資料表名稱。
+ 針對**分割區索引鍵**，輸入您自己的執行個體 ID。
+ 您可以使用**資料表****設定的預設設定** 

------
#### [ AWS CLI ]

執行下列命令來建立 DynamoDB 資料表。

```
aws dynamodb create-table \
                        --table-name your-table-name \
                        --attribute-definitions AttributeName=your-instance-id,AttributeType=S \
                        --key-schema AttributeName=your-instance-id,KeyType=HASH \
```

------

## 步驟 7：建立 AWS Lambda 函數 （主控台）
<a name="eos-detector-model-create-lambda-function"></a>

 Lambda 函數做為核心處理引擎，取代 的偵測器模型評估邏輯 AWS IoT Events。在此範例中，我們與其他 AWS 服務整合，以根據您定義的規則處理傳入資料、管理狀態和觸發動作。

建立具有NodeJS執行時間的 Lambda 函數。使用下列程式碼片段，取代硬式編碼常數：

1. 開啟 [AWS Lambda console](https://console.aws.amazon.com/lambda/)。

1. 選擇**建立函數**。

1. 輸入**函數名稱的名稱**。

1. 選取 **NodeJS 22.x **作為**執行時間**。

1. 在**變更預設執行角色**下拉式清單中，選擇**使用現有角色**，然後選取您在先前步驟中建立的 IAM 角色。

1. 選擇**建立函數**。

1. 在取代硬式編碼常數後，貼上下列程式碼片段。

1. 函數建立之後，請在**程式碼**索引標籤下貼上下列程式碼範例，以您自己的程式碼取代**your-destination-endpoint**端點。

```
import { DynamoDBClient, GetItemCommand } from '@aws-sdk/client-dynamodb';
import { PutItemCommand } from '@aws-sdk/client-dynamodb';
import { IoTDataPlaneClient, PublishCommand } from "@aws-sdk/client-iot-data-plane";
import { SchedulerClient, CreateScheduleCommand, DeleteScheduleCommand } from "@aws-sdk/client-scheduler"; // ES Modules import


//// External Clients and Constants
const scheduler = new SchedulerClient({});
const iot = new IoTDataPlaneClient({
    endpoint: 'https://your-destination-endpoint-ats.iot.your-region.amazonaws.com/'
});
const ddb = new DynamoDBClient({});


//// Lambda Handler function
export const handler = async (event) => {
    console.log('Incoming event:', JSON.stringify(event, null, 2));

    if (!event.Records) {
        throw new Error('No records found in event');
    }

    const processedRecords = [];

    for (const record of event.Records) {
        try {
            if (record.eventSource !== 'aws:kinesis') {
                console.log(`Skipping non-Kinesis record from ${record.eventSource}`);
                continue;
            }

            // Assumes that we are processing records from Kinesis
            const payload = record.kinesis.data;
            const decodedData = Buffer.from(payload, 'base64').toString();
            console.log("decoded payload is ", decodedData);

            const output = await handleDecodedData(decodedData);

            // Add additional processing logic here
            const processedData = {
                output,
                sequenceNumber: record.kinesis.sequenceNumber,
                partitionKey: record.kinesis.partitionKey,
                timestamp: record.kinesis.approximateArrivalTimestamp
            };

            processedRecords.push(processedData);

        } catch (error) {
            console.error('Error processing record:', error);
            console.error('Failed record:', record);
            // Decide whether to throw error or continue processing other records
            // throw error; // Uncomment to stop processing on first error
        }
    }

    return {
        statusCode: 200,
        body: JSON.stringify({
            message: 'Processing complete',
            processedCount: processedRecords.length,
            records: processedRecords
        })
    };
};

// Helper function to handle decoded data
async function handleDecodedData(payload) {
    try {
        // Parse the decoded data
        const parsedData = JSON.parse(payload);

        // Extract instanceId
        const instanceId = parsedData.instanceId;
        // Parse the input field
        const inputData = JSON.parse(parsedData.payload);
        const temperature = inputData.temperature;
        console.log('For InstanceId: ', instanceId, ' the temperature is:', temperature);

        await iotEvents.process(instanceId, inputData)

        return {
            instanceId,
            temperature,
            // Add any other fields you want to return
            rawInput: inputData
        };
    } catch (error) {
        console.error('Error handling decoded data:', error);
        throw error;
    }
}


//// Classes for declaring/defining the state machine
class CurrentState {
    constructor(instanceId, stateName, variables, inputs) {
        this.stateName = stateName;
        this.variables = variables;
        this.inputs = inputs;
        this.instanceId = instanceId
    }

    static async load(instanceId) {
        console.log(`Loading state for id ${instanceId}`);
        try {
            const { Item: { state: { S: stateContent } } } = await ddb.send(new GetItemCommand({
                TableName: 'EventsStateTable',
                Key: {
                    'InstanceId': { S: `${instanceId}` }
                }
            }));

            const { stateName, variables, inputs } = JSON.parse(stateContent);

            return new CurrentState(instanceId, stateName, variables, inputs);
        } catch (e) {
            console.log(`No state for id ${instanceId}: ${e}`);
            return undefined;
        }
    }

    static async save(instanceId, state) {
        console.log(`Saving state for id ${instanceId}`);
        await ddb.send(new PutItemCommand({
            TableName: 'your-events-state-table-name',
            Item: {
                'InstanceId': { S: `${instanceId}` },
                'state': { S: state }
            }
        }));
    }

    setVariable(name, value) {
        this.variables[name] = value;
    }

    changeState(stateName) {
        console.log(`Changing state from ${this.stateName} to ${stateName}`);
        this.stateName = stateName;
    }

    async setTimer(instanceId, frequencyInMinutes, payload) {
        console.log(`Setting timer ${instanceId} for frequency of ${frequencyInMinutes} minutes`);

        const base64Payload = Buffer.from(JSON.stringify(payload)).toString();
        console.log(base64Payload);

        const scheduleName = `your-schedule-name-${instanceId}-schedule`;
        const scheduleParams = {
            Name: scheduleName,
            FlexibleTimeWindow: {
                Mode: 'OFF'
            },
            ScheduleExpression: `rate(${frequencyInMinutes} minutes)`,
            Target: {
                Arn: "arn:aws::kinesis:your-region:your-account-id:stream/your-kinesis-stream-name",
                RoleArn: "arn:aws::iam::your-account-id:role/service-role/your-iam-role",
                Input: base64Payload,
                KinesisParameters: {
                    PartitionKey: instanceId,
                },
                RetryPolicy: {
                    MaximumRetryAttempts: 3
                }
            },

        };

        const command = new CreateScheduleCommand(scheduleParams);
        console.log(`Sending command to set timer ${JSON.stringify(command)}`);
        await scheduler.send(command);
    }

    async clearTimer(instanceId) {
        console.log(`Cleaning timer ${instanceId}`);

        const scheduleName = `your-schedule-name-${instanceId}-schedule`;
        const command = new DeleteScheduleCommand({
            Name: scheduleName
        });
        await scheduler.send(command);
    }

    async executeAction(actionType, actionPayload) {
        console.log(`Will execute the ${actionType} with payload ${actionPayload}`);
        await iot.send(new PublishCommand({
            topic: `${this.instanceId}`,
            payload: actionPayload,
            qos: 0
        }));
    }

    setInput(value) {
        this.inputs = { ...this.inputs, ...value };
    }

    input(name) {
        return this.inputs[name];
    }
}


class IoTEvents {

    constructor(initialState) {
        this.initialState = initialState;
        this.states = {};
    }

    state(name) {
        const state = new IoTEventsState();
        this.states[name] = state;
        return state;
    }

    async process(instanceId, input) {
        let currentState = await CurrentState.load(instanceId) || new CurrentState(instanceId, this.initialState, {}, {});
        currentState.setInput(input);

        console.log(`With inputs as: ${JSON.stringify(currentState)}`);
        const state = this.states[currentState.stateName];

        currentState = await state.evaluate(currentState);
        console.log(`With output as: ${JSON.stringify(currentState)}`);

        await CurrentState.save(instanceId, JSON.stringify(currentState));
    }
}

class Event {
    constructor(condition, action) {
        this.condition = condition;
        this.action = action;
    }
}

class IoTEventsState {
    constructor() {
        this.eventsList = []
    }

    events(eventListArg) {
        this.eventsList.push(...eventListArg);
        return this;
    }

    async evaluate(currentState) {
        for (const e of this.eventsList) {
            console.log(`Evaluating event ${e.condition}`);
            if (e.condition(currentState)) {
                console.log(`Event condition met`);
                // Execute any action as defined in iotEvents DM Definition
                await e.action(currentState);
            }
        }

        return currentState;
    }
}

////// DetectorModel Definitions - replace with your own defintions
let processAlarmStateEvent = new Event(
    (currentState) => {
        const source = currentState.input('source');
        return (
            currentState.input('temperature') < 70
        );
    },
    async (currentState) => {
        currentState.changeState('normal');
        await currentState.clearTimer(currentState.instanceId)
        await currentState.executeAction('MQTT', `{"state": "alarm cleared, timer deleted" }`);
    }
);

let processTimerEvent = new Event(
    (currentState) => {
        const source = currentState.input('source');
        console.log(`Evaluating timer event with source ${source}`);
        const booleanOutput = (source !== undefined && source !== null &&
            typeof source === 'string' &&
            source.toLowerCase() === 'timer' &&
            // check if the currentState == state from the timer payload
            currentState.input('currentState') !== undefined &&
            currentState.input('currentState') !== null &&
            currentState.input('currentState').toLowerCase !== 'normal');
        console.log(`Timer event evaluated as ${booleanOutput}`);
        return booleanOutput;
    },
    async (currentState) => {
        await currentState.executeAction('MQTT', `{"state": "timer timed out in Alarming state" }`);
    }
);

let processNormalEvent = new Event(
    (currentState) => currentState.input('temperature') > 70,
    async (currentState) => {
        currentState.changeState('alarm');
        await currentState.executeAction('MQTT', `{"state": "alarm detected, timer started" }`);
        await currentState.setTimer(currentState.instanceId, 5, {
            "instanceId": currentState.instanceId,
            "payload":"{\"currentState\": \"alarm\", \"source\": \"timer\"}"
        });
    }
);
const iotEvents = new IoTEvents('normal');
iotEvents
    .state('normal')
    .events(
        [
            processNormalEvent
        ]);
iotEvents
    .state('alarm')
    .events([
            processAlarmStateEvent,
            processTimerEvent
        ]
    );
```

## 步驟 8：新增 Amazon Kinesis Data Streams 觸發條件
<a name="eos-detector-model-add-kinesis-trigger"></a>

使用 AWS 管理主控台 或 將 Kinesis Data Streams 觸發新增至 Lambda 函數 AWS CLI。

將 Kinesis Data Streams 觸發條件新增至 Lambda 函數會建立資料擷取管道與處理邏輯之間的連線，讓它自動評估傳入的 IoT 資料串流，並即時對事件做出反應，類似於 AWS IoT Events 處理輸入的方式。

------
#### [ Console ]

如需詳細資訊，請參閱《 *AWS Lambda 開發人員指南*》中的[建立事件來源映射以叫用 Lambda 函數](https://docs.aws.amazon.com/lambda/latest/dg/services-kinesis-create.html#services-kinesis-eventsourcemapping)。

針對事件來源映射詳細資訊，請使用下列項目：
+ 針對**函數名稱**，輸入 中使用的 lambda 名稱[步驟 7：建立 AWS Lambda 函數 （主控台）](#eos-detector-model-create-lambda-function)。
+ 針對**消費者 - 選用**，輸入 Kinesis 串流的 ARN。
+ **批次大小**請輸入 **10**。

------
#### [ AWS CLI ]

執行下列命令來建立 Lambda 函數觸發。

```
aws lambda create-event-source-mapping \
    --function-name your-lambda-name \
    --event-source arn:aws:kinesis:your-region:your-account-id:stream/your-kinesis-stream-name \
    --batch-size 10 \
    --starting-position LATEST \
    --region your-region
```

------

## 步驟 9：測試資料擷取和輸出功能 (AWS CLI)
<a name="eos-detector-model-data-ingestion-and-output"></a>

根據您在偵測器模型中定義的內容，將承載發佈至 MQTT 主題。以下是 MQTT 主題的範例承載，`your-topic-name`用於測試實作。

```
{
  "instanceId": "your-instance-id",
  "payload": "{\"temperature\":78}"
}
```

您應該會看到發佈至主題的 MQTT 訊息，其中包含下列 （或類似內容）：

```
{
    "state": "alarm detected, timer started"
}
```