View a markdown version of this page

C# 实现 - AWS Glue

C# 实现

注意

先决条件:完成以下步骤之前,您需要先运行 Amazon Managed Streaming for Apache Kafka(Amazon MSK)或 Apache Kafka 集群。您的生产者和使用者需要在 .NET 8.0 或更高版本上运行。

安装

对于 C# 应用程序,请使用以下方法之一安装 AWS Glue 架构注册表 SerDe NuGet 软件包:

.NET CLI:

使用以下命令安装软件包:

dotnet add package Aws.Glue.SchemaRegistry --version 1.0.0-<rid>

其中,<rid> 可能为 1.0.0-linux-x641.0.0-linux-musl-x641.0.0-linux-arm64

PackageReference(在您的 .csproj 文件中):

请将以下内容添加至您的项目文件:

<PackageReference Include="Aws.Glue.SchemaRegistry" Version="1.0.0-<rid>" />

其中,<rid> 可能为 1.0.0-linux-x641.0.0-linux-musl-x641.0.0-linux-arm64

配置文件设置

使用所需设置创建配置属性文件(例如 gsr-config.properties):

最低配置:

下面显示了一个最低配置示例:

region=us-east-1 registry.name=default-registry dataFormat=AVRO schemaAutoRegistrationEnabled=true

在 Kafka SerDes 上使用 C# Glue 架构客户端库

序列化器用法示例:

以下示例显示了如何使用序列化器:

private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>"; var protobufSerializer = new GlueSchemaRegistryKafkaSerializer(PROTOBUF_CONFIG_PATH); var serialized = protobufSerializer.Serialize(message, message.Descriptor.FullName); // send serialized bytes to Kafka using producer.Produce(serialized)
反序列化器用法示例:

以下示例显示了如何使用反序列化器:

private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>"; var dataConfig = new GlueSchemaRegistryDataFormatConfiguration( new Dictionary<string, dynamic> { { GlueSchemaRegistryConstants.ProtobufMessageDescriptor, message.Descriptor } } ); var protobufDeserializer = new GlueSchemaRegistryKafkaDeserializer(PROTOBUF_CONFIG_PATH, dataConfig); // read message from Kafka using serialized = consumer.Consume() var deserializedObject = protobufDeserializer.Deserialize(message.Descriptor.FullName, serialized);

在 SerDes 的 KafkaFlow 中使用 C# Glue 架构客户端库

序列化器用法示例:

以下示例显示了如何使用序列化器配置 KafkaFlow:

services.AddKafka(kafka => kafka .UseConsoleLog() .AddCluster(cluster => cluster .WithBrokers(new[] { "localhost:9092" }) .AddProducer<CustomerProducer>(producer => producer .DefaultTopic("customer-events") .AddMiddlewares(m => m .AddSerializer<GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>>( () => new GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>("config/gsr-config.properties") ) ) ) ) );
反序列化器用法示例:

以下示例显示了如何使用反序列化器配置 KafkaFlow:

.AddConsumer(consumer => consumer .Topic("customer-events") .WithGroupId("customer-group") .WithBufferSize(100) .WithWorkersCount(10) .AddMiddlewares(middlewares => middlewares .AddDeserializer<GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>>( () => new GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>("config/gsr-config.properties") ) .AddTypedHandlers(h => h.AddHandler<CustomerHandler>()) ) )

可选生产者属性

您可以使用其他可选属性扩展配置文件:

# Auto-registration (if not passed, uses "false") schemaAutoRegistrationEnabled=true # Schema name (if not passed, uses topic name) schema.name=my-schema # Registry name (if not passed, uses "default-registry") registry.name=my-registry # Cache settings cacheTimeToLiveMillis=86400000 cacheSize=200 # Compatibility mode (if not passed, uses BACKWARD) compatibility=FULL # Registry description description=This registry is used for several purposes. # Compression (if not passed, records are sent uncompressed) compressionType=ZLIB

支持的数据格式

Java 和 C# 实现都支持相同的数据格式:

  • AVRO:Apache Avro 二进制格式

  • JSON:JSON 架构格式

  • PROTOBUF:协议缓冲区格式

备注