Implementación de C#
nota
Requisitos previos: antes de completar los siguientes pasos, deberá tener un clúster de Amazon Managed Streaming for Apache Kafka (Amazon MSK) o Apache Kafka en ejecución. Sus productores y consumidores deben ejecutarse en .NET 8.0 o superior.
Instalación
Para las aplicaciones de C#, instale el paquete de NuGet de SerDe de registro de esquema de AWS Glue mediante uno de los siguientes métodos:
.NET CLI:
Para instalar el paquete, use el siguiente comando:
dotnet add package Aws.Glue.SchemaRegistry --version 1.0.0-<rid>
donde <rid> podría ser 1.0.0-linux-x64, 1.0.0-linux-musl-x64 o 1.0.0-linux-arm64
PackageReference (en su archivo.csproj):
Añada lo siguiente al archivo del proyecto:
<PackageReference Include="Aws.Glue.SchemaRegistry" Version="1.0.0-<rid>" />
donde <rid> podría ser 1.0.0-linux-x64, 1.0.0-linux-musl-x64 o 1.0.0-linux-arm64
Configuración del archivo de configuración
Cree un archivo de propiedades de configuración (por ejemplo, gsr-config.properties) con los parámetros necesarios:
Configuración mínima:
El siguiente ejemplo es una configuración mínima de muestra:
region=us-east-1 registry.name=default-registry dataFormat=AVRO schemaAutoRegistrationEnabled=true
Uso de la biblioteca de cliente de esquema de Glue de C# para Kafka SerDes
Ejemplo de uso del serializador:
En el siguiente ejemplo se muestra cómo utilizar el serializador:
private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>"; var protobufSerializer = new GlueSchemaRegistryKafkaSerializer(PROTOBUF_CONFIG_PATH); var serialized = protobufSerializer.Serialize(message, message.Descriptor.FullName); // send serialized bytes to Kafka using producer.Produce(serialized)
Ejemplo de uso del deserializador:
En el siguiente ejemplo se muestra cómo utilizar el deserializador:
private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>"; var dataConfig = new GlueSchemaRegistryDataFormatConfiguration( new Dictionary<string, dynamic> { { GlueSchemaRegistryConstants.ProtobufMessageDescriptor, message.Descriptor } } ); var protobufDeserializer = new GlueSchemaRegistryKafkaDeserializer(PROTOBUF_CONFIG_PATH, dataConfig); // read message from Kafka using serialized = consumer.Consume() var deserializedObject = protobufDeserializer.Deserialize(message.Descriptor.FullName, serialized);
Uso de la biblioteca de cliente de esquema de Glue de C# con KafkaFlow para SerDes
Ejemplo de uso del serializador:
En el siguiente ejemplo se muestra cómo configurar KafkaFlow con el serializador:
services.AddKafka(kafka => kafka .UseConsoleLog() .AddCluster(cluster => cluster .WithBrokers(new[] { "localhost:9092" }) .AddProducer<CustomerProducer>(producer => producer .DefaultTopic("customer-events") .AddMiddlewares(m => m .AddSerializer<GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>>( () => new GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>("config/gsr-config.properties") ) ) ) ) );
Ejemplo de uso del deserializador:
En el siguiente ejemplo se muestra cómo configurar KafkaFlow con el deserializador:
.AddConsumer(consumer => consumer .Topic("customer-events") .WithGroupId("customer-group") .WithBufferSize(100) .WithWorkersCount(10) .AddMiddlewares(middlewares => middlewares .AddDeserializer<GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>>( () => new GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>("config/gsr-config.properties") ) .AddTypedHandlers(h => h.AddHandler<CustomerHandler>()) ) )
Propiedades opcionales del productor
Puede ampliar el archivo de configuración con propiedades opcionales adicionales:
# Auto-registration (if not passed, uses "false") schemaAutoRegistrationEnabled=true # Schema name (if not passed, uses topic name) schema.name=my-schema # Registry name (if not passed, uses "default-registry") registry.name=my-registry # Cache settings cacheTimeToLiveMillis=86400000 cacheSize=200 # Compatibility mode (if not passed, uses BACKWARD) compatibility=FULL # Registry description description=This registry is used for several purposes. # Compression (if not passed, records are sent uncompressed) compressionType=ZLIB
Formatos de fecha admitidos
Las implementaciones de Java y C# admiten los mismos formatos de datos:
AVRO: formato binario JSON Apache Avro.
JSON: formato de esquemas JSON
PROTOBUF: formato de búferes de protocolo
Notas
Para empezar a utilizar la biblioteca, visite https://www.nuget.org/packages/AWS.Glue.SchemaRegistry
El código fuente está disponible en: https://github.com/awslabs/aws-glue-schema-registry