

# 使用 Lambda 处理 Amazon Kinesis Data Streams 记录
<a name="services-kinesis-create"></a>

要使用 Lambda 处理 Amazon Kinesis Data Streams 记录，可创建 Lambda 事件源映射。您可以将 Lambda 函数映射到标准迭代器或增强型扇出功能使用者。有关更多信息，请参阅 [轮询和批处理流](with-kinesis.md#kinesis-polling-and-batching)。

## 创建 Kinesis 事件源映射
<a name="services-kinesis-eventsourcemapping"></a>

要使用来自数据流的记录调用 Lambda 函数，请创建一个[事件源映射](invocation-eventsourcemapping.md)。您可以创建多个事件源映射，以使用多个 Lambda 函数处理相同的数据，或使用单个函数处理来自多个数据流的项目。处理来自多个流的项目时，每个批处理将只包含来自单个分片或流的记录。

您可以配置事件源映射来处理来自不同 AWS 账户中的流的记录。要了解更多信息，请参阅[创建跨账户事件源映射](#services-kinesis-eventsourcemapping-cross-account)。

在创建事件源映射之前，您需要向您的 Lambda 函数授予读取 Kinesis 数据流中数据的权限。Lambda 需要以下权限才能管理与您的 Kinesis 数据流相关的资源：
+ [kinesis:DescribeStream](https://docs.aws.amazon.com/lambda/latest/api/API_DescribeStream.html)
+ [kinesis:DescribeStreamSummary](https://docs.aws.amazon.com/lambda/latest/api/API_DescribeStreamSummary.html)
+ [kinesis:GetRecords](https://docs.aws.amazon.com/lambda/latest/api/API_GetRecords.html)
+ [kinesis:GetShardIterator](https://docs.aws.amazon.com/lambda/latest/api/API_GetShardIterator.html)
+ [kinesis:ListShards](https://docs.aws.amazon.com/lambda/latest/api/API_ListShards.html)
+ [kinesis:SubscribeToShard](https://docs.aws.amazon.com/lambda/latest/api/API_SubscribeToShard.html)

AWS 托管式策略 [AWSLambdaKinesisExecutionRole](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaKinesisExecutionRole.html) 包含这些权限。按照以下过程所述将此托管式策略添加到您的函数。

**注意**  
您不需要 `kinesis:ListStreams` 权限来创建和管理 Kinesis 的事件源映射。但是，如果您在控制台中创建事件源映射但没有此权限，则无法从下拉列表中选择 Kinesis 流，并且控制台将显示错误。要创建事件源映射，您需要手动输入流的 Amazon 资源名称（ARN）。
Lambda 在重试失败的调用时会进行 `kinesis:GetRecords` 和 `kinesis:GetShardIterator` API 调用。

------
#### [ AWS 管理控制台 ]

**为您的函数添加 Kinesis 权限**

1. 打开 Lambda 控制台的[“函数”页面](https://console.aws.amazon.com/lambda/home#/functions)，然后选择函数。

1. 在**配置**选项卡中，选择**权限**。

1. 在**执行角色**窗格的**角色名称**下，选择指向函数的执行角色的链接。此链接将在 IAM 控制台中打开该角色的页面。

1. 在**权限策略**窗格中，选择**添加权限**，然后选择**附加策略**。

1. 在搜索字段中输入 **AWSLambdaKinesisExecutionRole**。

1. 选中该策略名称旁边的复选框，然后选择**添加权限**。

------
#### [ AWS CLI ]

**为您的函数添加 Kinesis 权限**
+ 运行以下 CLI 命令，以将 `AWSLambdaKinesisExecutionRole` 策略附加到函数的执行角色。

  ```
  aws iam attach-role-policy \
  --role-name MyFunctionRole \
  --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole
  ```

------
#### [ AWS SAM ]

**为您的函数添加 Kinesis 权限**
+ 在函数定义中添加 `Policies` 属性，如以下示例所示：

  ```
  Resources:
    MyFunction:
      Type: AWS::Serverless::Function
      Properties:
        CodeUri: ./my-function/
        Handler: index.handler
        Runtime: nodejs24.x
        Policies:
          - AWSLambdaKinesisExecutionRole
  ```

------

配置所需的权限后，创建事件源映射。

------
#### [ AWS 管理控制台 ]

**创建 Kinesis 事件源映射**

1. 打开 Lambda 控制台的[“函数”页面](https://console.aws.amazon.com/lambda/home#/functions)，然后选择函数。

1. 在**函数概述**窗格中，选择**添加触发器**。

1. 在**触发器配置**下，对于源，请选择 ** Kinesis **。

1. 选择要为其创建事件源映射的 Kinesis 流，也可以选择流的使用者。

1. （可选）编辑事件源映射的**批处理大小**、**起始位置**和**批处理窗口**。

1. 选择**添加**。

在控制台中创建事件源映射时，您的 IAM 角色必须拥有 [kinesis:ListStreams](https://docs.aws.amazon.com/lambda/latest/api/API_ListStreams.html) 和 [kinesis:ListStreamConsumers](https://docs.aws.amazon.com/lambda/latest/api/API_ListStreamConsumers.html) 权限。

------
#### [ AWS CLI ]

**创建 Kinesis 事件源映射**
+ 运行以下 CLI 命令以创建 Kinesis 事件源映射。根据您的应用场景选择自己的批量大小和起始位置。

  ```
  aws lambda create-event-source-mapping \
  --function-name MyFunction \
  --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \
  --starting-position LATEST \
  --batch-size 100
  ```

要指定批处理时间窗，请添加 `--maximum-batching-window-in-seconds` 选项。有关使用此参数和其他参数的更多信息，请参阅《AWS CLI Command Reference》中的 [create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html)**。

------
#### [ AWS SAM ]

**创建 Kinesis 事件源映射**
+ 在函数定义中添加 `KinesisEvent` 属性，如以下示例所示：

  ```
  Resources:
    MyFunction:
      Type: AWS::Serverless::Function
      Properties:
        CodeUri: ./my-function/
        Handler: index.handler
        Runtime: nodejs24.x
        Policies:
          - AWSLambdaKinesisExecutionRole
        Events:
          KinesisEvent:
            Type: Kinesis
            Properties:
              Stream: !GetAtt MyKinesisStream.Arn
              StartingPosition: LATEST
              BatchSize: 100
  
    MyKinesisStream:
      Type: AWS::Kinesis::Stream
      Properties:
        ShardCount: 1
  ```

要了解有关在 AWS SAM 中创建 Kinesis 数据流事件源映射的更多信息，请参阅《AWS Serverless Application Model Developer Guide》中的 [Kinesis](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/sam-property-function-kinesis.html)**。

------

## 轮询和流的起始位置
<a name="services-kinesis-stream-start-pos"></a>

请注意，事件源映射创建和更新期间的流轮询最终是一致的。
+ 在事件源映射创建期间，可能需要几分钟才能开始轮询来自流的事件。
+ 在事件源映射更新期间，可能需要几分钟才能停止和重新开始轮询来自流的事件。

此行为意味着，如果你指定 `LATEST` 作为流的起始位置，事件源映射可能会在创建或更新期间错过事件。为确保不会错过任何事件，请将流的起始位置指定为 `TRIM_HORIZON` 或 `AT_TIMESTAMP`。

## 创建跨账户事件源映射
<a name="services-kinesis-eventsourcemapping-cross-account"></a>

Amazon Kinesis Data Streams 支持[基于资源的策略](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_identity-vs-resource.html)。因此，您可以在一个 AWS 账户中使用 Lambda 函数来处理另一个账户的流中摄入的数据。

要使用其他 AWS 账户中的 Kinesis 流为您的 Lambda 函数创建事件源映射，您必须使用基于资源的策略配置该流，以向您的 Lambda 函数授予读取相关项目的权限。要了解如何配置流以允许跨账户存取，请参阅《Amazon Kinesis Streams 开发人员指南》中的 [Sharing access with cross-account AWS Lambda functions](https://docs.aws.amazon.com/streams/latest/dev/resource-based-policy-examples.html#Resource-based-policy-examples-lambda)**。

使用基于资源的策略配置流以向您的 Lambda 函数授予所需的权限后，请使用上一节中描述的任何方法创建事件源映射。

如果您选择使用 Lambda 控制台创建事件源映射，请将流的 ARN 直接粘贴到输入字段中。如果您想指定流的使用者，粘贴使用者的 ARN 会自动填充流字段。