

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Erste Schritte mit CDC-Streams
<a name="cdc-setup"></a>

**Wichtig**  
Diese Funktion wird als AWS Vorversion bereitgestellt und kann sich ändern. Weitere Informationen finden Sie in Abschnitt 2, Betas und Vorschauen, in den [AWS Servicebedingungen](https://aws.amazon.com/service-terms/). Weitere Informationen zu den Preisen für CDC-Streams finden Sie auf der [Aurora DSQL-Preisseite](https://aws.amazon.com/rds/aurora/dsql/pricing/).  
Vor der allgemeinen Verfügbarkeit werden wir Ihrer Stream-Payload neue Operationstypen (`"op": "u"`für Updates) hinzufügen. Um sicherzustellen, dass Ihre Anwendung diese Änderungen unverändert verarbeitet, behandeln Sie jeden unbekannten `op` Wert als Fehler, indem Sie die Payload übernehmen. `after` Details dazu finden Sie unter [CDC-Datensätze verstehen](cdc-record-format.md).

Dieser Leitfaden führt Sie durch jeden Schritt, der erforderlich ist, um mit dem Streaming festgeschriebener Änderungen auf Zeilenebene von einem Aurora DSQL-Cluster in einen Amazon Kinesis Kinesis-Datenstream zu beginnen. Am Ende dieses Handbuchs haben Sie eine funktionierende CDC-Pipeline und ein Python-Skript erstellt, das Änderungsdatensätze liest und druckt.

## Voraussetzungen
<a name="cdc-prerequisites"></a>

Bevor Sie beginnen, überprüfen Sie Folgendes:
+ Sie haben einen Aurora DSQL-Cluster im `ACTIVE` Status erstellt. Wenn sich Ihr Cluster im Leerlauf befindet, stellen Sie mit einem beliebigen PostgreSQL-compatible Client eine Verbindung zu ihm her, um ihn aufzuwecken, bevor Sie einen CDC-Stream erstellen. `CreateStream`gibt einen Validierungsfehler zurück, wenn sich der Cluster nicht im `ACTIVE` Status befindet.
+ Aurora DSQL erfordert, dass sich alle CDC-Ressourcen — der Cluster, der Amazon Kinesis-Datenstream, die IAM-Servicerolle und der aufrufende Principal — im selben Konto befinden. AWS 
+ Ihr Amazon Kinesis Kinesis-Datenstream befindet sich in derselben AWS Region wie Ihr Aurora DSQL-Cluster.
+ Sie haben die AWS CLI mit Anmeldeinformationen installiert und konfiguriert, die berechtigt sind, IAM-Rollen und Amazon Kinesis Kinesis-Datenstreams zu erstellen.

## Schritt 1: Erstellen Sie einen Amazon Kinesis Kinesis-Datenstream
<a name="cdc-step1-kinesis"></a>

Erstellen Sie einen Kinesis-Datenstream in demselben AWS Konto und derselben Region wie Ihr Aurora DSQL-Cluster. CDC-Datensätze sind größer als die entsprechende Aurora DSQL-Zeile, da das JSON-Format Spaltennamen, Metadaten und Kodierungsaufwand beinhaltet.

### Dimensionierung des Kinesis-Datenstroms
<a name="cdc-sizing"></a>

Aurora DSQL CDC liefert bei jeder Änderung die vollständige Zeile. Eine Aktualisierung, die sich auf eine einzelne Spalte bezieht, erzeugt einen Datensatz, der alle Spalten in der Zeile enthält. Löschdatensätze sind die Ausnahme — sie enthalten nur die Primärschlüsselspalten.

**Schätzen Sie die durchschnittliche Datensatzgröße**  
Messen Sie die durchschnittliche Zeilengröße auf der Festplatte, um das Volumen zu ermitteln, das CDC produzieren wird, und um zu große Datensätze zu antizipieren. Die folgende Abfrage gibt die durchschnittliche Tupelgröße in Byte für eine Tabelle zurück:

```
SELECT avg(pg_column_size(t.*)) FROM {{your_table}} t;
```

Der CDC-Datensatzumschlag fügt zusätzlich zur Zeilengröße Spaltennamen, Metadaten und Kodierungsaufwand hinzu. Das genaue Datensatzformat finden Sie unter[Nutzlast aufzeichnen](cdc-record-format.md#cdc-record-payload). Informationen darüber, wie Aurora DSQL mit Datensätzen umgeht, die die Kinesis-Datensatzgrößenbeschränkung überschreiten, finden Sie unter. [Umgang mit übergroßen Datensätzen](cdc-record-format.md#cdc-oversized-records) Den vollständigen Satz der Kinesis-Servicebeschränkungen finden Sie unter [Amazon Kinesis Data Streams-Kontingente und -Limits](https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html) im *Amazon Kinesis Data Streams* Developer Guide.

**Wichtig**  
Wenn Sie den Kinesis-Datenstream erstellen, stellen Sie Folgendes ein:  
`MaxRecordSizeInKiB`bis `10240` (10 MiB). Das standardmäßige Kinesis-Maximum von 1 MiB ist nicht immer groß genug für Aurora DSQL-CDC-Datensätze. Jeder Datensatz, der die konfigurierte Kinesis-Datensatzgröße überschreitet, führt zu einer Beeinträchtigung des CDC-Streams mit. `KINESIS_OVERSIZE_RECORD` Aurora DSQL teilt übergroße Datensätze in Fragmente auf, die sich jeweils bis zu 10 MiB nähern können, sodass der Kinesis-Datenstrom Datensätze dieser Größe akzeptieren muss. Details hierzu finden Sie unter [Umgang mit übergroßen Datensätzen](cdc-record-format.md#cdc-oversized-records).
`StreamMode`zu`ON_DEMAND`. On-demand Der Modus skaliert die Shard-Kapazität automatisch und schützt Sie vor unzureichender Bereitstellung bei unerwarteten Leistungsspitzen. Kinesis kann immer noch `WriteProvisionedThroughputExceeded` bei kurzen Ausbrüchen im Sekundenbereich zurückkehren, wenn die Kapazität steigt. Planen Sie kurze Drosselungen ein.

Erstellen Sie CloudWatch Alarme im `IncomingBytes` und `WriteProvisionedThroughputExceeded` im Namespace. `AWS/Kinesis` Kinesis-Drosselung verlangsamt die CDC-Übertragung und erhöht die Replikationsverzögerung. DSQL-sideAurora-Metriken und Hinweise zu Alarmen finden Sie unter[Überwachung bewährter Verfahren](cdc-monitoring.md#cdc-monitoring-best-practices).

Das folgende Beispiel verwendet die AWS CLI. Wenn Ihre AWS CLI Version den `--max-record-size-in-ki-b` Parameter nicht unterstützt, verwenden Sie ein AWS SDK, um die [CreateStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_CreateStream.html)Kinesis-Operation aufzurufen.

```
aws kinesis create-stream \
  --stream-name {{my-cdc-stream}} \
  --stream-mode-details StreamMode=ON_DEMAND \
  --max-record-size-in-ki-b 10240 \
  --region {{region}}
```

Warten Sie, bis der Stream aktiv wird:

```
aws kinesis describe-stream-summary \
  --stream-name {{my-cdc-stream}} \
  --region {{region}} \
  --query 'StreamDescriptionSummary.StreamStatus'
```

Der Befehl kehrt zurück`"ACTIVE"`, wenn der Stream bereit ist.

Nehmen Sie den Stream-ARN von der Ausgabe auf. Sie benötigen ihn in den folgenden Schritten. Der ARN hat das Format`arn:aws:kinesis:{{region}}:{{account-id}}:stream/{{my-cdc-stream}}`.

## Schritt 2: Erstellen Sie eine IAM-Rolle für Aurora DSQL
<a name="cdc-step2-iam"></a>

Aurora DSQL übernimmt eine IAM-Rolle, um CDC-Datensätze in Ihren Kinesis-Datenstrom zu schreiben. In diesem Schritt erstellen Sie die Rolle mit einer Vertrauensrichtlinie und fügen eine Berechtigungsrichtlinie hinzu. Eine vollständige Erläuterung der einzelnen Richtlinienelemente finden Sie unter[IAM konfigurieren](cdc-iam.md).

**Erstellen Sie die Vertrauensrichtlinien-Datei**  
Speichern Sie den folgenden JSON-Code unter`trust-policy.json`. Ersetzen Sie {{your-account-id}}{{region}}, und {{cluster-id}} durch Ihre Werte.

```
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "DSQLAccess",
            "Effect": "Allow",
            "Principal": {
                "Service": "dsql.amazonaws.com"
            },
            "Action": "sts:AssumeRole",
            "Condition": {
                "StringEquals": {
                    "aws:SourceAccount": "{{your-account-id}}"
                },
                "ArnLike": {
                    "aws:SourceArn": "arn:aws:dsql:{{region}}:{{your-account-id}}:cluster/{{cluster-id}}/stream/*"
                }
            }
        }
    ]
}
```

**Erstellen der Rolle**  
Führen Sie den folgenden --Befehl aus, um die IAM-Rolle zu erstellen:

```
aws iam create-role \
  --role-name {{dsql-cdc-role}} \
  --assume-role-policy-document file://trust-policy.json
```

**Erstellen Sie die Datei mit den Berechtigungsrichtlinien**  
Speichern Sie den folgenden JSON-Code unter`permissions-policy.json`. Ersetzen Sie die Platzhalterwerte durch Ihren Kinesis-Datenstream-ARN. Die `KMSAccess` Anweisung ist nur erforderlich, wenn Ihr Kinesis-Datenstream einen vom AWS KMS Kunden verwalteten Schlüssel verwendet. Sie können ihn jedoch präventiv einbeziehen, sodass das Hinzufügen eines vom Kunden verwalteten Schlüssels zu einem späteren Zeitpunkt Ihren CDC-Stream nicht unterbricht. Eine vollständige Erläuterung der einzelnen Bedingungen finden Sie unter. [Richtlinie für Berechtigungen für Servicerollen](cdc-iam.md#cdc-iam-permissions-policy)

```
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "KinesisAccess",
            "Effect": "Allow",
            "Action": [
                "kinesis:PutRecord",
                "kinesis:PutRecords",
                "kinesis:DescribeStreamSummary",
                "kinesis:ListShards"
            ],
            "Resource": "arn:aws:kinesis:{{region}}:{{your-account-id}}:stream/{{my-cdc-stream}}"
        },
        {
            "Sid": "KMSAccess",
            "Effect": "Allow",
            "Action": [
                "kms:GenerateDataKey"
            ],
            "Resource": "arn:aws:kms:*:*:key/*",
            "Condition": {
                "StringEquals": {
                    "kms:ViaService": "kinesis.{{region}}.amazonaws.com",
                    "kms:EncryptionContext:aws:kinesis:arn": "arn:aws:kinesis:{{region}}:{{your-account-id}}:stream/{{my-cdc-stream}}",
                    "aws:ResourceAccount": "${aws:PrincipalAccount}"
                }
            }
        }
    ]
}
```

**Hängen Sie die Berechtigungsrichtlinie an**  
Führen Sie den folgenden Befehl aus:

```
aws iam put-role-policy \
  --role-name {{dsql-cdc-role}} \
  --policy-name dsql-cdc-kinesis-access \
  --policy-document file://permissions-policy.json
```

Notieren Sie den Rollen-ARN aus der `create-role` Ausgabe. Der ARN hat das Format`arn:aws:iam::{{your-account-id}}:role/{{dsql-cdc-role}}`.

## Schritt 3: Erstellen Sie den CDC-Stream
<a name="cdc-step3-create-stream"></a>

Verwenden Sie den AWS CLI , um einen CDC-Stream zu erstellen, der Ihren Aurora DSQL-Cluster mit dem Kinesis-Datenstream verbindet. Ersetzen Sie die Platzhalterwerte durch den Kinesis-Stream-ARN aus Schritt 1, den IAM-Rollen-ARN aus Schritt 2 und Ihre Cluster-ID.

```
aws dsql create-stream \
  --cluster-identifier {{cluster-id}} \
  --target-definition '{"kinesis":{"streamArn":"{{kinesis-stream-arn}}","roleArn":"{{role-arn}}"}}' \
  --ordering UNORDERED \
  --format JSON \
  --tags '{"Name":"{{my-cdc-stream}}"}' \
  --region {{region}}
```

Die Antwort enthält eine Stream-ID und den Status. `CREATING` Die Erstellung eines Streams dauert in der Regel ein bis drei Minuten.

**Warten Sie, bis der Stream aktiv wird**  
Fragen Sie den Stream-Status ab, bis er folgende Werte erreicht`ACTIVE`:

```
aws dsql get-stream \
  --cluster-identifier {{cluster-id}} \
  --stream-identifier {{stream-id}} \
  --region {{region}} \
  --query 'status'
```

Sie können auch den `StreamActive` Kellner in den AWS SDKs verwenden, um automatisch eine Umfrage durchzuführen.

Sobald der Stream erreicht ist`ACTIVE`, beginnt Aurora DSQL mit der Übertragung festgeschriebener Änderungen auf Zeilenebene an Ihrem Kinesis-Datenstream.

**Anmerkung**  
Jeder Aurora DSQL-Cluster hat eine maximale Anzahl von CDC-Streams. Wenn Sie dieses Limit erreichen, wird a `CreateStream` zurückgegeben. `ServiceQuotaExceededException` Informationen zum Standardlimit finden Sie unter [Kontingente und Limits](https://docs.aws.amazon.com/aurora-dsql/latest/userguide/CHAP_quotas.html).

## Schritt 4: Stellen Sie sicher, dass die Datensätze fließen
<a name="cdc-step4-verify"></a>

Fügen Sie eine Zeile in eine Tabelle auf Ihrem Aurora DSQL-Cluster ein. Beispiel:

```
CREATE TABLE IF NOT EXISTS test_cdc (
    id INT PRIMARY KEY,
    message TEXT
);

INSERT INTO test_cdc VALUES (1, 'hello cdc');
```

Lesen Sie aus dem Kinesis-Datenstrom, um zu überprüfen, ob der CDC-Datensatz eingetroffen ist:

```
SHARD_ITERATOR=$(aws kinesis get-shard-iterator \
  --stream-name {{my-cdc-stream}} \
  --shard-id shardId-000000000000 \
  --shard-iterator-type TRIM_HORIZON \
  --region {{region}} \
  --query 'ShardIterator' --output text)

aws kinesis get-records \
  --shard-iterator "$SHARD_ITERATOR" \
  --region {{region}}
```

Das `Data` Feld jedes Datensatzes enthält eine JSON-Nutzlast. Wenn Sie die verwenden AWS CLI, ist die Nutzlast Base64-encoded in der Antwort enthalten. Wenn Sie das `boto3` SDK verwenden, dekodiert das SDK es automatisch. Das dekodierte JSON sieht wie folgt aus:

```
{
    "type": "full",
    "op": "c",
    "before": null,
    "after": {"id": 1, "message": "hello cdc"},
    "source": {
        "version": "1.0",
        "ts_ms": 1705318200000,
        "ts_ns": 1705318200000000000,
        "txId": "ffthunp5stx6ffs2vyfqoatmfu",
        "schema": "public",
        "table": "test_cdc",
        "db": "postgres",
        "cluster": "{{cluster-id}}"
    },
    "ts_ms": 1705318200125,
    "ts_ns": 1705318200125483291
}
```

Eine vollständige Beschreibung der einzelnen Felder finden Sie unter[CDC-Datensätze verstehen](cdc-record-format.md).

## Schritt 5: Datensätze mit einem Python-Skript konsumieren
<a name="cdc-step5-consume"></a>

Das folgende Python-Skript liest CDC-Datensätze aus einem Kinesis-Datenstream und druckt jedes Änderungsereignis. Das Skript verwendet den `boto3` Amazon Kinesis Kinesis-Client, um über Shards zu iterieren und jeden Datensatz zu dekodieren. Da Aurora DSQL CDC mindestens einmal zugestellt wird, druckt das Skript denselben Datensatz möglicherweise mehrmals.

```
"""
Read CDC records from an Amazon Kinesis data stream.

Usage:
    pip install boto3
    python consume_cdc.py --stream-name my-cdc-stream --region us-east-1
"""
from __future__ import annotations

import argparse
import json

import boto3


def consume_cdc(stream_name: str, region: str) -> None:
    kinesis = boto3.client("kinesis", region_name=region)

    # List all shards (paginate if the stream has many shards)
    shard_ids: list[str] = []
    paginator = kinesis.get_paginator("list_shards")
    for page in paginator.paginate(StreamName=stream_name):
        shard_ids.extend(s["ShardId"] for s in page["Shards"])
    print(f"Reading from {stream_name} ({len(shard_ids)} shard(s))")

    for shard_id in shard_ids:
        iterator_response = kinesis.get_shard_iterator(
            StreamName=stream_name,
            ShardId=shard_id,
            ShardIteratorType="TRIM_HORIZON",
        )
        shard_iterator = iterator_response["ShardIterator"]

        while shard_iterator:
            records_response = kinesis.get_records(
                ShardIterator=shard_iterator, Limit=100
            )
            shard_iterator = records_response.get("NextShardIterator")

            for record in records_response["Records"]:
                # boto3 decodes Base64 automatically; record["Data"] is bytes.
                payload = json.loads(record["Data"])

                # A record's "type" field identifies its structure.
                # "full": inlined record with before/after values.
                # "chunked": main record that references fragments for a split image.
                # "fragment": one piece of a chunked image; reassemble in production code.
                # For details, see cdc-record-format.html#cdc-oversized-records.
                record_type = payload.get("type", "full")
                if record_type == "fragment":
                    print(f"[FRAGMENT] chunk_id={payload['chunk_id']} index={payload['index']}")
                    continue

                source = payload["source"]
                op = payload["op"]
                ts_ns = source["ts_ns"]
                tx_id = source["txId"]
                table = f"{source['schema']}.{source['table']}"

                # Aurora DSQL currently emits "c" for both inserts and updates. A subsequent
                # release will emit "u" for updates, and "c" for inserts. Design your
                # consumer to handle all three values; this map stays correct across the
                # transition.
                op_labels = {"c": "INSERT/UPDATE", "u": "UPDATE", "d": "DELETE"}
                print(
                    f"[{op_labels.get(op, op)}] {table} "
                    f"txId={tx_id} ts_ns={ts_ns} type={record_type}"
                )
                if payload.get("after"):
                    print(f"  after:  {json.dumps(payload['after'])}")
                if payload.get("before"):
                    print(f"  before: {json.dumps(payload['before'])}")
                if record_type == "chunked":
                    print(f"  chunked: {json.dumps(payload['chunked'])}")

            if not records_response["Records"]:
                break  # No more records in this shard


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Consume DSQL CDC records from Kinesis"
    )
    parser.add_argument("--stream-name", required=True, help="Kinesis stream name")
    parser.add_argument("--region", required=True, help="AWS Region")
    args = parser.parse_args()
    consume_cdc(args.stream_name, args.region)
```

Führen Sie das Skript aus:

```
pip install boto3
python consume_cdc.py \
  --stream-name {{my-cdc-stream}} \
  --region {{region}}
```

Das Skript druckt jedes Änderungsereignis, sobald es eingeht. Die Ausgabe entspricht weitgehend der folgenden:

```
Reading from my-cdc-stream (4 shard(s))
[INSERT/UPDATE] public.test_cdc txId=ffthunp5stx6ffs2vyfqoatmfu ts_ns=1705318200000000000 type=full
  after:  {"id": 1, "message": "hello cdc"}
```

**Last-Writer-Wins-Deduplizierung hinzufügen**  
Da Aurora DSQL CDC mindestens einmal geliefert wird, sollten Produktions-Apps Datensätze deduplizieren und ordnen. Das folgende Codebeispiel zeigt einen Ansatz mit hohen Wasserwerten: Für jeden Primärschlüssel wird der höchste Wert erfasst, der bisher `source.ts_ns` gesehen wurde, und alle Datensätze mit einem gleichen oder einem früheren Zeitstempel werden verworfen. Geben Sie `PK_COLUMNS` die Primärschlüsselspaltennamen der Tabelle ein, die Sie verarbeiten. Strategien, die mehrere Tabellen oder Löschungen verarbeiten, finden Sie unter[Strategien für Verbraucher](cdc-streams.md#cdc-consumer-strategies).

```
# Set PK_COLUMNS to the primary key column(s) of your table.
PK_COLUMNS = ["id"]

# Maps each primary key value to the highest ts_ns seen for that key.
high_water: dict[tuple, int] = {}

def process_record(payload: dict) -> bool:
    """Return True if the record is new, False if it's a duplicate or stale.

    Skip fragment records; reassemble them into a full image before calling this.
    """
    if payload.get("type") == "fragment":
        return False  # Fragments are reassembled upstream, not deduplicated here.

    source = payload["source"]
    ts_ns = source["ts_ns"]
    op = payload["op"]

    # For inserts/updates the row is in "after"; for deletes it's in "before".
    row = payload.get("after") or payload.get("before") or {}
    pk = tuple(row.get(col) for col in PK_COLUMNS)

    prev_ts = high_water.get(pk, -1)
    if ts_ns <= prev_ts:
        return False  # Duplicate or out-of-order record

    high_water[pk] = ts_ns
    return True
```

## CDC-Streams verwalten
<a name="cdc-manage-streams"></a>

**Streams auflisten**  
Verwenden Sie den folgenden `ListStreams` Vorgang, um alle CDC-Streams für einen Cluster aufzulisten:

```
aws dsql list-streams \
  --cluster-identifier {{cluster-id}} \
  --region {{region}}
```

**Löschen eines Streams**  
Führen Sie den folgenden Befehl aus, um einen CDC-Stream zu löschen:

```
aws dsql delete-stream \
  --cluster-identifier {{cluster-id}} \
  --stream-identifier {{stream-id}} \
  --region {{region}}
```

Sie können den `StreamNotExists` Kellner für die Abfrage verwenden, `GetStream` bis a zurückgegeben `ResourceNotFoundException` wird, was darauf hinweist, dass Aurora DSQL den Stream vollständig gelöscht hat.