

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 設定 MSK Connect 的 EventBridge Kafka 接收器連接器
<a name="mkc-eventbridge-kafka-connector"></a>

本主題說明如何設定 MSK Connect 的 [EventBridge Kafka 接收器連接器](https://github.com/awslabs/eventbridge-kafka-connector)。此連接器可讓您將事件從 MSK 叢集傳送至 EventBridge [事件匯流排](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-bus.html)。本主題說明建立必要資源和設定連接器以啟用 Kafka 和 EventBridge 之間無縫資料流程的程序。

**Topics**
+ [先決條件](#mkc-eb-kafka-prerequisites)
+ [設定 MSK Connect 所需的資源](#mkc-eb-kafka-set-up-resources)
+ [建立連接器](#mkc-eb-kafka-create-connector)
+ [傳送訊息至 Kafka](#mkc-eb-kafka-send-json-encoded-messages)

## 先決條件
<a name="mkc-eb-kafka-prerequisites"></a>

部署連接器之前，請確定您有下列資源：
+ **Amazon MSK 叢集**：用於產生和使用 Kafka 訊息的作用中 MSK 叢集。
+ **Amazon EventBridge 事件匯流排**：接收 Kafka 主題事件的 EventBridge 事件匯流排。
+ **IAM 角色**：建立具有 MSK Connect 和 EventBridge 連接器必要許可的 IAM 角色。
+ 從 MSK Connect 或 MSK 叢集的 [VPC 和子網路中建立的 EventBridge VPC 介面端點](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-related-service-vpc.html)[存取公有網際網路](msk-connect-internet-access.md)。 EventBridge 這可協助您避免周遊公有網際網路，無需 NAT 閘道。
+ [用戶端機器](create-serverless-cluster-client.md)，例如 Amazon EC2 執行個體或 [AWS CloudShell](https://aws.amazon.com/cloudshell/)，用於建立主題並將記錄傳送至 Kafka。

## 設定 MSK Connect 所需的資源
<a name="mkc-eb-kafka-set-up-resources"></a>

您可以為連接器建立 IAM 角色，然後建立連接器。您也可以建立 EventBridge 規則來篩選傳送至 EventBridge 事件匯流排的 Kafka 事件。

**Topics**
+ [連接器的 IAM 角色](#mkc-eb-kafka-iam-role-connector)
+ [傳入事件的 EventBridge 規則](#mkc-eb-kafka-create-rule)

### 連接器的 IAM 角色
<a name="mkc-eb-kafka-iam-role-connector"></a>

您與連接器關聯的 IAM 角色必須具有 [PutEvents](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-permissions-reference.html) 許可，才能允許將事件傳送至 EventBridge。下列 IAM 政策範例會授予您許可，以將事件傳送至名為 的事件匯流排`example-event-bus`。請務必使用事件匯流排的 ARN 取代下列範例中的資源 ARN。

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "events:PutEvents"
      ],
      "Resource": "arn:aws:events:us-east-1:123456789012:event-bus/example-event-bus"
    }
  ]
}
```

------

此外，您必須確定連接器的 IAM 角色包含下列信任政策。

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "kafkaconnect.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
```

------

### 傳入事件的 EventBridge 規則
<a name="mkc-eb-kafka-create-rule"></a>

您可以建立符合傳入事件與事件資料條件的[規則](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-rules.html)，稱為[https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html)。使用事件模式，您可以定義篩選條件傳入事件的條件，並判斷哪些事件應觸發特定規則，然後路由至指定的[目標](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-targets.html)。下列事件模式範例符合傳送至 EventBridge 事件匯流排的 Kafka 事件。

```
{
  "detail": {
    "topic": ["msk-eventbridge-tutorial"]
  }
}
```

以下是使用 Kafka 接收器連接器從 Kafka 傳送至 EventBridge 的事件範例。

```
{
  "version": "0",
  "id": "dbc1c73a-c51d-0c0e-ca61-ab9278974c57",
  "account": "123456789012",
  "time": "2025-03-26T10:15:00Z",
  "region": "us-east-1",
  "detail-type": "msk-eventbridge-tutorial",
  "source": "kafka-connect.msk-eventbridge-tutorial",
  "resources": [],
  "detail": {
    "topic": "msk-eventbridge-tutorial",
    "partition": 0,
    "offset": 0,
    "timestamp": 1742984100000,
    "timestampType": "CreateTime",
    "headers": [],
    "key": "order-1",
    "value": {
      "orderItems": [
        "item-1",
        "item-2"
      ],
      "orderCreatedTime": "Wed Mar 26 10:15:00 UTC 2025"
    }
  }
}
```

在 EventBridge 主控台中，使用此範例模式在事件匯流排上[建立規則](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-create-rule.html)，並指定目標，例如 CloudWatch Logs 群組。EventBridge 主控台會自動設定 CloudWatch Logs 群組的必要存取政策。

## 建立連接器
<a name="mkc-eb-kafka-create-connector"></a>

在下一節中，您會使用 建立和部署 [EventBridge Kafka 接收器連接器](https://github.com/awslabs/eventbridge-kafka-connector) AWS 管理主控台。

**Topics**
+ [步驟 1：下載連接器](#mkc-eb-kafka-download-connector)
+ [步驟 2：建立 Amazon S3 儲存貯體](#mkc-eb-kafka-s3-bucket-create)
+ [步驟 3：在 MSK Connect 中建立外掛程式](#mkc-eb-kafka-create-plugin)
+ [步驟 4：建立連接器](#mkc-eb-kafka-create-connector)

### 步驟 1：下載連接器
<a name="mkc-eb-kafka-download-connector"></a>

從 EventBridge Kafka 連接器的 [GitHub 版本頁面](https://github.com/awslabs/eventbridge-kafka-connector/releases)下載最新的 EventBridge 連接器接收器 JAR。例如，若要下載版本 v1.4.1，請選擇 JAR 檔案連結 `kafka-eventbridge-sink-with-dependencies.jar`來下載連接器。然後，將檔案儲存到機器上偏好的位置。

### 步驟 2：建立 Amazon S3 儲存貯體
<a name="mkc-eb-kafka-s3-bucket-create"></a>

1. 若要將 JAR 檔案存放在 Amazon S3 中以搭配 MSK Connect 使用，請開啟 AWS 管理主控台，然後選擇 Amazon S3。

1. 在 Amazon S3 主控台中，選擇**建立儲存貯**體，然後輸入唯一的儲存貯體名稱。例如 **amzn-s3-demo-bucket1-eb-connector**。

1. 為您的 Amazon S3 儲存貯體選擇適當的區域。請確定其與您的 MSK 叢集部署所在的區域相符。

1. 對於**儲存貯體設定**，保留預設選項或視需要調整。

1. 選擇**建立儲存貯體**。

1. 將 JAR 檔案上傳至 Amazon S3 儲存貯體。

### 步驟 3：在 MSK Connect 中建立外掛程式
<a name="mkc-eb-kafka-create-plugin"></a>

1. 開啟 AWS 管理主控台，然後導覽至 **MSK Connect**。

1. 在左側導覽窗格中，選擇**自訂外掛程式**。

1. 選擇**建立外掛程式**，然後輸入**外掛程式名稱**。例如 **eventbridge-sink-plugin**。

1. 針對**自訂外掛程式位置**，貼上 **S3 物件 URL**。

1. 新增外掛程式的選用描述。

1. 選擇**建立外掛程式**。

外掛程式建立後，您可以使用它在 MSK Connect 中設定和部署 EventBridge Kafka 連接器。

### 步驟 4：建立連接器
<a name="mkc-eb-kafka-create-connector"></a>

在建立連接器之前，建議您建立必要的 Kafka 主題，以避免連接器錯誤。若要建立 主題，請使用您的用戶端機器。

1. 在 MSK 主控台的左側窗格中，選擇**連接器**，然後選擇**建立連接器**。

1. 在外掛程式清單中，選擇 **eventbridge-sink-plugin**，然後選擇**下一步**。

1. 針對連接器名稱，輸入 **EventBridgeSink**。

1. 在叢集清單中，選擇您的 MSK 叢集。

1. <a name="connector-ex"></a>複製下列連接器組態，並將其貼到**連接器組態**欄位中

   視需要取代下列組態中的預留位置。
   + `aws.eventbridge.endpoint.uri` 如果您的 MSK 叢集具有公有網際網路存取，請移除 。
   + 如果您使用 PrivateLink 從 MSK 安全地連線至 EventBridge，請將 之後的 DNS 部分取代為您先前建立的 EventBridge 的 （選用） VPC 界面端點的`https://`正確私有 DNS 名稱。
   + 將下列組態中的 EventBridge 事件匯流排 ARN 取代為事件匯流排的 ARN。
   + 更新任何區域特定的值。

   ```
   {
     "connector.class": "software.amazon.event.kafkaconnector.EventBridgeSinkConnector",
     "aws.eventbridge.connector.id": "msk-eventbridge-tutorial",
     "topics": "msk-eventbridge-tutorial",
     "tasks.max": "1",
     "aws.eventbridge.endpoint.uri": "https://events.us-east-1.amazonaws.com",
     "aws.eventbridge.eventbus.arn": "arn:aws:events:us-east-1:123456789012:event-bus/example-event-bus",
     "value.converter.schemas.enable": "false",
     "value.converter": "org.apache.kafka.connect.json.JsonConverter",
     "aws.eventbridge.region": "us-east-1",
     "auto.offset.reset": "earliest",
     "key.converter": "org.apache.kafka.connect.storage.StringConverter"
   }
   ```

   如需連接器組態的詳細資訊，請參閱 [eventbridge-kafka-connector](https://github.com/awslabs/eventbridge-kafka-connector)。

   如有需要，請變更工作者和自動調整規模的設定。我們也建議使用下拉式清單中最新的可用 （建議） Apache Kafka Connect 版本。在**存取許可**下，使用先前建立的角色。我們也建議啟用 CloudWatch 的記錄功能，以提供可觀測性和故障診斷。根據您的需求調整其他選用設定，例如標籤。然後，部署連接器並等待狀態進入執行中狀態。

## 傳送訊息至 Kafka
<a name="mkc-eb-kafka-send-json-encoded-messages"></a>

您可以使用 指定不同的轉換器，並選擇性地指定 Kafka Connect 中可用的`key.converter`設定，以設定訊息編碼，例如 Apache Avro `value.converter`和 JSON。

本主題[connector example](#connector-ex)中的 設定為使用 JSON 編碼的訊息，如使用 `org.apache.kafka.connect.json.JsonConverter`的 所示`value converter`。當連接器處於執行中狀態時，請從用戶端機器傳送記錄至 `msk-eventbridge-tutorial` Kafka 主題。