

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# のターゲットとしての Amazon Kinesis Data Streams の使用 AWS Database Migration Service
<a name="CHAP_Target.Kinesis"></a>

を使用して AWS DMS 、Amazon Kinesis データストリームにデータを移行できます。Amazon Kinesis Data Streams サービスは、Amazon Kinesis Data Streams サービスの一部です。データレコードの大量のストリームをリアルタイムで収集し、処理するには、Kinesis データストリームを使用します。

Kinesis データストリームはシャードで構成されています。*シャード*は、ストリーム内のデータレコードの一意に識別されたシーケンスです。Amazon Kinesis Data Streams におけるシャードの詳細については、*Amazon Kinesis Data Streams デベロッパーガイド*の「[シャード](https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#shard)」をご参照ください。

AWS Database Migration Service は、JSON を使用してレコードを Kinesis データストリームに発行します。変換時に、 AWS DMS はソースデータベースから各レコードを JSON 形式または JSON\_UNFORMATTED メッセージ形式の属性と値のペアにシリアル化します。JSON\_UNFORMATTED メッセージ形式は、改行区切り文字を含む単一行の JSON 文字列です。これにより、Amazon Data Firehose は Amazon S3 の宛先に Kinesis データを配信し、Amazon Athena を含むさまざまなクエリ エンジンを使用してクエリを実行できます。

サポートされている任意のデータソースから、ターゲットストリームにデータを移行するために、オブジェクトのマッピングを使用します。オブジェクトマッピングを使用して、ストリームにデータレコードを構築する方法を決定します。データをそのシャードにグループ化するために Kinesis Data Streams で使用する、各テーブルのパーティション キーも定義します。

AWS DMS は、いくつかの Kinesis Data Streams パラメータ値も設定します。テーブル作成のコストは、データの量および移行するテーブルの数によって異なります。

**注記**  
 AWS DMS コンソールまたは API の **SSL モード**オプションは、一部のデータストリーミングや Kinesis や DynamoDB などの NoSQL サービスには適用されません。これらはデフォルトで安全であるため、SSL モードの設定が none (**SSL Mode=None**) と等しい AWS DMS ことを示しています。SSL を使用するために、エンドポイントに追加の設定は必要ありません。例えば、Kinesis をターゲット エンドポイントとして使用する場合、デフォルトでセキュリティで保護されます。Kinesis へのすべての API コールは SSL を使用するため、 AWS DMS エンドポイントに追加の SSL オプションは必要ありません。 AWS DMS が Kinesis Data Stream への接続時にデフォルトで使用する HTTPS プロトコルを使用して、SSL エンドポイント経由で安全にデータを保存して取得できます。

**Kinesis Data Streams エンドポイント設定**

Kinesis Data Streams ターゲットエンドポイントを使用する場合、 AWS DMS API の `KinesisSettings`オプションを使用してトランザクションとコントロールの詳細を取得できます。

接続は、次のいずれかの方法で設定できます。
+  AWS DMS コンソールで、エンドポイント設定を使用します。
+ CLI では、[CreateEndpoint](https://docs.aws.amazon.com/dms/latest/APIReference/API_CreateEndpoint.html) コマンド の`kinesis-settings` オプションを使用します。

CLI で、次の `kinesis-settings` オプションのリクエストパラメータを使用します。
**注記**  
 AWS DMS バージョン 3.4.1 以降では、`IncludeNullAndEmpty` エンドポイント設定に対応しています。ただし、Kinesis Data Streams ターゲットのその他のエンドポイント設定は でサポートされています AWS DMS。
+ `MessageFormat` - エンドポイントで作成されたレコードの出力形式。メッセージ形式は `JSON` (デフォルト) または `JSON_UNFORMATTED` (タブなし 1 行) です。
+ `IncludeControlDetails` - Kinesis メッセージ出力に、テーブル定義、列定義、テーブル、列の変更の詳細な制御情報を表示します。デフォルトは `false` です。
+ `IncludeNullAndEmpty` — ターゲットに NULL 列と空の列を含めます。デフォルトは `false` です。
+ `IncludePartitionValue` – パーティションタイプが `schema-table-type` でない限り、Kinesis メッセージ出力内のパーティション値を表示します。デフォルトは `false` です。
+ `IncludeTableAlterOperations` – `rename-table`、`drop-table`、`add-column`、`drop-column`、`rename-column` など、制御データのテーブルを変更するデータ定義言語 (DDL) オペレーションが含まれます。デフォルトは `false` です。
+ `IncludeTransactionDetails` - ソースデータベースからの詳細のトランザクション情報を提供します。この情報には、コミットタイムスタンプ、ログの位置、`transaction_id`、`previous_transaction_id`、および `transaction_record_id ` (トランザクション内のレコードオフセット) の値が含まれます。デフォルトは `false` です。
+ `PartitionIncludeSchemaTable` パーティションタイプが `primary-key-type` の場合、スキーマとテーブル名をパーティション値にプレフィックスします。これにより、Kinesis シャード間のデータ分散が増加します。例えば、`SysBench` スキーマに数千のテーブルがあり、各テーブルのプライマリ キーの範囲が制限されているとします。この場合、同じプライマリキーが数千のテーブルから同じシャードに送信され、スロットリングが発生します。デフォルトは `false` です。
+ `UseLargeIntegerValue` - AWS DMS バージョン 3.5.4 で利用可能な倍精度のインスタンスをキャストする代わりに、最大 18 桁の整数を使用します。デフォルトは False です。

次の例で、 AWS CLIを使用して発行された例 `create-endpoint`コマンドを使用中の `kinesis-settings` オプションを示します。

```
aws dms \
  create-endpoint \
    --region <aws-region> \
    --endpoint-identifier <user-endpoint-identifier> \
    --endpoint-type target \
    --engine-name kinesis \
    --kinesis-settings ServiceAccessRoleArn=arn:aws:iam::<account-id>:role/<kinesis-role-name>,StreamArn=arn:aws:kinesis:<aws-region>:<account-id>:stream/<stream-name>,MessageFormat=json-unformatted,
IncludeControlDetails=true,IncludeTransactionDetails=true,IncludePartitionValue=true,PartitionIncludeSchemaTable=true,
IncludeTableAlterOperations=true
```

**マルチスレッド全ロードタスク設定**

転送速度を上げるために、 は Kinesis Data Streams ターゲットインスタンスへのマルチスレッド全ロード AWS DMS をサポートします。DMS では、このマルチスレッドでのタスク設定を、以下でサポートします。
+ `MaxFullLoadSubTasks` - 並列ロードするソーステーブルの最大数を指定するにはこのオプションを使用します。DMS は、専用のサブタスクを使用して、対応する Kinesis ターゲットテーブルに各テーブルをロードします。デフォルトは 8、最大値は 49 です。
+ `ParallelLoadThreads` – このオプションを使用して、 AWS DMS が各テーブルを Kinesis ターゲットテーブルにロードするために使用するスレッドの数を指定します。Kinesis Data Streams ターゲットの最大値は 32 です。この上限を増やすよう依頼できます。
+ `ParallelLoadBufferSize` – Kinesis ターゲットにデータをロードするために並列ロードスレッドが使用する、バッファ内に保存するレコードの最大数を指定するには、このオプションを使用します。デフォルト値は 50 です。最大値は 1000 です。この設定は `ParallelLoadThreads` で使用します。`ParallelLoadBufferSize` は、複数のスレッドがある場合にのみ有効です。
+ `ParallelLoadQueuesPerThread` - このオプションを使用して、各同時スレッドがキューからデータレコードを取り出し、ターゲットのバッチロードを生成するためにアクセスするキューの数を指定します。デフォルトは 1 です。ただし、さまざまなペイロードサイズの Kinesis ターゲットの場合、有効な範囲は 1 スレッドあたり 5 ～ 512 キューです。

**マルチスレッド CDC ロードタスクの設定**

タスク設定を使用して `PutRecords` API コールの動作を変更するなど、Kinesis などのリアルタイム データストリーミング ターゲット エンドポイントの変更データキャプチャ (CDC) のパフォーマンスを向上させることができます。これを行うには、`ParallelApply*` タスク設定を使用して、同時スレッドの数、スレッドあたりのキュー数、バッファに格納するレコード数を指定します。例えば、CDC ロードを実行し、128 本のスレッドを並列に適用するとします。また、スレッドあたり 64 個のキューにアクセスして、バッファあたり 50 個のレコードを保存する必要があります。

CDC パフォーマンスを向上させるために、 は次のタスク設定 AWS DMS をサポートしています。
+ `ParallelApplyThreads` – CDC ロード中に AWS DMS がデータレコードを Kinesis ターゲットエンドポイントにプッシュするために使用する同時スレッドの数を指定します。デフォルト値は 0 で、最大値は 32 です。
+ `ParallelApplyBufferSize` – CDC ロード中に同時スレッドが Kinesis ターゲット エンドポイントにプッシュする場合に、各バッファキューに保存するレコードの最大数を指定します。デフォルト値は 100 で、最大値は 1,000 です。このオプションは、`ParallelApplyThreads` で複数のスレッドを指定する場合に使用します。
+ `ParallelApplyQueuesPerThread` – 各スレッドがキューからデータレコードを取り出し、CDC 中に Kinesis エンドポイントのバッチロードを生成するためにアクセスするキューの数を指定します。デフォルト値は 1、最大値は 512 です。

`ParallelApply*` タスク設定を使用する場合、`partition-key-type` のデフォルトは `schema-name.table-name` ではなくテーブルの `primary-key` です。

## 前イメージを使用した Kinesis データストリームの CDC 行の元の値のターゲットとしての表示
<a name="CHAP_Target.Kinesis.BeforeImage"></a>

Kinesis のようなデータストリーミングターゲットに CDC 更新を書き込む場合、更新によって変更される前に、ソースデータベースの行の元の値を表示できます。これを可能にするために、 はソースデータベースエンジンによって提供されたデータに基づいて、更新イベントの*前のイメージ* AWS DMS を入力します。

ソースデータベースエンジンによって、前イメージに対してさまざまな量の情報が提供されます。
+ Oracle では、列が変更された場合にのみ列の更新が提供されます。
+ PostgreSQL は、プライマリキーの一部である列のデータ (変更されたかどうか) のみを提供します。すべての列に (変更の有無を問わず) データを提供するには、`REPLICA_IDENTITY` を `DEFAULT`でなく `FULL` に設定する必要があります。各テーブルを `REPLICA_IDENTITY` に設定する際は、慎重に行うべきであることに注意します。`REPLICA_IDENTITY` を `FULL` に設定すると、すべての列値がログ先行書き込み (WAL) に継続的に書き込まれます。これにより、頻繁に更新されるテーブルではパフォーマンスやリソースの問題が発生する可能性があります。
+ MySQL は通常、BLOB および CLOB データ型を除くすべての列のデータを (変更の有無を問わず) 提供します。

前イメージを有効にして、ソースデータベースから元の値を AWS DMS 出力に追加するには、`BeforeImageSettings` タスク設定または `add-before-image-columns` パラメータを使用します。このパラメータは、列変換ルールを適用します。

`BeforeImageSettings` は、次に示すように、ソースデータベースシステムから収集された値を使用して、すべての更新オペレーションに新しい JSON 属性を追加します。

```
"BeforeImageSettings": {
    "EnableBeforeImage": boolean,
    "FieldName": string,  
    "ColumnFilter": pk-only (default) / non-lob / all (but only one)
}
```

**注記**  
全ロードと CDC `BeforeImageSettings` AWS DMS タスク (既存のデータを移行して進行中の変更をレプリケートする) などの CDC コンポーネントを含むタスク、または CDC のみのタスク (データ変更のみをレプリケートする) にのみ適用されます。全ロードのタスクには `BeforeImageSettings` を適用しないでください。

`BeforeImageSettings` オプションには、次の項目が適用されます。
+ `EnableBeforeImage` オプションを `true` に設定して、前イメージを有効にします。デフォルトは `false` です。
+ `FieldName` オプションを使用して、新しい JSON 属性に名前を割り当てます。`EnableBeforeImage` が `true` の場合、`FieldName` は必須であり、空にすることはできません。
+ `ColumnFilter` オプションは、前イメージを使用して追加する列を指定します。テーブルのプライマリキーの一部である列だけを追加するには、デフォルト値 `pk-only` を使用します。前イメージ値を持つ列を追加するには、`all` を使用します。変更前イメージには、CLOB や BLOB などの LOB データ型の列が含まれていないことに注意します。

  ```
  "BeforeImageSettings": {
      "EnableBeforeImage": true,
      "FieldName": "before-image",
      "ColumnFilter": "pk-only"
    }
  ```

**注記**  
Amazon S3 ターゲットは `BeforeImageSettings` をサポートしていません。S3 ターゲットの場合、CDC 中に前イメージを実行するには `add-before-image-columns` 変換ルールのみを使用します。

### 前イメージ変換ルールの使用
<a name="CHAP_Target.Kinesis.BeforeImage.Transform-Rule"></a>

タスク設定の代わりに、列変換ルールを適用する `add-before-image-columns` パラメータを使用できます。このパラメータを使用すると、Kinesis のようなデータストリーミングターゲットで CDC 中に前イメージを有効にできます。

変換ルールで `add-before-image-columns` を使用すると、前イメージの結果のよりきめ細かい制御を適用することができます。変換ルールを使用すると、オブジェクトロケーターを使用し、ルールに選択したテーブルを制御できます。また、変換ルールを連結することもできます。これにより、テーブルごとに異なるルールを適用できます。その後、他のルールを使用して生成された列を操作できます。

**注記**  
同じタスク内で、`add-before-image-columns` パラメータと同時に `BeforeImageSettings` タスク設定を使用しないでください。代わりに、1 つのタスクにこのパラメータとこの設定のいずれかを使用し、両方を使用しないでください。

列の `add-before-image-columns` パラメータを持つ `transformation` ルールタイプは、`before-image-def` セクションを提供する必要があります。例を以下に示します。

```
    {
      "rule-type": "transformation",
      …
      "rule-target": "column",
      "rule-action": "add-before-image-columns",
      "before-image-def":{
        "column-filter": one-of  (pk-only / non-lob / all),
        "column-prefix": string,
        "column-suffix": string,
      }
    }
```

`column-prefix` の値は列名の前に付加され、`column-prefix` のデフォルト値は `BI_` です。`column-suffix` の値は列名に追加され、デフォルトは空です。`column-prefix` と `column-suffix` の両方を空の文字列に設定しないでください。

`column-filter` の値を 1 つ選択します。テーブルのプライマリキーの一部である列だけを追加するには、`pk-only` を選択します。LOB タイプではない列のみを追加するように `non-lob` を選択します。または、前イメージの値を持つ任意の列を追加するように `all` を選択します。

### 前イメージ変換前ルールの例
<a name="CHAP_Target.Kinesis.BeforeImage.Example"></a>

次の例の変換ルールは、ターゲットに `BI_emp_no` という新しい列を追加します。したがって、`UPDATE employees SET emp_no = 3 WHERE emp_no = 1;` のようなステートメントは、`BI_emp_no` フィールドに 1 を設定します。CDC 更新を Amazon S3 ターゲットに書き込むと、更新された元の行は `BI_emp_no` 列からわかります。

```
{
  "rules": [
    {
      "rule-type": "selection",
      "rule-id": "1",
      "rule-name": "1",
      "object-locator": {
        "schema-name": "%",
        "table-name": "%"
      },
      "rule-action": "include"
    },
    {
      "rule-type": "transformation",
      "rule-id": "2",
      "rule-name": "2",
      "rule-target": "column",
      "object-locator": {
        "schema-name": "%",
        "table-name": "employees"
      },
      "rule-action": "add-before-image-columns",
      "before-image-def": {
        "column-prefix": "BI_",
        "column-suffix": "",
        "column-filter": "pk-only"
      }
    }
  ]
}
```

`add-before-image-columns` ルールアクションの使用方法については、「[変換ルールおよび変換アクション](CHAP_Tasks.CustomizingTasks.TableMapping.SelectionTransformation.Transformations.md)」をご参照ください。

## のターゲットとして Kinesis データストリームを使用するための前提条件 AWS Database Migration Service
<a name="CHAP_Target.Kinesis.Prerequisites"></a>

### のターゲットとして Kinesis データストリームを使用するための IAM ロール AWS Database Migration Service
<a name="CHAP_Target.Kinesis.Prerequisites.IAM"></a>

Kinesis データストリームを のターゲットとして設定する前に AWS DMS、必ず IAM ロールを作成してください。このロールでは AWS DMS 、移行先の Kinesis データストリームへのアクセスを が引き受けて許可する必要があります。アクセス許可の最小設定は、次の IAM ポリシーで示します。

------
#### [ JSON ]

****  

```
{
   "Version":"2012-10-17",		 	 	 
   "Statement": [
   {
     "Sid": "1",
     "Effect": "Allow",
     "Principal": {
        "Service": "dms.amazonaws.com"
     },
   "Action": "sts:AssumeRole"
   }
]
}
```

------

Kinesis データストリームに移行する際に使用するロールには、次のアクセス許可が必要です。

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "*"
    }
  ]
}
```

------

### のターゲットとしての Kinesis データストリームへのアクセス AWS Database Migration Service
<a name="CHAP_Target.Kinesis.Prerequisites.Access"></a>

 AWS DMS バージョン 3.4.7 以降では、Kinesis エンドポイントに接続するには、次のいずれかを実行する必要があります。
+ VPC エンドポイントを使用するように DMS を設定します。VPC エンドポイントを使用するように DMS を設定する方法については、「[の VPC エンドポイントの設定 AWS DMS](CHAP_VPC_Endpoints.md)」を参照してください。
+ パブリックルートを使用するように DMS を設定します。つまり、レプリケーションインスタンスをパブリックにします。パブリックレプリケーションインスタンスの詳細については、「[パブリックおよびプライベートレプリケーション インスタンス](CHAP_ReplicationInstance.PublicPrivate.md)」を参照してください。

## のターゲットとして Kinesis Data Streams を使用する場合の制限 AWS Database Migration Service
<a name="CHAP_Target.Kinesis.Limitations"></a>

ターゲットとして Kinesis Data Streams を使用する場合、以下の制限が適用されます。
+ AWS DMS は、トランザクションに関係なく、各更新を特定の Kinesis データストリーム内の 1 つのデータレコードとしてソースデータベース内の 1 つのレコードに発行します。ただし、`KinesisSettings` API の関連パラメータを使用して、各データレコードのトランザクションの詳細を含めることができます。
+ 完全 LOB モードはサポートされていません。
+ サポートされる最大 LOB サイズは 1 MB です。
+ Kinesis Data Streamsは、重複排除をサポートしていません。ストリームからデータを消費するアプリケーションは、重複したレコードを処理する必要があります。詳細については、*Amazon Kinesis Data Streams デベロッパーガイド*の「[重複レコードの処理](https://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-duplicates.html)」をご参照ください。
+ AWS DMS では、パーティションキーに次の 4 つのフォームがサポートされています。
  + `SchemaName.TableName`: スキーマとテーブル名の組み合わせ。
  + `${AttributeName}`: JSON のいずれかのフィールドの値、またはソースデータベースのテーブルのプライマリキー。
  + `transaction-id`: CDC トランザクション ID。同じトランザクション内のすべてのレコードは、同じパーティションに移動します。
  + `constant`: テーブルやデータに関係なく、すべてのレコードの固定リテラル値。すべてのレコードは同じパーティションキー値「定数」に送信され、すべてのテーブルで厳密なグローバル順序が提供されます。

  ```
  {
      "rule-type": "object-mapping",
      "rule-id": "2",
      "rule-name": "PartitionKeyTypeExample",
      "rule-action": "map-record-to-document",
      "object-locator": {
          "schema-name": "onprem",
          "table-name": "it_system"
      },
      "mapping-parameters": {
          "partition-key-type": "transaction-id | constant | attribute-name | schema-table"
      }
  }
  ```
+ Kinesis Data Streams 内の保管中のデータの暗号化の詳細については、*AWS Key Management Service デベロッパーガイド*の「[Kinesis Data Streams でのデータ保護](https://docs.aws.amazon.com/streams/latest/dev/server-side-encryption.html.html)」をご参照ください。
+ `IncludeTransactionDetails` エンドポイント設定は、ソースエンドポイントが Oracle、SQL Server、PostgreSQL、または MySQL の場合にのみサポートされます。他のソースエンドポイントタイプでは、トランザクションの詳細は含まれません。
+ `BatchApply`は Kinesis エンドポイントではサポートされていません。Kinesis ターゲットにバッチ 適用を使用 (例えば、`BatchApplyEnabled` ターゲットメタデータタスク設定) すると、タスク失敗やデータ損失が発生する可能性があります。Kinesisをターゲットエンドポイントとして使用する場合、`BatchApply` を有効にしないでください。
+ Kinesis ターゲットは、レプリケーションインスタンスと同じ AWS AWS リージョン アカウントの Kinesis データストリームでのみサポートされます。
+ MySQL ソースから移行する場合、BeforeImage データには CLOB データ型と BLOB データ型は含まれません。詳細については、「[前イメージを使用した Kinesis データストリームの CDC 行の元の値のターゲットとしての表示](#CHAP_Target.Kinesis.BeforeImage)」を参照してください。
+ AWS DMS は、16 桁を超える`BigInt`データ型の値の移行をサポートしていません。この制限を回避するには、次の変換ルールを使用して `BigInt` 列を文字列に変換できます。変換ルールの詳細については、「[変換ルールおよび変換アクション](CHAP_Tasks.CustomizingTasks.TableMapping.SelectionTransformation.Transformations.md)」を参照してください。

  ```
  {
      "rule-type": "transformation",
      "rule-id": "id",
      "rule-name": "name",
      "rule-target": "column",
      "object-locator": {
          "schema-name": "valid object-mapping rule action",
          "table-name": "",
          "column-name": ""
      },
      "rule-action": "change-data-type",
      "data-type": {
          "type": "string",
          "length": 20
      }
  }
  ```
+ 1 つのトランザクション内の複数の DML オペレーションが、ソースデータベースのラージオブジェクト (LOB) 列を変更すると、ターゲットデータベースは、対象トランザクションの最後のオペレーションにおける最終的な LOB 値のみを保持します。同じトランザクションの以前のオペレーションで設定された中間 LOB 値は上書きされるため、データの損失や不整合が発生する可能性があります。この動作は、レプリケーション中の LOB データの処理方法が原因で発生します。
+ AWS DMS Kinesis をターゲットエンドポイントとして使用する場合、 は埋め込み`'\0'`文字を含むソースデータをサポートしていません。埋め込み`'\0'`文字を含むデータは、最初の`'\0'`文字で切り捨てられます。

## Kinesis データストリームにデータを移行するためのオブジェクトマッピングの使用
<a name="CHAP_Target.Kinesis.ObjectMapping"></a>

AWS DMS はテーブルマッピングルールを使用して、ソースからターゲット Kinesis データストリームにデータをマッピングします。ターゲットストリームにデータをマッピングするために、オブジェクトマッピングと呼ばれるテーブルマッピングルールのタイプを使用します。オブジェクトマッピングを使用して、ソースのデータレコードがどのように Kinesis データストリームに発行されたデータレコードにマッピングされるかを定義します。

Kinesis データストリームには、パーティション　キー以外にプリセット構造はありません。オブジェクトマッピングルールでは、データレコードの `partition-key-type` に指定できる値は、`schema-table`、`transaction-id`、`primary-key`、`constant`、`attribute-name` です。

オブジェクトマッピングルールを作成するには、`object-mapping` として `rule-type` を指定します。このルールが、使用したいオブジェクトマッピングのタイプを指定します。

ルールの構造は次のとおりです。

```
{
    "rules": [
        {
            "rule-type": "object-mapping",
            "rule-id": "{{id}}",
            "rule-name": "{{name}}",
            "rule-action": "{{valid object-mapping rule action}}",
            "object-locator": {
                "schema-name": "{{case-sensitive schema name}}",
                "table-name": ""
            }
        }
    ]
}
```

AWS DMS は現在、 `rule-action`パラメータの唯一の有効な値`map-record-to-document`として `map-record-to-record`と をサポートしています。これらの設定は、`exclude-columns` 属性リストの一部として除外されない値に影響します。`map-record-to-record` および `map-record-to-document`値は、デフォルトで がこれらのレコード AWS DMS を処理する方法を指定します。これらの値は、どのような方法でも属性マッピングに影響しません。

リレーショナルデータベースから Kinesis データストリームに移行する際に `map-record-to-record` を使用します。このルールタイプでは、Kinesis データストリームのパーティション キーとしてリレーショナルデータベースから `taskResourceId.schemaName.tableName` 値を使用し、ソースデータベース内の各列の属性を作成します。

`map-record-to-record` を使用する場合は、次の点に注意します。
+ この設定は、`exclude-columns` リストで除外されている列にのみ影響します。
+ このような列ごとに、 はターゲットトピックで対応する属性 AWS DMS を作成します。
+ AWS DMS は、ソース列が属性マッピングで使用されているかどうかに関係なく、この対応する属性を作成します。

`map-record-to-document` を使用して、属性名「\_doc」を使用しソース列を適切なターゲットストリーム内の単一のフラットドキュメントに配置します。 AWS DMS 说「`_doc`」という名前のソースで 1 つのフラットマップにデータを配置します。この配置は、`exclude-columns` 属性リストに含まれていないソーステーブル内のすべての列に適用されます。

`map-record-to-record` を理解するための 1 つの方法は、実際の動作を確認することです。この例では、次の構造とデータを含むリレーショナルデータベースのテーブルの行から始めると想定してください。


| FirstName | LastName | StoreId | HomeAddress | HomePhone | WorkAddress | WorkPhone | DateofBirth | 
| --- | --- | --- | --- | --- | --- | --- | --- | 
| Randy | Marsh | 5 | 221B Baker Street | 1234567890 | 31 Spooner Street, Quahog  | 9876543210 | 02/29/1988 | 

この情報を `Test` という名前のスキーマから Kinesis データストリームに移行するには、データをターゲットストリームにマッピングするルールを作成します。以下のルールはマッピングを示しています。

```
{
    "rules": [
        {
            "rule-type": "selection",
            "rule-id": "1",
            "rule-name": "1",
            "rule-action": "include",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "%"
            }
        },
        {
            "rule-type": "object-mapping",
            "rule-id": "2",
            "rule-name": "DefaultMapToKinesis",
            "rule-action": "map-record-to-record",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "Customers"
            }
        }
    ]
}
```

次は、Kinesis データストリームの結果のレコード形式を示しています：
+ StreamName: XXX
+ PartitionKey: Test.Customers //schmaName.tableName
+ データ: //次の JSON メッセージ

  ```
    {
       "FirstName": "Randy",
       "LastName": "Marsh",
       "StoreId":  "5",
       "HomeAddress": "221B Baker Street",
       "HomePhone": "1234567890",
       "WorkAddress": "31 Spooner Street, Quahog",
       "WorkPhone": "9876543210",
       "DateOfBirth": "02/29/1988"
    }
  ```

ただし、同じルールを使用しますが、`rule-action` パラメータを `map-record-to-document` に変更して特定の列を除外します。以下のルールはマッピングを示しています。

```
{
	"rules": [
	   {
			"rule-type": "selection",
			"rule-id": "1",
			"rule-name": "1",
			"rule-action": "include",
			"object-locator": {
				"schema-name": "Test",
				"table-name": "%"
			}
		},
		{
			"rule-type": "object-mapping",
			"rule-id": "2",
			"rule-name": "DefaultMapToKinesis",
			"rule-action": "map-record-to-document",
			"object-locator": {
				"schema-name": "Test",
				"table-name": "Customers"
			},
			"mapping-parameters": {
				"exclude-columns": [
					"homeaddress",
					"homephone",
					"workaddress",
					"workphone"
				]
			}
		}
	]
}
```

この場合、`exclude-columns` パラメータ、`FirstName`、`LastName`、`StoreId`、`DateOfBirth` は `_doc` にマッピングされます。次は、この結果のレコード形式を示しています。

```
       {
            "data":{
                "_doc":{
                    "FirstName": "Randy",
                    "LastName": "Marsh",
                    "StoreId":  "5",
                    "DateOfBirth": "02/29/1988"
                }
            }
        }
```

### 属性マッピングを使用したデータの再構築
<a name="CHAP_Target.Kinesis.AttributeMapping"></a>

属性マップを使用してデータを Kinesis データストリームに移行している間にデータを再構築できます。例えば、ソース内の複数のフィールドを結合してターゲット内に 1 つのフィールドを構成することもできます。以下の属性マップはデータを再構築する方法を示しています。

```
{
    "rules": [
        {
            "rule-type": "selection",
            "rule-id": "1",
            "rule-name": "1",
            "rule-action": "include",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "%"
            }
        },
        {
            "rule-type": "object-mapping",
            "rule-id": "2",
            "rule-name": "TransformToKinesis",
            "rule-action": "map-record-to-record",
            "target-table-name": "CustomerData",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "Customers"
            },
            "mapping-parameters": {
                "partition-key-type": "attribute-name",
                "partition-key-name": "CustomerName",
                "exclude-columns": [
                    "firstname",
                    "lastname",
                    "homeaddress",
                    "homephone",
                    "workaddress",
                    "workphone"
                ],
                "attribute-mappings": [
                    {
                        "target-attribute-name": "CustomerName",
                        "attribute-type": "scalar",
                        "attribute-sub-type": "string",
                        "value": "${lastname}, ${firstname}"
                    },
                    {
                        "target-attribute-name": "ContactDetails",
                        "attribute-type": "document",
                        "attribute-sub-type": "json",
                        "value": {
                            "Home": {
                                "Address": "${homeaddress}",
                                "Phone": "${homephone}"
                            },
                            "Work": {
                                "Address": "${workaddress}",
                                "Phone": "${workphone}"
                            }
                        }
                    }
                ]
            }
        }
    ]
}
```

の定数値を設定するには`partition-key`、 を指定します。これにより`"partition-key-type: "constant"`、パーティション値が に設定されます`constant`。たとえば、すべてのデータを 1 つのシャードに強制的に格納するためにこれを行うことができます。以下のマッピングはこの方法を示しています。

```
{
    "rules": [
        {
            "rule-type": "selection",
            "rule-id": "1",
            "rule-name": "1",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "%"
            },
            "rule-action": "include"
        },
        {
            "rule-type": "object-mapping",
            "rule-id": "2",
            "rule-name": "TransformToKinesis",
            "rule-action": "map-record-to-document",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "Customer"
            },
            "mapping-parameters": {
                "partition-key-type": "constant",
                "exclude-columns": [
                    "FirstName",
                    "LastName",
                    "HomeAddress",
                    "HomePhone",
                    "WorkAddress",
                    "WorkPhone"
                ],
                "attribute-mappings": [
                    {
                        "target-attribute-name": "CustomerName",
                        "attribute-type": "scalar",
                        "attribute-sub-type": "string",
                        "value": "${FirstName},${LastName}"

                    },
                    {
                        "target-attribute-name": "ContactDetails",
                        "attribute-type": "scalar",
                        "attribute-sub-type": "string",
                        "value": {
                            "Home": {
                                "Address": "${HomeAddress}",
                                "Phone": "${HomePhone}"
                            },
                            "Work": {
                                "Address": "${WorkAddress}",
                                "Phone": "${WorkPhone}"
                            }
                        }
                    },
                    {
                        "target-attribute-name": "DateOfBirth",
                        "attribute-type": "scalar",
                        "attribute-sub-type": "string",
                        "value": "${DateOfBirth}"
                    }
                ]
            }
        }
    ]
}
```

**注記**  
特定のテーブル用のコントロールレコードの `partition-key` 値は、`TaskId.SchemaName.TableName` です。特定のタスク用のコントロールレコードの `partition-key` 値は、そのレコードの `TaskId` です。オブジェクトマッピングの `partition-key` 値を指定することは、コントロールレコードの `partition-key` には影響しません。  
 テーブルマッピングルール`attribute-name`で `partition-key-type`が に設定されている場合、 を指定する必要があります。これは`partition-key-name`、ソーステーブルの列またはマッピングで定義されたカスタム列を参照する必要があります。さらに、ソース列がターゲット Kinesis Stream にどのようにマッピングされるかを定義するには、 を指定`attribute-mappings`する必要があります。

### Kinesis Data Streams のメッセージ形式
<a name="CHAP_Target.Kinesis.Messageformat"></a>

JSON 出力は、単にキーと値のペアのリストです。JSON\_UNFORMATTED メッセージ形式は、改行区切り文字を含む単一行の JSON 文字列です。

AWS DMS には、Kinesis Data Streams からのデータを簡単に消費できるように、次の予約済みフィールドが用意されています。

**RecordType**  
レコードタイプはデータまたはコントロールのいずれかです。*データレコード*は、ソースの実際の行を表します。*コントロールレコード*は、タスクの再起動など、ストリーム内の重要なイベント用です。

**運用**  
データレコードの場合、オペレーションは `load`、`insert`、`update`、または `delete` です。  
コントロールレコードの場合、オペレーションは `create-table`、`rename-table`、`drop-table`、`change-columns`、`add-column`、`drop-column`、`rename-column`、`column-type-change` です。

**SchemaName**  
レコードのソーススキーマ。コントロールレコードの場合、このフィールドは空です。

**TableName**  
レコードのソーステーブル。コントロールレコードの場合、このフィールドは空です。

**タイムスタンプ**  
JSON メッセージが構築された時刻のタイムスタンプ。このフィールドは ISO 8601 形式でフォーマットされます。