

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 为 MSK Connect 设置 EventBridge Kafka 水槽连接器
<a name="mkc-eventbridge-kafka-connector"></a>

本主题向您展示如何为 MSK Connect 设置 [EventBridge Kafka 接收器连接器](https://github.com/awslabs/eventbridge-kafka-connector)。此连接器允许您将事件从 MSK 集群发送到 EventBridge [事件总线](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-bus.html)。本主题介绍创建所需资源和配置连接器以实现 Kafka 和 EventBridge之间的无缝数据流的过程。

**Topics**
+ [先决条件](#mkc-eb-kafka-prerequisites)
+ [设置 MSK Connect 所需的资源](#mkc-eb-kafka-set-up-resources)
+ [创建连接器](#mkc-eb-kafka-create-connector)
+ [向 Kafka 发送消息](#mkc-eb-kafka-send-json-encoded-messages)

## 先决条件
<a name="mkc-eb-kafka-prerequisites"></a>

部署连接器之前，请确保拥有以下资源：
+ **Amazon MSK 集群**：用于生成和使用 Kafka 消息的活动 MSK 集群。
+ **Amazon EventBridge 活动总线**：用于接收来自 Kafka 主题的事件的事件的活动总线。 EventBridge 
+ **IAM 角色**：创建具有 MSK Connect 和连接 EventBridge 器所需权限的 IAM 角色。
+ 通过 MSK Connect 或在 MSK 集群的 [VPC 和子网中 EventBridge 创建的 VPC 接口终端节点](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-related-service-vpc.html)[访问公共互联](msk-connect-internet-access.md)网。这有助于避免直接穿越公共互联网，而不需要 NAT 网关。
+ [客户端计算机](create-serverless-cluster-client.md)，例如 Amazon EC2 实例或 [AWS CloudShell](https://aws.amazon.com/cloudshell/)，用于创建主题并向 Kafka 发送记录。

## 设置 MSK Connect 所需的资源
<a name="mkc-eb-kafka-set-up-resources"></a>

为连接器创建 IAM 角色，然后创建连接器。您还可以创建 EventBridge 规则来筛选发送到事件总线的 Kafka EventBridge 事件。

**Topics**
+ [连接器的 IAM 角色](#mkc-eb-kafka-iam-role-connector)
+ [传入事件的 EventBridge 规则](#mkc-eb-kafka-create-rule)

### 连接器的 IAM 角色
<a name="mkc-eb-kafka-iam-role-connector"></a>

您与连接器关联的 IAM 角色必须具有允许向其发送事件的[PutEvents](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-permissions-reference.html)权限 EventBridge。以下 IAM 策略示例授予了向名为 `example-event-bus` 的事件总线发送事件的权限。确保将以下示例中的资源 ARN 替换为事件总线的 ARN。

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "events:PutEvents"
      ],
      "Resource": "arn:aws:events:us-east-1:123456789012:event-bus/example-event-bus"
    }
  ]
}
```

------

此外，还必须确保连接器的 IAM 角色包含以下信任策略。

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "kafkaconnect.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
```

------

### 传入事件的 EventBridge 规则
<a name="mkc-eb-kafka-create-rule"></a>

可以创建使传入事件与事件数据标准（称为[https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html)）相匹配的[规则](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-rules.html)。使用事件模式，可以定义传入事件筛选标准，并确定哪些事件应触发特定规则，然后将事件路由到指定[目标](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-targets.html)。以下事件模式示例与发送到事件总线的 Kafka 事件相匹配。 EventBridge 

```
{
  "detail": {
    "topic": ["msk-eventbridge-tutorial"]
  }
}
```

以下是 EventBridge 使用 Kafka 接收器连接器从 Kafka 发送到的事件的示例。

```
{
  "version": "0",
  "id": "dbc1c73a-c51d-0c0e-ca61-ab9278974c57",
  "account": "123456789012",
  "time": "2025-03-26T10:15:00Z",
  "region": "us-east-1",
  "detail-type": "msk-eventbridge-tutorial",
  "source": "kafka-connect.msk-eventbridge-tutorial",
  "resources": [],
  "detail": {
    "topic": "msk-eventbridge-tutorial",
    "partition": 0,
    "offset": 0,
    "timestamp": 1742984100000,
    "timestampType": "CreateTime",
    "headers": [],
    "key": "order-1",
    "value": {
      "orderItems": [
        "item-1",
        "item-2"
      ],
      "orderCreatedTime": "Wed Mar 26 10:15:00 UTC 2025"
    }
  }
}
```

在 EventBridge 控制台中，使用此示例模式在事件总线上[创建规则](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-create-rule.html)并指定目标，例如 CloudWatch 日志组。 EventBridge 控制台将自动为 CloudWatch 日志组配置必要的访问策略。

## 创建连接器
<a name="mkc-eb-kafka-create-connector"></a>

在下一节中，您将使用创建和部署 [EventBridge Kafka 接收器连接器](https://github.com/awslabs/eventbridge-kafka-connector)。 AWS 管理控制台

**Topics**
+ [步骤 1：下载连接器](#mkc-eb-kafka-download-connector)
+ [步骤 2：创建 Amazon S3 存储桶](#mkc-eb-kafka-s3-bucket-create)
+ [步骤 3：在 MSK Connect 中创建插件](#mkc-eb-kafka-create-plugin)
+ [步骤 4：创建连接器](#mkc-eb-kafka-create-connector)

### 步骤 1：下载连接器
<a name="mkc-eb-kafka-download-connector"></a>

从 Ka EventBridge fka EventBridge 连接器的[GitHub 版本页面](https://github.com/awslabs/eventbridge-kafka-connector/releases)下载最新的连接器接收器 JAR。例如，要下载版本 v1.4.1，请选择 JAR 文件链接 `kafka-eventbridge-sink-with-dependencies.jar` 以下载连接器。然后，将文件保存到计算机上的首选位置。

### 步骤 2：创建 Amazon S3 存储桶
<a name="mkc-eb-kafka-s3-bucket-create"></a>

1. 要将 JAR 文件存储在 Amazon S3 中以用于 MSK Connect，请打开 AWS 管理控制台，然后选择 Amazon S3。

1. 在 Amazon S3 控制台中，选择**创建存储桶**，输入唯一存储桶名称。例如 **amzn-s3-demo-bucket1-eb-connector**。

1. 为 Amazon S3 存储桶选择合适的区域。确保与部署 MSK 集群的区域相匹配。

1. 对于**存储桶设置**，请保留默认选择或根据需要调整。

1. 选择**创建存储桶**。

1. 将 JAR 文件上传到 Amazon S3 存储桶中。

### 步骤 3：在 MSK Connect 中创建插件
<a name="mkc-eb-kafka-create-plugin"></a>

1. 打开 AWS 管理控制台，然后导航到 **MSK Connect**。

1. 在左侧导航窗格中，选择**自定义插件**。

1. 选择**创建插件**，然后输入**插件名称**。例如 **eventbridge-sink-plugin**。

1. 对于**自定义插件位置**，请粘贴 **S3 对象 URL**。

1. 为插件添加可选描述。

1. 选择**创建插件**。

创建插件后，您可以使用它在 MSK Connect 中配置和部署 EventBridge Kafka 连接器。

### 步骤 4：创建连接器
<a name="mkc-eb-kafka-create-connector"></a>

在创建连接器之前，建议创建所需的 Kafka 主题以避免连接器错误。要创建主题，请使用客户端计算机。

1. 在 MSK 控制台的左侧窗格中，选择**连接器**，然后选择**创建连接器**。

1. 在插件列表中，选择 **eventbridge-sink-plugin**，然后选择**下一步**。

1. 对于连接器名称，请输入 **EventBridgeSink**。

1. 在集群列表中，选择 MSK 集群。

1. <a name="connector-ex"></a>复制以下连接器配置，并将其粘贴到**连接器配置**字段中

   根据需要替换以下配置中的占位符。
   + 如果 MSK 集群可访问公共互联网，请将 `aws.eventbridge.endpoint.uri` 删除。
   + 如果您使用 PrivateLink 安全地从 MSK 连接到 EventBridge，请将后`https://`面的 DNS 部分替换为您之前创建的（可选）VPC 接口终端节点的 EventBridge 正确私有 DNS 名称。
   + 将以下配置中的 EventBridge 事件总线 ARN 替换为事件总线的 ARN。
   + 更新任何特定区域的值。

   ```
   {
     "connector.class": "software.amazon.event.kafkaconnector.EventBridgeSinkConnector",
     "aws.eventbridge.connector.id": "msk-eventbridge-tutorial",
     "topics": "msk-eventbridge-tutorial",
     "tasks.max": "1",
     "aws.eventbridge.endpoint.uri": "https://events.us-east-1.amazonaws.com",
     "aws.eventbridge.eventbus.arn": "arn:aws:events:us-east-1:123456789012:event-bus/example-event-bus",
     "value.converter.schemas.enable": "false",
     "value.converter": "org.apache.kafka.connect.json.JsonConverter",
     "aws.eventbridge.region": "us-east-1",
     "auto.offset.reset": "earliest",
     "key.converter": "org.apache.kafka.connect.storage.StringConverter"
   }
   ```

   有关连接器配置的更多信息，请参见[eventbridge-kafka-connector](https://github.com/awslabs/eventbridge-kafka-connector)。

   如果需要，请更改工作程序和自动缩放的设置。我们还建议从下拉列表中使用最新可用（推荐）的 Apache Kafka Connect 版本。在**访问权限**下，使用之前创建的角色。我们还建议启用日志功能，以实现可 CloudWatch 观察性和故障排除。根据需要调整其他可选设置，例如标签。然后，部署连接器并等待其状态进入运行状态。

## 向 Kafka 发送消息
<a name="mkc-eb-kafka-send-json-encoded-messages"></a>

可使用 Kafka Connect 中提供的 `value.converter` 设置，也可以选择使用 `key.converter` 设置来指定不同的转换器，从而配置消息编码，例如 Apache Avro 和 JSON。

如使用 `org.apache.kafka.connect.json.JsonConverter` 作为 `value converter` 所示，本主题中的 [connector example](#connector-ex) 配置为使用 JSON 编码的消息。当连接器处于“运行”状态时，从客户端计算机向 `msk-eventbridge-tutorial` Kafka 主题发送记录。