C# 実装
注記
前提条件: 次のステップを実行する前に、Amazon Managed Streaming for Apache Kafka (Amazon MSK) または Apache Kafka クラスターを起動しておく必要があります。使用するプロデューサーとコンシューマーは、.NET 8.0 以上で実行する必要があります。
インストール
C# アプリケーションの場合は、次のいずれかの方法を使用して AWS Glue Schema Registry SerDe NuGet パッケージをインストールします。
.NET CLI:
パッケージをインストールするには、次のコマンドを使用します。
dotnet add package Aws.Glue.SchemaRegistry --version 1.0.0-<rid>
ここで、<rid> は 1.0.0-linux-x64、1.0.0-linux-musl-x64 または 1.0.0-linux-arm64 です。
PackageReference (.csproj ファイル内):
プロジェクトファイルに次の内容を追加します。
<PackageReference Include="Aws.Glue.SchemaRegistry" Version="1.0.0-<rid>" />
ここで、<rid> は 1.0.0-linux-x64、1.0.0-linux-musl-x64 または 1.0.0-linux-arm64 です。
設定ファイルのセットアップ
必要な設定で設定プロパティファイル (gsr-config.properties など) を作成します。
最小限の設定:
以下に最小限の設定例を示します。
region=us-east-1 registry.name=default-registry dataFormat=AVRO schemaAutoRegistrationEnabled=true
Kafka SerDes 用の C# Glue スキーマクライアントライブラリの使用
シリアライザーの使用例:
次の例は、シリアライザーの使用方法を示しています。
private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>"; var protobufSerializer = new GlueSchemaRegistryKafkaSerializer(PROTOBUF_CONFIG_PATH); var serialized = protobufSerializer.Serialize(message, message.Descriptor.FullName); // send serialized bytes to Kafka using producer.Produce(serialized)
デシリアライザーの使用例:
次の例は、デシリアライザーの使用方法を示しています。
private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>"; var dataConfig = new GlueSchemaRegistryDataFormatConfiguration( new Dictionary<string, dynamic> { { GlueSchemaRegistryConstants.ProtobufMessageDescriptor, message.Descriptor } } ); var protobufDeserializer = new GlueSchemaRegistryKafkaDeserializer(PROTOBUF_CONFIG_PATH, dataConfig); // read message from Kafka using serialized = consumer.Consume() var deserializedObject = protobufDeserializer.Deserialize(message.Descriptor.FullName, serialized);
SerDes 用の KafkaFlow での C# Glue スキーマクライアントライブラリの使用
シリアライザーの使用例:
次の例は、シリアライザーを使用して KafkaFlow を設定する方法を示しています。
services.AddKafka(kafka => kafka .UseConsoleLog() .AddCluster(cluster => cluster .WithBrokers(new[] { "localhost:9092" }) .AddProducer<CustomerProducer>(producer => producer .DefaultTopic("customer-events") .AddMiddlewares(m => m .AddSerializer<GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>>( () => new GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>("config/gsr-config.properties") ) ) ) ) );
デシリアライザーの使用例:
次の例は、デシリアライザーを使用して KafkaFlow を設定する方法を示しています。
.AddConsumer(consumer => consumer .Topic("customer-events") .WithGroupId("customer-group") .WithBufferSize(100) .WithWorkersCount(10) .AddMiddlewares(middlewares => middlewares .AddDeserializer<GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>>( () => new GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>("config/gsr-config.properties") ) .AddTypedHandlers(h => h.AddHandler<CustomerHandler>()) ) )
オプションのプロデューサープロパティ
追加のオプションプロパティを使用して、設定ファイルを拡張できます。
# Auto-registration (if not passed, uses "false") schemaAutoRegistrationEnabled=true # Schema name (if not passed, uses topic name) schema.name=my-schema # Registry name (if not passed, uses "default-registry") registry.name=my-registry # Cache settings cacheTimeToLiveMillis=86400000 cacheSize=200 # Compatibility mode (if not passed, uses BACKWARD) compatibility=FULL # Registry description description=This registry is used for several purposes. # Compression (if not passed, records are sent uncompressed) compressionType=ZLIB
サポートされている日付書式
Java と C# の両方の実装は、同じデータ形式をサポートしています。
AVRO: Apache Avro バイナリ形式
JSON: JSON スキーマ形式
PROTOBUF: プロトコルバッファ形式
注意事項
ライブラリの使用を開始するには、https://www.nuget.org/packages/AWS.Glue.SchemaRegistry
にアクセスしてください ソースコードは、https://github.com/awslabs/aws-glue-schema-registry
で入手できます