

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Debezium ソースコネクタ (設定プロバイダー付き) を使用する
<a name="mkc-debeziumsource-connector-example"></a>

この例は、MySQL 互換の [Amazon Aurora](https://aws.amazon.com/rds/aurora/) データベースをソースとして Debezium MySQL コネクタプラグインを使用する方法を示しています。この例では、 AWS Secrets Managerのデータベースの認証情報を外部化するために、オープンソースの [AWS Secrets Manager Config プロバイダー](https://github.com/jcustenborder/kafka-config-provider-aws)も設定しています。設定プロバイダーの詳細については、「[チュートリアル: 設定プロバイダーを用いた機密情報の外部化](msk-connect-config-provider.md)」を参照してください。

**重要**  
Debezium MySQL コネクタプラグインは [1 つのタスクのみをサポート](https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-tasks-max)し、Amazon MSK Connect の自動スケーリングキャパシティモードでは動作しません。代わりにプロビジョニングキャパシティモードを使用し、`workerCount` をコネクタ設定の値と等しい値に設定してください。MSK Connect のキャパシティモードの詳細については、「[コネクタ容量を理解する](msk-connect-capacity.md)」を参照してください。

# Debezium ソースコネクタを使用するための完全な前提条件
<a name="mkc-debeziumsource-connector-example-prereqs"></a>

コネクタは、 の外部 AWS Secrets Manager にある などのサービスとやり取りできるように、インターネットにアクセスできる必要があります Amazon Virtual Private Cloud。このセクションの手順は、インターネットアクセスを有効にするための次のタスクを実行するのに役立ちます。
+ NAT ゲートウェイをホストし、VPC 内のインターネットゲートウェイにトラフィックをルーティングするパブリックサブネットを設定します。
+ プライベートサブネットのトラフィックを NAT ゲートウェイに送るデフォルトルートを作成します。

詳細については、「[Amazon MSK Connect のインターネットアクセスを有効にする](msk-connect-internet-access.md)」を参照してください。

**前提条件**

インターネットアクセスを有効にするには、以下のものが必要です。
+ クラスターに関連付けられている Amazon Virtual Private Cloud (VPC) の ID。例えば、*vpc-123456ab* などです。
+ VPC 内のプライベートサブネットの ID。例えば、*subnet-a1b2c3de*、*subnet-f4g5h6ij* などです。コネクタにはプライベートサブネットを設定する必要があります。

**コネクタのインターネットアクセスを有効にするには**

1. [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/) で Amazon Virtual Private Cloud コンソールを開きます。

1. わかりやすい名前を付けて NAT ゲートウェイのパブリックサブネットを作成し、サブネット ID を書き留めます。詳細な手順については、「[VPC にサブネットを作成する](https://docs.aws.amazon.com/vpc/latest/userguide/working-with-vpcs.html#AddaSubnet)」を参照してください。

1. VPC がインターネットと通信できるようにインターネットゲートウェイを作成し、ゲートウェイ ID を書き留めます。VPC にインターネットゲートウェイをアタッチします。手順については、「[インターネットゲートウェイの作成とアタッチ](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_Internet_Gateway.html#Add_IGW_Attach_Gateway)」を参照してください。

1. プライベートサブネット内のホストがパブリックサブネットにアクセスできるように、パブリック NAT ゲートウェイをプロビジョニングします。NAT ゲートウェイを作成するときに、前に作成したパブリックサブネットを選択します。手順については、「[NAT ゲートウェイの作成](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html#nat-gateway-creating)」を参照してください。

1. ルートテーブルを設定します。この設定を完了するには、合計で 2 つのルートテーブルが必要です。VPC と同時に自動的に作成されたメインのルートテーブルが既にあるはずです。このステップでは、パブリックサブネット用の追加のルートテーブルを作成します。

   1. 次の設定を使用して VPC のメインルートテーブルを変更し、プライベートサブネットがトラフィックを NAT ゲートウェイにルーティングするようにします。手順については、*Amazon Virtual Private Cloud**ユーザーガイド*の[ルートテーブルの操作](https://docs.aws.amazon.com/vpc/latest/userguide/WorkWithRouteTables.html) を参照してください。  
**プライベート MSKC ルートテーブル**    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/msk/latest/developerguide/mkc-debeziumsource-connector-example-prereqs.html)

   1. 「[カスタムルートテーブルを作成する](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_Internet_Gateway.html#Add_IGW_Routing)」の手順に従って、パブリックサブネットのルートテーブルを作成します。テーブルを作成するときは、そのテーブルがどのサブネットに関連付けられているかを識別しやすいように、**[名前タグ]** フィールドにわかりやすい名前を入力します。例えば、**パブリック MSKC** と入力します。

   1. 以下の設定を使用して**パブリック MSKC** のルートテーブルを設定します。  
****    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/msk/latest/developerguide/mkc-debeziumsource-connector-example-prereqs.html)

Amazon MSK Connect のインターネットアクセスが有効になり、コネクタを作成する準備が整いました。

# Debezium ソースコネクタを作成する
<a name="msk-connect-debeziumsource-connector-example-steps"></a>

この手順では、Debezium ソースコネクタを作成する方法について説明します。

1. 

**カスタムプラグインを作成する**

   1. [Debezium](https://debezium.io/releases/) サイトから最新の安定版リリース用の MySQL コネクタプラグインをダウンロードしてください。ダウンロードした Debezium リリースバージョン (バージョン 2.x、または古いシリーズ 1.x) を書き留めます。この手順の後半で、Debezium のバージョンに基づいてコネクタを作成します。

   1. [AWS Secrets Manager 設定プロバイダー](https://www.confluent.io/hub/jcustenborder/kafka-config-provider-aws)をダウンロードして解凍します。

   1. 以下のアーカイブを同じディレクトリに置きます。
      + `debezium-connector-mysql` フォルダ
      + `jcusten-border-kafka-config-provider-aws-0.1.1` フォルダ

   1. 前のステップで作成したディレクトリを ZIP ファイルに圧縮し、その ZIP ファイルを S3 バケットにアップロードします。手順については、*Amazon S3 ユーザーガイド*の[オブジェクトのアップロード](https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html) を参照してください。

   1. 次の JSON をコピーして、ファイルに貼り付けます。例えば、`debezium-source-custom-plugin.json`。*<example-custom-plugin-name>* をプラグインに付けたい名前に置き換え、*<amzn-s3-demo-bucket-arn>* を ZIP ファイルをアップロードした Amazon S3 バケットの ARN に置き換え、 `<file-key-of-ZIP-object>` を S3 にアップロードした ZIP オブジェクトのファイルキーに置き換えてください。

      ```
      {
          "name": "<example-custom-plugin-name>",
          "contentType": "ZIP",
          "location": {
              "s3Location": {
                  "bucketArn": "<amzn-s3-demo-bucket-arn>",
                  "fileKey": "<file-key-of-ZIP-object>"
              }
          }
      }
      ```

   1. JSON ファイルを保存したフォルダから次の AWS CLI コマンドを実行して、プラグインを作成します。

      ```
      aws kafkaconnect create-custom-plugin --cli-input-json file://<debezium-source-custom-plugin.json>
      ```

      以下のような出力が表示されます。

      ```
      {
          "CustomPluginArn": "arn:aws:kafkaconnect:us-east-1:012345678901:custom-plugin/example-custom-plugin-name/abcd1234-a0b0-1234-c1-12345678abcd-1",
          "CustomPluginState": "CREATING",
          "Name": "example-custom-plugin-name",
          "Revision": 1
      }
      ```

   1. 次のコマンドを実行して、プラグインの状態を確認します。状態は `CREATING` から `ACTIVE` に変わります。ARN プレースホルダーを前のコマンドの出力で取得した ARN に置き換えます。

      ```
      aws kafkaconnect describe-custom-plugin --custom-plugin-arn "<arn-of-your-custom-plugin>"
      ```

1. 

**データベース認証情報のシークレットを設定 AWS Secrets Manager および作成する**

   1. [https://console.aws.amazon.com/secretsmanager/](https://console.aws.amazon.com/secretsmanager/) から Secrets Manager コンソールを開きます。

   1. データベースのサインイン認証情報を保存する新しいシークレットを作成します。手順については、*AWS Secrets Manager* ユーザーガイドの[シークレットを作成する](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_create-basic-secret.html)を参照してください。

   1. シークレットの ARN をコピーします。

   1. 以下のサンプルポリシーの Secrets Manager のアクセス許可を [サービス実行ロールを理解する](msk-connect-service-execution-role.md) に追加します。*<arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234>* をシークレットの ARN で置き換えます。

------
#### [ JSON ]

****  

      ```
      {
        "Version":"2012-10-17",		 	 	 
        "Statement": [
          {
            "Effect": "Allow",
            "Action": [
              "secretsmanager:GetResourcePolicy",
              "secretsmanager:GetSecretValue",
              "secretsmanager:DescribeSecret",
              "secretsmanager:ListSecretVersionIds"
            ],
            "Resource": [
            "arn:aws:secretsmanager:us-east-1:123456789012:secret:MySecret-1234"
            ]
          }
        ]
      }
      ```

------

      IAM のアクセス許可を追加する手順については、「*IAM ユーザーガイド*」の「[IAM ID のアクセス許可の追加と削除](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html)」を参照してください。

1. 

**設定プロバイダーに関する情報を使用してカスタムワーカー設定を作成します。**

   1. 次のワーカー設定プロパティをファイルにコピーして、プレースホルダー文字列をシナリオに対応する値に置き換えます。 AWS Secrets Manager 設定プロバイダーの設定プロパティの詳細については、プラグインのドキュメントの「[SecretsManagerConfigProvider](https://jcustenborder.github.io/kafka-connect-documentation/projects/kafka-config-provider-aws/configProviders/SecretsManagerConfigProvider.html)」を参照してください。

      ```
      key.converter=<org.apache.kafka.connect.storage.StringConverter>
      value.converter=<org.apache.kafka.connect.storage.StringConverter>
      config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider
      config.providers=secretManager
      config.providers.secretManager.param.aws.region=<us-east-1>
      ```

   1. 次の AWS CLI コマンドを実行して、カスタムワーカー設定を作成します。

      以下の値を置き換えます:
      + *<my-worker-config-name>* - カスタムワーカー設定のわかりやすい名前
      + *<encoded-properties-file-content-string>* - 前のステップでコピーしたプレーンテキストプロパティの base64 でエンコードされたバージョン

      ```
      aws kafkaconnect create-worker-configuration --name <my-worker-config-name> --properties-file-content <encoded-properties-file-content-string>
      ```

1. 

**コネクタを作成する**

   1. Debezium のバージョン (2.x または 1.x) に対応する次の JSON をコピーして、新しいファイルに貼り付けます。`<placeholder>` 文字列をシナリオに対応する値に置き換えます。サービス実行ロールの設定方法については、「[MSK Connect の IAM のロールとポリシー](msk-connect-iam.md)」を参照してください。

      この設定では、データベースの認証情報を指定するのにプレーンテキストではなく `${secretManager:MySecret-1234:dbusername}` のような変数を使用していることに注意してください。`MySecret-1234` をシークレットの名前に置き換えてから、取得したいキーの名前を入力します。また、`<arn-of-config-provider-worker-configuration>` をカスタムワーカー設定の ARN に置き換える必要があります。

------
#### [ Debezium 2.x ]

      Debezium 2.x バージョンでは、次の JSON をコピーして、新しいファイルに貼り付けます。*<placeholder>* 文字列をシナリオに対応する値に置き換えます。

      ```
      {
      	"connectorConfiguration": {
      		"connector.class": "io.debezium.connector.mysql.MySqlConnector",
      		"tasks.max": "1",
      		"database.hostname": "<aurora-database-writer-instance-endpoint>",
      		"database.port": "3306",
      		"database.user": "<${secretManager:MySecret-1234:dbusername}>",
      		"database.password": "<${secretManager:MySecret-1234:dbpassword}>",
      		"database.server.id": "123456",
      		"database.include.list": "<list-of-databases-hosted-by-specified-server>",
      		"topic.prefix": "<logical-name-of-database-server>",
      		"schema.history.internal.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>",
      		"schema.history.internal.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>",
      		"schema.history.internal.consumer.security.protocol": "SASL_SSL",
      		"schema.history.internal.consumer.sasl.mechanism": "AWS_MSK_IAM",
      		"schema.history.internal.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
      		"schema.history.internal.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
      		"schema.history.internal.producer.security.protocol": "SASL_SSL",
      		"schema.history.internal.producer.sasl.mechanism": "AWS_MSK_IAM",
      		"schema.history.internal.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
      		"schema.history.internal.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
      		"include.schema.changes": "true"
      	},
      	"connectorName": "example-Debezium-source-connector",
      	"kafkaCluster": {
      		"apacheKafkaCluster": {
      			"bootstrapServers": "<cluster-bootstrap-servers-string>",
      			"vpc": {
      				"subnets": [
      					"<cluster-subnet-1>",
      					"<cluster-subnet-2>",
      					"<cluster-subnet-3>"
      				],
      				"securityGroups": ["<id-of-cluster-security-group>"]
      			}
      		}
      	},
      	"capacity": {
      		"provisionedCapacity": {
      			"mcuCount": 2,
      			"workerCount": 1
      		}
      	},
      	"kafkaConnectVersion": "2.7.1",
      	"serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>",
      	"plugins": [{
      		"customPlugin": {
      			"customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>",
      			"revision": 1
      		}
      	}],
      	"kafkaClusterEncryptionInTransit": {
      		"encryptionType": "TLS"
      	},
      	"kafkaClusterClientAuthentication": {
      		"authenticationType": "IAM"
      	},
      	"workerConfiguration": {
      		"workerConfigurationArn": "<arn-of-config-provider-worker-configuration>",
      		"revision": 1
      	}
      }
      ```

------
#### [ Debezium 1.x ]

      Debezium 1.x バージョンでは、次の JSON をコピーして、新しいファイルに貼り付けます。*<placeholder>* 文字列をシナリオに対応する値に置き換えます。

      ```
      {
      	"connectorConfiguration": {
      		"connector.class": "io.debezium.connector.mysql.MySqlConnector",
      		"tasks.max": "1",
      		"database.hostname": "<aurora-database-writer-instance-endpoint>",
      		"database.port": "3306",
      		"database.user": "<${secretManager:MySecret-1234:dbusername}>",
      		"database.password": "<${secretManager:MySecret-1234:dbpassword}>",
      		"database.server.id": "123456",
      		"database.server.name": "<logical-name-of-database-server>",
      		"database.include.list": "<list-of-databases-hosted-by-specified-server>",
      		"database.history.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>",
      		"database.history.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>",
      		"database.history.consumer.security.protocol": "SASL_SSL",
      		"database.history.consumer.sasl.mechanism": "AWS_MSK_IAM",
      		"database.history.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
      		"database.history.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
      		"database.history.producer.security.protocol": "SASL_SSL",
      		"database.history.producer.sasl.mechanism": "AWS_MSK_IAM",
      		"database.history.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
      		"database.history.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
      		"include.schema.changes": "true"
      	},
      	"connectorName": "example-Debezium-source-connector",
      	"kafkaCluster": {
      		"apacheKafkaCluster": {
      			"bootstrapServers": "<cluster-bootstrap-servers-string>",
      			"vpc": {
      				"subnets": [
      					"<cluster-subnet-1>",
      					"<cluster-subnet-2>",
      					"<cluster-subnet-3>"
      				],
      				"securityGroups": ["<id-of-cluster-security-group>"]
      			}
      		}
      	},
      	"capacity": {
      		"provisionedCapacity": {
      			"mcuCount": 2,
      			"workerCount": 1
      		}
      	},
      	"kafkaConnectVersion": "2.7.1",
      	"serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>",
      	"plugins": [{
      		"customPlugin": {
      			"customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>",
      			"revision": 1
      		}
      	}],
      	"kafkaClusterEncryptionInTransit": {
      		"encryptionType": "TLS"
      	},
      	"kafkaClusterClientAuthentication": {
      		"authenticationType": "IAM"
      	},
      	"workerConfiguration": {
      		"workerConfigurationArn": "<arn-of-config-provider-worker-configuration>",
      		"revision": 1
      	}
      }
      ```

------

   1. 前のステップで JSON ファイルを保存したフォルダで、次の AWS CLI コマンドを実行します。

      ```
      aws kafkaconnect create-connector --cli-input-json file://connector-info.json
      ```

      以下は、コマンドを正常に実行したときに得られる出力の例です。

      ```
      {
          "ConnectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2", 
          "ConnectorState": "CREATING", 
          "ConnectorName": "example-Debezium-source-connector"
      }
      ```

# Debezium コネクタ設定を更新する
<a name="mkc-debeziumsource-connector-update"></a>

Debezium コネクタの設定を更新するには、次の手順に従います。

1. 次の JSON をコピーし、新しいファイルに貼り付けます。`<placeholder>` 文字列をシナリオに対応する値に置き換えます。

   ```
   {
      "connectorArn": <connector_arn>,
      "connectorConfiguration": <new_configuration_in_json>,
      "currentVersion": <current_version>
   }
   ```

1. 前のステップで JSON ファイルを保存したフォルダで、次の AWS CLI コマンドを実行します。

   ```
   aws kafkaconnect update-connector --cli-input-json file://connector-info.json
   ```

   コマンドを正常に実行したときの出力の例を次に示します。

   ```
   {
       "connectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2",
       "connectorOperationArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector-operation/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2/41b6ad56-3184-479b-850a-a8bedd5a02f3",
       "connectorState": "UPDATING"
   }
   ```

1. 次のコマンドを実行して、オペレーションの現在の状態をモニタリングできるようになりました。

   ```
   aws kafkaconnect describe-connector-operation --connector-operation-arn <operation_arn>
   ```

詳細なステップを含む Debezium コネクタの例については、「[Amazon MSK Connect の紹介 – マネージドコネクタを使用した Apache Kafka クラスターとの間のデータのストリーミング](https://aws.amazon.com/blogs/aws/introducing-amazon-msk-connect-stream-data-to-and-from-your-apache-kafka-clusters-using-managed-connectors/)」を参照してください。