Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Commencer à utiliser les flux CDC
Important
Cette fonctionnalité est fournie sous forme d' AWS aperçu et est sujette à modification. Pour plus d'informations, consultez la section 2, Bêtas et aperçus, des conditions de AWS service
Avant la mise à disposition générale, nous ajouterons de nouveaux types d'opérations ("op": "u"pour les mises à jour) à votre charge utile de diffusion. Pour vous assurer que votre application gère ces modifications sans modification, considérez toute valeur non reconnue comme une op valeur ajoutée en appliquant la after charge utile. Consultez Comprendre les dossiers du CDC pour plus de détails.
Ce guide explique toutes les étapes nécessaires pour commencer à diffuser les modifications validées au niveau des lignes depuis un cluster Aurora DSQL vers un flux de données Amazon Kinesis. À la fin de ce guide, vous avez créé un pipeline CDC fonctionnel et un script Python qui lit et imprime les enregistrements de modifications.
Conditions préalables
Avant de commencer, vérifiez les points suivants :
-
Vous avez créé un cluster Aurora DSQL en
ACTIVEétat. Si votre cluster est inactif, connectez-vous à celui-ci avec n'importe quel PostgreSQL-compatible client pour le réveiller avant de créer un flux CDC.CreateStreamrenvoie une erreur de validation si le cluster n'est pas enACTIVEétat. -
Aurora DSQL nécessite que toutes les ressources CDC (cluster, flux de données Amazon Kinesis, rôle de service IAM et principal d'appel) se trouvent dans le même compte. AWS
-
Votre flux de données Amazon Kinesis se trouve dans la même AWS région que votre cluster SQL Aurora.
-
Vous avez installé et configuré le AWS CLI avec des informations d'identification autorisées à créer des rôles IAM et des flux de données Amazon Kinesis.
Étape 1 : créer un flux de données Amazon Kinesis
Créez un flux de données Kinesis dans le même AWS compte et dans la même région que votre cluster Aurora DSQL. Les enregistrements CDC sont plus volumineux que la ligne Aurora DSQL correspondante car le format JSON inclut les noms de colonnes, les métadonnées et la surcharge de codage.
Dimensionnement du flux de données Kinesis
Aurora DSQL CDC fournit la ligne complète pour chaque modification. Une mise à jour qui touche une seule colonne produit un enregistrement contenant toutes les colonnes de la ligne. Les enregistrements de suppression constituent une exception : ils incluent uniquement les colonnes de clé primaire.
Estimation de la taille moyenne des enregistrements
Mesurez la taille moyenne des lignes sur disque pour comprendre le volume que le CDC produira et pour anticiper les enregistrements surdimensionnés. La requête suivante renvoie la taille moyenne des tuples en octets pour une table :
SELECT avg(pg_column_size(t.*)) FROMyour_tablet;
L'enveloppe d'enregistrement du CDC ajoute des noms de colonnes, des métadonnées et une surcharge de codage en plus de la taille de la ligne. Pour le format d'enregistrement exact, voirCharge utile record. Pour savoir comment Aurora DSQL gère les enregistrements qui dépassent la limite de taille d'enregistrement Kinesis, consultez. Gestion de dossiers surdimensionnés Pour connaître l'ensemble des limites du service Kinesis, consultez les quotas et limites d'Amazon Kinesis Data Streams dans le manuel du développeur Amazon Kinesis Data Streams.
Important
Lorsque vous créez le flux de données Kinesis, définissez les paramètres suivants :
-
MaxRecordSizeInKiBjusqu'à10240(10 MiB). Le maximum Kinesis par défaut de 1 MiB n'est pas toujours suffisant pour les enregistrements CDC Aurora DSQL. Tout enregistrement qui dépasse la taille d'enregistrement Kinesis configurée altère le flux CDC.KINESIS_OVERSIZE_RECORDAurora DSQL divise les enregistrements surdimensionnés en fragments pouvant atteindre 10 Mo chacun. Le flux de données Kinesis doit donc accepter des enregistrements de cette taille. Pour en savoir plus, consultez Gestion de dossiers surdimensionnés. -
StreamModeàON_DEMAND. On-demand Le mode permet d'ajuster automatiquement la capacité des partitions et de vous protéger contre le sous-provisionnement en cas de pics inattendus. Kinesis peut toujours réapparaîtreWriteProvisionedThroughputExceededlors de fortes rafales de quelques secondes à mesure que la capacité augmente. Prévoyez de brefs événements d'étranglement.
Créez des CloudWatch alarmes sur IncomingBytes et WriteProvisionedThroughputExceeded dans l'espace de AWS/Kinesis noms. La régulation Kinesis ralentit la diffusion des CDC et augmente le délai de réplication. Pour les DSQL-side métriques d'Aurora et les instructions relatives aux alarmes, voirSurveillance des meilleures pratiques.
L'exemple suivant repose sur AWS CLI. Si votre AWS CLI version ne prend pas en charge le --max-record-size-in-ki-b paramètre, utilisez un AWS SDK pour appeler l'opération CreateStreamKinesis.
aws kinesis create-stream \ --stream-namemy-cdc-stream\ --stream-mode-details StreamMode=ON_DEMAND \ --max-record-size-in-ki-b 10240 \ --regionregion
Attendez que le stream soit actif :
aws kinesis describe-stream-summary \ --stream-namemy-cdc-stream\ --regionregion\ --query 'StreamDescriptionSummary.StreamStatus'
La commande revient "ACTIVE" lorsque le flux est prêt.
Enregistrez l'ARN du flux à partir de la sortie. Vous en aurez besoin dans les étapes suivantes. L'ARN a le formatarn:aws:kinesis:.region:account-id:stream/my-cdc-stream
Étape 2 : créer un rôle IAM pour Aurora DSQL
Aurora DSQL assume un rôle IAM pour écrire des enregistrements CDC dans votre flux de données Kinesis. Au cours de cette étape, vous créez le rôle avec une politique de confiance et vous y associez une politique d'autorisations. Pour une explication complète de chaque élément de politique, voirConfiguration d'IAM.
Création du fichier de politique de confiance
Enregistrez le JSON suivant sous le nom detrust-policy.json. Remplacez your-account-idregion, et cluster-id par vos valeurs.
{ "Version": "2012-10-17", "Statement": [ { "Sid": "DSQLAccess", "Effect": "Allow", "Principal": { "Service": "dsql.amazonaws.com" }, "Action": "sts:AssumeRole", "Condition": { "StringEquals": { "aws:SourceAccount": "your-account-id" }, "ArnLike": { "aws:SourceArn": "arn:aws:dsql:region:your-account-id:cluster/cluster-id/stream/*" } } } ] }
Création du rôle
Exécutez la commande suivante pour créer le rôle IAM :
aws iam create-role \ --role-namedsql-cdc-role\ --assume-role-policy-document file://trust-policy.json
Création du fichier de politique d'autorisations
Enregistrez le JSON suivant sous le nom depermissions-policy.json. Remplacez les valeurs de l'espace réservé par l'ARN de votre flux de données Kinesis. La KMSAccess déclaration n'est requise que si votre flux de données Kinesis utilise une clé gérée par le AWS KMS client, mais vous pouvez l'inclure de manière préventive afin que l'ajout ultérieur d'une clé gérée par le client n'interrompe pas votre flux CDC. Pour une explication complète de chaque condition, voirPolitique d'autorisation des rôles de service.
{ "Version": "2012-10-17", "Statement": [ { "Sid": "KinesisAccess", "Effect": "Allow", "Action": [ "kinesis:PutRecord", "kinesis:PutRecords", "kinesis:DescribeStreamSummary", "kinesis:ListShards" ], "Resource": "arn:aws:kinesis:region:your-account-id:stream/my-cdc-stream" }, { "Sid": "KMSAccess", "Effect": "Allow", "Action": [ "kms:GenerateDataKey" ], "Resource": "arn:aws:kms:*:*:key/*", "Condition": { "StringEquals": { "kms:ViaService": "kinesis.region.amazonaws.com", "kms:EncryptionContext:aws:kinesis:arn": "arn:aws:kinesis:region:your-account-id:stream/my-cdc-stream", "aws:ResourceAccount": "${aws:PrincipalAccount}" } } } ] }
Joindre la politique d'autorisation
Exécutez la commande suivante :
aws iam put-role-policy \ --role-namedsql-cdc-role\ --policy-name dsql-cdc-kinesis-access \ --policy-document file://permissions-policy.json
Enregistrez l'ARN du rôle à partir de la create-role sortie. L'ARN a le formatarn:aws:iam::.your-account-id:role/dsql-cdc-role
Étape 3 : Création du flux CDC
Utilisez le AWS CLI pour créer un flux CDC qui connecte votre cluster Aurora DSQL au flux de données Kinesis. Remplacez les valeurs d'espace réservé par l'ARN du flux Kinesis de l'étape 1, l'ARN du rôle IAM de l'étape 2 et l'identifiant de votre cluster.
aws dsql create-stream \ --cluster-identifiercluster-id\ --target-definition '{"kinesis":{"streamArn":"kinesis-stream-arn","roleArn":"role-arn"}}' \ --ordering UNORDERED \ --format JSON \ --tags '{"Name":"my-cdc-stream"}' \ --regionregion
La réponse inclut un identifiant de flux et un statut deCREATING. La création d'un stream prend généralement une à trois minutes.
Attendez que le stream soit actif
Vérifiez l'état du stream jusqu'à ce qu'il atteigne ACTIVE :
aws dsql get-stream \ --cluster-identifiercluster-id\ --stream-identifierstream-id\ --regionregion\ --query 'status'
Vous pouvez également utiliser le StreamActive serveur dans les AWS SDK pour effectuer des sondages automatiquement.
Une fois le flux atteintACTIVE, Aurora DSQL commence à apporter des modifications validées au niveau des lignes à votre flux de données Kinesis.
Note
Chaque cluster Aurora DSQL possède un nombre maximum de flux CDC. Si vous atteignez cette limite, CreateStream renvoie unServiceQuotaExceededException. Pour connaître la limite par défaut, voir Quotas et limites.
Étape 4 : vérifier que les enregistrements circulent
Insérez une ligne dans une table de votre cluster Aurora DSQL. Par exemple :
CREATE TABLE IF NOT EXISTS test_cdc ( id INT PRIMARY KEY, message TEXT ); INSERT INTO test_cdc VALUES (1, 'hello cdc');
Lisez le flux de données Kinesis pour vérifier que l'enregistrement du CDC est bien arrivé :
SHARD_ITERATOR=$(aws kinesis get-shard-iterator \ --stream-namemy-cdc-stream\ --shard-id shardId-000000000000 \ --shard-iterator-type TRIM_HORIZON \ --regionregion\ --query 'ShardIterator' --output text) aws kinesis get-records \ --shard-iterator "$SHARD_ITERATOR" \ --regionregion
Le Data champ de chaque enregistrement contient une charge utile JSON. Lorsque vous utilisez le AWS CLI, la charge utile se trouve Base64-encoded dans la réponse. Lorsque vous utilisez le boto3 SDK, celui-ci le décode automatiquement. Le JSON décodé ressemble à ce qui suit :
{ "type": "full", "op": "c", "before": null, "after": {"id": 1, "message": "hello cdc"}, "source": { "version": "1.0", "ts_ms": 1705318200000, "ts_ns": 1705318200000000000, "txId": "ffthunp5stx6ffs2vyfqoatmfu", "schema": "public", "table": "test_cdc", "db": "postgres", "cluster": "cluster-id" }, "ts_ms": 1705318200125, "ts_ns": 1705318200125483291 }
Pour une description complète de chaque champ, voirComprendre les dossiers du CDC.
Étape 5 : Consommez des enregistrements avec un script Python
Le script Python suivant lit les enregistrements CDC à partir d'un flux de données Kinesis et imprime chaque événement de modification. Le script utilise le client boto3 Amazon Kinesis pour itérer sur des fragments et décoder chaque enregistrement. Comme Aurora DSQL CDC utilise la livraison au moins une fois, le script peut imprimer le même enregistrement plusieurs fois.
""" Read CDC records from an Amazon Kinesis data stream. Usage: pip install boto3 python consume_cdc.py --stream-name my-cdc-stream --region us-east-1 """ from __future__ import annotations import argparse import json import boto3 def consume_cdc(stream_name: str, region: str) -> None: kinesis = boto3.client("kinesis", region_name=region) # List all shards (paginate if the stream has many shards) shard_ids: list[str] = [] paginator = kinesis.get_paginator("list_shards") for page in paginator.paginate(StreamName=stream_name): shard_ids.extend(s["ShardId"] for s in page["Shards"]) print(f"Reading from {stream_name} ({len(shard_ids)} shard(s))") for shard_id in shard_ids: iterator_response = kinesis.get_shard_iterator( StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON", ) shard_iterator = iterator_response["ShardIterator"] while shard_iterator: records_response = kinesis.get_records( ShardIterator=shard_iterator, Limit=100 ) shard_iterator = records_response.get("NextShardIterator") for record in records_response["Records"]: # boto3 decodes Base64 automatically; record["Data"] is bytes. payload = json.loads(record["Data"]) # A record's "type" field identifies its structure. # "full": inlined record with before/after values. # "chunked": main record that references fragments for a split image. # "fragment": one piece of a chunked image; reassemble in production code. # For details, see cdc-record-format.html#cdc-oversized-records. record_type = payload.get("type", "full") if record_type == "fragment": print(f"[FRAGMENT] chunk_id={payload['chunk_id']} index={payload['index']}") continue source = payload["source"] op = payload["op"] ts_ns = source["ts_ns"] tx_id = source["txId"] table = f"{source['schema']}.{source['table']}" # Aurora DSQL currently emits "c" for both inserts and updates. A subsequent # release will emit "u" for updates, and "c" for inserts. Design your # consumer to handle all three values; this map stays correct across the # transition. op_labels = {"c": "INSERT/UPDATE", "u": "UPDATE", "d": "DELETE"} print( f"[{op_labels.get(op, op)}] {table} " f"txId={tx_id} ts_ns={ts_ns} type={record_type}" ) if payload.get("after"): print(f" after: {json.dumps(payload['after'])}") if payload.get("before"): print(f" before: {json.dumps(payload['before'])}") if record_type == "chunked": print(f" chunked: {json.dumps(payload['chunked'])}") if not records_response["Records"]: break # No more records in this shard if __name__ == "__main__": parser = argparse.ArgumentParser( description="Consume DSQL CDC records from Kinesis" ) parser.add_argument("--stream-name", required=True, help="Kinesis stream name") parser.add_argument("--region", required=True, help="AWS Region") args = parser.parse_args() consume_cdc(args.stream_name, args.region)
Exécutez le script :
pip install boto3 python consume_cdc.py \ --stream-namemy-cdc-stream\ --regionregion
Le script imprime chaque événement de modification au fur et à mesure qu'il arrive. Vous voyez des résultats similaires à ce qui suit :
Reading from my-cdc-stream (4 shard(s)) [INSERT/UPDATE] public.test_cdc txId=ffthunp5stx6ffs2vyfqoatmfu ts_ns=1705318200000000000 type=full after: {"id": 1, "message": "hello cdc"}
Ajouter la déduplication last-writer-wins
Comme Aurora DSQL CDC utilise une livraison au moins une fois, les applications de production doivent dédupliquer et commander les enregistrements. L'exemple de code suivant illustre une approche à point culminant : pour chaque clé primaire, elle suit la valeur la plus élevée source.ts_ns observée jusqu'à présent et supprime tout enregistrement dont l'horodatage est égal ou antérieur. Définissez PK_COLUMNS les noms des colonnes clés primaires de la table que vous traitez. Pour les stratégies qui gèrent plusieurs tables ou suppressions, voirStratégies pour les consommateurs.
# Set PK_COLUMNS to the primary key column(s) of your table. PK_COLUMNS = ["id"] # Maps each primary key value to the highest ts_ns seen for that key. high_water: dict[tuple, int] = {} def process_record(payload: dict) -> bool: """Return True if the record is new, False if it's a duplicate or stale. Skip fragment records; reassemble them into a full image before calling this. """ if payload.get("type") == "fragment": return False # Fragments are reassembled upstream, not deduplicated here. source = payload["source"] ts_ns = source["ts_ns"] op = payload["op"] # For inserts/updates the row is in "after"; for deletes it's in "before". row = payload.get("after") or payload.get("before") or {} pk = tuple(row.get(col) for col in PK_COLUMNS) prev_ts = high_water.get(pk, -1) if ts_ns <= prev_ts: return False # Duplicate or out-of-order record high_water[pk] = ts_ns return True
Gestion des flux CDC
Lister les flux
Pour répertorier tous les flux CDC d'un cluster, utilisez l'ListStreamsopération suivante :
aws dsql list-streams \ --cluster-identifiercluster-id\ --regionregion
Supprimer un stream
Pour supprimer un flux CDC, exécutez la commande suivante :
aws dsql delete-stream \ --cluster-identifiercluster-id\ --stream-identifierstream-id\ --regionregion
Vous pouvez utiliser le StreamNotExists serveur pour interroger GetStream jusqu'à ce que un ResourceNotFoundException soit renvoyé, indiquant qu'Aurora DSQL a complètement supprimé le flux.