View a markdown version of this page

CDC 串流入門 - Amazon Aurora DSQL

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

CDC 串流入門

重要

此功能以 AWS 預覽版形式提供,可能會有所變更。如需詳細資訊,請參閱 AWS 服務條款中的第 2 節 Beta 版和預覽版。若要進一步了解 CDC 串流的定價,請參閱 Aurora DSQL 定價頁面

在一般可用性之前,我們會將新的操作類型 ("op": "u" 用於更新) 新增至串流承載。為了確保您的應用程式在不修改的情況下處理這些變更,請套用after承載,將任何無法辨識op的值視為 upsert。如需詳細資訊,請參閱 了解 CDC 記錄

本指南會逐步引導您開始將遞交的資料列層級變更從 Aurora DSQL 叢集串流到 Amazon Kinesis 資料串流所需的每個步驟。在本指南的最後,您已建立運作中的 CDC 管道和 Python 指令碼,以讀取和列印變更記錄。

先決條件

開始之前,請確認下列事項:

  • 您已建立處於 ACTIVE 狀態的 Aurora DSQL 叢集。如果您的叢集處於閒置狀態,請使用任何 PostgreSQL 相容用戶端連線到它,以便在建立 CDC 串流之前將其喚醒。如果叢集未處於 ACTIVE 狀態, 會CreateStream傳回驗證錯誤。

  • Aurora DSQL 要求叢集、Amazon Kinesis 資料串流、IAM 服務角色和呼叫委託人的所有 CDC 資源都位於同一個 AWS 帳戶中。

  • 您的 Amazon Kinesis 資料串流與 Aurora DSQL 叢集位於相同的 AWS 區域。

  • 您已 AWS CLI 使用具有建立 IAM 角色和 Amazon Kinesis 資料串流許可的登入資料來安裝和設定 。

步驟 1:建立 Amazon Kinesis 資料串流

在與 Aurora DSQL 叢集相同的 AWS 帳戶和區域中建立 Kinesis 資料串流。CDC 記錄大於對應的 Aurora DSQL 資料列,因為 JSON 格式包含資料欄名稱、中繼資料和編碼額外負荷。

調整 Kinesis 資料串流的大小

Aurora DSQL CDC 會在每次變更時提供完整資料列。接觸單一資料欄的更新會產生包含資料列中每個資料欄的記錄。刪除記錄是例外狀況,只包含主索引鍵資料欄。

估計平均記錄大小

測量平均磁碟上資料列大小,以了解 CDC 將產生的磁碟區,並預測過大的記錄。下列查詢會傳回資料表的平均元組大小,以位元組為單位:

SELECT avg(pg_column_size(t.*)) FROM your_table t;

CDC 記錄信封會在資料列大小之外新增資料欄名稱、中繼資料和編碼額外負荷。如需確切的記錄格式,請參閱 記錄承載。如需 Aurora DSQL 如何處理超過 Kinesis 記錄大小限制的記錄,請參閱 處理過大的記錄。如需完整的 Kinesis 服務限制,請參閱《Amazon Kinesis Data Streams 開發人員指南》中的 Amazon Kinesis Data Streams 配額和限制Amazon Kinesis

重要

當您建立 Kinesis 資料串流時,請設定下列項目:

  • MaxRecordSizeInKiB10240(10 MiB)。預設 Kinesis 上限為 1 MiB,並不一定足以用於 Aurora DSQL CDC 記錄。超過所設定 Kinesis 記錄大小的任何記錄都會導致 CDC 串流因 而受損KINESIS_OVERSIZE_RECORD。Aurora DSQL 會將過大的記錄分割成各可接近 10 MiB 的片段,因此 Kinesis 資料串流需要接受該大小的記錄。如需詳細資訊,請參閱處理過大的記錄

  • StreamModeON_DEMAND。隨需模式會自動擴展碎片容量,並保護您在意外尖峰期間免於佈建不足。當容量擴展時,Kinesis 仍然可以在急劇的秒規模暴增WriteProvisionedThroughputExceeded期間傳回。規劃短暫限流事件。

AWS/Kinesis 命名空間的 IncomingBytesWriteProvisionedThroughputExceeded 上建立 CloudWatch 警示。Kinesis 調節會減緩 CDC 交付並增加複寫延遲。如需 Aurora DSQL 端指標和警示指引,請參閱 監控最佳實務

下列為使用 AWS CLI的範例。如果您的 AWS CLI 版本不支援 --max-record-size-in-ki-b 參數,請使用 AWS SDK 呼叫 Kinesis CreateStream 操作。

aws kinesis create-stream \ --stream-name my-cdc-stream \ --stream-mode-details StreamMode=ON_DEMAND \ --max-record-size-in-ki-b 10240 \ --region region

等待串流變成作用中:

aws kinesis describe-stream-summary \ --stream-name my-cdc-stream \ --region region \ --query 'StreamDescriptionSummary.StreamStatus'

串流就緒"ACTIVE"時,命令會傳回 。

從輸出記錄串流 ARN。您需要在下列步驟中使用它。ARN 的格式為 arn:aws:kinesis:region:account-id:stream/my-cdc-stream

步驟 2:建立 Aurora DSQL 的 IAM 角色

Aurora DSQL 會擔任 IAM 角色,將 CDC 記錄寫入 Kinesis 資料串流。在此步驟中,您會使用信任政策建立角色,並連接許可政策。如需每個政策元素的完整說明,請參閱 設定 IAM

建立信任政策檔案

將下列 JSON 儲存為 trust-policy.json。將 your-account-idregioncluster-id 取代為您的值。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "DSQLAccess", "Effect": "Allow", "Principal": { "Service": "dsql.amazonaws.com" }, "Action": "sts:AssumeRole", "Condition": { "StringEquals": { "aws:SourceAccount": "your-account-id" }, "ArnLike": { "aws:SourceArn": "arn:aws:dsql:region:your-account-id:cluster/cluster-id/stream/*" } } } ] }
建立角色

執行下列 命令以建立 IAM 角色。

aws iam create-role \ --role-name dsql-cdc-role \ --assume-role-policy-document file://trust-policy.json
建立許可政策檔案

將下列 JSON 儲存為 permissions-policy.json。將預留位置值取代為您的 Kinesis 資料串流 ARN。只有當您的 Kinesis 資料串流使用 AWS KMS 客戶受管金鑰時,才需要 KMSAccess陳述式,但您可以先包含它,以便稍後新增客戶受管金鑰不會中斷您的 CDC 串流。如需每個條件的完整說明,請參閱 服務角色許可政策

{ "Version": "2012-10-17", "Statement": [ { "Sid": "KinesisAccess", "Effect": "Allow", "Action": [ "kinesis:PutRecord", "kinesis:PutRecords", "kinesis:DescribeStreamSummary", "kinesis:ListShards" ], "Resource": "arn:aws:kinesis:region:your-account-id:stream/my-cdc-stream" }, { "Sid": "KMSAccess", "Effect": "Allow", "Action": [ "kms:GenerateDataKey" ], "Resource": "arn:aws:kms:*:*:key/*", "Condition": { "StringEquals": { "kms:ViaService": "kinesis.region.amazonaws.com", "kms:EncryptionContext:aws:kinesis:arn": "arn:aws:kinesis:region:your-account-id:stream/my-cdc-stream", "aws:ResourceAccount": "${aws:PrincipalAccount}" } } } ] }
連接許可政策

執行以下命令:

aws iam put-role-policy \ --role-name dsql-cdc-role \ --policy-name dsql-cdc-kinesis-access \ --policy-document file://permissions-policy.json

create-role輸出記錄角色 ARN。ARN 的格式為 arn:aws:iam::your-account-id:role/dsql-cdc-role

步驟 3:建立 CDC 串流

使用 AWS CLI 建立 CDC 串流,將您的 Aurora DSQL 叢集連線至 Kinesis 資料串流。將預留位置值取代為步驟 1 的 Kinesis 串流 ARN、步驟 2 的 IAM 角色 ARN,以及您的叢集識別符。

aws dsql create-stream \ --cluster-identifier cluster-id \ --target-definition '{"kinesis":{"streamArn":"kinesis-stream-arn","roleArn":"role-arn"}}' \ --ordering UNORDERED \ --format JSON \ --tags '{"Name":"my-cdc-stream"}' \ --region region

回應包含串流識別符和 狀態CREATING。串流建立通常需要一到三分鐘。

等待串流變成作用中

輪詢串流狀態,直到達到 ACTIVE

aws dsql get-stream \ --cluster-identifier cluster-id \ --stream-identifier stream-id \ --region region \ --query 'status'

您也可以使用 AWS SDKs 中的StreamActive等待程式自動輪詢。

串流到達 後ACTIVE,Aurora DSQL 會開始將遞交的資料列層級變更交付至 Kinesis 資料串流。

注意

每個 Aurora DSQL 叢集都有最大數量的 CDC 串流。如果您達到此限制, CreateStream會傳回 ServiceQuotaExceededException。如需預設限制,請參閱配額和限制

步驟 4:確認記錄正在流動

將資料列插入 Aurora DSQL 叢集上的資料表。例如:

CREATE TABLE IF NOT EXISTS test_cdc ( id INT PRIMARY KEY, message TEXT ); INSERT INTO test_cdc VALUES (1, 'hello cdc');

從 Kinesis 資料串流讀取,以確認 CDC 記錄已送達:

SHARD_ITERATOR=$(aws kinesis get-shard-iterator \ --stream-name my-cdc-stream \ --shard-id shardId-000000000000 \ --shard-iterator-type TRIM_HORIZON \ --region region \ --query 'ShardIterator' --output text) aws kinesis get-records \ --shard-iterator "$SHARD_ITERATOR" \ --region region

每個記錄Data的欄位都包含 JSON 承載。當您使用 時 AWS CLI,承載會在回應中以 Base64-encoded。當您使用 boto3 SDK 時,開發套件會自動解碼它。解碼的 JSON 如下所示:

{ "type": "full", "op": "c", "before": null, "after": {"id": 1, "message": "hello cdc"}, "source": { "version": "1.0", "ts_ms": 1705318200000, "ts_ns": 1705318200000000000, "txId": "ffthunp5stx6ffs2vyfqoatmfu", "schema": "public", "table": "test_cdc", "db": "postgres", "cluster": "cluster-id" }, "ts_ms": 1705318200125, "ts_ns": 1705318200125483291 }

如需每個欄位的完整說明,請參閱 了解 CDC 記錄

步驟 5:使用 Python 指令碼來使用記錄

下列 Python 指令碼會從 Kinesis 資料串流讀取 CDC 記錄,並列印每個變更事件。指令碼使用 boto3 Amazon Kinesis 用戶端逐一查看碎片並解碼每個記錄。由於 Aurora DSQL CDC at-least-once交付,因此指令碼可能會多次列印相同的記錄。

""" Read CDC records from an Amazon Kinesis data stream. Usage: pip install boto3 python consume_cdc.py --stream-name my-cdc-stream --region us-east-1 """ from __future__ import annotations import argparse import json import boto3 def consume_cdc(stream_name: str, region: str) -> None: kinesis = boto3.client("kinesis", region_name=region) # List all shards (paginate if the stream has many shards) shard_ids: list[str] = [] paginator = kinesis.get_paginator("list_shards") for page in paginator.paginate(StreamName=stream_name): shard_ids.extend(s["ShardId"] for s in page["Shards"]) print(f"Reading from {stream_name} ({len(shard_ids)} shard(s))") for shard_id in shard_ids: iterator_response = kinesis.get_shard_iterator( StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON", ) shard_iterator = iterator_response["ShardIterator"] while shard_iterator: records_response = kinesis.get_records( ShardIterator=shard_iterator, Limit=100 ) shard_iterator = records_response.get("NextShardIterator") for record in records_response["Records"]: # boto3 decodes Base64 automatically; record["Data"] is bytes. payload = json.loads(record["Data"]) # A record's "type" field identifies its structure. # "full": inlined record with before/after values. # "chunked": main record that references fragments for a split image. # "fragment": one piece of a chunked image; reassemble in production code. # For details, see cdc-record-format.html#cdc-oversized-records. record_type = payload.get("type", "full") if record_type == "fragment": print(f"[FRAGMENT] chunk_id={payload['chunk_id']} index={payload['index']}") continue source = payload["source"] op = payload["op"] ts_ns = source["ts_ns"] tx_id = source["txId"] table = f"{source['schema']}.{source['table']}" # Aurora DSQL currently emits "c" for both inserts and updates. A subsequent # release will emit "u" for updates, and "c" for inserts. Design your # consumer to handle all three values; this map stays correct across the # transition. op_labels = {"c": "INSERT/UPDATE", "u": "UPDATE", "d": "DELETE"} print( f"[{op_labels.get(op, op)}] {table} " f"txId={tx_id} ts_ns={ts_ns} type={record_type}" ) if payload.get("after"): print(f" after: {json.dumps(payload['after'])}") if payload.get("before"): print(f" before: {json.dumps(payload['before'])}") if record_type == "chunked": print(f" chunked: {json.dumps(payload['chunked'])}") if not records_response["Records"]: break # No more records in this shard if __name__ == "__main__": parser = argparse.ArgumentParser( description="Consume DSQL CDC records from Kinesis" ) parser.add_argument("--stream-name", required=True, help="Kinesis stream name") parser.add_argument("--region", required=True, help="AWS Region") args = parser.parse_args() consume_cdc(args.stream_name, args.region)

執行 指令碼:

pip install boto3 python consume_cdc.py \ --stream-name my-cdc-stream \ --region region

指令碼會在到達時列印每個變更事件。您會看到類似下列的輸出:

Reading from my-cdc-stream (4 shard(s)) [INSERT/UPDATE] public.test_cdc txId=ffthunp5stx6ffs2vyfqoatmfu ts_ns=1705318200000000000 type=full after: {"id": 1, "message": "hello cdc"}
新增last-writer-wins重複資料刪除

由於 Aurora DSQL CDC 至少使用at-least-once交付,因此生產應用程式應刪除重複項目和訂單記錄。下列程式碼範例顯示high-water-mark標記方法:對於每個主索引鍵,它會追蹤目前source.ts_ns為止看到的最高值,並捨棄具有相同或更早時間戳記的任何記錄。將 PK_COLUMNS設定為您正在處理之資料表的主索引鍵資料欄名稱。如需處理多個資料表或刪除的策略,請參閱 消費者策略

# Set PK_COLUMNS to the primary key column(s) of your table. PK_COLUMNS = ["id"] # Maps each primary key value to the highest ts_ns seen for that key. high_water: dict[tuple, int] = {} def process_record(payload: dict) -> bool: """Return True if the record is new, False if it's a duplicate or stale. Skip fragment records; reassemble them into a full image before calling this. """ if payload.get("type") == "fragment": return False # Fragments are reassembled upstream, not deduplicated here. source = payload["source"] ts_ns = source["ts_ns"] op = payload["op"] # For inserts/updates the row is in "after"; for deletes it's in "before". row = payload.get("after") or payload.get("before") or {} pk = tuple(row.get(col) for col in PK_COLUMNS) prev_ts = high_water.get(pk, -1) if ts_ns <= prev_ts: return False # Duplicate or out-of-order record high_water[pk] = ts_ns return True

管理 CDC 串流

列出串流

若要列出叢集的所有 CDC 串流,請使用 ListStreams操作:

aws dsql list-streams \ --cluster-identifier cluster-id \ --region region
刪除串流

若要刪除 CDC 串流,請執行下列命令:

aws dsql delete-stream \ --cluster-identifier cluster-id \ --stream-identifier stream-id \ --region region

您可以使用StreamNotExists等待程式輪詢,GetStream直到傳回 ResourceNotFoundException 為止,表示 Aurora DSQL 已完全刪除串流。