C# 实现
注意
先决条件:完成以下步骤之前,您需要先运行 Amazon Managed Streaming for Apache Kafka(Amazon MSK)或 Apache Kafka 集群。您的生产者和使用者需要在 .NET 8.0 或更高版本上运行。
安装
对于 C# 应用程序,请使用以下方法之一安装 AWS Glue 架构注册表 SerDe NuGet 软件包:
.NET CLI:
使用以下命令安装软件包:
dotnet add package Aws.Glue.SchemaRegistry --version 1.0.0-<rid>
其中,<rid> 可能为 1.0.0-linux-x64、1.0.0-linux-musl-x64 或 1.0.0-linux-arm64
PackageReference(在您的 .csproj 文件中):
请将以下内容添加至您的项目文件:
<PackageReference Include="Aws.Glue.SchemaRegistry" Version="1.0.0-<rid>" />
其中,<rid> 可能为 1.0.0-linux-x64、1.0.0-linux-musl-x64 或 1.0.0-linux-arm64
配置文件设置
使用所需设置创建配置属性文件(例如 gsr-config.properties):
最低配置:
下面显示了一个最低配置示例:
region=us-east-1 registry.name=default-registry dataFormat=AVRO schemaAutoRegistrationEnabled=true
在 Kafka SerDes 上使用 C# Glue 架构客户端库
序列化器用法示例:
以下示例显示了如何使用序列化器:
private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>"; var protobufSerializer = new GlueSchemaRegistryKafkaSerializer(PROTOBUF_CONFIG_PATH); var serialized = protobufSerializer.Serialize(message, message.Descriptor.FullName); // send serialized bytes to Kafka using producer.Produce(serialized)
反序列化器用法示例:
以下示例显示了如何使用反序列化器:
private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>"; var dataConfig = new GlueSchemaRegistryDataFormatConfiguration( new Dictionary<string, dynamic> { { GlueSchemaRegistryConstants.ProtobufMessageDescriptor, message.Descriptor } } ); var protobufDeserializer = new GlueSchemaRegistryKafkaDeserializer(PROTOBUF_CONFIG_PATH, dataConfig); // read message from Kafka using serialized = consumer.Consume() var deserializedObject = protobufDeserializer.Deserialize(message.Descriptor.FullName, serialized);
在 SerDes 的 KafkaFlow 中使用 C# Glue 架构客户端库
序列化器用法示例:
以下示例显示了如何使用序列化器配置 KafkaFlow:
services.AddKafka(kafka => kafka .UseConsoleLog() .AddCluster(cluster => cluster .WithBrokers(new[] { "localhost:9092" }) .AddProducer<CustomerProducer>(producer => producer .DefaultTopic("customer-events") .AddMiddlewares(m => m .AddSerializer<GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>>( () => new GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>("config/gsr-config.properties") ) ) ) ) );
反序列化器用法示例:
以下示例显示了如何使用反序列化器配置 KafkaFlow:
.AddConsumer(consumer => consumer .Topic("customer-events") .WithGroupId("customer-group") .WithBufferSize(100) .WithWorkersCount(10) .AddMiddlewares(middlewares => middlewares .AddDeserializer<GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>>( () => new GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>("config/gsr-config.properties") ) .AddTypedHandlers(h => h.AddHandler<CustomerHandler>()) ) )
可选生产者属性
您可以使用其他可选属性扩展配置文件:
# Auto-registration (if not passed, uses "false") schemaAutoRegistrationEnabled=true # Schema name (if not passed, uses topic name) schema.name=my-schema # Registry name (if not passed, uses "default-registry") registry.name=my-registry # Cache settings cacheTimeToLiveMillis=86400000 cacheSize=200 # Compatibility mode (if not passed, uses BACKWARD) compatibility=FULL # Registry description description=This registry is used for several purposes. # Compression (if not passed, records are sent uncompressed) compressionType=ZLIB
支持的数据格式
Java 和 C# 实现都支持相同的数据格式:
AVRO:Apache Avro 二进制格式
JSON:JSON 架构格式
PROTOBUF:协议缓冲区格式