使用 Kafka 主題做為失敗時的目的地 - AWS Lambda

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 Kafka 主題做為失敗時的目的地

您可以將 Kafka 主題設定為 Kafka 事件來源映射的失敗時目的地。當 Lambda 在耗盡重試嘗試後或記錄超過最長存留期後無法處理記錄時,Lambda 會將失敗的記錄傳送至指定的 Kafka 主題,以供稍後處理。當您同時設定無限次重試和失敗時的目的地時,Lambda 會自動套用最多 10 次重試嘗試。

Kafka 失敗時目的地的運作方式

當您將 Kafka 主題設定為失敗時的目的地時,Lambda 會充當 Kafka 生產者,並將失敗的記錄寫入目的地主題。這會在 Kafka 基礎設施內建立無效字母主題 (DLT) 模式。

  • 相同的叢集需求 – 目的地主題必須與來源主題存在於相同的 Kafka 叢集中。

  • 實際記錄內容 – Kafka 目的地會收到實際失敗的記錄以及失敗中繼資料。

  • 防止遞迴 – Lambda 透過封鎖來源和目的地主題相同的組態來防止無限迴圈。

設定 Kafka 失敗時的目的地

您可以在建立或更新 Kafka 事件來源映射時,將 Kafka 主題設定為失敗時的目的地。

設定 Kafka 目的地 (主控台)

將 Kafka 主題設定為失敗時的目的地 (主控台)
  1. 開啟 Lambda 主控台中的函數頁面

  2. 選擇您的函數名稱。

  3. 執行以下任意一項:

    • 若要新增新的 Kafka 觸發條件,請在函數概觀下,選擇新增觸發條件

    • 若要修改現有的 Kafka 觸發條件,請選擇觸發條件,然後選擇編輯

  4. 其他設定下,針對失敗目的地,選擇 Kafka 主題

  5. 針對主題名稱,輸入您要傳送失敗記錄的 Kafka 主題名稱。

  6. 選擇新增儲存

設定 Kafka 目的地 (AWS CLI)

使用 kafka://字首將 Kafka 主題指定為失敗時的目的地。

使用 Kafka 目的地建立事件來源映射

下列範例會建立 Amazon MSK 事件來源映射,並以 Kafka 主題做為失敗時的目的地:

aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics AWSKafkaTopic \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123 \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=3 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'

對於自我管理的 Kafka,請使用相同的語法:

aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics AWSKafkaTopic \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc.xyz.com:9092"]}}' \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=3 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'

更新 Kafka 目的地

使用 update-event-source-mapping命令來新增或修改 Kafka 目的地:

aws lambda update-event-source-mapping \ --uuid 12345678-1234-1234-1234-123456789012 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'

Kafka 目的地的記錄格式

當 Lambda 將失敗的記錄傳送到 Kafka 主題時,每個訊息都會包含有關失敗和實際記錄內容的中繼資料。

失敗中繼資料

中繼資料包含記錄失敗原因的相關資訊,以及原始批次的詳細資訊:

{ "requestContext": { "requestId": "e4b46cbf-b738-xmpl-8880-a18cdf61200e", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-function:$LATEST", "condition": "RetriesExhausted", "approximateInvokeCount": 3 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T18:16:05.568Z", "KafkaBatchInfo": { "batchSize": 1, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123", "bootstrapServers": "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9098", "payloadSize": 1162, "recordInfo": { "offset": "49601189658422359378836298521827638475320189012309704722", "timestamp": "2019-11-14T18:16:04.835Z" } }, "payload": { "bootstrapServers": "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9098", "eventSource": "aws:kafka", "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123", "records": { "my-topic-0": [ { "headers": [], "key": "dGVzdC1rZXk=", "offset": 100, "partition": 0, "timestamp": 1749116692330, "timestampType": "CREATE_TIME", "topic": "my-topic", "value": "dGVzdC12YWx1ZQ==" } ] } } }

分割區索引鍵行為

在產生目的地主題時,Lambda 會使用原始記錄中的相同分割區索引鍵。如果原始記錄沒有金鑰,Lambda 會在目的地主題中的所有可用分割區中使用 Kafka 的預設循環配置分割。

要求與限制

  • 需要佈建模式 – Kafka 故障時目的地僅適用於啟用佈建模式的事件來源映射。

  • 僅限相同叢集 – 目的地主題必須與來源主題存在於相同的 Kafka 叢集中。

  • 主題許可 – 您的事件來源映射必須具有目的地主題的寫入許可。範例:

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ClusterPermissions", "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeCluster", "kafka-cluster:DescribeTopic", "kafka-cluster:WriteData", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:*:*:cluster/*" ] }, { "Sid": "TopicPermissions", "Effect": "Allow", "Action": [ "kafka-cluster:DescribeTopic", "kafka-cluster:WriteData", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:*:*:topic/*/*" ] }, { "Effect": "Allow", "Action": [ "kafka:DescribeCluster", "kafka:GetBootstrapBrokers", "kafka:Produce" ], "Resource": "arn:aws:kafka:*:*:cluster/*" }, { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] }
  • 無遞迴 – 目的地主題名稱不能與來源主題名稱相同。