

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

# Uso do Kafka Streams com agentes MSK Express e MSK Serverless
<a name="use-kafka-streams-express-brokers-msk-serverless"></a>

O Kafka Streams é compatível com transformações com e sem estado. Transformações com estado, como contar, agregar ou unir, usam operadores que armazenam seu estado em tópicos internos do Kafka. Além disso, algumas transformações sem estado, como groupBy ou repartição, armazenam resultados em tópicos internos do Kafka. Por padrão, o Kafka Streams nomeia esses tópicos internos com base no operador correspondente. Se esses tópicos não existirem, o Kafka Streams criará os tópicos internos do Kafka. Para criar os tópicos internos, o Kafka Streams codifica a configuração segment.bytes e a define em 50 MB. [O MSK Provisioned com agentes Express](msk-configuration-express-read-write.md#msk-configuration-express-topic-configuration) e o MSK Serverless protegem algumas [configurações de tópicos](serverless-config.md), incluindo segment.size, durante a criação do tópico. Portanto, uma aplicação do Kafka Streams com transformações com estado falha ao criar os tópicos internos usando os agentes MSK Express ou MSK Serverless.

Para executar essas aplicações do Kafka Streams em agentes MSK Express ou MSK Serverless, você mesmo deve criar os tópicos internos. Para isso, primeiro identifique e nomeie os operadores do Kafka Streams que exigem tópicos. Em seguida, crie os tópicos internos correspondentes do Kafka.

**nota**  
É uma prática recomendada nomear os operadores de maneira manual no Kafka Streams, especialmente aqueles que dependem de tópicos internos. Para saber como nomear operadores, consulte [Nomeando operadores em uma aplicação DSL do Kafka Streams](https://kafka.apache.org/38/documentation/streams/developer-guide/dsl-topology-naming.html) na documentação do Kafka Streams.
O nome do tópico interno de uma transformação com estado depende da `application.id` da aplicação do Kafka Streams e do nome do operador com estado, `application.id-statefuloperator_name`.

**Topics**
+ [Criação de uma aplicação Kafka Streams usando agentes MSK Express ou MSK Serverless](#create-kafka-streams-app-express-broker-msk-serverless)

## Criação de uma aplicação Kafka Streams usando agentes MSK Express ou MSK Serverless
<a name="create-kafka-streams-app-express-broker-msk-serverless"></a>

Se sua aplicação Kafka Streams tiver a `application.id` configurada como `msk-streams-processing`, você poderá criar uma aplicação Kafka Streams usando agentes MSK Express ou MSK Serverless. Para fazer isso, use o operador `count()`, que requer um tópico interno com o nome. Por exemplo, .`msk-streams-processing-count-store`

Para criar uma aplicação Kafka Streams, faça o seguinte:

**Topics**
+ [Identifique e nomeie os operadores](#create-kafka-streams-app-identify-name-operators)
+ [Criação dos tópicos internos](#create-kafka-streams-app-create-internal-topics)
+ [(Opcional) Verificar o nome do tópico](#create-kafka-streams-app-check-topic-name)
+ [Exemplos de nomenclatura de operadores](#create-kafka-streams-app-naming-operators-examples)

### Identifique e nomeie os operadores
<a name="create-kafka-streams-app-identify-name-operators"></a>

1. Identifique os processadores com estado usando as [Transformações com estado](https://kafka.apache.org/39/documentation/streams/developer-guide/dsl-api.html#stateful-transformations) na documentação do Kafka Streams.

   Alguns exemplos de processadores com estado incluem `count`, `aggregate` ou`join`.

1. Identifique os processadores que criam tópicos para reparticionamento.

   O exemplo a seguir contém uma operação `count()` que precisa de um estado.

   ```
   var stream =
       paragraphStream
           .groupByKey()
           .count()
           .toStream();
   ```

1. Para nomear o tópico, adicione um nome para cada processador com estado. Com base no tipo de processador, a nomenclatura é feita por uma classe de nomenclatura diferente. Por exemplo, a operação `count()` é de agregação. Portanto, ela precisa da classe `Materialized`.

   Para saber mais sobre as classes de nomenclatura para as operações com estado, consulte a [Conclusão](https://kafka.apache.org/38/documentation/streams/developer-guide/dsl-topology-naming.html) na documentação do Kafka Streams.

   O exemplo a seguir define o nome do operador `count()` para `count-store` usar a classe `Materialized`.

   ```
   var stream =
       paragraphStream
           .groupByKey()
           .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count-store")  // descriptive name for the store
               .withKeySerde(Serdes.String())
               .withValueSerde(Serdes.Long()))
           .toStream();
   ```

### Criação dos tópicos internos
<a name="create-kafka-streams-app-create-internal-topics"></a>

O Kafka Streams coloca prefixos `application.id` em nomes de tópicos internos, em que `application.id` é definido pelo usuário. Por exemplo, .`application.id-internal_topic_name` Os tópicos internos são normais do Kafka, e você pode criar os tópicos usando as informações disponíveis em [Criar um tópico do Apache Kafka](msk-serverless-create-topic.md) ou `AdminClient` da API do Kafka.

Dependendo do seu caso de uso, você pode usar as políticas padrão de limpeza e retenção do Kafka Streams ou personalizar os valores. Você os define em `cleanup.policy` e `retention.ms`.

O exemplo a seguir cria os tópicos com a API `AdminClient` e define o `application.id` como **msk-streams-processing**.

```
try (AdminClient client = AdminClient.create(configs.kafkaProps())) {
    Collection<NewTopic> topics = new HashSet<>();
    topics.add(new NewTopic("msk-streams-processing-count-store", 3, (short) 3));
    client.createTopics(topics);
}
```

Depois que os tópicos forem criados no cluster, sua aplicação Kafka Streams poderá usar o tópico `msk-streams-processing-count-store` na operação `count()`.

### (Opcional) Verificar o nome do tópico
<a name="create-kafka-streams-app-check-topic-name"></a>

Você pode usar o *descritor de topografia* para descrever a topologia do seu stream e visualizar os nomes dos tópicos internos. O exemplo a seguir mostra como executar o descritor da topologia.

```
final StreamsBuilder builder = new StreamsBuilder();
Topology topology = builder.build();
System.out.println(topology.describe());
```

A saída a seguir mostra a topologia do fluxo para o exemplo anterior.

```
Topology Description:
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [input_topic])
      --> KSTREAM-AGGREGATE-0000000001
    Processor: KSTREAM-AGGREGATE-0000000001 (stores: [count-store])
      --> KTABLE-TOSTREAM-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KTABLE-TOSTREAM-0000000002 (stores: [])
      --> KSTREAM-SINK-0000000003
      <-- KSTREAM-AGGREGATE-0000000001
    Sink: KSTREAM-SINK-0000000003 (topic: output_topic)
      <-- KTABLE-TOSTREAM-0000000002
```

Para saber como usar o descritor de topologia, consulte [Nomeando operadores em uma aplicação DSL do Kafka Streams](https://kafka.apache.org/38/documentation/streams/developer-guide/dsl-topology-naming.html) na documentação do Kafka Streams.

### Exemplos de nomenclatura de operadores
<a name="create-kafka-streams-app-naming-operators-examples"></a>

Esta seção fornece exemplos de operações de nomenclatura.

**Exemplo de operador de nomenclatura para groupByKey ()**

```
groupByKey() -> groupByKey(Grouped.as("kafka-stream-groupby"))
```

**Exemplo de operador de nomenclatura para contagem normal()**

```
normal count() -> .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("kafka-streams-window")  // descriptive name for the store
    .withKeySerde(Serdes.String())
    .withValueSerde(Serdes.Long()))
```

**Exemplo de operador de nomenclatura para contagem em janela()**

```
windowed count() -> .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("kafka-streams-window")  // descriptive name for the store
    .withKeySerde(Serdes.String())
    .withValueSerde(Serdes.Long()))
```

**Exemplo de operador de nomenclatura para suprimido em janela()**

```
windowed suppressed() -> 
Suppressed<Windowed> suppressed = Suppressed
        .untilWindowCloses(Suppressed.BufferConfig.unbounded())
        .withName("kafka-suppressed");
    .suppress(suppressed)
```