

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# MSK Connect の EventBridge Kafka シンクコネクタを設定する
<a name="mkc-eventbridge-kafka-connector"></a>

このトピックでは、MSK Connect 用の [EventBridge Kafka シンクコネクタ](https://github.com/awslabs/eventbridge-kafka-connector)を設定する方法について説明します。このコネクタを使用すると、MSK クラスターから EventBridge [イベントバス](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-bus.html) にイベントを送信できます。このトピックでは、必要なリソースを作成し、Kafka と EventBridge 間のシームレスなデータフローを可能にするようにコネクタを設定するプロセスについて説明します。

**Topics**
+ [前提条件](#mkc-eb-kafka-prerequisites)
+ [MSK Connect に必要なリソースをセットアップする](#mkc-eb-kafka-set-up-resources)
+ [コネクタを作成する](#mkc-eb-kafka-create-connector)
+ [Kafka にメッセージを送信する](#mkc-eb-kafka-send-json-encoded-messages)

## 前提条件
<a name="mkc-eb-kafka-prerequisites"></a>

コネクタをデプロイする前に、次のリソースがあることを確認してください。
+ **Amazon MSK クラスター**: Kafka メッセージを生成および消費するためのアクティブな MSK クラスター。
+ **Amazon EventBridge イベントバス**: Kafka トピックからイベントを受信するための EventBridge イベントバス。
+ **IAM ロール**: MSK Connect と EventBridge コネクタに必要なアクセス許可を持つ IAM ロールを作成します。
+ MSK Connect または [VPC インターフェース エンドポイント](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-related-service-vpc.html) から [パブリック インターネットへのアクセス](msk-connect-internet-access.md) (MSK クラスターの VPC およびサブネット内に作成された EventBridge 用)。これにより、NAT ゲートウェイを必要とせずにパブリックインターネットを経由することを回避できます。
+ Amazon EC2 インスタンスや [AWS CloudShell](https://aws.amazon.com/cloudshell/) などの [クライアントマシン](create-serverless-cluster-client.md) を使用して、トピックを作成し Kafka にレコードを送信します。

## MSK Connect に必要なリソースをセットアップする
<a name="mkc-eb-kafka-set-up-resources"></a>

コネクタの IAM ロールを作成してから、コネクタを作成します。また、EventBridge ルールを作成して、EventBridge イベントバスに送信された Kafka イベントをフィルタリングします。

**Topics**
+ [コネクタの IAM ロール](#mkc-eb-kafka-iam-role-connector)
+ [受信イベントの EventBridge ルール](#mkc-eb-kafka-create-rule)

### コネクタの IAM ロール
<a name="mkc-eb-kafka-iam-role-connector"></a>

コネクタに関連付ける IAM ロールには、EventBridge へのイベントの送信を許可する [PutEvents](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-permissions-reference.html) アクセス許可が必要です。次の IAM ポリシーの例では、`example-event-bus` という名前のイベントバスにイベントを送信する許可を付与します。次の例のリソース ARN をイベントバスの ARN に置き換えてください。

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "events:PutEvents"
      ],
      "Resource": "arn:aws:events:us-east-1:123456789012:event-bus/example-event-bus"
    }
  ]
}
```

------

さらに、コネクタの IAM ロールに次の信頼ポリシーが含まれていることを確認する必要があります。

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "kafkaconnect.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
```

------

### 受信イベントの EventBridge ルール
<a name="mkc-eb-kafka-create-rule"></a>

受信イベントを[https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html)と呼ばれるイベントデータ基準と照合する[ルール](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-rules.html)を作成します。イベントパターンを使用すると、受信イベントをフィルタリングする基準を定義し、特定のルールをトリガーし、その後指定された[ターゲット](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-targets.html)にルーティングするイベントを決定できます。次のイベントパターンの例は、EventBridge イベントバスに送信された Kafka イベントと一致します。

```
{
  "detail": {
    "topic": ["msk-eventbridge-tutorial"]
  }
}
```

以下は、Kafka シンクコネクタを使用して Kafka から EventBridge に送信されるイベントの例です。

```
{
  "version": "0",
  "id": "dbc1c73a-c51d-0c0e-ca61-ab9278974c57",
  "account": "123456789012",
  "time": "2025-03-26T10:15:00Z",
  "region": "us-east-1",
  "detail-type": "msk-eventbridge-tutorial",
  "source": "kafka-connect.msk-eventbridge-tutorial",
  "resources": [],
  "detail": {
    "topic": "msk-eventbridge-tutorial",
    "partition": 0,
    "offset": 0,
    "timestamp": 1742984100000,
    "timestampType": "CreateTime",
    "headers": [],
    "key": "order-1",
    "value": {
      "orderItems": [
        "item-1",
        "item-2"
      ],
      "orderCreatedTime": "Wed Mar 26 10:15:00 UTC 2025"
    }
  }
}
```

EventBridge コンソールで、このパターン例を使用してイベントバスに[ルールを作成し](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-create-rule.html)、CloudWatch Logs グループなどのターゲットを指定します。EventBridge コンソールは、CloudWatch Logs グループに必要なアクセスポリシーを自動的に設定します。

## コネクタを作成する
<a name="mkc-eb-kafka-create-connector"></a>

次のセクションでは、 AWS マネジメントコンソールを使用して [EventBridge Kafka シンクコネクタ](https://github.com/awslabs/eventbridge-kafka-connector) を作成してデプロイします。

**Topics**
+ [ステップ 1: コレクタをダウンロードする](#mkc-eb-kafka-download-connector)
+ [ステップ 1: Amazon S3 バケットを作成する](#mkc-eb-kafka-s3-bucket-create)
+ [ステップ 3: MSK Connect でプラグインを作成する](#mkc-eb-kafka-create-plugin)
+ [ステップ 4: コネクタを作成する](#mkc-eb-kafka-create-connector)

### ステップ 1: コレクタをダウンロードする
<a name="mkc-eb-kafka-download-connector"></a>

EventBridge Kafka コネクタの [GitHub リリースページ](https://github.com/awslabs/eventbridge-kafka-connector/releases)から最新の EventBridge コネクタシンク JAR をダウンロードします。たとえば、バージョン v1.4.1 をダウンロードするには、JAR ファイルリンク `kafka-eventbridge-sink-with-dependencies.jar` を選択してコネクタをダウンロードします。次に、ファイルをマシン上の任意の場所に保存します。

### ステップ 1: Amazon S3 バケットを作成する
<a name="mkc-eb-kafka-s3-bucket-create"></a>

1. MSK Connect で使用する JAR ファイルを Amazon S3 に保存するには、 を開き AWS マネジメントコンソール、Amazon S3 を選択します。

1. Amazon S3 コンソールで、**バケットの作成**を選択し、一意のバケット名を入力します。例えば、**amzn-s3-demo-bucket1-eb-connector**。

1. Amazon S3 バケットに適したリージョンを選択します。MSK クラスターがデプロイされているリージョンと一致することを確認してください。

1. **バケット設定**では、デフォルトの選択を保持するか、必要に応じて調整します。

1. [**バケットを作成**] を選択します。

1. JAR ファイルを Amazon S3 バケットにアップロードします。

### ステップ 3: MSK Connect でプラグインを作成する
<a name="mkc-eb-kafka-create-plugin"></a>

1. を開き AWS マネジメントコンソール、**MSK Connect** に移動します。

1. 左側のナビゲーションペインで **[カスタムプラグイン]** を選択します。

1. **プラグインの作成**を選択し、**プラグイン名**を入力します。例えば、**eventbridge-sink-plugin**。

1. **カスタムプラグインの場所**の場合は、**S3 オブジェクト URL **を貼り付けます。

1. プラグインのオプションの説明を追加します。

1. **プラグインの作成** を選択します。

プラグインを作成したら、それを使用して MSK Connect で EventBridge Kafka コネクタを設定およびデプロイできます。

### ステップ 4: コネクタを作成する
<a name="mkc-eb-kafka-create-connector"></a>

コネクタを作成する前に、コネクタエラーを避けるために必要な Kafka トピックを作成することをお勧めします。トピックを作成するには、クライアントマシンを使用します。

1. MSK コンソールの左側のペインで[**コネクタ**] を選択し、[**コネクタの作成**]を選択します。

1. プラグインのリストで、**eventbridge-sink-plugin** を選択し、**Next** を選択します。

1. コネクタ名には、 **EventBridgeSink** と入力します。

1. クラスターリストから、 お客様の MSK クラスターを選択します。

1. <a name="connector-ex"></a>以下のコネクタ設定をコピーし、**コネクタ設定**フィールドに貼り付けてください

   必要に応じて、次の設定のプレースホルダーを置き換えます。
   + MSK クラスターにパブリックインターネットアクセスがある場合は、`aws.eventbridge.endpoint.uri` を削除します。
   + PrivateLink を使用して MSK から EventBridge に安全に接続する場合は、`https://` の後の DNS 部分を、前に作成した EventBridge の (オプション) VPC インターフェイスエンドポイントの正しいプライベート DNS 名に置き換えます。
   + 次の設定の EventBridge イベントバス ARN をイベントバスの ARN に置き換えます。
   + リージョン固有の値を更新します。

   ```
   {
     "connector.class": "software.amazon.event.kafkaconnector.EventBridgeSinkConnector",
     "aws.eventbridge.connector.id": "msk-eventbridge-tutorial",
     "topics": "msk-eventbridge-tutorial",
     "tasks.max": "1",
     "aws.eventbridge.endpoint.uri": "https://events.us-east-1.amazonaws.com",
     "aws.eventbridge.eventbus.arn": "arn:aws:events:us-east-1:123456789012:event-bus/example-event-bus",
     "value.converter.schemas.enable": "false",
     "value.converter": "org.apache.kafka.connect.json.JsonConverter",
     "aws.eventbridge.region": "us-east-1",
     "auto.offset.reset": "earliest",
     "key.converter": "org.apache.kafka.connect.storage.StringConverter"
   }
   ```

   コネクタ設定の詳細については、「[eventbridge-kafka-connector](https://github.com/awslabs/eventbridge-kafka-connector)」を参照してください。

   必要に応じて、ワーカーと自動スケーリングの設定を変更します。また、ドロップダウンから利用可能な最新の (推奨) Apache Kafka Connect バージョンを使用することをお勧めします。**アクセス許可**で、前に作成したロールを使用します。また、オブザーバビリティとトラブルシューティングのために CloudWatch へのログ記録を有効にすることをお勧めします。必要に応じて、タグなどの他のオプション設定を調整します。次に、コネクタをデプロイし、ステータスが実行中状態になるまで待ちます。

## Kafka にメッセージを送信する
<a name="mkc-eb-kafka-send-json-encoded-messages"></a>

Apache Avro や JSON などのメッセージエンコーディングを設定するには、 `value.converter` と、オプションで Kafka Connect で使用できる`key.converter`設定を使用して、さまざまなコンバーターを指定します。

このトピックの [connector example](#connector-ex) は、`org.apache.kafka.connect.json.JsonConverter` の使用で示されているように、`value converter` 用の JSON エンコードされたメッセージで動作するように設定されています。コネクタが実行中状態になったら、クライアントマシンから `msk-eventbridge-tutorial` Kafka トピックにレコードを送信します。