

Hinweis zum Ende des Supports: Am 20. Mai 2026 AWS endet der Support für AWS IoT Events. Nach dem 20. Mai 2026 können Sie nicht mehr auf die AWS IoT Events Konsole oder AWS IoT Events die Ressourcen zugreifen. Weitere Informationen finden Sie unter [AWS IoT Events Ende des Supports](https://docs.aws.amazon.com/iotevents/latest/developerguide/iotevents-end-of-support.html).

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Migrationsverfahren für Detektormodelle in AWS IoT Events
<a name="eos-procedure-detector-models"></a>

In diesem Abschnitt werden alternative Lösungen beschrieben, die ähnliche Funktionen des Detektormodells bieten, wenn Sie von wechseln AWS IoT Events.

Sie können die Datenaufnahme über AWS IoT Core Regeln auf eine Kombination anderer AWS Dienste migrieren. Anstatt die Daten über die [BatchPutMessage](https://docs.aws.amazon.com/iotevents/latest/apireference/API_iotevents-data_BatchPutMessage.html)API aufzunehmen, können die Daten an das MQTT-Thema weitergeleitet werden. AWS IoT Core 

Dieser Migrationsansatz nutzt AWS IoT Core MQTT-Themen als Einstiegspunkt für Ihre IoT-Daten und ersetzt die direkte Eingabe zu. AWS IoT Events MQTT-Themen werden aus mehreren wichtigen Gründen ausgewählt. Sie bieten aufgrund der weit verbreiteten Verwendung von MQTT in der Branche eine breite Kompatibilität mit IoT-Geräten. Diese Themen können große Mengen an Nachrichten von zahlreichen Geräten verarbeiten und gewährleisten so die Skalierbarkeit. Sie bieten auch Flexibilität beim Routing und Filtern von Nachrichten nach Inhalt oder Gerätetyp. Darüber hinaus lassen sich AWS IoT Core MQTT-Themen nahtlos in andere AWS Dienste integrieren, was den Migrationsprozess erleichtert.

Daten fließen von MQTT-Themen in eine Architektur, die Amazon Kinesis Data Streams, eine AWS Lambda Funktion, eine Amazon DynamoDB-Tabelle und Amazon-Zeitpläne kombiniert. EventBridge Diese Kombination von Diensten repliziert und erweitert die Funktionen, die zuvor von bereitgestellt wurden AWS IoT Events, und bietet Ihnen mehr Flexibilität und Kontrolle über Ihre IoT-Datenverarbeitungspipeline.

## Architekturen im Vergleich
<a name="eos-architecture-comparison-detector-model"></a>

Die aktuelle AWS IoT Events Architektur nimmt Daten über eine AWS IoT Core Regel und die `BatchPutMessage` API auf. Diese Architektur wird AWS IoT Core für die Datenaufnahme und die Veröffentlichung von Ereignissen verwendet, wobei Nachrichten über AWS IoT Events Eingänge an Detektormodelle weitergeleitet werden, die die Zustandslogik definieren. Eine IAM-Rolle verwaltet die erforderlichen Berechtigungen.

Die neue Lösung kümmert sich AWS IoT Core um die Datenaufnahme (jetzt mit speziellen MQTT-Themen für Eingabe und Ausgabe). Es führt Kinesis Data Streams für die Datenpartitionierung und eine Evaluator-Lambda-Funktion für die Zustandslogik ein. Gerätestatus werden jetzt in einer DynamoDB-Tabelle gespeichert, und eine erweiterte IAM-Rolle verwaltet die Berechtigungen für diese Dienste.


| Zweck | Lösung | Unterschiede | 
| --- | --- | --- | 
|  **Datenaufnahme** — Empfängt Daten von IoT-Geräten  |  AWS IoT Core  |  Erfordert jetzt zwei unterschiedliche MQTT-Themen: eines für die Erfassung von Gerätedaten und eines für die Veröffentlichung von Ausgabeereignissen  | 
|  **Nachrichtenrichtung** — Leitet eingehende Nachrichten an entsprechende Dienste weiter  |  AWS IoT Core Regel für die Nachrichtenweiterleitung  |  Behält die gleiche Routing-Funktionalität bei, leitet aber Nachrichten jetzt an Kinesis Data Streams weiter, anstatt AWS IoT Events  | 
|  **Datenverarbeitung — Verarbeitet** und organisiert eingehende Datenströme  |  Kinesis Data Streams  |  Ersetzt die AWS IoT Events Eingabefunktionalität und ermöglicht die Datenaufnahme durch eine Geräte-ID-Partitionierung für die Nachrichtenverarbeitung  | 
|  **Logikauswertung** — Verarbeitet Statusänderungen und löst Aktionen aus  |  Evaluator Lambda  |  Ersetzt das AWS IoT Events Detektormodell und ermöglicht eine anpassbare Auswertung der Zustandslogik durch Code anstelle eines visuellen Workflows  | 
|  **Statusverwaltung** — Behält den Gerätestatus bei  |  DynamoDB-Tabelle  |  Neue Komponente, die die persistente Speicherung von Gerätestatus ermöglicht und die interne AWS IoT Events Statusverwaltung ersetzt  | 
|  **Sicherheit** — Verwaltet Dienstberechtigungen  |  IAM role (IAM-Rolle)  |  Die aktualisierten Berechtigungen umfassen jetzt zusätzlich zu den vorhandenen Berechtigungen den Zugriff auf Kinesis Data Streams und EventBridge DynamoDB AWS IoT Core   | 

## Schritt 1: (Optional) Exportieren AWS IoT Events Sie die Modellkonfigurationen der Detektoren
<a name="eos-detector-model-export-events-data"></a>

Bevor Sie neue Ressourcen erstellen, exportieren Sie Ihre AWS IoT Events Detektormodelldefinitionen. Diese enthalten Ihre Logik zur Ereignisverarbeitung und können als historische Referenz für die Implementierung Ihrer neuen Lösung dienen.

------
#### [ Console ]

Führen Sie mithilfe von die folgenden Schritte aus AWS IoT Events AWS-Managementkonsole, um Ihre Meldermodellkonfigurationen zu exportieren:

**Um Detektormodelle zu exportieren, verwenden Sie AWS-Managementkonsole**

1. Melden Sie sich an der [AWS IoT Events -Konsole](https://console.aws.amazon.com/iotevents/) an.

1. Wählen Sie im linken Navigationsbereich **Detector models (Detektormodelle)**.

1. Wählen Sie das zu exportierende Detektormodell aus.

1. Wählen Sie **Export** aus. Lesen Sie die Informationsmeldung zur Ausgabe und wählen Sie dann erneut **Exportieren**.

1. Wiederholen Sie den Vorgang für jedes Meldermodell, das Sie exportieren möchten.

Eine Datei, die eine JSON-Ausgabe Ihres Detektormodells enthält, wird dem Download-Ordner Ihres Browsers hinzugefügt. Sie können optional jede Konfiguration des Detektormodells speichern, um historische Daten beizubehalten.

------
#### [ AWS CLI ]

Führen Sie mit dem die folgenden Befehle aus AWS CLI, um Ihre Detektormodellkonfigurationen zu exportieren:

**Um Detektormodelle zu exportieren mit AWS CLI**

1. Alle Meldermodelle in Ihrem Konto auflisten:

   ```
   aws iotevents list-detector-models
   ```

1. Exportieren Sie für jedes Meldermodell dessen Konfiguration, indem Sie den folgenden Befehl ausführen:

   ```
   aws iotevents describe-detector-model \
      --detector-model-name your-detector-model-name
   ```

1. Speichern Sie die Ausgabe für jedes Detektormodell.

------

## Schritt 2: Erstellen einer IAM-Rolle
<a name="eos-detector-model-create-iam-role"></a>

Erstellen Sie eine IAM-Rolle, um Berechtigungen zur Replikation der Funktionalität von bereitzustellen. AWS IoT Events Die Rolle in diesem Beispiel gewährt Zugriff auf DynamoDB für die Statusverwaltung, EventBridge für die Planung, Kinesis Data Streams für die Datenaufnahme, für die Veröffentlichung von Nachrichten und AWS IoT Core für die Protokollierung. CloudWatch Zusammen können diese Dienste als Ersatz für verwendet werden. AWS IoT Events

1. Erstellen Sie eine IAM-Rolle mit den folgenden Berechtigungen. Ausführlichere Anweisungen zum Erstellen einer IAM-Rolle finden [Sie unter Erstellen einer Rolle zum Delegieren von Berechtigungen für einen AWS Dienst](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-service.html) im *IAM-Benutzerhandbuch*.

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "DynamoDBAccess",
               "Effect": "Allow",
               "Action": [
                   "dynamodb:GetItem",
                   "dynamodb:PutItem",
                   "dynamodb:UpdateItem",
                   "dynamodb:DeleteItem",
                   "dynamodb:Query",
                   "dynamodb:Scan"
               ],
               "Resource": "arn:aws:dynamodb:us-east-1:123456789012:table/EventsStateTable"
           },
           {
               "Sid": "SchedulerAccess",
               "Effect": "Allow",
               "Action": [
                   "scheduler:CreateSchedule",
                   "scheduler:DeleteSchedule"
               ],
               "Resource": "arn:aws:scheduler:us-east-1:123456789012:schedule/*"
           },
           {
               "Sid": "KinesisAccess",
               "Effect": "Allow",
               "Action": [
                   "kinesis:GetRecords",
                   "kinesis:GetShardIterator",
                   "kinesis:DescribeStream",
                   "kinesis:ListStreams"
               ],
               "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/*"
           },
           {
               "Sid": "IoTPublishAccess",
               "Effect": "Allow",
               "Action": "iot:Publish",
               "Resource": "arn:aws:iot:us-east-1:123456789012:topic/*"
           },
           {
               "Effect": "Allow",
               "Action": "logs:CreateLogGroup",
               "Resource": "arn:aws:logs:us-east-1:123456789012:*"
           },
           {
               "Effect": "Allow",
               "Action": [
                   "logs:CreateLogStream",
                   "logs:PutLogEvents"
               ],
               "Resource": [
               "arn:aws:logs:us-east-1:123456789012:log-group:/aws/lambda/your-lambda:*"
               ]
           }
       ]
   }
   ```

------

1. Fügen Sie die folgende Vertrauensrichtlinie für IAM-Rollen hinzu. Eine Vertrauensrichtlinie ermöglicht es den angegebenen AWS Diensten, die IAM-Rolle zu übernehmen, sodass sie die erforderlichen Aktionen ausführen können. Ausführlichere Anweisungen zur Erstellung einer IAM-Vertrauensrichtlinie finden Sie im *IAM-Benutzerhandbuch* unter [Erstellen einer Rolle mithilfe benutzerdefinierter Vertrauensrichtlinien](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-custom.html).

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Effect": "Allow",
               "Principal": {
                   "Service": [
                       "scheduler.amazonaws.com",
                       "lambda.amazonaws.com",
                       "iot.amazonaws.com"
                   ]
               },
               "Action": "sts:AssumeRole"
           }
       ]
   }
   ```

------

## Schritt 3: Amazon Kinesis Data Streams erstellen
<a name="eos-detector-model-create-kinesis-stream"></a>

Erstellen Sie Amazon Kinesis Data Streams mit dem AWS-Managementkonsole oder AWS CLI.

------
#### [ Console ]

Um einen Kinesis-Datenstream mit dem zu erstellen AWS-Managementkonsole, folgen Sie dem Verfahren auf der Seite [Einen Datenstream erstellen](https://docs.aws.amazon.com/streams/latest/dev/tutorial-stock-data-kplkcl-create-stream.html) im *Amazon Kinesis Data Streams Developer Guide*.

Passen Sie die Anzahl der Shards an Ihre Geräteanzahl und die Größe der Nachrichtennutzlast an.

------
#### [ AWS CLI ]

Erstellen Sie mithilfe von Amazon Kinesis Data Streams AWS CLI, um die Daten von Ihren Geräten aufzunehmen und zu partitionieren.

Kinesis Data Streams werden bei dieser Migration verwendet, um die Datenaufnahmefunktion von zu ersetzen. AWS IoT Events Es bietet eine skalierbare und effiziente Methode zur Erfassung, Verarbeitung und Analyse von Echtzeit-Streaming-Daten von Ihren IoT-Geräten und bietet gleichzeitig eine flexible Datenverarbeitung und Integration mit anderen AWS Diensten.

```
aws kinesis create-stream --stream-name your-kinesis-stream-name --shard-count 4 --region your-region
```

Passen Sie die Anzahl der Shards an die Anzahl Ihrer Geräte und die Größe der Nachrichtennutzlast an.

------

## Schritt 4: Erstellen oder aktualisieren Sie die MQTT-Nachrichtenrouting-Regel
<a name="eos-detector-model-mqtt-rule"></a>

Sie können eine neue MQTT-Nachrichtenrouting-Regel erstellen oder eine bestehende Regel aktualisieren.

------
#### [ Console ]

1. Stellen Sie fest, ob Sie eine neue MQTT-Nachrichtenrouting-Regel benötigen oder ob Sie eine bestehende Regel aktualisieren können.

1. Öffnen Sie die [AWS IoT Core -Konsole](https://console.aws.amazon.com/iot/).

1. Wählen Sie im Navigationsbereich **Message Routing** und dann **Regeln** aus.

1. Wählen **Sie im Abschnitt Verwalten** die Option **Nachrichtenweiterleitung** und dann **Regeln** aus.

1. Wählen Sie **Regel erstellen** aus.

1. **Geben Sie auf der Seite Regeleigenschaften** angeben den AWS IoT Core Regelnamen für **Regelname** ein. **Geben Sie unter Regelbeschreibung — *optional* eine Beschreibung ein, um zu kennzeichnen, dass Sie Ereignisse verarbeiten und an Kinesis Data Streams weiterleiten**.

1. Geben Sie auf der Seite „**SQL-Anweisung konfigurieren**“ Folgendes für die **SQL-Anweisung** ein: **SELECT \$1 FROM 'your-database'** und wählen Sie dann **Weiter**.

1. Wählen Sie auf der Seite „**Regeln anhängen**“ und unter **Regelaktionen** die Option **Kinesis** aus.

1. Wählen Sie Ihren Kinesis-Stream für den Stream aus. Geben Sie als Partitionsschlüssel **your-instance-id** ein. Wählen Sie die entsprechende Rolle für die IAM-Rolle aus und klicken Sie dann auf **Regelaktion hinzufügen**.

Weitere Informationen finden Sie unter [AWS IoT-Regeln erstellen, um Gerätedaten an andere Dienste weiterzuleiten](https://docs.aws.amazon.com/iot/latest/developerguide/iot-rules-tutorial.html).

------
#### [ AWS CLI ]

1. Erstellen Sie eine JSON Datei mit dem Namen und dem folgenden Inhalt. Diese JSON-Konfigurationsdatei definiert eine AWS IoT Core Regel, die alle Nachrichten aus einem Thema auswählt und sie an den angegebenen Kinesis-Stream weiterleitet, wobei die Instanz-ID als Partitionsschlüssel verwendet wird.

   ```
   {
       "sql": "SELECT * FROM 'your-config-file'",
       "description": "Rule to process events and forward to Kinesis Data Streams",
       "actions": [
           {
               "kinesis": {
                   "streamName": "your-kinesis-stream-name",
                   "roleArn": "arn:aws:iam::your-account-id:role/service-role/your-iam-role",
                   "partitionKey": "${your-instance-id}"
               }
           }
       ],
       "ruleDisabled": false,
       "awsIotSqlVersion": "2016-03-23"
   }
   ```

1. Erstellen Sie die MQTT-Themenregel mit dem. AWS CLI In diesem Schritt wird mithilfe der AWS CLI eine AWS IoT Core Themenregel unter Verwendung der in der `events_rule.json` Datei definierten Konfiguration erstellt.

   ```
   aws iot create-topic-rule \
       --rule-name "your-iot-core-rule" \
       --topic-rule-payload file://your-file-name.json
   ```

------

## Schritt 5: Ermitteln Sie den Endpunkt für das MQTT-Zielthema
<a name="eos-detector-model-get-mqtt-endpoint"></a>

Verwenden Sie das Ziel-MQTT-Thema, um zu konfigurieren, wo Ihre Themen ausgehende Nachrichten veröffentlichen, und ersetzen Sie damit die Funktionalität, die zuvor von verwendet wurde. AWS IoT Events Der Endpunkt ist für Ihr AWS Konto und Ihre Region einzigartig.

------
#### [ Console ]

1. Öffnen Sie die [AWS IoT Core -Konsole](https://console.aws.amazon.com/iot/).

1. Wählen **Sie im linken Navigationsbereich im Bereich Connect** die Option **Domain-Konfiguration** aus.

1. Wählen Sie die Domainkonfiguration **IoT:Data-ATS**, um die Detailseite der Konfiguration zu öffnen.

1. **Kopieren Sie den Wert für den Domainnamen.** Dieser Wert ist der Endpunkt. Speichern Sie den Endpunktwert, da Sie ihn in späteren Schritten benötigen werden.

------
#### [ AWS CLI ]

Führen Sie den folgenden Befehl aus, um den AWS IoT Core Endpunkt für die Veröffentlichung ausgehender Nachrichten für Ihr Konto zu ermitteln.

```
aws iot describe-endpoint --endpoint-type iot:Data-ATS --region your-region
```

------

## Schritt 6: Erstellen Sie eine Amazon DynamoDB-Tabelle
<a name="eos-detector-model-create-dynamodb-table"></a>

 Eine Amazon DynamoDB-Tabelle ersetzt die Statusverwaltungsfunktion von und bietet eine skalierbare und flexible Möglichkeit AWS IoT Events, den Status Ihrer Geräte und die Detektormodelllogik in Ihrer neuen Lösungsarchitektur beizubehalten und zu verwalten.

------
#### [ Console ]

Erstellen Sie eine Amazon DynamoDB-Tabelle, um den Status der Detektormodelle beizubehalten. Weitere Informationen finden Sie unter [Erstellen einer Tabelle in DynamoDB im *Amazon DynamoDB*](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-1.html) Developer Guide.

Verwenden Sie Folgendes für die Tabellendetails:
+ Geben Sie **unter Tabellenname** einen Tabellennamen Ihrer Wahl ein.
+ Geben Sie für **Partitionsschlüssel** Ihre eigene Instanz-ID ein.
+ Sie können die **Standardeinstellungen** für die **Tabelleneinstellungen** verwenden

------
#### [ AWS CLI ]

Führen Sie den folgenden Befehl aus, um eine DynamoDB-Tabelle zu erstellen.

```
aws dynamodb create-table \
                        --table-name your-table-name \
                        --attribute-definitions AttributeName=your-instance-id,AttributeType=S \
                        --key-schema AttributeName=your-instance-id,KeyType=HASH \
```

------

## Schritt 7: Erstellen Sie eine AWS Lambda Funktion (Konsole)
<a name="eos-detector-model-create-lambda-function"></a>

 Die Lambda-Funktion dient als zentrale Verarbeitungs-Engine und ersetzt die Bewertungslogik des Detektormodells von AWS IoT Events. In diesem Beispiel integrieren wir andere AWS Dienste, um eingehende Daten zu verarbeiten, den Status zu verwalten und Aktionen auf der Grundlage Ihrer definierten Regeln auszulösen.

Erstellen Sie eine Lambda-Funktion mit NodeJS Laufzeit. Verwenden Sie den folgenden Codeausschnitt, der die hartcodierten Konstanten ersetzt:

1. Öffnen Sie die [AWS Lambda console](https://console.aws.amazon.com/lambda/).

1. Wählen Sie **Funktion erstellen**.

1. **Geben Sie einen Namen für den Funktionsnamen ein.**

1. **Wählen Sie **NodeJS 2.x** als Runtime aus.**

1. Wählen Sie in der Dropdownliste **Standardausführungsrolle ändern** die Option **Bestehende Rolle verwenden** und wählen Sie dann die IAM-Rolle aus, die Sie in den vorherigen Schritten erstellt haben.

1. Wählen Sie **Funktion erstellen**.

1. Fügen Sie den folgenden Codeausschnitt ein, nachdem Sie die hartcodierten Konstanten ersetzt haben.

1. Fügen Sie nach der Erstellung Ihrer Funktion auf der Registerkarte **Code** das folgende Codebeispiel ein und ersetzen Sie den **your-destination-endpoint** Endpunkt durch Ihren eigenen.

```
import { DynamoDBClient, GetItemCommand } from '@aws-sdk/client-dynamodb';
import { PutItemCommand } from '@aws-sdk/client-dynamodb';
import { IoTDataPlaneClient, PublishCommand } from "@aws-sdk/client-iot-data-plane";
import { SchedulerClient, CreateScheduleCommand, DeleteScheduleCommand } from "@aws-sdk/client-scheduler"; // ES Modules import


//// External Clients and Constants
const scheduler = new SchedulerClient({});
const iot = new IoTDataPlaneClient({
    endpoint: 'https://your-destination-endpoint-ats.iot.your-region.amazonaws.com/'
});
const ddb = new DynamoDBClient({});


//// Lambda Handler function
export const handler = async (event) => {
    console.log('Incoming event:', JSON.stringify(event, null, 2));

    if (!event.Records) {
        throw new Error('No records found in event');
    }

    const processedRecords = [];

    for (const record of event.Records) {
        try {
            if (record.eventSource !== 'aws:kinesis') {
                console.log(`Skipping non-Kinesis record from ${record.eventSource}`);
                continue;
            }

            // Assumes that we are processing records from Kinesis
            const payload = record.kinesis.data;
            const decodedData = Buffer.from(payload, 'base64').toString();
            console.log("decoded payload is ", decodedData);

            const output = await handleDecodedData(decodedData);

            // Add additional processing logic here
            const processedData = {
                output,
                sequenceNumber: record.kinesis.sequenceNumber,
                partitionKey: record.kinesis.partitionKey,
                timestamp: record.kinesis.approximateArrivalTimestamp
            };

            processedRecords.push(processedData);

        } catch (error) {
            console.error('Error processing record:', error);
            console.error('Failed record:', record);
            // Decide whether to throw error or continue processing other records
            // throw error; // Uncomment to stop processing on first error
        }
    }

    return {
        statusCode: 200,
        body: JSON.stringify({
            message: 'Processing complete',
            processedCount: processedRecords.length,
            records: processedRecords
        })
    };
};

// Helper function to handle decoded data
async function handleDecodedData(payload) {
    try {
        // Parse the decoded data
        const parsedData = JSON.parse(payload);

        // Extract instanceId
        const instanceId = parsedData.instanceId;
        // Parse the input field
        const inputData = JSON.parse(parsedData.payload);
        const temperature = inputData.temperature;
        console.log('For InstanceId: ', instanceId, ' the temperature is:', temperature);

        await iotEvents.process(instanceId, inputData)

        return {
            instanceId,
            temperature,
            // Add any other fields you want to return
            rawInput: inputData
        };
    } catch (error) {
        console.error('Error handling decoded data:', error);
        throw error;
    }
}


//// Classes for declaring/defining the state machine
class CurrentState {
    constructor(instanceId, stateName, variables, inputs) {
        this.stateName = stateName;
        this.variables = variables;
        this.inputs = inputs;
        this.instanceId = instanceId
    }

    static async load(instanceId) {
        console.log(`Loading state for id ${instanceId}`);
        try {
            const { Item: { state: { S: stateContent } } } = await ddb.send(new GetItemCommand({
                TableName: 'EventsStateTable',
                Key: {
                    'InstanceId': { S: `${instanceId}` }
                }
            }));

            const { stateName, variables, inputs } = JSON.parse(stateContent);

            return new CurrentState(instanceId, stateName, variables, inputs);
        } catch (e) {
            console.log(`No state for id ${instanceId}: ${e}`);
            return undefined;
        }
    }

    static async save(instanceId, state) {
        console.log(`Saving state for id ${instanceId}`);
        await ddb.send(new PutItemCommand({
            TableName: 'your-events-state-table-name',
            Item: {
                'InstanceId': { S: `${instanceId}` },
                'state': { S: state }
            }
        }));
    }

    setVariable(name, value) {
        this.variables[name] = value;
    }

    changeState(stateName) {
        console.log(`Changing state from ${this.stateName} to ${stateName}`);
        this.stateName = stateName;
    }

    async setTimer(instanceId, frequencyInMinutes, payload) {
        console.log(`Setting timer ${instanceId} for frequency of ${frequencyInMinutes} minutes`);

        const base64Payload = Buffer.from(JSON.stringify(payload)).toString();
        console.log(base64Payload);

        const scheduleName = `your-schedule-name-${instanceId}-schedule`;
        const scheduleParams = {
            Name: scheduleName,
            FlexibleTimeWindow: {
                Mode: 'OFF'
            },
            ScheduleExpression: `rate(${frequencyInMinutes} minutes)`,
            Target: {
                Arn: "arn:aws::kinesis:your-region:your-account-id:stream/your-kinesis-stream-name",
                RoleArn: "arn:aws::iam::your-account-id:role/service-role/your-iam-role",
                Input: base64Payload,
                KinesisParameters: {
                    PartitionKey: instanceId,
                },
                RetryPolicy: {
                    MaximumRetryAttempts: 3
                }
            },

        };

        const command = new CreateScheduleCommand(scheduleParams);
        console.log(`Sending command to set timer ${JSON.stringify(command)}`);
        await scheduler.send(command);
    }

    async clearTimer(instanceId) {
        console.log(`Cleaning timer ${instanceId}`);

        const scheduleName = `your-schedule-name-${instanceId}-schedule`;
        const command = new DeleteScheduleCommand({
            Name: scheduleName
        });
        await scheduler.send(command);
    }

    async executeAction(actionType, actionPayload) {
        console.log(`Will execute the ${actionType} with payload ${actionPayload}`);
        await iot.send(new PublishCommand({
            topic: `${this.instanceId}`,
            payload: actionPayload,
            qos: 0
        }));
    }

    setInput(value) {
        this.inputs = { ...this.inputs, ...value };
    }

    input(name) {
        return this.inputs[name];
    }
}


class IoTEvents {

    constructor(initialState) {
        this.initialState = initialState;
        this.states = {};
    }

    state(name) {
        const state = new IoTEventsState();
        this.states[name] = state;
        return state;
    }

    async process(instanceId, input) {
        let currentState = await CurrentState.load(instanceId) || new CurrentState(instanceId, this.initialState, {}, {});
        currentState.setInput(input);

        console.log(`With inputs as: ${JSON.stringify(currentState)}`);
        const state = this.states[currentState.stateName];

        currentState = await state.evaluate(currentState);
        console.log(`With output as: ${JSON.stringify(currentState)}`);

        await CurrentState.save(instanceId, JSON.stringify(currentState));
    }
}

class Event {
    constructor(condition, action) {
        this.condition = condition;
        this.action = action;
    }
}

class IoTEventsState {
    constructor() {
        this.eventsList = []
    }

    events(eventListArg) {
        this.eventsList.push(...eventListArg);
        return this;
    }

    async evaluate(currentState) {
        for (const e of this.eventsList) {
            console.log(`Evaluating event ${e.condition}`);
            if (e.condition(currentState)) {
                console.log(`Event condition met`);
                // Execute any action as defined in iotEvents DM Definition
                await e.action(currentState);
            }
        }

        return currentState;
    }
}

////// DetectorModel Definitions - replace with your own defintions
let processAlarmStateEvent = new Event(
    (currentState) => {
        const source = currentState.input('source');
        return (
            currentState.input('temperature') < 70
        );
    },
    async (currentState) => {
        currentState.changeState('normal');
        await currentState.clearTimer(currentState.instanceId)
        await currentState.executeAction('MQTT', `{"state": "alarm cleared, timer deleted" }`);
    }
);

let processTimerEvent = new Event(
    (currentState) => {
        const source = currentState.input('source');
        console.log(`Evaluating timer event with source ${source}`);
        const booleanOutput = (source !== undefined && source !== null &&
            typeof source === 'string' &&
            source.toLowerCase() === 'timer' &&
            // check if the currentState == state from the timer payload
            currentState.input('currentState') !== undefined &&
            currentState.input('currentState') !== null &&
            currentState.input('currentState').toLowerCase !== 'normal');
        console.log(`Timer event evaluated as ${booleanOutput}`);
        return booleanOutput;
    },
    async (currentState) => {
        await currentState.executeAction('MQTT', `{"state": "timer timed out in Alarming state" }`);
    }
);

let processNormalEvent = new Event(
    (currentState) => currentState.input('temperature') > 70,
    async (currentState) => {
        currentState.changeState('alarm');
        await currentState.executeAction('MQTT', `{"state": "alarm detected, timer started" }`);
        await currentState.setTimer(currentState.instanceId, 5, {
            "instanceId": currentState.instanceId,
            "payload":"{\"currentState\": \"alarm\", \"source\": \"timer\"}"
        });
    }
);
const iotEvents = new IoTEvents('normal');
iotEvents
    .state('normal')
    .events(
        [
            processNormalEvent
        ]);
iotEvents
    .state('alarm')
    .events([
            processAlarmStateEvent,
            processTimerEvent
        ]
    );
```

## Schritt 8: Einen Amazon Kinesis Data Streams Streams-Trigger hinzufügen
<a name="eos-detector-model-add-kinesis-trigger"></a>

Fügen Sie der Lambda-Funktion mithilfe von oder einen Kinesis Data Streams Streams-Trigger hinzu. AWS-Managementkonsole AWS CLI

Durch das Hinzufügen eines Kinesis Data Streams Streams-Triggers zu Ihrer Lambda-Funktion wird die Verbindung zwischen Ihrer Datenerfassungspipeline und Ihrer Verarbeitungslogik hergestellt, sodass eingehende IoT-Datenströme automatisch ausgewertet und in Echtzeit auf Ereignisse reagiert werden kann, ähnlich wie bei der Verarbeitung von Eingaben. AWS IoT Events 

------
#### [ Console ]

Weitere Informationen finden Sie unter [Erstellen einer Ereignisquellenzuordnung zum Aufrufen einer Lambda-Funktion](https://docs.aws.amazon.com/lambda/latest/dg/services-kinesis-create.html#services-kinesis-eventsourcemapping) im *AWS Lambda Entwicklerhandbuch*.

Einzelheiten zur Zuordnung der Ereignisquellen finden Sie wie folgt:
+ Geben Sie als **Funktionsname** den Lambda-Namen ein, der in [Schritt 7: Erstellen Sie eine AWS Lambda Funktion (Konsole)](#eos-detector-model-create-lambda-function) verwendet wird.
+ Geben Sie für **Consumer — optional** den ARN für Ihren Kinesis-Stream ein.
+ Geben Sie für **Batch size (Stapelgröße)** **10** ein.

------
#### [ AWS CLI ]

Führen Sie den folgenden Befehl aus, um den Lambda-Funktionstrigger zu erstellen.

```
aws lambda create-event-source-mapping \
    --function-name your-lambda-name \
    --event-source arn:aws:kinesis:your-region:your-account-id:stream/your-kinesis-stream-name \
    --batch-size 10 \
    --starting-position LATEST \
    --region your-region
```

------

## Schritt 9: Testen Sie die Funktionen zur Datenaufnahme und -ausgabe ()AWS CLI
<a name="eos-detector-model-data-ingestion-and-output"></a>

Veröffentlichen Sie eine Nutzlast zum MQTT-Thema, die auf dem basiert, was Sie in Ihrem Detektormodell definiert haben. Im Folgenden finden Sie ein Beispiel für eine Payload zum MQTT-Thema `your-topic-name` zum Testen einer Implementierung.

```
{
  "instanceId": "your-instance-id",
  "payload": "{\"temperature\":78}"
}
```

Sie sollten eine MQTT-Nachricht sehen, die zu einem Thema veröffentlicht wurde und den folgenden (oder ähnlichen) Inhalt hat:

```
{
    "state": "alarm detected, timer started"
}
```