View a markdown version of this page

C#-Implementierung - AWS Glue

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

C#-Implementierung

Anmerkung

Bevor Sie die folgenden Schritte ausführen, müssen Sie einen aktiven Amazon-MSK- ( Amazon Managed Streaming für Apache Kafka ) oder Apache-Kafka-Cluster haben. Ihre Produzenten und Verbraucher müssen auf .NET 8.0 oder höher laufen.

Installation

Installieren Sie für C#-Anwendungen das AWS Glue Schema SerDe NuGet Registry-Paket mit einer der folgenden Methoden:

.NET-CLI:

Verwenden Sie den folgenden Befehl, um das Paket zu installieren:

dotnet add package Aws.Glue.SchemaRegistry --version 1.0.0-<rid>

wo <rid> könnte sein1.0.0-linux-x64, 1.0.0-linux-musl-x64 oder 1.0.0-linux-arm64

PackageReference (in deiner.csproj-Datei):

Fügen Sie Ihrer Projektdatei Folgendes hinzu:

<PackageReference Include="Aws.Glue.SchemaRegistry" Version="1.0.0-<rid>" />

wo <rid> könnte sein1.0.0-linux-x64, 1.0.0-linux-musl-x64 oder 1.0.0-linux-arm64

Konfiguration der Konfigurationsdatei

Erstellen Sie eine Datei mit Konfigurationseigenschaften (z. B.gsr-config.properties) mit den erforderlichen Einstellungen:

Minimale Konfiguration:

Im Folgenden wird ein Beispiel für eine minimale Konfiguration gezeigt:

region=us-east-1 registry.name=default-registry dataFormat=AVRO schemaAutoRegistrationEnabled=true

Verwenden der C# Glue Schema-Clientbibliothek für Kafka SerDes

Beispiel für die Verwendung des Serializers:

Das folgende Beispiel zeigt, wie der Serializer verwendet wird:

private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>"; var protobufSerializer = new GlueSchemaRegistryKafkaSerializer(PROTOBUF_CONFIG_PATH); var serialized = protobufSerializer.Serialize(message, message.Descriptor.FullName); // send serialized bytes to Kafka using producer.Produce(serialized)
Beispiel für die Verwendung des Deserializers:

Das folgende Beispiel zeigt, wie der Deserializer verwendet wird:

private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>"; var dataConfig = new GlueSchemaRegistryDataFormatConfiguration( new Dictionary<string, dynamic> { { GlueSchemaRegistryConstants.ProtobufMessageDescriptor, message.Descriptor } } ); var protobufDeserializer = new GlueSchemaRegistryKafkaDeserializer(PROTOBUF_CONFIG_PATH, dataConfig); // read message from Kafka using serialized = consumer.Consume() var deserializedObject = protobufDeserializer.Deserialize(message.Descriptor.FullName, serialized);

Verwenden der C# Glue-Schema-Clientbibliothek mit for KafkaFlow SerDes

Beispiel für die Verwendung des Serializers:

Das folgende Beispiel zeigt, wie Sie KafkaFlow mit dem Serializer konfigurieren:

services.AddKafka(kafka => kafka .UseConsoleLog() .AddCluster(cluster => cluster .WithBrokers(new[] { "localhost:9092" }) .AddProducer<CustomerProducer>(producer => producer .DefaultTopic("customer-events") .AddMiddlewares(m => m .AddSerializer<GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>>( () => new GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>("config/gsr-config.properties") ) ) ) ) );
Beispiel für die Verwendung des Deserializers:

Das folgende Beispiel zeigt, wie die Konfiguration KafkaFlow mit dem Deserializer durchgeführt wird:

.AddConsumer(consumer => consumer .Topic("customer-events") .WithGroupId("customer-group") .WithBufferSize(100) .WithWorkersCount(10) .AddMiddlewares(middlewares => middlewares .AddDeserializer<GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>>( () => new GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>("config/gsr-config.properties") ) .AddTypedHandlers(h => h.AddHandler<CustomerHandler>()) ) )

Optionale Producer-Eigenschaften

Sie können Ihre Konfigurationsdatei um zusätzliche optionale Eigenschaften erweitern:

# Auto-registration (if not passed, uses "false") schemaAutoRegistrationEnabled=true # Schema name (if not passed, uses topic name) schema.name=my-schema # Registry name (if not passed, uses "default-registry") registry.name=my-registry # Cache settings cacheTimeToLiveMillis=86400000 cacheSize=200 # Compatibility mode (if not passed, uses BACKWARD) compatibility=FULL # Registry description description=This registry is used for several purposes. # Compression (if not passed, records are sent uncompressed) compressionType=ZLIB

Unterstützte Datenformate

Sowohl Java- als auch C#-Implementierungen unterstützen dieselben Datenformate:

  • AVRO: Apache Avro-Binärformat

  • JSON: JSON-Schemaformat

  • PROTOBUF: Format für Protokollpuffer

Hinweise