本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
具有耐用函數的事件來源映射
耐用函數適用於所有 Lambda 事件來源映射。設定耐用函數的事件來源映射,與設定標準函數的方式相同。事件來源映射會自動輪詢事件來源,例如 Amazon SQS、Kinesis 和 DynamoDB Streams,並使用批次記錄叫用函數。
事件來源映射適用於使用複雜多步驟工作流程處理串流或佇列的耐久函數。例如,您可以建立耐久的 函數,透過重試、外部 API 呼叫和人工核准來處理 Amazon SQS 訊息。
事件來源映射如何叫用耐久函數
事件來源映射會同步叫用持久性函數,等待完全持久性執行完成,然後再處理下一個批次或將記錄標記為已處理。如果總耐久執行時間超過 15 分鐘,則執行會逾時並失敗。事件來源映射會收到逾時例外狀況,並根據其重試組態進行處理。
15 分鐘執行限制
當事件來源映射調用持久性函數時,總持久性執行持續時間不能超過 15 分鐘。此限制適用於從開始到完成的整個持久性執行,而不只是個別函數叫用。
此 15 分鐘限制與 Lambda 函數逾時分開 (最長 15 分鐘)。函數逾時控制每個個別調用可以執行的時間長度,而持久的執行逾時控制從執行開始到完成的總經過時間。
範例案例:
-
有效:耐用的函數會處理具有三個步驟的 Amazon SQS 訊息,每個步驟需要 2 分鐘,然後等待 5 分鐘,再完成最後一個步驟。總執行時間:11 分鐘。這是因為總計少於 15 分鐘。
-
無效:耐用函數會處理 Amazon SQS 訊息、在 2 分鐘內完成初始處理,然後等待 20 分鐘進行外部回呼,然後再完成。總執行時間:22 分鐘。這超過 15 分鐘的限制,將會失敗。
-
無效:耐用函數會處理 Kinesis 記錄,其中多個等待操作在步驟之間總計為 30 分鐘。即使每個個別調用都快速完成,總執行時間仍超過 15 分鐘。
使用事件來源映射時,將您的持久執行逾時設定為 15 分鐘或更少,否則建立事件來源映射將會失敗。如果您的工作流程需要較長的執行時間,請使用如下所述的中介函數模式。
設定事件來源映射
使用 Lambda 主控台 AWS CLI或 AWS SDKs 設定耐用函數的事件來源映射。所有標準事件來源映射屬性都適用於耐用函數:
aws lambda create-event-source-mapping \
--function-name arn:aws:lambda:us-east-1:123456789012:function:my-durable-function:1 \
--event-source-arn arn:aws:sqs:us-east-1:123456789012:my-queue \
--batch-size 10 \
--maximum-batching-window-in-seconds 5
為耐用函數設定事件來源映射時,請記得使用合格的 ARN (具有版本編號或別名)。
使用事件來源映射處理錯誤
事件來源映射提供內建錯誤處理,可搭配耐用的 函數運作:
-
重試行為:如果初始調用失敗,事件來源映射會根據其重試組態重試。根據您的需求設定重試嘗試次數上限和重試間隔。
-
無效字母佇列:設定無效字母佇列以擷取所有重試後失敗的記錄。這可防止訊息遺失,並啟用失敗記錄的手動檢查。
-
部分批次失敗:對於 Amazon SQS 和 Kinesis,請使用部分批次失敗報告個別處理記錄,並且只重試失敗的記錄。
-
偏差錯誤:對於 Kinesis 和 DynamoDB 串流,啟用偏差錯誤以分割失敗的批次並隔離有問題的記錄。
耐用的函數支援無效字母佇列 (DLQs) 來處理錯誤,但不支援 Lambda 目的地。設定 DLQ 以從失敗的調用中擷取記錄。
如需事件來源映射錯誤處理的完整資訊,請參閱事件來源映射。
如果您的工作流程需要超過 15 分鐘才能完成,請在事件來源映射和耐用函數之間使用中介標準 Lambda 函數。中介函數會從事件來源映射接收事件,並以非同步方式叫用耐久函數,移除 15 分鐘的執行限制。
此模式會將事件來源映射的同步調用模型與耐久函數長時間執行的執行模型分離。事件來源映射會叫用中介函數,該函數會在啟動耐久執行後快速傳回。然後,耐用函數會視需要獨立執行 (最長 1 年)。
中介函數模式使用三個元件:
-
事件來源映射:輪詢事件來源 (Amazon SQS、Kinesis、DynamoDB Streams),並使用批次記錄同步叫用中介函數。
-
中介函數:從事件來源映射接收事件的標準 Lambda 函數,會視需要驗證和轉換資料,並以非同步方式叫用持久函數。此函數會快速完成 (通常不到 1 秒),並將控制項傳回至事件來源映射。
-
耐用函數:使用可長時間執行的複雜多步驟邏輯來處理事件。非同步叫用,因此不受 15 分鐘限制。
中介函數會從事件來源映射接收整個事件,並以非同步方式叫用耐久函數。使用執行名稱參數來確保等冪執行開始,防止事件來源映射重試時重複處理:
- TypeScript
-
import { LambdaClient, InvokeCommand } from '@aws-sdk/client-lambda';
import { SQSEvent } from 'aws-lambda';
import { createHash } from 'crypto';
const lambda = new LambdaClient({});
export const handler = async (event: SQSEvent) => {
// Invoke durable function asynchronously with execution name
await lambda.send(new InvokeCommand({
FunctionName: 'arn:aws:lambda:us-east-1:123456789012:function:my-durable-function:1',
InvocationType: 'Event',
Payload: JSON.stringify({
executionName: event.Name,
event: event
})
}));
return { statusCode: 200 };
};
- Python
-
import boto3
import json
import hashlib
lambda_client = boto3.client('lambda')
def handler(event, context):
# Invoke durable function asynchronously with execution name
lambda_client.invoke(
FunctionName='arn:aws:lambda:us-east-1:123456789012:function:my-durable-function:1',
InvocationType='Event',
Payload=json.dumps({
'executionName': execution_name,
'event': event["name"]
})
)
return {'statusCode': 200}
對於中介函數本身的冪等性,如果事件來源映射重試中介函數,請使用適用於 的 Powertools AWS Lambda 來防止重複叫用持久函數。
耐用函數會接收具有執行名稱的承載,並使用長時間執行的邏輯處理所有記錄:
- TypeScript
-
import { withDurableExecution, DurableContext } from '@aws/durable-execution-sdk-js';
export const handler = withDurableExecution(
async (payload: any, context: DurableContext) => {
const sqsEvent = payload.event;
// Process each record with complex, multi-step logic
const results = await context.map(
sqsEvent.Records,
async (ctx, record) => {
const validated = await ctx.step('validate', async () => {
return validateOrder(JSON.parse(record.body));
});
// Wait for external approval (could take hours or days)
const approval = await ctx.waitForCallback(
'approval',
async (callbackId) => {
await requestApproval(callbackId, validated);
},
{ timeout: { hours: 48 } }
);
// Complete processing
return await ctx.step('complete', async () => {
return completeOrder(validated, approval);
});
}
);
return { statusCode: 200, processed: results.getResults().length };
}
);
- Python
-
from aws_durable_execution_sdk_python import durable_execution, DurableContext
from aws_durable_execution_sdk_python.config import Duration, WaitForCallbackConfig
from collections.abc import Sequence
import json
def validate_order(order_data: dict) -> dict:
"""Validate order data - always passes."""
return order_data
def request_approval(callback_id: str, validated_order: dict) -> None:
"""Request approval for the order - always passes."""
pass
def complete_order(validated_order: dict, approval_result: str) -> dict:
"""Complete the order processing - always passes."""
return validated_order
@durable_execution
def lambda_handler(payload, context: DurableContext):
sqs_event = payload['event']
def process_record(
ctx: DurableContext,
record: dict,
index: int,
items: Sequence[dict]
) -> dict:
validated = ctx.step(
lambda _: validate_order(json.loads(record['body'])),
name=f'validate-{index}'
)
approval = ctx.wait_for_callback(
submitter=lambda callback_id, wait_ctx: request_approval(callback_id, validated),
name=f'approval-{index}',
config=WaitForCallbackConfig(timeout=Duration.from_seconds(172800))
)
return ctx.step(
lambda _: complete_order(validated, approval),
name=f'complete-{index}'
)
results = context.map(
inputs=sqs_event['Records'],
func=process_record,
name='process-records'
)
return {
'statusCode': 200,
'started': results.started_count,
'completed': results.success_count,
'failed': results.failure_count,
'total': results.total_count
}
此模式會取消事件來源映射與持久性執行的耦合,以移除 15 分鐘執行限制。中介函數會在啟動耐久執行後立即傳回,允許事件來源映射繼續處理。然後,耐用函數會視需要獨立執行。
中介函數會在叫用持久性函數時成功,而不是在持久性執行完成時成功。如果稍後持久性執行失敗,則事件來源映射不會重試,因為它已成功處理批次。在耐用函數中實作錯誤處理,並為失敗的執行設定無效字母佇列。
使用執行名稱參數來確保等冪執行開始。如果事件來源映射重試中介函數,則耐久函數不會啟動重複的執行,因為執行名稱已存在。
支援的事件來源
耐用函數支援使用事件來源映射的所有 Lambda 事件來源:
呼叫耐用函數時,所有事件來源類型都受限於 15 分鐘的耐久執行限制。