

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 搭配 MSK Express 代理程式和 MSK Serverless 使用 Kafka Streams
<a name="use-kafka-streams-express-brokers-msk-serverless"></a>

Kafka Streams 支援無狀態和有狀態轉換。狀態轉換，例如計數、彙總或聯結，使用在內部 Kafka 主題中存放其狀態的運算子。此外，有些無狀態轉換，例如 groupBy 或 repartition，會將結果存放在內部 Kafka 主題中。根據預設，Kafka Streams 會根據對應的運算子來命名這些內部主題。如果這些主題不存在，Kafka Streams 會建立內部 Kafka 主題。為了建立內部主題，Kafka Streams 會硬式編碼 segment.bytes 組態，並將其設定為 50 MB。[MSK 使用 Express 代理](msk-configuration-express-read-write.md#msk-configuration-express-topic-configuration)程式和 MSK Serverless 佈建可保護某些[主題組態](serverless-config.md)，包括在主題建立期間 segment.size。因此，具有狀態轉換的 Kafka Streams 應用程式無法使用 MSK Express 代理程式或 MSK Serverless 建立內部主題。

若要在 MSK Express 代理程式或 MSK Serverless 上執行此類 Kafka Streams 應用程式，您必須自行建立內部主題。若要這樣做，請先識別並命名需要主題的 Kafka Streams 運算子。然後，建立對應的內部 Kafka 主題。

**注意**  
最佳實務是在 Kafka Streams 中手動命名運算子，尤其是依賴內部主題的運算子。如需有關命名運算子的資訊，請參閱 [Kafka Streams 文件中的在 Kafka Streams DSL 應用程式中命名運算子](https://kafka.apache.org/38/documentation/streams/developer-guide/dsl-topology-naming.html)。
狀態轉換的內部主題名稱取決於 `application.id` Kafka Streams 應用程式的 和狀態運算子 的名稱`application.id-statefuloperator_name`。

**Topics**
+ [使用 MSK Express 代理程式或 MSK Serverless 建立 Kafka Streams 應用程式](#create-kafka-streams-app-express-broker-msk-serverless)

## 使用 MSK Express 代理程式或 MSK Serverless 建立 Kafka Streams 應用程式
<a name="create-kafka-streams-app-express-broker-msk-serverless"></a>

如果您的 Kafka Streams 應用程式將其`application.id`設定為 `msk-streams-processing`，您可以使用 MSK Express 代理程式或 MSK Serverless 建立 Kafka Streams 應用程式。若要這樣做，請使用 `count()`運算子，該運算子需要名稱為 的內部主題。例如 `msk-streams-processing-count-store`。

若要建立 Kafka Streams 應用程式，請執行下列動作：

**Topics**
+ [識別運算子並命名](#create-kafka-streams-app-identify-name-operators)
+ [建立內部主題](#create-kafka-streams-app-create-internal-topics)
+ [（選用） 檢查主題名稱](#create-kafka-streams-app-check-topic-name)
+ [命名運算子的範例](#create-kafka-streams-app-naming-operators-examples)

### 識別運算子並命名
<a name="create-kafka-streams-app-identify-name-operators"></a>

1. 使用 Kafka Streams 文件中的具狀態[轉換來識別具狀態](https://kafka.apache.org/39/documentation/streams/developer-guide/dsl-api.html#stateful-transformations)處理器。

   具狀態處理器的一些範例包括 `count`、 `aggregate`或 `join`。

1. 識別建立重新分割主題的處理器。

   下列範例包含需要 狀態`count()`的操作。

   ```
   var stream =
       paragraphStream
           .groupByKey()
           .count()
           .toStream();
   ```

1. 若要命名主題，請為每個具狀態處理器新增名稱。根據處理器類型，命名是由不同的命名類別完成。例如， `count()` 操作是彙總操作。因此，它需要 `Materialized`類別。

   如需有關具狀態操作命名類別的資訊，請參閱 Kafka Streams 文件中的[結論](https://kafka.apache.org/38/documentation/streams/developer-guide/dsl-topology-naming.html)。

   下列範例`count-store`使用 `Materialized`類別將 `count()` 運算子的名稱設定為 。

   ```
   var stream =
       paragraphStream
           .groupByKey()
           .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count-store")  // descriptive name for the store
               .withKeySerde(Serdes.String())
               .withValueSerde(Serdes.Long()))
           .toStream();
   ```

### 建立內部主題
<a name="create-kafka-streams-app-create-internal-topics"></a>

Kafka Streams 字首`application.id`為內部主題的名稱，其中 `application.id`是使用者定義的。例如 `application.id-internal_topic_name`。內部主題是一般 Kafka 主題，您可以使用 Kafka API [建立 Apache Kafka 主題](msk-serverless-create-topic.md)或 中提供的資訊`AdminClient`來建立主題。

根據您的使用案例，您可以使用 Kafka Streams 的預設清除和保留政策，或自訂其值。您可以在 `cleanup.policy`和 中定義這些項目`retention.ms`。

下列範例會使用 `AdminClient` API 建立主題，並將 `application.id`設定為 **msk-streams-processing**。

```
try (AdminClient client = AdminClient.create(configs.kafkaProps())) {
    Collection<NewTopic> topics = new HashSet<>();
    topics.add(new NewTopic("msk-streams-processing-count-store", 3, (short) 3));
    client.createTopics(topics);
}
```

在叢集上建立主題後，您的 Kafka Streams 應用程式可以使用 `msk-streams-processing-count-store`主題進行`count()`操作。

### （選用） 檢查主題名稱
<a name="create-kafka-streams-app-check-topic-name"></a>

您可以使用*拓撲描述程式*來描述串流的拓撲，並檢視內部主題的名稱。下列範例示範如何執行拓撲描述程式。

```
final StreamsBuilder builder = new StreamsBuilder();
Topology topology = builder.build();
System.out.println(topology.describe());
```

下列輸出顯示上述範例的串流拓撲。

```
Topology Description:
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [input_topic])
      --> KSTREAM-AGGREGATE-0000000001
    Processor: KSTREAM-AGGREGATE-0000000001 (stores: [count-store])
      --> KTABLE-TOSTREAM-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KTABLE-TOSTREAM-0000000002 (stores: [])
      --> KSTREAM-SINK-0000000003
      <-- KSTREAM-AGGREGATE-0000000001
    Sink: KSTREAM-SINK-0000000003 (topic: output_topic)
      <-- KTABLE-TOSTREAM-0000000002
```

如需有關如何使用拓撲描述器的資訊，請參閱 [Kafka Streams 文件中的在 Kafka Streams DSL 應用程式中命名運算](https://kafka.apache.org/38/documentation/streams/developer-guide/dsl-topology-naming.html)子。

### 命名運算子的範例
<a name="create-kafka-streams-app-naming-operators-examples"></a>

本節提供一些命名運算子的範例。

**groupByKey() 的命名運算子範例**

```
groupByKey() -> groupByKey(Grouped.as("kafka-stream-groupby"))
```

**normal count() 的命名運算子範例**

```
normal count() -> .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("kafka-streams-window")  // descriptive name for the store
    .withKeySerde(Serdes.String())
    .withValueSerde(Serdes.Long()))
```

**視窗化 count() 的命名運算子範例**

```
windowed count() -> .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("kafka-streams-window")  // descriptive name for the store
    .withKeySerde(Serdes.String())
    .withValueSerde(Serdes.Long()))
```

**視窗抑制的命名運算子範例 ()**

```
windowed suppressed() -> 
Suppressed<Windowed> suppressed = Suppressed
        .untilWindowCloses(Suppressed.BufferConfig.unbounded())
        .withName("kafka-suppressed");
    .suppress(suppressed)
```