

Pemberitahuan akhir dukungan: Pada 20 Mei 2026, AWS akan mengakhiri dukungan untuk AWS IoT Events. Setelah 20 Mei 2026, Anda tidak akan lagi dapat mengakses AWS IoT Events konsol atau AWS IoT Events sumber daya. Untuk informasi selengkapnya, lihat [AWS IoT Events akhir dukungan](https://docs.aws.amazon.com/iotevents/latest/developerguide/iotevents-end-of-support.html).

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Prosedur migrasi untuk model detektor di AWS IoT Events
<a name="eos-procedure-detector-models"></a>

Bagian ini menjelaskan solusi alternatif yang menghadirkan fungsionalitas model detektor serupa saat Anda AWS IoT Events bermigrasi.

Anda dapat memigrasikan konsumsi data melalui AWS IoT Core aturan ke kombinasi layanan lain. AWS Alih-alih menelan data melalui [BatchPutMessage](https://docs.aws.amazon.com/iotevents/latest/apireference/API_iotevents-data_BatchPutMessage.html)API, data dapat dirutekan ke topik MQTT. AWS IoT Core 

Pendekatan migrasi ini memanfaatkan topik AWS IoT Core MQTT sebagai titik masuk untuk data IoT Anda, menggantikan input langsung ke. AWS IoT Events Topik MQTT dipilih karena beberapa alasan utama. Mereka menawarkan kompatibilitas yang luas dengan perangkat IoT karena penggunaan MQTT yang meluas di industri. Topik-topik ini dapat menangani volume pesan yang tinggi dari berbagai perangkat, memastikan skalabilitas. Mereka juga memberikan fleksibilitas dalam perutean dan pemfilteran pesan berdasarkan konten atau jenis perangkat. Selain itu, topik AWS IoT Core MQTT terintegrasi secara mulus dengan AWS layanan lain, memfasilitasi proses migrasi.

Data mengalir dari topik MQTT ke dalam arsitektur yang menggabungkan Amazon Kinesis Data Streams, AWS Lambda fungsi, tabel Amazon DynamoDB, dan jadwal Amazon. EventBridge Kombinasi layanan ini mereplikasi dan meningkatkan fungsionalitas yang sebelumnya disediakan oleh AWS IoT Events, menawarkan Anda lebih banyak fleksibilitas dan kontrol atas jalur pemrosesan data IoT Anda.

## Membandingkan arsitektur
<a name="eos-architecture-comparison-detector-model"></a>

 AWS IoT Events Arsitektur saat ini menyerap data melalui AWS IoT Core aturan dan `BatchPutMessage` API. Arsitektur ini digunakan AWS IoT Core untuk konsumsi data dan penerbitan acara, dengan pesan yang dirutekan melalui AWS IoT Events input ke model detektor yang menentukan logika status. Peran IAM mengelola izin yang diperlukan.

Solusi baru dipertahankan AWS IoT Core untuk konsumsi data (sekarang dengan topik MQTT input dan output khusus). Ini memperkenalkan Kinesis Data Streams untuk partisi data dan fungsi Lambda evaluator untuk logika keadaan. Status perangkat sekarang disimpan dalam tabel DynamoDB, dan peran IAM yang disempurnakan mengelola izin di seluruh layanan ini.


| Tujuan | Solusi | Perbedaan | 
| --- | --- | --- | 
|  **Penyerapan data** - Menerima data dari perangkat IoT  |  AWS IoT Core  |  Sekarang membutuhkan dua topik MQTT yang berbeda: satu untuk menelan data perangkat dan satu lagi untuk menerbitkan acara keluaran  | 
|  **Arah pesan** - Merutekan pesan masuk ke layanan yang sesuai  |  AWS IoT Core aturan perutean pesan  |  Mempertahankan fungsionalitas perutean yang sama tetapi sekarang mengarahkan pesan ke Kinesis Data Streams, bukan AWS IoT Events  | 
|  **Pemrosesan data** - Menangani dan mengatur aliran data yang masuk  |  Kinesis Data Streams  |  Mengganti fungsionalitas AWS IoT Events input, menyediakan konsumsi data dengan partisi ID perangkat untuk pemrosesan pesan  | 
|  **Evaluasi logika** — Memproses perubahan status dan memicu tindakan  |  Penilai Lambda  |  Mengganti model AWS IoT Events detektor, menyediakan evaluasi logika status yang dapat disesuaikan melalui kode alih-alih alur kerja visual  | 
|  **Manajemen negara** - Mempertahankan status perangkat  |  Tabel DynamoDB  |  Komponen baru yang menyediakan penyimpanan status perangkat yang persisten, menggantikan manajemen AWS IoT Events status internal  | 
|  **Keamanan** - Mengelola izin layanan  |  IAM Role  |  Izin yang diperbarui sekarang mencakup akses ke Kinesis Data Streams, DynamoDB, dan selain izin yang ada EventBridge AWS IoT Core   | 

## Langkah 1: (Opsional) konfigurasi model AWS IoT Events detektor ekspor
<a name="eos-detector-model-export-events-data"></a>

Sebelum membuat sumber daya baru, ekspor definisi model AWS IoT Events detektor Anda. Ini berisi logika pemrosesan acara Anda dan dapat berfungsi sebagai referensi historis untuk menerapkan solusi baru Anda.

------
#### [ Console ]

Dengan menggunakan AWS IoT Events Konsol Manajemen AWS, lakukan langkah-langkah berikut untuk mengekspor konfigurasi model detektor Anda:

**Untuk mengekspor model detektor menggunakan Konsol Manajemen AWS**

1. Masuk ke [Konsol AWS IoT Events](https://console.aws.amazon.com/iotevents/).

1. Di panel navigasi kiri, pilih **Model detektor**.

1. Pilih model detektor untuk diekspor.

1. Pilih **Ekspor**. Baca pesan informasi mengenai output dan kemudian pilih **Ekspor** lagi.

1. Ulangi proses untuk setiap model detektor yang ingin Anda ekspor.

File yang berisi output JSON dari model detektor Anda ditambahkan ke folder unduhan browser Anda. Anda dapat menyimpan setiap konfigurasi model detektor secara opsional untuk menyimpan data historis.

------
#### [ AWS CLI ]

Menggunakan AWS CLI, jalankan perintah berikut untuk mengekspor konfigurasi model detektor Anda:

**Untuk mengekspor model detektor menggunakan AWS CLI**

1. Daftar semua model detektor di akun Anda:

   ```
   aws iotevents list-detector-models
   ```

1. Untuk setiap model detektor, ekspor konfigurasinya dengan menjalankan:

   ```
   aws iotevents describe-detector-model \
      --detector-model-name your-detector-model-name
   ```

1. Simpan output untuk setiap model detektor.

------

## Langkah 2: Buat peran IAM
<a name="eos-detector-model-create-iam-role"></a>

Buat peran IAM untuk memberikan izin untuk mereplikasi fungsionalitas. AWS IoT Events Peran dalam contoh ini memberikan akses ke DynamoDB untuk manajemen status, untuk penjadwalan EventBridge, Kinesis Data Streams untuk konsumsi data, untuk memublikasikan pesan, dan untuk pencatatan. AWS IoT Core CloudWatch Bersama-sama, layanan ini berfungsi sebagai pengganti AWS IoT Events.

1. Buat peran IAM dengan izin sebagai berikut. Untuk petunjuk selengkapnya tentang cara membuat peran IAM, lihat [Membuat peran untuk mendelegasikan izin ke AWS layanan di Panduan](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-service.html) Pengguna *IAM*.

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "DynamoDBAccess",
               "Effect": "Allow",
               "Action": [
                   "dynamodb:GetItem",
                   "dynamodb:PutItem",
                   "dynamodb:UpdateItem",
                   "dynamodb:DeleteItem",
                   "dynamodb:Query",
                   "dynamodb:Scan"
               ],
               "Resource": "arn:aws:dynamodb:us-east-1:123456789012:table/EventsStateTable"
           },
           {
               "Sid": "SchedulerAccess",
               "Effect": "Allow",
               "Action": [
                   "scheduler:CreateSchedule",
                   "scheduler:DeleteSchedule"
               ],
               "Resource": "arn:aws:scheduler:us-east-1:123456789012:schedule/*"
           },
           {
               "Sid": "KinesisAccess",
               "Effect": "Allow",
               "Action": [
                   "kinesis:GetRecords",
                   "kinesis:GetShardIterator",
                   "kinesis:DescribeStream",
                   "kinesis:ListStreams"
               ],
               "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/*"
           },
           {
               "Sid": "IoTPublishAccess",
               "Effect": "Allow",
               "Action": "iot:Publish",
               "Resource": "arn:aws:iot:us-east-1:123456789012:topic/*"
           },
           {
               "Effect": "Allow",
               "Action": "logs:CreateLogGroup",
               "Resource": "arn:aws:logs:us-east-1:123456789012:*"
           },
           {
               "Effect": "Allow",
               "Action": [
                   "logs:CreateLogStream",
                   "logs:PutLogEvents"
               ],
               "Resource": [
               "arn:aws:logs:us-east-1:123456789012:log-group:/aws/lambda/your-lambda:*"
               ]
           }
       ]
   }
   ```

------

1. Tambahkan kebijakan kepercayaan peran IAM berikut. Kebijakan kepercayaan memungkinkan AWS layanan yang ditentukan untuk mengambil peran IAM sehingga mereka dapat melakukan tindakan yang diperlukan. Untuk petunjuk selengkapnya tentang cara membuat kebijakan kepercayaan IAM, lihat [Membuat peran menggunakan kebijakan kepercayaan khusus](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-custom.html) di *Panduan Pengguna IAM*.

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Effect": "Allow",
               "Principal": {
                   "Service": [
                       "scheduler.amazonaws.com",
                       "lambda.amazonaws.com",
                       "iot.amazonaws.com"
                   ]
               },
               "Action": "sts:AssumeRole"
           }
       ]
   }
   ```

------

## Langkah 3: Buat Amazon Kinesis Data Streams
<a name="eos-detector-model-create-kinesis-stream"></a>

Buat Amazon Kinesis Data Streams Konsol Manajemen AWS menggunakan atau. AWS CLI

------
#### [ Console ]

Untuk membuat aliran data Kinesis menggunakan Konsol Manajemen AWS, ikuti prosedur yang ditemukan di halaman [Buat aliran data](https://docs.aws.amazon.com/streams/latest/dev/tutorial-stock-data-kplkcl-create-stream.html) di Panduan Pengembang *Amazon Kinesis* Data Streams.

Sesuaikan jumlah pecahan berdasarkan jumlah perangkat dan ukuran payload pesan.

------
#### [ AWS CLI ]

Menggunakan AWS CLI, buat Amazon Kinesis Data Streams untuk menyerap dan mempartisi data dari perangkat Anda.

Kinesis Data Streams digunakan dalam migrasi ini untuk menggantikan fungsionalitas konsumsi data. AWS IoT Events Ini menyediakan cara yang terukur dan efisien untuk mengumpulkan, memproses, dan menganalisis data streaming waktu nyata dari perangkat IoT Anda, sambil menyediakan penanganan dan integrasi data yang fleksibel dengan AWS layanan lain.

```
aws kinesis create-stream --stream-name your-kinesis-stream-name --shard-count 4 --region your-region
```

Sesuaikan jumlah pecahan berdasarkan jumlah perangkat dan ukuran payload pesan.

------

## Langkah 4: Buat atau perbarui aturan perutean pesan MQTT
<a name="eos-detector-model-mqtt-rule"></a>

Anda dapat membuat aturan perutean pesan MQTT baru atau memperbarui aturan yang ada.

------
#### [ Console ]

1. Tentukan apakah Anda memerlukan aturan perutean pesan MQTT baru atau apakah Anda dapat memperbarui aturan yang ada.

1. Buka [konsol AWS IoT Core](https://console.aws.amazon.com/iot/).

1. **Di panel navigasi, pilih **Perutean Pesan**, lalu pilih Aturan.**

1. Di bagian **Kelola**, pilih **Perutean pesan**, lalu **Aturan**.

1. Pilih **Buat aturan**.

1. Pada halaman **Tentukan properti aturan**, masukkan nama AWS IoT Core aturan untuk **nama Aturan**. Untuk **Deskripsi Aturan - *opsional*, masukkan deskripsi untuk mengidentifikasi bahwa Anda sedang memproses peristiwa dan meneruskannya ke Kinesis Data Streams**.

1. **Pada halaman **pernyataan Configure SQL**, masukkan yang berikut untuk **pernyataan SQL**:**SELECT \$1 FROM 'your-database'**, lalu pilih Berikutnya.**

1. Pada halaman **Lampirkan tindakan aturan**, dan di bawah **Tindakan aturan**, pilih **kinesis**.

1. Pilih aliran Kinesis Anda untuk aliran. Untuk kunci partisi, masukkan **your-instance-id**. Pilih peran yang sesuai untuk peran IAM, lalu pilih **Tambahkan tindakan aturan**.

Untuk informasi selengkapnya, lihat [Membuat aturan AWS IoT untuk merutekan data perangkat ke layanan lain](https://docs.aws.amazon.com/iot/latest/developerguide/iot-rules-tutorial.html).

------
#### [ AWS CLI ]

1. Buat file JSON dengan konten berikut. File konfigurasi JSON ini mendefinisikan AWS IoT Core aturan yang memilih semua pesan dari topik dan meneruskannya ke aliran Kinesis tertentu, menggunakan ID instance sebagai kunci partisi.

   ```
   {
       "sql": "SELECT * FROM 'your-config-file'",
       "description": "Rule to process events and forward to Kinesis Data Streams",
       "actions": [
           {
               "kinesis": {
                   "streamName": "your-kinesis-stream-name",
                   "roleArn": "arn:aws:iam::your-account-id:role/service-role/your-iam-role",
                   "partitionKey": "${your-instance-id}"
               }
           }
       ],
       "ruleDisabled": false,
       "awsIotSqlVersion": "2016-03-23"
   }
   ```

1. Buat aturan topik MQTT menggunakan. AWS CLI Langkah ini menggunakan aturan AWS CLI untuk membuat AWS IoT Core topik menggunakan konfigurasi yang ditentukan dalam `events_rule.json` file.

   ```
   aws iot create-topic-rule \
       --rule-name "your-iot-core-rule" \
       --topic-rule-payload file://your-file-name.json
   ```

------

## Langkah 5: Dapatkan titik akhir untuk topik MQTT tujuan
<a name="eos-detector-model-get-mqtt-endpoint"></a>

Gunakan topik MQTT tujuan untuk mengonfigurasi tempat topik Anda mempublikasikan pesan keluar, menggantikan fungsionalitas yang sebelumnya ditangani oleh. AWS IoT Events Titik akhir unik untuk AWS akun dan wilayah Anda.

------
#### [ Console ]

1. Buka [konsol AWS IoT Core](https://console.aws.amazon.com/iot/).

1. Di bagian **Connect** di panel navigasi kiri, pilih **Konfigurasi domain**.

1. Pilih konfigurasi domain **IoT: Data-ATS** untuk membuka halaman detail konfigurasi.

1. Salin nilai **nama Domain**. Nilai ini adalah titik akhir. Simpan nilai endpoint karena Anda akan membutuhkannya di langkah selanjutnya.

------
#### [ AWS CLI ]

Jalankan perintah berikut untuk mendapatkan AWS IoT Core titik akhir untuk menerbitkan pesan keluar untuk akun Anda.

```
aws iot describe-endpoint --endpoint-type iot:Data-ATS --region your-region
```

------

## Langkah 6: Buat tabel Amazon DynamoDB
<a name="eos-detector-model-create-dynamodb-table"></a>

 Tabel Amazon DynamoDB menggantikan fungsionalitas AWS IoT Events manajemen status, menyediakan cara yang skalabel dan fleksibel untuk mempertahankan dan mengelola status perangkat Anda dan logika model detektor dalam arsitektur solusi baru Anda.

------
#### [ Console ]

Buat tabel Amazon DynamoDB untuk mempertahankan status model detektor. Untuk informasi selengkapnya, lihat [Membuat tabel di DynamoDB di](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-1.html) Panduan Pengembang Amazon *DynamoDB*.

Gunakan yang berikut ini untuk detail tabel:
+ Untuk **nama Tabel**, masukkan nama tabel yang Anda pilih.
+ Untuk **kunci Partition**, masukkan ID instans Anda sendiri.
+ Anda dapat menggunakan **pengaturan Default** untuk **pengaturan Tabel**

------
#### [ AWS CLI ]

Jalankan perintah berikut untuk membuat tabel DynamoDB.

```
aws dynamodb create-table \
                        --table-name your-table-name \
                        --attribute-definitions AttributeName=your-instance-id,AttributeType=S \
                        --key-schema AttributeName=your-instance-id,KeyType=HASH \
```

------

## Langkah 7: Buat AWS Lambda fungsi (konsol)
<a name="eos-detector-model-create-lambda-function"></a>

 Fungsi Lambda berfungsi sebagai mesin pemrosesan inti, menggantikan logika evaluasi model detektor. AWS IoT Events Dalam contoh, kami mengintegrasikan dengan AWS layanan lain untuk menangani data yang masuk, mengelola status, dan memicu tindakan berdasarkan aturan yang Anda tetapkan.

Buat fungsi Lambda dengan NodeJS runtime. Gunakan cuplikan kode berikut, ganti konstanta hard-code:

1. Buka [AWS Lambda console](https://console.aws.amazon.com/lambda/).

1. Pilih **Buat fungsi**.

1. Masukkan nama untuk **nama Fungsi**.

1. **Pilih **NodeJS** 22.x sebagai Runtime.**

1. Di dropdown **Ubah peran eksekusi default**, pilih **Gunakan peran yang ada**, lalu pilih peran IAM yang Anda buat di langkah sebelumnya.

1. Pilih **Buat fungsi**.

1. Tempel cuplikan kode berikut setelah mengganti konstanta kode keras.

1. Setelah fungsi Anda dibuat, di bawah tab **Kode**, tempel contoh kode berikut, ganti **your-destination-endpoint** titik akhir dengan milik Anda sendiri.

```
import { DynamoDBClient, GetItemCommand } from '@aws-sdk/client-dynamodb';
import { PutItemCommand } from '@aws-sdk/client-dynamodb';
import { IoTDataPlaneClient, PublishCommand } from "@aws-sdk/client-iot-data-plane";
import { SchedulerClient, CreateScheduleCommand, DeleteScheduleCommand } from "@aws-sdk/client-scheduler"; // ES Modules import


//// External Clients and Constants
const scheduler = new SchedulerClient({});
const iot = new IoTDataPlaneClient({
    endpoint: 'https://your-destination-endpoint-ats.iot.your-region.amazonaws.com/'
});
const ddb = new DynamoDBClient({});


//// Lambda Handler function
export const handler = async (event) => {
    console.log('Incoming event:', JSON.stringify(event, null, 2));

    if (!event.Records) {
        throw new Error('No records found in event');
    }

    const processedRecords = [];

    for (const record of event.Records) {
        try {
            if (record.eventSource !== 'aws:kinesis') {
                console.log(`Skipping non-Kinesis record from ${record.eventSource}`);
                continue;
            }

            // Assumes that we are processing records from Kinesis
            const payload = record.kinesis.data;
            const decodedData = Buffer.from(payload, 'base64').toString();
            console.log("decoded payload is ", decodedData);

            const output = await handleDecodedData(decodedData);

            // Add additional processing logic here
            const processedData = {
                output,
                sequenceNumber: record.kinesis.sequenceNumber,
                partitionKey: record.kinesis.partitionKey,
                timestamp: record.kinesis.approximateArrivalTimestamp
            };

            processedRecords.push(processedData);

        } catch (error) {
            console.error('Error processing record:', error);
            console.error('Failed record:', record);
            // Decide whether to throw error or continue processing other records
            // throw error; // Uncomment to stop processing on first error
        }
    }

    return {
        statusCode: 200,
        body: JSON.stringify({
            message: 'Processing complete',
            processedCount: processedRecords.length,
            records: processedRecords
        })
    };
};

// Helper function to handle decoded data
async function handleDecodedData(payload) {
    try {
        // Parse the decoded data
        const parsedData = JSON.parse(payload);

        // Extract instanceId
        const instanceId = parsedData.instanceId;
        // Parse the input field
        const inputData = JSON.parse(parsedData.payload);
        const temperature = inputData.temperature;
        console.log('For InstanceId: ', instanceId, ' the temperature is:', temperature);

        await iotEvents.process(instanceId, inputData)

        return {
            instanceId,
            temperature,
            // Add any other fields you want to return
            rawInput: inputData
        };
    } catch (error) {
        console.error('Error handling decoded data:', error);
        throw error;
    }
}


//// Classes for declaring/defining the state machine
class CurrentState {
    constructor(instanceId, stateName, variables, inputs) {
        this.stateName = stateName;
        this.variables = variables;
        this.inputs = inputs;
        this.instanceId = instanceId
    }

    static async load(instanceId) {
        console.log(`Loading state for id ${instanceId}`);
        try {
            const { Item: { state: { S: stateContent } } } = await ddb.send(new GetItemCommand({
                TableName: 'EventsStateTable',
                Key: {
                    'InstanceId': { S: `${instanceId}` }
                }
            }));

            const { stateName, variables, inputs } = JSON.parse(stateContent);

            return new CurrentState(instanceId, stateName, variables, inputs);
        } catch (e) {
            console.log(`No state for id ${instanceId}: ${e}`);
            return undefined;
        }
    }

    static async save(instanceId, state) {
        console.log(`Saving state for id ${instanceId}`);
        await ddb.send(new PutItemCommand({
            TableName: 'your-events-state-table-name',
            Item: {
                'InstanceId': { S: `${instanceId}` },
                'state': { S: state }
            }
        }));
    }

    setVariable(name, value) {
        this.variables[name] = value;
    }

    changeState(stateName) {
        console.log(`Changing state from ${this.stateName} to ${stateName}`);
        this.stateName = stateName;
    }

    async setTimer(instanceId, frequencyInMinutes, payload) {
        console.log(`Setting timer ${instanceId} for frequency of ${frequencyInMinutes} minutes`);

        const base64Payload = Buffer.from(JSON.stringify(payload)).toString();
        console.log(base64Payload);

        const scheduleName = `your-schedule-name-${instanceId}-schedule`;
        const scheduleParams = {
            Name: scheduleName,
            FlexibleTimeWindow: {
                Mode: 'OFF'
            },
            ScheduleExpression: `rate(${frequencyInMinutes} minutes)`,
            Target: {
                Arn: "arn:aws::kinesis:your-region:your-account-id:stream/your-kinesis-stream-name",
                RoleArn: "arn:aws::iam::your-account-id:role/service-role/your-iam-role",
                Input: base64Payload,
                KinesisParameters: {
                    PartitionKey: instanceId,
                },
                RetryPolicy: {
                    MaximumRetryAttempts: 3
                }
            },

        };

        const command = new CreateScheduleCommand(scheduleParams);
        console.log(`Sending command to set timer ${JSON.stringify(command)}`);
        await scheduler.send(command);
    }

    async clearTimer(instanceId) {
        console.log(`Cleaning timer ${instanceId}`);

        const scheduleName = `your-schedule-name-${instanceId}-schedule`;
        const command = new DeleteScheduleCommand({
            Name: scheduleName
        });
        await scheduler.send(command);
    }

    async executeAction(actionType, actionPayload) {
        console.log(`Will execute the ${actionType} with payload ${actionPayload}`);
        await iot.send(new PublishCommand({
            topic: `${this.instanceId}`,
            payload: actionPayload,
            qos: 0
        }));
    }

    setInput(value) {
        this.inputs = { ...this.inputs, ...value };
    }

    input(name) {
        return this.inputs[name];
    }
}


class IoTEvents {

    constructor(initialState) {
        this.initialState = initialState;
        this.states = {};
    }

    state(name) {
        const state = new IoTEventsState();
        this.states[name] = state;
        return state;
    }

    async process(instanceId, input) {
        let currentState = await CurrentState.load(instanceId) || new CurrentState(instanceId, this.initialState, {}, {});
        currentState.setInput(input);

        console.log(`With inputs as: ${JSON.stringify(currentState)}`);
        const state = this.states[currentState.stateName];

        currentState = await state.evaluate(currentState);
        console.log(`With output as: ${JSON.stringify(currentState)}`);

        await CurrentState.save(instanceId, JSON.stringify(currentState));
    }
}

class Event {
    constructor(condition, action) {
        this.condition = condition;
        this.action = action;
    }
}

class IoTEventsState {
    constructor() {
        this.eventsList = []
    }

    events(eventListArg) {
        this.eventsList.push(...eventListArg);
        return this;
    }

    async evaluate(currentState) {
        for (const e of this.eventsList) {
            console.log(`Evaluating event ${e.condition}`);
            if (e.condition(currentState)) {
                console.log(`Event condition met`);
                // Execute any action as defined in iotEvents DM Definition
                await e.action(currentState);
            }
        }

        return currentState;
    }
}

////// DetectorModel Definitions - replace with your own defintions
let processAlarmStateEvent = new Event(
    (currentState) => {
        const source = currentState.input('source');
        return (
            currentState.input('temperature') < 70
        );
    },
    async (currentState) => {
        currentState.changeState('normal');
        await currentState.clearTimer(currentState.instanceId)
        await currentState.executeAction('MQTT', `{"state": "alarm cleared, timer deleted" }`);
    }
);

let processTimerEvent = new Event(
    (currentState) => {
        const source = currentState.input('source');
        console.log(`Evaluating timer event with source ${source}`);
        const booleanOutput = (source !== undefined && source !== null &&
            typeof source === 'string' &&
            source.toLowerCase() === 'timer' &&
            // check if the currentState == state from the timer payload
            currentState.input('currentState') !== undefined &&
            currentState.input('currentState') !== null &&
            currentState.input('currentState').toLowerCase !== 'normal');
        console.log(`Timer event evaluated as ${booleanOutput}`);
        return booleanOutput;
    },
    async (currentState) => {
        await currentState.executeAction('MQTT', `{"state": "timer timed out in Alarming state" }`);
    }
);

let processNormalEvent = new Event(
    (currentState) => currentState.input('temperature') > 70,
    async (currentState) => {
        currentState.changeState('alarm');
        await currentState.executeAction('MQTT', `{"state": "alarm detected, timer started" }`);
        await currentState.setTimer(currentState.instanceId, 5, {
            "instanceId": currentState.instanceId,
            "payload":"{\"currentState\": \"alarm\", \"source\": \"timer\"}"
        });
    }
);
const iotEvents = new IoTEvents('normal');
iotEvents
    .state('normal')
    .events(
        [
            processNormalEvent
        ]);
iotEvents
    .state('alarm')
    .events([
            processAlarmStateEvent,
            processTimerEvent
        ]
    );
```

## Langkah 8: Tambahkan pemicu Amazon Kinesis Data Streams
<a name="eos-detector-model-add-kinesis-trigger"></a>

Tambahkan pemicu Kinesis Data Streams ke fungsi Lambda menggunakan atau. Konsol Manajemen AWS AWS CLI

Menambahkan pemicu Kinesis Data Streams ke fungsi Lambda Anda akan membuat koneksi antara pipeline konsumsi data dan logika pemrosesan Anda, memungkinkannya secara otomatis mengevaluasi aliran data IoT yang masuk dan bereaksi terhadap peristiwa secara real-time, mirip dengan cara memproses input. AWS IoT Events 

------
#### [ Console ]

*Untuk informasi selengkapnya, lihat [Membuat pemetaan sumber peristiwa untuk menjalankan fungsi Lambda](https://docs.aws.amazon.com/lambda/latest/dg/services-kinesis-create.html#services-kinesis-eventsourcemapping) di Panduan Pengembang.AWS Lambda *

Gunakan yang berikut ini untuk detail pemetaan sumber acara:
+ Untuk **nama Fungsi**, masukkan nama lambda yang digunakan di[Langkah 7: Buat AWS Lambda fungsi (konsol)](#eos-detector-model-create-lambda-function).
+ Untuk **Konsumen - opsional**, masukkan ARN untuk aliran Kinesis Anda.
+ Untuk **Ukuran batch**, masukkan **10**.

------
#### [ AWS CLI ]

Jalankan perintah berikut untuk membuat pemicu fungsi Lambda.

```
aws lambda create-event-source-mapping \
    --function-name your-lambda-name \
    --event-source arn:aws:kinesis:your-region:your-account-id:stream/your-kinesis-stream-name \
    --batch-size 10 \
    --starting-position LATEST \
    --region your-region
```

------

## Langkah 9: Uji konsumsi data dan fungsionalitas keluaran ()AWS CLI
<a name="eos-detector-model-data-ingestion-and-output"></a>

Publikasikan payload ke topik MQTT berdasarkan apa yang Anda tentukan dalam model detektor Anda. Berikut ini adalah contoh payload ke topik MQTT `your-topic-name` untuk menguji implementasi.

```
{
  "instanceId": "your-instance-id",
  "payload": "{\"temperature\":78}"
}
```

Anda akan melihat pesan MQTT yang dipublikasikan ke topik dengan konten berikut (atau serupa):

```
{
    "state": "alarm detected, timer started"
}
```