

经过仔细考虑，我们决定停用适用于 SQL 应用程序的 Amazon Kinesis Data Analytics：

1. 从 **2025年9月1日起，**我们将不再为适用于SQL应用程序的Amazon Kinesis Data Analytics Data Analytics提供任何错误修复，因为鉴于即将停产，我们对其的支持将有限。

2. 从 **2025 年 10 月 15 日**起，您将无法为 SQL 应用程序创建新的 Kinesis Data Analytics。

3. 从 **2026 年 1 月 27 日**起，我们将删除您的应用程序。您将无法启动或操作 Amazon Kinesis Data Analytics for SQL 应用程序。从那时起，将不再提供对 Amazon Kinesis Data Analytics for SQL 的支持。有关更多信息，请参阅 [Amazon Kinesis Data Analytics for SQL 应用程序停用](discontinuation.md)。

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用 Lambda 函数作为输出
<a name="how-it-works-output-lambda"></a>

使用 AWS Lambda 作为目标可以更轻松地对 SQL 结果进行后处理，然后再将其发送到最终目标。常见的后处理任务包括：
+ 将多行聚合为一条记录
+ 将当前结果与过去的结果相结合以解决迟到数据的问题
+ 根据信息类型传输到不同的目标
+ 记录格式转换 (如转换为 Protobuf)
+ 字符串操作或转换
+ 分析处理后的数据扩充
+ 地理空间使用案例的自定义处理
+ 数据加密

Lambda 函数可以向各种 AWS 服务和其他目标提供分析信息，包括：
+ [Amazon Simple Storage Service（Amazon S3）](https://docs.aws.amazon.com/AmazonS3/latest/userguide/)
+ 自定义 APIs
+ [Amazon DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/)
+ [Amazon Aurora](http://aurora.apache.org/)
+ [Amazon Redshift](https://docs.aws.amazon.com/redshift/latest/dg/)
+ [Amazon Simple Notification Service (Amazon SNS)](https://docs.aws.amazon.com/sns/latest/dg/)
+ [Amazon Simple Queue Service (Amazon SQS)](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/)
+ [Amazon CloudWatch](https://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/)

有关创建 Lambda 应用程序的更多信息，请参阅[AWS Lambda入门](https://docs.aws.amazon.com/lambda/latest/dg/getting-started.html)。

**Topics**
+ [

## Lambda 作为输出权限
](#how-it-works-output-lambda-perms)
+ [

## Lambda 作为输出指标
](#how-it-works-output-lambda-metrics)
+ [

## Lambda 作为输出事件输入数据模型和记录响应模型
](#how-it-works-output-lambda-model)
+ [

## Lambda 输出调用频率
](#how-it-works-output-lambda-frequency)
+ [

## 添加 Lambda 函数作为输出
](#how-it-works-output-lambda-procedure)
+ [

## 常见的 Lambda 作为输出的故障
](#how-it-works-output-lambda-troubleshooting)
+ [

# 为应用程序目标创建 Lambda 函数
](how-it-works-output-lambda-functions.md)

## Lambda 作为输出权限
<a name="how-it-works-output-lambda-perms"></a>

要使用 Lambda 作为输出，应用程序的 Lambda 输出 IAM 角色需要以下权限策略：

```
{
   "Sid": "UseLambdaFunction",
   "Effect": "Allow",
   "Action": [
       "lambda:InvokeFunction",
       "lambda:GetFunctionConfiguration"
   ],
   "Resource": "FunctionARN"
}
```

## Lambda 作为输出指标
<a name="how-it-works-output-lambda-metrics"></a>

您可以使用 Amazon CloudWatch 监控发送的字节数、成功和失败等。[有关 Kinesis Data Analytics 使用 Lambda 作为输出发出的 CloudWatch 指标的信息，请参阅亚马逊 Kinesis Analytics 指标。](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/aka-metricscollected.html)

## Lambda 作为输出事件输入数据模型和记录响应模型
<a name="how-it-works-output-lambda-model"></a>

要发送 Kinesis Data Analytics 输出记录，您的 Lambda 函数必须符合所需的事件输入数据和记录响应模型。

### 事件输入数据模型
<a name="how-it-works-output-lambda-model-request"></a>

使用以下请求模型持续将输出记录从应用程序发送到 以作为输出函数。在您的函数中，遍历列表并应用业务逻辑来完成输出要求 (例如，在将数据发送到最终目标之前先进行数据转换)。


| 字段 | 说明 | 
| --- | --- | 
| 字段 | 说明 | 
| --- | --- | 
| 字段 | 说明 | 
| --- | --- | 
| invocationId | &LAM; 调用 ID (随机 GUID)。 | 
| applicationArn | 数据分析应用程序 Amazon 资源名称 (ARN)。 | 
| 记录 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/how-it-works-output-lambda.html)  | 
| recordId | 记录 ID (随机 GUID) | 
| lambdaDeliveryRecordMetadata |  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/how-it-works-output-lambda.html)  | 
| 数据 | Base64 编码的输出记录负载 | 
| retryHint | 传输重试次数 | 

**注意**  
`retryHint` 是一个每次传输失败时都会增加的值。该值不会持久不变，并在应用程序中断时重置。

### 记录响应模型
<a name="how-it-works-output-lambda-model-response"></a>

作为输出函数（带记录 IDs）发送到您的 Lambda 的每条记录都必须使用`Ok`或进行确认`DeliveryFailed`，并且必须包含以下参数。否则，Kinesis Data Analytics 将其视为传输失败。


| 字段 | 说明 | 
| --- | --- | 
| 记录 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/how-it-works-output-lambda.html)  | 
| recordId | 在调用期间，记录 ID 从 Kinesis Data Analytics 传送到 Lambda。如果原始记录的 ID 和确认记录的 ID 之间不匹配，就会被视为传输失败。 | 
| result | 记录的传输状态。以下是可能的值：[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/how-it-works-output-lambda.html)  | 

## Lambda 输出调用频率
<a name="how-it-works-output-lambda-frequency"></a>

Kinesis Data Analytics 数据分析应用程序缓存输出记录，并频繁调用 AWS Lambda 目标函数。
+ 如果将记录作为翻滚窗口发送到数据分析应用程序内的目标应用程序内流，则每次翻滚窗口触发器都会调用 AWS Lambda 目标函数。例如，如果使用 60 秒的滚动窗口将记录发送到目标应用程序内部流，则每 60 秒调用一次 函数。
+ 如果在该应用程序中使用持续查询或滑动窗口将记录发送到目标应用程序内部流，则大约每秒调用一次 目标函数。

**注意**  
[每 Lambda 函数调用请求负载大小限制](https://docs.aws.amazon.com/lambda/latest/dg/limits.html)适用。超出这些限制将导致拆分输出记录，并在多个 函数调用中进行发送。

## 添加 Lambda 函数作为输出
<a name="how-it-works-output-lambda-procedure"></a>

以下过程说明了如何添加 Lambda 函数以作为 Kinesis Data Analytics 应用程序输出。

1. [登录 AWS 管理控制台 并打开适用于 Apache Flink 的托管服务控制台，网址为 /kinesisanalyt https://console.aws.amazon.com ics。](https://console.aws.amazon.com/kinesisanalytics)

1. 选择列表中的应用程序，然后选择 **Application details**。

1. 在 **Destination** 部分，选择 **Connect new destination**。

1. 对于 **Destination** 项，选择 **AWS Lambda function**。

1. 在**向 AWS Lambda传递记录**部分中，选择现有Lambda 函数或**创建新函数**。

1. 如果您正在创建一个新的 Lambda 函数，请执行以下操作：

   1. 选择提供的模板之一。有关更多信息，请参阅 [为应用程序目标创建 Lambda 函数](how-it-works-output-lambda-functions.md)。

   1. 将在新的浏览器选项卡中打开 **Create Function** (创建函数) 页。在 **Name (名称)** 框中，为函数指定一个有意义的名称（例如，**myLambdaFunction**）。

   1. 针对您的应用程序用后处理功能更新模板。有关创建 Lambda 函数的信息，请参阅*AWS Lambda 开发人员指南*中的[入门](https://docs.aws.amazon.com/lambda/latest/dg/getting-started.html)。

   1. 在 Kinesis Data Analytics 控制台上的 **Lambda 函数**列表中，选择刚创建的 Lambda 函数。Lambda 函数版本选择**最新**。

1. 在 **In-application stream** 部分，选择 **Choose an existing in-application stream**。对于 **In-application stream name**，选择应用程序的输出流。选定输出流的结果将发送到 Lambda 输出函数。

1. 保持表单其余部分为默认值，然后选择 **Save and continue**。

您的应用程序现在将记录从应用程序内部流发送到 Lambda 函数。您可以在 Amazon CloudWatch 控制台中查看默认模板的结果。监控 `AWS/KinesisAnalytics/LambdaDelivery.OkRecords` 指标，查看传输给 Lambda 函数的记录数。

## 常见的 Lambda 作为输出的故障
<a name="how-it-works-output-lambda-troubleshooting"></a>

传输到 Lambda 函数失败的常见原因如下。
+ 并非批次中发送到 Lambda 函数的所有记录（有记录 IDs）都会返回到 Kinesis Data Analytics 服务。
+ 响应中缺少记录 ID 或状态字段。
+ Lambda 函数超时不足以在 Lambda 函数内完成业务逻辑。
+ Lambda 函数中的业务逻辑不会捕获所有错误，导致因未处理的异常而产生超时和反向压力。这些消息通常称为“毒丸”消息。

如果数据传输失败，Kinesis Data Analytics 会继续对同一组记录重试 Lambda 调用，直到成功为止。要深入了解故障，您可以监控以下 CloudWatch 指标：
+ Kinesis Data Analytics 应用程序 Lambda 作为 CloudWatch 输出指标：表示成功和失败的数量以及其他统计数据。有关更多信息，请参阅 [Amazon Kinesis Analytics 指标](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/aka-metricscollected.html)。
+ AWS Lambda 函数 CloudWatch 指标和日志。

# 为应用程序目标创建 Lambda 函数
<a name="how-it-works-output-lambda-functions"></a>

您的 Kinesis Data Analytics 应用程序 AWS Lambda 可以使用函数作为输出。Kinesis Data Analytics 提供了一些模板以创建用作应用程序目标的 Lambda 函数。可以将这些模板作为应用程序输出后处理的起点。

**Topics**
+ [

## 使用 Node.js 创建 Lambda 函数目标
](#how-it-works-lambda-dest-nodejs)
+ [

## 使用 Python 创建 Lambda 函数目标
](#how-it-works-lambda-dest-python)
+ [

## 使用 Java 创建 Lambda 函数目标
](#how-it-works-lambda-dest-java)
+ [

## 使用 .NET 创建 Lambda 函数目标
](#how-it-works-lambda-net)

## 使用 Node.js 创建 Lambda 函数目标
<a name="how-it-works-lambda-dest-nodejs"></a>

在控制台上提供了以下模板以使用 Node.js 创建目标 Lambda 函数：


| Lambda 作为输出蓝图 | 语言和版本 | 说明 | 
| --- | --- | --- | 
| kinesis-analytics-output | Node.js 12.x | 将 Kinesis Data Analytics 应用程序的输出记录传送到自定义目标。 | 

## 使用 Python 创建 Lambda 函数目标
<a name="how-it-works-lambda-dest-python"></a>

在控制台上提供了以下模板以使用 Python 创建目标 Lambda 函数：


| Lambda 作为输出蓝图 | 语言和版本 | 说明 | 
| --- | --- | --- | 
| kinesis-analytics-output-sns | Python 2.7 | 将 Kinesis Data Analytics 应用程序的输出记录传送到 Amazon SNS。 | 
| kinesis-analytics-output-ddb | Python 2.7 | 将 Kinesis Data Analytics 应用程序的输出记录传送到 Amazon DynamoDB。 | 

## 使用 Java 创建 Lambda 函数目标
<a name="how-it-works-lambda-dest-java"></a>

要使用 Java 创建目标 Lambda 函数，请使用 [Java 事件](https://github.com/aws/aws-lambda-java-libs/tree/master/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events)类。

以下代码说明了一个使用 Java 的示例目标 Lambda 函数：

```
public class LambdaFunctionHandler
        implements RequestHandler<KinesisAnalyticsOutputDeliveryEvent, KinesisAnalyticsOutputDeliveryResponse> {

    @Override
    public KinesisAnalyticsOutputDeliveryResponse handleRequest(KinesisAnalyticsOutputDeliveryEvent event,
            Context context) {
        context.getLogger().log("InvocatonId is : " + event.invocationId);
        context.getLogger().log("ApplicationArn is : " + event.applicationArn);

        List<KinesisAnalyticsOutputDeliveryResponse.Record> records = new ArrayList<KinesisAnalyticsOutputDeliveryResponse.Record>();
        KinesisAnalyticsOutputDeliveryResponse response = new KinesisAnalyticsOutputDeliveryResponse(records);

        event.records.stream().forEach(record -> {
            context.getLogger().log("recordId is : " + record.recordId);
            context.getLogger().log("record retryHint is :" + record.lambdaDeliveryRecordMetadata.retryHint);
            // Add logic here to transform and send the record to final destination of your choice.
            response.records.add(new Record(record.recordId, KinesisAnalyticsOutputDeliveryResponse.Result.Ok));
        });
        return response;
    }

}
```

## 使用 .NET 创建 Lambda 函数目标
<a name="how-it-works-lambda-net"></a>

要使用 .NET 创建目标 Lambda 函数，请使用 [.NET 事件](https://github.com/aws/aws-lambda-dotnet/tree/master/Libraries/src/Amazon.Lambda.KinesisAnalyticsEvents)类。

以下代码说明了一个使用 C\$1 的示例目标 Lambda 函数：

```
public class Function
    {
        public KinesisAnalyticsOutputDeliveryResponse FunctionHandler(KinesisAnalyticsOutputDeliveryEvent evnt, ILambdaContext context)
        {
            context.Logger.LogLine($"InvocationId: {evnt.InvocationId}");
            context.Logger.LogLine($"ApplicationArn: {evnt.ApplicationArn}");

            var response = new KinesisAnalyticsOutputDeliveryResponse
            {
                Records = new List<KinesisAnalyticsOutputDeliveryResponse.Record>()
            };

            foreach (var record in evnt.Records)
            {
                context.Logger.LogLine($"\tRecordId: {record.RecordId}");
                context.Logger.LogLine($"\tRetryHint: {record.RecordMetadata.RetryHint}");
                context.Logger.LogLine($"\tData: {record.DecodeData()}");

                // Add logic here to send to the record to final destination of your choice.

                var deliveredRecord = new KinesisAnalyticsOutputDeliveryResponse.Record
                {
                    RecordId = record.RecordId,
                    Result = KinesisAnalyticsOutputDeliveryResponse.OK
                };
                response.Records.Add(deliveredRecord);
            }
            return response;
        }
    }
```

有关使用 .NET 创建 Lambda 函数以进行预处理或作为目标的更多信息，请参阅[https://github.com/aws/aws-lambda-dotnet/tree/master/Libraries/src/Amazon.Lambda.KinesisAnalyticsEvents](https://github.com/aws/aws-lambda-dotnet/tree/master/Libraries/src/Amazon.Lambda.KinesisAnalyticsEvents)。