Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
C#-Implementierung
Anmerkung
Bevor Sie die folgenden Schritte ausführen, müssen Sie einen aktiven Amazon-MSK- ( Amazon Managed Streaming für Apache Kafka ) oder Apache-Kafka-Cluster haben. Ihre Produzenten und Verbraucher müssen auf .NET 8.0 oder höher laufen.
Installation
Installieren Sie für C#-Anwendungen das AWS Glue Schema SerDe NuGet Registry-Paket mit einer der folgenden Methoden:
.NET-CLI:
Verwenden Sie den folgenden Befehl, um das Paket zu installieren:
dotnet add package Aws.Glue.SchemaRegistry --version 1.0.0-<rid>
wo <rid> könnte sein1.0.0-linux-x64, 1.0.0-linux-musl-x64 oder 1.0.0-linux-arm64
PackageReference (in deiner.csproj-Datei):
Fügen Sie Ihrer Projektdatei Folgendes hinzu:
<PackageReference Include="Aws.Glue.SchemaRegistry" Version="1.0.0-<rid>" />
wo <rid> könnte sein1.0.0-linux-x64, 1.0.0-linux-musl-x64 oder 1.0.0-linux-arm64
Konfiguration der Konfigurationsdatei
Erstellen Sie eine Datei mit Konfigurationseigenschaften (z. B.gsr-config.properties) mit den erforderlichen Einstellungen:
Minimale Konfiguration:
Im Folgenden wird ein Beispiel für eine minimale Konfiguration gezeigt:
region=us-east-1 registry.name=default-registry dataFormat=AVRO schemaAutoRegistrationEnabled=true
Verwenden der C# Glue Schema-Clientbibliothek für Kafka SerDes
Beispiel für die Verwendung des Serializers:
Das folgende Beispiel zeigt, wie der Serializer verwendet wird:
private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>"; var protobufSerializer = new GlueSchemaRegistryKafkaSerializer(PROTOBUF_CONFIG_PATH); var serialized = protobufSerializer.Serialize(message, message.Descriptor.FullName); // send serialized bytes to Kafka using producer.Produce(serialized)
Beispiel für die Verwendung des Deserializers:
Das folgende Beispiel zeigt, wie der Deserializer verwendet wird:
private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>"; var dataConfig = new GlueSchemaRegistryDataFormatConfiguration( new Dictionary<string, dynamic> { { GlueSchemaRegistryConstants.ProtobufMessageDescriptor, message.Descriptor } } ); var protobufDeserializer = new GlueSchemaRegistryKafkaDeserializer(PROTOBUF_CONFIG_PATH, dataConfig); // read message from Kafka using serialized = consumer.Consume() var deserializedObject = protobufDeserializer.Deserialize(message.Descriptor.FullName, serialized);
Verwenden der C# Glue-Schema-Clientbibliothek mit for KafkaFlow SerDes
Beispiel für die Verwendung des Serializers:
Das folgende Beispiel zeigt, wie Sie KafkaFlow mit dem Serializer konfigurieren:
services.AddKafka(kafka => kafka .UseConsoleLog() .AddCluster(cluster => cluster .WithBrokers(new[] { "localhost:9092" }) .AddProducer<CustomerProducer>(producer => producer .DefaultTopic("customer-events") .AddMiddlewares(m => m .AddSerializer<GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>>( () => new GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>("config/gsr-config.properties") ) ) ) ) );
Beispiel für die Verwendung des Deserializers:
Das folgende Beispiel zeigt, wie die Konfiguration KafkaFlow mit dem Deserializer durchgeführt wird:
.AddConsumer(consumer => consumer .Topic("customer-events") .WithGroupId("customer-group") .WithBufferSize(100) .WithWorkersCount(10) .AddMiddlewares(middlewares => middlewares .AddDeserializer<GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>>( () => new GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>("config/gsr-config.properties") ) .AddTypedHandlers(h => h.AddHandler<CustomerHandler>()) ) )
Optionale Producer-Eigenschaften
Sie können Ihre Konfigurationsdatei um zusätzliche optionale Eigenschaften erweitern:
# Auto-registration (if not passed, uses "false") schemaAutoRegistrationEnabled=true # Schema name (if not passed, uses topic name) schema.name=my-schema # Registry name (if not passed, uses "default-registry") registry.name=my-registry # Cache settings cacheTimeToLiveMillis=86400000 cacheSize=200 # Compatibility mode (if not passed, uses BACKWARD) compatibility=FULL # Registry description description=This registry is used for several purposes. # Compression (if not passed, records are sent uncompressed) compressionType=ZLIB
Unterstützte Datenformate
Sowohl Java- als auch C#-Implementierungen unterstützen dieselben Datenformate:
AVRO: Apache Avro-Binärformat
JSON: JSON-Schemaformat
PROTOBUF: Format für Protokollpuffer