

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 教學課程：搭配 Kinesis Data Streams 使用 Lambda
教學課程

在本教學課程中，您會建立 Lambda 函數，以取用 Amazon Kinesis 資料串流中的事件。

1. 自訂應用程式將記錄寫入串流。

1. AWS Lambda 會輪詢串流，並在偵測到串流中的新記錄時，叫用您的 Lambda 函數。

1. AWS Lambda 會透過擔任您在建立 Lambda 函數時指定的執行角色來執行 Lambda 函數。

## 先決條件


### 安裝 AWS Command Line Interface


如果您尚未安裝 AWS Command Line Interface，請依照[安裝或更新最新版本的 AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html)中的步驟進行安裝。

本教學課程需使用命令列終端機或 Shell 來執行命令。在 Linux 和 macOS 中，使用您偏好的 Shell 和套件管理工具。

**注意**  
在 Windows 中，作業系統的內建終端不支援您常與 Lambda 搭配使用的某些 Bash CLI 命令 (例如 `zip`)。若要取得 Ubuntu 和 Bash 的 Windows 整合版本，請[安裝適用於 Linux 的 Windows 子系統](https://docs.microsoft.com/en-us/windows/wsl/install-win10)。

## 建立執行角色


建立 [執行角色](lambda-intro-execution-role.md)，授予函數存取 AWS 資源的許可。

**若要建立執行角色**

1. 在 IAM 主控台中開啟[角色頁面](https://console.aws.amazon.com/iam/home#/roles)。

1. 選擇建**立角色**。

1. 建立具備下列屬性的角色。
   + **信任實體** – **AWS Lambda**。
   + **Permissions (許可)** - **AWSLambdaKinesisExecutionRole**。
   + **角色名稱** – **lambda-kinesis-role**。

**AWSLambdaKinesisExecutionRole** 政策具備函數自 Kinesis 讀取項目以及寫入日誌到 CloudWatch Logs 時所需的許可。

## 建立函數


建立可處理 Kinesis 訊息的 Lambda 函數。函數程式碼會將 Kinesis 記錄的事件 ID 和事件資料記錄到 CloudWatch Logs 中。

本教學課程使用 Node.js 24 執行時間，但我們也提供其他執行時間語言的範例程式碼。您可以在下列方塊中選取索引標籤，查看您感興趣的執行期程式碼。此步驟要使用的 JavaScript 程式碼，位於 **JavaScript** 索引標籤中顯示的第一個範例。

------
#### [ .NET ]

**適用於 .NET 的 SDK**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda)儲存庫中設定和執行。
使用 .NET 搭配 Lambda 來使用 Kinesis 事件。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
﻿using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using AWS.Lambda.Powertools.Logging;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace KinesisIntegrationSampleCode;

public class Function
{
    // Powertools Logger requires an environment variables against your function
    // POWERTOOLS_SERVICE_NAME
    [Logging(LogEvent = true)]
    public async Task FunctionHandler(KinesisEvent evnt, ILambdaContext context)
    {
        if (evnt.Records.Count == 0)
        {
            Logger.LogInformation("Empty Kinesis Event received");
            return;
        }

        foreach (var record in evnt.Records)
        {
            try
            {
                Logger.LogInformation($"Processed Event with EventId: {record.EventId}");
                string data = await GetRecordDataAsync(record.Kinesis, context);
                Logger.LogInformation($"Data: {data}");
                // TODO: Do interesting work based on the new data
            }
            catch (Exception ex)
            {
                Logger.LogError($"An error occurred {ex.Message}");
                throw;
            }
        }
        Logger.LogInformation($"Successfully processed {evnt.Records.Count} records.");
    }

    private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context)
    {
        byte[] bytes = record.Data.ToArray();
        string data = Encoding.UTF8.GetString(bytes);
        await Task.CompletedTask; //Placeholder for actual async work
        return data;
    }
}
```

------
#### [ Go ]

**SDK for Go V2**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda)儲存庫中設定和執行。
使用 Go 搭配 Lambda 來使用 Kinesis 事件。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"
	"log"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

func handler(ctx context.Context, kinesisEvent events.KinesisEvent) error {
	if len(kinesisEvent.Records) == 0 {
		log.Printf("empty Kinesis event received")
		return nil
	}

	for _, record := range kinesisEvent.Records {
		log.Printf("processed Kinesis event with EventId: %v", record.EventID)
		recordDataBytes := record.Kinesis.Data
		recordDataText := string(recordDataBytes)
		log.Printf("record data: %v", recordDataText)
		// TODO: Do interesting work based on the new data
	}
	log.Printf("successfully processed %v records", len(kinesisEvent.Records))
	return nil
}

func main() {
	lambda.Start(handler)
}
```

------
#### [ Java ]

**SDK for Java 2.x**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda)儲存庫中設定和執行。
使用 Java 搭配 Lambda 來使用 Kinesis 事件。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package example;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;

public class Handler implements RequestHandler<KinesisEvent, Void> {
    @Override
    public Void handleRequest(final KinesisEvent event, final Context context) {
        LambdaLogger logger = context.getLogger();
        if (event.getRecords().isEmpty()) {
            logger.log("Empty Kinesis Event received");
            return null;
        }
        for (KinesisEvent.KinesisEventRecord record : event.getRecords()) {
            try {
                logger.log("Processed Event with EventId: "+record.getEventID());
                String data = new String(record.getKinesis().getData().array());
                logger.log("Data:"+ data);
                // TODO: Do interesting work based on the new data
            }
            catch (Exception ex) {
                logger.log("An error occurred:"+ex.getMessage());
                throw ex;
            }
        }
        logger.log("Successfully processed:"+event.getRecords().size()+" records");
        return null;
    }

}
```

------
#### [ JavaScript ]

**適用於 JavaScript (v3) 的 SDK**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/blob/main/integration-kinesis-to-lambda)儲存庫中設定和執行。
使用 JavaScript 搭配 Lambda 來使用 Kinesis 事件。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
  for (const record of event.Records) {
    try {
      console.log(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      console.log(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      console.error(`An error occurred ${err}`);
      throw err;
    }
  }
  console.log(`Successfully processed ${event.Records.length} records.`);
};

async function getRecordDataAsync(payload) {
  var data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); //Placeholder for actual async work
  return data;
}
```
使用 TypeScript 搭配 Lambda 來使用 Kinesis 事件。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import {
  KinesisStreamEvent,
  Context,
  KinesisStreamHandler,
  KinesisStreamRecordPayload,
} from "aws-lambda";
import { Buffer } from "buffer";
import { Logger } from "@aws-lambda-powertools/logger";

const logger = new Logger({
  logLevel: "INFO",
  serviceName: "kinesis-stream-handler-sample",
});

export const functionHandler: KinesisStreamHandler = async (
  event: KinesisStreamEvent,
  context: Context
): Promise<void> => {
  for (const record of event.Records) {
    try {
      logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      logger.info(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      logger.error(`An error occurred ${err}`);
      throw err;
    }
    logger.info(`Successfully processed ${event.Records.length} records.`);
  }
};

async function getRecordDataAsync(
  payload: KinesisStreamRecordPayload
): Promise<string> {
  var data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); //Placeholder for actual async work
  return data;
}
```

------
#### [ PHP ]

**適用於 PHP 的 SDK**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda)儲存庫中設定和執行。
使用 PHP 搭配 Lambda 來使用 Kinesis 事件。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
<?php

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\Kinesis\KinesisEvent;
use Bref\Event\Kinesis\KinesisHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler extends KinesisHandler
{
    private StderrLogger $logger;
    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handleKinesis(KinesisEvent $event, Context $context): void
    {
        $this->logger->info("Processing records");
        $records = $event->getRecords();
        foreach ($records as $record) {
            $data = $record->getData();
            $this->logger->info(json_encode($data));
            // TODO: Do interesting work based on the new data

            // Any exception thrown will be logged and the invocation will be marked as failed
        }
        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords records");
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**適用於 Python 的 SDK (Boto3)**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda)儲存庫中設定和執行。
使用 Python 搭配 Lambda 來使用 Kinesis 事件。  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import base64
def lambda_handler(event, context):

    for record in event['Records']:
        try:
            print(f"Processed Kinesis Event - EventID: {record['eventID']}")
            record_data = base64.b64decode(record['kinesis']['data']).decode('utf-8')
            print(f"Record Data: {record_data}")
            # TODO: Do interesting work based on the new data
        except Exception as e:
            print(f"An error occurred {e}")
            raise e
    print(f"Successfully processed {len(event['Records'])} records.")
```

------
#### [ Ruby ]

**SDK for Ruby**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda)儲存庫中設定和執行。
使用 Ruby 搭配 Lambda 來使用 Kinesis 事件。  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
require 'aws-sdk'

def lambda_handler(event:, context:)
  event['Records'].each do |record|
    begin
      puts "Processed Kinesis Event - EventID: #{record['eventID']}"
      record_data = get_record_data_async(record['kinesis'])
      puts "Record Data: #{record_data}"
      # TODO: Do interesting work based on the new data
    rescue => err
      $stderr.puts "An error occurred #{err}"
      raise err
    end
  end
  puts "Successfully processed #{event['Records'].length} records."
end

def get_record_data_async(payload)
  data = Base64.decode64(payload['data']).force_encoding('UTF-8')
  # Placeholder for actual async work
  # You can use Ruby's asynchronous programming tools like async/await or fibers here.
  return data
end
```

------
#### [ Rust ]

**適用於 Rust 的 SDK**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda)儲存庫中設定和執行。
使用 Rust 搭配 Lambda 來使用 Kinesis 事件。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use aws_lambda_events::event::kinesis::KinesisEvent;
use lambda_runtime::{run, service_fn, Error, LambdaEvent};

async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<(), Error> {
    if event.payload.records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(());
    }

    event.payload.records.iter().for_each(|record| {
        tracing::info!("EventId: {}",record.event_id.as_deref().unwrap_or_default());

        let record_data = std::str::from_utf8(&record.kinesis.data);

        match record_data {
            Ok(data) => {
                // log the record data
                tracing::info!("Data: {}", data);
            }
            Err(e) => {
                tracing::error!("Error: {}", e);
            }
        }
    });

    tracing::info!(
        "Successfully processed {} records",
        event.payload.records.len()
    );

    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        // disable printing the name of the module in every log line.
        .with_target(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        .init();

    run(service_fn(function_handler)).await
}
```

------

**建立函數**

1. 建立專案的目錄，然後切換至該目錄。

   ```
   mkdir kinesis-tutorial
   cd kinesis-tutorial
   ```

1. 將範例 JavaScript 程式碼複製到名為 `index.js` 的新檔案。

1. 建立部署套件。

   ```
   zip function.zip index.js
   ```

1. 使用 `create-function` 命令建立一個 Lambda 函數。

   ```
   aws lambda create-function --function-name ProcessKinesisRecords \
   --zip-file fileb://function.zip --handler index.handler --runtime nodejs24.x \
   --role arn:aws:iam::111122223333:role/lambda-kinesis-role
   ```

## 測試 Lambda 函數


使用 `invoke` AWS Lambda CLI 命令和範例 Kinesis 事件手動叫用 Lambda 函數。

**測試 Lambda 函數**

1. 將以下 JSON 複製到一個檔案中並儲存為 `input.txt`。

   ```
   {
       "Records": [
           {
               "kinesis": {
                   "kinesisSchemaVersion": "1.0",
                   "partitionKey": "1",
                   "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                   "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                   "approximateArrivalTimestamp": 1545084650.987
               },
               "eventSource": "aws:kinesis",
               "eventVersion": "1.0",
               "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
               "eventName": "aws:kinesis:record",
               "invokeIdentityArn": "arn:aws:iam::111122223333:role/lambda-kinesis-role",
               "awsRegion": "us-east-2",
               "eventSourceARN": "arn:aws:kinesis:us-east-2:111122223333:stream/lambda-stream"
           }
       ]
   }
   ```

1. 使用 `invoke` 命令來傳送事件到函數。

   ```
   aws lambda invoke --function-name ProcessKinesisRecords \
   --cli-binary-format raw-in-base64-out \
   --payload file://input.txt outputfile.txt
   ```

   如果您使用的是第 2 AWS CLI 版，則需要 **cli-binary-format**選項。若要讓此成為預設的設定，請執行 `aws configure set cli-binary-format raw-in-base64-out`。若要取得更多資訊，請參閱*《AWS Command Line Interface 使用者指南第 2 版》*中 [AWS CLI 支援的全域命令列選項](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-options.html#cli-configure-options-list)。

   回應已儲存至 `out.txt`。

## 建立 Kinesis 串流


使用 `create-stream ` 命令來建立串流。

```
aws kinesis create-stream --stream-name lambda-stream --shard-count 1
```

執行下列 `describe-stream` 命令以取得串流 ARN。

```
aws kinesis describe-stream --stream-name lambda-stream
```

您應該會看到下列輸出：

```
{
    "StreamDescription": {
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "StartingHashKey": "0",
                    "EndingHashKey": "340282366920746074317682119384634633455"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49591073947768692513481539594623130411957558361251844610"
                }
            }
        ],
        "StreamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream",
        "StreamName": "lambda-stream",
        "StreamStatus": "ACTIVE",
        "RetentionPeriodHours": 24,
        "EnhancedMonitoring": [
            {
                "ShardLevelMetrics": []
            }
        ],
        "EncryptionType": "NONE",
        "KeyId": null,
        "StreamCreationTimestamp": 1544828156.0
    }
}
```

在下一個步驟中，您會使用串流 ARN 與您的 Lambda 函數的串流建立關聯。

## 在 中新增事件來源 AWS Lambda


執行下列 AWS CLI `add-event-source` 命令。

```
aws lambda create-event-source-mapping --function-name ProcessKinesisRecords \
--event-source  arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream \
--batch-size 100 --starting-position LATEST
```

記下映射 ID 以供後續使用。您可以執行 `list-event-source-mappings` 命令來取得事件來源映射的清單。

```
aws lambda list-event-source-mappings --function-name ProcessKinesisRecords \
--event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream
```

在回應中，您可以確認狀態值為 `enabled`。您可以停用事件來源映射來暫時暫停輪詢，而不會遺失任何記錄。

## 測試設定


若要測試事件來源映射，請新增事件記錄到您的 Kinesis 串流。`--data` 值是一個字串，CLI 會先將該字串編碼為 base64，再將它傳送到 Kinesis。您可以執行相同命令一次以上，以新增多筆記錄到串流中。

```
aws kinesis put-record --stream-name lambda-stream --partition-key 1 \
--data "Hello, this is a test."
```

Lambda 使用執行角色自串流讀取記錄。接著，它調用您的 Lambda 函數，以記錄批次來傳遞。函數會解碼來自每個記錄的資料並加以記錄，將輸出傳送至 CloudWatch Logs。在 [CloudWatch 主控台](https://console.aws.amazon.com/cloudwatch)中檢視日誌。

## 清除您的資源


除非您想要保留為此教學課程建立的資源，否則您現在便可刪除。透過刪除您不再使用 AWS 的資源，您可以避免不必要的 費用 AWS 帳戶。

**刪除執行角色**

1. 開啟 IAM 主控台中的 [角色頁面](https://console.aws.amazon.com/iam/home#/roles) 。

1. 選取您建立的執行角色。

1. 選擇**刪除**。

1. 在文字輸入欄位中輸入角色的名稱，然後選擇 **刪除** 。

**若要刪除 Lambda 函數**

1. 開啟 Lambda 主控台中的 [函數頁面](https://console.aws.amazon.com/lambda/home#/functions)。

1. 選擇您建立的函數。

1. 選擇 **Actions** (動作)、**Delete** (刪除)。

1. 在文字輸入欄位中輸入 **confirm**，然後選擇 **刪除** 。

**刪除 Kinesis 串流**

1. 登入 AWS 管理主控台 並開啟位於 https：//[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) 的 Kinesis 主控台。

1. 選取您建立的串流。

1. 選擇 **動作**、**刪除**。

1. 在文字輸入欄位中輸入 **delete**。

1. 選擇 **刪除** 。