翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
MSK Express ブローカーおよび MSK Serverless での Kafka Streams の使用
Kafka Streams は、ステートレス変換およびステートフル変換をサポートしています。状態を保持する変換 (ウント、集計、結合など) は、内部の Kafka トピックに状態を保存する演算子を使用します。さらに、groupBy や再パーティションなどのステートレス変換の一部は、結果を内部 Kafka トピックに保存します。デフォルトでは、Kafka Streams は対応する操作に基づいてこれらの内部トピックに名前を付けます。これらのトピックが存在しない場合、Kafka Streams は内部 Kafka トピックを作成します。内部トピックの作成において、Kafka Streams は segment.bytes 設定をハードコードし、50 MB に設定します。Express ブローカーと MSK Serverless でプロビジョニングされた MSK は、トピック作成時の segment.size など、一部のトピック設定を保護します。したがって、ステートフル変換を含む Kafka Streams アプリケーションは、MSK Express ブローカーまたは MSK Serverless を使用して内部トピックを作成できません。
MSK Express ブローカーまたは MSK Serverless 上でこのような Kafka Streams アプリケーションを実行するには、内部トピックを自身で作成する必要があります。これを行うには、まずトピックを必要とする Kafka Streams 演算子を特定し、名前を付けます。次に、対応する内部 Kafka トピックを作成します。
注記
-
Kafka Streams では、特に内部トピックに依存する演算子に手動で名前を付けるのがベストプラクティスです。演算子の命名に関する情報については、Kafka Streams ドキュメント内の Kafka Streams DSL アプリケーションにおける演算子の命名
を参照してください。 -
ステートフル変換の内部トピック名は、Kafka Streams アプリケーションの
application.idとステートフル演算子の名前 (application.id-statefuloperator_name) によって異なります。
MSK Express ブローカーまたは MSK Serverless を使用した Kafka Streams アプリケーションの作成
Kafka Streams アプリケーションが にapplication.id設定されている場合msk-streams-processing、MSK Express ブローカーまたは MSK Serverless を使用して Kafka Streams アプリケーションを作成できます。これを行うには、 count()演算子を使用します。この演算子には、 という名前の内部トピックが必要です。例えば、msk-streams-processing-count-store。
Kafka Streams アプリケーションを作成するには、次の手順を実行します。
演算子の特定と名前付け
-
Kafka Streams ドキュメントの ステートフル変換
を使用してステートフルプロセッサを特定します。 ステートフルプロセッサの例としては
count、aggregate、またはjoinなどが挙げられる。 -
再パーティショニング用のトピックを作成するプロセッサを特定する。
次の例には、 状態を必要とする
count()オペレーションが含まれています。var stream = paragraphStream .groupByKey() .count() .toStream(); -
トピックに名前を付けるには、各ステートフルプロセッサに名前を追加してください。プロセッサタイプに基づいて、命名は別の命名クラスによって行われます。たとえば、
count()オペレーションは集計オペレーションです。したがって、Materializedクラスが必要です。ステートフルオペレーションの命名クラスの詳細については、Kafka Streams ドキュメントの「結論
」を参照してください。 次の例では、
count()演算子の名前をMaterializedクラスを使用してcount-storeに設定します。var stream = paragraphStream .groupByKey() .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count-store") // descriptive name for the store .withKeySerde(Serdes.String()) .withValueSerde(Serdes.Long())) .toStream();
内部トピックを作成する
Kafka Streams は内部トピックの名前に application.id をプレフィックスとして付けます。ここで application.id はユーザー定義です。例えば、application.id-internal_topic_name。内部トピックは通常の Kafka トピックであり、Kafka API の Apache Kafka トピックを作成する または AdminClient で利用可能な情報を使用してトピックを作成できます。
ユースケースに応じて、Kafka Streams のデフォルトのクリーンアップポリシーと保持ポリシーを使用するか、それらの値をカスタマイズできます。これらは cleanup.policy と retention.ms で定義します。
次の例では、 AdminClient API を使用してトピックを作成し、 application.id を msk-streams-processing に設定します。
try (AdminClient client = AdminClient.create(configs.kafkaProps())) { Collection<NewTopic> topics = new HashSet<>(); topics.add(new NewTopic("msk-streams-processing-count-store", 3, (short) 3)); client.createTopics(topics); }
トピックが クラスター に作成されると、Kafka Streams アプリケーションは msk-streams-processing-count-store トピックを count() オペレーションに使用できます。
(オプション) トピック名を確認する
トポグラフィ記述子を使用して、ストリームのトポロジを記述し、内部トピックの名前を表示できます。次の例は、トポロジ記述子を実行する方法を示しています。
final StreamsBuilder builder = new StreamsBuilder(); Topology topology = builder.build(); System.out.println(topology.describe());
次の出力は、前の例のストリームのトポロジを示しています。
Topology Description: Topologies: Sub-topology: 0 Source: KSTREAM-SOURCE-0000000000 (topics: [input_topic]) --> KSTREAM-AGGREGATE-0000000001 Processor: KSTREAM-AGGREGATE-0000000001 (stores: [count-store]) --> KTABLE-TOSTREAM-0000000002 <-- KSTREAM-SOURCE-0000000000 Processor: KTABLE-TOSTREAM-0000000002 (stores: []) --> KSTREAM-SINK-0000000003 <-- KSTREAM-AGGREGATE-0000000001 Sink: KSTREAM-SINK-0000000003 (topic: output_topic) <-- KTABLE-TOSTREAM-0000000002
トポロジ記述子の使用方法については、Kafka Streams ドキュメントの Kafka Streams DSL アプリケーションの命名演算子
命名演算子の例
このセクションでは、命名演算子のに関するいくつかの例を示します。
groupByKey() の命名演算子の例
groupByKey() -> groupByKey(Grouped.as("kafka-stream-groupby"))
通常の count() の命名演算子の例
normal count() -> .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("kafka-streams-window") // descriptive name for the store .withKeySerde(Serdes.String()) .withValueSerde(Serdes.Long()))
windowed count() の命名演算子の例
windowed count() -> .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("kafka-streams-window") // descriptive name for the store .withKeySerde(Serdes.String()) .withValueSerde(Serdes.Long()))
windowed suppressed の命名演算子の例 ()
windowed suppressed() -> Suppressed<Windowed> suppressed = Suppressed .untilWindowCloses(Suppressed.BufferConfig.unbounded()) .withName("kafka-suppressed"); .suppress(suppressed)