

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# のターゲットとしての Apache Kafka の使用 AWS Database Migration Service
<a name="CHAP_Target.Kafka"></a>

を使用して AWS DMS 、Apache Kafka クラスターにデータを移行できます。Apache Kafka は分散ストリーミング プラットフォームです。Apache Kafka を使用すると、ストリーミング　データをリアルタイムで取り込み、処理できます。

AWS は、 AWS DMS ターゲットとして使用する Amazon Managed Streaming for Apache Kafka (Amazon MSK) も提供します。Amazon MSK は、Apache Kafka インスタンスの実装と管理を簡素化する、フルマネージド型 Apache Kafka ストリーミング サービスです。オープンソースの Apache Kafka バージョンで動作し、Apache Kafka インスタンスとまったく同じ AWS DMS ターゲットとして Amazon MSK インスタンスにアクセスします。詳細については、*Amazon Managed Streaming for Apache Kafka デベロッパーガイド*の「[Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/what-is-msk.html)」をご参照ください。

Kafka クラスターは、パーティションに分割されたトピックと呼ばれるカテゴリにレコードのストリームを保存します。*パーティション*は、トピック内のデータレコード (メッセージ) の一意に識別されたシーケンスです。パーティションは、トピックレコードの並列処理を可能にするために、クラスター内の複数のブローカーに分散することができます。トピックとパーティション、および Apache Kafka での分散の詳細については、「[トピックとログ](https://kafka.apache.org/documentation/#intro_topics)」と「[分散](https://kafka.apache.org/documentation/#intro_distribution)」をご参照ください。

Kafka クラスターは、Amazon MSK インスタンス、Amazon EC2 インスタンス上で実行されるクラスター、またはオンプレミスのクラスターのいずれかです。Amazon MSK インスタンスまたは Amazon EC2 インスタンス上のクラスターは、同じ VPC 内にも、別の VPC 内にも配置できます。クラスターがオンプレミスの場合は、レプリケーションインスタンスに独自のオンプレミスのネーム サーバーを使用して、クラスターのホスト名を解決できます。レプリケーションインスタンスのネームサーバーのセットアップについては、「[独自のオンプレミスネームサーバーの使用](CHAP_BestPractices.md#CHAP_BestPractices.Rte53DNSResolver)」を参照してください。ネットワークの設定の詳細については、「[レプリケーション インスタンスのためのネットワークのセットアップ](CHAP_ReplicationInstance.VPC.md)」を参照してください。

Amazon MSK クラスターを使用する場合、セキュリティグループがレプリケーションインスタンスからのアクセスを許可していることを確認します。Amazon MSK クラスターのセキュリティグループの変更については、「[Amazon MSK クラスターのセキュリティグループの変更](https://docs.aws.amazon.com/msk/latest/developerguide/change-security-group.html)」を参照してください。

AWS Database Migration Service は、JSON を使用して Kafka トピックにレコードを発行します。変換時、 AWS DMS はソースデータベースからの各レコードを JSON フォーマットの属性と値のペアにシリアル化します。

サポートされている任意のデータソースから、ターゲット Kafka クラスターにデータを移行するために、オブジェクトのマッピングを使用します。オブジェクトマッピングを使用して、ターゲットトピックにデータ レコードを構築する方法を決定します。データをそのパーティションにグループ化するために Apache Kafka で使用する、各テーブルのパーティションキーも定義します。

現在、 はタスクごとに 1 つのトピック AWS DMS をサポートしています。単一のタスクに複数のテーブルがある場合、すべてのメッセージが単一のトピックに送信されます。各メッセージには、ターゲットスキーマと table. AWS DMS versions 3.4.6 以降を識別するメタデータセクションが含まれており、オブジェクトマッピングを使用したマルチトピックレプリケーションをサポートしています。詳細については、「[オブジェクトマッピングを使用したマルチトピックレプリケーション](#CHAP_Target.Kafka.MultiTopic)」を参照してください。

**Apache Kafka のエンドポイント設定**

 AWS DMS コンソールのエンドポイント設定、または CLI の `--kafka-settings`オプションを使用して、接続の詳細を指定できます。各設定の要件は次のとおりです。
+ `Broker` — Kafka クラスター内の 1 つ以上のブローカーの場所を、`{{broker-hostname}}:{{port}}` のカンマ区切りリストの形式で指定します。例: `"ec2-12-345-678-901.compute-1.amazonaws.com:2345,ec2-10-987-654-321.compute-1.amazonaws.com:9876"`。この設定では、クラスター内の任意またはすべてのブローカーのロケーションを指定できます。クラスターブローカーはすべて、トピックに移行されたデータレコードのパーティション化を処理するために通信します。
+ `Topic` - (オプション) 最大 255 文字および記号のトピック名を指定します。ピリオド (.)、アンダースコア (\_)、マイナス (-) を使用できます。ピリオド (.) またはアンダースコア (\_) があるトピック名は、内部データ構造内で衝突する可能性があります。トピック名には、どちらか一方を使用し、両方とも使用することは避けてください。トピック名を指定しない場合、 AWS DMS を移行トピック`"kafka-default-topic"`として使用します。
**注記**  
で指定した移行トピックまたはデフォルトのトピック AWS DMS を作成するには、Kafka クラスター設定`auto.create.topics.enable = true`の一部として を設定します。詳細については、[のターゲットとして Apache Kafka を使用する場合の制限 AWS Database Migration Service](#CHAP_Target.Kafka.Limitations)を参照してください。
+ `MessageFormat` - エンドポイントで作成されたレコードの出力形式。メッセージ形式は `JSON` (デフォルト) または `JSON_UNFORMATTED` (タブなし 1 行) です。
+ `MessageMaxBytes` — エンドポイントで作成されたレコードの最大サイズ (バイト単位)。デフォルトは 1,000,000 です。
**注記**  
CLI/SDK AWS のみを使用して、デフォルト以外の値`MessageMaxBytes`に変更できます。例えば、既存の Kafka エンドポイントを変更して `MessageMaxBytes` を変更するには、以下のコマンドを使用します。  

  ```
  aws dms modify-endpoint --endpoint-arn {{your-endpoint}} 
  --kafka-settings Broker="{{broker1-server}}:{{broker1-port}},{{broker2-server}}:{{broker2-port}},...",
  Topic={{topic-name}},MessageMaxBytes={{integer-of-max-message-size-in-bytes}}
  ```
+ `IncludeTransactionDetails` - ソースデータベースからの詳細のトランザクション情報を提供します。この情報には、コミットタイムスタンプ、ログの位置、`transaction_id`、`previous_transaction_id`、および `transaction_record_id` (トランザクション内のレコードオフセット) の値が含まれます。デフォルトは `false` です。
+ `IncludePartitionValue` パーティションタイプが でない限り、Kafka メッセージ出力内のパーティション値を表示します。`schema-table-type`デフォルトは `false` です。
+ `PartitionIncludeSchemaTable` パーティションタイプが `primary-key-type` の場合、スキーマとテーブル名をパーティション値にプレフィックスします。これにより、Kafka パーティション間のデータ分散が増加します。例えば、`SysBench` スキーマに数千のテーブルがあり、各テーブルのプライマリ キーの範囲が制限されているとします。この場合、同じプライマリキーが数千のテーブルから同じパーティションに送信され、スロットリングが発生します。デフォルトは `false` です。
+ `IncludeTableAlterOperations` – `rename-table`、`drop-table`、`add-column`、`drop-column`、`rename-column` など、制御データのテーブルを変更するデータ定義言語 (DDL) オペレーションが含まれます。デフォルトは `false` です。
+ `IncludeControlDetails` - Kafka メッセージ出力に、テーブル定義、列定義、テーブルおよび列の変更の詳細な制御情報を表示します。デフォルトは `false` です。
+ `IncludeNullAndEmpty` — ターゲットに NULL 列と空の列を含めます。デフォルトは `false` です。
+ `SecurityProtocol` — Transport Layer Security (TLS) を使用して Kafka ターゲット エンドポイントへの安全な接続を設定します。オプションには `ssl-authentication`、`ssl-encryption`、および `sasl-ssl` があります。`sasl-ssl` を使用して `SaslUsername` と `SaslPassword` を要求します。
+ `SslEndpointIdentificationAlgorithm` - 証明書のホスト名検証を設定します。この設定は、 AWS DMS バージョン 3.5.1 以降でサポートされています。オプションは以下のとおりです。
  + `NONE`: クライアント接続でブローカーのホスト名検証を無効にします。
  + `HTTPS`: クライアント接続でブローカーのホスト名検証を有効にします。
+ `useLargeIntegerValue` - AWS DMS バージョン 3.5.4 で利用可能な倍精度で入力をキャストする代わりに、最大 18 桁の整数を使用します。デフォルトは False です。

設定を使用すると、転送速度を上げることができます。これを行うために、 AWS DMS は Apache Kafka ターゲット　クラスターへのマルチスレッド全ロードをサポートしています。 AWS DMS は、次のようなタスク設定を使用して、このマルチスレッドをサポートします：
+ `MaxFullLoadSubTasks` – このオプションを使用して、並列でロードするソーステーブルの最大数を指定します。 は、専用サブタスクを使用して、各テーブルを対応する Kafka ターゲットテーブルに AWS DMS ロードします。デフォルトは 8、最大値は 49 です。
+ `ParallelLoadThreads` – このオプションを使用して、 AWS DMS が各テーブルを Kafka ターゲットテーブルにロードするために使用するスレッドの数を指定します。Apache Kafka ターゲットの最大値は 32 です。この上限を増やすよう依頼できます。
+ `ParallelLoadBufferSize` - Kafka ターゲットにデータをロードするために並列ロードスレッドが使用する、バッファ内に保存するレコードの最大数を指定するには、このオプションを使用します。デフォルト値は 50 です。最大値は 1000 です。この設定は `ParallelLoadThreads` で使用します。`ParallelLoadBufferSize` は、複数のスレッドがある場合にのみ有効です。
+ `ParallelLoadQueuesPerThread` - このオプションを使用して、各同時スレッドがキューからデータレコードを取り出し、ターゲットのバッチロードを生成するためにアクセスするキューの数を指定します。デフォルトは 1 です。最大数は 512。

Kafka エンドポイントの変更データキャプチャ (CDC) のパフォーマンスを向上するには、並列スレッドと一括オペレーションのタスク設定を調整します。これを行うには、`ParallelApply*` タスク設定を使用して、同時スレッドの数、スレッドあたりのキュー数、バッファに格納するレコード数を指定します。例えば、CDC ロードを実行し、128 本のスレッドを並列に適用するとします。また、スレッドあたり 64 個のキューにアクセスして、バッファあたり 50 個のレコードを保存する必要があります。

CDC パフォーマンスを向上させるために、 は次のタスク設定 AWS DMS をサポートしています。
+ `ParallelApplyThreads` – CDC ロード中に AWS DMS がデータレコードを Kafka ターゲットエンドポイントにプッシュするために使用する同時スレッドの数を指定します。デフォルト値は 0 で、最大値は 32 です。
+ `ParallelApplyBufferSize` - CDC ロード中に同時スレッドが Kafka ターゲット エンドポイントにプッシュする場合に、各バッファキューに保存するレコードの最大数を指定します。デフォルト値は 100 で、最大値は 1,000 です。このオプションは、`ParallelApplyThreads` で複数のスレッドを指定する場合に使用します。
+ `ParallelApplyQueuesPerThread` - 各スレッドがキューからデータレコードを取り出し、CDC 中に Kafka エンドポイントのバッチロードを生成するためにアクセスするキューの数を指定します。デフォルトは 1 です。最大数は 512。

`ParallelApply*` タスク設定を使用する場合、`partition-key-type` のデフォルトは `schema-name.table-name` ではなくテーブルの `primary-key` です。

## Transport Layer Security (TLS) を使用した Kafka への接続
<a name="CHAP_Target.Kafka.TLS"></a>

このクラスターは Transport Layer Security (TLS) を使用した安全な接続のみを受け入れます。DMS では、次の 3 つのセキュリティプロトコルオプションのいずれかを使用して、Kafka エンドポイント接続をセキュリティで保護できます。

**SSL 暗号化 (`server-encryption`)**  
クライアントは、サーバーの証明書を使用してサーバー ID を検証します。次に、サーバーとクライアント間で暗号化された接続が確立されます。

**SSL 認証 (`mutual-authentication`)**  
サーバーとクライアントは、独自の証明書を使用して ID を相互に検証します。次に、サーバーとクライアント間で暗号化された接続が確立されます。

**SASL-SSL (`mutual-authentication`)**  
簡易認証およびセキュリティ レイヤー (SASL) メソッドは、クライアントの証明書をユーザー名とパスワードに置き換えて、クライアント ID を検証します。具体的には、サーバーがクライアントの ID を検証できるように、サーバーが登録したユーザー名とパスワードを指定します。次に、サーバーとクライアント間で暗号化された接続が確立されます。

**重要**  
Apache Kafka と Amazon MSK は解決済みの証明書を受け入れます。これは、Kafka と Amazon MSK が対処すべき既知の制限です。詳細については、「[Apache Kafka の問題、KAFKA-3700](https://issues.apache.org/jira/browse/KAFKA-3700)」をご参照ください。  
Amazon MSK を使用している場合は、この既知の制限の回避策としてアクセスコントロールリスト (ACL) を使用することを検討してください。ACL の使用の詳細については、*Amazon Managed Streaming for Apache Kafka デベロッパーガイド*の [Apache Kafka ACL](https://docs.aws.amazon.com//msk/latest/developerguide/msk-acls.html) セクションをご参照ください。  
自己管理 Kafka クラスターを使用している場合クラスターの設定の詳細については、「[2018/10/21日付のコメント](https://issues.apache.org/jira/browse/KAFKA-3700?focusedCommentId=16658376)」をご参照ください。

### Amazon MSK または自己管理 Kafka クラスターでの SSL 暗号化の使用
<a name="CHAP_Target.Kafka.TLS.SSLencryption"></a>

SSL 暗号化を使用して、Amazon MSK または自己管理 Kafka クラスターへのエンドポイント接続をセキュリティで保護できます。SSL 暗号化認証方法を使用する場合、クライアントはサーバーの証明書を使用してサーバーの ID を検証します。次に、サーバーとクライアント間で暗号化された接続が確立されます。

**SSL 暗号化を使用して Amazon MSK に接続するには**
+ ターゲットの Kafka エンドポイントを作成するとき、`ssl-encryption` オプションを使うセキュリティ プロトコル エンドポイントの設定 (`SecurityProtocol`) を行います。

  次の JSON 例では、セキュリティプロトコルを SSL 暗号化として設定しています。

```
"KafkaSettings": {
    "SecurityProtocol": "ssl-encryption", 
}
```

**自己管理 Kafka クラスターに SSL 暗号化を使用するには**

1. オンプレミス Kafka クラスターでPrivate Certificate Authority (CA) を使用している場合は、プライベート CA 証明書をアップロードして Amazon リソースネーム (ARN) を取得します。

1. ターゲットの Kafka エンドポイントを作成するとき、`ssl-encryption` オプションを使うセキュリティ プロトコル エンドポイントの設定 (`SecurityProtocol`) を行います。次の JSON の例では、セキュリティプロトコルを `ssl-encryption` として設定します。

   ```
   "KafkaSettings": {
       "SecurityProtocol": "ssl-encryption", 
   }
   ```

1. プライベート CA を使用している場合は、上記の最初のステップで取得した ARN に `SslCaCertificateArn` を設定します。

### SSL 認証の使用
<a name="CHAP_Target.Kafka.TLS.SSLauthentication"></a>

SSL 認証を使用して、Amazon MSK または自己管理 Kafka クラスターへのエンドポイント接続をセキュリティで保護できます。

SSL 認証を使用したクライアント認証と暗号化を有効にして Amazon MSK に接続するには、次の手順を実行します：
+ Kafka の秘密キーと公開証明書を準備します。
+ 証明書を DMS Certificate Manager にアップロードします。
+ Kafka エンドポイント設定で指定された、対応する証明書 ARN を使用して Kafka ターゲットエンドポイントを作成します。

**Amazon MSK の秘密キーと公開証明書を準備するには**

1. EC2 インスタンスを作成し、*Amazon Managed Streaming for Apache Kafka 開発者ガイド*の[クライアント認証](https://docs.aws.amazon.com/msk/latest/developerguide/msk-authentication.html)セクションにあるステップ 1 ～ 9 の説明に従って、認証を使用するようにクライアントをセットアップします。

   これらの手順を完了したら、Certificate-ARN（ACM に保存された公開証明書 ARN）と、 `kafka.client.keystore.jks` ファイル内のプライベートキーができます。

1. 公開証明書を取得し、次のコマンドを使用し証明書を `signed-certificate-from-acm.pem` ファイルにコピーします：

   ```
   aws acm-pca get-certificate --certificate-authority-arn Private_CA_ARN --certificate-arn Certificate_ARN
   ```

   コマンドは以下のような情報を返します：

   ```
   {"Certificate": "123", "CertificateChain": "456"}
   ```

   次に、`"123"` に同等のものを `signed-certificate-from-acm.pem` ファイルにコピーします。

1. 次の例に示すように,`msk-rsa` キーを `kafka.client.keystore.jks to keystore.p12` からインポートしてプライベートキーを取得します:

   ```
   keytool -importkeystore \
   -srckeystore kafka.client.keystore.jks \
   -destkeystore keystore.p12 \
   -deststoretype PKCS12 \
   -srcalias msk-rsa-client \
   -deststorepass test1234 \
   -destkeypass test1234
   ```

1. 次のコマンドを使用して`keystore.p12` を `.pem` の形式にエクスポートします。

   ```
   Openssl pkcs12 -in keystore.p12 -out encrypted-private-client-key.pem –nocerts
   ```

   **[Enter PEM pass phrase]**(PEM パスフレーズを入力してください) メッセージが表示され、証明書の暗号化に適用されるキーを識別します。

1. バッグ属性とキー属性を `.pem` ファイルから削除し、最初の行が次の文字列でスタートしていることを確認します。

   ```
                                   ---BEGIN ENCRYPTED PRIVATE KEY---
   ```

**公開証明書とプライベートキーを DMS Certificate Managerにアップロードし、Amazon MSK への接続をテストするには**

1. 次のコマンドを使用して、DMS Certificate Managerにアップロードします。

   ```
   aws dms import-certificate --certificate-identifier signed-cert --certificate-pem file://{{path to signed cert}}
   aws dms import-certificate --certificate-identifier private-key —certificate-pem file://{{path to private key}}
   ```

1. Amazon MSK ターゲット エンドポイントを作成し、接続をテストして TLS 認証が機能することを確認します。

   ```
   aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings 
   '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:0000", "SecurityProtocol":"ssl-authentication", 
   "SslClientCertificateArn": "arn:aws:dms:us-east-1:012346789012:cert:",
   "SslClientKeyArn": "arn:aws:dms:us-east-1:0123456789012:cert:","SslClientKeyPassword":"test1234"}'
   aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk
   ```

**重要**  
SSL 認証を使用して、自己管理 Kafka クラスターへの接続をセキュリティで保護できます。場合によっては、オンプレミス Kafka クラスターでPrivate Certificate Authority (CA) を使用することがあります。その場合は、CA チェーンおよび公開証明書、プライベートキーを DMS Certificate Managerにアップロードします。次に、オンプレミス Kafka ターゲットエンドポイントを作成するときに、エンドポイント設定で対応する Amazon リソースネーム (ARN) を使用します。

**自己管理 Kafka クラスターのプライベートキーと署名付き証明書を準備するには**

1. 以下に示すようにキーペアを生成します。

   ```
   keytool -genkey -keystore kafka.server.keystore.jks -validity 300 -storepass {{your-keystore-password}} 
   -keypass {{your-key-passphrase}} -dname "CN={{your-cn-name}}" 
   -alias {{alias-of-key-pair}} -storetype pkcs12 -keyalg RSA
   ```

1. 証明書署名リクエスト (CSR) を取得します。

   ```
   keytool -keystore kafka.server.keystore.jks -certreq -file server-cert-sign-request-rsa -alias on-premise-rsa -storepass {{your-key-store-password}} 
   -keypass {{your-key-password}}
   ```

1. クラスター トラストストアの CA を使用して CSR に署名します。CA がない場合は、独自のプライベート CA を作成できます。

   ```
   openssl req -new -x509 -keyout ca-key -out ca-cert -days {{validate-days}}                            
   ```

1. `ca-cert` をサーバーのトラストストアとキーストアにインポートします。トラストストアをお持ちでない場合は、次のコマンドを使用してトラストストアを作成してこれに `ca-cert ` をインポートします。

   ```
   keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert
   keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
   ```

1. 証明書に署名します。

   ```
   openssl x509 -req -CA ca-cert -CAkey ca-key -in server-cert-sign-request-rsa -out signed-server-certificate.pem 
   -days {{validate-days}} -CAcreateserial -passin pass:{{ca-password}}
   ```

1. 署名付き証明書をキーストアにインポートします。

   ```
   keytool -keystore kafka.server.keystore.jks -import -file signed-certificate.pem -alias on-premise-rsa -storepass {{your-keystore-password}} 
   -keypass {{your-key-password}}
   ```

1. 次のコマンドを使用して、`on-premise-rsa` キーを `kafka.server.keystore.jks` から `keystore.p12` にインポートします。

   ```
   keytool -importkeystore \
   -srckeystore kafka.server.keystore.jks \
   -destkeystore keystore.p12 \
   -deststoretype PKCS12 \
   -srcalias on-premise-rsa \
   -deststorepass {{your-truststore-password }}\
   -destkeypass {{your-key-password}}
   ```

1. 次のコマンドを使用して `keystore.p12` を `.pem` の形式にエクスポートします。

   ```
   Openssl pkcs12 -in keystore.p12 -out encrypted-private-server-key.pem –nocerts
   ```

1. `encrypted-private-server-key.pem` および `signed-certificate.pem`、`ca-cert`を DMS Certificate Managerにアップロードします。

1. 返された ARN を使用してエンドポイントを作成します。

   ```
   aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings 
   '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:9092", "SecurityProtocol":"ssl-authentication", 
   "SslClientCertificateArn": "{{your-client-cert-arn}}","SslClientKeyArn": "{{your-client-key-arn}}","SslClientKeyPassword":"{{your-client-key-password}}", 
   "SslCaCertificateArn": "{{your-ca-certificate-arn}}"}'
                               
   aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk
   ```

### SASL-SSL 認証を使用して Amazon MSK に接続する
<a name="CHAP_Target.Kafka.TLS.SSL-SASL"></a>

簡易認証およびセキュリティレイヤー (SASL) 方式では、ユーザー名とパスワードを使用してクライアント ID を検証し、サーバーとクライアント間で暗号化された接続を作成します。

SASL を使用するには、Amazon MSK クラスターを設定するときに、まず安全なユーザー名とパスワードを作成します。Amazon MSK クラスターの安全なユーザー名とパスワードを設定する方法については、*Amazon Managed Streaming for Apache Kafka デベロッパーガイド*の「[Amazon MSK クラスターの SALS/SCRAM 認証の設定](https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html#msk-password-tutorial)」をご参照ください。

次に、Kafka ターゲット エンドポイントを作成するときに、`sasl-ssl` オプションを使ってセキュリティプロトコル エンドポイントの設定（`SecurityProtocol`) を行います。`SaslUsername` と `SaslPassword` オプションも設定します。次の JSON の例に示すように、Amazon MSK クラスターを初めてセットアップしたときに作成した安全なユーザー名とパスワードと一致していることを確認してください。

```
                   
"KafkaSettings": {
    "SecurityProtocol": "sasl-ssl",
    "SaslUsername":"{{Amazon MSK cluster secure user name}}",
    "SaslPassword":"{{Amazon MSK cluster secure password}}"                    
}
```

**注記**  
現在、 はパブリック CA ベースの SASL-SSL のみ AWS DMS をサポートしています。DMS は、プライベート CA を基盤とするセルフマネージド型 Kafka で使用する SASL-SSL はサポートしていません。
SASL-SSL 認証の場合、 はデフォルトで SCRAM-SHA-512 メカニズム AWS DMS をサポートします。 AWS DMS バージョン 3.5.0 以降は、プレーンメカニズムもサポートしています。Plain メカニズムを使用するには、`KafkaSettings` API データ型の `SaslMechanism` パラメータを `PLAIN` に設定します。データ型 `PLAIN` は Kafka でサポートされていますが、Amazon Kafka (MSK) ではサポートされていません。

## ターゲットとして Apache Kafka の CDC 行の元の値を表示するために前イメージを使用
<a name="CHAP_Target.Kafka.BeforeImage"></a>

Kafka のようなデータストリーミングターゲットに CDC 更新を書き込むときは、更新によって変更される前に、ソースデータベースの行の元の値を表示できます。これを可能にするために、 はソースデータベースエンジンによって提供されたデータに基づいて、更新イベントの*前のイメージ* AWS DMS を入力します。

ソースデータベースエンジンによって、前イメージに対してさまざまな量の情報が提供されます。
+ Oracle では、列が変更された場合にのみ列の更新が提供されます。
+ PostgreSQL は、プライマリキーの一部である列のデータ (変更されたかどうか) のみを提供します。論理レプリケーションを使用し、ソーステーブルに REPLICA IDENTITY FULL を設定した場合は、WAL に書き込まれた行の変更前と変更後の情報をすべて取得できます。このような情報はここで確認できます。
+ MySQL は通常、すべての列のデータ (変更されたかどうか) を提供します。

前イメージを有効にして、ソースデータベースから元の値を AWS DMS 出力に追加するには、`BeforeImageSettings` タスク設定または `add-before-image-columns` パラメータを使用します。このパラメータは、列変換ルールを適用します。

`BeforeImageSettings` は、次に示すように、ソースデータベースシステムから収集された値を使用して、すべての更新オペレーションに新しい JSON 属性を追加します。

```
"BeforeImageSettings": {
    "EnableBeforeImage": boolean,
    "FieldName": string,  
    "ColumnFilter": pk-only (default) / non-lob / all (but only one)
}
```

**注記**  
全ロード \+ CDCタスク (既存のデータを移行して進行中の変更をレプリケートする)、または CDC のみのタスク (データ変更のみをレプリケートする) に `BeforeImageSettings` を適用します。全ロードのタスクには `BeforeImageSettings` を適用しないでください。

`BeforeImageSettings` オプションには、次の項目が適用されます。
+ `EnableBeforeImage` オプションを `true` に設定して、前イメージを有効にします。デフォルトは `false` です。
+ `FieldName` オプションを使用して、新しい JSON 属性に名前を割り当てます。`EnableBeforeImage` が `true` の場合、`FieldName` は必須であり、空にすることはできません。
+ `ColumnFilter` オプションは、前イメージを使用して追加する列を指定します。テーブルのプライマリキーの一部である列だけを追加するには、デフォルト値 `pk-only` を使用します。LOB タイプではない列のみを追加するには、`non-lob` を使用します。前イメージ値を持つ列を追加するには、`all` を使用します。

  ```
  "BeforeImageSettings": {
      "EnableBeforeImage": true,
      "FieldName": "before-image",
      "ColumnFilter": "pk-only"
    }
  ```

### 前イメージ変換ルールの使用
<a name="CHAP_Target.Kafka.BeforeImage.Transform-Rule"></a>

タスク設定の代わりに、列変換ルールを適用する `add-before-image-columns` パラメータを使用できます。このパラメータを使用すると、Kafka のようなデータストリーミングターゲットで CDC 中に前イメージを有効にできます。

変換ルールで `add-before-image-columns` を使用すると、前イメージの結果のよりきめ細かい制御を適用することができます。変換ルールを使用すると、オブジェクトロケーターを使用し、ルールに選択したテーブルを制御できます。また、変換ルールを連結することもできます。これにより、テーブルごとに異なるルールを適用できます。その後、他のルールを使用して生成された列を操作できます。

**注記**  
同じタスク内で、`add-before-image-columns` パラメータと同時に `BeforeImageSettings` タスク設定を使用しないでください。代わりに、1 つのタスクにこのパラメータとこの設定のいずれかを使用し、両方を使用しないでください。

列の `add-before-image-columns` パラメータを持つ `transformation` ルールタイプは、`before-image-def` セクションを提供する必要があります。例を以下に示します。

```
    {
      "rule-type": "transformation",
      …
      "rule-target": "column",
      "rule-action": "add-before-image-columns",
      "before-image-def":{
        "column-filter": one-of  (pk-only / non-lob / all),
        "column-prefix": string,
        "column-suffix": string,
      }
    }
```

`column-prefix` の値は列名の前に付加され、`column-prefix` のデフォルト値は `BI_` です。`column-suffix` の値は列名に追加され、デフォルトは空です。`column-prefix` と `column-suffix` の両方を空の文字列に設定しないでください。

`column-filter` の値を 1 つ選択します。テーブルのプライマリキーの一部である列だけを追加するには、`pk-only` を選択します。LOB タイプではない列のみを追加するように `non-lob` を選択します。または、前イメージの値を持つ任意の列を追加するように `all` を選択します。

### 前イメージ変換前ルールの例
<a name="CHAP_Target.Kafka.BeforeImage.Example"></a>

次の例の変換ルールは、ターゲットに `BI_emp_no` という新しい列を追加します。したがって、`UPDATE employees SET emp_no = 3 WHERE emp_no = 1;` のようなステートメントは、`BI_emp_no` フィールドに 1 を設定します。CDC 更新を Amazon S3 ターゲットに書き込むと、更新された元の行は `BI_emp_no` 列からわかります。

```
{
  "rules": [
    {
      "rule-type": "selection",
      "rule-id": "1",
      "rule-name": "1",
      "object-locator": {
        "schema-name": "%",
        "table-name": "%"
      },
      "rule-action": "include"
    },
    {
      "rule-type": "transformation",
      "rule-id": "2",
      "rule-name": "2",
      "rule-target": "column",
      "object-locator": {
        "schema-name": "%",
        "table-name": "employees"
      },
      "rule-action": "add-before-image-columns",
      "before-image-def": {
        "column-prefix": "BI_",
        "column-suffix": "",
        "column-filter": "pk-only"
      }
    }
  ]
}
```

`add-before-image-columns` ルールアクションの使用方法については、「[変換ルールおよび変換アクション](CHAP_Tasks.CustomizingTasks.TableMapping.SelectionTransformation.Transformations.md)」をご参照ください。

## のターゲットとして Apache Kafka を使用する場合の制限 AWS Database Migration Service
<a name="CHAP_Target.Kafka.Limitations"></a>

ターゲットとして Apache Kafka を使用する場合、以下の制限が適用されます。
+ AWS DMS Kafka ターゲットエンドポイントは、Amazon Managed Streaming for Apache Kafka (Amazon MSK) の IAM アクセスコントロールをサポートしていません。
+ 完全 LOB モードはサポートされていません。
+ が新しいトピックを自動的に作成できるようにするプロパティを使用して AWS DMS 、クラスターの Kafka 設定ファイルを指定します。設定 `auto.create.topics.enable = true` を含めます。Amazon MSK を使用している場合は、Kafka クラスターを作成するときにデフォルト設定を指定し、`auto.create.topics.enable` 設定を `true` に変更できます。デフォルト設定の詳細については、*Amazon Managed Streaming for Apache Kafka デベロッパーガイド*の「[Amazon MSK のデフォルト設定](https://docs.aws.amazon.com/msk/latest/developerguide/msk-default-configuration.html)」をご参照ください。Amazon MSK を使用して作成された既存の Kafka クラスターを変更する必要がある場合は、次の例のように AWS CLI コマンドを実行して Kafka 設定`aws kafka create-configuration`を更新します。

  ```
  14:38:41 $ aws kafka create-configuration --name "kafka-configuration" --kafka-versions "2.2.1" --server-properties file://~/kafka_configuration
  {
      "LatestRevision": {
          "Revision": 1,
          "CreationTime": "2019-09-06T14:39:37.708Z"
      },
      "CreationTime": "2019-09-06T14:39:37.708Z",
      "Name": "kafka-configuration",
      "Arn": "arn:aws:kafka:us-east-1:111122223333:configuration/kafka-configuration/7e008070-6a08-445f-9fe5-36ccf630ecfd-3"
  }
  ```

  ここでは、`//~/kafka_configuration` は必要なプロパティ設定を使用して作成した設定ファイルです。

  Amazon EC2 にインストールされている独自の Kafka インスタンスを使用している場合は、 `auto.create.topics.enable = true`インスタンスに用意されているオプションを使用して、 AWS DMS が新しいトピックを自動的に作成できるように Kafka クラスター設定を変更します。
+ AWS DMS は、トランザクションに関係なく、特定の Kafka トピック内の 1 つのデータレコード (メッセージ) としてソースデータベース内の 1 つのレコードに各更新を発行します。
+ AWS DMS では、パーティションキーに次の 4 つのフォームがサポートされています。
  + `SchemaName.TableName`: スキーマとテーブル名の組み合わせ。
  + `${AttributeName}`: JSON のいずれかのフィールドの値、またはソースデータベースのテーブルのプライマリキー。
  + `transaction-id`: CDC トランザクション ID。同じトランザクション内のすべてのレコードは、同じパーティションに移動します。
  + `constant`: テーブルやデータに関係なく、すべてのレコードの固定リテラル値。すべてのレコードは同じパーティションキー値「定数」に送信され、すべてのテーブルで厳密なグローバル順序が提供されます。

  ```
  {
      "rule-type": "object-mapping",
      "rule-id": "2",
      "rule-name": "TransactionIdPartitionKey",
      "rule-action": "map-record-to-document",
      "object-locator": {
          "schema-name": "onprem",
          "table-name": "it_system"
      },
      "mapping-parameters": {
          "partition-key-type": "transaction-id | constant | attribute-name | schema-table"
      }
  }
  ```
+ `IncludeTransactionDetails` エンドポイント設定は、ソースエンドポイントが Oracle、SQL Server、PostgreSQL、または MySQL の場合にのみサポートされます。他のソースエンドポイントタイプでは、トランザクションの詳細は含まれません。
+ `BatchApply` は Kafka エンドポイントではサポートされていません。Batch 適用 (例えば、`BatchApplyEnabled` のターゲットメタデータ タスク設定) を使用すると、Kafka ターゲットデータが失われる可能性があります。
+ AWS DMS は、16 桁を超える`BigInt`データ型の値の移行をサポートしていません。この制限を回避するには、次の変換ルールを使用して `BigInt` 列を文字列に変換できます。変換ルールの詳細については、「[変換ルールおよび変換アクション](CHAP_Tasks.CustomizingTasks.TableMapping.SelectionTransformation.Transformations.md)」を参照してください。

  ```
  {
      "rule-type": "transformation",
      "rule-id": "id",
      "rule-name": "name",
      "rule-target": "column",
      "object-locator": {
          "schema-name": "valid object-mapping rule action",
          "table-name": "",
          "column-name": ""
      },
      "rule-action": "change-data-type",
      "data-type": {
          "type": "string",
          "length": 20
      }
  }
  ```
+ AWS DMS Kafka ターゲットエンドポイントは Amazon MSK サーブレスをサポートしていません。
+ マッピングルールを定義する場合、オブジェクトマッピングルールと変換ルールの両方を保有するのはサポートされていません。設定の必要があるルールは 1 つのみです。
+ AWS DMS は、3.8 までの Apache Kafka バージョン用の SASL 認証をサポートしています。Kafka 4.0 以降を使用している場合は、SASL 認証なしでのみ接続できます。
+ AWS DMS は、Kafka をターゲットエンドポイントとして使用する場合、埋め込み`'\0'`文字を含むソースデータをサポートしていません。埋め込み`'\0'`文字を含むデータは、最初の`'\0'`文字で切り捨てられます。

## データを Kafka トピックに移行するためのオブジェクトマッピングの使用
<a name="CHAP_Target.Kafka.ObjectMapping"></a>

AWS DMS はテーブルマッピングルールを使用して、ソースからターゲット Kafka トピックにデータをマッピングします。ターゲットトピックにデータをマッピングするために、オブジェクトマッピングと呼ばれるテーブルマッピングルールのタイプを使用します。オブジェクトマッピングを使用して、ソースのデータレコードがどのように Kafka トピックに発行されたデータレコードにマッピングされるかを定義します。

Kafka トピックには、パーティションキー以外にプリセット構造はありません。

**注記**  
オブジェクトマッピングは必ずしも使用する必要はありません。通常のテーブルマッピングは、さまざまな変換に使用できます。ただし、パーティションキータイプは次のデフォルト動作に従います。  
プライマリキーはフルロードのパーティションキーとして使用されます。
並行適用タスク設定を使用しない場合は、`schema.table` が CDC のパーティションキーとして使用されます。
並列適用タスク設定を使用する場合、プライマリキーは CDC のパーティションキーとして使用されます。

オブジェクトマッピングルールを作成するには、`object-mapping` として `rule-type` を指定します。このルールが、使用したいオブジェクトマッピングのタイプを指定します。

ルールの構造は次のとおりです。

```
{
    "rules": [
        {
            "rule-type": "object-mapping",
            "rule-id": "{{id}}",
            "rule-name": "{{name}}",
            "rule-action": "{{valid object-mapping rule action}}",
            "object-locator": {
                "schema-name": "{{case-sensitive schema name}}",
                "table-name": ""
            }
        }
    ]
}
```

AWS DMS は現在、 `rule-action`パラメータの唯一の有効な値`map-record-to-document`として `map-record-to-record`と をサポートしています。これらの設定は、`exclude-columns` 属性リストの一部として除外されない値に影響します。`map-record-to-record` および `map-record-to-document`値は、デフォルトで がこれらのレコード AWS DMS を処理する方法を指定します。これらの値は、どのような方法でも属性マッピングに影響しません。

リレーショナルデータベースから Kafka トピックに移行する際に `map-record-to-record` を使用します。このルールタイプでは、Kafka トピックのパーティションキーとしてリレーショナルデータベースから `taskResourceId.schemaName.tableName` 値を使用し、ソースデータベース内の各列の属性を作成します。

`map-record-to-record` を使用する場合は、次の点に注意します。
+ この設定は、`exclude-columns` リストで除外されている列にのみ影響します。
+ このような列ごとに、 はターゲットトピックで対応する属性 AWS DMS を作成します。
+ AWS DMS は、ソース列が属性マッピングで使用されているかどうかに関係なく、この対応する属性を作成します。

`map-record-to-record` を理解するための 1 つの方法は、実際の動作を確認することです。この例では、次の構造とデータを含むリレーショナルデータベースのテーブルの行から始めると想定してください。


| FirstName | LastName | StoreId | HomeAddress | HomePhone | WorkAddress | WorkPhone | DateofBirth | 
| --- | --- | --- | --- | --- | --- | --- | --- | 
| Randy | Marsh | 5 | 221B Baker Street | 1234567890 | 31 Spooner Street, Quahog  | 9876543210 | 02/29/1988 | 

この情報を `Test` という名前のスキーマから Kafka トピックに移行するには、データをターゲットストリームにマッピングするルールを作成します。以下のルールはマッピングを示しています。

```
{
    "rules": [
        {
            "rule-type": "selection",
            "rule-id": "1",
            "rule-name": "1",
            "rule-action": "include",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "%"
            }
        },
        {
            "rule-type": "object-mapping",
            "rule-id": "2",
            "rule-name": "DefaultMapToKafka",
            "rule-action": "map-record-to-record",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "Customers"
            }
        }
    ]
}
```

Kafka トピックとパーティション キー (この場合は `taskResourceId.schemaName.tableName`) を指定すると、以下の説明は Kafka ターゲットトピックのサンプルデータを使用した結果のレコード形式を示します。

```
  {
     "FirstName": "Randy",
     "LastName": "Marsh",
     "StoreId":  "5",
     "HomeAddress": "221B Baker Street",
     "HomePhone": "1234567890",
     "WorkAddress": "31 Spooner Street, Quahog",
     "WorkPhone": "9876543210",
     "DateOfBirth": "02/29/1988"
  }
```

**Topics**
+ [属性マッピングを使用したデータの再構築](#CHAP_Target.Kafka.AttributeMapping)
+ [オブジェクトマッピングを使用したマルチトピックレプリケーション](#CHAP_Target.Kafka.MultiTopic)
+ [Apache Kafka のメッセージ形式](#CHAP_Target.Kafka.Messageformat)

### 属性マッピングを使用したデータの再構築
<a name="CHAP_Target.Kafka.AttributeMapping"></a>

属性マップを使用してデータを Kafka トピックに移行している間にデータを再構築できます。例えば、ソース内の複数のフィールドを結合してターゲット内に 1 つのフィールドを構成することもできます。以下の属性マップはデータを再構築する方法を示しています。

```
{
    "rules": [
        {
            "rule-type": "selection",
            "rule-id": "1",
            "rule-name": "1",
            "rule-action": "include",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "%"
            }
        },
        {
            "rule-type": "object-mapping",
            "rule-id": "2",
            "rule-name": "TransformToKafka",
            "rule-action": "map-record-to-record",
            "target-table-name": "CustomerData",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "Customers"
            },
            "mapping-parameters": {
                "partition-key-type": "attribute-name",
                "partition-key-name": "CustomerName",
                "exclude-columns": [
                    "firstname",
                    "lastname",
                    "homeaddress",
                    "homephone",
                    "workaddress",
                    "workphone"
                ],
                "attribute-mappings": [
                    {
                        "target-attribute-name": "CustomerName",
                        "attribute-type": "scalar",
                        "attribute-sub-type": "string",
                        "value": "${lastname}, ${firstname}"
                    },
                    {
                        "target-attribute-name": "ContactDetails",
                        "attribute-type": "document",
                        "attribute-sub-type": "json",
                        "value": {
                            "Home": {
                                "Address": "${homeaddress}",
                                "Phone": "${homephone}"
                            },
                            "Work": {
                                "Address": "${workaddress}",
                                "Phone": "${workphone}"
                            }
                        }
                    }
                ]
            }
        }
    ]
}
```

の定数値を設定するには`partition-key`、 を指定します。これにより`"partition-key-type: "constant"`、パーティション値が に設定されます`constant`。たとえば、すべてのデータを 1 つのパーティションに強制的に格納するためにこれを行うことができます。以下のマッピングはこの方法を示しています。

```
{
    "rules": [
        {
            "rule-type": "selection",
            "rule-id": "1",
            "rule-name": "1",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "%"
            },
            "rule-action": "include"
        },
        {
            "rule-type": "object-mapping",
            "rule-id": "1",
            "rule-name": "TransformToKafka",
            "rule-action": "map-record-to-document",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "Customer"
            },
            "mapping-parameters": {
                "partition-key-type": "constant",
                "exclude-columns": [
                    "FirstName",
                    "LastName",
                    "HomeAddress",
                    "HomePhone",
                    "WorkAddress",
                    "WorkPhone"
                ],
                "attribute-mappings": [
                    {
                        "attribute-name": "CustomerName",
                        "value": "${FirstName},${LastName}"
                    },
                    {
                        "attribute-name": "ContactDetails",
                        "value": {
                            "Home": {
                                "Address": "${HomeAddress}",
                                "Phone": "${HomePhone}"
                            },
                            "Work": {
                                "Address": "${WorkAddress}",
                                "Phone": "${WorkPhone}"
                            }
                        }
                    },
                    {
                        "attribute-name": "DateOfBirth",
                        "value": "${DateOfBirth}"
                    }
                ]
            }
        }
    ]
}
```

**注記**  
特定のテーブル用のコントロールレコードの `partition-key` 値は、`TaskId.SchemaName.TableName` です。特定のタスク用のコントロールレコードの `partition-key` 値は、そのレコードの `TaskId` です。オブジェクトマッピングの `partition-key` 値を指定することは、コントロールレコードの `partition-key` には影響しません。  
 テーブルマッピングルール`attribute-name`で `partition-key-type`が に設定されている場合、 を指定する必要があります。これは`partition-key-name`、ソーステーブルの列またはマッピングで定義されたカスタム列を参照する必要があります。さらに、ソース列がターゲット Kafka トピックにどのようにマッピングされるかを定義するには、 を指定`attribute-mappings`する必要があります。

### オブジェクトマッピングを使用したマルチトピックレプリケーション
<a name="CHAP_Target.Kafka.MultiTopic"></a>

デフォルトでは、 AWS DMS タスクはすべてのソースデータを次のいずれかの Kafka トピックに移行します。
+  AWS DMS ターゲットエンドポイントの**トピック**フィールドで指定します。
+ ターゲットエンドポイントの **[トピック]** フィールドが入力されておらず、Kafka `auto.create.topics.enable` 設定が `true` に設定されている場合、`kafka-default-topic` の指定に従う。

 AWS DMS エンジンバージョン 3.4.6 以降では、 `kafka-target-topic` 属性を使用して、移行された各ソーステーブルを個別のトピックにマッピングできます。例えば、次のオブジェクトマッピングルールは、ソーステーブルを `Customer` と`Address` をそれぞれ Kafka トピック `customer_topic` と `address_topic` に移行します。同時に、 `Test` はスキーマ内のテーブルを含む他のすべてのソース`Bills`テーブルを、ターゲットエンドポイントで指定されたトピック AWS DMS に移行します。

```
{
    "rules": [
        {
            "rule-type": "selection",
            "rule-id": "1",
            "rule-name": "1",
            "rule-action": "include",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "%"
            }
        },
        {
            "rule-type": "object-mapping",
            "rule-id": "2",
            "rule-name": "MapToKafka1",
            "rule-action": "map-record-to-record",
            "kafka-target-topic": "customer_topic",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "Customer" 
            },
            "partition-key-type": "constant"
        },
        {
            "rule-type": "object-mapping",
            "rule-id": "3",
            "rule-name": "MapToKafka2",
            "rule-action": "map-record-to-record",
            "kafka-target-topic": "address_topic",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "Address"
            },
            "partition-key-type": "constant"
        },
        {
            "rule-type": "object-mapping",
            "rule-id": "4",
            "rule-name": "DefaultMapToKafka",
            "rule-action": "map-record-to-record",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "Bills"
            }
        }
    ]
}
```

Kafka マルチトピックレプリケーションを使用すると、単一のレプリケーションタスクでソーステーブルをグループ化して個別の Kafka トピックに移行できます。

### Apache Kafka のメッセージ形式
<a name="CHAP_Target.Kafka.Messageformat"></a>

JSON 出力は、単にキーと値のペアのリストです。

**RecordType**  
レコードタイプはデータまたはコントロールのいずれかです。*データレコード*は、ソースの実際の行を表します。*コントロールレコード*は、タスクの再起動など、ストリーム内の重要なイベント用です。

**運用**  
データレコードの場合、オペレーションは `load`、`insert`、`update`、または `delete` です。  
コントロールレコードの場合、オペレーションは `create-table`、`rename-table`、`drop-table`、`change-columns`、`add-column`、`drop-column`、`rename-column`、`column-type-change` です。

**SchemaName**  
レコードのソーススキーマ。コントロールレコードの場合、このフィールドは空です。

**TableName**  
レコードのソーステーブル。コントロールレコードの場合、このフィールドは空です。

**タイムスタンプ**  
JSON メッセージが構築された時刻のタイムスタンプ。このフィールドは ISO 8601 形式でフォーマットされます。

次の JSON メッセージの例は、追加メタデータをすべて含むデータ型メッセージを示しています。

```
{ 
   "data":{ 
      "id":100000161,
      "fname":"val61s",
      "lname":"val61s",
      "REGION":"val61s"
   },
   "metadata":{ 
      "timestamp":"2019-10-31T22:53:59.721201Z",
      "record-type":"data",
      "operation":"insert",
      "partition-key-type":"primary-key",
      "partition-key-value":"sbtest.sbtest_x.100000161",
      "schema-name":"sbtest",
      "table-name":"sbtest_x",
      "transaction-id":9324410911751,
      "transaction-record-id":1,
      "prev-transaction-id":9324410910341,
      "prev-transaction-record-id":10,
      "commit-timestamp":"2019-10-31T22:53:55.000000Z",
      "stream-position":"mysql-bin-changelog.002171:36912271:0:36912333:9324410911751:mysql-bin-changelog.002171:36912209"
   }
}
```

次の JSON メッセージの例は、コントロールタイプのメッセージを示しています。

```
{ 
   "control":{ 
      "table-def":{ 
         "columns":{ 
            "id":{ 
               "type":"WSTRING",
               "length":512,
               "nullable":false
            },
            "fname":{ 
               "type":"WSTRING",
               "length":255,
               "nullable":true
            },
            "lname":{ 
               "type":"WSTRING",
               "length":255,
               "nullable":true
            },
            "REGION":{ 
               "type":"WSTRING",
               "length":1000,
               "nullable":true
            }
         },
         "primary-key":[ 
            "id"
         ],
         "collation-name":"latin1_swedish_ci"
      }
   },
   "metadata":{ 
      "timestamp":"2019-11-21T19:14:22.223792Z",
      "record-type":"control",
      "operation":"create-table",
      "partition-key-type":"task-id",
      "schema-name":"sbtest",
      "table-name":"sbtest_t1"
   }
}
```