

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 設定 Kafka 事件來源的錯誤處理控制項
<a name="kafka-retry-configurations"></a>

您可以設定 Lambda 如何處理 Kafka 事件來源映射的錯誤和重試。這些組態可協助您控制 Lambda 如何處理失敗的記錄和管理重試行為。

## 可用的重試組態
<a name="kafka-retry-options"></a>

下列重試組態適用於 Amazon MSK 和自我管理 Kafka 事件來源：
+ **重試次數上限** – 當函數傳回錯誤時，Lambda 重試次數上限。這不會計入初始調用嘗試。預設值為 -1 （無限）。當您同時設定無限次重試和[失敗時的目的地](kafka-on-failure-destination.md)時，Lambda 會自動套用最多 10 次重試嘗試。
+ **記錄存留期上限** – Lambda 傳送給函數的記錄存留期上限。預設值為 -1 （無限）。
+ **錯誤時分割批次** – 當您的函數傳回錯誤時，請將批次分割為兩個較小的批次，並個別重試。這有助於隔離有問題的記錄。
+ **部分批次回應** – 允許函數傳回批次中哪些記錄處理失敗的資訊，因此 Lambda 只能重試失敗的記錄。

## 設定錯誤處理控制 （主控台）
<a name="kafka-retry-console"></a>

您可以在 Lambda 主控台中建立或更新 Kafka 事件來源映射時設定重試行為。

**設定 Kafka 事件來源的重試行為 （主控台）**

1. 開啟 Lambda 主控台中的[函數頁面](https://console.aws.amazon.com/lambda/home#/functions)。

1. 選擇您的函數名稱。

1. 執行以下任意一項：
   + 若要新增新的 Kafka 觸發條件，請在**函數概觀**下，選擇**新增觸發條件**。
   + 若要修改現有的 Kafka 觸發條件，請選擇觸發條件，然後選擇**編輯**。

1. 在**事件輪詢器組態**下，選取佈建模式以設定錯誤處理控制項：

   1. 針對**重試嘗試**，輸入重試嘗試次數上限 (0-10000，或無限輸入 -1)。

   1. 針對**記錄存留期上限**，以秒為單位輸入存留期上限 (60-604800，無限輸入 -1)。

   1. 若要在發生錯誤時啟用批次分割，請選取**錯誤時分割批次**。

   1. 若要啟用部分批次回應，請選取 **ReportBatchItemFailures**。

1. 選擇**新增**或**儲存**。

## 設定重試行為 (AWS CLI)
<a name="kafka-retry-cli"></a>

使用下列 AWS CLI 命令來設定 Kafka 事件來源映射的重試行為。

### 使用重試組態建立事件來源映射
<a name="kafka-retry-cli-create"></a>

下列範例會使用錯誤處理控制項建立自我管理的 Kafka 事件來源映射：

```
aws lambda create-event-source-mapping \
  --function-name my-kafka-function \
  --topics my-kafka-topic \
  --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName \
  --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc.xyz.com:9092"]}}' \
  --starting-position LATEST \
  --provisioned-poller-config MinimumPollers=1,MaximumPollers=1 \
  --maximum-retry-attempts 3 \
  --maximum-record-age-in-seconds 3600 \
  --bisect-batch-on-function-error \
  --function-response-types "ReportBatchItemFailures"
```

對於 Amazon MSK 事件來源：

```
aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \
  --topics AWSMSKKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function \
  --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]' \
  --provisioned-poller-config MinimumPollers=1,MaximumPollers=1 \
  --maximum-retry-attempts 3 \
  --maximum-record-age-in-seconds 3600 \
  --bisect-batch-on-function-error \
  --function-response-types "ReportBatchItemFailures"
```

### 更新重試組態
<a name="kafka-retry-cli-update"></a>

使用 `update-event-source-mapping`命令來修改現有事件來源映射的重試組態：

```
aws lambda update-event-source-mapping \
  --uuid 12345678-1234-1234-1234-123456789012 \
  --maximum-retry-attempts 5 \
  --maximum-record-age-in-seconds 7200 \
  --bisect-batch-on-function-error \
  --function-response-types "ReportBatchItemFailures"
```

## PartialBatchResponse
<a name="kafka-partial-batch-response"></a>

部分批次回應也稱為 ReportBatchItemFailures，是 Lambda 與 Kafka 來源整合時處理錯誤的重要功能。如果沒有此功能，當批次中的其中一個項目發生錯誤時，會導致重新處理該批次中的所有訊息。啟用並實作部分批次回應後，處理常式只會傳回失敗訊息的識別符，讓 Lambda 只重試這些特定項目。這可讓您更好地控制包含失敗訊息的批次處理方式。

若要報告批次錯誤，您將使用此 JSON 結構描述：

```
{
  "batchItemFailures": [
    {
      "itemIdentifier": {
        "partition": "topic-partition_number",
        "offset": 100
      }
    },
    ...
  ]
}
```

**重要**  
如果您傳回空的有效 JSON 或 null，事件來源映射會將批次視為已成功處理。傳回的任何無效 topic-partition\_number 或位移若不存在於調用事件中，將視為失敗，並重試整個批次。

下列程式碼範例示範如何為從 Kafka 來源接收事件的 Lambda 函數實作部分批次回應。此函數會在回應中報告批次項目失敗，指示 Lambda 稍後重試這些訊息。

以下是顯示此方法的 Python Lambda 處理常式實作：

```
import base64
from typing import Any, Dict, List

def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, List[Dict[str, Dict[str, Any]]]]:
    failures: List[Dict[str, Dict[str, Any]]] = []
    records_dict = event.get("records", {})
    
    for topic_partition, records_list in records_dict.items():
        for record in records_list:
            topic = record.get("topic")
            partition = record.get("partition")
            offset = record.get("offset")
            value_b64 = record.get("value")
            
            try:
                data = base64.b64decode(value_b64).decode("utf-8")
                process_message(data)
            except Exception as exc:
                print(f"Failed to process record topic={topic} partition={partition} offset={offset}: {exc}")
                item_identifier: Dict[str, Any] = {
                    "partition": f"{topic}-{partition}",
                    "offset": int(offset) if offset is not None else None,
                }
                failures.append({"itemIdentifier": item_identifier})
    
    return {"batchItemFailures": failures}

def process_message(data: str) -> None:
    # Your business logic for a single message
    pass
```

以下是 Node.js 版本：

```
const { Buffer } = require("buffer");

const handler = async (event) => {
  const failures = [];
  
  for (let topicPartition in event.records) {
    const records = event.records[topicPartition];
    
    for (const record of records) {
      const topic = record.topic;
      const partition = record.partition;
      const offset = record.offset;
      const valueBase64 = record.value;
      const data = Buffer.from(valueBase64, "base64").toString("utf8");
      
      try {
        await processMessage(data);
      } catch (error) {
        console.error("Failed to process record", { topic, partition, offset, error });
        const itemIdentifier = {
          "partition": `${topic}-${partition}`,
          "offset": Number(offset),
        };
        failures.push({ itemIdentifier });
      }
    }
  }
  
  return { batchItemFailures: failures };
};

async function processMessage(payload) {
  // Your business logic for a single message
}

module.exports = { handler };
```

以下是 Java 版本：

```
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KafkaBatchHandler implements RequestHandler<Map<String, Object>, Map<String, Object>> {

    @SuppressWarnings("unchecked")
    @Override
    public Map<String, Object> handleRequest(Map<String, Object> event, Context context) {
        List<Map<String, Object>> failures = new ArrayList<>();
        Map<String, List<Map<String, Object>>> records =
                (Map<String, List<Map<String, Object>>>) event.getOrDefault("records", Map.of());

        for (Map.Entry<String, List<Map<String, Object>>> entry : records.entrySet()) {
            for (Map<String, Object> record : entry.getValue()) {
                String topic = (String) record.get("topic");
                Object partition = record.get("partition");
                Object offset = record.get("offset");
                String valueBase64 = (String) record.get("value");

                try {
                    String data = new String(Base64.getDecoder().decode(valueBase64), "UTF-8");
                    processMessage(data);
                } catch (Exception e) {
                    System.err.printf("Failed to process record topic=%s partition=%s offset=%s: %s%n",
                            topic, partition, offset, e.getMessage());
                    Map<String, Object> itemIdentifier = new HashMap<>();
                    itemIdentifier.put("partition", topic + "-" + partition);
                    itemIdentifier.put("offset", offset instanceof Number ? ((Number) offset).longValue() : null);
                    Map<String, Object> failure = new HashMap<>();
                    failure.put("itemIdentifier", itemIdentifier);
                    failures.add(failure);
                }
            }
        }

        Map<String, Object> response = new HashMap<>();
        response.put("batchItemFailures", failures);
        return response;
    }

    private void processMessage(String data) {
        // Your business logic for a single message
    }
}
```