

# Lambda を使用して Amazon Kinesis Data Streams からのレコードを処理する
<a name="with-kinesis"></a>

Lambda 関数を使用して、[Amazon Kinesis データストリーム](https://docs.aws.amazon.com/streams/latest/dev/introduction.html)のレコードを処理できます。Lambda 関数を Kinesis Data Streams 共有スループットコンシューマー (標準イテレーター) にマップすることも、[拡張ファンアウト](https://docs.aws.amazon.com/kinesis/latest/dev/enhanced-consumers.html)を使用する専用スループットコンシューマーにマップすることもできます。標準イテレーターの場合、Lambda は HTTP プロトコルを使用して、Kinesis ストリームの各シャードにレコードがあるかどうかをポーリングします。イベントソースマッピングは、シャードの他のコンシューマーと読み取りスループットを共有します。

 Kinesis Data Streams の詳細については、[Reading Data from Amazon Kinesis Data Streams](https://docs.aws.amazon.com/kinesis/latest/dev/building-consumers.html) を参照してください。

**注記**  
Kinesis は、各シャードに対して課金し、拡張ファンアウトの場合はストリームから読み取られたデータに対して課金します。料金の詳細については、[Amazon Kinesis の料金](https://aws.amazon.com/kinesis/data-streams/pricing)を参照してください。

## ポーリングストリームとバッチストリーム
<a name="kinesis-polling-and-batching"></a>

Lambda はデータストリームからレコードを読み取り、関数を、ストリームのレコードを含むイベントと共に[同期的に](invocation-sync.md)呼び出します。Lambda はバッチ単位でレコードを読み取り、関数を呼び出してバッチからレコードを処理します。各バッチには、単一のシャード/データストリームのレコードが含まれます。

Lambda 関数は、データストリームのコンシューマーアプリケーションです。シャードごとに 1 つのレコードのバッチを一度に処理します。Lambda 関数を共有スループットコンシューマー (標準イテレーター) にマップすることも、拡張ファンアウトを使用する専用スループットコンシューマーにマップすることもできます。
+ **標準イテレーター:** Lambda は、レコードの Kinesis ストリームにある各シャードを 1 秒あたり 1 回の基本レートでポーリングします。利用可能なレコードが増えると、Lambda は関数がストリームに追いつくまでバッチを処理し続けます。イベントソースマッピングは、シャードの他のコンシューマーと読み取りスループットを共有します。
+ **拡張ファンアウト:** レイテンシーを最小限に抑え、読み取りスループットを最大化するには、[拡張ファンアウト](https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html)を使用してデータストリームコンシューマーを作成します。拡張ファンアウトを使用するコンシューマーは、ストリームから読み取る他のアプリケーションに影響を及ぼさないように、専用の接続を各シャードに割り当てます。ストリームのコンシューマーは HTTP/2 を使用して、長時間にわたる接続とリクエストヘッダーの圧縮でレコードを Lambda にプッシュすることによってレイテンシーを短縮します。ストリームコンシューマーは、Kinesis [RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html) API を使用して作成できます。

```
aws kinesis register-stream-consumer \
--consumer-name con1 \
--stream-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream
```

次のような出力が表示されます。

```
{
    "Consumer": {
        "ConsumerName": "con1",
        "ConsumerARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream/consumer/con1:1540591608",
        "ConsumerStatus": "CREATING",
        "ConsumerCreationTimestamp": 1540591608.0
    }
}
```

関数がレコードを処理する速度を上げるには、[データストリームにシャードを追加します](https://repost.aws/knowledge-center/kinesis-data-streams-open-shards)。Lambda は、各シャードのレコードを順番に処理します。関数からエラーが返された場合、シャードのさらなるレコードの処理は停止されます。シャードが増えると、一度に処理されるバッチが増え、同時実行のエラーの影響を下げることができます。

同時実行のバッチの合計分を処理できるように関数をスケールアップできない場合は、関数の[クォータ引き上げをリクエスト](https://docs.aws.amazon.com/servicequotas/latest/userguide/request-quota-increase.html)するか、[同時実行数を予約](configuration-concurrency.md)します。

デフォルトで、Lambda はレコードが使用可能になると同時に関数を呼び出します。Lambda がイベントソースから読み取るバッチにレコードが 1 つしかない場合、Lambda は関数に 1 つのレコードしか送信しません。少数のレコードで関数を呼び出さないようにするには、*バッチ処理ウィンドウ*を設定することで、最大 5 分間レコードをバッファリングするようにイベントソースに指示できます。関数を呼び出す前に、Lambda は、完全なバッチを収集する、バッチ処理ウィンドウの期限が切れる、またはバッチが 6 MB のペイロード制限に到達するまでイベントソースからのレコードの読み取りを継続します。詳細については、「[バッチ処理動作](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching)」を参照してください。

**警告**  
Lambda イベントソースマッピングは各イベントを少なくとも 1 回処理し、レコードの重複処理が発生する可能性があります。重複するイベントに関連する潜在的な問題を避けるため、関数コードを冪等にすることを強くお勧めします。詳細については、AWS ナレッジセンターの「[Lambda 関数を冪等にするにはどうすればよいですか?](https://repost.aws/knowledge-center/lambda-function-idempotent)」を参照してください。

Lambda は、設定された[拡張機能](lambda-extensions.md)の完了を待たずに、次のバッチを処理のために送信します。つまり、Lambda がレコードの次のバッチを処理する間も拡張機能を実行し続けることができます。アカウントの[同時実行](lambda-concurrency.md)設定または制限に違反すると、スロットリングの問題が発生する可能性があります。これが潜在的な問題であるかどうかを検出するには、関数を監視し、イベントソースマッピングで予想よりも高い[同時実行メトリクス](monitoring-concurrency.md#general-concurrency-metrics)が表示されているかどうかを確認します。呼び出しの間隔が短いため、Lambda はシャードの数よりも高い同時実行使用量を一時的に報告する場合があります。これは、拡張機能のない Lambda 関数にも当てはまります。

Kinesis データストリームの 1 つのシャードを複数の Lambda 呼び出しで同時に処理するには、[ParallelizationFactor](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-ParallelizationFactor) 設定を構成します。Lambda がシャードからポーリングする同時バッチの数は、1 (デフォルト)～10 の並列化係数で指定できます。例えば、`ParallelizationFactor` を 2 に設定すると、最大 200 個の Lambda 呼び出しを同時に実行して、100 個の Kinesis データシャードを処理できます (ただし、`ConcurrentExecutions` メトリクスに異なる値が表示される場合があります)。これにより、データボリュームが揮発性で `IteratorAge` が高いときに処理のスループットをスケールアップすることができます。シャードごとの同時実行バッチの数を増やしても、Lambda はパーティションキーレベルで順序立った処理を確実に行います。

Kinesis 集約で `ParallelizationFactor` を使用することもできます。イベントソースマッピングの動作は、[拡張ファンアウト](https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html)を使用しているかどうかによって異なります。
+ **拡張ファンアウトなし**: 集約イベント内のすべてのイベントは、同じパーティションキーを持つ必要があります。パーティションキーは、集約イベントのパーティションキーとも一致する必要があります。集約イベント内のイベントに異なるパーティションキーがある場合、Lambda ではパーティションキーによるイベントが順序通りに処理されないことがあります。
+ **拡張ファンアウトあり**: まず、Lambda は集約イベントを個々のイベントにデコードします。集約イベントには、含まれるイベントとは異なるパーティションキーを設定できます。ただし、パーティションキーに一致しないイベントは[削除され、失われ](https://github.com/awslabs/kinesis-aggregation/blob/master/potential_data_loss.md)ます。Lambda ではこれらのイベントは処理されず、設定された障害時の送信先には送信されません。

## イベントの例
<a name="services-kinesis-event-example"></a>

**Example**  

```
{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                "approximateArrivalTimestamp": 1545084650.987
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
                "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
                "approximateArrivalTimestamp": 1545084711.166
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        }
    ]
}
```