

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Utilizzo di Kafka Streams con i broker MSK Express e MSK Serverless
<a name="use-kafka-streams-express-brokers-msk-serverless"></a>

Kafka Streams supporta trasformazioni stateless e stateful. Le trasformazioni stateful, come count, aggregate o join, utilizzano operatori che memorizzano il loro stato in argomenti interni di Kafka. Inoltre, alcune trasformazioni stateless come groupBy o repartition memorizzano i risultati in argomenti interni di Kafka. Per impostazione predefinita, Kafka Streams nomina questi argomenti interni in base all'operatore corrispondente. Se questi argomenti non esistono, Kafka Streams crea argomenti Kafka interni. Per creare gli argomenti interni, Kafka Streams codifica la configurazione segment.bytes e la imposta su 50 MB. [MSK Provisioned with Express brokers e MSK](msk-configuration-express-read-write.md#msk-configuration-express-topic-configuration) [Serverless protegge alcune configurazioni degli argomenti, tra cui segment.size, durante la creazione degli argomenti.](serverless-config.md) Pertanto, un'applicazione Kafka Streams con trasformazioni stateful non riesce a creare gli argomenti interni utilizzando i broker MSK Express o MSK Serverless.

Per eseguire tali applicazioni Kafka Streams sui broker MSK Express o MSK Serverless, è necessario creare personalmente gli argomenti interni. A tale scopo, è necessario innanzitutto identificare e assegnare un nome agli operatori di Kafka Streams, che richiedono argomenti. Quindi, create i corrispondenti argomenti interni di Kafka.

**Nota**  
È buona norma denominare manualmente gli operatori in Kafka Streams, specialmente quelli che dipendono da argomenti interni. Per informazioni sulla denominazione degli operatori, vedere [Naming Operators in a Kafka Streams DSL Application nella documentazione di Kafka Streams](https://kafka.apache.org/38/documentation/streams/developer-guide/dsl-topology-naming.html).
Il nome dell'argomento interno per una trasformazione stateful dipende dall'applicazione Kafka Streams e dal nome `application.id` dell'operatore stateful,. `application.id-statefuloperator_name`

**Topics**
+ [Creazione di un'applicazione Kafka Streams utilizzando i broker MSK Express o MSK Serverless](#create-kafka-streams-app-express-broker-msk-serverless)

## Creazione di un'applicazione Kafka Streams utilizzando i broker MSK Express o MSK Serverless
<a name="create-kafka-streams-app-express-broker-msk-serverless"></a>

Se l'applicazione Kafka Streams è `application.id` impostata su`msk-streams-processing`, è possibile creare un'applicazione Kafka Streams utilizzando i broker MSK Express o MSK Serverless. Per fare ciò, utilizzate l'`count()`operatore, che richiede un argomento interno con il nome. Ad esempio, `msk-streams-processing-count-store`.

Per creare un'applicazione Kafka Streams, procedi come segue:

**Topics**
+ [Identifica e assegna un nome agli operatori](#create-kafka-streams-app-identify-name-operators)
+ [Create gli argomenti interni](#create-kafka-streams-app-create-internal-topics)
+ [(Facoltativo) Controlla il nome dell'argomento](#create-kafka-streams-app-check-topic-name)
+ [Esempi di operatori di denominazione](#create-kafka-streams-app-naming-operators-examples)

### Identifica e assegna un nome agli operatori
<a name="create-kafka-streams-app-identify-name-operators"></a>

1. Identifica i processori con stato utilizzando le [trasformazioni Stateful](https://kafka.apache.org/39/documentation/streams/developer-guide/dsl-api.html#stateful-transformations) nella documentazione di Kafka Streams.

   Alcuni esempi di processori con stato includono, o. `count` `aggregate` `join`

1. Identifica i processori che creano argomenti per il ripartizionamento.

   L'esempio seguente contiene un'`count()`operazione che richiede uno stato.

   ```
   var stream =
       paragraphStream
           .groupByKey()
           .count()
           .toStream();
   ```

1. Per assegnare un nome all'argomento, aggiungete un nome per ogni processore stateful. In base al tipo di processore, la denominazione viene effettuata da una classe di denominazione diversa. Ad esempio, `count()` l'operazione è un'operazione di aggregazione. Pertanto, ha bisogno della `Materialized` classe.

   Per informazioni sulle classi di denominazione per le operazioni stateful, vedi [Conclusioni](https://kafka.apache.org/38/documentation/streams/developer-guide/dsl-topology-naming.html) nella documentazione di Kafka Streams.

   L'esempio seguente imposta il nome dell'operatore per l'utilizzo della classe. `count()` `count-store` `Materialized`

   ```
   var stream =
       paragraphStream
           .groupByKey()
           .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count-store")  // descriptive name for the store
               .withKeySerde(Serdes.String())
               .withValueSerde(Serdes.Long()))
           .toStream();
   ```

### Create gli argomenti interni
<a name="create-kafka-streams-app-create-internal-topics"></a>

I prefissi di Kafka Streams `application.id` ai nomi degli argomenti interni, dove sono definiti dall'utente. `application.id` Ad esempio, `application.id-internal_topic_name`. Gli argomenti interni sono normali argomenti di Kafka e puoi crearli utilizzando le informazioni disponibili in o dell'API Kafka. [Crea un argomento di Apache Kafka](msk-serverless-create-topic.md) `AdminClient`

A seconda del caso d'uso, è possibile utilizzare le politiche di pulizia e conservazione predefinite di Kafka Streams o personalizzarne i valori. Li definisci in e. `cleanup.policy` `retention.ms`

L'esempio seguente crea gli argomenti con l'`AdminClient`API e li imposta `application.id` su**msk-streams-processing**.

```
try (AdminClient client = AdminClient.create(configs.kafkaProps())) {
    Collection<NewTopic> topics = new HashSet<>();
    topics.add(new NewTopic("msk-streams-processing-count-store", 3, (short) 3));
    client.createTopics(topics);
}
```

Dopo aver creato gli argomenti nel cluster, l'applicazione Kafka Streams può utilizzare l'`msk-streams-processing-count-store`argomento per l'operazione. `count()`

### (Facoltativo) Controlla il nome dell'argomento
<a name="create-kafka-streams-app-check-topic-name"></a>

Puoi usare il *descrittore topografico* per descrivere la topologia del tuo stream e visualizzare i nomi degli argomenti interni. L'esempio seguente mostra come eseguire il descrittore di topologia.

```
final StreamsBuilder builder = new StreamsBuilder();
Topology topology = builder.build();
System.out.println(topology.describe());
```

L'output seguente mostra la topologia del flusso per l'esempio precedente.

```
Topology Description:
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [input_topic])
      --> KSTREAM-AGGREGATE-0000000001
    Processor: KSTREAM-AGGREGATE-0000000001 (stores: [count-store])
      --> KTABLE-TOSTREAM-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KTABLE-TOSTREAM-0000000002 (stores: [])
      --> KSTREAM-SINK-0000000003
      <-- KSTREAM-AGGREGATE-0000000001
    Sink: KSTREAM-SINK-0000000003 (topic: output_topic)
      <-- KTABLE-TOSTREAM-0000000002
```

Per informazioni su come utilizzare il descrittore di topologia, vedere [Naming Operators in a Kafka Streams DSL Application nella documentazione di Kafka Streams](https://kafka.apache.org/38/documentation/streams/developer-guide/dsl-topology-naming.html).

### Esempi di operatori di denominazione
<a name="create-kafka-streams-app-naming-operators-examples"></a>

Questa sezione fornisce alcuni esempi di operatori di denominazione.

**Esempio di operatore di denominazione per groupByKey ()**

```
groupByKey() -> groupByKey(Grouped.as("kafka-stream-groupby"))
```

**Esempio di operatore di denominazione per il normale count ()**

```
normal count() -> .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("kafka-streams-window")  // descriptive name for the store
    .withKeySerde(Serdes.String())
    .withValueSerde(Serdes.Long()))
```

**Esempio di operatore di denominazione per windowed count ()**

```
windowed count() -> .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("kafka-streams-window")  // descriptive name for the store
    .withKeySerde(Serdes.String())
    .withValueSerde(Serdes.Long()))
```

**Esempio di operatore di denominazione per windowowed suppressed ()**

```
windowed suppressed() -> 
Suppressed<Windowed> suppressed = Suppressed
        .untilWindowCloses(Suppressed.BufferConfig.unbounded())
        .withName("kafka-suppressed");
    .suppress(suppressed)
```