

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Menggunakan Apache Kafka sebagai target AWS Database Migration Service
<a name="CHAP_Target.Kafka"></a>

Anda dapat menggunakan AWS DMS untuk memigrasikan data ke cluster Apache Kafka. Apache Kafka adalah platform streaming terdistribusi. Anda dapat menggunakan Apache Kafka untuk menelan dan memproses data streaming secara langsung.

AWS juga menawarkan Amazon Managed Streaming for Apache Kafka (Amazon MSK) untuk digunakan sebagai target. AWS DMS Amazon MSK adalah layanan streaming Apache Kafka yang terkelola penuh yang menyederhanakan implementasi dan pengelolaan instans Apache Kafka. Ini bekerja dengan versi Apache Kafka open-source, dan Anda mengakses instans MSK Amazon sebagai AWS DMS target persis seperti instans Apache Kafka lainnya. Untuk informasi lebih lanjut, lihat [Apa itu Amazon MSK?](https://docs.aws.amazon.com/msk/latest/developerguide/what-is-msk.html) dalam *Panduan Developer: Amazon Managed Streaming for Apache Kafka*.

Sebuah klaster Kafka menyimpan aliran catatan dalam kategori yang disebut topik yang dibagi menjadi partisi. *Partisi* adalah urutan catatan data (pesan) yang diidentifikasi secara unik secara unik dalam suatu topik. Partisi dapat didistribusikan ke beberapa broker dalam sebuah klaster untuk mengaktifkan pemrosesan paralel catatan topik. Untuk informasi lebih lanjut tentang topik dan partisi dan distribusi mereka di Apache Kafka, lihat [Topik dan log](https://kafka.apache.org/documentation/#intro_topics) dan [Distribusi](https://kafka.apache.org/documentation/#intro_distribution).

Kluster Kafka Anda dapat berupa instans MSK Amazon, klaster yang berjalan di instans Amazon EC2, atau kluster lokal. Instans MSK Amazon atau cluster pada instans Amazon EC2 dapat berada di VPC yang sama atau yang berbeda. Jika klaster Anda lokal, Anda dapat menggunakan server nama lokal Anda sendiri untuk instance replikasi untuk menyelesaikan nama host klaster. Untuk informasi tentang menyiapkan server nama untuk instance replikasi Anda, lihat[Menggunakan server nama on-premise Anda sendiri](CHAP_BestPractices.md#CHAP_BestPractices.Rte53DNSResolver). Untuk informasi selengkapnya tentang pengaturan jaringan, lihat[Menyiapkan jaringan untuk instans replikasi](CHAP_ReplicationInstance.VPC.md).

Saat menggunakan kluster MSK Amazon, pastikan grup keamanannya mengizinkan akses dari instance replikasi Anda. Untuk informasi tentang mengubah grup keamanan untuk klaster MSK Amazon, lihat [Mengubah grup keamanan klaster MSK Amazon](https://docs.aws.amazon.com/msk/latest/developerguide/change-security-group.html).

AWS Database Migration Service menerbitkan catatan ke topik Kafka menggunakan JSON. Selama konversi, AWS DMS membuat serial setiap catatan dari basis data sumber ke dalam pasangan atribut-nilai dalam format JSON.

Untuk memigrasikan data Anda dari sumber data apa pun yang didukung ke klaster Kafka target, Anda menggunakan pemetaan objek. Dengan pemetaan objek, Anda menentukan bagaimana struktur catatan data dalam topik target. Anda juga menentukan kunci partisi untuk setiap tabel, yang digunakan Apache Kafka untuk mengelompokkan data ke dalam partisi. 

Saat ini, AWS DMS mendukung satu topik per tugas. Untuk satu tugas dengan beberapa tabel, semua pesan masuk ke satu topik. Setiap pesan menyertakan bagian metadata yang mengidentifikasi skema dan tabel target. AWS DMS versi 3.4.6 dan yang lebih tinggi mendukung replikasi multitopik menggunakan pemetaan objek. Untuk informasi selengkapnya, lihat [Replikasi multitopik menggunakan pemetaan objek](#CHAP_Target.Kafka.MultiTopic).

**Pengaturan titik akhir Apache Kafka**

Anda dapat menentukan detail koneksi melalui pengaturan titik akhir di AWS DMS konsol, atau `--kafka-settings` opsi di CLI. Persyaratan untuk setiap pengaturan:
+ `Broker` - Tentukan lokasi dari satu atau lebih broker di klaster Kafka Anda dalam bentuk daftar yang dipisahkan koma untuk masing-masing `{{broker-hostname}}:{{port}}`. Contohnya `"ec2-12-345-678-901.compute-1.amazonaws.com:2345,ec2-10-987-654-321.compute-1.amazonaws.com:9876"`. Pengaturan ini dapat menentukan lokasi dari salah satu atau semua broker dalam klaster. Semua Klaster broker berkomunikasi untuk menangani partisi catatan data yang dimigrasikan ke topik.
+ `Topic` - (Opsional) Tentukan nama topik dengan panjang maksimum 255 huruf dan simbol. Anda dapat menggunakan titik (.), garis bawah (\_), dan minus (-). Nama topik dengan titik (.) atau garis bawah (\_) dapat bertabrakan dalam struktur data internal. Gunakan salah satu, tetapi jangan kedua simbol ini sekaligus dalam nama topik. Jika Anda tidak menentukan nama topik, AWS DMS gunakan `"kafka-default-topic"` sebagai topik migrasi.
**catatan**  
Untuk AWS DMS membuat topik migrasi yang Anda tentukan atau topik default, tetapkan `auto.create.topics.enable = true` sebagai bagian dari konfigurasi cluster Kafka Anda. Untuk informasi selengkapnya, lihat [Keterbatasan saat menggunakan Apache Kafka sebagai target AWS Database Migration Service](#CHAP_Target.Kafka.Limitations)
+ `MessageFormat` - Format keluaran untuk catatan yang dibuat pada titik akhir. Format pesan adalah `JSON` (default) atau `JSON_UNFORMATTED` (satu baris tanpa tab).
+ `MessageMaxBytes` - Ukuran maksimum dalam byte untuk catatan yang dibuat pada titik akhir. Default adalah 1.000.000.
**catatan**  
Anda hanya dapat menggunakan AWS CLI/SDK untuk mengubah `MessageMaxBytes` ke nilai non-default. Misalnya, untuk memodifikasi titik akhir Kafka yang sudah ada dan mengubah `MessageMaxBytes`, gunakan perintah berikut.  

  ```
  aws dms modify-endpoint --endpoint-arn {{your-endpoint}} 
  --kafka-settings Broker="{{broker1-server}}:{{broker1-port}},{{broker2-server}}:{{broker2-port}},...",
  Topic={{topic-name}},MessageMaxBytes={{integer-of-max-message-size-in-bytes}}
  ```
+ `IncludeTransactionDetails` - Menyediakan informasi transaksi detail dari basis data sumber. Informasi ini mencakup stempel waktu melakukan, posisi log, dan nilai-nilai untuk `transaction_id`, `previous_transaction_id`, dan `transaction_record_id` (catatan offset dalam transaksi). Nilai default-nya `false`.
+ `IncludePartitionValue` - Menampilkan nilai partisi dalam pesan keluaran Kafka, kecuali jika tipe partisi adalah `schema-table-type`. Nilai default-nya `false`.
+ `PartitionIncludeSchemaTable` - Skema prefiks dan nama tabel untuk nilai partisi, ketika tipe partisi `primary-key-type`. Melakukan hal ini meningkatkan distribusi data antara partisi Kafka. Sebagai contoh, anggaplah bahwa skema `SysBench` memiliki ribuan tabel dan setiap tabel hanya memiliki rentang terbatas untuk kunci primer. Dalam kasus ini, kunci primer yang sama dikirim dari ribuan tabel ke partisi yang sama, sehingga menyebabkan throttling. Nilai default-nya `false`.
+ `IncludeTableAlterOperations` - Termasuk setiap operasi bahasa definisi data (DDL) yang mengubah tabel dalam data kontrol, seperti `rename-table`, `drop-table`, `add-column`, `drop-column`, dan `rename-column`. Nilai default-nya `false`. 
+ `IncludeControlDetails` - Menunjukkan informasi kontrol detail untuk definisi tabel, definisi kolom, dan perubahan tabel dan kolom pada keluaran pesan Kafka. Nilai default-nya `false`.
+ `IncludeNullAndEmpty` - Sertakan NULL dan kolom kosong dalam target. Nilai default-nya `false`.
+ `SecurityProtocol` - Menyetel koneksi yang aman ke titik akhir target Kafka menggunakan Keamanan Lapisan Pengangkutan (TLS). Pilihan termasuk `ssl-authentication`, `ssl-encryption`, dan `sasl-ssl`. Menggunakan `sasl-ssl` membutuhkan `SaslUsername` dan `SaslPassword`.
+ `SslEndpointIdentificationAlgorithm`— Menetapkan verifikasi nama host untuk sertifikat. Pengaturan ini didukung di AWS DMS versi 3.5.1 dan yang lebih baru. Opsi mencakup hal berikut: 
  + `NONE`: Nonaktifkan verifikasi nama host broker dalam koneksi klien.
  + `HTTPS`: Aktifkan verifikasi nama host broker dalam koneksi klien.
+ `useLargeIntegerValue`— Gunakan hingga 18 digit int alih-alih casting int sebagai ganda, tersedia dari AWS DMS versi 3.5.4. Default-nya adalah salah.

Anda dapat menggunakan pengaturan untuk membantu meningkatkan kecepatan transfer Anda. Untuk melakukannya, AWS DMS mendukung beban penuh multithread klaster target Apache Kafka. AWS DMS mendukung multithreading dengan pengaturan tugas yang meliputi hal berikut ini:
+ `MaxFullLoadSubTasks`— Gunakan opsi ini untuk menunjukkan jumlah maksimum tabel sumber untuk dimuat secara paralel. AWS DMS memuat setiap tabel ke dalam tabel target Kafka yang sesuai menggunakan subtugas khusus. Default adalah 8; nilai maksimum adalah 49.
+ `ParallelLoadThreads`— Gunakan opsi ini untuk menentukan jumlah utas yang AWS DMS digunakan untuk memuat setiap tabel ke dalam tabel target Kafka. Nilai maksimum untuk target Apache Kafka adalah 32. Anda dapat meminta untuk meningkatkan batas maksimum ini.
+ `ParallelLoadBufferSize` - Gunakan pilihan ini untuk menentukan jumlah maksimum catatan untuk disimpan di buffer yang digunakan oleh thread beban paralel untuk memuat data ke target Kafka. Nilai default adalah 50. Nilai maksimumnya adalah 1.000. Gunakan pengaturan ini dengan `ParallelLoadThreads`. `ParallelLoadBufferSize` hanya berlaku bila ada lebih dari satu thread.
+ `ParallelLoadQueuesPerThread` - Gunakan pilihan ini untuk menentukan jumlah antrean yang diakses setiap thread secara bersamaan untuk mengambil catatan data dari antrean dan menghasilkan beban batch untuk target. Default adalah 1. Maksimum adalah 512.

Anda dapat meningkatkan kinerja pengambilan data perubahan (CDC) untuk titik akhir Kafka dengan menyetel pengaturan tugas untuk thread paralel dan operasi massal. Untuk melakukan ini, Anda dapat menentukan jumlah thread yang terjadi bersamaan, antrean per thread, dan jumlah catatan yang disimpan dalam buffer menggunakan pengaturan tugas `ParallelApply*`. Misalnya, Anda ingin melakukan beban CDC dan menerapkan 128 thread secara paralel. Anda juga ingin mengakses 64 antrean per thread, dengan 50 catatan disimpan per buffer. 

Untuk mempromosikan kinerja CDC, AWS DMS mendukung pengaturan tugas ini:
+ `ParallelApplyThreads`— Menentukan jumlah thread bersamaan yang AWS DMS digunakan selama beban CDC untuk mendorong catatan data ke titik akhir target Kafka. Nilai default adalah nol (0) dan nilai maksimum adalah 32.
+ `ParallelApplyBufferSize` - Menentukan jumlah maksimum catatan yang disimpan di setiap antrean buffer untuk thread serentak untuk mendorong ke titik akhir target Kinesis selama beban CDC. Nilai default adalah 100 dan nilai maksimum adalah 1.000. Gunakan pilihan ini saat `ParallelApplyThreads` menentukan lebih dari satu thread. 
+ `ParallelApplyQueuesPerThread` - Menentukan jumlah antrean yang diakses oleh setiap thread untuk mengambil catatan data dari antrean dan menghasilkan beban batch untuk titik akhir Kafka selama CDC. Default-nya adalah 1. Maksimum adalah 512.

Saat menggunakan pengaturan tugas `ParallelApply*`, default `partition-key-type`-nya adalah `primary-key` dari tabel, bukan `schema-name.table-name`.

## Menghubungkan ke Kafka menggunakan Keamanan Lapisan Pengangkutan (TLS)
<a name="CHAP_Target.Kafka.TLS"></a>

Sebuah klaster Kafka menerima koneksi aman menggunakan Keamanan Lapisan Pengangkutan (TLS). Dengan DMS, Anda dapat menggunakan salah satu dari tiga pilihan protokol keamanan berikut untuk mengamankan koneksi titik akhir Kafka.

**Enkripsi SSL (`server-encryption`)**  
Klien memvalidasi identitas server melalui sertifikat server. Kemudian koneksi terenkripsi dibuat antara server dan klien.

**Autentikasi SSL (`mutual-authentication`)**  
Server dan klien memvalidasi identitas dengan satu sama lain melalui sertifikat mereka sendiri. Kemudian koneksi terenkripsi dibuat antara server dan klien.

**SASL-SSL (`mutual-authentication`)**  
Metode The Simple Authentication and Security Layer (SASL) menggantikan sertifikat klien dengan nama pengguna dan kata sandi untuk memvalidasi identitas klien. Secara spesifik, Anda memberikan nama pengguna dan kata sandi yang telah didaftarkan ke server sehingga server dapat memvalidasi identitas klien. Kemudian koneksi terenkripsi dibuat antara server dan klien.

**penting**  
Apache Kafka dan Amazon MSK menerima sertifikat diselesaikan. Ini adalah keterbatasan yang sudah diketahui dari Kafka dan Amazon MSK yang harus ditangani. Untuk informasi lebih lanjut, lihat [Masalah Apache Kafka, KAFKA-3700](https://issues.apache.org/jira/browse/KAFKA-3700).  
Jika Anda menggunakan Amazon MSK, pertimbangkan untuk menggunakan daftar kontrol akses (ACLs) sebagai solusi untuk batasan yang diketahui ini. Untuk informasi selengkapnya tentang penggunaan ACLs, lihat ACLs bagian [Apache Kafka](https://docs.aws.amazon.com//msk/latest/developerguide/msk-acls.html) dari *Amazon Managed Streaming for Apache Kafka Developer* Guide.  
Jika Anda menggunakan klaster Kafka yang dikelola sendiri, lihat [Komentar tanggal 21/Oct/18](https://issues.apache.org/jira/browse/KAFKA-3700?focusedCommentId=16658376) untuk informasi tentang mengonfigurasi klaster Anda.

### Menggunakan enkripsi SSL dengan Amazon MSK atau klaster Kafka yang dikelola sendiri
<a name="CHAP_Target.Kafka.TLS.SSLencryption"></a>

Anda dapat menggunakan enkripsi SSL untuk mengamankan koneksi titik akhir ke Amazon MSK atau klaster Kafka yang dikelola sendiri. Saat Anda menggunakan metode autentikasi enkripsi SSL, klien memvalidasi identitas server melalui sertifikat server. Kemudian koneksi terenkripsi dibuat antara server dan klien.

**Menggunakan enkripsi SSL untuk terhubung ke Amazon MSK**
+ Tetapkan pengaturan titik akhir protokol keamanan (`SecurityProtocol`) menggunakan pilihan `ssl-encryption` ketika membuat titik akhir target Kafka Anda. 

  Contoh JSON berikut menetapkan protokol keamanan sebagai enkripsi SSL.

```
"KafkaSettings": {
    "SecurityProtocol": "ssl-encryption", 
}
```

**Untuk menggunakan enkripsi SSL sebagai klaster Kafka yang dikelola sendiri**

1. Jika Anda menggunakan Otoritas Sertifikasi (CA) pribadi di klaster Kafka on-premise Anda, unggahlah sertifikat CA pribadi dan dapatkan Amazon Resource Name (ARN). 

1. Tetapkan pengaturan titik akhir protokol keamanan (`SecurityProtocol`) menggunakan pilihan `ssl-encryption` ketika membuat titik akhir target Kafka Anda. Contoh JSON berikut menetapkan protokol keamanan sebagai `ssl-encryption`.

   ```
   "KafkaSettings": {
       "SecurityProtocol": "ssl-encryption", 
   }
   ```

1. Jika Anda menggunakan CA pribadi, atur `SslCaCertificateArn` di ARN yang Anda dapatkan di langkah pertama di atas.

### Menggunakan autentikasi SSL
<a name="CHAP_Target.Kafka.TLS.SSLauthentication"></a>

Anda dapat menggunakan autentikasi SSL untuk mengamankan koneksi titik akhir ke Amazon MSK atau klaster Kafka yang dikelola sendiri.

Untuk mengaktifkan autentikasi klien dan enkripsi menggunakan otentikasi SSL untuk menyambung ke Amazon MSK, lakukan hal berikut:
+ Siapkan kunci privat dan sertifikat publik untuk Kafka.
+ Unggah sertifikat ke DMS Certificate Manager .
+ Buat titik akhir target Kafka dengan sertifikat terkait yang ARNs ditentukan dalam pengaturan titik akhir Kafka.

**Untuk mempersiapkan kunci privat dan sertifikat publik untuk Amazon MSK**

1. Membuat instans EC2 dan atur klien untuk menggunakan autentikasi seperti yang dijelaskan dalam langkah 1 sampai 9 di [Autentikasi klien](https://docs.aws.amazon.com/msk/latest/developerguide/msk-authentication.html) Bagian dari *Panduan Developer Amazon Managed Streaming for Apache Kafka*.

   Setelah Anda menyelesaikan langkah-langkah tersebut, Anda memiliki sertifikat-ARN (sertifikat publik ARN disimpan di ACM), dan kunci privat yang terdapat dalam file `kafka.client.keystore.jks`.

1. Dapatkan sertifikat publik dan salin sertifikat ke file `signed-certificate-from-acm.pem`, menggunakan perintah berikut:

   ```
   aws acm-pca get-certificate --certificate-authority-arn Private_CA_ARN --certificate-arn Certificate_ARN
   ```

   Perintah itu mengembalikan informasi yang serupa dengan contoh berikut:

   ```
   {"Certificate": "123", "CertificateChain": "456"}
   ```

   Anda kemudian menyalin yang setara dengan `"123"` ke file `signed-certificate-from-acm.pem`.

1. Dapatkan kunci privat dengan mengimpor kunci `msk-rsa` dari `kafka.client.keystore.jks to keystore.p12`, seperti yang ditunjukkan dalam contoh berikut.

   ```
   keytool -importkeystore \
   -srckeystore kafka.client.keystore.jks \
   -destkeystore keystore.p12 \
   -deststoretype PKCS12 \
   -srcalias msk-rsa-client \
   -deststorepass test1234 \
   -destkeypass test1234
   ```

1. Gunakan perintah berikut untuk mengekspor `keystore.p12` ke format `.pem`. 

   ```
   Openssl pkcs12 -in keystore.p12 -out encrypted-private-client-key.pem –nocerts
   ```

   Pesan **Masukkan frase PEM pass** muncul dan mengidentifikasi kunci yang diterapkan untuk mengenkripsi sertifikat.

1. Hapus atribut tas dan atribut kunci dari file `.pem` untuk memastikan bahwa baris pertama dimulai dengan string berikut.

   ```
                                   ---BEGIN ENCRYPTED PRIVATE KEY---
   ```

**Untuk mengunggah sertifikat publik dan kunci privat ke DMS certificate manager dan menguji sambungan ke Amazon MSK**

1. Unggah ke DMS Certificate Manager menggunakan perintah berikut.

   ```
   aws dms import-certificate --certificate-identifier signed-cert --certificate-pem file://{{path to signed cert}}
   aws dms import-certificate --certificate-identifier private-key —certificate-pem file://{{path to private key}}
   ```

1. Buat titik akhit target Amazon MSK dan uji koneksi untuk memastikan bahwa autentikasi TLS berfungsi.

   ```
   aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings 
   '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:0000", "SecurityProtocol":"ssl-authentication", 
   "SslClientCertificateArn": "arn:aws:dms:us-east-1:012346789012:cert:",
   "SslClientKeyArn": "arn:aws:dms:us-east-1:0123456789012:cert:","SslClientKeyPassword":"test1234"}'
   aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk
   ```

**penting**  
Anda dapat menggunakan autentikasi SSL untuk mengamankan koneksi ke klaster Kafka yang dikelola sendiri. Dalam beberapa kasus, Anda mungkin menggunakan Otoritas Sertifikasi (CA) pribadi di kluster Kafka on-premise Anda. Jika demikian, unggah rantai CA Anda, sertifikat publik, dan kunci privat ke DMS certificate manager. Kemudian, gunakan Amazon Resource Name (ARN) yang sesuai dalam pengaturan titik akhir Anda ketika Anda membuat titik akhir target Kafka on-premise.

**Untuk mempersiapkan kunci privat dan sertifikat yang sudah ditandatangani untuk klaster Kafka yang dikelola sendiri**

1. Buat pasangan kunci seperti yang ditunjukkan pada contoh berikut.

   ```
   keytool -genkey -keystore kafka.server.keystore.jks -validity 300 -storepass {{your-keystore-password}} 
   -keypass {{your-key-passphrase}} -dname "CN={{your-cn-name}}" 
   -alias {{alias-of-key-pair}} -storetype pkcs12 -keyalg RSA
   ```

1. Buat sertifikat tanda permintaan (CSR). 

   ```
   keytool -keystore kafka.server.keystore.jks -certreq -file server-cert-sign-request-rsa -alias on-premise-rsa -storepass {{your-key-store-password}} 
   -keypass {{your-key-password}}
   ```

1. Gunakan CA di trusstore klaster Anda untuk menandatangani CSR. Jika Anda tidak memiliki CA, Anda dapat membuat CA pribadi Anda sendiri.

   ```
   openssl req -new -x509 -keyout ca-key -out ca-cert -days {{validate-days}}                            
   ```

1. Impor `ca-cert` ke dalam server truststore dan keystore. Jika Anda tidak memiliki truststore, gunakan perintah berikut untuk membuat truststore dan impor `ca-cert ` ke dalamnya. 

   ```
   keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert
   keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
   ```

1. Tanda tangani sertifikat.

   ```
   openssl x509 -req -CA ca-cert -CAkey ca-key -in server-cert-sign-request-rsa -out signed-server-certificate.pem 
   -days {{validate-days}} -CAcreateserial -passin pass:{{ca-password}}
   ```

1. Impor sertifikat yang sudah ditandatangani ke keystore.

   ```
   keytool -keystore kafka.server.keystore.jks -import -file signed-certificate.pem -alias on-premise-rsa -storepass {{your-keystore-password}} 
   -keypass {{your-key-password}}
   ```

1. Gunakan perintah berikut untuk mengimpor `on-premise-rsa` kunci dari `kafka.server.keystore.jks` ke `keystore.p12`.

   ```
   keytool -importkeystore \
   -srckeystore kafka.server.keystore.jks \
   -destkeystore keystore.p12 \
   -deststoretype PKCS12 \
   -srcalias on-premise-rsa \
   -deststorepass {{your-truststore-password }}\
   -destkeypass {{your-key-password}}
   ```

1. Gunakan perintah berikut untuk mengekspor `keystore.p12` ke format `.pem`.

   ```
   Openssl pkcs12 -in keystore.p12 -out encrypted-private-server-key.pem –nocerts
   ```

1. Unggah `encrypted-private-server-key.pem`, `signed-certificate.pem`, dan `ca-cert` untuk DMS certificate manager.

1. Buat titik akhir dengan menggunakan yang dikembalikan ARNs.

   ```
   aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings 
   '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:9092", "SecurityProtocol":"ssl-authentication", 
   "SslClientCertificateArn": "{{your-client-cert-arn}}","SslClientKeyArn": "{{your-client-key-arn}}","SslClientKeyPassword":"{{your-client-key-password}}", 
   "SslCaCertificateArn": "{{your-ca-certificate-arn}}"}'
                               
   aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk
   ```

### Menggunakan autentikasi SASL-SSL untuk terhubung ke Amazon MSK
<a name="CHAP_Target.Kafka.TLS.SSL-SASL"></a>

Metode The Simple Authentication and Security Layer (SASL) menggunakan nama pengguna dan kata sandi untuk memvalidasi identitas klien, dan membuat koneksi terenkripsi antara server dan klien.

Untuk menggunakan SASL, pertama-tama buat nama pengguna dan kata sandi yang aman ketika mengatur Amazon MSK klaster Anda. Untuk deskripsi cara mengatur nama pengguna dan kata sandi yang aman untuk klaster MSK Amazon, lihat [Menyiapkan SASL/SCRAM otentikasi untuk klaster MSK Amazon di Panduan Pengembang Amazon](https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html#msk-password-tutorial) *Managed Streaming for* Apache Kafka.

Kemudian, ketika Anda membuat titik akhir target Kafka, setel pengaturan titik akhir protokol keamanan (`SecurityProtocol`) menggunakan pilihan `sasl-ssl`. Anda juga mengatur pilihan `SaslUsername` dan `SaslPassword`. Pastikan semua ini konsisten dengan nama pengguna dan kata sandi aman yang Anda buat ketika pertama kali menyiapkan klaster Amazon MSK Anda, seperti yang ditunjukkan dalam contoh JSON berikut.

```
                   
"KafkaSettings": {
    "SecurityProtocol": "sasl-ssl",
    "SaslUsername":"{{Amazon MSK cluster secure user name}}",
    "SaslPassword":"{{Amazon MSK cluster secure password}}"                    
}
```

**catatan**  
Saat ini, hanya AWS DMS mendukung SASL-SSL yang didukung CA publik. DMS tidak mendukung SASL-SSL untuk digunakan dengan Kafka yang dikelola sendiri yang didukung oleh CA pribadi.
Untuk otentikasi SASL-SSL, AWS DMS mendukung mekanisme SCRAM-SHA-512 secara default. AWS DMS versi 3.5.0 dan yang lebih tinggi juga mendukung mekanisme Plain. Untuk mendukung mekanisme Plain, atur `SaslMechanism` parameter tipe data `KafkaSettings` API ke`PLAIN`. Jenis data didukung oleh Kafka, tetapi `PLAIN` tidak didukung oleh Amazon Kafka (MSK).

## Menggunakan citra sebelumnya untuk melihat nilai-nilai asli dari baris CDC untuk Apache Kafka sebagai target
<a name="CHAP_Target.Kafka.BeforeImage"></a>

Ketika menulis pembaruan CDC ke target data-streaming seperti Kafka Anda dapat melihat nilai asli baris basis data sumber sebelum diubah oleh pembaruan.. Untuk memungkinkan hal ini, AWS DMS mengisi *gambar sebelum* peristiwa pembaruan berdasarkan data yang disediakan oleh mesin database sumber. 

Mesin basis data sumber yang berbeda memberikan jumlah informasi yang berbeda untuk citra sebelum: 
+ Oracle menyediakan update untuk kolom hanya jika mereka berubah. 
+ PostgreSQL menyediakan data hanya untuk kolom yang merupakan bagian dari kunci primer (berubah atau tidak). Jika replikasi logis sedang digunakan dan REPLICA IDENTITY FULL diatur untuk tabel sumber, Anda bisa mendapatkan seluruh informasi sebelum dan sesudah pada baris yang ditulis ke WALs dan tersedia di sini.
+ MySQL umumnya menyediakan data untuk semua kolom (berubah atau tidak).

Untuk mengaktifkan pencitraan sebelumnya dengan tujuan menambahkan nilai asli dari basis data sumber untuk keluaran AWS DMS , gunakan pengaturan tugas `BeforeImageSettings` atau parameter `add-before-image-columns`. Parameter ini menerapkan aturan transformasi kolom. 

`BeforeImageSettings` menambahkan atribut JSON baru untuk setiap operasi pembaruan dengan nilai yang dikumpulkan dari sistem basis data sumber, seperti yang ditunjukkan berikut.

```
"BeforeImageSettings": {
    "EnableBeforeImage": boolean,
    "FieldName": string,  
    "ColumnFilter": pk-only (default) / non-lob / all (but only one)
}
```

**catatan**  
Terapkan `BeforeImageSettings` pada beban penuh ditambah tugas CDC (yang memigrasikan data yang ada dan mereplikasi perubahan yang sedang berlangsung), atau ke tugas CDC saja (yang hanya mereplikasi perubahan data). Jangan terapkan `BeforeImageSettings` pada tugas-tugas yang beban penuh saja.

Untuk pilihan `BeforeImageSettings`, hal berikut berlaku:
+ Atur pilihan `EnableBeforeImage` ke `true` untuk mengaktifkan pencitraan sebelum. Nilai default-nya `false`. 
+ Gunakan pilihan `FieldName` untuk memberikan nama ke atribut JSON baru. Ketika `EnableBeforeImage` adalah `true`, `FieldName` diperlukan dan tidak dapat kosong.
+ Pilihan `ColumnFilter` menentukan kolom untuk menambahkan dengan menggunakan pencitraan sebelumnya. Untuk menambahkan kolom yang hanya merupakan bagian dari kunci primer tabel saja, gunakan nilai default, `pk-only`. Untuk menambahkan hanya kolom yang bukan tipe LOB, gunakan `non-lob`. Untuk menambahkan kolom apa pun yang memiliki nilai citra sebelum, gunakan `all`. 

  ```
  "BeforeImageSettings": {
      "EnableBeforeImage": true,
      "FieldName": "before-image",
      "ColumnFilter": "pk-only"
    }
  ```

### Menggunakan aturan transformasi citra sebelumnya
<a name="CHAP_Target.Kafka.BeforeImage.Transform-Rule"></a>

Sebagai alternatif untuk pengaturan tugas, Anda dapat menggunakan parameter `add-before-image-columns`, yang menerapkan aturan transformasi kolom. Dengan parameter ini, Anda dapat mengaktifkan pencitraan sebelumnya selama CDC, pada target data streaming seperti Kinesis.

Dengan menggunakan `add-before-image-columns` dalam aturan transformasi, Anda dapat menerapkan kontrol yang lebih halus dari hasil citra sebelumnya. Aturan transformasi memungkinkan Anda untuk menggunakan pencari objek yang memberikan Anda kontrol atas tabel yang dipilih untuk aturan tersebut. Dan juga, Anda dapat merangkai aturan transformasi, yang memungkinkan aturan yang berbeda untuk diterapkan ke tabel yang berbeda. Anda kemudian dapat memanipulasi kolom yang dihasilkan dengan menggunakan aturan lain. 

**catatan**  
Jangan gunakan parameter `add-before-image-columns` bersamaan dengan pengaturan tugas `BeforeImageSettings` dalam tugas yang sama. Sebaliknya, gunakan salah satu saja, parameter atau pengaturan, tetapi tidak keduanya, untuk satu tugas.

Sebuah tipe peraturan `transformation` dengan parameter `add-before-image-columns` untuk kolom harus menyediakan `before-image-def` bagian. Berikut ini adalah sebuah contoh.

```
    {
      "rule-type": "transformation",
      …
      "rule-target": "column",
      "rule-action": "add-before-image-columns",
      "before-image-def":{
        "column-filter": one-of  (pk-only / non-lob / all),
        "column-prefix": string,
        "column-suffix": string,
      }
    }
```

Nilai dari `column-prefix` ditambahkan sebelum nama kolom, dan nilai default `column-prefix` adalah `BI_`. Nilai dari `column-suffix` ditambahkan sebelum nama kolom, dan default kosong. Jangan atur `column-prefix` dan `column-suffix` ke string kosong.

Pilih satu nilai untuk `column-filter`. Untuk menambahkan kolom yang merupakan bagian dari kunci primer tabel saja, pilih `pk-only`. Untuk menambahkan kolom yang bukan tipe LOB saja, gunakan `non-lob`. Atau pilih `all` untuk menambahkan kolom yang memiliki nilai citra-sebelum.

### Contoh untuk aturan transformasi citra sebelumnya
<a name="CHAP_Target.Kafka.BeforeImage.Example"></a>

Aturan transformasi dalam contoh berikut menambahkan kolom baru yang disebut `BI_emp_no` pada target. Jadi pernyataan seperti `UPDATE employees SET emp_no = 3 WHERE emp_no = 1;` mengisi `BI_emp_no` field dengan 1. Ketika Anda menulis pembaruan CDC untuk target Amazon S3, kolom `BI_emp_no` memungkinkan untuk memberitahu baris asli yang diperbarui.

```
{
  "rules": [
    {
      "rule-type": "selection",
      "rule-id": "1",
      "rule-name": "1",
      "object-locator": {
        "schema-name": "%",
        "table-name": "%"
      },
      "rule-action": "include"
    },
    {
      "rule-type": "transformation",
      "rule-id": "2",
      "rule-name": "2",
      "rule-target": "column",
      "object-locator": {
        "schema-name": "%",
        "table-name": "employees"
      },
      "rule-action": "add-before-image-columns",
      "before-image-def": {
        "column-prefix": "BI_",
        "column-suffix": "",
        "column-filter": "pk-only"
      }
    }
  ]
}
```

Untuk informasi tentang penggunaan peraturan tindakan `add-before-image-columns`, lihat [Aturan dan tindakan transformasi](CHAP_Tasks.CustomizingTasks.TableMapping.SelectionTransformation.Transformations.md).

## Keterbatasan saat menggunakan Apache Kafka sebagai target AWS Database Migration Service
<a name="CHAP_Target.Kafka.Limitations"></a>

Batasan berikut berlaku saat menggunakan Apache Kafka sebagai target:
+ AWS DMS Titik akhir target Kafka tidak mendukung kontrol akses IAM untuk Amazon Managed Streaming for Apache Kafka (Amazon MSK).
+ Mode LOB penuh tidak didukung.
+ Tentukan file konfigurasi Kafka untuk klaster Anda dengan properti yang memungkinkan AWS DMS untuk membuat topik baru secara otomatis. Sertakan pengaturan, `auto.create.topics.enable = true`. Jika menggunakan Amazon MSK, Anda dapat menentukan konfigurasi default ketika membuat klaster Kafka Anda, kemudian mengubah pengaturan `auto.create.topics.enable` ke `true`. Untuk informasi lebih lanjut tentang pengaturan konfigurasi default, lihat [Konfigurasi Amazon MSK default](https://docs.aws.amazon.com/msk/latest/developerguide/msk-default-configuration.html) di *Panduan Developer Amazon Managed Streaming for Apache Kafka*. Jika Anda perlu memodifikasi cluster Kafka yang ada yang dibuat menggunakan Amazon MSK, jalankan AWS CLI perintah `aws kafka create-configuration` untuk memperbarui konfigurasi Kafka Anda, seperti pada contoh berikut:

  ```
  14:38:41 $ aws kafka create-configuration --name "kafka-configuration" --kafka-versions "2.2.1" --server-properties file://~/kafka_configuration
  {
      "LatestRevision": {
          "Revision": 1,
          "CreationTime": "2019-09-06T14:39:37.708Z"
      },
      "CreationTime": "2019-09-06T14:39:37.708Z",
      "Name": "kafka-configuration",
      "Arn": "arn:aws:kafka:us-east-1:111122223333:configuration/kafka-configuration/7e008070-6a08-445f-9fe5-36ccf630ecfd-3"
  }
  ```

  Di sini, `//~/kafka_configuration` adalah file konfigurasi yang telah Anda buat dengan pengaturan properti yang diperlukan.

  Jika Anda menggunakan instans Kafka Anda sendiri yang diinstal di Amazon EC2, ubah konfigurasi cluster Kafka dengan `auto.create.topics.enable = true` pengaturan untuk AWS DMS memungkinkan Anda membuat topik baru secara otomatis, menggunakan opsi yang disediakan bersama instans Anda.
+ AWS DMS menerbitkan setiap pembaruan ke satu catatan dalam database sumber sebagai satu catatan data (pesan) dalam topik Kafka tertentu terlepas dari transaksi.
+ AWS DMS mendukung empat bentuk berikut untuk kunci partisi:
  + `SchemaName.TableName`: Kombinasi skema dan nama tabel.
  + `${AttributeName}`: Nilai salah satu field di JSON, atau kunci primer dari tabel dalam basis data sumber.
  + `transaction-id`: ID transaksi CDC. Semua catatan dalam transaksi yang sama masuk ke partisi yang sama.
  + `constant`: Nilai literal tetap untuk setiap catatan terlepas dari tabel atau data. Semua catatan dikirim ke nilai kunci partisi yang sama “konstan”, memberikan urutan global yang ketat di semua tabel.

  ```
  {
      "rule-type": "object-mapping",
      "rule-id": "2",
      "rule-name": "TransactionIdPartitionKey",
      "rule-action": "map-record-to-document",
      "object-locator": {
          "schema-name": "onprem",
          "table-name": "it_system"
      },
      "mapping-parameters": {
          "partition-key-type": "transaction-id | constant | attribute-name | schema-table"
      }
  }
  ```
+ Pengaturan `IncludeTransactionDetails` endpoint hanya didukung ketika titik akhir sumber adalah Oracle, SQL Server, PostgreSQL, atau MySQL. Untuk jenis titik akhir sumber lainnya, detail transaksi tidak akan disertakan.
+ `BatchApply` Tidak didukung untuk titik akhir Kafka. Menggunakan penerapan Batch (misalnya, pengaturan tugas metadata target `BatchApplyEnabled`) untuk target Kafka dapat mengakibatkan hilangnya data.
+ AWS DMS tidak mendukung migrasi nilai tipe `BigInt` data dengan lebih dari 16 digit. Untuk mengatasi batasan ini, Anda dapat menggunakan aturan transformasi berikut untuk mengonversi `BigInt` kolom menjadi string. Untuk informasi selengkapnya tentang aturan transformasi, lihat[Aturan dan tindakan transformasi](CHAP_Tasks.CustomizingTasks.TableMapping.SelectionTransformation.Transformations.md).

  ```
  {
      "rule-type": "transformation",
      "rule-id": "id",
      "rule-name": "name",
      "rule-target": "column",
      "object-locator": {
          "schema-name": "valid object-mapping rule action",
          "table-name": "",
          "column-name": ""
      },
      "rule-action": "change-data-type",
      "data-type": {
          "type": "string",
          "length": 20
      }
  }
  ```
+ AWS DMS Titik akhir target Kafka tidak mendukung Amazon MSK tanpa servless.
+ Saat mendefinisikan aturan pemetaan, memiliki keduanya, aturan pemetaan objek dan aturan transformasi tidak didukung. Anda harus menetapkan hanya satu aturan. 
+ AWS DMS mendukung Otentikasi SASL untuk versi Apache Kafka hingga 3.8. Jika Anda menggunakan Kafka 4.0 atau lebih tinggi, Anda hanya dapat terhubung tanpa otentikasi SASL.
+ AWS DMS tidak mendukung data sumber yang berisi `'\0'` karakter yang disematkan saat menggunakan Kafka sebagai titik akhir target. Data yang berisi `'\0'` karakter tertanam akan terpotong pada karakter pertama`'\0'`.

## Menggunakan pemetaan objek untuk bermigrasi data ke topik Kafka
<a name="CHAP_Target.Kafka.ObjectMapping"></a>

AWS DMS menggunakan aturan pemetaan tabel untuk memetakan data dari sumber ke topik Kafka target. Untuk memetakan data ke topik target, Anda menggunakan jenis aturan pemetaan tabel yang disebut pemetaan objek. Anda menggunakan pemetaan objek untuk menentukan cara pencatatan data di peta sumber ke catatan data yang dipublikasikan ke topik Kafka. 

topik Kafka tidak memiliki struktur preset selain memiliki kunci partisi.

**catatan**  
Anda tidak perlu menggunakan pemetaan objek. Anda dapat menggunakan pemetaan tabel biasa untuk berbagai transformasi. Namun, jenis kunci partisi akan mengikuti perilaku default ini:   
Primary Key digunakan sebagai kunci partisi untuk Full Load.
Jika tidak ada pengaturan tugas penerapan paralel yang digunakan, `schema.table` digunakan sebagai kunci partisi untuk CDC.
Jika pengaturan tugas penerapan paralel digunakan, kunci utama digunakan sebagai kunci partisi untuk CDC.

Untuk membuat aturan pemetaan objek, tetapkan `rule-type` sebagai `object-mapping`. Aturan ini menentukan jenis pemetaan objek yang ingin Anda gunakan. 

Struktur untuk aturan tersebut adalah sebagai berikut.

```
{
    "rules": [
        {
            "rule-type": "object-mapping",
            "rule-id": "{{id}}",
            "rule-name": "{{name}}",
            "rule-action": "{{valid object-mapping rule action}}",
            "object-locator": {
                "schema-name": "{{case-sensitive schema name}}",
                "table-name": ""
            }
        }
    ]
}
```

AWS DMS saat ini mendukung `map-record-to-record` dan `map-record-to-document` sebagai satu-satunya nilai yang valid untuk `rule-action` parameter. Pengaturan ini memengaruhi nilai yang tidak dikecualikan sebagai bagian dari daftar `exclude-columns` atribut. `map-record-to-document`Nilai `map-record-to-record` dan menentukan bagaimana AWS DMS menangani catatan ini secara default. Nilai-nilai ini tidak mempengaruhi pemetaan atribut dengan cara apapun. 

Gunakan `map-record-to-record` ketika bermigrasi dari basis data relasional ke sebuah topik Kafka. Jenis aturan ini menggunakan nilai `taskResourceId.schemaName.tableName` dari basis data relasional sebagai kunci partisi dalam topik Kafka dan membuat atribut untuk masing-masing kolom dalam basis data sumber. 

Saat menggunakan`map-record-to-record`, perhatikan hal berikut:
+ Pengaturan ini hanya memengaruhi kolom yang dikecualikan oleh `exclude-columns` daftar.
+ Untuk setiap kolom tersebut, AWS DMS buat atribut yang sesuai dalam topik target.
+ AWS DMS menciptakan atribut yang sesuai ini terlepas dari apakah kolom sumber digunakan dalam pemetaan atribut. 

Salah satu cara untuk memahami `map-record-to-record` adalah melihatnya beraksi. Untuk contoh ini, anggaplah bahwa Anda memulai dengan baris tabel basis data relasional dengan struktur dan data berikut.


| FirstName | LastName | StoreId | HomeAddress | HomePhone | WorkAddress | WorkPhone | DateofBirth | 
| --- | --- | --- | --- | --- | --- | --- | --- | 
| Randy | Marsh | 5 | 221B Baker Street | 1234567890 | 31 Spooner Street, Quahog  | 9876543210 | 02/29/1988 | 

Untuk memigrasi informasi ini dari skema bernama `Test` ke topik Kafka, buatlah aturan untuk memetakan data ke topik target. Aturan berikut menggambarkan pemetaan. 

```
{
    "rules": [
        {
            "rule-type": "selection",
            "rule-id": "1",
            "rule-name": "1",
            "rule-action": "include",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "%"
            }
        },
        {
            "rule-type": "object-mapping",
            "rule-id": "2",
            "rule-name": "DefaultMapToKafka",
            "rule-action": "map-record-to-record",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "Customers"
            }
        }
    ]
}
```

Mengingat topik Kafka dan kunci partisi (dalam hal ini, `taskResourceId.schemaName.tableName`), berikut ini menggambarkan format catatan yang dihasilkan menggunakan data sampel kami di topik target Kafka: 

```
  {
     "FirstName": "Randy",
     "LastName": "Marsh",
     "StoreId":  "5",
     "HomeAddress": "221B Baker Street",
     "HomePhone": "1234567890",
     "WorkAddress": "31 Spooner Street, Quahog",
     "WorkPhone": "9876543210",
     "DateOfBirth": "02/29/1988"
  }
```

**Topics**
+ [Restrukturisasi data dengan pemetaan atribut](#CHAP_Target.Kafka.AttributeMapping)
+ [Replikasi multitopik menggunakan pemetaan objek](#CHAP_Target.Kafka.MultiTopic)
+ [Format pesan untuk Apache Kafka](#CHAP_Target.Kafka.Messageformat)

### Restrukturisasi data dengan pemetaan atribut
<a name="CHAP_Target.Kafka.AttributeMapping"></a>

Anda dapat merestrukturisasi data saat migrasi ke topik kafka berlangsung menggunakan peta atribut. Misalnya, Anda mungkin ingin menggabungkan beberapa field di sumber menjadi field tunggal dalam target. Peta atribut berikut menggambarkan bagaimana merestrukturisasi data.

```
{
    "rules": [
        {
            "rule-type": "selection",
            "rule-id": "1",
            "rule-name": "1",
            "rule-action": "include",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "%"
            }
        },
        {
            "rule-type": "object-mapping",
            "rule-id": "2",
            "rule-name": "TransformToKafka",
            "rule-action": "map-record-to-record",
            "target-table-name": "CustomerData",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "Customers"
            },
            "mapping-parameters": {
                "partition-key-type": "attribute-name",
                "partition-key-name": "CustomerName",
                "exclude-columns": [
                    "firstname",
                    "lastname",
                    "homeaddress",
                    "homephone",
                    "workaddress",
                    "workphone"
                ],
                "attribute-mappings": [
                    {
                        "target-attribute-name": "CustomerName",
                        "attribute-type": "scalar",
                        "attribute-sub-type": "string",
                        "value": "${lastname}, ${firstname}"
                    },
                    {
                        "target-attribute-name": "ContactDetails",
                        "attribute-type": "document",
                        "attribute-sub-type": "json",
                        "value": {
                            "Home": {
                                "Address": "${homeaddress}",
                                "Phone": "${homephone}"
                            },
                            "Work": {
                                "Address": "${workaddress}",
                                "Phone": "${workphone}"
                            }
                        }
                    }
                ]
            }
        }
    ]
}
```

Untuk menetapkan nilai konstan untuk`partition-key`, tentukan`"partition-key-type: "constant"`, ini menetapkan nilai partisi ke`constant`. Sebagai contoh, Anda mungkin melakukan ini untuk memaksa semua data untuk disimpan dalam partisi tunggal. Pemetaan berikut menggambarkan pendekatan ini. 

```
{
    "rules": [
        {
            "rule-type": "selection",
            "rule-id": "1",
            "rule-name": "1",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "%"
            },
            "rule-action": "include"
        },
        {
            "rule-type": "object-mapping",
            "rule-id": "1",
            "rule-name": "TransformToKafka",
            "rule-action": "map-record-to-document",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "Customer"
            },
            "mapping-parameters": {
                "partition-key-type": "constant",
                "exclude-columns": [
                    "FirstName",
                    "LastName",
                    "HomeAddress",
                    "HomePhone",
                    "WorkAddress",
                    "WorkPhone"
                ],
                "attribute-mappings": [
                    {
                        "attribute-name": "CustomerName",
                        "value": "${FirstName},${LastName}"
                    },
                    {
                        "attribute-name": "ContactDetails",
                        "value": {
                            "Home": {
                                "Address": "${HomeAddress}",
                                "Phone": "${HomePhone}"
                            },
                            "Work": {
                                "Address": "${WorkAddress}",
                                "Phone": "${WorkPhone}"
                            }
                        }
                    },
                    {
                        "attribute-name": "DateOfBirth",
                        "value": "${DateOfBirth}"
                    }
                ]
            }
        }
    ]
}
```

**catatan**  
Nilai `partition-key` untuk catatan kontrol yang ditujukan khusus untuk tabel tertentu adalah `TaskId.SchemaName.TableName`. Nilai `partition-key` untuk catatan kontrol yang ditujukan untuk tugas tertentu adalah `TaskId` dari catatan tersebut. Menentukan nilai `partition-key` dalam pemetaan objek tidak berdampak pada `partition-key` untuk catatan kontrol.  
 Kapan `partition-key-type` diatur ke `attribute-name` dalam aturan pemetaan tabel, Anda harus menentukan`partition-key-name`, yang harus mereferensikan kolom dari tabel sumber atau kolom kustom yang ditentukan dalam pemetaan. Selain itu, `attribute-mappings` harus disediakan untuk menentukan bagaimana kolom sumber memetakan ke topik target Kafka.

### Replikasi multitopik menggunakan pemetaan objek
<a name="CHAP_Target.Kafka.MultiTopic"></a>

Secara default, AWS DMS tugas memigrasikan semua data sumber ke salah satu topik Kafka berikut:
+ Seperti yang ditentukan dalam bidang **Topik** dari titik akhir AWS DMS target.
+ Seperti yang ditentukan oleh `kafka-default-topic` jika bidang **Topik** dari titik akhir target tidak diisi dan `auto.create.topics.enable` pengaturan Kafka diatur ke. `true`

Dengan versi AWS DMS engine 3.4.6 dan yang lebih tinggi, Anda dapat menggunakan `kafka-target-topic` atribut untuk memetakan setiap tabel sumber yang dimigrasi ke topik terpisah. Misalnya, aturan pemetaan objek berikut memigrasikan tabel sumber `Customer` dan `Address` ke topik Kafka `customer_topic` dan`address_topic`, masing-masing. Pada saat yang sama, AWS DMS memigrasikan semua tabel sumber lainnya, termasuk `Bills` tabel dalam `Test` skema, ke topik yang ditentukan di titik akhir target.

```
{
    "rules": [
        {
            "rule-type": "selection",
            "rule-id": "1",
            "rule-name": "1",
            "rule-action": "include",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "%"
            }
        },
        {
            "rule-type": "object-mapping",
            "rule-id": "2",
            "rule-name": "MapToKafka1",
            "rule-action": "map-record-to-record",
            "kafka-target-topic": "customer_topic",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "Customer" 
            },
            "partition-key-type": "constant"
        },
        {
            "rule-type": "object-mapping",
            "rule-id": "3",
            "rule-name": "MapToKafka2",
            "rule-action": "map-record-to-record",
            "kafka-target-topic": "address_topic",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "Address"
            },
            "partition-key-type": "constant"
        },
        {
            "rule-type": "object-mapping",
            "rule-id": "4",
            "rule-name": "DefaultMapToKafka",
            "rule-action": "map-record-to-record",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "Bills"
            }
        }
    ]
}
```

Dengan menggunakan replikasi multitopik Kafka, Anda dapat mengelompokkan dan memigrasikan tabel sumber untuk memisahkan topik Kafka menggunakan satu tugas replikasi.

### Format pesan untuk Apache Kafka
<a name="CHAP_Target.Kafka.Messageformat"></a>

Keluaran JSON hanya berupa daftar pasangan nilai-kunci. 

**RecordType**  
Jenis catatan dapat berupa data atau kontrol. *Catatan data* mewakili baris yang sebenarnya dalam sumber. *Kontrol catatan* adalah untuk peristiwa-peristiwa penting di aliran, misalnya restart tugas.

**Operasi**  
Untuk catatan data, operasinya dapat `load`, `insert`, `update`, atau `delete`.  
Untuk catatan kontrol, operasi dapat berupa`create-table`,`rename-table`,`drop-table`,`change-columns`,`add-column`,`drop-column`,`rename-column`, atau`column-type-change`.

**SchemaName**  
Skema sumber untuk catatan. Field ini bisa kosong untuk catatan kontrol.

**TableName**  
Tabel sumber untuk catatan. Field ini bisa kosong untuk catatan kontrol.

**Stempel waktu**  
Stempel waktu untuk saat pesan JSON dibuat. Field ini diformat dengan format ISO 8601.

Contoh pesan JSON berikut menggambarkan pesan tipe data dengan semua metadata tambahan.

```
{ 
   "data":{ 
      "id":100000161,
      "fname":"val61s",
      "lname":"val61s",
      "REGION":"val61s"
   },
   "metadata":{ 
      "timestamp":"2019-10-31T22:53:59.721201Z",
      "record-type":"data",
      "operation":"insert",
      "partition-key-type":"primary-key",
      "partition-key-value":"sbtest.sbtest_x.100000161",
      "schema-name":"sbtest",
      "table-name":"sbtest_x",
      "transaction-id":9324410911751,
      "transaction-record-id":1,
      "prev-transaction-id":9324410910341,
      "prev-transaction-record-id":10,
      "commit-timestamp":"2019-10-31T22:53:55.000000Z",
      "stream-position":"mysql-bin-changelog.002171:36912271:0:36912333:9324410911751:mysql-bin-changelog.002171:36912209"
   }
}
```

Contoh pesan JSON berikut menggambarkan pesan tipe kontrol.

```
{ 
   "control":{ 
      "table-def":{ 
         "columns":{ 
            "id":{ 
               "type":"WSTRING",
               "length":512,
               "nullable":false
            },
            "fname":{ 
               "type":"WSTRING",
               "length":255,
               "nullable":true
            },
            "lname":{ 
               "type":"WSTRING",
               "length":255,
               "nullable":true
            },
            "REGION":{ 
               "type":"WSTRING",
               "length":1000,
               "nullable":true
            }
         },
         "primary-key":[ 
            "id"
         ],
         "collation-name":"latin1_swedish_ci"
      }
   },
   "metadata":{ 
      "timestamp":"2019-11-21T19:14:22.223792Z",
      "record-type":"control",
      "operation":"create-table",
      "partition-key-type":"task-id",
      "schema-name":"sbtest",
      "table-name":"sbtest_t1"
   }
}
```