

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Verwenden von Kafka Streams mit MSK Express-Brokern und MSK Serverless
<a name="use-kafka-streams-express-brokers-msk-serverless"></a>

Kafka Streams unterstützt zustandslose und zustandsbehaftete Transformationen. Zustandsorientierte Transformationen wie Count, Aggregate oder Join verwenden Operatoren, die ihren Status in internen Kafka-Themen speichern. Darüber hinaus speichern einige zustandslose Transformationen wie GroupBy oder Repartition ihre Ergebnisse in internen Kafka-Themen. Standardmäßig benennt Kafka Streams diese internen Themen auf der Grundlage des entsprechenden Operators. Wenn diese Themen nicht existieren, erstellt Kafka Streams interne Kafka-Themen. Für die Erstellung der internen Themen codiert Kafka Streams die segment.bytes-Konfiguration fest und legt sie auf 50 MB fest. [MSK Provisioned with Express Brokers](msk-configuration-express-read-write.md#msk-configuration-express-topic-configuration) [und MSK Serverless schützt einige Themenkonfigurationen, einschließlich segment.size bei der Themenerstellung.](serverless-config.md) Daher kann eine Kafka Streams-Anwendung mit statusbehafteten Transformationen die internen Themen nicht mithilfe von MSK Express-Brokern oder MSK Serverless erstellen.

Um solche Kafka Streams-Anwendungen auf MSK Express-Brokern oder MSK Serverless auszuführen, müssen Sie die internen Themen selbst erstellen. Identifizieren und benennen Sie dazu zunächst die Kafka Streams-Operatoren, für die Themen erforderlich sind. Erstellen Sie dann die entsprechenden internen Kafka-Themen.

**Anmerkung**  
Es hat sich bewährt, die Operatoren in Kafka Streams manuell zu benennen, insbesondere diejenigen, die von internen Themen abhängen. Informationen zur Benennung von Operatoren finden Sie unter [Benennen von Operatoren in einer Kafka Streams-DSL-Anwendung](https://kafka.apache.org/38/documentation/streams/developer-guide/dsl-topology-naming.html) in der Kafka Streams-Dokumentation.
Der interne Themenname für eine Stateful-Transformation hängt von der Version der Kafka Streams-Anwendung und dem Namen `application.id` des Stateful-Operators ab. `application.id-statefuloperator_name`

**Topics**
+ [Erstellen einer Kafka Streams-Anwendung mithilfe von MSK Express-Brokern oder MSK Serverless](#create-kafka-streams-app-express-broker-msk-serverless)

## Erstellen einer Kafka Streams-Anwendung mithilfe von MSK Express-Brokern oder MSK Serverless
<a name="create-kafka-streams-app-express-broker-msk-serverless"></a>

Wenn Ihre Kafka Streams-Anwendung auf `application.id` eingestellt ist`msk-streams-processing`, können Sie eine Kafka Streams-Anwendung mithilfe von MSK Express-Brokern oder MSK Serverless erstellen. Verwenden Sie dazu den `count()` Operator, der ein internes Thema mit dem Namen erfordert. Beispiel, `msk-streams-processing-count-store`.

Gehen Sie wie folgt vor, um eine Kafka Streams-Anwendung zu erstellen:

**Topics**
+ [Identifizieren und benennen Sie die Operatoren](#create-kafka-streams-app-identify-name-operators)
+ [Erstellen Sie die internen Themen](#create-kafka-streams-app-create-internal-topics)
+ [(Optional) Überprüfen Sie den Themennamen](#create-kafka-streams-app-check-topic-name)
+ [Beispiele für Benennungsoperatoren](#create-kafka-streams-app-naming-operators-examples)

### Identifizieren und benennen Sie die Operatoren
<a name="create-kafka-streams-app-identify-name-operators"></a>

1. Identifizieren Sie die Stateful-Prozessoren anhand der [Stateful-Transformationen](https://kafka.apache.org/39/documentation/streams/developer-guide/dsl-api.html#stateful-transformations) in der Kafka Streams-Dokumentation.

   Einige Beispiele für Stateful-Prozessoren sind,, oder. `count` `aggregate` `join`

1. Identifizieren Sie die Prozessoren, die Themen für die Neupartitionierung erstellen.

   Das folgende Beispiel enthält eine `count()` Operation, die einen Status benötigt.

   ```
   var stream =
       paragraphStream
           .groupByKey()
           .count()
           .toStream();
   ```

1. Um dem Thema einen Namen zu geben, fügen Sie einen Namen für jeden statusbehafteten Prozessor hinzu. Je nach Prozessortyp erfolgt die Benennung durch eine andere Benennungsklasse. Eine `count()` Operation ist beispielsweise eine Aggregationsoperation. Daher benötigt es die `Materialized` Klasse.

   Informationen zu den Benennungsklassen für die statusbehafteten Operationen finden Sie unter [Fazit](https://kafka.apache.org/38/documentation/streams/developer-guide/dsl-topology-naming.html) in der Dokumentation zu Kafka Streams.

   Im folgenden Beispiel wird der Name des `count()` Operators so festgelegt, dass er die `count-store` `Materialized` Klasse verwendet.

   ```
   var stream =
       paragraphStream
           .groupByKey()
           .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count-store")  // descriptive name for the store
               .withKeySerde(Serdes.String())
               .withValueSerde(Serdes.Long()))
           .toStream();
   ```

### Erstellen Sie die internen Themen
<a name="create-kafka-streams-app-create-internal-topics"></a>

Kafka Streams-Präfixe `application.id` für Namen interner Themen, wobei dies benutzerdefiniert `application.id` ist. Beispiel, `application.id-internal_topic_name`. Die internen Themen sind normale Kafka-Themen, und Sie können die Themen mithilfe der Informationen erstellen, die in [Erstellen Sie ein Apache Kafka-Thema](msk-serverless-create-topic.md) oder `AdminClient` über die Kafka-API verfügbar sind.

Je nach Anwendungsfall können Sie die standardmäßigen Bereinigungs- und Aufbewahrungsrichtlinien von Kafka Streams verwenden oder deren Werte anpassen. Sie definieren diese in und. `cleanup.policy` `retention.ms`

Im folgenden Beispiel werden die Themen mit der `AdminClient` API erstellt und der Wert `application.id` auf gesetzt**msk-streams-processing**.

```
try (AdminClient client = AdminClient.create(configs.kafkaProps())) {
    Collection<NewTopic> topics = new HashSet<>();
    topics.add(new NewTopic("msk-streams-processing-count-store", 3, (short) 3));
    client.createTopics(topics);
}
```

Nachdem die Themen auf dem Cluster erstellt wurden, kann Ihre Kafka Streams-Anwendung das `msk-streams-processing-count-store` Thema für den `count()` Vorgang verwenden.

### (Optional) Überprüfen Sie den Themennamen
<a name="create-kafka-streams-app-check-topic-name"></a>

Sie können den *Topography Describer* verwenden, um die Topologie Ihres Streams zu beschreiben und die Namen der internen Themen einzusehen. Das folgende Beispiel zeigt, wie der Topology Describer ausgeführt wird.

```
final StreamsBuilder builder = new StreamsBuilder();
Topology topology = builder.build();
System.out.println(topology.describe());
```

Die folgende Ausgabe zeigt die Topologie des Streams für das vorherige Beispiel.

```
Topology Description:
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [input_topic])
      --> KSTREAM-AGGREGATE-0000000001
    Processor: KSTREAM-AGGREGATE-0000000001 (stores: [count-store])
      --> KTABLE-TOSTREAM-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KTABLE-TOSTREAM-0000000002 (stores: [])
      --> KSTREAM-SINK-0000000003
      <-- KSTREAM-AGGREGATE-0000000001
    Sink: KSTREAM-SINK-0000000003 (topic: output_topic)
      <-- KTABLE-TOSTREAM-0000000002
```

Informationen zur Verwendung des Topologiebeschreibers finden Sie unter [Benennen von Operatoren in einer Kafka Streams-DSL-Anwendung in der Kafka Streams-Dokumentation](https://kafka.apache.org/38/documentation/streams/developer-guide/dsl-topology-naming.html).

### Beispiele für Benennungsoperatoren
<a name="create-kafka-streams-app-naming-operators-examples"></a>

Dieser Abschnitt enthält einige Beispiele für Benennungsoperatoren.

**Beispiel für einen Benennungsoperator für groupByKey ()**

```
groupByKey() -> groupByKey(Grouped.as("kafka-stream-groupby"))
```

**Beispiel für einen Benennungsoperator für normal count ()**

```
normal count() -> .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("kafka-streams-window")  // descriptive name for the store
    .withKeySerde(Serdes.String())
    .withValueSerde(Serdes.Long()))
```

**Beispiel für einen Benennungsoperator für Count () im Fenster**

```
windowed count() -> .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("kafka-streams-window")  // descriptive name for the store
    .withKeySerde(Serdes.String())
    .withValueSerde(Serdes.Long()))
```

**Beispiel für einen Benennungsoperator für windowed suppressed ()**

```
windowed suppressed() -> 
Suppressed<Windowed> suppressed = Suppressed
        .untilWindowCloses(Suppressed.BufferConfig.unbounded())
        .withName("kafka-suppressed");
    .suppress(suppressed)
```