

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Utilisation de Kafka Streams avec les courtiers MSK Express et MSK Serverless
<a name="use-kafka-streams-express-brokers-msk-serverless"></a>

Kafka Streams prend en charge les transformations apatrides et dynamiques. Les transformations dynamiques, telles que count, aggregate ou join, utilisent des opérateurs qui stockent leur état dans des rubriques internes de Kafka. En outre, certaines transformations apatrides telles que GroupBy ou repartition stockent leurs résultats dans des rubriques internes de Kafka. Par défaut, Kafka Streams nomme ces sujets internes en fonction de l'opérateur correspondant. Si ces sujets n'existent pas, Kafka Streams crée des sujets Kafka internes. Pour créer les sujets internes, Kafka Streams code en dur la configuration segment.bytes et la définit à 50 Mo. [MSK Provisioned with Express Brokers](msk-configuration-express-read-write.md#msk-configuration-express-topic-configuration) et MSK Serverless protège certaines [configurations de rubriques, notamment segment.size](serverless-config.md) lors de la création de rubriques. Par conséquent, une application Kafka Streams avec des transformations dynamiques ne parvient pas à créer les sujets internes à l'aide des courtiers MSK Express ou de MSK Serverless.

Pour exécuter de telles applications Kafka Streams sur des courtiers MSK Express ou MSK Serverless, vous devez créer vous-même les rubriques internes. Pour ce faire, identifiez et nommez d'abord les opérateurs Kafka Streams, qui nécessitent des sujets. Créez ensuite les sujets Kafka internes correspondants.

**Note**  
Il est recommandé de nommer les opérateurs manuellement dans Kafka Streams, en particulier ceux qui dépendent de sujets internes. Pour plus d'informations sur les opérateurs de dénomination, consultez la section [Opérateurs de dénomination dans une application DSL Kafka Streams](https://kafka.apache.org/38/documentation/streams/developer-guide/dsl-topology-naming.html) dans la documentation de Kafka Streams.
Le nom du sujet interne pour une transformation dynamique dépend `application.id` de l'application Kafka Streams et du nom de l'opérateur dynamique,. `application.id-statefuloperator_name`

**Topics**
+ [Création d'une application Kafka Streams à l'aide des courtiers MSK Express ou de MSK Serverless](#create-kafka-streams-app-express-broker-msk-serverless)

## Création d'une application Kafka Streams à l'aide des courtiers MSK Express ou de MSK Serverless
<a name="create-kafka-streams-app-express-broker-msk-serverless"></a>

Si votre application Kafka Streams est configurée sur`msk-streams-processing`, vous pouvez créer une application Kafka Streams à l'aide des courtiers MSK Express ou de MSK Serverless. `application.id` Pour ce faire, utilisez l'`count()`opérateur, qui nécessite un sujet interne portant le nom. Par exemple, `msk-streams-processing-count-store`.

Pour créer une application Kafka Streams, procédez comme suit :

**Topics**
+ [Identifier et nommer les opérateurs](#create-kafka-streams-app-identify-name-operators)
+ [Créez les sujets internes](#create-kafka-streams-app-create-internal-topics)
+ [(Facultatif) Vérifiez le nom du sujet](#create-kafka-streams-app-check-topic-name)
+ [Exemples d'opérateurs de dénomination](#create-kafka-streams-app-naming-operators-examples)

### Identifier et nommer les opérateurs
<a name="create-kafka-streams-app-identify-name-operators"></a>

1. Identifiez les processeurs dynamiques à l'aide des [transformations dynamiques de la documentation](https://kafka.apache.org/39/documentation/streams/developer-guide/dsl-api.html#stateful-transformations) de Kafka Streams.

   Voici quelques exemples de processeurs dynamiques : `count` ou`join`. `aggregate`

1. Identifiez les processeurs qui créent des sujets pour le repartitionnement.

   L'exemple suivant contient une `count()` opération qui nécessite un état.

   ```
   var stream =
       paragraphStream
           .groupByKey()
           .count()
           .toStream();
   ```

1. Pour nommer le sujet, ajoutez un nom pour chaque processeur dynamique. Selon le type de processeur, le nommage est effectué par une classe de dénomination différente. Par exemple, `count()` l'opération est une opération d'agrégation. Par conséquent, il a besoin de la `Materialized` classe.

   Pour plus d'informations sur les classes de dénomination pour les opérations dynamiques, consultez la section [Conclusion](https://kafka.apache.org/38/documentation/streams/developer-guide/dsl-topology-naming.html) de la documentation de Kafka Streams.

   L'exemple suivant définit le nom de l'`count()`opérateur de manière à `count-store` utiliser la `Materialized` classe.

   ```
   var stream =
       paragraphStream
           .groupByKey()
           .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count-store")  // descriptive name for the store
               .withKeySerde(Serdes.String())
               .withValueSerde(Serdes.Long()))
           .toStream();
   ```

### Créez les sujets internes
<a name="create-kafka-streams-app-create-internal-topics"></a>

Kafka Streams préfixe `application.id` les noms des sujets internes, où ils `application.id` sont définis par l'utilisateur. Par exemple, `application.id-internal_topic_name`. Les sujets internes sont des sujets Kafka normaux, et vous pouvez créer les sujets en utilisant les informations disponibles dans [Création d'un sujet Apache Kafka](msk-serverless-create-topic.md) ou depuis `AdminClient` l'API Kafka.

Selon votre cas d'utilisation, vous pouvez utiliser les politiques de nettoyage et de conservation par défaut de Kafka Streams, ou personnaliser leurs valeurs. Vous les définissez dans `cleanup.policy` et`retention.ms`.

L'exemple suivant crée les rubriques à l'aide de l'`AdminClient`API et définit la valeur `application.id` sur**msk-streams-processing**.

```
try (AdminClient client = AdminClient.create(configs.kafkaProps())) {
    Collection<NewTopic> topics = new HashSet<>();
    topics.add(new NewTopic("msk-streams-processing-count-store", 3, (short) 3));
    client.createTopics(topics);
}
```

Une fois les sujets créés sur le cluster, votre application Kafka Streams peut les `msk-streams-processing-count-store` utiliser pour l'`count()`opération.

### (Facultatif) Vérifiez le nom du sujet
<a name="create-kafka-streams-app-check-topic-name"></a>

Vous pouvez utiliser le *descripteur de topographie* pour décrire la topologie de votre flux et afficher les noms des rubriques internes. L'exemple suivant montre comment exécuter le descripteur de topologie.

```
final StreamsBuilder builder = new StreamsBuilder();
Topology topology = builder.build();
System.out.println(topology.describe());
```

La sortie suivante montre la topologie du flux pour l'exemple précédent.

```
Topology Description:
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [input_topic])
      --> KSTREAM-AGGREGATE-0000000001
    Processor: KSTREAM-AGGREGATE-0000000001 (stores: [count-store])
      --> KTABLE-TOSTREAM-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KTABLE-TOSTREAM-0000000002 (stores: [])
      --> KSTREAM-SINK-0000000003
      <-- KSTREAM-AGGREGATE-0000000001
    Sink: KSTREAM-SINK-0000000003 (topic: output_topic)
      <-- KTABLE-TOSTREAM-0000000002
```

Pour plus d'informations sur l'utilisation du descripteur de topologie, consultez la section [Opérateurs de dénomination dans une application DSL Kafka Streams](https://kafka.apache.org/38/documentation/streams/developer-guide/dsl-topology-naming.html) dans la documentation de Kafka Streams.

### Exemples d'opérateurs de dénomination
<a name="create-kafka-streams-app-naming-operators-examples"></a>

Cette section fournit quelques exemples d'opérateurs de dénomination.

**Exemple d'opérateur de dénomination pour groupByKey ()**

```
groupByKey() -> groupByKey(Grouped.as("kafka-stream-groupby"))
```

**Exemple d'opérateur de dénomination pour normal count ()**

```
normal count() -> .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("kafka-streams-window")  // descriptive name for the store
    .withKeySerde(Serdes.String())
    .withValueSerde(Serdes.Long()))
```

**Exemple d'opérateur de dénomination pour fenêtré ()**

```
windowed count() -> .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("kafka-streams-window")  // descriptive name for the store
    .withKeySerde(Serdes.String())
    .withValueSerde(Serdes.Long()))
```

**Exemple d'opérateur de dénomination pour fenêtré supprimé ()**

```
windowed suppressed() -> 
Suppressed<Windowed> suppressed = Suppressed
        .untilWindowCloses(Suppressed.BufferConfig.unbounded())
        .withName("kafka-suppressed");
    .suppress(suppressed)
```