

# Lambda を使用した Amazon Kinesis Data Streams レコードの処理
<a name="services-kinesis-create"></a>

Lambda を使用して Amazon Kinesis Data Streams レコードを処理するには、Lambda イベントソースマッピングを作成します。Lambda 関数は標準イテレーターか、拡張ファンアウトコンシューマーにマッピングすることができます。詳細については、「[ポーリングストリームとバッチストリーム](with-kinesis.md#kinesis-polling-and-batching)」を参照してください。

## Kinesis イベントソースマッピングを作成する
<a name="services-kinesis-eventsourcemapping"></a>

データストリームからのレコードを使用して Lambda 関数を呼び出すには、[イベントソースマッピング](invocation-eventsourcemapping.md)を作成します。複数のイベントソースマッピングを作成することで、複数の Lambda 関数で同じデータを処理したり、1 つの関数で複数のデータストリームの項目を処理したりできます。複数のストリームから項目を処理する場合、各バッチには 1 つのシャードまたはストリームのレコードのみが含まれます。

別の AWS アカウント のストリームからのレコードを処理するようにイベント ソース マッピングを構成できます。詳細については[クロスアカウントのイベントソースマッピングの作成](#services-kinesis-eventsourcemapping-cross-account)を参照してください。

イベントソースマッピングを作成する前に、Kinesis データストリームから読み取るためのアクセス許可を Lambda 関数に付与する必要があります。Lambda には、Kinesis データストリームに関連するリソースを管理するために次のアクセス許可が必要です。
+ [kinesis:DescribeStream](https://docs.aws.amazon.com/lambda/latest/api/API_DescribeStream.html)
+ [kinesis:DescribeStreamSummary](https://docs.aws.amazon.com/lambda/latest/api/API_DescribeStreamSummary.html)
+ [kinesis:GetRecords](https://docs.aws.amazon.com/lambda/latest/api/API_GetRecords.html)
+ [kinesis:GetShardIterator](https://docs.aws.amazon.com/lambda/latest/api/API_GetShardIterator.html)
+ [kinesis:ListShards](https://docs.aws.amazon.com/lambda/latest/api/API_ListShards.html)
+ [kinesis:SubscribeToShard](https://docs.aws.amazon.com/lambda/latest/api/API_SubscribeToShard.html)

AWS マネージドポリシー [AWSLambdaKinesisExecutionRole](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaKinesisExecutionRole.html) には、これらのアクセス許可が含まれています。次の手順の説明に従って、この管理ポリシーを関数に追加します。

**注記**  
Kinesis のイベントソースマッピングを作成および管理するための `kinesis:ListStreams` アクセス許可は必要ありません。ただし、コンソールでイベントソースマッピングを作成し、このアクセス許可がない場合、ドロップダウンリストから Kinesis ストリームを選択できず、コンソールにエラーが表示されます。イベントソースマッピングを作成するには、ストリームの Amazon リソースネーム (ARN) を手動で入力する必要があります。
Lambda は、失敗した呼び出しを再試行する際に `kinesis:GetRecords` および `kinesis:GetShardIterator` API 呼び出しを行います。

------
#### [ AWS マネジメントコンソール ]

**関数に Kinesis アクセス許可を追加するには**

1. Lambda コンソールの [[関数]](https://console.aws.amazon.com/lambda/home#/functions) ページを開き、関数を選択します。

1. **[構成]** タブで、**[アクセス許可]** を選択します。

1. **[実行ロール]** ペインの **[ロール名]** で、関数の実行ロールへのリンクを選択します。このリンクを選択すると、IAM コンソールでそのロールのページが開きます。

1. **[アクセス許可ポリシー]** ペインで、**[アクセス許可を追加]** を選択し、**[ポリシーをアタッチ]** を続けて選択します。

1. [検索] フィールドに **AWSLambdaKinesisExecutionRole** を入力します。

1. ポリシーの名前の横にあるチェックボックスを選択し、**[アクセス許可を追加]** を選択します。

------
#### [ AWS CLI ]

**関数に Kinesis アクセス許可を追加するには**
+ 次の CLI コマンドを実行して、`AWSLambdaKinesisExecutionRole` ポリシーを関数の実行ロールに追加します。

  ```
  aws iam attach-role-policy \
  --role-name MyFunctionRole \
  --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole
  ```

------
#### [ AWS SAM ]

**関数に Kinesis アクセス許可を追加するには**
+ 関数の定義で、次の例に示すように `Policies` プロパティを追加します。

  ```
  Resources:
    MyFunction:
      Type: AWS::Serverless::Function
      Properties:
        CodeUri: ./my-function/
        Handler: index.handler
        Runtime: nodejs24.x
        Policies:
          - AWSLambdaKinesisExecutionRole
  ```

------

必要なアクセス許可を設定した後、イベントソースマッピングを作成します。

------
#### [ AWS マネジメントコンソール ]

**Kinesis イベントソースマッピングを作成するには**

1. Lambda コンソールの [[関数]](https://console.aws.amazon.com/lambda/home#/functions) ページを開き、関数を選択します。

1. **[関数の概要]** ペインで、**[トリガーを追加]** を選択します。

1. **[トリガー設定]** で、ソースとして **[Kinesis]** を選択します。

1. イベントソースマッピングを作成する Kinesis ストリームを選択し、オプションでストリームのコンシューマーを選択します。

1. (オプション) イベントソースマッピングの**バッチサイズ** 、**開始位置**、**バッチウィンドウ**を編集します。

1. **[Add]** (追加) を選択します。

コンソールからイベントソースマッピングを作成する場合は、IAM ロールには [kinesis:ListStreams](https://docs.aws.amazon.com/lambda/latest/api/API_ListStreams.html) 権限と [kinesis:ListStreamConsumers](https://docs.aws.amazon.com/lambda/latest/api/API_ListStreamConsumers.html) 権限が必要です。

------
#### [ AWS CLI ]

**Kinesis イベントソースマッピングを作成するには**
+ 次の CLI コマンドを実行して、Kinesis イベントソースマッピングを作成します。ユースケースに応じて、独自のバッチサイズと開始位置を選択します。

  ```
  aws lambda create-event-source-mapping \
  --function-name MyFunction \
  --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \
  --starting-position LATEST \
  --batch-size 100
  ```

バッチ処理ウィンドウを指定するには、`--maximum-batching-window-in-seconds` オプションを追加します。このパラメータおよびその他のパラメータの使用の詳細については、「*AWS CLI コマンドリファレンス*」の「[create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html)」を参照してください。

------
#### [ AWS SAM ]

**Kinesis イベントソースマッピングを作成するには**
+ 関数の定義で、次の例に示すように `KinesisEvent` プロパティを追加します。

  ```
  Resources:
    MyFunction:
      Type: AWS::Serverless::Function
      Properties:
        CodeUri: ./my-function/
        Handler: index.handler
        Runtime: nodejs24.x
        Policies:
          - AWSLambdaKinesisExecutionRole
        Events:
          KinesisEvent:
            Type: Kinesis
            Properties:
              Stream: !GetAtt MyKinesisStream.Arn
              StartingPosition: LATEST
              BatchSize: 100
  
    MyKinesisStream:
      Type: AWS::Kinesis::Stream
      Properties:
        ShardCount: 1
  ```

AWS SAM で Kinesis Data Streams のイベントソースマッピングを作成する方法の詳細については、「*AWS Serverless Application Model デベロッパーガイド*」の「[Kinesis](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/sam-property-function-kinesis.html)」を参照してください。

------

## ポーリングとストリームの開始位置
<a name="services-kinesis-stream-start-pos"></a>

イベントソースマッピングの作成時および更新時のストリームのポーリングは、最終的に一貫性があることに注意してください。
+ イベントソースマッピングの作成時、ストリームからのイベントのポーリングが開始されるまでに数分かかる場合があります。
+ イベントソースマッピングの更新時、ストリームからのイベントのポーリングが停止および再開されるまでに数分かかる場合があります。

つまり、`LATEST` をストリームの開始位置として指定すると、イベントソースマッピングの作成または更新中にイベントを見逃す可能性があります。イベントを見逃さないようにするには、ストリームの開始位置を `TRIM_HORIZON` または `AT_TIMESTAMP` として指定します。

## クロスアカウントのイベントソースマッピングの作成
<a name="services-kinesis-eventsourcemapping-cross-account"></a>

[Amazon Kinesis Data Streams](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_identity-vs-resource.html) は、リソースベースのポリシーをサポートします。このため、別のアカウントの Lambda 関数を使用して AWS アカウント のストリームに取り込まれたデータを処理できます。

別の AWS アカウント の Kinesis ストリームを使用して Lambda 関数のイベントソースマッピングを作成するには、リソースベースのポリシーを使用してストリームを設定し、Lambda 関数に項目を読み取るアクセス許可を付与する必要があります。クロスアカウントアクセスを許可するようにストリームを設定する方法については、「*Amazon Kinesis Streams デベロッパーガイド*」の「[クロスアカウント AWS Lambda 関数とアクセスを共有する](https://docs.aws.amazon.com/streams/latest/dev/resource-based-policy-examples.html#Resource-based-policy-examples-lambda)」を参照してください。

Lambda 関数に必要なアクセス許可を付与するリソースベースのポリシーでストリームを設定したら、前のセクションで説明した方法のいずれかを使用してイベントソースマッピングを作成します。

Lambda コンソールでイベントソースマッピングを作成する場合は、ストリームの ARN を入力フィールドに直接貼り付けます。ストリームにコンシューマーを指定する場合、コンシューマーの ARN を貼り付けると、ストリームフィールドが自動的に入力されます。