

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# CDC 串流入門
<a name="cdc-setup"></a>

**重要**  
此功能以 AWS 預覽版形式提供，可能會有所變更。如需詳細資訊，請參閱 [AWS 服務條款](https://aws.amazon.com/service-terms/)中的第 2 節 Beta 版和預覽版。若要進一步了解 CDC 串流的定價，請參閱 [Aurora DSQL 定價頁面](https://aws.amazon.com/rds/aurora/dsql/pricing/)。  
在一般可用性之前，我們會將新的操作類型 (`"op": "u"` 用於更新） 新增至串流承載。為了確保您的應用程式在不修改的情況下處理這些變更，請套用`after`承載，將任何無法辨識`op`的值視為 upsert。如需詳細資訊，請參閱 [了解 CDC 記錄](cdc-record-format.md)。

本指南會逐步引導您開始將遞交的資料列層級變更從 Aurora DSQL 叢集串流到 Amazon Kinesis 資料串流所需的每個步驟。在本指南的最後，您已建立運作中的 CDC 管道和 Python 指令碼，以讀取和列印變更記錄。

## 先決條件
<a name="cdc-prerequisites"></a>

開始之前，請確認下列事項：
+ 您已建立處於 `ACTIVE` 狀態的 Aurora DSQL 叢集。如果您的叢集處於閒置狀態，請使用任何 PostgreSQL 相容用戶端連線到它，以便在建立 CDC 串流之前將其喚醒。如果叢集未處於 `ACTIVE` 狀態， 會`CreateStream`傳回驗證錯誤。
+ Aurora DSQL 要求叢集、Amazon Kinesis 資料串流、IAM 服務角色和呼叫委託人的所有 CDC 資源都位於同一個 AWS 帳戶中。
+ 您的 Amazon Kinesis 資料串流與 Aurora DSQL 叢集位於相同的 AWS 區域。
+ 您已 AWS CLI 使用具有建立 IAM 角色和 Amazon Kinesis 資料串流許可的登入資料來安裝和設定 。

## 步驟 1：建立 Amazon Kinesis 資料串流
<a name="cdc-step1-kinesis"></a>

在與 Aurora DSQL 叢集相同的 AWS 帳戶和區域中建立 Kinesis 資料串流。CDC 記錄大於對應的 Aurora DSQL 資料列，因為 JSON 格式包含資料欄名稱、中繼資料和編碼額外負荷。

### 調整 Kinesis 資料串流的大小
<a name="cdc-sizing"></a>

Aurora DSQL CDC 會在每次變更時提供完整資料列。接觸單一資料欄的更新會產生包含資料列中每個資料欄的記錄。刪除記錄是例外狀況，只包含主索引鍵資料欄。

**估計平均記錄大小**  
測量平均磁碟上資料列大小，以了解 CDC 將產生的磁碟區，並預測過大的記錄。下列查詢會傳回資料表的平均元組大小，以位元組為單位：

```
SELECT avg(pg_column_size(t.*)) FROM {{your_table}} t;
```

CDC 記錄信封會在資料列大小之外新增資料欄名稱、中繼資料和編碼額外負荷。如需確切的記錄格式，請參閱 [記錄承載](cdc-record-format.md#cdc-record-payload)。如需 Aurora DSQL 如何處理超過 Kinesis 記錄大小限制的記錄，請參閱 [處理過大的記錄](cdc-record-format.md#cdc-oversized-records)。如需完整的 Kinesis 服務限制，請參閱[《Amazon Kinesis Data Streams 開發人員指南》中的 Amazon Kinesis Data Streams 配額和限制](https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html)。 *Amazon Kinesis *

**重要**  
當您建立 Kinesis 資料串流時，請設定下列項目：  
`MaxRecordSizeInKiB` 至 `10240`(10 MiB)。預設 Kinesis 上限為 1 MiB，並不一定足以用於 Aurora DSQL CDC 記錄。超過所設定 Kinesis 記錄大小的任何記錄都會導致 CDC 串流因 而受損`KINESIS_OVERSIZE_RECORD`。Aurora DSQL 會將過大的記錄分割成各可接近 10 MiB 的片段，因此 Kinesis 資料串流需要接受該大小的記錄。如需詳細資訊，請參閱[處理過大的記錄](cdc-record-format.md#cdc-oversized-records)。
`StreamMode` 至 `ON_DEMAND`。隨需模式會自動擴展碎片容量，並保護您在意外尖峰期間免於佈建不足。當容量擴展時，Kinesis 仍然可以在急劇的秒規模暴增`WriteProvisionedThroughputExceeded`期間傳回。規劃短暫限流事件。

在 `AWS/Kinesis` 命名空間的 `IncomingBytes`和 `WriteProvisionedThroughputExceeded` 上建立 CloudWatch 警示。Kinesis 調節會減緩 CDC 交付並增加複寫延遲。如需 Aurora DSQL 端指標和警示指引，請參閱 [監控最佳實務](cdc-monitoring.md#cdc-monitoring-best-practices)。

下列為使用 AWS CLI的範例。如果您的 AWS CLI 版本不支援 `--max-record-size-in-ki-b` 參數，請使用 AWS SDK 呼叫 Kinesis [CreateStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_CreateStream.html) 操作。

```
aws kinesis create-stream \
  --stream-name {{my-cdc-stream}} \
  --stream-mode-details StreamMode=ON_DEMAND \
  --max-record-size-in-ki-b 10240 \
  --region {{region}}
```

等待串流變成作用中：

```
aws kinesis describe-stream-summary \
  --stream-name {{my-cdc-stream}} \
  --region {{region}} \
  --query 'StreamDescriptionSummary.StreamStatus'
```

串流就緒`"ACTIVE"`時，命令會傳回 。

從輸出記錄串流 ARN。您需要在下列步驟中使用它。ARN 的格式為 `arn:aws:kinesis:{{region}}:{{account-id}}:stream/{{my-cdc-stream}}`。

## 步驟 2：建立 Aurora DSQL 的 IAM 角色
<a name="cdc-step2-iam"></a>

Aurora DSQL 會擔任 IAM 角色，將 CDC 記錄寫入 Kinesis 資料串流。在此步驟中，您會使用信任政策建立角色，並連接許可政策。如需每個政策元素的完整說明，請參閱 [設定 IAM](cdc-iam.md)。

**建立信任政策檔案**  
將下列 JSON 儲存為 `trust-policy.json`。將 {{your-account-id}}、{{region}} 和 {{cluster-id}} 取代為您的值。

```
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "DSQLAccess",
            "Effect": "Allow",
            "Principal": {
                "Service": "dsql.amazonaws.com"
            },
            "Action": "sts:AssumeRole",
            "Condition": {
                "StringEquals": {
                    "aws:SourceAccount": "{{your-account-id}}"
                },
                "ArnLike": {
                    "aws:SourceArn": "arn:aws:dsql:{{region}}:{{your-account-id}}:cluster/{{cluster-id}}/stream/*"
                }
            }
        }
    ]
}
```

**建立角色**  
執行下列 命令以建立 IAM 角色。

```
aws iam create-role \
  --role-name {{dsql-cdc-role}} \
  --assume-role-policy-document file://trust-policy.json
```

**建立許可政策檔案**  
將下列 JSON 儲存為 `permissions-policy.json`。將預留位置值取代為您的 Kinesis 資料串流 ARN。只有當您的 Kinesis 資料串流使用 AWS KMS 客戶受管金鑰時，才需要 `KMSAccess`陳述式，但您可以先包含它，以便稍後新增客戶受管金鑰不會中斷您的 CDC 串流。如需每個條件的完整說明，請參閱 [服務角色許可政策](cdc-iam.md#cdc-iam-permissions-policy)。

```
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "KinesisAccess",
            "Effect": "Allow",
            "Action": [
                "kinesis:PutRecord",
                "kinesis:PutRecords",
                "kinesis:DescribeStreamSummary",
                "kinesis:ListShards"
            ],
            "Resource": "arn:aws:kinesis:{{region}}:{{your-account-id}}:stream/{{my-cdc-stream}}"
        },
        {
            "Sid": "KMSAccess",
            "Effect": "Allow",
            "Action": [
                "kms:GenerateDataKey"
            ],
            "Resource": "arn:aws:kms:*:*:key/*",
            "Condition": {
                "StringEquals": {
                    "kms:ViaService": "kinesis.{{region}}.amazonaws.com",
                    "kms:EncryptionContext:aws:kinesis:arn": "arn:aws:kinesis:{{region}}:{{your-account-id}}:stream/{{my-cdc-stream}}",
                    "aws:ResourceAccount": "${aws:PrincipalAccount}"
                }
            }
        }
    ]
}
```

**連接許可政策**  
執行以下命令：

```
aws iam put-role-policy \
  --role-name {{dsql-cdc-role}} \
  --policy-name dsql-cdc-kinesis-access \
  --policy-document file://permissions-policy.json
```

從`create-role`輸出記錄角色 ARN。ARN 的格式為 `arn:aws:iam::{{your-account-id}}:role/{{dsql-cdc-role}}`。

## 步驟 3：建立 CDC 串流
<a name="cdc-step3-create-stream"></a>

使用 AWS CLI 建立 CDC 串流，將您的 Aurora DSQL 叢集連線至 Kinesis 資料串流。將預留位置值取代為步驟 1 的 Kinesis 串流 ARN、步驟 2 的 IAM 角色 ARN，以及您的叢集識別符。

```
aws dsql create-stream \
  --cluster-identifier {{cluster-id}} \
  --target-definition '{"kinesis":{"streamArn":"{{kinesis-stream-arn}}","roleArn":"{{role-arn}}"}}' \
  --ordering UNORDERED \
  --format JSON \
  --tags '{"Name":"{{my-cdc-stream}}"}' \
  --region {{region}}
```

回應包含串流識別符和 狀態`CREATING`。串流建立通常需要一到三分鐘。

**等待串流變成作用中**  
輪詢串流狀態，直到達到 `ACTIVE`：

```
aws dsql get-stream \
  --cluster-identifier {{cluster-id}} \
  --stream-identifier {{stream-id}} \
  --region {{region}} \
  --query 'status'
```

您也可以使用 AWS SDKs 中的`StreamActive`等待程式自動輪詢。

串流到達 後`ACTIVE`，Aurora DSQL 會開始將遞交的資料列層級變更交付至 Kinesis 資料串流。

**注意**  
每個 Aurora DSQL 叢集都有最大數量的 CDC 串流。如果您達到此限制， `CreateStream`會傳回 `ServiceQuotaExceededException`。如需預設限制，請參閱[配額和限制](https://docs.aws.amazon.com/aurora-dsql/latest/userguide/CHAP_quotas.html)。

## 步驟 4：確認記錄正在流動
<a name="cdc-step4-verify"></a>

將資料列插入 Aurora DSQL 叢集上的資料表。例如：

```
CREATE TABLE IF NOT EXISTS test_cdc (
    id INT PRIMARY KEY,
    message TEXT
);

INSERT INTO test_cdc VALUES (1, 'hello cdc');
```

從 Kinesis 資料串流讀取，以確認 CDC 記錄已送達：

```
SHARD_ITERATOR=$(aws kinesis get-shard-iterator \
  --stream-name {{my-cdc-stream}} \
  --shard-id shardId-000000000000 \
  --shard-iterator-type TRIM_HORIZON \
  --region {{region}} \
  --query 'ShardIterator' --output text)

aws kinesis get-records \
  --shard-iterator "$SHARD_ITERATOR" \
  --region {{region}}
```

每個記錄`Data`的欄位都包含 JSON 承載。當您使用 時 AWS CLI，承載會在回應中以 Base64-encoded。當您使用 `boto3` SDK 時，開發套件會自動解碼它。解碼的 JSON 如下所示：

```
{
    "type": "full",
    "op": "c",
    "before": null,
    "after": {"id": 1, "message": "hello cdc"},
    "source": {
        "version": "1.0",
        "ts_ms": 1705318200000,
        "ts_ns": 1705318200000000000,
        "txId": "ffthunp5stx6ffs2vyfqoatmfu",
        "schema": "public",
        "table": "test_cdc",
        "db": "postgres",
        "cluster": "{{cluster-id}}"
    },
    "ts_ms": 1705318200125,
    "ts_ns": 1705318200125483291
}
```

如需每個欄位的完整說明，請參閱 [了解 CDC 記錄](cdc-record-format.md)。

## 步驟 5：使用 Python 指令碼來使用記錄
<a name="cdc-step5-consume"></a>

下列 Python 指令碼會從 Kinesis 資料串流讀取 CDC 記錄，並列印每個變更事件。指令碼使用 `boto3` Amazon Kinesis 用戶端逐一查看碎片並解碼每個記錄。由於 Aurora DSQL CDC at-least-once交付，因此指令碼可能會多次列印相同的記錄。

```
"""
Read CDC records from an Amazon Kinesis data stream.

Usage:
    pip install boto3
    python consume_cdc.py --stream-name my-cdc-stream --region us-east-1
"""
from __future__ import annotations

import argparse
import json

import boto3


def consume_cdc(stream_name: str, region: str) -> None:
    kinesis = boto3.client("kinesis", region_name=region)

    # List all shards (paginate if the stream has many shards)
    shard_ids: list[str] = []
    paginator = kinesis.get_paginator("list_shards")
    for page in paginator.paginate(StreamName=stream_name):
        shard_ids.extend(s["ShardId"] for s in page["Shards"])
    print(f"Reading from {stream_name} ({len(shard_ids)} shard(s))")

    for shard_id in shard_ids:
        iterator_response = kinesis.get_shard_iterator(
            StreamName=stream_name,
            ShardId=shard_id,
            ShardIteratorType="TRIM_HORIZON",
        )
        shard_iterator = iterator_response["ShardIterator"]

        while shard_iterator:
            records_response = kinesis.get_records(
                ShardIterator=shard_iterator, Limit=100
            )
            shard_iterator = records_response.get("NextShardIterator")

            for record in records_response["Records"]:
                # boto3 decodes Base64 automatically; record["Data"] is bytes.
                payload = json.loads(record["Data"])

                # A record's "type" field identifies its structure.
                # "full": inlined record with before/after values.
                # "chunked": main record that references fragments for a split image.
                # "fragment": one piece of a chunked image; reassemble in production code.
                # For details, see cdc-record-format.html#cdc-oversized-records.
                record_type = payload.get("type", "full")
                if record_type == "fragment":
                    print(f"[FRAGMENT] chunk_id={payload['chunk_id']} index={payload['index']}")
                    continue

                source = payload["source"]
                op = payload["op"]
                ts_ns = source["ts_ns"]
                tx_id = source["txId"]
                table = f"{source['schema']}.{source['table']}"

                # Aurora DSQL currently emits "c" for both inserts and updates. A subsequent
                # release will emit "u" for updates, and "c" for inserts. Design your
                # consumer to handle all three values; this map stays correct across the
                # transition.
                op_labels = {"c": "INSERT/UPDATE", "u": "UPDATE", "d": "DELETE"}
                print(
                    f"[{op_labels.get(op, op)}] {table} "
                    f"txId={tx_id} ts_ns={ts_ns} type={record_type}"
                )
                if payload.get("after"):
                    print(f"  after:  {json.dumps(payload['after'])}")
                if payload.get("before"):
                    print(f"  before: {json.dumps(payload['before'])}")
                if record_type == "chunked":
                    print(f"  chunked: {json.dumps(payload['chunked'])}")

            if not records_response["Records"]:
                break  # No more records in this shard


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Consume DSQL CDC records from Kinesis"
    )
    parser.add_argument("--stream-name", required=True, help="Kinesis stream name")
    parser.add_argument("--region", required=True, help="AWS Region")
    args = parser.parse_args()
    consume_cdc(args.stream_name, args.region)
```

執行 指令碼：

```
pip install boto3
python consume_cdc.py \
  --stream-name {{my-cdc-stream}} \
  --region {{region}}
```

指令碼會在到達時列印每個變更事件。您會看到類似下列的輸出：

```
Reading from my-cdc-stream (4 shard(s))
[INSERT/UPDATE] public.test_cdc txId=ffthunp5stx6ffs2vyfqoatmfu ts_ns=1705318200000000000 type=full
  after:  {"id": 1, "message": "hello cdc"}
```

**新增last-writer-wins重複資料刪除**  
由於 Aurora DSQL CDC 至少使用at-least-once交付，因此生產應用程式應刪除重複項目和訂單記錄。下列程式碼範例顯示high-water-mark標記方法：對於每個主索引鍵，它會追蹤目前`source.ts_ns`為止看到的最高值，並捨棄具有相同或更早時間戳記的任何記錄。將 `PK_COLUMNS`設定為您正在處理之資料表的主索引鍵資料欄名稱。如需處理多個資料表或刪除的策略，請參閱 [消費者策略](cdc-streams.md#cdc-consumer-strategies)。

```
# Set PK_COLUMNS to the primary key column(s) of your table.
PK_COLUMNS = ["id"]

# Maps each primary key value to the highest ts_ns seen for that key.
high_water: dict[tuple, int] = {}

def process_record(payload: dict) -> bool:
    """Return True if the record is new, False if it's a duplicate or stale.

    Skip fragment records; reassemble them into a full image before calling this.
    """
    if payload.get("type") == "fragment":
        return False  # Fragments are reassembled upstream, not deduplicated here.

    source = payload["source"]
    ts_ns = source["ts_ns"]
    op = payload["op"]

    # For inserts/updates the row is in "after"; for deletes it's in "before".
    row = payload.get("after") or payload.get("before") or {}
    pk = tuple(row.get(col) for col in PK_COLUMNS)

    prev_ts = high_water.get(pk, -1)
    if ts_ns <= prev_ts:
        return False  # Duplicate or out-of-order record

    high_water[pk] = ts_ns
    return True
```

## 管理 CDC 串流
<a name="cdc-manage-streams"></a>

**列出串流**  
若要列出叢集的所有 CDC 串流，請使用 `ListStreams`操作：

```
aws dsql list-streams \
  --cluster-identifier {{cluster-id}} \
  --region {{region}}
```

**刪除串流**  
若要刪除 CDC 串流，請執行下列命令：

```
aws dsql delete-stream \
  --cluster-identifier {{cluster-id}} \
  --stream-identifier {{stream-id}} \
  --region {{region}}
```

您可以使用`StreamNotExists`等待程式輪詢，`GetStream`直到傳回 `ResourceNotFoundException` 為止，表示 Aurora DSQL 已完全刪除串流。