

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# Amazon Kinesis 串流作為 EventBridge 管道的來源
<a name="eb-pipes-kinesis"></a>

您可以使用 EventBridge 接管來接收 Kinesis 資料串流中的記錄。然後，您可以選擇性地篩選或增強這些記錄，然後再將它們傳送到可用的目的地之一進行處理。您可以在設定管道時選擇 Kinesis 特定的設定。當將資料傳送至目的地時，EventBridge 管道會維護資料串流中的記錄順序。

Kinesis 資料串流是一組[碎片](https://docs.aws.amazon.com/kinesis/latest/dev/key-concepts.html#shard)。每個碎片包含一系列的資料記錄。**取用程式**是處理來自 Kinesis 資料串流的資料的應用程式。您可以將 EventBridge 管道對應至共用輸送量取用程式 (標準迭代程式)，或對應至具有[增強散發](https://docs.aws.amazon.com/kinesis/latest/dev/enhanced-consumers.html)功能的專用輸送量取用程式。

對於標準迭代程式，EventBridge 會使用 HTTP 協定輪詢 Kinesis 串流中的每個碎片以尋找記錄。此函式會與碎片的其他消費者共用碎片讀取輸送量。

若要將延遲降至最低並最大化讀取輸送量，您可以建立具有增強散發功能的資料串流取用者。串流取用者會取得每個碎片的專用連線，其不會影響其他從串流讀取的應用程式。如果您有許多應用程式讀取相同的資料，或者，如果您要重新處理具有大量記錄的串流，則專屬的輸送量很有助益。Kinesis 透過 HTTP/2 推送記錄到 EventBridge。如需關於 Kinesis 資料串流的資訊，請參閱[從 Amazon Kinesis Data Streams 讀取資料](https://docs.aws.amazon.com/kinesis/latest/dev/building-consumers.html)。

**範例事件**

下列範例事件顯示管道接收的資訊。您可以使用此事件來建立和篩選事件模式，或定義輸入轉換。並非所有欄位都可以篩選。如需有關所能篩選欄位的詳細資訊，請參閱 [Amazon EventBridge 管道中的事件篩選](eb-pipes-event-filtering.md)。

```
[
  {
    "kinesisSchemaVersion": "1.0",
    "partitionKey": "1",
    "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
    "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
    "approximateArrivalTimestamp": 1545084650.987,
    "eventSource": "aws:kinesis",
    "eventVersion": "1.0",
    "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
    "eventName": "aws:kinesis:record",
    "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
    "awsRegion": "us-east-2",
    "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
  },
  {
    "kinesisSchemaVersion": "1.0",
    "partitionKey": "1",
    "sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
    "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
    "approximateArrivalTimestamp": 1545084711.166,
    "eventSource": "aws:kinesis",
    "eventVersion": "1.0",
    "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
    "eventName": "aws:kinesis:record",
    "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
    "awsRegion": "us-east-2",
    "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
  }
]
```

## 輪詢和批次處理串流
<a name="pipes-ak-polling"></a>

EventBridge 會以每秒一次的基本速率輪詢 Kinesis 串流中的記錄碎片。當記錄可用時，EventBridge 會處理事件，並等待結果。如果處理成功，EventBridge 會恢復輪詢，直到收到多筆記錄。

EventBridge 預設會在記錄可用時立即調用管道。如果 EventBridge 從來源中讀取的批次之中只有一筆記錄，只會處理一筆事件。為避免處理少量記錄，您可讓管道設定批次間隔，要求管道緩衝記錄最長達五分鐘。處理事件之前，EventBridge 會繼續從來源中讀取記錄，直到收集到完整批次、批次間隔到期或者批次達到 6 MB 的承載限制。

您還可以透過並行處理來自每個碎片的多個批次來增加並行性。EventBridge 可同時處理每個碎片中最多 10 個批次。如果增加每個碎片的並行批次數量，EventBridge 仍會確保在分割區索引鍵層級進行依序處理。

規劃 `ParallelizationFactor` 設定來同時處理 Kinesis 或 DynamoDB 資料串流的一個碎片與多個管道執行。您可以透過從 1 (預設) 到 10 的並行化因子指定 EventBridge 從碎片輪詢的並行批次數。例如，當 `ParallelizationFactor` 設定為 2 時，您最多可以有 200 個並行 EventBridge 管道執行，來處理 100 個 Kinesis 資料碎片。當資料量急劇波動並且 `IteratorAge` 高時，這有助於縱向擴展處理輸送量。請注意，如果您使用 Kinesis 彙總，則並行化因子將不起作用。

## 輪詢和串流開始位置
<a name="pipes-ak-stream-start-position"></a>

請注意，建立和更新管道期間的串流輪詢最終會一致。
+ 在建立管道期間，從串流開始輪詢事件可能需要幾分鐘時間。
+ 在更新管道資源輪訓組態期間，從串流停止並重新開始輪詢事件可能需要幾分鐘時間。

這表示如果您指定 `LATEST` 當作串流的開始位置，管道可能會在建立或更新期間遺漏事件。若要確保沒有遺漏任何事件，請將串流開始位置指定為 `TRIM_HORIZON` 或 `AT_TIMESTAMP`。

## 報告批次項目失敗
<a name="pipes-ak-batch-failures"></a>

EventBridge 取用和處理來源的串流資料時，依預設，只有在批次成功完成時，其檢查點才會到批次的最高序號。若要避免重新處理失敗批次中成功處理過的訊息，您可以設定擴充或目標，傳回哪些訊息成功，哪些失敗的物件。我們將其稱為部分批次回應。

如需詳細資訊，請參閱[部分批次失敗](eb-pipes-batching-concurrency.md#pipes-partial-batch-failure)。

### 成功與失敗條件
<a name="pipes-ak-batch-failures-conditions"></a>

如果您傳回下列任一項目，EventBridge 會將批次視為完全成功：
+ 空白 `batchItemFailure` 清單
+ Null `batchItemFailure` 清單
+ 空白 `EventResponse`
+ Null `EventResponse`

如果您傳回下列任一項目，EventBridge 會將批次視為完全失敗：
+ 空白字串 `itemIdentifier`
+ Null `itemIdentifier`
+ 具有錯誤金鑰名稱的 `itemIdentifier`

EventBridge 會根據您的重試策略來重試失敗。