View a markdown version of this page

Implementação de C# - AWS Glue

Implementação de C#

nota

Pré-requisitos: antes de concluir as etapas a seguir, é necessário ter um cluster do Amazon Managed Streaming for Apache Kafka (Amazon MSK) ou do Apache Kafka em execução. Seus produtores e consumidores precisam estar em execução em .NET 8.0 ou superior.

Instalação

Para aplicativos C#, instale o pacote AWS Glue Schema Registry SerDe NuGet usando um dos métodos a seguir:

.NET CLI:

Para instalar o pacote, execute o comando a seguir:

dotnet add package Aws.Glue.SchemaRegistry --version 1.0.0-<rid>

onde <rid> poderia ser 1.0.0-linux-x64, 1.0.0-linux-musl-x64 ou 1.0.0-linux-arm64

PackageReference (em seu arquivo .csproj):

Adicione o seguinte ao seu arqruivo de projeto:

<PackageReference Include="Aws.Glue.SchemaRegistry" Version="1.0.0-<rid>" />

onde <rid> poderia ser 1.0.0-linux-x64, 1.0.0-linux-musl-x64 ou 1.0.0-linux-arm64

Definição do arquivo de configuração

Crie um arquivo de propriedades de configuração (por exemplo, gsr-config.properties) com as configurações necessárias:

Configuração mínima:

A seguir você verá um exemplo de configuração mínima:

region=us-east-1 registry.name=default-registry dataFormat=AVRO schemaAutoRegistrationEnabled=true

Usando a biblioteca cliente C# Glue Schema para Kafka SerDes

Uso do serializador de amostra:

O exemplo a seguir mostra como usar o serializador:

private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>"; var protobufSerializer = new GlueSchemaRegistryKafkaSerializer(PROTOBUF_CONFIG_PATH); var serialized = protobufSerializer.Serialize(message, message.Descriptor.FullName); // send serialized bytes to Kafka using producer.Produce(serialized)
Uso do desserializador de amostra:

O exemplo a seguir mostra como usar o desserializador:

private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>"; var dataConfig = new GlueSchemaRegistryDataFormatConfiguration( new Dictionary<string, dynamic> { { GlueSchemaRegistryConstants.ProtobufMessageDescriptor, message.Descriptor } } ); var protobufDeserializer = new GlueSchemaRegistryKafkaDeserializer(PROTOBUF_CONFIG_PATH, dataConfig); // read message from Kafka using serialized = consumer.Consume() var deserializedObject = protobufDeserializer.Deserialize(message.Descriptor.FullName, serialized);

Usando a biblioteca cliente C# Glue Schema para KafkaFlow para SerDes

Uso do serializador de amostra:

O exemplo a seguir mostra como configurar o KafkaFlow com o serializador:

services.AddKafka(kafka => kafka .UseConsoleLog() .AddCluster(cluster => cluster .WithBrokers(new[] { "localhost:9092" }) .AddProducer<CustomerProducer>(producer => producer .DefaultTopic("customer-events") .AddMiddlewares(m => m .AddSerializer<GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>>( () => new GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>("config/gsr-config.properties") ) ) ) ) );
Uso do desserializador de amostra:

O exemplo a seguir mostra como configurar o KafkaFlow com o desserializador:

.AddConsumer(consumer => consumer .Topic("customer-events") .WithGroupId("customer-group") .WithBufferSize(100) .WithWorkersCount(10) .AddMiddlewares(middlewares => middlewares .AddDeserializer<GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>>( () => new GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>("config/gsr-config.properties") ) .AddTypedHandlers(h => h.AddHandler<CustomerHandler>()) ) )

Propriedades opcionais do produtor

Você pode estender seu arquivo de configuração com propriedades opcionais adicionais:

# Auto-registration (if not passed, uses "false") schemaAutoRegistrationEnabled=true # Schema name (if not passed, uses topic name) schema.name=my-schema # Registry name (if not passed, uses "default-registry") registry.name=my-registry # Cache settings cacheTimeToLiveMillis=86400000 cacheSize=200 # Compatibility mode (if not passed, uses BACKWARD) compatibility=FULL # Registry description description=This registry is used for several purposes. # Compression (if not passed, records are sent uncompressed) compressionType=ZLIB

Formatos de dados suportados

As implementações Java e C# são compatíveis com os mesmos formatos de dados:

  • AVRO: formato binário Apache Avro

  • JSON: formato do esquema JSON

  • PROTOBUF: formato Protocol Buffers

Observações