View a markdown version of this page

Implémentation en C# - AWS Glue

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Implémentation en C#

Note

Prérequis : avant d'effectuer les étapes suivantes, vous devez avoir un Amazon Managed Streaming pour Apache Kafka (Amazon MSK) ou un cluster Apache Kafka en cours d'exécution. Vos producteurs et consommateurs doivent utiliser .NET 8.0 ou une version ultérieure.

Installation

Pour les applications C#, installez le SerDe NuGet package AWS Glue Schema Registry en utilisant l'une des méthodes suivantes :

CLI .NET :

Utilisez la commande suivante pour installer le package :

dotnet add package Aws.Glue.SchemaRegistry --version 1.0.0-<rid>

<rid> cela pourrait être1.0.0-linux-x64, 1.0.0-linux-musl-x64 ou 1.0.0-linux-arm64

PackageReference (dans votre fichier .csproj) :

Ajoutez ce qui suit à votre fichier de projet :

<PackageReference Include="Aws.Glue.SchemaRegistry" Version="1.0.0-<rid>" />

<rid> cela pourrait être1.0.0-linux-x64, 1.0.0-linux-musl-x64 ou 1.0.0-linux-arm64

Configuration du fichier de configuration

Créez un fichier de propriétés de configuration (par exemple,gsr-config.properties) avec les paramètres requis :

Configuration minimale :

Voici un exemple de configuration minimale :

region=us-east-1 registry.name=default-registry dataFormat=AVRO schemaAutoRegistrationEnabled=true

Utilisation de la bibliothèque cliente C# Glue Schema pour Kafka SerDes

Exemple d'utilisation du sérialiseur :

L'exemple suivant montre comment utiliser le sérialiseur :

private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>"; var protobufSerializer = new GlueSchemaRegistryKafkaSerializer(PROTOBUF_CONFIG_PATH); var serialized = protobufSerializer.Serialize(message, message.Descriptor.FullName); // send serialized bytes to Kafka using producer.Produce(serialized)
Exemple d'utilisation du désérialiseur :

L'exemple suivant montre comment utiliser le désérialiseur :

private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>"; var dataConfig = new GlueSchemaRegistryDataFormatConfiguration( new Dictionary<string, dynamic> { { GlueSchemaRegistryConstants.ProtobufMessageDescriptor, message.Descriptor } } ); var protobufDeserializer = new GlueSchemaRegistryKafkaDeserializer(PROTOBUF_CONFIG_PATH, dataConfig); // read message from Kafka using serialized = consumer.Consume() var deserializedObject = protobufDeserializer.Deserialize(message.Descriptor.FullName, serialized);

Utilisation de la bibliothèque cliente C# Glue Schema avec for KafkaFlow SerDes

Exemple d'utilisation du sérialiseur :

L'exemple suivant montre comment configurer KafkaFlow avec le sérialiseur :

services.AddKafka(kafka => kafka .UseConsoleLog() .AddCluster(cluster => cluster .WithBrokers(new[] { "localhost:9092" }) .AddProducer<CustomerProducer>(producer => producer .DefaultTopic("customer-events") .AddMiddlewares(m => m .AddSerializer<GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>>( () => new GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>("config/gsr-config.properties") ) ) ) ) );
Exemple d'utilisation du désérialiseur :

L'exemple suivant montre comment configurer KafkaFlow avec le désérialiseur :

.AddConsumer(consumer => consumer .Topic("customer-events") .WithGroupId("customer-group") .WithBufferSize(100) .WithWorkersCount(10) .AddMiddlewares(middlewares => middlewares .AddDeserializer<GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>>( () => new GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>("config/gsr-config.properties") ) .AddTypedHandlers(h => h.AddHandler<CustomerHandler>()) ) )

Propriétés facultatives du producteur

Vous pouvez étendre votre fichier de configuration avec des propriétés facultatives supplémentaires :

# Auto-registration (if not passed, uses "false") schemaAutoRegistrationEnabled=true # Schema name (if not passed, uses topic name) schema.name=my-schema # Registry name (if not passed, uses "default-registry") registry.name=my-registry # Cache settings cacheTimeToLiveMillis=86400000 cacheSize=200 # Compatibility mode (if not passed, uses BACKWARD) compatibility=FULL # Registry description description=This registry is used for several purposes. # Compression (if not passed, records are sent uncompressed) compressionType=ZLIB

Formats de données pris en charge

Les implémentations Java et C# prennent en charge les mêmes formats de données :

  • AVRO : format binaire Apache Avro

  • JSON : format de schéma JSON

  • PROTOBUF : format des tampons de protocole

Remarques