

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# Kafka Streams 与 MSK 快速代理和 MSK Serverless 结合使用
<a name="use-kafka-streams-express-brokers-msk-serverless"></a>

Kafka Streams 支持无状态和有状态转换。有状态转换（例如计数、聚合或联接）使用运算符，将其状态存储在内部 Kafka 主题中。此外，一些无状态转换（例如 GroupBy 或重分区）则将其结果存储在内部 Kafka 主题中。默认情况下，Kafka Streams 根据相应的运算符对这些内部主题进行命名。如果这些主题不存在，Kafka Streams 则创建内部 Kafka 主题。为创建内部主题，Kafka Streams 对 segment.bytes 配置进行硬编码，并将其设置为 50 MB。[使用快速代理和 MSK Serverless 配置](msk-configuration-express-read-write.md#msk-configuration-express-topic-configuration)的 MSK 可保护某些[主题配置](serverless-config.md)，包括主题创建期间的 segment.size。因此，采用有状态转换的 Kafka Streams 应用程序无法使用 MSK 快速代理或 MSK Serverless 创建内部主题。

要在 MSK 快速代理或 MSK Serverless 上运行这样的 Kafka Streams 应用程序，您必须自行创建内部主题。为此，需先识别需要主题的 Kafka Streams 运算符并为其命名。然后，创建相应的内部 Kafka 主题。

**注意**  
最好在 Kafka Streams 中手动命名运算符，尤其是依赖于内部主题的运算符。有关命名运算符的信息，请参阅 Kafka Streams 文档中的[ Kafka Streams DSL 应用程序中的命名运算符](https://kafka.apache.org/38/documentation/streams/developer-guide/dsl-topology-naming.html)。
有状态转换的内部主题名称取决于 Kafka Streams 应用程序的 `application.id` 以及有状态运算符 `application.id-statefuloperator_name` 的名称。

**Topics**
+ [

## 使用 MSK 快速代理或 MSK Serverless 创建 Kafka Streams 应用程序
](#create-kafka-streams-app-express-broker-msk-serverless)

## 使用 MSK 快速代理或 MSK Serverless 创建 Kafka Streams 应用程序
<a name="create-kafka-streams-app-express-broker-msk-serverless"></a>

如果 Kafka Streams 应用程序将其 `application.id` 设置为 `msk-streams-processing`，则可以使用 MSK 快速代理或 MSK Serverless 创建 Kafka Streams 应用程序。为此，请使用 `count()` 运算符，它需要一个具有该名称的内部主题。例如 `msk-streams-processing-count-store`。

要创建 Kafka Streams 应用程序，请执行以下操作：

**Topics**
+ [

### 识别并命名运算符
](#create-kafka-streams-app-identify-name-operators)
+ [

### 创建内部主题
](#create-kafka-streams-app-create-internal-topics)
+ [

### （可选）检查主题名称
](#create-kafka-streams-app-check-topic-name)
+ [

### 命名运算符示例
](#create-kafka-streams-app-naming-operators-examples)

### 识别并命名运算符
<a name="create-kafka-streams-app-identify-name-operators"></a>

1. 使用 Kafka Streams 文档中的[有状态转换](https://kafka.apache.org/39/documentation/streams/developer-guide/dsl-api.html#stateful-transformations)来识别有状态处理器。

   有状态处理器的一些示例包括 `count`、`aggregate` 或 `join`。

1. 确定为重新分区创建主题的处理器。

   以下示例包含一项需要状态的 `count()` 运算。

   ```
   var stream =
       paragraphStream
           .groupByKey()
           .count()
           .toStream();
   ```

1. 要命名主题，请为每个有状态处理器添加名称。根据处理器类型，由不同的命名类完成命名。例如，`count()` 运算是一种聚合操作。因此，它需要 `Materialized` 类。

   有关进行有状态操作的命名类的信息，请参阅 Kafka Streams 文档中的[结论](https://kafka.apache.org/38/documentation/streams/developer-guide/dsl-topology-naming.html)。

   在以下示例中，使用 `Materialized` 类将 `count()` 运算符的名称设置为 `count-store`。

   ```
   var stream =
       paragraphStream
           .groupByKey()
           .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count-store")  // descriptive name for the store
               .withKeySerde(Serdes.String())
               .withValueSerde(Serdes.Long()))
           .toStream();
   ```

### 创建内部主题
<a name="create-kafka-streams-app-create-internal-topics"></a>

Kafka Streams 将 `application.id` 作为内部主题名称的前缀，其中 `application.id` 由用户定义。例如 `application.id-internal_topic_name`。内部主题是普通的 Kafka 主题，您可以使用 Kafka API 中的 [创建 Apache Kafka 主题](msk-serverless-create-topic.md) 或 `AdminClient` 提供的信息创建主题。

根据使用案例，可使用 Kafka Streams 的默认清理和保留策略，也可以自定义其值。可在 `cleanup.policy` 和 `retention.ms` 中定义这些值。

在以下示例中，使用 `AdminClient` API 创建主题并将 `application.id` 设置为 **msk-streams-processing**。

```
try (AdminClient client = AdminClient.create(configs.kafkaProps())) {
    Collection<NewTopic> topics = new HashSet<>();
    topics.add(new NewTopic("msk-streams-processing-count-store", 3, (short) 3));
    client.createTopics(topics);
}
```

在集群上创建主题后，Kafka Streams 应用程序可使用 `msk-streams-processing-count-store` 主题进行 `count()` 运算。

### （可选）检查主题名称
<a name="create-kafka-streams-app-check-topic-name"></a>

可使用*拓扑描述器*来描述流的拓扑结构，并查看内部主题的名称。以下示例演示如何运行拓扑描述器。

```
final StreamsBuilder builder = new StreamsBuilder();
Topology topology = builder.build();
System.out.println(topology.describe());
```

以下输出显示了上述示例的流拓扑结构。

```
Topology Description:
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [input_topic])
      --> KSTREAM-AGGREGATE-0000000001
    Processor: KSTREAM-AGGREGATE-0000000001 (stores: [count-store])
      --> KTABLE-TOSTREAM-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KTABLE-TOSTREAM-0000000002 (stores: [])
      --> KSTREAM-SINK-0000000003
      <-- KSTREAM-AGGREGATE-0000000001
    Sink: KSTREAM-SINK-0000000003 (topic: output_topic)
      <-- KTABLE-TOSTREAM-0000000002
```

有关如何使用拓扑描述器的信息，请参阅 Kafka Streams 文档中的[ Kafka Streams DSL 应用程序中的命名运算符](https://kafka.apache.org/38/documentation/streams/developer-guide/dsl-topology-naming.html)。

### 命名运算符示例
<a name="create-kafka-streams-app-naming-operators-examples"></a>

本部分提供了命名运算符的一些示例。

** groupByKey() 的命名运算符示例**

```
groupByKey() -> groupByKey(Grouped.as("kafka-stream-groupby"))
```

**常规 count() 命名运算符的示例**

```
normal count() -> .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("kafka-streams-window")  // descriptive name for the store
    .withKeySerde(Serdes.String())
    .withValueSerde(Serdes.Long()))
```

**窗口式 count() 的命名运算符示例**

```
windowed count() -> .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("kafka-streams-window")  // descriptive name for the store
    .withKeySerde(Serdes.String())
    .withValueSerde(Serdes.Long()))
```

**窗口式 suppressed() 的命名运算符示例**

```
windowed suppressed() -> 
Suppressed<Windowed> suppressed = Suppressed
        .untilWindowCloses(Suppressed.BufferConfig.unbounded())
        .withName("kafka-suppressed");
    .suppress(suppressed)
```