

サポート終了通知: 2026 年 5 月 20 日、 AWS は のサポートを終了します AWS IoT Events。2026 年 5 月 20 日以降、 AWS IoT Events コンソールまたは AWS IoT Events リソースにアクセスできなくなります。詳細については、[AWS IoT Events 「サポート終了](https://docs.aws.amazon.com/iotevents/latest/developerguide/iotevents-end-of-support.html)」を参照してください。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# でのディテクターモデルの移行手順 AWS IoT Events
<a name="eos-procedure-detector-models"></a>

このセクションでは、移行時に同様のディテクターモデル機能を提供する代替ソリューションについて説明します AWS IoT Events。

 AWS IoT Core ルールによるデータインジェストを他の AWS サービスの組み合わせに移行できます。[BatchPutMessage](https://docs.aws.amazon.com/iotevents/latest/apireference/API_iotevents-data_BatchPutMessage.html) API によるデータ取り込みの代わりに、データを AWS IoT Core MQTT トピックにルーティングできます。

この移行アプローチでは、 AWS IoT Core MQTT トピックを IoT データのエントリポイントとして活用し、 への直接入力を置き換えます AWS IoT Events。MQTT トピックは、いくつかの主な理由で選択されます。MQTT は業界で広く使用されているため、IoT デバイスとの幅広い互換性があります。これらのトピックでは、多数のデバイスからの大量のメッセージを処理できるため、スケーラビリティが確保されます。また、コンテンツまたはデバイスタイプに基づいてメッセージのルーティングとフィルタリングを柔軟に行うことができます。さらに、 AWS IoT Core MQTT トピックは他の AWS サービスとシームレスに統合されるため、移行プロセスが容易になります。

MQTT トピックから Amazon Kinesis Data Streams、 AWS Lambda 関数、Amazon DynamoDB テーブル、Amazon EventBridge スケジュールを組み合わせたアーキテクチャにデータが流れます。このサービスの組み合わせにより、 によって以前に提供された機能がレプリケートおよび強化され AWS IoT Events、IoT データ処理パイプラインをより柔軟に制御できます。

## アーキテクチャの比較
<a name="eos-architecture-comparison-detector-model"></a>

現在の AWS IoT Events アーキテクチャは、 AWS IoT Core ルールと `BatchPutMessage` API を通じてデータを取り込みます。このアーキテクチャでは、データインジェストとイベント発行 AWS IoT Core に を使用し、状態ロジックを定義するディテクターモデルへの AWS IoT Events 入力を介してメッセージがルーティングされます。IAM ロールは、必要なアクセス許可を管理します。

新しいソリューションでは、データインジェスト AWS IoT Core 用に が維持されます (現在、専用の入出力 MQTT トピックがあります）。データパーティショニング用の Kinesis Data Streams と、状態ロジック用の評価者 Lambda 関数が導入されました。デバイスの状態が DynamoDB テーブルに保存され、拡張 IAM ロールがこれらのサービス全体のアクセス許可を管理するようになりました。


| 目的 | ソリューション | 相違点 | 
| --- | --- | --- | 
|  **データ取り込み** — IoT デバイスからデータを受信します  |  AWS IoT Core  |  2 つの異なる MQTT トピックが必要になりました。1 つはデバイスデータの取り込み用、もう 1 つは出力イベントの発行用です。  | 
|  **メッセージの方向** – 受信メッセージを適切なサービスにルーティングします  |  AWS IoT Core メッセージルーティングルール  |  同じルーティング機能を維持しますが、 ではなく Kinesis Data Streams にメッセージを送信するようになりました。 AWS IoT Events  | 
|  **データ処理** – 受信データストリームを処理および整理します。  |  Kinesis Data Streams  |   AWS IoT Events 入力機能を置き換え、データ取り込みをメッセージ処理用のデバイス ID パーティショニングに置き換えます  | 
|  **ロジック評価** – 状態の変更を処理し、アクションをトリガーします  |  評価者 Lambda  |   AWS IoT Events ディテクターモデルを置き換え、ビジュアルワークフローの代わりにコードを通じてカスタマイズ可能な状態ロジック評価を提供  | 
|  **状態管理** – デバイスの状態を維持します  |  DynamoDB テーブル  |  デバイス状態の永続的ストレージを提供し、内部 AWS IoT Events 状態管理を置き換える新しいコンポーネント  | 
|  **セキュリティ** – サービスアクセス許可を管理します  |  IAM ロール  |  更新されたアクセス許可には、既存のアクセス AWS IoT Core 許可に加えて、Kinesis Data Streams、DynamoDB、EventBridge へのアクセスが含まれるようになりました。  | 

## ステップ 1: (オプション) AWS IoT Events ディテクターモデル設定をエクスポートする
<a name="eos-detector-model-export-events-data"></a>

新しいリソースを作成する前に、 AWS IoT Events ディテクターモデル定義をエクスポートします。これらにはイベント処理ロジックが含まれており、新しいソリューションを実装するための過去のリファレンスとして使用できます。

------
#### [ Console ]

を使用して AWS IoT Events AWS マネジメントコンソール、次の手順を実行してディテクターモデル設定をエクスポートします。

**を使用してディテクターモデルをエクスポートするには AWS マネジメントコンソール**

1. [AWS IoT Events コンソール](https://console.aws.amazon.com/iotevents/) にログインします。

1. 左のナビゲーションペインで、[**Detector models (ディテクターモデル)**] を選択します。

1. エクスポートするディテクターモデルを選択します。

1. **[エクスポート]** を選択します。出力に関する情報メッセージを読み、もう一度**エクスポート**を選択します。

1. エクスポートするディテクターモデルごとにプロセスを繰り返します。

ディテクターモデルの JSON 出力を含むファイルがブラウザのダウンロードフォルダに追加されます。オプションで各ディテクターモデル設定を保存して、履歴データを保持できます。

------
#### [ AWS CLI ]

を使用して AWS CLI、次のコマンドを実行してディテクターモデル設定をエクスポートします。

**を使用してディテクターモデルをエクスポートするには AWS CLI**

1. アカウント内のすべてのディテクターモデルを一覧表示します。

   ```
   aws iotevents list-detector-models
   ```

1. ディテクターモデルごとに、以下を実行して設定をエクスポートします。

   ```
   aws iotevents describe-detector-model \
      --detector-model-name your-detector-model-name
   ```

1. 各ディテクターモデルの出力を保存します。

------

## ステップ 2: IAM ロールを作成する
<a name="eos-detector-model-create-iam-role"></a>

IAM ロールを作成して、 の機能をレプリケートするアクセス許可を付与します AWS IoT Events。この例のロールは、状態管理用の DynamoDB、スケジューリング用の EventBridge、データ取り込み用の Kinesis Data Streams、メッセージの発行 AWS IoT Core 用の 、ログ記録用の CloudWatch へのアクセスを許可します。これらのサービスは、 の代替として一緒に機能します AWS IoT Events。

1. 以下のアクセス許可を持つ IAM ロールを作成します。IAM ロールの作成方法の詳細については、*IAM* [ユーザーガイドの「 AWS サービスにアクセス許可を委任するロールを作成する](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-service.html)」を参照してください。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "DynamoDBAccess",
               "Effect": "Allow",
               "Action": [
                   "dynamodb:GetItem",
                   "dynamodb:PutItem",
                   "dynamodb:UpdateItem",
                   "dynamodb:DeleteItem",
                   "dynamodb:Query",
                   "dynamodb:Scan"
               ],
               "Resource": "arn:aws:dynamodb:us-east-1:123456789012:table/EventsStateTable"
           },
           {
               "Sid": "SchedulerAccess",
               "Effect": "Allow",
               "Action": [
                   "scheduler:CreateSchedule",
                   "scheduler:DeleteSchedule"
               ],
               "Resource": "arn:aws:scheduler:us-east-1:123456789012:schedule/*"
           },
           {
               "Sid": "KinesisAccess",
               "Effect": "Allow",
               "Action": [
                   "kinesis:GetRecords",
                   "kinesis:GetShardIterator",
                   "kinesis:DescribeStream",
                   "kinesis:ListStreams"
               ],
               "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/*"
           },
           {
               "Sid": "IoTPublishAccess",
               "Effect": "Allow",
               "Action": "iot:Publish",
               "Resource": "arn:aws:iot:us-east-1:123456789012:topic/*"
           },
           {
               "Effect": "Allow",
               "Action": "logs:CreateLogGroup",
               "Resource": "arn:aws:logs:us-east-1:123456789012:*"
           },
           {
               "Effect": "Allow",
               "Action": [
                   "logs:CreateLogStream",
                   "logs:PutLogEvents"
               ],
               "Resource": [
               "arn:aws:logs:us-east-1:123456789012:log-group:/aws/lambda/your-lambda:*"
               ]
           }
       ]
   }
   ```

------

1. 次の IAM ロールの信頼ポリシーを追加します。信頼ポリシーは、指定された AWS サービスが必要なアクションを実行できるように、IAM ロールを引き受けることを許可します。IAM 信頼ポリシーの作成方法の詳細については、*IAM* [ユーザーガイドのカスタム信頼ポリシーを使用してロールを作成する](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-custom.html)を参照してください。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Effect": "Allow",
               "Principal": {
                   "Service": [
                       "scheduler.amazonaws.com",
                       "lambda.amazonaws.com",
                       "iot.amazonaws.com"
                   ]
               },
               "Action": "sts:AssumeRole"
           }
       ]
   }
   ```

------

## ステップ 3: Amazon Kinesis Data Streams を作成する
<a name="eos-detector-model-create-kinesis-stream"></a>

 AWS マネジメントコンソール または を使用して Amazon Kinesis Data Streams を作成します AWS CLI。

------
#### [ Console ]

を使用して Kinesis データストリームを作成するには AWS マネジメントコンソール、*Amazon Kinesis Data Streams* [デベロッパーガイド」の「データストリームの作成](https://docs.aws.amazon.com/streams/latest/dev/tutorial-stock-data-kplkcl-create-stream.html)」ページにある手順に従ってください。

デバイス数とメッセージペイロードサイズに基づいてシャード数を調整します。

------
#### [ AWS CLI ]

を使用して AWS CLI Amazon Kinesis Data Streams を作成し、デバイスからデータを取り込んでパーティション化します。

Kinesis Data Streams は、この移行でデータの取り込み機能を置き換えるために使用されます AWS IoT Events。IoT デバイスからリアルタイムのストリーミングデータを収集、処理、分析するスケーラブルで効率的な方法を提供すると同時に、柔軟なデータ処理と他の AWS サービスとの統合を提供します。

```
aws kinesis create-stream --stream-name your-kinesis-stream-name --shard-count 4 --region your-region
```

デバイス数とメッセージペイロードサイズに基づいてシャード数を調整します。

------

## ステップ 4: MQTT メッセージルーティングルールを作成または更新する
<a name="eos-detector-model-mqtt-rule"></a>

新しい MQTT メッセージルーティングルールを作成するか、既存のルールを更新できます。

------
#### [ Console ]

1. 新しい MQTT メッセージルーティングルールが必要かどうか、または既存のルールを更新できるかどうかを決定します。

1. [AWS IoT Core コンソール](https://console.aws.amazon.com/iot/)を開きます。

1. ナビゲーションペインで、**メッセージルーティング**を選択し、ルールを選択します****。

1. **管理**セクションで、**メッセージルーティング**を選択し、**ルール**を選択します。

1. **[‬ルールを作成]‭** を選択します。

1. **ルールプロパティの指定**ページで、 AWS IoT Core ルール名の**ルール名**を入力します。**ルールの説明 - *オプション*で、イベントを処理して Kinesis Data Streams に転送していることを識別する説明を入力します。**

1. **SQL ステートメントの設定**ページで、**SQL ステートメント**に「」と入力し**SELECT \$1 FROM 'your-database'**、**次へ**を選択します。

1. ルールのア**タッチアクション**ページで、ルール**アクション**で **kinesis** を選択します。

1. ストリームの Kinesis ストリームを選択します。パーティションキーに「**your-instance-id**」と入力します。IAM ロールに適したロールを選択し、**ルールの追加アクション**を選択します。

詳細については、「[Creating AWS IoT rules to route device data to other services](https://docs.aws.amazon.com/iot/latest/developerguide/iot-rules-tutorial.html)」を参照してください。

------
#### [ AWS CLI ]

1. 次の内容を含む JSON ファイル を作成します。この JSON 設定ファイルは、トピックからすべてのメッセージを選択し、インスタンス ID をパーティションキーとして使用して、指定された Kinesis ストリームに転送する AWS IoT Core ルールを定義します。

   ```
   {
       "sql": "SELECT * FROM 'your-config-file'",
       "description": "Rule to process events and forward to Kinesis Data Streams",
       "actions": [
           {
               "kinesis": {
                   "streamName": "your-kinesis-stream-name",
                   "roleArn": "arn:aws:iam::your-account-id:role/service-role/your-iam-role",
                   "partitionKey": "${your-instance-id}"
               }
           }
       ],
       "ruleDisabled": false,
       "awsIotSqlVersion": "2016-03-23"
   }
   ```

1. を使用して MQTT トピックルールを作成します AWS CLI。このステップでは AWS CLI 、 を使用して、 `events_rule.json` ファイルで定義された設定を使用して AWS IoT Core トピックルールを作成します。

   ```
   aws iot create-topic-rule \
       --rule-name "your-iot-core-rule" \
       --topic-rule-payload file://your-file-name.json
   ```

------

## ステップ 5: 送信先 MQTT トピックのエンドポイントを取得する
<a name="eos-detector-model-get-mqtt-endpoint"></a>

送信先 MQTT トピックを使用して、トピックが送信メッセージを発行する場所を設定し、以前に処理された機能を置き換えます AWS IoT Events。エンドポイントは、 AWS アカウントとリージョンに固有です。

------
#### [ Console ]

1. [AWS IoT Core コンソール](https://console.aws.amazon.com/iot/)を開きます。

1. 左側のナビゲーションパネル**の接続**セクションで、**ドメイン設定**を選択します。

1. **iot:Data-ATS** ドメイン設定を選択して、設定の詳細ページを開きます。

1. **ドメイン名**の値をコピーします。この値はエンドポイントです。後のステップで必要になるため、エンドポイント値を保存します。

------
#### [ AWS CLI ]

次のコマンドを実行して、アカウントの送信メッセージを発行するための AWS IoT Core エンドポイントを取得します。

```
aws iot describe-endpoint --endpoint-type iot:Data-ATS --region your-region
```

------

## ステップ 6: Amazon DynamoDB テーブルを作成する
<a name="eos-detector-model-create-dynamodb-table"></a>

 Amazon DynamoDB テーブルは、 の状態管理機能を置き換え AWS IoT Events、新しいソリューションアーキテクチャでデバイスの状態とディテクターモデルロジックを持続および管理するためのスケーラブルで柔軟な方法を提供します。

------
#### [ Console ]

Amazon DynamoDB テーブルを作成して、ディテクターモデルの状態を維持します。詳細については、「Amazon [DynamoDB デベロッパーガイド」の「DynamoDB でテーブルを作成する](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-1.html)」を参照してください。 * DynamoDB *

テーブルの詳細については、以下を使用します。
+ **テーブル名**に、選択したテーブル名を入力します。
+ **パーティションキー**には、独自のインスタンス ID を入力します。
+ **テーブル****設定のデフォルト設定**を使用できます。

------
#### [ AWS CLI ]

次のコマンドを実行して、DynamoDB テーブルを作成します。

```
aws dynamodb create-table \
                        --table-name your-table-name \
                        --attribute-definitions AttributeName=your-instance-id,AttributeType=S \
                        --key-schema AttributeName=your-instance-id,KeyType=HASH \
```

------

## ステップ 7: AWS Lambda 関数を作成する (コンソール)
<a name="eos-detector-model-create-lambda-function"></a>

 Lambda 関数はコア処理エンジンとして機能し、ディテクターモデルの評価ロジックを置き換えます AWS IoT Events。この例では、他の AWS サービスと統合して、定義されたルールに基づいて受信データを処理し、状態を管理し、アクションをトリガーします。

NodeJS ランタイムを使用して Lambda 関数を作成します。次のコードスニペットを使用して、ハードコードされた定数を置き換えます。

1. [AWS Lambda console](https://console.aws.amazon.com/lambda/) を開きます。

1. [**関数の作成**] を選択してください。

1. **関数**名の名前を入力します。

1. **ランタイム**として **NodeJS 22.x** を選択します。

1. **デフォルトの実行ロールの変更**ドロップダウンで、**既存のロールを使用**を選択し、前のステップで作成した IAM ロールを選択します。

1. [**関数の作成**] を選択してください。

1. ハードコードされた定数を置き換えた後、次のコードスニペットに貼り付けます。

1. 関数の作成後、**コード**タブに次のコード例を貼り付け、**your-destination-endpoint**エンドポイントを独自のコードに置き換えます。

```
import { DynamoDBClient, GetItemCommand } from '@aws-sdk/client-dynamodb';
import { PutItemCommand } from '@aws-sdk/client-dynamodb';
import { IoTDataPlaneClient, PublishCommand } from "@aws-sdk/client-iot-data-plane";
import { SchedulerClient, CreateScheduleCommand, DeleteScheduleCommand } from "@aws-sdk/client-scheduler"; // ES Modules import


//// External Clients and Constants
const scheduler = new SchedulerClient({});
const iot = new IoTDataPlaneClient({
    endpoint: 'https://your-destination-endpoint-ats.iot.your-region.amazonaws.com/'
});
const ddb = new DynamoDBClient({});


//// Lambda Handler function
export const handler = async (event) => {
    console.log('Incoming event:', JSON.stringify(event, null, 2));

    if (!event.Records) {
        throw new Error('No records found in event');
    }

    const processedRecords = [];

    for (const record of event.Records) {
        try {
            if (record.eventSource !== 'aws:kinesis') {
                console.log(`Skipping non-Kinesis record from ${record.eventSource}`);
                continue;
            }

            // Assumes that we are processing records from Kinesis
            const payload = record.kinesis.data;
            const decodedData = Buffer.from(payload, 'base64').toString();
            console.log("decoded payload is ", decodedData);

            const output = await handleDecodedData(decodedData);

            // Add additional processing logic here
            const processedData = {
                output,
                sequenceNumber: record.kinesis.sequenceNumber,
                partitionKey: record.kinesis.partitionKey,
                timestamp: record.kinesis.approximateArrivalTimestamp
            };

            processedRecords.push(processedData);

        } catch (error) {
            console.error('Error processing record:', error);
            console.error('Failed record:', record);
            // Decide whether to throw error or continue processing other records
            // throw error; // Uncomment to stop processing on first error
        }
    }

    return {
        statusCode: 200,
        body: JSON.stringify({
            message: 'Processing complete',
            processedCount: processedRecords.length,
            records: processedRecords
        })
    };
};

// Helper function to handle decoded data
async function handleDecodedData(payload) {
    try {
        // Parse the decoded data
        const parsedData = JSON.parse(payload);

        // Extract instanceId
        const instanceId = parsedData.instanceId;
        // Parse the input field
        const inputData = JSON.parse(parsedData.payload);
        const temperature = inputData.temperature;
        console.log('For InstanceId: ', instanceId, ' the temperature is:', temperature);

        await iotEvents.process(instanceId, inputData)

        return {
            instanceId,
            temperature,
            // Add any other fields you want to return
            rawInput: inputData
        };
    } catch (error) {
        console.error('Error handling decoded data:', error);
        throw error;
    }
}


//// Classes for declaring/defining the state machine
class CurrentState {
    constructor(instanceId, stateName, variables, inputs) {
        this.stateName = stateName;
        this.variables = variables;
        this.inputs = inputs;
        this.instanceId = instanceId
    }

    static async load(instanceId) {
        console.log(`Loading state for id ${instanceId}`);
        try {
            const { Item: { state: { S: stateContent } } } = await ddb.send(new GetItemCommand({
                TableName: 'EventsStateTable',
                Key: {
                    'InstanceId': { S: `${instanceId}` }
                }
            }));

            const { stateName, variables, inputs } = JSON.parse(stateContent);

            return new CurrentState(instanceId, stateName, variables, inputs);
        } catch (e) {
            console.log(`No state for id ${instanceId}: ${e}`);
            return undefined;
        }
    }

    static async save(instanceId, state) {
        console.log(`Saving state for id ${instanceId}`);
        await ddb.send(new PutItemCommand({
            TableName: 'your-events-state-table-name',
            Item: {
                'InstanceId': { S: `${instanceId}` },
                'state': { S: state }
            }
        }));
    }

    setVariable(name, value) {
        this.variables[name] = value;
    }

    changeState(stateName) {
        console.log(`Changing state from ${this.stateName} to ${stateName}`);
        this.stateName = stateName;
    }

    async setTimer(instanceId, frequencyInMinutes, payload) {
        console.log(`Setting timer ${instanceId} for frequency of ${frequencyInMinutes} minutes`);

        const base64Payload = Buffer.from(JSON.stringify(payload)).toString();
        console.log(base64Payload);

        const scheduleName = `your-schedule-name-${instanceId}-schedule`;
        const scheduleParams = {
            Name: scheduleName,
            FlexibleTimeWindow: {
                Mode: 'OFF'
            },
            ScheduleExpression: `rate(${frequencyInMinutes} minutes)`,
            Target: {
                Arn: "arn:aws::kinesis:your-region:your-account-id:stream/your-kinesis-stream-name",
                RoleArn: "arn:aws::iam::your-account-id:role/service-role/your-iam-role",
                Input: base64Payload,
                KinesisParameters: {
                    PartitionKey: instanceId,
                },
                RetryPolicy: {
                    MaximumRetryAttempts: 3
                }
            },

        };

        const command = new CreateScheduleCommand(scheduleParams);
        console.log(`Sending command to set timer ${JSON.stringify(command)}`);
        await scheduler.send(command);
    }

    async clearTimer(instanceId) {
        console.log(`Cleaning timer ${instanceId}`);

        const scheduleName = `your-schedule-name-${instanceId}-schedule`;
        const command = new DeleteScheduleCommand({
            Name: scheduleName
        });
        await scheduler.send(command);
    }

    async executeAction(actionType, actionPayload) {
        console.log(`Will execute the ${actionType} with payload ${actionPayload}`);
        await iot.send(new PublishCommand({
            topic: `${this.instanceId}`,
            payload: actionPayload,
            qos: 0
        }));
    }

    setInput(value) {
        this.inputs = { ...this.inputs, ...value };
    }

    input(name) {
        return this.inputs[name];
    }
}


class IoTEvents {

    constructor(initialState) {
        this.initialState = initialState;
        this.states = {};
    }

    state(name) {
        const state = new IoTEventsState();
        this.states[name] = state;
        return state;
    }

    async process(instanceId, input) {
        let currentState = await CurrentState.load(instanceId) || new CurrentState(instanceId, this.initialState, {}, {});
        currentState.setInput(input);

        console.log(`With inputs as: ${JSON.stringify(currentState)}`);
        const state = this.states[currentState.stateName];

        currentState = await state.evaluate(currentState);
        console.log(`With output as: ${JSON.stringify(currentState)}`);

        await CurrentState.save(instanceId, JSON.stringify(currentState));
    }
}

class Event {
    constructor(condition, action) {
        this.condition = condition;
        this.action = action;
    }
}

class IoTEventsState {
    constructor() {
        this.eventsList = []
    }

    events(eventListArg) {
        this.eventsList.push(...eventListArg);
        return this;
    }

    async evaluate(currentState) {
        for (const e of this.eventsList) {
            console.log(`Evaluating event ${e.condition}`);
            if (e.condition(currentState)) {
                console.log(`Event condition met`);
                // Execute any action as defined in iotEvents DM Definition
                await e.action(currentState);
            }
        }

        return currentState;
    }
}

////// DetectorModel Definitions - replace with your own defintions
let processAlarmStateEvent = new Event(
    (currentState) => {
        const source = currentState.input('source');
        return (
            currentState.input('temperature') < 70
        );
    },
    async (currentState) => {
        currentState.changeState('normal');
        await currentState.clearTimer(currentState.instanceId)
        await currentState.executeAction('MQTT', `{"state": "alarm cleared, timer deleted" }`);
    }
);

let processTimerEvent = new Event(
    (currentState) => {
        const source = currentState.input('source');
        console.log(`Evaluating timer event with source ${source}`);
        const booleanOutput = (source !== undefined && source !== null &&
            typeof source === 'string' &&
            source.toLowerCase() === 'timer' &&
            // check if the currentState == state from the timer payload
            currentState.input('currentState') !== undefined &&
            currentState.input('currentState') !== null &&
            currentState.input('currentState').toLowerCase !== 'normal');
        console.log(`Timer event evaluated as ${booleanOutput}`);
        return booleanOutput;
    },
    async (currentState) => {
        await currentState.executeAction('MQTT', `{"state": "timer timed out in Alarming state" }`);
    }
);

let processNormalEvent = new Event(
    (currentState) => currentState.input('temperature') > 70,
    async (currentState) => {
        currentState.changeState('alarm');
        await currentState.executeAction('MQTT', `{"state": "alarm detected, timer started" }`);
        await currentState.setTimer(currentState.instanceId, 5, {
            "instanceId": currentState.instanceId,
            "payload":"{\"currentState\": \"alarm\", \"source\": \"timer\"}"
        });
    }
);
const iotEvents = new IoTEvents('normal');
iotEvents
    .state('normal')
    .events(
        [
            processNormalEvent
        ]);
iotEvents
    .state('alarm')
    .events([
            processAlarmStateEvent,
            processTimerEvent
        ]
    );
```

## ステップ 8: Amazon Kinesis Data Streams トリガーを追加する
<a name="eos-detector-model-add-kinesis-trigger"></a>

 AWS マネジメントコンソール または を使用して、Kinesis Data Streams トリガーを Lambda 関数に追加します AWS CLI。

Lambda 関数に Kinesis Data Streams トリガーを追加すると、データ取り込みパイプラインと処理ロジック間の接続が確立され、入力 AWS IoT Events の処理方法と同様に、受信 IoT データストリームを自動的に評価し、イベントにリアルタイムで対応できるようになります。

------
#### [ Console ]

詳細については、[「 デベロッパーガイド」の「Lambda 関数を呼び出すイベントソースマッピングを作成する](https://docs.aws.amazon.com/lambda/latest/dg/services-kinesis-create.html#services-kinesis-eventsourcemapping)」を参照してください。 *AWS Lambda *

イベントソースマッピングの詳細には、以下を使用します。
+ **関数名**には、 で使用される Lambda 名を入力します[ステップ 7: AWS Lambda 関数を作成する (コンソール)](#eos-detector-model-create-lambda-function)。
+ **コンシューマー - オプション**で、Kinesis ストリームの ARN を入力します。
+ **[バッチサイズ]** に、**10** を入力します。

------
#### [ AWS CLI ]

次のコマンドを実行して、Lambda 関数トリガーを作成します。

```
aws lambda create-event-source-mapping \
    --function-name your-lambda-name \
    --event-source arn:aws:kinesis:your-region:your-account-id:stream/your-kinesis-stream-name \
    --batch-size 10 \
    --starting-position LATEST \
    --region your-region
```

------

## ステップ 9: データの取り込みと出力機能をテストする (AWS CLI)
<a name="eos-detector-model-data-ingestion-and-output"></a>

ディテクターモデルで定義した内容に基づいて MQTT トピックにペイロードを発行します。実装をテスト`your-topic-name`するための MQTT トピックへのペイロードの例を次に示します。

```
{
  "instanceId": "your-instance-id",
  "payload": "{\"temperature\":78}"
}
```

次の (または同様の) コンテンツを含むトピックに発行された MQTT メッセージが表示されます。

```
{
    "state": "alarm detected, timer started"
}
```