

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# AWS GlueRegistri skema
<a name="schema-registry"></a>

**catatan**  
AWS GlueRegistri Skema tidak didukung di Wilayah berikut di AWS Glue konsol: Timur Tengah (UEA).

Registri AWS Glue Skema memungkinkan Anda menemukan, mengontrol, dan mengembangkan skema aliran data secara terpusat. Sebuah *skema* mendefinisikan struktur dan format catatan data. Dengan registri AWS Glue Skema, Anda dapat mengelola dan menerapkan skema pada aplikasi streaming data Anda menggunakan integrasi yang nyaman dengan Apache Kafka,, [Amazon Kinesis [Amazon Managed Streaming for Apache Kafka](https://aws.amazon.com/msk/)Data Streams, Amazon](https://aws.amazon.com/kinesis/data-streams/) Managed Service untuk Apache Flink[,](https://aws.amazon.com/kinesis/data-analytics/) dan. [AWS Lambda](https://aws.amazon.com/lambda/)

Registri Skema mendukung format data AVRO (v1.11.4), format Data JSON dengan format Skema JSON untuk [skema (spesifikasi Draft-04, Draft-06, dan Draft-07) dengan validasi skema JSON menggunakan [pustaka Everit](https://github.com/everit-org/json-schema), Protokol Buffer (Protobuf) versi proto2 dan proto3 tanpa dukungan untuk atau, dan dukungan bahasa Java, dengan format](https://json-schema.org/) data dan bahasa lain yang akan datang. `extensions` `groups` Fitur yang didukung termasuk kompatibilitas, sumber skema melalui metadata, registrasi otomatis skema, kompatibilitas IAM, dan kompresi ZLIB opsional untuk mengurangi penyimpanan dan transfer data. Registri Skema tanpa server dan gratis untuk digunakan.

Menggunakan sebuah skema sebagai kontrak format data antara produsen dan konsumen akan mengantarkan pada peningkatan tata kelola data, data berkualitas lebih tinggi, dan memungkinkan konsumen data untuk tahan terhadap perubahan hulu yang kompatibel.

Registri Skema memungkinkan sistem yang berbeda untuk berbagi skema untuk serialisasi dan de-serialisasi. Misalnya, anggaplah Anda memiliki produser dan konsumen data. Produsen mengetahui skemanya ketika menerbitkan data. Registri Skema memasok pen-serialisasi dan pen-deserialisasi untuk sistem tertentu seperti Amazon MSK atau Apache Kafka. 

 Untuk informasi selengkapnya, lihat [Cara kerja registri skema](schema-registry-works.md).

**Topics**
+ [Skema](#schema-registry-schemas)
+ [Pendaftar](#schema-registry-registries)
+ [Pembuatan versi dan kompatibilitas skema](#schema-registry-compatibility)
+ [Pustaka Serde sumber terbuka](#schema-registry-serde-libraries)
+ [Kuota Registri Skema](#schema-registry-quotas)
+ [Cara kerja registri skema](schema-registry-works.md)
+ [Memulai dengan registri skema](schema-registry-gs.md)

## Skema
<a name="schema-registry-schemas"></a>

Sebuah *skema* mendefinisikan struktur dan format catatan data. Sebuah skema adalah sebuah spesifikasi berversi untuk publikasi data yang handal, konsumsi, atau penyimpanan.

Dalam contoh ini skema untuk Avro, format dan strukturnya didefinisikan oleh tata letak dan nama bidang, dan format nama bidang didefinisikan oleh tipe data (misalnya, `string`, `int`).

```
{
    "type": "record",
    "namespace": "ABC_Organization",
    "name": "Employee",
    "fields": [
        {
            "name": "Name",
            "type": "string"
        },
        {
            "name": "Age",
            "type": "int"
        },
        {
            "name": "address",
            "type": {
                "type": "record",
                "name": "addressRecord",
                "fields": [
                    {
                        "name": "street",
                        "type": "string"
                    },
                    {
                        "name": "zipcode",
                        "type": "int" 
                    }
                ]
            }
        }
    ]
}
```

[Dalam contoh ini skema JSON draft-07 untuk JSON, format didefinisikan oleh organisasi Skema JSON.](https://json-schema.org/)

```
{
	"$id": "https://example.com/person.schema.json",
	"$schema": "http://json-schema.org/draft-07/schema#",
	"title": "Person",
	"type": "object",
	"properties": {
		"firstName": {
			"type": "string",
			"description": "The person's first name."
		},
		"lastName": {
			"type": "string",
			"description": "The person's last name."
		},
		"age": {
			"description": "Age in years which must be equal to or greater than zero.",
			"type": "integer",
			"minimum": 0
		}
	}
}
```

Dalam contoh ini untuk Protobuf, format didefinisikan oleh [versi 2 dari bahasa Protocol Buffers (](https://developers.google.com/protocol-buffers/docs/reference/proto2-spec)proto2).

```
syntax = "proto2";

package tutorial;

option java_multiple_files = true;
option java_package = "com.example.tutorial.protos";
option java_outer_classname = "AddressBookProtos";

message Person {
  optional string name = 1;
  optional int32 id = 2;
  optional string email = 3;

  enum PhoneType {
    MOBILE = 0;
    HOME = 1;
    WORK = 2;
  }

  message PhoneNumber {
    optional string number = 1;
    optional PhoneType type = 2 [default = HOME];
  }

  repeated PhoneNumber phones = 4;
}

message AddressBook {
  repeated Person people = 1;
}
```

## Pendaftar
<a name="schema-registry-registries"></a>

Sebuah *registri* adalah sebuah kontainer logis dari skema. Registri memungkinkan Anda untuk mengatur skema Anda, serta mengelola kontrol akses untuk aplikasi Anda. Sebuah registri memiliki Amazon Resource Name (ARN) untuk memungkinkan Anda untuk mengorganisasi dan mengatur izin akses yang berbeda untuk skema operasi dalam registri.

Anda dapat menggunakan registri default atau membuat pendaftar baru sebanyak yang diperlukan.


**Hirarki Registri Skema AWS Glue**  

|  | 
| --- |
|  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/id_id/glue/latest/dg/schema-registry.html)  | 
|  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/id_id/glue/latest/dg/schema-registry.html)  | 
|  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/id_id/glue/latest/dg/schema-registry.html)  | 
|  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/id_id/glue/latest/dg/schema-registry.html)  | 

## Pembuatan versi dan kompatibilitas skema
<a name="schema-registry-compatibility"></a>

Setiap skema dapat memiliki beberapa versi. Versioning diatur oleh sebuah aturan kompatibilitas yang diterapkan pada sebuah skema. Permintaan untuk mendaftar versi skema baru diperiksa berdasarkan aturan ini oleh Registri Skema sebelum mereka dapat berhasil. 

Sebuah versi skema yang ditandai sebagai pos pemeriksaan digunakan untuk menentukan kompatibilitas pendaftaran versi baru dari skema. Ketika sebuah skema pertama kali akan membuat pos pemeriksaan default akan menjadi versi pertama. Saat skema berkembang dengan lebih banyak versi, Anda dapat menggunakannya CLI/SDK untuk mengubah pos pemeriksaan ke versi skema menggunakan `UpdateSchema` API yang mematuhi serangkaian kendala. Di konsol, mengedit definisi skema atau mode kompatibilitas, secara default akan mengubah pos pemeriksaan ke versi terbaru. 

Mode kompatibilitas akan memungkinkan Anda untuk mengontrol bagaimana skema dapat atau tidak dapat berkembang dari waktu ke waktu. Mode ini membentuk kontrak antara aplikasi yang memproduksi dan mengkonsumsi data. Ketika versi baru dari sebuah skema dikirimkan ke registri, aturan kompatibilitas diterapkan ke nama skema yang digunakan untuk menentukan apakah versi baru dapat diterima. Ada 8 mode kompatibilitas: NONE, DISABLED, BACKWARD, BACKWARD\$1ALL, FORWARD, FORWARD\$1ALL, FULL, FULL\$1ALL.

Dalam format data Avro, bidang bisa opsional atau wajib. Sebuah bidang opsional adalah bidang di mana `Type` menyertakan nol. Bidang wajib tidak memiliki nol sebagai `Type`.

Dalam format data Protobuf, bidang dapat bersifat opsional (termasuk berulang) atau diperlukan dalam sintaks proto2, sementara semua bidang bersifat opsional (termasuk berulang) dalam sintaks proto3. Semua aturan kompatibilitas ditentukan berdasarkan pemahaman spesifikasi Protocol Buffer serta panduan dari dokumentasi [Google Protocol Buffer.](https://developers.google.com/protocol-buffers/docs/overview#updating)
+ *NONE*: Tidak ada mode kompatibilitas yang berlaku. Anda dapat menggunakan pilihan ini dalam skenario pengembangan atau jika Anda tidak tahu mode kompatibilitas yang ingin Anda terapkan untuk skema. Setiap versi baru yang ditambahkan akan diterima tanpa menjalani pemeriksaan kompatibilitas terlebih dahulu.
+ *DISABLED*: Pilihan kompatibilitas ini mencegah versioning untuk skema tertentu. Tidak ada versi baru yang dapat ditambahkan.
+ *BACKWARD*: Pilihan kompatibilitas ini dianjurkan karena memungkinkan konsumen untuk membaca versi skema saat ini dan sebelumnya. Anda dapat menggunakan pilihan ini untuk memeriksa kompatibilitas terhadap versi skema sebelumnya saat Anda menghapus bidang atau menambahkan bidang opsional. Kasus penggunaan khas untuk BACKWARD adalah saat aplikasi Anda telah dibuat untuk skema terbaru.

**AVRO**  
Sebagai contoh, anggaplah Anda memiliki sebuah skema yang ditentukan oleh nama depan (wajib), nama belakang (wajib), email (wajib), dan nomor telepon (opsional).

  Jika versi skema berikutnya menghapus bidang email wajib, ini akan berhasil mendaftar. Kompatibilitas BACKWARD mengharuskan konsumen untuk dapat membaca versi skema saat ini dan sebelumnya. Konsumen Anda akan dapat membaca skema baru karena bidang email tambahan dari pesan lama diabaikan.

  Jika Anda memiliki versi skema baru yang diusulkan yang menambahkan bidang wajib, misalnya, kode pos, maka hal ini tidak akan berhasil mendaftar dengan kompatibilitas BACKWARD. Konsumen Anda pada versi yang baru tidak akan dapat membaca pesan lama sebelum perubahan skema, karena mereka kehilangan kolom kode pos yang diperlukan. Namun demikian, jika bidang kode pos ditetapkan sebagai opsional dalam skema baru, maka versi yang diusulkan akan berhasil mendaftar karena konsumen dapat membaca skema lama tanpa bidang kode pos opsional.

**JSON**  
Sebagai contoh, anggaplah Anda memiliki versi skema yang ditentukan berdasarkan nama depan (opsional), nama belakang (opsional), email (opsional) dan nomor telepon (opsional).

  Jika versi skema berikutnya menambahkan properti nomor telepon opsional, maka hal ini akan berhasil mendaftar selama versi skema asli tidak mengizinkan properti tambahan dengan menetapkan bidang `additionalProperties` ke SALAH. Kompatibilitas BACKWARD mengharuskan konsumen untuk dapat membaca versi skema saat ini dan sebelumnya. Konsumen Anda akan dapat membaca data yang dihasilkan dengan skema asli di mana properti nomor telepon tidak ada.

  Jika Anda memiliki versi skema baru yang diusulkan yang menambahkan properti nomor telepon opsional, maka hal ini tidak akan berhasil mendaftar dengan kompatibilitas BACKWARD ketika versi skema asli menetapkan bidang `additionalProperties` ke BETUL, yaitu memungkinkan setiap properti tambahan. Konsumen Anda pada versi baru tidak akan dapat membaca pesan lama sebelum ada perubahan skema, karena mereka tidak dapat membaca data dengan properti nomor telepon dalam jenis yang berbeda, misalnya string bukan nomor.

**PROTOBUF**  
Misalnya, anggap Anda memiliki versi skema yang ditentukan oleh Message `Person` dengan bidang (required), `first name` (required), `last name` (required), dan `email` `phone number` (opsional) di bawah sintaks proto2.

  Mirip dengan skenario AVRO, jika versi skema Anda berikutnya menghapus `email` bidang yang diperlukan, ini akan berhasil mendaftar. Kompatibilitas BACKWARD mengharuskan konsumen untuk dapat membaca versi skema saat ini dan sebelumnya. Konsumen Anda akan dapat membaca skema baru karena `email` bidang tambahan dari pesan lama diabaikan.

  Jika Anda memiliki versi skema baru yang diusulkan yang menambahkan bidang wajib, misalnya`zip code`, ini tidak akan berhasil mendaftar dengan kompatibilitas BACKWARD. Konsumen Anda pada versi baru tidak akan dapat membaca pesan lama sebelum skema berubah, karena mereka kehilangan `zip code` bidang yang diperlukan. Namun, jika `zip code` bidang ditetapkan sebagai opsional dalam skema baru, maka versi yang diusulkan akan berhasil mendaftar karena konsumen dapat membaca skema lama tanpa bidang opsional`zip code`.

  Dalam kasus penggunaan gRPC, menambahkan layanan RPC baru atau metode RPC adalah perubahan yang kompatibel ke belakang. Misalnya, asumsikan Anda memiliki versi skema yang ditentukan oleh layanan RPC `MyService` dengan dua metode RPC dan. `Foo` `Bar`

  Jika versi skema Anda berikutnya menambahkan metode RPC baru yang dipanggil`Baz`, ini akan berhasil mendaftar. Konsumen Anda akan dapat membaca data yang dihasilkan dengan skema asli sesuai dengan kompatibilitas BACKWARD karena metode `Baz` RPC yang baru ditambahkan adalah opsional. 

  Jika Anda memiliki versi skema baru yang diusulkan yang menghapus metode RPC yang ada`Foo`, ini tidak akan berhasil mendaftar dengan kompatibilitas BACKWARD. Konsumen Anda pada versi baru tidak akan dapat membaca pesan lama sebelum skema berubah, karena mereka tidak dapat memahami dan membaca data dengan metode RPC yang tidak ada dalam `Foo` aplikasi gRPC.
+ *BACKWARD\$1ALL*: Pilihan kompatibilitas ini memungkinkan konsumen untuk membaca versi skema saat ini dan semua versi skema sebelumnya. Anda dapat menggunakan pilihan ini untuk memeriksa kompatibilitas terhadap semua versi skema sebelumnya saat Anda menghapus bidang atau menambahkan bidang opsional.
+ *FORWARD*: Pilihan kompatibilitas ini memungkinkan konsumen untuk membaca versi skema saat ini dan versi skema berikutnya, tetapi tidak selalu versi yang lebih baru. Anda dapat menggunakan pilihan ini untuk memeriksa kompatibilitas terhadap versi skema terakhir ketika Anda menambahkan bidang atau menghapus bidang opsional. Kasus penggunaan khas untuk FORWARD adalah ketika aplikasi Anda telah dibuat untuk skema sebelumnya dan harus mampu memproses sebuah skema yang lebih baru.

**AVRO**  
Sebagai contoh, anggaplah Anda memiliki sebuah versi skema yang ditentukan berdasarkan nama depan (wajib), nama belakang (wajib), email (opsional).

  Jika Anda memiliki versi skema baru yang menambahkan bidang wajib, misalnya nomor telepon, ini akan berhasil mendaftar. Kompatibilitas FORWARD mengharuskan konsumen untuk dapat membaca data yang dihasilkan dengan skema baru dengan menggunakan versi skema sebelumnya.

  Jika Anda memiliki versi skema yang diusulkan yang menghapus bidang nama pertama yang wajib, maka ini tidak akan berhasil mendaftar dengan kompatibilitas FORWARD. Konsumen Anda pada versi sebelumnya tidak akan dapat membaca skema yang diusulkan karena mereka tidak memiliki bidang nama pertama wajib. Namun, jika bidang nama pertama awalnya bersifat opsional, maka skema baru yang diusulkan akan berhasil mendaftar karena konsumen dapat membaca data berdasarkan skema baru yang tidak memiliki bidang nama depan opsional.

**JSON**  
Sebagai contoh, anggaplah Anda memiliki versi skema yang ditentukan berdasarkan nama depan (opsional), nama belakang (opsional), email (opsional) dan nomor telepon (opsional).

  Jika Anda memiliki sebuah versi skema baru yang menghapus properti nomor telepon opsional, maka ini akan berhasil mendaftar selama versi skema baru tidak mengizinkan properti tambahan dengan menetapkan bidang `additionalProperties` ke SALAH. Kompatibilitas FORWARD mengharuskan konsumen untuk dapat membaca data yang dihasilkan dengan skema baru dengan menggunakan versi skema sebelumnya.

  Jika Anda memiliki versi skema yang diusulkan yang menghapus properti nomor telepon opsional, maka ini tidak akan berhasil mendaftar dengan kompatibilitas FORWARD ketika versi skema baru menetapkan bidang `additionalProperties` ke BETUL, yakni memungkinkan setiap properti tambahan. Konsumen Anda pada versi sebelumnya tidak akan dapat membaca skema yang diusulkan karena mereka dapat memiliki properti nomor telepon dalam jenis yang berbeda, misalnya string bukan nomor.

**PROTOBUF**  
Misalnya, anggap Anda memiliki versi skema yang ditentukan oleh Pesan `Person` dengan bidang (wajib), `first name` (wajib), `last name` `email` (opsional) di bawah sintaks proto2.

  Mirip dengan skenario AVRO, jika Anda memiliki versi skema baru yang menambahkan bidang wajib, misalnya`phone number`, ini akan berhasil mendaftar. Kompatibilitas FORWARD mengharuskan konsumen untuk dapat membaca data yang dihasilkan dengan skema baru dengan menggunakan versi skema sebelumnya.

  Jika Anda memiliki versi skema yang diusulkan yang menghapus `first name` bidang wajib, ini tidak akan berhasil mendaftar dengan kompatibilitas FORWARD. Konsumen Anda pada versi sebelumnya tidak akan dapat membaca skema yang diusulkan karena mereka kehilangan `first name` bidang yang diperlukan. Namun, jika `first name` bidang tersebut awalnya opsional, maka skema baru yang diusulkan akan berhasil mendaftar karena konsumen dapat membaca data berdasarkan skema baru yang tidak memiliki bidang opsional`first name`.

  Dalam kasus penggunaan gRPC, menghapus layanan RPC atau metode RPC adalah perubahan yang kompatibel ke depan. Misalnya, asumsikan Anda memiliki versi skema yang ditentukan oleh layanan RPC `MyService` dengan dua metode RPC dan. `Foo` `Bar` 

  Jika versi skema Anda berikutnya menghapus metode RPC yang ada bernama`Foo`, ini akan berhasil mendaftar sesuai dengan kompatibilitas FORWARD karena konsumen dapat membaca data yang dihasilkan dengan skema baru dengan menggunakan versi sebelumnya. Jika Anda memiliki versi skema baru yang diusulkan yang menambahkan metode RPC`Baz`, ini tidak akan berhasil mendaftar dengan kompatibilitas FORWARD. Konsumen Anda pada versi sebelumnya tidak akan dapat membaca skema yang diusulkan karena mereka kehilangan metode RPC. `Baz`
+ *FORWARD\$1ALL*: Pilihan kompatibilitas ini memungkinkan konsumen untuk membaca data yang ditulis oleh produsen dari setiap skema terdaftar baru. Anda dapat menggunakan pilihan ini ketika Anda harus menambahkan bidang atau menghapus bidang opsional, dan memeriksa kompatibilitas terhadap semua versi skema sebelumnya.
+ *FULL*: Pilihan kompatibilitas ini memungkinkan konsumen untuk membaca data yang ditulis oleh produsen dengan menggunakan skema versi sebelumnya atau versi berikutnya, tetapi tidak versi yang lebih awal atau versi yang lebih baru. Anda dapat menggunakan pilihan ini untuk memeriksa kompatibilitas terhadap versi skema terakhir ketika Anda menambahkan atau menghapus bidang opsional.
+ *FULL\$1ALL*: Pilihan kompatibilitas ini memungkinkan konsumen untuk membaca data yang ditulis oleh produsen dengan menggunakan semua versi skema sebelumnya. Anda dapat menggunakan pilihan ini untuk memeriksa kompatibilitas terhadap semua versi skema sebelumnya saat Anda menambahkan atau menghapus bidang opsional.

## Pustaka Serde sumber terbuka
<a name="schema-registry-serde-libraries"></a>

AWS menyediakan pustaka Serde open-source sebagai kerangka kerja untuk membuat serial dan deserialisasi data. Desain sumber terbuka dari perpustakaan ini memungkinkan aplikasi dan kerangka kerja sumber terbuka umum untuk mendukung perpustakaan ini dalam proyek mereka.

Untuk detail lebih lanjut tentang bagaimana perpustakaan Serde bekerja, lihat [Cara kerja registri skema](schema-registry-works.md).

## Kuota Registri Skema
<a name="schema-registry-quotas"></a>

Kuota, juga disebut sebagai batas dalam AWS, adalah nilai maksimum untuk sumber daya, tindakan, dan item di AWS akun Anda. Berikut ini adalah batas lunak untuk Registri Skema di AWS Glue.

**Pasangan nilai kunci metadata versi skema**  
Anda dapat memiliki hingga 10 pasangan nilai kunci SchemaVersion per AWS Wilayah.

Anda dapat melihat atau mengatur pasangan metadata kunci-nilai menggunakan atau. [QuerySchemaVersionMetadata tindakan (Python: query\$1schema\$1version\$1metadata)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-QuerySchemaVersionMetadata) [PutSchemaVersionMetadata tindakan (Python: put\$1schema\$1version\$1metadata)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-PutSchemaVersionMetadata) APIs

Berikut ini adalah batasan sulit untuk Schema Registry diAWS Glue.

**Pendaftar**  
Anda dapat memiliki hingga 100 pendaftar per AWS Wilayah untuk akun ini.

**SchemaVersion**  
Anda dapat memiliki hingga 10000 versi skema per AWS Wilayah untuk akun ini.

Setiap skema baru membuat versi skema baru, sehingga Anda secara teoritis dapat memiliki hingga 10000 skema per akun per wilayah, jika setiap skema hanya memiliki satu versi.

**Beban skema**  
Ada batas ukuran 170KB untuk beban skema.

# Cara kerja registri skema
<a name="schema-registry-works"></a>

Bagian ini menjelaskan bagaimana proses serialisasi dan deserialisasi dalam registri skema bekerja.

1. Mendaftar skema: Jika skema belum ada di registri, maka skema dapat didaftarkan dengan nama skema yang sama dengan nama tujuan (misalnya, test\$1topic, test\$1stream, prod\$1firehose) atau produsen dapat memberikan nama kustom untuk skema tersebut. Produsen juga dapat menambahkan pasangan kunci-nilai ke skema sebagai metadata, seperti sumber: MSK\$1KAFKA\$1TOPIC\$1A, atau menerapkan tag ke skema pada pembuatan skema. AWS Setelah sebuah skema terdaftar, Registri Skema mengembalikan skema ID versi ke pen-serialisasi. Jika skema ada tetapi pen-serialisasi menggunakan versi baru yang tidak ada, maka Registri Skema akan memeriksa referensi skema untuk aturan kompatibilitas untuk memastikan bahwa versi baru sudah kompatibel sebelum mendaftarkannya sebagai sebuah versi baru.

   Ada dua metode untuk mendaftarkan sebuah skema: registrasi manual dan registrasi otomatis. Anda dapat mendaftarkan sebuah skema secara manual melalui konsol AWS Glue atau CLI/SDK.

   Ketika pendaftaran otomatis diaktifkan dalam pengaturan serializer, pendaftaran otomatis skema akan dilakukan. Jika `REGISTRY_NAME` tidak disediakan dalam konfigurasi produsen, maka pendaftaran otomatis akan mendaftarkan versi skema baru di bawah registri default (default-registry). Lihat [Instalasi SerDe Perpustakaan](schema-registry-gs-serde.md) untuk informasi tentang menentukan properti pendaftaran otomatis.

1. Serializer memvalidasi catatan data terhadap skema: Ketika aplikasi yang memproduksi data telah terdaftar skemanya, serializer Registri Skema memvalidasi rekaman yang dihasilkan oleh aplikasi terstruktur dengan bidang dan jenis data yang cocok dengan skema terdaftar. Jika skema catatan tidak cocok dengan skema terdaftar, maka serializer akan mengembalikan pengecualian dan aplikasi akan gagal untuk memberikan catatan ke tujuan. 

   Jika tidak ada skema yang ada dan jika nama skema tidak disediakan melalui konfigurasi produsen, maka skema akan dibuat dengan nama yang sama dengan nama topik (jika Apache Kafka atau Amazon MSK) atau nama pengaliran (jika Kinesis Data Streams).

   Setiap catatan memiliki sebuah definisi skema dan data. Definisi skema tersebut di-kueri terhadap skema dan versi yang ada di Registri Skema.

   Secara default, produsen cache definisi skema dan versi IDs skema skema terdaftar. Jika definisi versi skema catatan tidak cocok dengan apa yang tersedia dalam cache, maka produsen akan mencoba untuk memvalidasi skema dengan Registri Skema. Jika versi skema valid, maka ID versi dan definisi skema akan di-cache secara lokal pada produsen.

   Anda dapat menyesuaikan periode cache default (24 jam) dalam properti produsen opsional di langkah \$13 dari [Instalasi SerDe Perpustakaan](schema-registry-gs-serde.md).

1. Serialisasi dan kirimkan catatan: Jika rekaman sesuai dengan skema, serializer menghiasi setiap rekaman dengan ID versi skema, membuat serial rekaman berdasarkan format data yang dipilih (AVRO, JSON, Protobuf, atau format lain segera hadir), mengompres catatan (konfigurasi produsen opsional), dan mengirimkannya ke tujuan.

1. Konsumen melakukan deserialisasi data: Konsumen membaca data ini menggunakan perpustakaan deserializer Registri Skema yang mem-parsing ID versi skema dari muatan catatan.

1. Deserializer dapat meminta skema dari Registri Skema: Jika ini adalah pertama kalinya deserializer melihat catatan dengan ID versi skema tertentu, dengan menggunakan ID versi skema, deserializer akan meminta skema dari Registri Skema dan meng-cache skema tersebut secara lokal pada konsumen. Jika Registri Skema tidak dapat melakukan deserialisasi catatan, konsumen dapat mencatat log data dari catatan dan melanjutkan, atau menghentikan aplikasi.

1. Deserializer menggunakan skema untuk melakukan deserialisasi catatan: Ketika deserializer mengambil ID versi skema dari Registri Skema, deserializer melakukan dekompresi catatan (jika catatan yang dikirim oleh produsen dikompresi) dan menggunakan skema tersebut untuk melakukan deserialisasi catatan. Aplikasi sekarang memproses catatan tersebut.

**catatan**  
Enkripsi: Klien Anda berkomunikasi dengan Registri Skema melalui panggilan API yang mengenkripsi data dalam transit dengan menggunakan enkripsi TLS melalui HTTPS. Skema yang disimpan dalam Schema Registry selalu dienkripsi saat istirahat menggunakan kunci AWS Key Management Service service-managed ().AWS KMS

**catatan**  
Otorisasi Pengguna: Registri Skema mendukung kebijakan IAM berbasis identitas.

# Memulai dengan registri skema
<a name="schema-registry-gs"></a>

Bagian berikut memberikan gambaran umum dan memandu Anda dalam menyiapkan dan menggunakan Registri Skema. Untuk informasi tentang konsep dan komponen registri skema, lihat[AWS GlueRegistri skema](schema-registry.md).

**Topics**
+ [Instalasi SerDe Perpustakaan](schema-registry-gs-serde.md)
+ [Mengintegrasikan dengan Registri Skema AWS Glue](schema-registry-integrations.md)
+ [Migrasi dari registri skema pihak ketiga ke AWS Glue Schema Registry](schema-registry-integrations-migration.md)

# Instalasi SerDe Perpustakaan
<a name="schema-registry-gs-serde"></a>

 SerDe Pustaka menyediakan kerangka kerja untuk serialisasi dan deserialisasi data. 

Anda akan menginstal serializer sumber terbuka untuk aplikasi Anda yang menghasilkan data (secara kolektif “"-serializer"). Serializer menangani serialisasi, kompresi, dan interaksi dengan Registri Skema. Serializer secara otomatis mengekstrak skema dari catatan yang ditulis ke tujuan yang kompatibel dengan Registri Skema, seperti Amazon MSK. Demikian juga, Anda akan menginstal deserializer sumber terbuka pada aplikasi Anda yang mengkonsumsi data.

# Implementasi Java
<a name="schema-registry-gs-serde-java"></a>

**catatan**  
Prasyarat: Sebelum menyelesaikan langkah-langkah berikut, Anda harus memiliki sebuah klaster Amazon Managed Streaming for Apache Kafka (Amazon MSK) atau Apache Kafka yang berjalan. Produsen dan konsumen Anda harus berjalan pada Java 8 atau yang lebih tinggi.

Untuk menginstal perpustakaan pada produsen dan konsumen:

1. Di dalam file pom.xml baik dari produsen dan konsumen, tambahkan dependensi ini melalui kode di bawah ini:

   ```
   <dependency>
       <groupId>software.amazon.glue</groupId>
       <artifactId>schema-registry-serde</artifactId>
       <version>1.1.5</version>
   </dependency>
   ```

   Atau, Anda dapat mengkloning [Repositori Github Registri Skema AWS Glue](https://github.com/awslabs/aws-glue-schema-registry).

1. Siapkan produsen Anda dengan properti yang diperlukan ini:

   ```
   props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // Can replace StringSerializer.class.getName()) with any other key serializer that you may use
   props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());
   props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2");
   properties.put(AWSSchemaRegistryConstants.DATA_FORMAT, "JSON"); // OR "AVRO"
   ```

   Jika tidak ada skema yang ada, maka pendaftaran otomatis harus Anda aktifkan (langkah berikutnya). Jika Anda memiliki skema yang ingin Anda terapkan, maka ganti "my-schema" dengan nama skema Anda. Selain itu, "registry-name" harus disediakan jika skema pendaftaran otomatis dimatikan. Jika skema dibuat menggunakan "default-registry", maka nama registri dapat dihilangkan.

1. (Opsional) Mengatur salah satu properti produsen opsional ini. Untuk deskripsi properti terperinci, lihat [ ReadMe file](https://github.com/awslabs/aws-glue-schema-registry/blob/master/README.md).

   ```
   props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true"); // If not passed, uses "false"
   props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "my-schema"); // If not passed, uses transport name (topic name in case of Kafka, or stream name in case of Kinesis Data Streams)
   props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry"); // If not passed, uses "default-registry"
   props.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000 (24 Hours)
   props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200
   props.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL); // Pass a compatibility mode. If not passed, uses Compatibility.BACKWARD
   props.put(AWSSchemaRegistryConstants.DESCRIPTION, "This registry is used for several purposes."); // If not passed, constructs a description
   props.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, AWSSchemaRegistryConstants.COMPRESSION.ZLIB); // If not passed, records are sent uncompressed
   ```

   Pendaftaran otomatis mendaftarkan versi skema dengan menggunakan registri default ("default-registry"). Jika `SCHEMA_NAME` tidak ditentukan dalam langkah sebelumnya, maka nama topik disimpulkan sebagai `SCHEMA_NAME`. 

   Lihat [Pembuatan versi dan kompatibilitas skema](schema-registry.md#schema-registry-compatibility) untuk informasi lebih lanjut tentang mode kompatibilitas.

1. Siapkan konsumen Anda dengan properti yang diperlukan berikut ini:

   ```
   props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
   props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
   props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2"); // Pass an Wilayah AWS
   props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); // Only required for AVRO data format
   ```

1. (Opsional) Mengatur properti konsumen opsional ini. Untuk deskripsi properti terperinci, lihat [ ReadMe file](https://github.com/awslabs/aws-glue-schema-registry/blob/master/README.md).

   ```
   properties.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000
   props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200
   props.put(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, "com.amazonaws.services.schemaregistry.deserializers.external.ThirdPartyDeserializer"); // For migration fall back scenario
   ```

# C\$1 Implementasi
<a name="schema-registry-gs-serde-csharp"></a>

**catatan**  
Prasyarat: Sebelum menyelesaikan langkah-langkah berikut, Anda harus memiliki sebuah klaster Amazon Managed Streaming for Apache Kafka (Amazon MSK) atau Apache Kafka yang berjalan. Produsen dan konsumen Anda harus berjalan di .NET 8.0 atau lebih tinggi.

## Penginstalan
<a name="schema-registry-gs-serde-csharp-install"></a>

Untuk aplikasi C \$1, instal SerDe NuGet paket AWS Glue Schema Registry menggunakan salah satu metode berikut:

**.NET CLI:**  
Gunakan perintah berikut untuk menginstal paket:

```
dotnet add package Aws.Glue.SchemaRegistry --version 1.0.0-<rid>
```

di mana `<rid>` bisa`1.0.0-linux-x64`, `1.0.0-linux-musl-x64` atau `1.0.0-linux-arm64`

**PackageReference (dalam file.csproj Anda):**  
Tambahkan yang berikut ini ke file proyek Anda:

```
<PackageReference Include="Aws.Glue.SchemaRegistry" Version="1.0.0-<rid>" />
```

di mana `<rid>` bisa`1.0.0-linux-x64`, `1.0.0-linux-musl-x64` atau `1.0.0-linux-arm64`

## Pengaturan File Konfigurasi
<a name="schema-registry-gs-serde-csharp-config"></a>

Buat file properti konfigurasi (misalnya,`gsr-config.properties`) dengan pengaturan yang diperlukan:

**Konfigurasi Minimal:**  
Berikut ini menunjukkan contoh konfigurasi minimal:

```
region=us-east-1
registry.name=default-registry
dataFormat=AVRO
schemaAutoRegistrationEnabled=true
```

## Menggunakan pustaka klien C\$1 Glue Schema untuk Kafka SerDes
<a name="schema-registry-gs-serde-csharp-kafka"></a>

**Contoh penggunaan serializer:**  
Contoh berikut menunjukkan cara menggunakan serializer:

```
private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>";
var protobufSerializer = new GlueSchemaRegistryKafkaSerializer(PROTOBUF_CONFIG_PATH);
var serialized = protobufSerializer.Serialize(message, message.Descriptor.FullName);
// send serialized bytes to Kafka using producer.Produce(serialized)
```

**Contoh penggunaan deserializer:**  
Contoh berikut menunjukkan cara menggunakan deserializer:

```
private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>";
var dataConfig = new GlueSchemaRegistryDataFormatConfiguration(
    new Dictionary<string, dynamic>
    {
        {
            GlueSchemaRegistryConstants.ProtobufMessageDescriptor, message.Descriptor
        }
    }
);
var protobufDeserializer = new GlueSchemaRegistryKafkaDeserializer(PROTOBUF_CONFIG_PATH, dataConfig);

// read message from Kafka using serialized = consumer.Consume()
var deserializedObject = protobufDeserializer.Deserialize(message.Descriptor.FullName, serialized);
```

## Menggunakan pustaka klien C\$1 Glue Schema dengan for KafkaFlow SerDes
<a name="schema-registry-gs-serde-csharp-kafkaflow"></a>

**Contoh penggunaan serializer:**  
Contoh berikut menunjukkan cara mengkonfigurasi KafkaFlow dengan serializer:

```
services.AddKafka(kafka => kafka
    .UseConsoleLog()
    .AddCluster(cluster => cluster
        .WithBrokers(new[] { "localhost:9092" })
        .AddProducer<CustomerProducer>(producer => producer
            .DefaultTopic("customer-events")
            .AddMiddlewares(m => m
                .AddSerializer<GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>>(
                    () => new GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>("config/gsr-config.properties")
                )
            )
        )
    )
);
```

**Contoh penggunaan deserializer:**  
Contoh berikut menunjukkan cara mengkonfigurasi KafkaFlow dengan deserializer:

```
.AddConsumer(consumer => consumer
    .Topic("customer-events")
    .WithGroupId("customer-group")
    .WithBufferSize(100)
    .WithWorkersCount(10)
    .AddMiddlewares(middlewares => middlewares
        .AddDeserializer<GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>>(
            () => new GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>("config/gsr-config.properties")
        )
        .AddTypedHandlers(h => h.AddHandler<CustomerHandler>())
    )
)
```

## Properti Produsen Opsional
<a name="schema-registry-gs-serde-csharp-optional"></a>

Anda dapat memperluas file konfigurasi Anda dengan properti opsional tambahan:

```
# Auto-registration (if not passed, uses "false")
schemaAutoRegistrationEnabled=true

# Schema name (if not passed, uses topic name)
schema.name=my-schema

# Registry name (if not passed, uses "default-registry")
registry.name=my-registry

# Cache settings
cacheTimeToLiveMillis=86400000
cacheSize=200

# Compatibility mode (if not passed, uses BACKWARD)
compatibility=FULL

# Registry description
description=This registry is used for several purposes.

# Compression (if not passed, records are sent uncompressed)
compressionType=ZLIB
```

## Format Data yang Didukung
<a name="schema-registry-gs-serde-supported-formats"></a>

Implementasi Java dan C\$1 mendukung format data yang sama:
+ *AVRO: Format* biner Apache Avro
+ *JSON: Format Skema* JSON
+ *PROTOBUF: Format Buffer* Protokol

## Catatan
<a name="schema-registry-gs-serde-csharp-notes"></a>
+ Untuk memulai dengan perpustakaan, silakan kunjungi [https://www.nuget. org/packages/AWS.Lem. SchemaRegistry](https://www.nuget.org/packages/AWS.Glue.SchemaRegistry)
+ Kode sumber tersedia di: [https://github.com/awslabs/aws-glue-schema-registry](https://github.com/awslabs/aws-glue-schema-registry)

# Membuat registri
<a name="schema-registry-gs3"></a>

Anda dapat menggunakan registri default atau membuat registri baru sebanyak yang diperlukan menggunakan AWS Glue APIs atau AWS Glue konsol.

**AWS Glue APIs**  
Anda dapat menggunakan langkah-langkah ini untuk melakukan tugas ini menggunakan AWS Glue APIs.

Untuk menggunakan AWS CLI untuk AWS Glue Schema Registry APIs, pastikan untuk memperbarui versi Anda AWS CLI ke versi terbaru.

 Untuk menambahkan sebuah registri baru, gunakan API [CreateRegistry tindakan (Python: create\$1registry)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-CreateRegistry). Menentukan `RegistryName` sebagai nama registri yang akan dibuat, dengan panjang maksimal 255 karakter, hanya berisi huruf, angka, tanda hubung, garis bawah, tanda dolar, atau tanda hash. 

Tentukan `Description` sebagai string yang panjangnya tidak lebih dari 2048 byte, cocok dengan pola string [multi-baris alamat URI](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-common.html#aws-glue-api-common-_string-patterns). 

Opsional, menentukan satu atau beberapa `Tags` untuk registri Anda, sebagai sebuah susunan peta pasangan nilai kunci.

```
aws glue create-registry --registry-name registryName1 --description description
```

Ketika registri Anda sudah dibuat, Amazon Resource Name (ARN) ditetapkan untuknya, yang dapat Anda lihat di `RegistryArn` dari respons API. Setelah Anda membuat sebuah registri, buatlah satu atau beberapa skema untuk registri tersebut.

**Konsol AWS Glue**  
Untuk menambahkan sebuah registri baru di konsol AWS Glue:

1. Masuk ke Konsol Manajemen AWS dan buka AWS Glue konsol di [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\).

1. Di panel navigasi, pada **Katalog data**, pilih **Registri Skema**.

1. Pilih **Tambah registri**.

1. Masukkan sebuah **Nama Registri** untuk registri tersebut, yang terdiri dari huruf, angka, tanda hubung, atau garis bawah. Nama ini tidak dapat diubah.

1. Masukkan **Deskripsi** (opsional) untuk registri.

1. Opsional, terapkan satu atau beberapa tag ke registri Anda. Pilih **Tambahkan tag** dan tentukan **Kunci tag** dan **Nilai tag** opsional.

1. Pilih **Tambah registri**.

![\[Contoh membuat sebuah registri.\]](http://docs.aws.amazon.com/id_id/glue/latest/dg/images/schema_reg_create_registry.png)


Ketika registri Anda dibuat, Amazon Resource Name (ARN) ditetappkan padanya, yang dapat Anda lihat dengan memilih registri dari daftar di **Registri Skema**. Setelah Anda membuat sebuah registri, buatlah satu atau beberapa skema untuk registri tersebut.

# Berurusan dengan catatan tertentu (JAVA POJO) untuk JSON
<a name="schema-registry-gs-json-java-pojo"></a>

Anda dapat menggunakan sebuah plain old Java object (POJO) dan memberikan objek tersebut sebagai catatan. Ini mirip dengan maksud dari catatan spesifik di AVRO. [mbknor-jackson-jsonschema](https://github.com/mbknor/mbknor-jackson-jsonSchema)Dapat menghasilkan skema JSON untuk POJO yang dilewati. Perpustakaan ini juga dapat memasukkan informasi tambahan dalam skema JSON.

Perpustakaan Registri Skema AWS Glue menggunakan bidang "ClassName" yang dimasukkan dalam skema untuk memberikan nama kelas terklasifikasi sepenuhnya. Bidang "ClassName" digunakan oleh deserializer untuk melakukan deserialisasi ke objek dari kelas itu.

```
 Example class :

@JsonSchemaDescription("This is a car")
@JsonSchemaTitle("Simple Car Schema")
@Builder
@AllArgsConstructor
@EqualsAndHashCode
// Fully qualified class name to be added to an additionally injected property
// called className for deserializer to determine which class to deserialize
// the bytes into
@JsonSchemaInject(
        strings = {@JsonSchemaString(path = "className",
                value = "com.amazonaws.services.schemaregistry.integrationtests.generators.Car")}
)
// List of annotations to help infer JSON Schema are defined by https://github.com/mbknor/mbknor-jackson-jsonSchema
public class Car {
    @JsonProperty(required = true)
    private String make;

    @JsonProperty(required = true)
    private String model;

    @JsonSchemaDefault("true")
    @JsonProperty
    public boolean used;

    @JsonSchemaInject(ints = {@JsonSchemaInt(path = "multipleOf", value = 1000)})
    @Max(200000)
    @JsonProperty
    private int miles;

    @Min(2000)
    @JsonProperty
    private int year;

    @JsonProperty
    private Date purchaseDate;

    @JsonProperty
    @JsonFormat(shape = JsonFormat.Shape.NUMBER)
    private Date listedDate;

    @JsonProperty
    private String[] owners;

    @JsonProperty
    private Collection<Float> serviceChecks;

    // Empty constructor is required by Jackson to deserialize bytes
    // into an Object of this class
    public Car() {}
}
```

# Membuat skema
<a name="schema-registry-gs4"></a>

Anda dapat membuat skema menggunakan AWS Glue APIs atau AWS Glue konsol. 

**AWS Glue APIs**  
Anda dapat menggunakan langkah-langkah ini untuk melakukan tugas ini menggunakan AWS Glue APIs.

Untuk menambahkan sebuah skema baru, gunakan API [CreateSchema tindakan (Python: create\$1schema)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-CreateSchema).

Menentukan sebuah struktur `RegistryId` untuk menunjukkan registri untuk skema tersebut. Atau, menghilangkan `RegistryId` untuk menggunakan registri default.

Menentukan `SchemaName` yang terdiri dari huruf, angka, tanda hubung, atau garis bawah, dan `DataFormat` sebagai **AVRO** atau **JSON**. `DataFormat` setelah ditetapkan pada skema tidak dapat diubah.

Menentukan mode `Compatibility`:
+ *Backward (disarankan)* — Konsumen dapat membaca versi saat ini dan sebelumnya.
+ *Backward all* — Konsumen dapat membaca versi terkini dan semua versi sebelumnya.
+ *Forward* — Konsumen dapat membaca baik versi saat ini dan versi berikutnya.
+ *Forward all* — Konsumen dapat membaca baik versi saat ini dan semua versi berikutnya.
+ *Full* — Kombinasi dari Backward dan Forward.
+ *Full all* — Kombinasi dari Backward all dan Forward all.
+ *None* — Tidak ada pemeriksaan kompatibilitas yang dilakukan.
+ *Disabled* — Mencegah versioning apapun untuk skema ini.

Opsional, tentukan `Tags` untuk skema Anda. 

Tentukan a `SchemaDefinition` untuk menentukan skema dalam format data Avro, JSON, atau Protobuf. Lihat contoh-contohnya.

Untuk format data Avro:

```
aws glue create-schema --registry-id RegistryName="registryName1" --schema-name testschema --compatibility NONE --data-format AVRO --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}"
```

```
aws glue create-schema --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName1" --schema-name testschema --compatibility NONE --data-format AVRO  --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}"
```

Untuk format data JSON:

```
aws glue create-schema --registry-id RegistryName="registryName" --schema-name testSchemaJson --compatibility NONE --data-format JSON --schema-definition "{\"$schema\": \"http://json-schema.org/draft-07/schema#\",\"type\":\"object\",\"properties\":{\"f1\":{\"type\":\"string\"}}}"
```

```
aws glue create-schema --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName" --schema-name testSchemaJson --compatibility NONE --data-format JSON --schema-definition "{\"$schema\": \"http://json-schema.org/draft-07/schema#\",\"type\":\"object\",\"properties\":{\"f1\":{\"type\":\"string\"}}}"
```

Untuk format data Protobuf:

```
aws glue create-schema --registry-id RegistryName="registryName" --schema-name testSchemaProtobuf --compatibility NONE --data-format PROTOBUF --schema-definition "syntax = \"proto2\";package org.test;message Basic { optional int32 basic = 1;}"
```

```
aws glue create-schema --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName" --schema-name testSchemaProtobuf --compatibility NONE --data-format PROTOBUF --schema-definition "syntax = \"proto2\";package org.test;message Basic { optional int32 basic = 1;}"
```

**Konsol AWS Glue**  
Untuk menambahkan sebuah skema baru menggunakan konsol AWS Glue:

1. Masuk ke Konsol AWS Manajemen dan buka AWS Glue konsol di [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\).

1. Di panel navigasi, pada **Katalog data**, pilih **Skema**.

1. Pilih **Tambahkan skema**.

1. Masukkan **Nama skema**, terdiri dari huruf, angka, tanda hubung, garis bawah, tanda dolar, atau tanda hash. Nama ini tidak dapat diubah.

1. Pilih **Registri** di mana skema akan disimpan dari menu drop-down. Registri induk tidak dapat diubah setelah dibuat.

1. Biarkan **Format data** sebagai *Apache Avro* atau *JSON*. Format ini berlaku untuk semua versi skema ini.

1. Pilih **Mode kompatibilitas**.
   + *Backward (disarankan)* — Penerima dapat membaca versi saat ini dan sebelumnya.
   + *Backward all* — Penerima dapat membaca versi terkini dan semua versi sebelumnya.
   + *Forward* — Pengirim dapat menulis baik versi terkini dan versi sebelumnya.
   + *Forward all* — Pengirim dapat menulis versi terkini dan semua versi sebelumnya.
   + *Full* — Kombinasi dari Backward dan Forward.
   + *Full all* — Kombinasi dari Backward All dan Forward All.
   + *None* — Tidak ada pemeriksaan kompatibilitas yang dilakukan.
   + *Disabled* — Mencegah versioning apapun untuk skema ini.

1. Masukkan **Deskripsi** opsional untuk registri yang terdiri hingga 250 karakter.  
![\[Contoh membuat sebuah skema.\]](http://docs.aws.amazon.com/id_id/glue/latest/dg/images/schema_reg_create_schema.png)

1. Opsional, terapkan satu atau beberapa tag untuk skema Anda. Pilih **Tambahkan tag** dan tentukan **Kunci tag** dan **Nilai tag** opsional.

1. Pada kotak **Versi skema pertama**, masukkan atau tempel skema awal Anda.

   Untuk format Avro, lihat [Bekerja dengan format data Avro](#schema-registry-avro)

   Untuk format JSON, lihat [Bekerja dengan format data JSON](#schema-registry-json)

1. Opsional, pilih **Tambahkan metadata** untuk menambahkan metadata versi untuk membuat anotasi atau mengklasifikasikan versi skema Anda.

1. Pilih **Buat skema dan versi**.

![\[Contoh membuat sebuah skema.\]](http://docs.aws.amazon.com/id_id/glue/latest/dg/images/schema_reg_create_schema2.png)


Skema dibuat dan muncul dalam daftar pada **Skema**.

## Bekerja dengan format data Avro
<a name="schema-registry-avro"></a>

Avro menyediakan layanan serialisasi data dan pertukaran data. Avro menyimpan definisi data dalam format JSON sehingga mudah untuk dibaca dan ditafsirkan. Data itu sendiri disimpan dalam format biner.

Untuk informasi tentang cara mendefinisikan skema Apache Avro, lihat [Spesifikasi Apache Avro](http://avro.apache.org/docs/current/spec.html).

## Bekerja dengan format data JSON
<a name="schema-registry-json"></a>

Data dapat diserialisasi dengan format JSON. [Format Skema JSON](https://json-schema.org/) mendefinisikan standar untuk format Skema JSON.

# Memperbarui skema atau registri
<a name="schema-registry-gs5"></a>

Setelah dibuat Anda dapat mengedit skema Anda, versi skema, atau registri Anda.

## Memperbarui registri
<a name="schema-registry-gs5a"></a>

Anda dapat memperbarui registri menggunakan AWS Glue APIs atau AWS Glue konsol. Nama registri yang sudah ada tidak dapat Anda edit. Anda dapat mengedit deskripsi untuk sebuah registri.

**AWS Glue APIs**  
Untuk memperbarui registri yang sudah ada, gunakan API [UpdateRegistry tindakan (Python: update\$1registry)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-UpdateRegistry).

Menentukan sebuah struktur `RegistryId` untuk menunjukkan registri yang ingin Anda perbarui. Berikan sebuah `Description` untuk mengubah deskripsi untuk sebuah registri.

```
aws glue update-registry --description updatedDescription --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName1"
```

**Konsol AWS Glue**  
Untuk memperbarui sebuah registri menggunakan konsol AWS Glue:

1. Masuk ke Konsol Manajemen AWS dan buka AWS Glue konsol di [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\).

1. Di panel navigasi, pada **Katalog data**, pilih **Registri Skema**.

1. Pilih sebuah registri dari daftar registri, dengan mencentang kotaknya.

1. Di menu **Tindakan**, pilih **Edit registri**.

# Memperbarui skema
<a name="schema-registry-gs5b"></a>

Anda dapat memperbarui pengaturan deskripsi atau kompatibilitas untuk sebuah skema.

Untuk memperbarui skema yang sudah ada, gunakan API [UpdateSchema tindakan (Python: update\$1schema)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-UpdateSchema).

Menentukan sebuah struktur `SchemaId` untuk menunjukkan skema yang ingin Anda perbarui. Salah satu dari `VersionNumber` atau `Compatibility` harus disediakan.

Contoh kode 11:

```
aws glue update-schema --description testDescription --schema-id SchemaName="testSchema1",RegistryName="registryName1" --schema-version-number LatestVersion=true --compatibility NONE
```

```
aws glue update-schema --description testDescription --schema-id SchemaArn="arn:aws:glue:us-east-2:901234567890:schema/registryName1/testSchema1" --schema-version-number LatestVersion=true --compatibility NONE
```

# Menambahkan versi skema
<a name="schema-registry-gs5c"></a>

Ketika Anda menambahkan sebuah versi skema, Anda akan perlu untuk membandingkan versi untuk memastikan skema baru akan diterima.

Untuk menambahkan versi baru ke sebuah skema yang ada, gunakan API [RegisterSchemaVersion tindakan (Python: register\$1schema\$1version)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-RegisterSchemaVersion).

Menentukan sebuah struktur `SchemaId` untuk menunjukkan skema yang Anda ingin tambah versinya, dan sebuah `SchemaDefinition` untuk menentukan skemanya.

Contoh kode 12:

```
aws glue register-schema-version --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}" --schema-id SchemaArn="arn:aws:glue:us-east-1:901234567890:schema/registryName/testschema"
```

```
aws glue register-schema-version --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}" --schema-id SchemaName="testschema",RegistryName="testregistry"
```

1. Masuk ke Konsol Manajemen AWS dan buka AWS Glue konsol di [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\).

1. Di panel navigasi, pada **Katalog data**, pilih **Skema**.

1. Pilih skema dari daftar skema, dengan mencentang kotaknya.

1. Pilih satu atau beberapa skema dari daftar, dengan mencentang kotak.

1. Pada menu **Tindakan**, pilih **Daftarkan versi baru**.

1. Pada kotak **Versi baru**, masukkan atau tempel skema baru Anda.

1. Pilih **Bandingkan dengan versi sebelumnya** untuk melihat perbedaan dengan versi skema sebelumnya.

1. Opsional, pilih **Tambahkan metadata** untuk menambahkan metadata versi untuk membuat anotasi atau mengklasifikasikan versi skema Anda. Masukkan **Kunci** dan **Nilai** opsional.

1. Pilih **Versi Registri**.

![\[Menambahkan sebuah versi skema.\]](http://docs.aws.amazon.com/id_id/glue/latest/dg/images/schema_reg_add_schema_version.png)


Versi skema muncul pada daftar versi. Jika versi mengubah mode kompatibilitas, maka versi akan ditandai sebagai sebuah pos pemeriksaan.

## Contoh perbandingan versi skema
<a name="schema-registry-gs5c1"></a>

Ketika Anda memilih untuk **Bandingkan dengan versi sebelumnya**, maka Anda akan melihat versi sebelumnya dan versi baru ditampilkan bersama-sama. Informasi yang diubah akan disorot sebagai berikut:
+ *Kuning*: menunjukkan ada perubahan informasi.
+ *Hijau*: menunjukkan ada penambahan konten dalam versi terbaru.
+ *Merah*: menunjukkan ada penghapusan konten dalam versi terbaru.

Anda juga dapat membandingkan dengan versi yang sebelumnya.

![\[Contoh perbandingan sebuah versi skema.\]](http://docs.aws.amazon.com/id_id/glue/latest/dg/images/schema_reg_version_comparison.png)


# Menghapus skema atau registri
<a name="schema-registry-gs7"></a>

Menghapus sebuah skema, versi skema, atau registri adalah tindakan permanen yang tidak dapat dibatalkan.

## Menghapus skema
<a name="schema-registry-gs7a"></a>

Anda mungkin ingin menghapus skema ketika tidak lagi digunakan dalam registri, menggunakan Konsol Manajemen AWS, atau [DeleteSchema tindakan (Python: delete\$1schema)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteSchema) API.

Menghapus satu atau beberapa skema adalah tindakan permanen yang tidak dapat dibatalkan. Pastikan bahwa skema atau skema-skema itu tidak lagi diperlukan.

Untuk menghapus sebuah skema dari registri, panggil API [DeleteSchema tindakan (Python: delete\$1schema)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteSchema), dengan menentukan struktur `SchemaId` untuk mengidentifikasi skema.

Contoh:

```
aws glue delete-schema --schema-id SchemaArn="arn:aws:glue:us-east-2:901234567890:schema/registryName1/schemaname"
```

```
aws glue delete-schema --schema-id SchemaName="TestSchema6-deleteschemabyname",RegistryName="default-registry"
```

**Konsol AWS Glue**  
Untuk menghapus sebuah skema dari konsol AWS Glue:

1. Masuk ke Konsol Manajemen AWS dan buka AWS Glue konsol di [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\).

1. Di panel navigasi, pada **Katalog data**, pilih **Registri Skema**.

1. Pilih registri yang berisi skema Anda dari daftar registri.

1. Pilih satu atau beberapa skema dari daftar, dengan mencentang kotak.

1. Di menu **Tindakan**, pilih **Hapus skema**.

1. Masukkan teks **Delete** di bidang untuk mengonfirmasi penghapusan.

1. Pilih **Hapus**.

Skema yang Anda tentukan akan dihapus dari registri.

## Menghapus versi skema
<a name="schema-registry-gs7b"></a>

Saat skema terakumulasi dalam registri, Anda mungkin ingin menghapus versi skema yang tidak diinginkan menggunakan Konsol Manajemen AWS, atau API. [DeleteSchemaVersions tindakan (Python: delete\$1schema\$1versions)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteSchemaVersions) Menghapus satu atau beberapa versi skema adalah tindakan permanen yang tidak dapat dibatalkan. Pastikan bahwa versi skema tersebut tidak diperlukan lagi.

Ketika menghapus versi skema, perhatikan batasan-batasan berikut:
+ Anda tidak dapat menghapus versi yang dikenai pemeriksaan.
+ Rentang versi berdekatan tidak boleh lebih dari 25.
+ Versi skema terbaru tidak boleh dalam status tertunda.

Menentukan struktur `SchemaId` untuk mengidentifikasi skema, dan menentukan `Versions` sebagai rentang versi yang akan dihapus. Untuk informasi lebih lanjut tentang menentukan versi atau rentang versi, lihat [DeleteRegistry tindakan (Python: delete\$1registry)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteRegistry). Versi skema yang Anda tentukan akan dihapus dari registri.

Memanggil API [ListSchemaVersions tindakan (Python: list\$1schema\$1versions)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-ListSchemaVersions) setelah panggilan ini akan mencantumkan status versi yang dihapus.

Contoh:

```
aws glue delete-schema-versions --schema-id SchemaName="TestSchema6",RegistryName="default-registry" --versions "1-1"
```

```
aws glue delete-schema-versions --schema-id SchemaArn="arn:aws:glue:us-east-2:901234567890:schema/default-registry/TestSchema6-NON-Existent" --versions "1-1"
```

1. Masuk ke Konsol Manajemen AWS dan buka AWS Glue konsol di [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\).

1. Di panel navigasi, pada **Katalog data**, pilih **Registri Skema**.

1. Pilih registri yang berisi skema Anda dari daftar registri.

1. Pilih satu atau beberapa skema dari daftar, dengan mencentang kotak.

1. Di menu **Tindakan**, pilih **Hapus skema**.

1. Masukkan teks **Delete** di bidang untuk mengonfirmasi penghapusan.

1. Pilih **Hapus**.

Versi skema yang Anda tentukan akan dihapus dari registri.

# Menghapus registri
<a name="schema-registry-gs7c"></a>

Anda mungkin ingin menghapus sebuah registri ketika skema yang ada di dalamnya tidak lagi diatur berdasarkan registri itu. Anda harus menetapkan kembali skema tersebut ke registri yang lain.

Menghapus satu atau beberapa registri adalah tindakan permanen yang tidak dapat dibatalkan. Pastikan bahwa registri atau registri-registri tersebut tidak lagi diperlukan.

Registri default dapat dihapus menggunakan AWS CLI.

**API AWS Glue**  
Untuk menghapus registri secara keseluruhan termasuk skema dan semua versinya, panggil API [DeleteRegistry tindakan (Python: delete\$1registry)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteRegistry). Menentukan sebuah struktur `RegistryId` untuk mengidentifikasi registri.

Contoh:

```
aws glue delete-registry --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName1"
```

```
aws glue delete-registry --registry-id RegistryName="TestRegistry-deletebyname"
```

Untuk mendapatkan status operasi hapus, Anda dapat memanggil API `GetRegistry` setelah panggilan asinkron.

**Konsol AWS Glue**  
Untuk menghapus sebuah registri dari konsol AWS Glue:

1. Masuk ke Konsol Manajemen AWS dan buka AWS Glue konsol di [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\).

1. Di panel navigasi, pada **Katalog data**, pilih **Registri Skema**.

1. Pilih sebuah registri dari daftar, dengan mencentang kotaknya.

1. Di menu **Tindakan**, pilih **Hapus registri**.

1. Masukkan teks **Delete** di bidang untuk mengonfirmasi penghapusan.

1. Pilih **Hapus**.

Registri yang Anda pilih akan dihapus dari AWS Glue.

## Contoh IAM untuk serializer
<a name="schema-registry-gs1"></a>

**catatan**  
AWS kebijakan terkelola memberikan izin yang diperlukan untuk kasus penggunaan umum. Untuk informasi tentang menggunakan kebijakan terkelola untuk mengelola Registri Skema, lihat [AWS kebijakan terkelola (standar) untuk AWS Glue](security-iam-awsmanpol.md#access-policy-examples-aws-managed). 

Untuk serializer, Anda harus membuat sebuah kebijakan yang paling tidak serupa dengan yang di bawah ini untuk memberikan Anda kemampuan untuk menemukan `schemaVersionId` untuk definisi skema yang diberikan. Catatan, Anda harus memiliki izin baca pada registri agar dapat membaca skema dalam registri tersebut. Anda dapat membatasi registri yang dapat dibaca dengan menggunakan klausul `Resource`.

Contoh kode 13:

```
{
    "Sid" : "GetSchemaByDefinition",
    "Effect" : "Allow",
    "Action" :
	[
        "glue:GetSchemaByDefinition"
    ],
        "Resource" : ["arn:aws:glue:us-east-2:012345678:registry/registryname-1",
                      "arn:aws:glue:us-east-2:012345678:schema/registryname-1/schemaname-1",
                      "arn:aws:glue:us-east-2:012345678:schema/registryname-1/schemaname-2"
                     ]
}
```

Selanjutnya, Anda juga dapat memungkinkan produsen untuk membuat skema dan versi baru dengan memasukkan metode tambahan berikut. Catatan, Anda harus dapat memeriksa registri untuk skema add/remove/evolve di dalamnya. Anda dapat membatasi registri yang dapat diperiksa dengan menggunakan klausul `Resource`.

Contoh kode 14:

```
{
    "Sid" : "RegisterSchemaWithMetadata",
    "Effect" : "Allow",
    "Action" :
	[
        "glue:GetSchemaByDefinition",
        "glue:CreateSchema",
        "glue:RegisterSchemaVersion",
        "glue:PutSchemaVersionMetadata",
    ],
    "Resource" : ["arn:aws:glue:aws-region:123456789012:registry/registryname-1",
                  "arn:aws:glue:aws-region:123456789012:schema/registryname-1/schemaname-1",
                  "arn:aws:glue:aws-region:123456789012:schema/registryname-1/schemaname-2"
                 ]
}
```

## Contoh IAM untuk deserializer
<a name="schema-registry-gs1b"></a>

Untuk deserializer (sisi konsumen), Anda harus membuat kebijakan yang serupa dengan yang di bawah ini untuk memungkinkan deserializer mengambil skema dari Registri Skema untuk melakukan deserialisasi. Catatan, Anda harus dapat memeriksa registri agar dapat mengambil skema di dalamnya.

Contoh kode 15:

```
{
    "Sid" : "GetSchemaVersion",
    "Effect" : "Allow",
    "Action" :
	[
        "glue:GetSchemaVersion"
    ],
    "Resource" : ["*"]
}
```

## Konektivitas pribadi menggunakan AWS PrivateLink
<a name="schema-registry-gs-private"></a>

Anda dapat menggunakan AWS PrivateLink untuk menghubungkan VPC produsen data Anda AWS Glue dengan mendefinisikan titik akhir VPC antarmuka untuk. AWS Glue Ketika Anda menggunakan titik akhir antarmuka VPC, komunikasi antara VPC dan AWS Glue dilakukan sepenuhnya di jaringan AWS . Untuk informasi selengkapnya, lihat [Menggunakan AWS Glue dengan Titik Akhir VPC](https://docs.aws.amazon.com/glue/latest/dg/vpc-endpoint.html).

# Mengakses metrik Amazon CloudWatch
<a name="schema-registry-gs-monitoring"></a>

 CloudWatch Metrik Amazon tersedia sebagai bagian dari CloudWatch tingkat gratis. Anda dapat mengakses metrik ini di CloudWatch konsol. Metrik tingkat API meliputi CreateSchema (Sukses dan Latensi),, (Sukses dan Latensi) GetSchemaByDefinition, (Sukses dan Latensi), GetSchemaVersion (Sukses dan Latensi), RegisterSchemaVersion (Sukses dan Latensi). PutSchemaVersionMetadata Metrik tingkat sumber daya termasuk Registry. ThrottledByLimit, SchemaVersion. ThrottledByLimit, SchemaVersion .Ukuran.

# Contoh CloudFormation template untuk registri skema
<a name="schema-registry-integrations-cfn"></a>

Berikut ini adalah contoh template untuk membuat sumber Schema Registry di CloudFormation. Untuk membuat tumpukan ini di akun Anda, salin templat di atas ke dalam file `SampleTemplate.yaml`, dan jalankan perintah berikut:

```
aws cloudformation create-stack --stack-name ABCSchemaRegistryStack --template-body "'cat SampleTemplate.yaml'"
```

Contoh ini menggunakan `AWS::Glue::Registry` untuk membuat registri, `AWS::Glue::Schema` untuk membuat skema, `AWS::Glue::SchemaVersion` untuk membuat versi skema, dan `AWS::Glue::SchemaVersionMetadata` untuk mengisi metadata versi skema. 

```
Description: "A sample CloudFormation template for creating Schema Registry resources."
Resources:
  ABCRegistry:
    Type: "AWS::Glue::Registry"
    Properties:
      Name: "ABCSchemaRegistry"
      Description: "ABC Corp. Schema Registry"
      Tags:
        Project: "Foo"
  ABCSchema:
    Type: "AWS::Glue::Schema"
    Properties:
      Registry:
        Arn: !Ref ABCRegistry
      Name: "TestSchema"
      Compatibility: "NONE"
      DataFormat: "AVRO"
      SchemaDefinition: >
        {"namespace":"foo.avro","type":"record","name":"user","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":"int"}]}
      Tags:
        Project: "Foo"
  SecondSchemaVersion:
    Type: "AWS::Glue::SchemaVersion"
    Properties:
      Schema:
        SchemaArn: !Ref ABCSchema
      SchemaDefinition: >
        {"namespace":"foo.avro","type":"record","name":"user","fields":[{"name":"status","type":"string", "default":"ON"}, {"name":"name","type":"string"},{"name":"favorite_number","type":"int"}]}
  FirstSchemaVersionMetadata:
    Type: "AWS::Glue::SchemaVersionMetadata"
    Properties:
      SchemaVersionId: !GetAtt ABCSchema.InitialSchemaVersionId
      Key: "Application"
      Value: "Kinesis"
  SecondSchemaVersionMetadata:
    Type: "AWS::Glue::SchemaVersionMetadata"
    Properties:
      SchemaVersionId: !Ref SecondSchemaVersion
      Key: "Application"
      Value: "Kinesis"
```

# Mengintegrasikan dengan Registri Skema AWS Glue
<a name="schema-registry-integrations"></a>

Bagian ini menjelaskan integrasi dengan registri AWS Glue skema. Contoh-contoh dalam bagian ini menunjukkan skema dengan format data AVRO. Untuk contoh lainnya, termasuk skema dengan format data JSON, lihat tes integrasi dan ReadMe informasi dalam repositori [open source AWS Glue Schema Registry](https://github.com/awslabs/aws-glue-schema-registry).

**Topics**
+ [Kasus penggunaan: Menghubungkan Registri Skema ke Amazon MSK atau Apache Kafka](#schema-registry-integrations-amazon-msk)
+ [Kasus penggunaan: Mengintegrasikan Amazon Kinesis Data Streams dengan Registri Skema AWS Glue](#schema-registry-integrations-kds)
+ [Kasus penggunaan: Amazon Managed Service untuk Apache Flink](#schema-registry-integrations-kinesis-data-analytics-apache-flink)
+ [Kasus Penggunaan: Integrasi dengan AWS Lambda](#schema-registry-integrations-aws-lambda)
+ [Kasus penggunaan: AWS Glue Data Catalog](#schema-registry-integrations-aws-glue-data-catalog)
+ [Kasus penggunaan: AWS Glue streaming](#schema-registry-integrations-aws-glue-streaming)
+ [Kasus penggunaan: Apache Kafka Streams](#schema-registry-integrations-apache-kafka-streams)

## Kasus penggunaan: Menghubungkan Registri Skema ke Amazon MSK atau Apache Kafka
<a name="schema-registry-integrations-amazon-msk"></a>

Mari kita anggap Anda sedang menulis data ke topik Apache Kafka, dan Anda dapat mengikuti langkah-langkah untuk memulai.

1. Buat cluster Amazon Managed Streaming for Apache Kafka (Amazon MSK) atau Apache Kafka dengan setidaknya satu topik. Jika membuat sebuah klaster Amazon MSK, maka Anda dapat menggunakan Konsol Manajemen AWS. Ikuti instruksi berikut: [Mulai Menggunakan Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html) di *Panduan Developer Amazon Managed Streaming for Apache Kafka*.

1. Ikuti langkah-langkah di atas [Instalasi SerDe Perpustakaan](schema-registry-gs-serde.md).

1. Untuk membuat skema registri, skema, atau skema versi, ikuti petunjuk pada bagian [Memulai dengan registri skema](schema-registry-gs.md) dalam dokumen ini.

1. Mulai produsen dan konsumen Anda untuk menggunakan Schema Registry untuk menulis dan membaca catatan to/from topik Amazon MSK atau Apache Kafka. Contoh kode produsen dan konsumen dapat ditemukan dalam [ ReadMe file dari](https://github.com/awslabs/aws-glue-schema-registry/blob/master/README.md) pustaka Serde. Perpustakaan Registri Skema pada produsen akan secara otomatis melakukan serialisasi pada catatan dan menghias catatan dengan ID versi skema.

1. Jika skema dari catatan ini telah diinput, atau jika pendaftaran otomatis telah diaktifkan, maka skema akan telah terdaftar dalam Registri Skema.

1. Pembacaan konsumen dari topik Amazon MSK atau Apache Kafka, menggunakan perpustakaan Registri Skema AWS Glue, secara otomatis akan mencari skema dari Registri Skema.

## Kasus penggunaan: Mengintegrasikan Amazon Kinesis Data Streams dengan Registri Skema AWS Glue
<a name="schema-registry-integrations-kds"></a>

Integrasi ini mengharuskan Anda memiliki pengaliran data Amazon Kinesis Data Streams yang sudah ada. Untuk informasi selengkapnya, lihat [Memulai Amazon Kinesis Data Streams?](https://docs.aws.amazon.com/streams/latest/dev/getting-started.html) dalam *Panduan Developer Amazon Kinesis Data Streams*.

Ada dua cara untuk berinteraksi dengan data dalam pengaliran data Kinesis Data Streams.
+ Melalui perpustakaan Kinesis Producer Library (KPL) dan Kinesis Client Library (KCL) di Java. Support multi-bahasa tidak tersedia.
+ Melalui`PutRecords`,`PutRecord`, dan `GetRecords` Kinesis APIs Data Streams yang tersedia di. AWS SDK untuk Java

Jika saat ini Anda menggunakan KPL/KCL pustaka, kami sarankan untuk terus menggunakan metode itu. Ada versi KCL dan KPL yang diperbarui dengan Registri Skema terintegrasi, seperti yang ditunjukkan dalam contoh. Jika tidak, Anda dapat menggunakan kode sampel untuk memanfaatkan AWS Glue Schema Registry jika menggunakan KDS secara langsung. APIs 

Integrasi Registri Skema ini hanya tersedia dengan KPL v0.14.2 atau yang lebih baru dan dengan KCL v2.3 atau yang lebih baru. Integrasi Registri Skema dengan format data JSON tersedia dengan KPL v0.14.8 atau yang lebih baru dan dengan KCL v2.3.6 atau yang lebih baru.

### Berinteraksi dengan Data Menggunakan Kinesis SDK V2
<a name="schema-registry-integrations-kds-sdk-v2"></a>

Bagian ini menjelaskan cara berinteraksi dengan Kinesis menggunakan Kinesis SDK V2

```
// Example JSON Record, you can construct a AVRO record also
private static final JsonDataWithSchema record = JsonDataWithSchema.builder(schemaString, payloadString);
private static final DataFormat dataFormat = DataFormat.JSON;

//Configurations for Schema Registry
GlueSchemaRegistryConfiguration gsrConfig = new GlueSchemaRegistryConfiguration("us-east-1");

GlueSchemaRegistrySerializer glueSchemaRegistrySerializer =
        new GlueSchemaRegistrySerializerImpl(awsCredentialsProvider, gsrConfig);
GlueSchemaRegistryDataFormatSerializer dataFormatSerializer =
        new GlueSchemaRegistrySerializerFactory().getInstance(dataFormat, gsrConfig);

Schema gsrSchema =
        new Schema(dataFormatSerializer.getSchemaDefinition(record), dataFormat.name(), "MySchema");

byte[] serializedBytes = dataFormatSerializer.serialize(record);

byte[] gsrEncodedBytes = glueSchemaRegistrySerializer.encode(streamName, gsrSchema, serializedBytes);

PutRecordRequest putRecordRequest = PutRecordRequest.builder()
        .streamName(streamName)
        .partitionKey("partitionKey")
        .data(SdkBytes.fromByteArray(gsrEncodedBytes))
        .build();
shardId = kinesisClient.putRecord(putRecordRequest)
        .get()
        .shardId();

GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer = new GlueSchemaRegistryDeserializerImpl(awsCredentialsProvider, gsrConfig);

GlueSchemaRegistryDataFormatDeserializer gsrDataFormatDeserializer =
        glueSchemaRegistryDeserializerFactory.getInstance(dataFormat, gsrConfig);

GetShardIteratorRequest getShardIteratorRequest = GetShardIteratorRequest.builder()
        .streamName(streamName)
        .shardId(shardId)
        .shardIteratorType(ShardIteratorType.TRIM_HORIZON)
        .build();

String shardIterator = kinesisClient.getShardIterator(getShardIteratorRequest)
        .get()
        .shardIterator();

GetRecordsRequest getRecordRequest = GetRecordsRequest.builder()
        .shardIterator(shardIterator)
        .build();
GetRecordsResponse recordsResponse = kinesisClient.getRecords(getRecordRequest)
        .get();

List<Object> consumerRecords = new ArrayList<>();
List<Record> recordsFromKinesis = recordsResponse.records();

for (int i = 0; i < recordsFromKinesis.size(); i++) {
    byte[] consumedBytes = recordsFromKinesis.get(i)
            .data()
            .asByteArray();

    Schema gsrSchema = glueSchemaRegistryDeserializer.getSchema(consumedBytes);
    Object decodedRecord = gsrDataFormatDeserializer.deserialize(ByteBuffer.wrap(consumedBytes),
                                                                    gsrSchema.getSchemaDefinition());
    consumerRecords.add(decodedRecord);
}
```

### Berinteraksi dengan data menggunakan pustaka KPL/KCL
<a name="schema-registry-integrations-kds-libraries"></a>

Bagian ini menjelaskan integrasi Kinesis Data Streams dengan Schema Registry menggunakan pustaka. KPL/KCL Untuk informasi selengkapnya tentang KPL/KCL, lihat [Mengembangkan Produsen Menggunakan Amazon Kinesis Producer Library](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) dalam *Panduan Developer Amazon Kinesis Data Streams*.

#### Menyiapkan Registri Skema di KPL
<a name="schema-registry-integrations-kds-libraries-kpl"></a>

1. Menentukan definisi skema untuk data, format data dan nama skema yang ditulis dalam Registri Skema AWS Glue.

1. Mengkonfigurasi objek `GlueSchemaRegistryConfiguration`, opsional.

1. Berikan objek skema ke `addUserRecord API`.

   ```
   private static final String SCHEMA_DEFINITION = "{"namespace": "example.avro",\n"
   + " "type": "record",\n"
   + " "name": "User",\n"
   + " "fields": [\n"
   + " {"name": "name", "type": "string"},\n"
   + " {"name": "favorite_number", "type": ["int", "null"]},\n"
   + " {"name": "favorite_color", "type": ["string", "null"]}\n"
   + " ]\n"
   + "}";
   
   KinesisProducerConfiguration config = new KinesisProducerConfiguration();
   config.setRegion("us-west-1")
   
   //[Optional] configuration for Schema Registry.
   
   GlueSchemaRegistryConfiguration schemaRegistryConfig =
   new GlueSchemaRegistryConfiguration("us-west-1");
   
   schemaRegistryConfig.setCompression(true);
   
   config.setGlueSchemaRegistryConfiguration(schemaRegistryConfig);
   
   ///Optional configuration ends.
   
   final KinesisProducer producer =
         new KinesisProducer(config);
   
   final ByteBuffer data = getDataToSend();
   
   com.amazonaws.services.schemaregistry.common.Schema gsrSchema =
       new Schema(SCHEMA_DEFINITION, DataFormat.AVRO.toString(), "demoSchema");
   
   ListenableFuture<UserRecordResult> f = producer.addUserRecord(
   config.getStreamName(), TIMESTAMP, Utils.randomExplicitHashKey(), data, gsrSchema);
   
   private static ByteBuffer getDataToSend() {
         org.apache.avro.Schema avroSchema =
           new org.apache.avro.Schema.Parser().parse(SCHEMA_DEFINITION);
   
         GenericRecord user = new GenericData.Record(avroSchema);
         user.put("name", "Emily");
         user.put("favorite_number", 32);
         user.put("favorite_color", "green");
   
         ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
         Encoder encoder = EncoderFactory.get().directBinaryEncoder(outBytes, null);
         new GenericDatumWriter<>(avroSchema).write(user, encoder);
         encoder.flush();
         return ByteBuffer.wrap(outBytes.toByteArray());
    }
   ```

#### Menyiapkan pustaka klien Kinesis
<a name="schema-registry-integrations-kds-libraries-kcl"></a>

Anda akan mengembangkan konsumen Perpustakaan Klien Kinesis di Java. Untuk informasi selengkapnya tentang KCL, lihat [Mengembangkan Konsumen Kinesis Client Librarydi Java](https://docs.aws.amazon.com/streams/latest/dev/kcl2-standard-consumer-java-example.html) dalam *Panduan Developer Amazon Kinesis Data Streams*.

1. Buat sebuah instans `GlueSchemaRegistryDeserializer` dengan memberikan sebuah objek `GlueSchemaRegistryConfiguration`.

1. Berikan `GlueSchemaRegistryDeserializer` ke `retrievalConfig.glueSchemaRegistryDeserializer`.

1. Mengakses skema pesan masuk dengan memanggil `kinesisClientRecord.getSchema()`.

   ```
   GlueSchemaRegistryConfiguration schemaRegistryConfig =
       new GlueSchemaRegistryConfiguration(this.region.toString());
   
    GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer =
       new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), schemaRegistryConfig);
   
    RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient));
    retrievalConfig.glueSchemaRegistryDeserializer(glueSchemaRegistryDeserializer);
   
     Scheduler scheduler = new Scheduler(
               configsBuilder.checkpointConfig(),
               configsBuilder.coordinatorConfig(),
               configsBuilder.leaseManagementConfig(),
               configsBuilder.lifecycleConfig(),
               configsBuilder.metricsConfig(),
               configsBuilder.processorConfig(),
               retrievalConfig
           );
   
    public void processRecords(ProcessRecordsInput processRecordsInput) {
               MDC.put(SHARD_ID_MDC_KEY, shardId);
               try {
                   log.info("Processing {} record(s)",
                   processRecordsInput.records().size());
                   processRecordsInput.records()
                   .forEach(
                       r ->
                           log.info("Processed record pk: {} -- Seq: {} : data {} with schema: {}",
                           r.partitionKey(), r.sequenceNumber(), recordToAvroObj(r).toString(), r.getSchema()));
               } catch (Throwable t) {
                   log.error("Caught throwable while processing records. Aborting.");
                   Runtime.getRuntime().halt(1);
               } finally {
                   MDC.remove(SHARD_ID_MDC_KEY);
               }
    }
   
    private GenericRecord recordToAvroObj(KinesisClientRecord r) {
       byte[] data = new byte[r.data().remaining()];
       r.data().get(data, 0, data.length);
       org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser().parse(r.schema().getSchemaDefinition());
       DatumReader datumReader = new GenericDatumReader<>(schema);
   
       BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(data, 0, data.length, null);
       return (GenericRecord) datumReader.read(null, binaryDecoder);
    }
   ```

#### Berinteraksi dengan data menggunakan Kinesis Data Streams APIs
<a name="schema-registry-integrations-kds-apis"></a>

Bagian ini menjelaskan integrasi Kinesis Data Streams dengan Schema Registry menggunakan Kinesis Data Streams. APIs

1. Memperbarui dependensi Maven ini:

   ```
   <dependencyManagement>
           <dependencies>
               <dependency>
                   <groupId>com.amazonaws</groupId>
                   <artifactId>aws-java-sdk-bom</artifactId>
                   <version>1.11.884</version>
                   <type>pom</type>
                   <scope>import</scope>
               </dependency>
           </dependencies>
       </dependencyManagement>
   
       <dependencies>
           <dependency>
               <groupId>com.amazonaws</groupId>
               <artifactId>aws-java-sdk-kinesis</artifactId>
           </dependency>
   
           <dependency>
               <groupId>software.amazon.glue</groupId>
               <artifactId>schema-registry-serde</artifactId>
               <version>1.1.5</version>
           </dependency>
   
           <dependency>
               <groupId>com.fasterxml.jackson.dataformat</groupId>
               <artifactId>jackson-dataformat-cbor</artifactId>
               <version>2.11.3</version>
           </dependency>
       </dependencies>
   ```

1. Dalam produsen, tambahkan informasi header skema menggunakan `PutRecords` atau API `PutRecord` di Kinesis Data Streams.

   ```
   //The following lines add a Schema Header to the record
           com.amazonaws.services.schemaregistry.common.Schema awsSchema =
               new com.amazonaws.services.schemaregistry.common.Schema(schemaDefinition, DataFormat.AVRO.name(),
                   schemaName);
           GlueSchemaRegistrySerializerImpl glueSchemaRegistrySerializer =
               new GlueSchemaRegistrySerializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(getConfigs()));
           byte[] recordWithSchemaHeader =
               glueSchemaRegistrySerializer.encode(streamName, awsSchema, recordAsBytes);
   ```

1. Di produsen, gunakan `PutRecords` atau API `PutRecord` untuk menempatkan catatan ke dalam aliran data.

1. Dalam konsumen, hapus catatan skema dari header, dan lakukan serialisasi pada catatan skema Avro.

   ```
   //The following lines remove Schema Header from record
           GlueSchemaRegistryDeserializerImpl glueSchemaRegistryDeserializer =
               new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), getConfigs());
           byte[] recordWithSchemaHeaderBytes = new byte[recordWithSchemaHeader.remaining()];
           recordWithSchemaHeader.get(recordWithSchemaHeaderBytes, 0, recordWithSchemaHeaderBytes.length);
           com.amazonaws.services.schemaregistry.common.Schema awsSchema =
               glueSchemaRegistryDeserializer.getSchema(recordWithSchemaHeaderBytes);
           byte[] record = glueSchemaRegistryDeserializer.getData(recordWithSchemaHeaderBytes);
   
           //The following lines serialize an AVRO schema record
           if (DataFormat.AVRO.name().equals(awsSchema.getDataFormat())) {
               Schema avroSchema = new org.apache.avro.Schema.Parser().parse(awsSchema.getSchemaDefinition());
               Object genericRecord = convertBytesToRecord(avroSchema, record);
               System.out.println(genericRecord);
           }
   ```

#### Berinteraksi dengan data menggunakan Kinesis Data Streams APIs
<a name="schema-registry-integrations-kds-apis-reference"></a>

Berikut ini adalah contoh kode untuk menggunakan `PutRecords` dan `GetRecords` APIs.

```
//Full sample code
import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializerImpl;
import com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistrySerializerImpl;
import com.amazonaws.services.schemaregistry.utils.AVROUtils;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.services.glue.model.DataFormat;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;


public class PutAndGetExampleWithEncodedData {
    static final String regionName = "us-east-2";
    static final String streamName = "testStream1";
    static final String schemaName = "User-Topic";
    static final String AVRO_USER_SCHEMA_FILE = "src/main/resources/user.avsc";
    KinesisApi kinesisApi = new KinesisApi();

    void runSampleForPutRecord() throws IOException {
        Object testRecord = getTestRecord();
        byte[] recordAsBytes = convertRecordToBytes(testRecord);
        String schemaDefinition = AVROUtils.getInstance().getSchemaDefinition(testRecord);

        //The following lines add a Schema Header to a record
        com.amazonaws.services.schemaregistry.common.Schema awsSchema =
            new com.amazonaws.services.schemaregistry.common.Schema(schemaDefinition, DataFormat.AVRO.name(),
                schemaName);
        GlueSchemaRegistrySerializerImpl glueSchemaRegistrySerializer =
            new GlueSchemaRegistrySerializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(regionName));
        byte[] recordWithSchemaHeader =
            glueSchemaRegistrySerializer.encode(streamName, awsSchema, recordAsBytes);

        //Use PutRecords api to pass a list of records
        kinesisApi.putRecords(Collections.singletonList(recordWithSchemaHeader), streamName, regionName);

        //OR
        //Use PutRecord api to pass single record
        //kinesisApi.putRecord(recordWithSchemaHeader, streamName, regionName);
    }

    byte[] runSampleForGetRecord() throws IOException {
        ByteBuffer recordWithSchemaHeader = kinesisApi.getRecords(streamName, regionName);

        //The following lines remove the schema registry header
        GlueSchemaRegistryDeserializerImpl glueSchemaRegistryDeserializer =
            new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(regionName));
        byte[] recordWithSchemaHeaderBytes = new byte[recordWithSchemaHeader.remaining()];
        recordWithSchemaHeader.get(recordWithSchemaHeaderBytes, 0, recordWithSchemaHeaderBytes.length);

        com.amazonaws.services.schemaregistry.common.Schema awsSchema =
            glueSchemaRegistryDeserializer.getSchema(recordWithSchemaHeaderBytes);

        byte[] record = glueSchemaRegistryDeserializer.getData(recordWithSchemaHeaderBytes);

        //The following lines serialize an AVRO schema record
        if (DataFormat.AVRO.name().equals(awsSchema.getDataFormat())) {
            Schema avroSchema = new org.apache.avro.Schema.Parser().parse(awsSchema.getSchemaDefinition());
            Object genericRecord = convertBytesToRecord(avroSchema, record);
            System.out.println(genericRecord);
        }

        return record;
    }

    private byte[] convertRecordToBytes(final Object record) throws IOException {
        ByteArrayOutputStream recordAsBytes = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().directBinaryEncoder(recordAsBytes, null);
        GenericDatumWriter datumWriter = new GenericDatumWriter<>(AVROUtils.getInstance().getSchema(record));
        datumWriter.write(record, encoder);
        encoder.flush();
        return recordAsBytes.toByteArray();
    }

    private GenericRecord convertBytesToRecord(Schema avroSchema, byte[] record) throws IOException {
        final GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(avroSchema);
        Decoder decoder = DecoderFactory.get().binaryDecoder(record, null);
        GenericRecord genericRecord = datumReader.read(null, decoder);
        return genericRecord;
    }

    private Map<String, String> getMetadata() {
        Map<String, String> metadata = new HashMap<>();
        metadata.put("event-source-1", "topic1");
        metadata.put("event-source-2", "topic2");
        metadata.put("event-source-3", "topic3");
        metadata.put("event-source-4", "topic4");
        metadata.put("event-source-5", "topic5");
        return metadata;
    }

    private GlueSchemaRegistryConfiguration getConfigs() {
        GlueSchemaRegistryConfiguration configs = new GlueSchemaRegistryConfiguration(regionName);
        configs.setSchemaName(schemaName);
        configs.setAutoRegistration(true);
        configs.setMetadata(getMetadata());
        return configs;
    }

    private Object getTestRecord() throws IOException {
        GenericRecord genericRecord;
        Schema.Parser parser = new Schema.Parser();
        Schema avroSchema = parser.parse(new File(AVRO_USER_SCHEMA_FILE));

        genericRecord = new GenericData.Record(avroSchema);
        genericRecord.put("name", "testName");
        genericRecord.put("favorite_number", 99);
        genericRecord.put("favorite_color", "red");

        return genericRecord;
    }
}
```

## Kasus penggunaan: Amazon Managed Service untuk Apache Flink
<a name="schema-registry-integrations-kinesis-data-analytics-apache-flink"></a>

Apache Flink adalah sebuah kerangka kerja sumber terbuka populer dan mesin pengolahan terdistribusi untuk komputasi stateful atas aliran data yang tak terbatas dan dibatasi. Amazon Managed Service untuk Apache Flink adalah AWS layanan terkelola penuh yang memungkinkan Anda membangun dan mengelola aplikasi Apache Flink untuk memproses data streaming.

Apache Flink sumber terbuka menyediakan sejumlah sumber dan sink. Sebagai contoh, sumber data yang telah ditetapkan termasuk membaca dari file, direktori, dan soket, dan menyerap data dari koleksi dan iterator. DataStream Konektor Apache Flink menyediakan kode untuk Apache Flink untuk berinteraksi dengan berbagai sistem pihak ketiga, seperti Apache Kafka atau Kinesis sebagai sumber tenggelam. and/or 

Untuk informasi lebih lanjut, lihat [Panduan Developer Amazon Kinesis Data Analytics](https://docs.aws.amazon.com/kinesisanalytics/latest/java/what-is.html).

### Konektor Apache Flink Kafka
<a name="schema-registry-integrations-kafka-connector"></a>

Apache Flink menyediakan sebuah konektor aliran data Apache Kafka untuk membaca data dari dan menulis data untuk topik Kafka dengan jaminan persis-satu-kali. Konsumen Kafka Flink,`FlinkKafkaConsumer`, menyediakan akses untuk membaca dari satu atau lebih topik Kafka. Produsen Kafka dari Apache Flink, `FlinkKafkaProducer`, memungkinkan menulis aliran catatan untuk satu atau beberapa topik Kafka. Untuk informasi lebih lanjut, lihat [Konektor Apache Kafka](https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html).

### Konektor aliran Kinesis Apache Flink
<a name="schema-registry-integrations-kinesis-connector"></a>

Konektor pengaliran data Kinesis menyediakan akses ke Amazon Kinesis Data Streams. `FlinkKinesisConsumer`Ini adalah sumber data streaming paralel persis sekali yang berlangganan beberapa aliran Kinesis dalam wilayah layanan yang AWS sama, dan dapat secara transparan menangani re-sharding aliran saat pekerjaan sedang berjalan. Setiap subtugas konsumen bertanggung jawab untuk mengambil catatan data dari beberapa serpihan Kinesis. Jumlah serpihan yang diambil oleh setiap subtugas akan berubah karena serpihan ditutup dan dibuat oleh Kinesis. `FlinkKinesisProducer` menggunakan Kinesis Producer Library (KPL) untuk memasukkan data dari pengaliran Apache Flink ke pengaliran Kinesis. Untuk informasi selengkapnya, lihat [Konektor Amazon Kinesis Streams](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kinesis.html).

Untuk informasi lebih lanjut, lihat [Repositori GitHub Skema AWS Glue](https://github.com/awslabs/aws-glue-schema-registry).

### Mengintegrasikan dengan Apache Flink
<a name="schema-registry-integrations-apache-flink-integrate"></a>

 SerDes Perpustakaan yang disediakan dengan Schema Registry terintegrasi dengan Apache Flink. Untuk bekerja dengan Apache Flink, Anda diharuskan untuk menerapkan antarmuka [https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java](https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java) dan [https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java](https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java) yang disebut `GlueSchemaRegistryAvroSerializationSchema` dan `GlueSchemaRegistryAvroDeserializationSchema`, yang dapat Anda hubungkan ke konektor Apache Flink.

### Menambahkan ketergantungan AWS Glue Schema Registry ke dalam aplikasi Apache Flink
<a name="schema-registry-integrations-kinesis-data-analytics-dependencies"></a>

Untuk menyiapkan dependensi integrasi ke Registri Skema AWS Glue dalam aplikasi Apache Flink:

1. Tambahkan dependensi ke file `pom.xml` Anda.

   ```
   <dependency>
       <groupId>software.amazon.glue</groupId>
       <artifactId>schema-registry-flink-serde</artifactId>
       <version>1.0.0</version>
   </dependency>
   ```

#### Mengintegrasikan Kafka atau Amazon MSK dengan Apache Flink
<a name="schema-registry-integrations-kda-integrate-msk"></a>

Anda dapat menggunakan Managed Service untuk Apache Flink untuk Apache Flink, dengan Kafka sebagai sumber atau Kafka sebagai wastafel.

**Kafka sebagai sumber**  
Diagram berikut menunjukkan integrasi Kinesis Data Streams dengan Managed Service untuk Apache Flink untuk Apache Flink, dengan Kafka sebagai sumber.

![\[Kafka sebagai sebuah sumber.\]](http://docs.aws.amazon.com/id_id/glue/latest/dg/images/gsr-kafka-source.png)


**Kafka sebagai sebuah sink**  
Diagram berikut menunjukkan integrasi Kinesis Data Streams dengan Managed Service untuk Apache Flink untuk Apache Flink, dengan Kafka sebagai wastafel.

![\[Kafka sebagai sebuah sink.\]](http://docs.aws.amazon.com/id_id/glue/latest/dg/images/gsr-kafka-sink.png)


Untuk mengintegrasikan Kafka (atau Amazon MSK) dengan Managed Service untuk Apache Flink untuk Apache Flink, dengan Kafka sebagai sumber atau Kafka sebagai wastafel, buat perubahan kode di bawah ini. Tambahkan blok kode ditebalkan untuk kode Anda masing-masing di bagian analog.

Jika Kafka adalah sumbernya, maka gunakan kode deserializer (blok 2). Jika Kafka adalah sink-nya, maka gunakan kode serializer (blok 3).

```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

String topic = "topic";
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

// block 1
Map<String, Object> configs = new HashMap<>();
configs.put(AWSSchemaRegistryConstants.AWS_REGION, "aws-region");
configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());

FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
    topic,
    // block 2
    GlueSchemaRegistryAvroDeserializationSchema.forGeneric(schema, configs),
    properties);

FlinkKafkaProducer<GenericRecord> producer = new FlinkKafkaProducer<>(
    topic,
    // block 3
    GlueSchemaRegistryAvroSerializationSchema.forGeneric(schema, topic, configs),
    properties);

DataStream<GenericRecord> stream = env.addSource(consumer);
stream.addSink(producer);
env.execute();
```

#### Mengintegrasikan Kinesis Data Streams dengan Apache Flink
<a name="schema-registry-integrations-integrate-kds"></a>

Anda dapat menggunakan Managed Service for Apache Flink untuk Apache Flink dengan Kinesis Data Streams sebagai sumber atau wastafel.

**Kinesis Data Streams sebagai sebuah sumber**  
Diagram berikut menunjukkan integrasi Kinesis Data Streams dengan Managed Service untuk Apache Flink untuk Apache Flink, dengan Kinesis Data Streams sebagai sumber.

![\[Kinesis Data Streams sebagai sebuah sumber.\]](http://docs.aws.amazon.com/id_id/glue/latest/dg/images/gsr-kinesis-source.png)


**Kinesis Data Streams sebagai sebuah sink**  
Diagram berikut menunjukkan integrasi Kinesis Data Streams dengan Managed Service untuk Apache Flink untuk Apache Flink, dengan Kinesis Data Streams sebagai wastafel.

![\[Kinesis Data Streams sebagai sebuah sink.\]](http://docs.aws.amazon.com/id_id/glue/latest/dg/images/gsr-kinesis-sink.png)


Untuk mengintegrasikan Kinesis Data Streams dengan Managed Service untuk Apache Flink untuk Apache Flink, dengan Kinesis Data Streams sebagai sumber atau Kinesis Data Streams sebagai sink, buat perubahan kode di bawah ini. Tambahkan blok kode ditebalkan untuk kode Anda masing-masing di bagian analog.

Jika Kinesis Data Streams adalah sumbernya, maka gunakan kode deserializer (blok 2). Jika Kinesis Data Streams adalah sink-nya, maka gunakan kode serializer (blok 3).

```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

String streamName = "stream";
Properties consumerConfig = new Properties();
consumerConfig.put(AWSConfigConstants.AWS_REGION, "aws-region");
consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

// block 1
Map<String, Object> configs = new HashMap<>();
configs.put(AWSSchemaRegistryConstants.AWS_REGION, "aws-region");
configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());

FlinkKinesisConsumer<GenericRecord> consumer = new FlinkKinesisConsumer<>(
    streamName,
    // block 2
    GlueSchemaRegistryAvroDeserializationSchema.forGeneric(schema, configs),
    properties);

FlinkKinesisProducer<GenericRecord> producer = new FlinkKinesisProducer<>(
    // block 3
    GlueSchemaRegistryAvroSerializationSchema.forGeneric(schema, topic, configs),
    properties);
producer.setDefaultStream(streamName);
producer.setDefaultPartition("0");

DataStream<GenericRecord> stream = env.addSource(consumer);
stream.addSink(producer);
env.execute();
```

## Kasus Penggunaan: Integrasi dengan AWS Lambda
<a name="schema-registry-integrations-aws-lambda"></a>

[Untuk menggunakan AWS Lambda fungsi sebagai konsumen Kafka/Amazon MSK Apache dan deserialisasi pesan yang disandikan Avro-menggunakan AWS Glue Schema Registry, kunjungi halaman MSK Labs.](https://amazonmsk-labs.workshop.aws/en/msklambda/gsrschemareg.html)

## Kasus penggunaan: AWS Glue Data Catalog
<a name="schema-registry-integrations-aws-glue-data-catalog"></a>

Tabel AWS Glue mendukung skema yang Anda dapat tentukan secara manual atau dengan me-referensi ke Registri Skema AWS Glue. Registri Skema terintegrasi dengan Katalog Data untuk memungkinkan Anda untuk secara opsional menggunakan skema yang disimpan dalam Registri Skema saat membuat atau memperbarui tabel atau partisi AWS Glue dalam Katalog Data. Untuk mengidentifikasi sebuah definisi skema dalam Registri Skema, minimal, Anda perlu mengetahui ARN dari skema yang ia menjadi bagiannya. Sebuah versi skema dari sebuah skema, yang berisi definisi skema, dapat direferensikan oleh UUID atau versi nomor. Selalu ada satu versi skema, yakni versi "terbaru", yang dapat dicari tanpa mengetahui nomor versi atau UUID.

Ketika memanggil operasi `CreateTable` atau `UpdateTable`, Anda akan memberikan sebuah struktur `TableInput` yang berisi `StorageDescriptor`, yang mungkin memiliki sebuah `SchemaReference` ke skema yang sudah ada di Registri Skema. Demikian pula, ketika Anda memanggil `GetTable` or `GetPartition` APIs, respons mungkin berisi skema dan. `SchemaReference` Ketika sebuah tabel atau partisi dibuat menggunakan sebuah referensi skema, Katalog Data akan mencoba untuk mengambil skema tersebut untuk referensi skema ini. Jika ia tidak dapat menemukan skema di Registri Skema, maka ia akan mengembalikan sebuah skema kosong di respons `GetTable`; jika tidak, responsnya akan memiliki skema dan referensi skema.

Anda juga dapat melakukan tindakan dari konsol AWS Glue.

Untuk melakukan operasi ini dan membuat, memperbarui, atau melihat informasi skema, Anda harus memberikan peran IAM kepada pengguna panggilan yang memberikan izin untuk API. `GetSchemaVersion`

### Menambahkan tabel atau memperbarui skema untuk tabel
<a name="schema-registry-integrations-aws-glue-data-catalog-table"></a>

Menambahkan sebuah tabel baru dari skema yang ada mengikat tabel ke versi skema tertentu. Setelah versi skema baru didaftarkan, Anda dapat memperbarui definisi tabel ini dari halaman Lihat tabel di konsol AWS Glue atau menggunakan API [UpdateTable tindakan (Python: update\$1table)](aws-glue-api-catalog-tables.md#aws-glue-api-catalog-tables-UpdateTable).

#### Menambahkan tabel dari skema yang ada
<a name="schema-registry-integrations-aws-glue-data-catalog-table-existing"></a>

Anda dapat membuat sebuah tabel AWS Glue dari versi skema dalam registri dengan menggunakan konsol AWS Glue atau API `CreateTable`.

**API AWS Glue**  
Ketika memanggil API `CreateTable`, Anda akan memberikan sebuah `TableInput` yang berisi `StorageDescriptor` yang memiliki `SchemaReference` ke sebuah skema yang sudah ada di Registri Skema.

**Konsol AWS Glue**  
Untuk membuat sebuah tabel dari konsol AWS Glue:

1. Masuk ke Konsol Manajemen AWS dan buka AWS Glue konsol di [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\).

1. Di panel navigasi, pada **Katalog data**, pilih **Tabel**.

1. Di menu **Tambahkan Tabel**, pilih **Tambahkan tabel dari skema yang ada**.

1. Mengkonfigurasi properti tabel dan penyimpanan data sebagaimana dalam Panduan Developer AWS Glue.

1. Di halaman **Pilih skema Glue**, pilih **Registri** tempat skema berada.

1. Pilih **Nama skema** dan pilih **Versi** skema yang akan diterapkan.

1. Tinjau pratinjau skema, dan pilih **Selanjutnya**.

1. Tinjau dan buat tabel.

Skema dan versi yang diterapkan ke tabel muncul di kolom **Skema Glue** dalam daftar tabel. Anda dapat melihat tabel tersebut untuk melihat lebih detail.

#### Memperbarui skema untuk tabel
<a name="schema-registry-integrations-aws-glue-data-catalog-table-updating"></a>

Ketika sebuah versi skema baru tersedia, Anda mungkin ingin memperbarui skema tabel menggunakan API [UpdateTable tindakan (Python: update\$1table)](aws-glue-api-catalog-tables.md#aws-glue-api-catalog-tables-UpdateTable) atau konsol AWS Glue. 

**penting**  
Ketika memperbarui skema untuk sebuah tabel yang sudah ada yang memiliki skema AWS Glue yang ditentukan secara manual, skema baru yang direferensikan dalam Registri Skema mungkin tidak kompatibel. Hal ini dapat menyebabkan tugas Anda gagal.

**API AWS Glue**  
Ketika memanggil API `UpdateTable`, Anda akan memberikan sebuah `TableInput` yang berisi `StorageDescriptor` yang memiliki `SchemaReference` ke sebuah skema yang sudah ada di Registri Skema.

**Konsol AWS Glue**  
Untuk memperbarui skema untuk sebuah tabel dari konsol AWS Glue:

1. Masuk ke Konsol Manajemen AWS dan buka AWS Glue konsol di [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\).

1. Di panel navigasi, pada **Katalog data**, pilih **Tabel**.

1. Melihat tabel dari daftar tabel.

1. Klik **Perbarui skema** di kotak yang memberitahu Anda tentang versi baru.

1. Tinjau perbedaan antara skema saat ini dan skema baru.

1. Pilih **Tampilkan semua perbedaan skema** untuk melihat detail lebih lanjut.

1. Pilih **Simpan tabel** untuk menyetujui versi baru.

## Kasus penggunaan: AWS Glue streaming
<a name="schema-registry-integrations-aws-glue-streaming"></a>

AWS Gluestreaming mengkonsumsi data dari sumber streaming dan melakukan operasi ETL sebelum menulis ke sink output. Sumber streaming input dapat ditentukan menggunakan Tabel Data atau langsung dengan menentukan konfigurasi sumber.

AWS Gluestreaming mendukung tabel Katalog Data untuk sumber streaming yang dibuat dengan skema yang ada di Registri AWS Glue Skema. Anda dapat membuat skema di AWS Glue Schema Registry dan membuat AWS Glue tabel dengan sumber streaming menggunakan skema ini. AWS GlueTabel ini dapat digunakan sebagai input ke pekerjaan AWS Glue streaming untuk deserialisasi data dalam aliran input.

Satu hal yang perlu diperhatikan di sini adalah ketika AWS Glue skema di Registri Skema berubah, Anda perlu memulai ulang pekerjaan AWS Glue streaming yang perlu mencerminkan perubahan dalam skema.

## Kasus penggunaan: Apache Kafka Streams
<a name="schema-registry-integrations-apache-kafka-streams"></a>

API Apache Kafka Streams adalah sebuah perpustakaan klien untuk memproses dan menganalisis data yang disimpan di Apache Kafka. Bagian ini menjelaskan integrasi Apache Kafka Streams dengan Registri Skema AWS Glue, yang memungkinkan Anda untuk mengelola dan menegakkan skema pada aplikasi streaming data Anda. Untuk informasi lebih lanjut tentang Apache Kafka Streams, lihat [Apache Kafka Streams](https://kafka.apache.org/documentation/streams/).

### Integrasi dengan Library SerDes
<a name="schema-registry-integrations-apache-kafka-streams-integrate"></a>

Ada sebuah kelas `GlueSchemaRegistryKafkaStreamsSerde` yang dapat Anda konfigurasikan dengan sebuah aplikasi Streams.

#### Kode contoh aplikasi Kafka Streams
<a name="schema-registry-integrations-apache-kafka-streams-application"></a>

Untuk menggunakan Registri Skema AWS Glue dalam aplikasi Apache Kafka Streams:

1. Konfigurasikan aplikasi Kafka Streams.

   ```
   final Properties props = new Properties();
       props.put(StreamsConfig.APPLICATION_ID_CONFIG, "avro-streams");
       props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
       props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
       props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
       props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, AWSKafkaAvroSerDe.class.getName());
       props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
   
       props.put(AWSSchemaRegistryConstants.AWS_REGION, "aws-region");
       props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
       props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
   	props.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.AVRO.name());
   ```

1. Buat sebuah pengaliran dari topik avro-input.

   ```
   StreamsBuilder builder = new StreamsBuilder();
   final KStream<String, GenericRecord> source = builder.stream("avro-input");
   ```

1. Proses catatan data (contoh memfilter catatan-catatanyang nilai dari favorite\$1color adalah merah muda atau di mana nilainya adalah 15).

   ```
   final KStream<String, GenericRecord> result = source
       .filter((key, value) -> !"pink".equals(String.valueOf(value.get("favorite_color"))));
       .filter((key, value) -> !"15.0".equals(String.valueOf(value.get("amount"))));
   ```

1. Tulis hasilnya kembali ke topik avro-output.

   ```
   result.to("avro-output");
   ```

1. Mulai aplikasi Apache Kafka Streams.

   ```
   KafkaStreams streams = new KafkaStreams(builder.build(), props);
   streams.start();
   ```

#### Hasil implementasi
<a name="schema-registry-integrations-apache-kafka-streams-results"></a>

Hasil ini menunjukkan proses penyaringan catatan yang disaring dalam langkah 3 sebagai favorite\$1color "merah muda" atau nilai "15,0".

Catatan sebelum penyaringan:

```
{"name": "Sansa", "favorite_number": 99, "favorite_color": "white"}
{"name": "Harry", "favorite_number": 10, "favorite_color": "black"}
{"name": "Hermione", "favorite_number": 1, "favorite_color": "red"}
{"name": "Ron", "favorite_number": 0, "favorite_color": "pink"}
{"name": "Jay", "favorite_number": 0, "favorite_color": "pink"}

{"id": "commute_1","amount": 3.5}
{"id": "grocery_1","amount": 25.5}
{"id": "entertainment_1","amount": 19.2}
{"id": "entertainment_2","amount": 105}
	{"id": "commute_1","amount": 15}
```

Catatan setelah penyaringan:

```
{"name": "Sansa", "favorite_number": 99, "favorite_color": "white"}
{"name": "Harry", "favorite_number": 10, "favorite_color": "black"}
{"name": "Hermione", "favorite_number": 1, "favorite_color": "red"}
{"name": "Ron", "favorite_number": 0, "favorite_color": "pink"}

{"id": "commute_1","amount": 3.5}
{"id": "grocery_1","amount": 25.5}
{"id": "entertainment_1","amount": 19.2}
{"id": "entertainment_2","amount": 105}
```

### Kasus penggunaan: Apache Kafka Connect
<a name="schema-registry-integrations-apache-kafka-connect"></a>

Integrasi Apache Kafka Connect dengan Registri Skema AWS Glue memungkinkan Anda untuk mendapatkan informasi skema dari konektor. Konverter Apache Kafka menentukan format data dalam Apache Kafka dan bagaimana menerjemahkannya ke data Apache Kafka Connect. Setiap pengguna Apache Kafka Connect akan diharuskan mengkonfigurasi konverter ini berdasarkan format data yang mereka inginkan di saat dimuat dari atau disimpan ke Apache Kafka. Dengan cara ini, Anda dapat menentukan konverter Anda sendiri untuk menerjemahkan data Apache Kafka Connect ke dalam jenis data yang digunakan dalam Registri Skema AWS Glue (misalnya: Avro) dan memanfaatkan serializer kami untuk mendaftarkan skemanya dan melakukan serialisasi. Kemudian konverter juga dapat menggunakan deserializer kami untuk melakukan deserialisasi data yang diterima dari Apache Kafka dan mengubahnya kembali ke data Apache Kafka Connect. Contoh diagram alur kerja ditunjukkan di bawah ini.

![\[Alur kerja Apache Kafka Connect.\]](http://docs.aws.amazon.com/id_id/glue/latest/dg/images/schema_reg_int_kafka_connect.png)


1. Menginstal proyek `aws-glue-schema-registry` dengan mengkloning [Repositori Github untuk Registri Skema AWS Glue](https://github.com/awslabs/aws-glue-schema-registry).

   ```
   git clone git@github.com:awslabs/aws-glue-schema-registry.git
   cd aws-glue-schema-registry
   mvn clean install
   mvn dependency:copy-dependencies
   ```

1. Jika Anda berencana untuk menggunakan Apache Kafka Connect dalam Mode *Standalone*, maka perbarui **connect-standalone.properties** dengan menggunakan langkah-langkah dalam petunjuk di bawah ini. Jika Anda berencana menggunakan Apache Kafka Connect dalam mode *Distributed*, perbarui **connect-avro-distributed.properties menggunakan instruksi** yang sama.

   1. Tambahkan properti ini juga ke file properti connect Apache Kafka:

      ```
      key.converter.region=aws-region
      value.converter.region=aws-region
      key.converter.schemaAutoRegistrationEnabled=true
      value.converter.schemaAutoRegistrationEnabled=true
      key.converter.avroRecordType=GENERIC_RECORD
      value.converter.avroRecordType=GENERIC_RECORD
      ```

   1. Tambahkan perintah di bawah ini ke bagian **Mode peluncuran** di **kafka-run-classbawah.sh**:

      ```
      -cp $CLASSPATH:"<your AWS GlueSchema Registry base directory>/target/dependency/*"
      ```

1. Tambahkan perintah di bawah ini ke bagian **Mode peluncuran** di **kafka-run-classbawah.sh**

   ```
   -cp $CLASSPATH:"<your AWS GlueSchema Registry base directory>/target/dependency/*" 
   ```

   Seharusnya terlihat seperti ini:

   ```
   # Launch mode
   if [ "x$DAEMON_MODE" = "xtrue" ]; then
     nohup "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH:"/Users/johndoe/aws-glue-schema-registry/target/dependency/*" $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
   else
     exec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH:"/Users/johndoe/aws-glue-schema-registry/target/dependency/*" $KAFKA_OPTS "$@"
   fi
   ```

1. Jika menggunakan bash, jalankan perintah di bawah ini untuk menyiapkan CLASSPATH Anda di bash\$1profile Anda. Untuk shell yang lain, perbarui lingkungannya sesuai dengan itu.

   ```
   echo 'export GSR_LIB_BASE_DIR=<>' >>~/.bash_profile
   echo 'export GSR_LIB_VERSION=1.0.0' >>~/.bash_profile
   echo 'export KAFKA_HOME=<your Apache Kafka installation directory>' >>~/.bash_profile
   echo 'export CLASSPATH=$CLASSPATH:$GSR_LIB_BASE_DIR/avro-kafkaconnect-converter/target/schema-registry-kafkaconnect-converter-$GSR_LIB_VERSION.jar:$GSR_LIB_BASE_DIR/common/target/schema-registry-common-$GSR_LIB_VERSION.jar:$GSR_LIB_BASE_DIR/avro-serializer-deserializer/target/schema-registry-serde-$GSR_LIB_VERSION.jar' >>~/.bash_profile
   source ~/.bash_profile
   ```

1. (Opsional) Jika Anda ingin menguji dengan sebuah sumber file sederhana, maka lakukan kloning pada konektor sumber file.

   ```
   git clone https://github.com/mmolimar/kafka-connect-fs.git
   cd kafka-connect-fs/
   ```

   1. Pada konfigurasi konektor sumber, edit format data ke Avro, pembaca file ke `AvroFileReader` dan perbarui contoh objek Avro dari path file yang Anda gunakan untuk membacanya. Contoh:

      ```
      vim config/kafka-connect-fs.properties
      ```

      ```
      fs.uris=<path to a sample avro object>
      policy.regexp=^.*\.avro$
      file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.AvroFileReader
      ```

   1. Instal konektor sumber.

      ```
      mvn clean package
      echo "export CLASSPATH=\$CLASSPATH:\"\$(find target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')\"" >>~/.bash_profile
      source ~/.bash_profile
      ```

   1. Perbarui properti sink pada `<your Apache Kafka installation directory>/config/connect-file-sink.properties`, perbarui nama topik dan nama file yang keluar.

      ```
      file=<output file full path>
      topics=<my topic>
      ```

1. Mulai Konektor Sumber (dalam contoh ini, ia adalah konektor sumber file).

   ```
   $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/kafka-connect-fs.properties
   ```

1. Jalankan Konektor Sink (dalam contoh ini ia adalah konektor sink file).

   ```
   $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-file-sink.properties
   ```

   Untuk contoh penggunaan Kafka Connect, lihat run-local-tests skrip.sh di bawah folder integration-tests di repositori [Github](https://github.com/awslabs/aws-glue-schema-registry/tree/master/integration-tests) untuk Schema Registry. AWS Glue

# Migrasi dari registri skema pihak ketiga ke AWS Glue Schema Registry
<a name="schema-registry-integrations-migration"></a>

Migrasi dari registri skema pihak ketiga ke Registri Skema AWS Glue yang memiliki dependensi pada registri skema pihak ketiga yang ada saat ini. Jika ada catatan dalam topik Apache Kafka yang dikirim menggunakan sebuah registri skema pihak ketiga, maka konsumen memerlukan registri skema pihak ketiga tersebut untuk melakukan deserialisasi pada catatan tersebut. `AWSKafkaAvroDeserializer` menyediakan kemampuan untuk menentukan kelas deserializer sekunder yang mengarahkan ke deserializer pihak ketiga dan digunakan untuk melakukan deserialisasi pada catatan-catatan tersebut.

Ada dua kriteria untuk mempensiunkan skema pihak ketiga. Pertama, pensiun dapat terjadi hanya setelah catatan dalam topik Apache Kafka yang menggunakan registri skema pihak ketiga yang tidak lagi diperlukan oleh dan untuk konsumen manapun. Kedua, pensiun dapat terjadi dengan habisnya usia dari topik Apache Kafka, tergantung pada periode retensi yang ditentukan untuk topik-topik tersebut. Perhatikan bahwa jika Anda memiliki topik yang memiliki retensi tak terbatas, maka Anda masih dapat bermigrasi ke Registri Skema AWS Glue tetapi Anda tidak akan dapat mempensiunkan registri skema pihak ketiga. Solusinya, Anda dapat menggunakan aplikasi atau Mirror Maker 2 untuk membaca dari topik saat ini dan menghasilkan topik baru dengan Registri Skema AWS Glue.

Untuk bermigrasi dari registri skema pihak ketiga ke Registri Skema AWS Glue:

1. Buat registri di Registri Skema AWS Glue, atau gunakan registri default.

1. Hentikan konsumennya. Modifikasi sehingga menyertakan Registri Skema AWS Glue sebagai deserializer utama, dan registri skema pihak ketiga sebagai yang sekunder. 
   + Mengatur properti konsumen. Dalam contoh ini, secondary\$1deserializer diatur ke sebuah deserializer yang berbeda. Perilakunya adalah sebagai berikut: konsumen mengambil catatan dari Amazon MSK dan pertama kali mencoba untuk menggunakan `AWSKafkaAvroDeserializer`. Jika tidak dapat membaca byte ajaib yang berisi ID Skema Avro untuk skema dari Registri Skema AWS Glue, maka `AWSKafkaAvroDeserializer` akan mencoba untuk menggunakan kelas deserializer yang disediakan di secondary\$1deserializer. Properti spesifik untuk deserializer sekunder juga perlu disediakan dalam properti konsumen, seperti schema\$1registry\$1url\$1config dan specific\$1avro\$1reader\$1config, seperti yang ditunjukkan di bawah ini.

     ```
     consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
     consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AWSKafkaAvroDeserializer.class.getName());
     consumerProps.setProperty(AWSSchemaRegistryConstants.AWS_REGION, KafkaClickstreamConsumer.gsrRegion);
     consumerProps.setProperty(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, KafkaAvroDeserializer.class.getName());
     consumerProps.setProperty(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "URL for third-party schema registry");
     consumerProps.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
     ```

1. Mulai ulang konsumen.

1. Hentikan produsen dan arahkan produsen ke Registri Skema AWS Glue.

   1. Mengatur properti produsen. Dalam contoh ini, produsen akan menggunakan default-registry dan versi skema register otomatis.

      ```
      producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
      producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AWSKafkaAvroSerializer.class.getName());
      producerProps.setProperty(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2");
      producerProps.setProperty(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName());
      producerProps.setProperty(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true");
      ```

1. (Opsional) Secara manual, pindahkan skema yang ada dan versi skema dari registri skema pihak ketiga saat ini ke Registri Skema AWS Glue, baik untuk default-registry di Registri Skema AWS Glue atau registri non-default tertentu di Registri Skema AWS Glue. Hal ini dapat dilakukan dengan mengekspor skema dari pendaftar skema pihak ketiga dalam format JSON dan membuat skema baru di Schema Registry menggunakan atau. AWS Glue Konsol Manajemen AWS AWS CLI

    Langkah ini mungkin penting jika Anda perlu mengaktifkan pemeriksaan kompatibilitas dengan versi skema sebelumnya untuk versi skema yang baru dibuat menggunakan AWS CLI dan Konsol Manajemen AWS, atau ketika produsen mengirim pesan dengan skema baru dengan pendaftaran otomatis versi skema diaktifkan.

1. Mulai produsen.