

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# AWS Glue 結構描述登錄檔
<a name="schema-registry"></a>

**注意**  
AWS Glue AWS Glue 主控台中的下列區域不支援結構描述登錄檔：中東 （阿拉伯聯合大公國）。

AWS Glue 結構描述登錄檔可讓您集中探索、控制和發展資料串流結構描述。*結構描述*定義資料記錄的結構和格式。搭配 AWS Glue 結構描述登錄檔，您可以使用方便的 Apache Kafka、[Amazon Managed Streaming for Apache Kafka](https://aws.amazon.com/msk/)、[Amazon Kinesis Data Streams](https://aws.amazon.com/kinesis/data-streams/)、[Amazon Managed Service for Apache Flink](https://aws.amazon.com/kinesis/data-analytics/) 和 [AWS Lambda](https://aws.amazon.com/lambda/) 整合，在資料串流應用程式上管理和強制執行結構描述。

[結構描述登錄檔支援 AVRO (v1.11.4) 資料格式、JSON 資料格式 (具有 ](https://json-schema.org/)JSON 結構描述格式[ (適用於結構描述 (規格 Draft-04、Draft-06 和 Draft-07)) 並使用 ](https://github.com/everit-org/json-schema)Everit 程式庫`extensions`進行 JSON 結構描述驗證)，Protocol Buffers (Protobuf) 版本 proto2 和 proto3 不支援 或 `groups`，支援 Java 語言，且即將推出對其他資料格式和語言的支援。支援的功能包括相容性、透過中繼資料取得結構描述、自動註冊結構描述、IAM 相容性，以及選用的 ZLIB 壓縮，以減少儲存和資料傳輸。結構描述登錄檔是無伺服器且可免費使用。

使用結構描述作為生產者與消費者之間的資料格式合約，可改善資料控管、更高品質的資料，並讓資料消費者能夠彈性相容上游變更。

結構描述登錄檔允許不同的系統共用序列化和還原序列化的結構描述。例如，假設您擁有資料的生產者和消費者。生產者在發佈資料時知道結構描述。結構描述登錄檔提供某些系統，如 Amazon MSK 或 Apache Kafka 的序列化程式和還原序列化程式。

 如需詳細資訊，請參閱[結構描述登錄檔的運作方式](schema-registry-works.md)。

**Topics**
+ [結構描述](#schema-registry-schemas)
+ [登錄檔](#schema-registry-registries)
+ [結構描述版本控制和相容性](#schema-registry-compatibility)
+ [開源 Serde 程式庫](#schema-registry-serde-libraries)
+ [結構描述登錄檔的配額](#schema-registry-quotas)
+ [結構描述登錄檔的運作方式](schema-registry-works.md)
+ [結構描述登錄檔入門](schema-registry-gs.md)

## 結構描述
<a name="schema-registry-schemas"></a>

*結構描述*定義資料記錄的結構和格式。結構描述是可靠的資料發佈、耗用或儲存的版本化規格。

在 Avro 的這個範例結構描述中，格式和結構由配置和欄位名稱定義，欄位名稱的格式由資料類型定義 (例如，`string`、`int`)。

```
{
    "type": "record",
    "namespace": "ABC_Organization",
    "name": "Employee",
    "fields": [
        {
            "name": "Name",
            "type": "string"
        },
        {
            "name": "Age",
            "type": "int"
        },
        {
            "name": "address",
            "type": {
                "type": "record",
                "name": "addressRecord",
                "fields": [
                    {
                        "name": "street",
                        "type": "string"
                    },
                    {
                        "name": "zipcode",
                        "type": "int" 
                    }
                ]
            }
        }
    ]
}
```

在 JSON Schema Draft-07 for JSON 此範例中，格式是由 [JSON 結構描述組織](https://json-schema.org/)所定義。

```
{
	"$id": "https://example.com/person.schema.json",
	"$schema": "http://json-schema.org/draft-07/schema#",
	"title": "Person",
	"type": "object",
	"properties": {
		"firstName": {
			"type": "string",
			"description": "The person's first name."
		},
		"lastName": {
			"type": "string",
			"description": "The person's last name."
		},
		"age": {
			"description": "Age in years which must be equal to or greater than zero.",
			"type": "integer",
			"minimum": 0
		}
	}
}
```

在這個 Protobuf 範例中，格式由[協定緩衝區語言的版本 2 (proto2)](https://developers.google.com/protocol-buffers/docs/reference/proto2-spec) 定義。

```
syntax = "proto2";

package tutorial;

option java_multiple_files = true;
option java_package = "com.example.tutorial.protos";
option java_outer_classname = "AddressBookProtos";

message Person {
  optional string name = 1;
  optional int32 id = 2;
  optional string email = 3;

  enum PhoneType {
    MOBILE = 0;
    HOME = 1;
    WORK = 2;
  }

  message PhoneNumber {
    optional string number = 1;
    optional PhoneType type = 2 [default = HOME];
  }

  repeated PhoneNumber phones = 4;
}

message AddressBook {
  repeated Person people = 1;
}
```

## 登錄檔
<a name="schema-registry-registries"></a>

*登錄檔*是結構描述的邏輯容器。登錄檔允許您組織您的結構描述，以及管理應用程式的存取控制。登錄檔具有 Amazon Resource Name (ARN)，可讓您組織和設定登錄檔中結構描述操作的不同存取許可。

您可以使用預設登錄檔，或視需要建立許多新的登錄檔。


**AWS Glue 結構描述登錄檔階層**  

|  | 
| --- |
|  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/glue/latest/dg/schema-registry.html)  | 
|  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/glue/latest/dg/schema-registry.html)  | 
|  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/glue/latest/dg/schema-registry.html)  | 
|  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/glue/latest/dg/schema-registry.html)  | 

## 結構描述版本控制和相容性
<a name="schema-registry-compatibility"></a>

每個結構描述可以有多個版本。版本控制是由套用在結構描述上的相容性規則所控制。結構描述登錄檔會根據此規則檢查登錄新結構描述版本的要求，然後才能成功。

標示為檢查點的結構描述版本是用來判斷註冊新結構描述版本的相容性。第一次建立結構描述時，預設檢查點將是第一個版本。當結構描述隨著更多版本發展，您可以使用 CLI/SDK 將檢查點變更為使用 `UpdateSchema` API，它遵循一組約束。在主控台中，編輯結構描述定義或相容性模式預設會將檢查點變更為最新版本。

相容性模式可讓您控制結構描述可以或不可以隨著時間變化的方式。這些模式形成應用程式產生和使用資料之間的合約。當新版的結構描述提交至登錄檔時，會使用套用至結構描述名稱的相容性規則來判斷是否可以接受新版本。共有 8 種相容性模式：NONE、DISABLED、BACKWARD、BACKWARD\$1ALL、FORWARD、FORWARD\$1ALL、FULL、FULL\$1ALL。

在 Avro 資料格式中，欄位可以是選用的或必要的。選用欄位是其中 `Type` 包含 null。必要欄位的 `Type` 沒有 null。

在 Protobuf 資料格式中，proto2 語法中的欄位可以是選用 (包括重複) 或必要欄位，而 proto3 語法中的所有欄位均是選用 (包括重複) 欄位。所有相容性規則均是根據對協定緩衝區規範的理解以及 [Google 協定緩衝區文件](https://developers.google.com/protocol-buffers/docs/overview#updating)中的指導所決定。
+ *NONE*：未套用相容性模式。您可以在開發案例中使用此選項，或者如果您不知道要套用至結構描述的相容性模式。任何新增的版本都會被接受，而不經過相容性檢查。
+ *DISABLED*：此相容性選項可防止特定結構描述的版本控制。無法新增新版本。
+ *BACKWARD*：建議使用此相容性選項，因為它允許消費者讀取目前和先前的結構描述版本。當您刪除欄位或新增選用欄位時，您可以使用此選項來檢查與先前結構描述版本的相容性。一個典型的 BACKWARD 使用案例是，當您的應用程式為最新的結構描述而建立時。

**AVRO**  
例如，假設您有一個由名字 (必要)、姓氏 (必要)、電子郵件 (必要) 和電話號碼 (選用) 所定義的結構描述。

  如果您的下一個結構描述版本移除必要的電子郵件欄位，將可成功註冊。BACKWARD 相容性要求消費者能夠讀取目前和先前的結構描述版本。您的消費者將能夠讀取新的結構描述，因為舊訊息中的額外電子郵件欄位被忽略。

  如果您有新增必要欄位 (例如郵遞區號) 的提議新結構描述版本，則不會成功註冊 BACKWARD 相容性。新版本的消費者將無法在結構描述變更之前讀取舊訊息，因為他們缺少必要的郵遞區號欄位。但是，如果在新的結構描述中將郵遞區號欄位設定為選用，則提議的版本將成功註冊，因為消費者可以在沒有選用郵遞區碼欄位的情況下讀取舊的結構描述。

**JSON**  
例如，假設您有一個由名字 (選用)、姓氏 (選用)、電子郵件 (選用) 和電話號碼 (選用) 所定義的結構描述版本。

  如果您的下一個結構描述版本新增了選用的電話號碼屬性，只要原始結構描述版本不允許透過將 `additionalProperties` 欄位設定為 false 的任何其他屬性，這就會成功註冊。BACKWARD 相容性要求消費者能夠讀取目前和先前的結構描述版本。您的消費者將能夠讀取電話號碼屬性不存在的原始結構描述產生的資料。

  如果您有新增選用電話號碼屬性的提議新結構描述版本，當原始結構描述版本將 `additionalProperties` 欄位設定為 true 時 (即允許任何其他屬性)，這將無法成功註冊 BACKWARD 相容性。新版本的消費者將無法在結構描述變更之前讀取舊訊息，因為他們無法讀取具有不同類型 (例如是字串而不是數字) 之電話號碼屬性的資料。

**PROTOBUF**  
例如，假設您有一個由訊息 `Person` 與 proto2 語法下的 `first name` (必要)、`last name` (必要)、`email` (必要) 和 `phone number` (選用) 等欄位所定義的結構描述版本。

  與 AVRO 案例類似，如果您的下一個結構描述版本移除必要的 `email` 欄位，該版本可成功註冊。BACKWARD 相容性要求消費者能夠讀取目前和先前的結構描述版本。舊訊息中的額外 `email` 欄位會被忽略，您的消費者將能夠讀取新的結構描述。

  如果您有新增了必要欄位 (例如 `zip code`) 的提議新結構描述版本，根據 BACKWARD 相容性規則，該版本無法成功註冊。新版本的消費者將無法讀取結構描述變更之前的舊訊息，因為它們缺少必要的 `zip code` 欄位。但是，如果在新的結構描述中將 `zip code` 欄位設定為選用，則提議的版本可成功註冊，因為消費者可以讀取不含 `zip code` 選用欄位的結構描述。

  在 GrPC 使用案例中，新增新 RPC 服務或 RPC 方法是向後相容的變更。例如，假設您有一個由 RPC 服務 `MyService` 使用 `Foo` 和 `Bar` 兩種 RPC 方法所定義的結構描述版本。

  如果您的下一個結構描述版本新增了名為 `Baz` 的新 RPC 方法，該版本可成功註冊。因為新加入的 RPC 方法 `Baz` 為可選方法，根據 BACKWARD 相容性規則，您的消費者將能夠讀取原始結構描述產生的資料。

  如果您有移除了現有 RPC 方法 `Foo` 的提議新結構描述版本，根據 BACKWARD 相容性規則，該版本無法成功註冊。新版本的消費者將無法讀取結構描述變更之前的舊訊息，因為他們無法使用 gRPC 應用程式中不存在的 RPC 方法 `Foo` 來理解和讀取資料。
+ *BACKWARD\$1ALL*：此相容性選項可讓消費者同時讀取目前和所有先前的結構描述版本。當您刪除欄位或新增選用欄位時，您可以使用此選項來檢查所有先前結構描述版本的相容性。
+ *FORWARD*：此相容性選項可讓消費者同時讀取目前和後續的結構描述版本，但不一定是較新的版本。當您新增欄位或刪除選用欄位時，您可以使用此選項來檢查與上一個結構描述版本的相容性。FORWARD 的典型使用案例是您的應用程式已經為之前的結構描述建立，並且應該能夠處理更新的結構描述。

**AVRO**  
例如，假設您有一個由名字 (必要)、姓氏 (必要)、電子郵件 (選用) 定義的結構描述版本。

  如果您有一個新的結構描述版本新增了必要欄位，例如電話號碼，該版本可成功註冊。FORWARD 相容性要求消費者能夠使用舊版本來讀取以新結構描述產生的資料。

  如果您有刪除了必要名字欄位的提議結構描述版本，根據 FORWARD 相容性規則，該版本無法成功註冊。您在以前版本上的消費者將無法讀取提議的結構描述，因為他們缺少必要的名字欄位。但是，如果名字欄位最初是選用的，那麼提議的新結構描述將成功註冊，因為消費者可以根據沒有選用名字欄位的新結構描述讀取資料。

**JSON**  
例如，假設您有一個由名字 (選用)、姓氏 (選用)、電子郵件 (選用) 和電話號碼 (選用) 所定義的結構描述版本。

  如果您有移除選用電話號碼屬性的新結構描述版本，只要新結構描述版本不允許透過將 `additionalProperties` 欄位設定為 false 的任何其他屬性，這就會成功註冊。FORWARD 相容性要求消費者能夠使用舊版本來讀取以新結構描述產生的資料。

  如果您有刪除選用電話號碼屬性的提議結構描述版本，則當新的結構描述版本將 `additionalProperties` 欄位設定為 true，即允許任何額外的屬性，這不會成功註冊 FORWARD 相容性。舊版本的消費者將無法讀取提議的結構描述，因為他們可能具有不同類型的電話號碼屬性，例如是字串而不是數字。

**PROTOBUF**  
例如，假設您有一個由訊息 `Person` 與 proto2 語法下的 `first name` (必要)、`last name` (必要) 和 `email` (選用) 等欄位所定義的結構描述版本。

  與 AVRO 案例類似，如果您有一個新的結構描述版本新增了必要欄位，例如 `phone number`，該版本可成功註冊。FORWARD 相容性要求消費者能夠使用舊版本來讀取以新結構描述產生的資料。

  如果您有刪除了必要 `first name` 欄位的提議結構描述版本，根據 FORWARD 相容性規則，該版本無法成功註冊。您在以前版本上的消費者將無法讀取提議的結構描述，因為它們缺少必要的 `first name` 欄位。但是，如果 `first name` 欄位最初是選用欄位，則提議的新結構描述可成功註冊，因為消費者可以讀取沒有 `first name` 選用欄位的新結構描述資料。

  在 gRPC 使用案例中，移除 RPC 服務或 RPC 方法是向前相容的變更。例如，假設您有一個由 RPC 服務 `MyService` 使用 `Foo` 和 `Bar` 兩種 RPC 方法所定義的結構描述版本。

  如果您的下一個結構描述版本刪除了名為 `Foo` 的現有 RPC 方法，根據 FORWARD 相容性規則，該版本可成功註冊，因為消費者可以使用舊版本來讀取新結構描述產生的資料。如果您有新增了 RPC 方法 `Baz` 的提議新結構描述版本，根據 FORWARD 相容性規則，該版本無法成功註冊。您以前版本上的消費者將無法讀取提議的結構描述，因為它們缺少 RPC 方法 `Baz`。
+ *FORWARD\$1ALL*：此相容性選項可讓消費者讀取任何新註冊結構描述的生產者所寫入的資料。當您需要新增欄位或刪除選用欄位，並檢查與所有先前結構描述版本的相容性時，您可以使用此選項。
+ *FULL*：此相容性選項可讓消費者讀取使用前一個或下一個版本的結構描述，但不能讀取更舊或更新版本的生產者所寫入的資料。當您新增或移除選用欄位時，您可以使用此選項來檢查與上一個結構描述版本的相容性。
+ *FULL\$1ALL*：此相容性選項可讓消費者讀取生產者使用所有先前的結構描述版本所寫入的資料。當您新增或移除選用欄位時，您可以使用此選項來檢查所有先前結構描述版本的相容性。

## 開源 Serde 程式庫
<a name="schema-registry-serde-libraries"></a>

AWS 提供開放原始碼 Serde 程式庫做為序列化和還原序列化資料的架構。這些程式庫的開源設計允許通用的開源應用程式和架構在他們的專案中支援這些程式庫。

如需 Serde 函式庫如何運作的詳細資訊，請參閱[結構描述登錄檔的運作方式](schema-registry-works.md)。

## 結構描述登錄檔的配額
<a name="schema-registry-quotas"></a>

配額也稱為 中的限制 AWS，是 AWS 帳戶中資源、動作和項目的最大值。下列是 AWS Glue 中結構描述登錄檔的軟性限制。

**結構描述版本中繼資料索引鍵-值對**  
每個 AWS 區域每個 SchemaVersion 最多可以有 10 個鍵值對。

您可以使用 [QuerySchemaVersionMetadata 動作 (Python: query\$1schema\$1version\$1metadata)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-QuerySchemaVersionMetadata) 或 [PutSchemaVersionMetadata 動作 (Python: put\$1schema\$1version\$1metadata)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-PutSchemaVersionMetadata) API 檢視或設定索引鍵值中繼資料配對。

下列是 AWS Glue 中結構描述登錄檔的硬性限制。

**登錄檔**  
此帳戶每個 AWS 區域最多可以有 100 個登錄檔。

**SchemaVersion**  
此帳戶每個 AWS 區域最多可以有 10000 個結構描述版本。

每個新結構描述都會建立新的結構描述版本，因此如果每個結構描述只有一個版本，理論上每個區域每個帳戶最多可以有 10000 個結構描述。

**結構描述承載**  
結構描述裝載的大小限制為 170 KB。

# 結構描述登錄檔的運作方式
<a name="schema-registry-works"></a>

本節說明結構描述登錄檔中的序列化和還原序列化處理程序的運作方式。

1. 註冊結構描述：如果結構描述不存在於登錄檔中，則可以使用等於目的地名稱的結構描述名稱來註冊結構描述 (例如，test\$1topic、test\$1stream、prod\$1firehose)，或者生產者可以提供結構描述的自訂名稱。生產者也可以將索引鍵/值對新增至結構描述做為中繼資料，例如來源：msk\$1kafka\$1topic\$1A，或在結構描述建立時將 AWS 標籤套用至結構描述。結構描述註冊後，結構描述登錄檔會將結構描述版本 ID 傳回至序列化程式。如果結構描述存在，但序列化程式正在使用不存在的新版本，結構描述登錄檔會檢查結構描述參考相容性規則，以確保新版本是相容性的，然後再將它註冊為新版本。

   註冊結構描述的方法有兩種：手動註冊和自動註冊。您可以透過 AWS Glue 主控台或 CLI/SDK 手動註冊結構描述。

   當序列化程式設定中開啟自動註冊時，將執行結構描述的自動註冊。如果在生產者組態中沒有提供 `REGISTRY_NAME`，則自動註冊將在預設登錄檔 (default-registry) 下註冊新的結構描述版本。請參閱[安裝 SerDe 程式庫](schema-registry-gs-serde.md)，以取得有關指定自動註冊屬性的資訊。

1. 序列化程式會針對結構描述驗證資料記錄：當產生資料的應用程式已註冊其結構描述時，結構描述登錄檔序列化程式會驗證應用程式所產生的記錄是以符合已註冊的結構描述的欄位和資料類型構成。如果記錄的結構描述不符合註冊的結構描述，序列化程式將傳回例外狀況，應用程式將無法將記錄傳遞到目的地。

   如果沒有結構描述存在，且結構描述名稱未透過生產者組態提供，則結構描述會以與主題名稱 (如果是 Apache Kafka 或 Amazon MSK) 或串流名稱 (如果是 Kinesis Data Streams) 相同的名稱建立結構描述。

   每個記錄都有結構描述定義和資料。結構描述定義會根據結構描述登錄檔中的現有結構描述和版本進行查詢。

   依預設，生產者快取已註冊結構描述的結構描述定義和結構描述版本 ID。如果記錄的結構描述版本定義不符合快取中的可用內容，生產者將嘗試使用結構描述登錄檔來驗證結構描述。如果結構描述版本有效，則其版本 ID 和定義將在生產者本機快取。

   您可以在[安裝 SerDe 程式庫](schema-registry-gs-serde.md)的步驟 \$13 的選用生產者屬性中調整預設快取期間 (24 小時)。

1. 序列化和傳遞記錄：如果記錄符合結構描述，序列化程式會使用結構描述版本 ID 來裝飾每個記錄，根據選取的資料格式序列化記錄 (AVRO、JSON、Protobuf 或即將推出的其他格式) 壓縮記錄 (選用生產者組態)，並將其傳遞給目的地。

1. 消費者還原序列化資料：讀取此資料的消費者使用結構描述登錄檔還原序列化程式庫，從記錄承載剖析結構描述版本 ID。

1. 還原序列化程式可能會從結構描述登錄檔要求結構描述：如果這是還原序列化程式第一次看到具有特定結構描述版本 ID 的記錄，則使用結構描述版本 ID，還原序列化程式會從結構描述登錄檔要求結構描述，並在本機快取消費者。如果結構描述登錄檔無法還原序列化記錄，消費者可以從記錄記錄中記錄資料，然後繼續，或停止應用程式。

1. 還原序列化程式使用結構描述來還原序列化記錄：當還原序列化程式從結構描述登錄檔擷取結構描述版本 ID 時，還原序列化程式會解壓縮記錄 (如果產生者傳送的記錄已壓縮)，並使用結構描述來還原序列化記錄。應用程式現在會處理記錄。

**注意**  
加密：您的用戶端透過 API 呼叫與結構描述登錄檔進行通訊，這些呼叫會使用透過 HTTPS 的 TLS 加密來加密傳輸中的資料。儲存在結構描述登錄檔中的結構描述一律使用 service-managed AWS Key Management Service (AWS KMS) 金鑰進行靜態加密。

**注意**  
使用者授權：結構描述登錄檔支援身分型 IAM 政策。

# 結構描述登錄檔入門
<a name="schema-registry-gs"></a>

以下章節將提供概觀，並逐步引導您設定和使用結構描述登錄檔。如需結構描述登錄檔概念和要素的詳細資訊，請參閱[AWS Glue 結構描述登錄檔](schema-registry.md)。

**Topics**
+ [安裝 SerDe 程式庫](schema-registry-gs-serde.md)
+ [與 AWS Glue 結構描述登錄檔整合](schema-registry-integrations.md)
+ [從第三方結構描述登錄檔移轉至 AWS Glue 結構描述登錄檔](schema-registry-integrations-migration.md)

# 安裝 SerDe 程式庫
<a name="schema-registry-gs-serde"></a>

SerDe 程式庫提供序列化和還原序列化資料的架構。

您將為產生資料的應用程式安裝開源序列化程式 (統稱為「序列化程式」)。序列化程式會處理序列化、壓縮以及與結構描述登錄檔的互動。序列化程式會自動從寫入結構描述登錄檔相容目的地的記錄擷取結構描述，例如 Amazon MSK。同樣地，您將在使用資料的應用程式上安裝開源還原序列化程式。

# Java 實作
<a name="schema-registry-gs-serde-java"></a>

**注意**  
必要條件：在完成下列步驟前，您必須擁有執行中的 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 或 Apache Kafka 叢集。您的生產者和消費者需要在 Java 8 或更高版本上執行。

若要在生產者和消費者上安裝程式庫：

1. 在生產者和消費者的 pom.xml 檔案中，透過下面的程式碼新增此相依性：

   ```
   <dependency>
       <groupId>software.amazon.glue</groupId>
       <artifactId>schema-registry-serde</artifactId>
       <version>1.1.5</version>
   </dependency>
   ```

   或者，您也可以複製 [AWS Glue 結構描述登錄檔 Github 儲存庫](https://github.com/awslabs/aws-glue-schema-registry)。

1. 使用這些必要屬性設定您的生產者：

   ```
   props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // Can replace StringSerializer.class.getName()) with any other key serializer that you may use
   props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());
   props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2");
   properties.put(AWSSchemaRegistryConstants.DATA_FORMAT, "JSON"); // OR "AVRO"
   ```

   如果沒有現有的結構描述，則需要開啟自動註冊 (下一個步驟)。如果您確實有一個想套用的結構描述，那麼請用您的結構描述名稱替換「my-schema」。如果結構描述自動註冊關閉，則也必須提供「registry-name」。如果結構描述是在「default-registry」下建立的，則登錄檔名稱可以省略。

1. (選用) 設定這些選用生產者屬性中的任何一個。如需詳細的屬性說明，請參閱[讀我檔案](https://github.com/awslabs/aws-glue-schema-registry/blob/master/README.md)。

   ```
   props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true"); // If not passed, uses "false"
   props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "my-schema"); // If not passed, uses transport name (topic name in case of Kafka, or stream name in case of Kinesis Data Streams)
   props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry"); // If not passed, uses "default-registry"
   props.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000 (24 Hours)
   props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200
   props.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL); // Pass a compatibility mode. If not passed, uses Compatibility.BACKWARD
   props.put(AWSSchemaRegistryConstants.DESCRIPTION, "This registry is used for several purposes."); // If not passed, constructs a description
   props.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, AWSSchemaRegistryConstants.COMPRESSION.ZLIB); // If not passed, records are sent uncompressed
   ```

   自動註冊會在預設登錄檔 (「default-registry」) 下註冊結構描述版本。如果在上一個步驟中未指定 `SCHEMA_NAME`，則主題名稱會被推斷為 `SCHEMA_NAME`。

   如需相容性模式的詳細資訊，請參閱[結構描述版本控制和相容性](schema-registry.md#schema-registry-compatibility)。

1. 使用下列必要屬性設定您的消費者：

   ```
   props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
   props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
   props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2"); // Pass an AWS 區域
   props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); // Only required for AVRO data format
   ```

1. (選用) 設定這些選用的消費者屬性。如需詳細的屬性說明，請參閱[讀我檔案](https://github.com/awslabs/aws-glue-schema-registry/blob/master/README.md)。

   ```
   properties.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000
   props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200
   props.put(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, "com.amazonaws.services.schemaregistry.deserializers.external.ThirdPartyDeserializer"); // For migration fall back scenario
   ```

# C\$1 實作
<a name="schema-registry-gs-serde-csharp"></a>

**注意**  
必要條件：在完成下列步驟前，您必須擁有執行中的 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 或 Apache Kafka 叢集。您的生產者和消費者需要在 .NET 8.0 或更高版本上執行。

## 安裝
<a name="schema-registry-gs-serde-csharp-install"></a>

對於 C\$1 應用程式，請使用下列其中一種方法安裝 AWS Glue 結構描述登錄檔 SerDe NuGet 套件：

**.NET CLI：**  
使用下列命令來安裝套件：

```
dotnet add package Aws.Glue.SchemaRegistry --version 1.0.0-<rid>
```

其中 `<rid>` 可以是 `1.0.0-linux-x64`，`1.0.0-linux-musl-x64`或 `1.0.0-linux-arm64`

**PackageReference （在您的 .csproj 檔案中）：**  
將下列項目新增至您的專案檔案：

```
<PackageReference Include="Aws.Glue.SchemaRegistry" Version="1.0.0-<rid>" />
```

其中 `<rid>` 可以是 `1.0.0-linux-x64`，`1.0.0-linux-musl-x64`或 `1.0.0-linux-arm64`

## 組態檔案設定
<a name="schema-registry-gs-serde-csharp-config"></a>

使用必要的設定建立組態屬性檔案 （例如 `gsr-config.properties`)：

**最小組態：**  
以下顯示最低組態範例：

```
region=us-east-1
registry.name=default-registry
dataFormat=AVRO
schemaAutoRegistrationEnabled=true
```

## 針對 Kafka SerDes 使用 C\$1 Glue 結構描述用戶端程式庫
<a name="schema-registry-gs-serde-csharp-kafka"></a>

**序列化程式用量範例：**  
下列範例示範如何使用 序列化程式：

```
private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>";
var protobufSerializer = new GlueSchemaRegistryKafkaSerializer(PROTOBUF_CONFIG_PATH);
var serialized = protobufSerializer.Serialize(message, message.Descriptor.FullName);
// send serialized bytes to Kafka using producer.Produce(serialized)
```

**還原序列化程式用量範例：**  
下列範例示範如何使用還原序列化程式：

```
private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>";
var dataConfig = new GlueSchemaRegistryDataFormatConfiguration(
    new Dictionary<string, dynamic>
    {
        {
            GlueSchemaRegistryConstants.ProtobufMessageDescriptor, message.Descriptor
        }
    }
);
var protobufDeserializer = new GlueSchemaRegistryKafkaDeserializer(PROTOBUF_CONFIG_PATH, dataConfig);

// read message from Kafka using serialized = consumer.Consume()
var deserializedObject = protobufDeserializer.Deserialize(message.Descriptor.FullName, serialized);
```

## 搭配 KafkaFlow for SerDes 使用 C\$1 Glue 結構描述用戶端程式庫
<a name="schema-registry-gs-serde-csharp-kafkaflow"></a>

**序列化程式用量範例：**  
下列範例示範如何使用序列化程式設定 KafkaFlow：

```
services.AddKafka(kafka => kafka
    .UseConsoleLog()
    .AddCluster(cluster => cluster
        .WithBrokers(new[] { "localhost:9092" })
        .AddProducer<CustomerProducer>(producer => producer
            .DefaultTopic("customer-events")
            .AddMiddlewares(m => m
                .AddSerializer<GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>>(
                    () => new GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>("config/gsr-config.properties")
                )
            )
        )
    )
);
```

**還原序列化程式用量範例：**  
下列範例示範如何使用還原序列化程式設定 KafkaFlow：

```
.AddConsumer(consumer => consumer
    .Topic("customer-events")
    .WithGroupId("customer-group")
    .WithBufferSize(100)
    .WithWorkersCount(10)
    .AddMiddlewares(middlewares => middlewares
        .AddDeserializer<GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>>(
            () => new GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>("config/gsr-config.properties")
        )
        .AddTypedHandlers(h => h.AddHandler<CustomerHandler>())
    )
)
```

## 選用生產者屬性
<a name="schema-registry-gs-serde-csharp-optional"></a>

您可以使用其他選用屬性來擴展組態檔案：

```
# Auto-registration (if not passed, uses "false")
schemaAutoRegistrationEnabled=true

# Schema name (if not passed, uses topic name)
schema.name=my-schema

# Registry name (if not passed, uses "default-registry")
registry.name=my-registry

# Cache settings
cacheTimeToLiveMillis=86400000
cacheSize=200

# Compatibility mode (if not passed, uses BACKWARD)
compatibility=FULL

# Registry description
description=This registry is used for several purposes.

# Compression (if not passed, records are sent uncompressed)
compressionType=ZLIB
```

## 支援的資料格式
<a name="schema-registry-gs-serde-supported-formats"></a>

Java 和 C\$1 實作都支援相同的資料格式：
+ *AVRO*：Apache Avro 二進位格式
+ *JSON*：JSON 結構描述格式
+ *PROTOBUF*：通訊協定緩衝區格式

## 備註
<a name="schema-registry-gs-serde-csharp-notes"></a>
+ 若要開始使用程式庫，請造訪 https：//[https://www.nuget.org/packages/AWS.Glue.SchemaRegistry](https://www.nuget.org/packages/AWS.Glue.SchemaRegistry)
+ 來源碼可在 https：//[https://github.com/awslabs/aws-glue-schema-registry](https://github.com/awslabs/aws-glue-schema-registry) 取得

# 建立登錄檔
<a name="schema-registry-gs3"></a>

您可以使用預設登錄檔，或使用 AWS Glue API 或 AWS Glue 主控台建立任意數量的新登錄檔。

**AWS Glue API**  
您可以使用這些步驟，用 AWS Glue API 執行此任務。

若要將 AWS CLI 用於AWS Glue結構描述登錄檔 APIs，請務必將 更新 AWS CLI 為最新版本。

 若要新增登錄檔，請使用 [CreateRegistry 動作 (Python: create\$1registry)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-CreateRegistry) API。指定 `RegistryName` 作為要建立的登錄檔的名稱，最大長度為 255，只能包含字母、數字、連字號、底線、金額符號或井號。

將 `Description` 指定為字串，長度不可超過 2048 個位元組，需符合 [URI 位址多行字串模式](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-common.html#aws-glue-api-common-_string-patterns)。

(選用) 為登錄檔指定一個或多個 `Tags`，作為索引鍵-值對的映射陣列。

```
aws glue create-registry --registry-name registryName1 --description description
```

建立登錄檔後，系統會指派 Amazon Resource Name (ARN)，您可以在 API 回應的 `RegistryArn` 中檢視。現在您已建立登錄檔，請為該登錄檔建立一或多個結構描述。

**AWS Glue 主控台**  
新增新的登錄檔AWS Glue主控台：

1. 登入 AWS 管理主控台 並在 https：//[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\) 開啟 AWS Glue主控台。

1. 在導覽窗格的 **Data catalog** 下，選擇 **Schema registries** (結構描述登錄檔)。

1. 選擇 **Add registry (新增登錄檔)**。

1. 輸入**登錄名稱**，由字母、數字、連字號或底線組成。無法變更此名稱。

1. 輸入登錄檔的 **Description (描述)** (選用)。

1. 或者，將一或多個標籤套用到您的登錄檔。選擇 **Add new tag (新增標籤)**，然後指定 **Tag key (標籤鍵)** 以及選用的 **Tag value (標籤值)**。

1. 選擇 **Add registry (新增登錄檔)**。

![\[建立登錄檔的範例。\]](http://docs.aws.amazon.com/zh_tw/glue/latest/dg/images/schema_reg_create_registry.png)


建立登錄檔後，系統會指派 Amazon Resource Name (ARN)，您可以在 **Schema registries (結構描述登錄檔)** 的清單中選擇登錄檔來查看。現在您已建立登錄檔，請為該登錄檔建立一或多個結構描述。

# 處理 JSON 的特定記錄 (JAVA POJO)
<a name="schema-registry-gs-json-java-pojo"></a>

您可以使用純舊 Java 物件 (POJO) 並將該物件作為記錄傳遞。這類似於 AVRO 中的特定記錄的概念。[mbknor-jackson-jsonschema](https://github.com/mbknor/mbknor-jackson-jsonSchema) 可以為傳遞的 POJO 產生 JSON 結構描述。此程式庫也可以在 JSON 結構描述中注入其他資訊。

AWS Glue 結構描述登錄檔程式庫會使用注入的「className」欄位，在結構描述中提供完全分類的類別名稱。「className」欄位由還原序列化程式用於還原序列化為該類別的物件。

```
 Example class :

@JsonSchemaDescription("This is a car")
@JsonSchemaTitle("Simple Car Schema")
@Builder
@AllArgsConstructor
@EqualsAndHashCode
// Fully qualified class name to be added to an additionally injected property
// called className for deserializer to determine which class to deserialize
// the bytes into
@JsonSchemaInject(
        strings = {@JsonSchemaString(path = "className",
                value = "com.amazonaws.services.schemaregistry.integrationtests.generators.Car")}
)
// List of annotations to help infer JSON Schema are defined by https://github.com/mbknor/mbknor-jackson-jsonSchema
public class Car {
    @JsonProperty(required = true)
    private String make;

    @JsonProperty(required = true)
    private String model;

    @JsonSchemaDefault("true")
    @JsonProperty
    public boolean used;

    @JsonSchemaInject(ints = {@JsonSchemaInt(path = "multipleOf", value = 1000)})
    @Max(200000)
    @JsonProperty
    private int miles;

    @Min(2000)
    @JsonProperty
    private int year;

    @JsonProperty
    private Date purchaseDate;

    @JsonProperty
    @JsonFormat(shape = JsonFormat.Shape.NUMBER)
    private Date listedDate;

    @JsonProperty
    private String[] owners;

    @JsonProperty
    private Collection<Float> serviceChecks;

    // Empty constructor is required by Jackson to deserialize bytes
    // into an Object of this class
    public Car() {}
}
```

# 建立結構描述
<a name="schema-registry-gs4"></a>

您可以使用 AWS Glue API 或 AWS Glue 主控台建立結構描述。

**AWS Glue API**  
您可以使用這些步驟，用 AWS Glue API 執行此任務。

若要新增結構描述，請使用 [CreateSchema 動作 (Python: create\$1schema)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-CreateSchema) API。

指定 `RegistryId` 結構來指出結構描述的登錄檔。或者，省略 `RegistryId` 以使用預設登錄檔。

指定由字母、數字、連字號或底線組成的 `SchemaName`，以及指定 `DataFormat` 為 **AVRO** 或 **JSON**。在結構描述上設定 `DataFormat` 後就不可改變。

指定 `Compatibility` 模式：
+ *向後 (建議使用)* — 消費者可以同時讀取目前版本和先前版本。
+ *全部向後* — 消費者可以讀取目前版本和所有先前版本。
+ *向前* — 消費者可以讀取目前和後續版本。
+ *全部向前* — 消費者可以讀取目前版本和所有後續版本。
+ *完整* — 向後和向前的組合。
+ *完整全部* — 全部向後與全部向前的組合。
+ *無* — 不會執行相容性檢查。
+ *已停用* — 防止此結構描述的任何版本控制。

選用地指定結構描述的 `Tags`。

指定 `SchemaDefinition` 可定義 Avro、JSON 或 Protobuf 資料格式的結構描述。請參閱範例。

對於 Avro 資料格式：

```
aws glue create-schema --registry-id RegistryName="registryName1" --schema-name testschema --compatibility NONE --data-format AVRO --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}"
```

```
aws glue create-schema --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName1" --schema-name testschema --compatibility NONE --data-format AVRO  --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}"
```

對於 JSON 資料格式：

```
aws glue create-schema --registry-id RegistryName="registryName" --schema-name testSchemaJson --compatibility NONE --data-format JSON --schema-definition "{\"$schema\": \"http://json-schema.org/draft-07/schema#\",\"type\":\"object\",\"properties\":{\"f1\":{\"type\":\"string\"}}}"
```

```
aws glue create-schema --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName" --schema-name testSchemaJson --compatibility NONE --data-format JSON --schema-definition "{\"$schema\": \"http://json-schema.org/draft-07/schema#\",\"type\":\"object\",\"properties\":{\"f1\":{\"type\":\"string\"}}}"
```

對於 Protobuf 資料格式：

```
aws glue create-schema --registry-id RegistryName="registryName" --schema-name testSchemaProtobuf --compatibility NONE --data-format PROTOBUF --schema-definition "syntax = \"proto2\";package org.test;message Basic { optional int32 basic = 1;}"
```

```
aws glue create-schema --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName" --schema-name testSchemaProtobuf --compatibility NONE --data-format PROTOBUF --schema-definition "syntax = \"proto2\";package org.test;message Basic { optional int32 basic = 1;}"
```

**AWS Glue 主控台**  
若要使用 AWS Glue 主控台來新增結構描述：

1. 登入 AWS 管理主控台，並在 https：//[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\) 開啟AWS Glue主控台。

1. 在導覽窗格中，於 **Data catalog** 下選擇 **Schemas (Data Catalog)** (結構描述 Data Catalog)。

1. 選擇 **Add schema (新增結構描述)**。

1. 輸入 **Schema name (結構描述名稱)**，由字母、數字、連字號、底線、金額符號或井號組成。無法變更此名稱。

1. 選擇 **Registry (登錄檔)**，其中結構描述會從下拉式選單儲存。建立後，無法變更父登錄檔。

1. 將 **Data format (資料格式)** 保留為 *Apache Avro* 或 *JSON*。此格式會套用至此結構描述的所有版本。

1. 選擇 **Compatibility mode (相容性模式)**。
   + *向後 (建議使用)* — 接收者可以讀取目前和以前的版本。
   + *全部向後* — 接收者可以讀取目前和所有以前的版本。
   + *向前* — 傳送者可以寫入目前和先前的版本。
   + *全部向前* — 傳送者可以寫入目前和所有先前的版本。
   + *完整* — 向後和向前的組合。
   + *完整全部* — 全部向後與全部向前的組合。
   + *無* — 不執行相容性檢查。
   + *已停用* — 防止此結構描述的任何版本控制。

1. 為登錄檔輸入選用 **Description (描述)**，最多 250 個字元。  
![\[建立資料結構描述的範例。\]](http://docs.aws.amazon.com/zh_tw/glue/latest/dg/images/schema_reg_create_schema.png)

1. 或者，將一或多個標籤套用到您的結構描述。選擇 **Add new tag (新增標籤)**，然後指定 **Tag key (標籤鍵)** 以及選用的 **Tag value (標籤值)**。

1. 在 **First schema version (第一個結構描述版本)** 方塊中，輸入或貼上您的初始結構描述。

   如需 Avro 格式，請參閱[使用 Avro 資料格式](#schema-registry-avro)

   如需 JSON 格式的資訊，請參閱[使用 JSON 資料格式](#schema-registry-json)

1. 選用地選擇 **Add metadata (新增中繼資料)** 以新增版本中繼資料來標註或分類您的結構描述版本。

1. 選擇 **Create schema and version (建立結構描述和版本)**。

![\[建立資料結構描述的範例。\]](http://docs.aws.amazon.com/zh_tw/glue/latest/dg/images/schema_reg_create_schema2.png)


結構描述隨即建立，並顯示在 **Schemas (結構描述)** 清單下。

## 使用 Avro 資料格式
<a name="schema-registry-avro"></a>

Avro 提供資料序列化和資料交換服務。Avro 儲存在 JSON 格式的資料定義，使得它易於閱讀和解釋。資料本身會以二進位格式儲存。

如需定義 Apache Avro 結構描述的相關資訊，請參閱 [Apache Avro 規格](http://avro.apache.org/docs/current/spec.html)。

## 使用 JSON 資料格式
<a name="schema-registry-json"></a>

資料可以用 JSON 格式來序列化。[JSON 結構描述格式](https://json-schema.org/)定義了 JSON 結構描述格式的標準。

# 更新結構描述或登錄檔
<a name="schema-registry-gs5"></a>

建立後，您可以編輯結構描述、結構描述版本或登錄檔。

## 更新登錄檔
<a name="schema-registry-gs5a"></a>

您可以使用 AWS Glue API 或 AWS Glue 主控台更新登錄檔。無法編輯現有登錄檔的名稱。您可以編輯登錄檔的描述。

**AWS Glue API**  
若要更新現有的登錄檔，請使用 [UpdateRegistry 動作 (Python: update\$1registry)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-UpdateRegistry) API。

指定 `RegistryId` 結構，以指出您要更新的登錄檔。傳遞 `Description` 以變更登錄檔的描述。

```
aws glue update-registry --description updatedDescription --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName1"
```

**AWS Glue 主控台**  
使用 AWS Glue 主控台更新登錄檔：

1. 登入 AWS 管理主控台 並在 https：//[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\) 開啟 AWS Glue主控台。

1. 在導覽窗格的 **Data catalog** 下，選擇 **Schema registries** (結構描述登錄檔)。

1. 從登錄檔清單中選擇登錄檔，方法是勾選其方塊。

1. 在 **Actions (動作)** 選單上，選擇 **Edit registry (編輯登錄檔)**。

# 更新結構描述
<a name="schema-registry-gs5b"></a>

您可以更新結構描述的描述或相容性設定。

若要更新現有的結構描述，請使用 [UpdateSchema 動作 (Python: update\$1schema)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-UpdateSchema) API。

指定 `SchemaId` 結構描述，以指出您要更新的結構描述。必須提供其中一個 `VersionNumber` 或 `Compatibility`。

程式碼範例 11：

```
aws glue update-schema --description testDescription --schema-id SchemaName="testSchema1",RegistryName="registryName1" --schema-version-number LatestVersion=true --compatibility NONE
```

```
aws glue update-schema --description testDescription --schema-id SchemaArn="arn:aws:glue:us-east-2:901234567890:schema/registryName1/testSchema1" --schema-version-number LatestVersion=true --compatibility NONE
```

# 新增結構描述版本
<a name="schema-registry-gs5c"></a>

當您新增結構描述版本時，您需要比較版本以確保新結構描述可被接受。

若要將新版本新增到現有結構描述，請使用 [RegisterSchemaVersion 動作 (Python: register\$1schema\$1version)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-RegisterSchemaVersion) API。

指定 `SchemaId` 結構來指示您想要新增版本的結構描述，以及 `SchemaDefinition` 來定義結構描述。

程式碼範例 12：

```
aws glue register-schema-version --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}" --schema-id SchemaArn="arn:aws:glue:us-east-1:901234567890:schema/registryName/testschema"
```

```
aws glue register-schema-version --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}" --schema-id SchemaName="testschema",RegistryName="testregistry"
```

1. 登入 AWS 管理主控台 並在 https：//[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\) 開啟 AWS Glue主控台。

1. 在導覽窗格中，於 **Data catalog** 下選擇 **Schemas (Data Catalog)** (結構描述 Data Catalog)。

1. 從結構描述清單中選擇結構描述，方法是勾選其方塊。

1. 從清單中選擇一或多個結構描述，方法是勾選其方塊。

1. 在 **Action (動作)** 選單中，選擇 **Register new version (註冊新版本)**。

1. 在 **New version (新版本)** 方塊中，輸入或貼上新的結構描述。

1. 選擇 **Compare with previous version (與舊版本比較)** 以查看與以前的結構描述版本的差異。

1. 選用地選擇 **Add metadata (新增中繼資料)** 以新增版本中繼資料來標註或分類您的結構描述版本。輸入 **Key (索引鍵)** 和選用的 **Value (值)**。

1. 選擇 **Register version (註冊版本)**。

![\[新增結構描述版本。\]](http://docs.aws.amazon.com/zh_tw/glue/latest/dg/images/schema_reg_add_schema_version.png)


結構描述版本會顯示在版本清單中。如果版本變更了相容性模式，版本將被標記為檢查點。

## 結構描述版本比較的範例
<a name="schema-registry-gs5c1"></a>

當您選擇 **Compare with previous version (與舊版本比較)**，您會看到上一個版本和新版本一起顯示。變更的資訊將會反白顯示如下：
+ *黃色*：表示已變更的資訊。
+ *綠色*：表示在最新版本中新增的內容。
+ *紅色*：表示在最新版本中移除的內容。

您也可以與更舊版本進行比較。

![\[結構描述版本比較的範例。\]](http://docs.aws.amazon.com/zh_tw/glue/latest/dg/images/schema_reg_version_comparison.png)


# 刪除結構描述或登錄檔
<a name="schema-registry-gs7"></a>

刪除結構描述、結構描述版本或登錄檔是無法復原的永久動作。

## 刪除結構描述
<a name="schema-registry-gs7a"></a>

當結構描述不再用於登錄檔、使用 或 [DeleteSchema 動作 (Python: delete\$1schema)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteSchema) API 時 AWS 管理主控台，您可能想要將其刪除。

刪除一或多個結構描述是無法復原的永久動作。請確定已不再需要結構描述。

若要從登錄檔刪除結構描述，請呼叫 [DeleteSchema 動作 (Python: delete\$1schema)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteSchema) API, 指定 `SchemaId` 結構來識別結構描述。

例如：

```
aws glue delete-schema --schema-id SchemaArn="arn:aws:glue:us-east-2:901234567890:schema/registryName1/schemaname"
```

```
aws glue delete-schema --schema-id SchemaName="TestSchema6-deleteschemabyname",RegistryName="default-registry"
```

**AWS Glue 主控台**  
從 AWS Glue 主控台刪除結構描述：

1. 登入 AWS 管理主控台 並在 https：//[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\) 開啟 AWS Glue主控台。

1. 在導覽窗格的 **Data catalog** 下，選擇 **Schema registries** (結構描述登錄檔)。

1. 從登錄檔清單中選擇包含結構描述的登錄檔。

1. 從清單中選擇一或多個結構描述，方法是勾選其方塊。

1. 在 **Action (動作)** 選單中，選擇 **Delete schema (刪除結構描述)**。

1. 在欄位中輸入文字 **Delete** 以確認刪除。

1. 選擇 **刪除**。

您指定的結構描述會從登錄檔中刪除。

## 刪除結構描述版本
<a name="schema-registry-gs7b"></a>

當結構描述累積在登錄檔中時，您可能想要使用 或 [DeleteSchemaVersions 動作 (Python: delete\$1schema\$1versions)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteSchemaVersions) API AWS 管理主控台刪除不需要的結構描述版本。刪除一或多個結構描述版本是無法復原的永久動作。請確定不再需要結構描述版本。

刪除結構描述版本時，請注意以下限制：
+ 您無法刪除檢查點的版本。
+ 連續版本的範圍不能超過 25。
+ 最新結構描述版本不可處於待定狀態。

指定 `SchemaId` 結構來識別結構描述，並指定 `Versions` 作為要刪除的版本範圍。如需指定版本或版本範圍的詳細資訊，請參閱[DeleteRegistry 動作 (Python: delete\$1registry)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteRegistry)。您指定的結構描述版本會從登錄檔中刪除。

在此呼叫後呼叫 [ListSchemaVersions 動作 (Python: list\$1schema\$1versions)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-ListSchemaVersions) API 將列出已刪除版本的狀態。

例如：

```
aws glue delete-schema-versions --schema-id SchemaName="TestSchema6",RegistryName="default-registry" --versions "1-1"
```

```
aws glue delete-schema-versions --schema-id SchemaArn="arn:aws:glue:us-east-2:901234567890:schema/default-registry/TestSchema6-NON-Existent" --versions "1-1"
```

1. 登入 AWS 管理主控台 並在 https：//[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\) 開啟 AWS Glue主控台。

1. 在導覽窗格的 **Data catalog** 下，選擇 **Schema registries** (結構描述登錄檔)。

1. 從登錄檔清單中選擇包含結構描述的登錄檔。

1. 從清單中選擇一或多個結構描述，方法是勾選其方塊。

1. 在 **Action (動作)** 選單中，選擇 **Delete schema (刪除結構描述)**。

1. 在欄位中輸入文字 **Delete** 以確認刪除。

1. 選擇 **刪除**。

您指定的結構描述版本會從登錄檔中刪除。

# 刪除登錄檔
<a name="schema-registry-gs7c"></a>

當登錄檔包含的結構描述不應再組織在該登錄檔下時，您可能會想要刪除登錄檔。您將需要將這些結構描述重新指派到另一個登錄檔。

刪除一或多個登錄檔是無法復原的永久動作。請確定登錄檔不再需要。

預設登錄檔可以使用 AWS CLI來刪除。

**AWS Glue API**  
若要刪除整個登錄檔，包括結構描述及其所有版本，請呼叫 [DeleteRegistry 動作 (Python: delete\$1registry)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteRegistry) API。指定 `RegistryId` 結構描述以識別登錄檔。

例如：

```
aws glue delete-registry --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName1"
```

```
aws glue delete-registry --registry-id RegistryName="TestRegistry-deletebyname"
```

若要取得刪除操作的狀態，您可以在非同步呼叫之後呼叫 `GetRegistry` API。

**AWS Glue 主控台**  
若要從 AWS Glue 主控台刪除登錄檔：

1. 登入 AWS 管理主控台 並在 https：//[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\) 開啟 AWS Glue主控台。

1. 在導覽窗格的 **Data catalog** 下，選擇 **Schema registries** (結構描述登錄檔)。

1. 從清單中選擇登錄檔，方法是勾選其方塊。

1. 在 **Actions (動作)** 選單中，選擇 **Delete registry (刪除登錄檔)**。

1. 在欄位中輸入文字 **Delete** 以確認刪除。

1. 選擇 **刪除**。

您選取的登錄檔會從 AWS Glue 刪除。

## 序列化程式的 IAM 範例
<a name="schema-registry-gs1"></a>

**注意**  
AWS 受管政策會授予常見使用案例的必要許可。如需使用受管政策來管理結構描述登錄檔的相關資訊，請參閱[AWS 的 受管 （預先定義） 政策 AWS Glue](security-iam-awsmanpol.md#access-policy-examples-aws-managed)。

對於序列化程式，您應該建立類似於下面的最小政策，以便您能夠找到特定結構描述定義的 `schemaVersionId`。請注意，您應該有登錄檔的讀取許可，才能讀取登錄檔中的結構描述。您可以使用 `Resource` 子句，限制可以讀取的登錄檔。

程式碼範例 13：

```
{
    "Sid" : "GetSchemaByDefinition",
    "Effect" : "Allow",
    "Action" :
	[
        "glue:GetSchemaByDefinition"
    ],
        "Resource" : ["arn:aws:glue:us-east-2:012345678:registry/registryname-1",
                      "arn:aws:glue:us-east-2:012345678:schema/registryname-1/schemaname-1",
                      "arn:aws:glue:us-east-2:012345678:schema/registryname-1/schemaname-2"
                     ]
}
```

此外，您還可以允許生產者透過包括以下額外的方法來建立新的結構描述和版本。請注意，您應該能夠檢查登錄檔以新增/刪除/演變其中的結構描述。您可以使用 `Resource` 子句，限制可以檢查的登錄檔。

程式碼範例 14：

```
{
    "Sid" : "RegisterSchemaWithMetadata",
    "Effect" : "Allow",
    "Action" :
	[
        "glue:GetSchemaByDefinition",
        "glue:CreateSchema",
        "glue:RegisterSchemaVersion",
        "glue:PutSchemaVersionMetadata",
    ],
    "Resource" : ["arn:aws:glue:aws-region:123456789012:registry/registryname-1",
                  "arn:aws:glue:aws-region:123456789012:schema/registryname-1/schemaname-1",
                  "arn:aws:glue:aws-region:123456789012:schema/registryname-1/schemaname-2"
                 ]
}
```

## 還原序列化程式的 IAM 範例
<a name="schema-registry-gs1b"></a>

對於還原序列化程式 (消費者方面)，您應該建立類似下面的政策，以允許還原序列化程式從結構描述登錄檔中擷取結構描述以進行還原序列化。請注意，您應該能夠檢查登錄檔以擷取其中的結構描述。

程式碼範例 15：

```
{
    "Sid" : "GetSchemaVersion",
    "Effect" : "Allow",
    "Action" :
	[
        "glue:GetSchemaVersion"
    ],
    "Resource" : ["*"]
}
```

## 使用 的私有連線 AWS PrivateLink
<a name="schema-registry-gs-private"></a>

您可以使用 AWS Glue 透過定義 的介面 VPC 端點 AWS PrivateLink ，將資料生產者的 VPC 連線至 AWS Glue。使用 VPC 介面端點時，VPC 和 AWS Glue 之間的通訊完全在 AWS 網路中執行。如需詳細資訊，請參閱[使用 AWS Glue 搭配 VPC 端點](https://docs.aws.amazon.com/glue/latest/dg/vpc-endpoint.html)。

# 存取 Amazon CloudWatch 指標
<a name="schema-registry-gs-monitoring"></a>

Amazon CloudWatch 指標可作為 CloudWatch 的免費方案的一部分使用。您可以在 CloudWatch 主控台中存取這些指標。API 層級指標包括 CreateSchema (成功和延遲)、GetSchemaByDefinition (成功和延遲)、GetSchemaVersion (成功和延遲)、RegisterSchemaVersion (成功和延遲)、PutSchemaVersionMetadata (成功和延遲)。資源層級指標包括 Registry.ThrottledByLimit、SchemaVersion.ThrottledByLimit、SchemaVersion.Size。

# 結構描述登錄的範例 CloudFormation 範本
<a name="schema-registry-integrations-cfn"></a>

以下是在 中建立結構描述登錄檔資源的範例範本 CloudFormation。若要在您的帳戶中建立此堆疊，請將上述範本複製到檔案 `SampleTemplate.yaml`，然後執行下列命令：

```
aws cloudformation create-stack --stack-name ABCSchemaRegistryStack --template-body "'cat SampleTemplate.yaml'"
```

此範例使用 `AWS::Glue::Registry` 建立登錄檔、使用 `AWS::Glue::Schema` 建立結構描述、使用 `AWS::Glue::SchemaVersion` 建立結構描述版本，以及使用 `AWS::Glue::SchemaVersionMetadata` 填入結構描述版本中繼資料。

```
Description: "A sample CloudFormation template for creating Schema Registry resources."
Resources:
  ABCRegistry:
    Type: "AWS::Glue::Registry"
    Properties:
      Name: "ABCSchemaRegistry"
      Description: "ABC Corp. Schema Registry"
      Tags:
        Project: "Foo"
  ABCSchema:
    Type: "AWS::Glue::Schema"
    Properties:
      Registry:
        Arn: !Ref ABCRegistry
      Name: "TestSchema"
      Compatibility: "NONE"
      DataFormat: "AVRO"
      SchemaDefinition: >
        {"namespace":"foo.avro","type":"record","name":"user","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":"int"}]}
      Tags:
        Project: "Foo"
  SecondSchemaVersion:
    Type: "AWS::Glue::SchemaVersion"
    Properties:
      Schema:
        SchemaArn: !Ref ABCSchema
      SchemaDefinition: >
        {"namespace":"foo.avro","type":"record","name":"user","fields":[{"name":"status","type":"string", "default":"ON"}, {"name":"name","type":"string"},{"name":"favorite_number","type":"int"}]}
  FirstSchemaVersionMetadata:
    Type: "AWS::Glue::SchemaVersionMetadata"
    Properties:
      SchemaVersionId: !GetAtt ABCSchema.InitialSchemaVersionId
      Key: "Application"
      Value: "Kinesis"
  SecondSchemaVersionMetadata:
    Type: "AWS::Glue::SchemaVersionMetadata"
    Properties:
      SchemaVersionId: !Ref SecondSchemaVersion
      Key: "Application"
      Value: "Kinesis"
```

# 與 AWS Glue 結構描述登錄檔整合
<a name="schema-registry-integrations"></a>

這些章節說明與 AWS Glue 結構描述登錄檔的整合。這些章節中的範例顯示了 AVRO 資料格式的結構描述。如需更多範例 (包括具有 JSON 資料格式的結構描述)，請參閱[AWS Glue 結構描述登錄檔開源儲存庫](https://github.com/awslabs/aws-glue-schema-registry)中的整合測試和讀我資訊。

**Topics**
+ [使用案例：將結構描述登錄檔連線到 Amazon MSK 或 Apache Kafka](#schema-registry-integrations-amazon-msk)
+ [使用案例：將 Amazon Kinesis Data Streams 與 AWS Glue 結構描述登錄檔整合](#schema-registry-integrations-kds)
+ [使用案例：Amazon Managed Service for Apache Flink](#schema-registry-integrations-kinesis-data-analytics-apache-flink)
+ [使用案例：與 整合 AWS Lambda](#schema-registry-integrations-aws-lambda)
+ [使用案例： AWS Glue Data Catalog](#schema-registry-integrations-aws-glue-data-catalog)
+ [使用案例：AWS Glue 串流](#schema-registry-integrations-aws-glue-streaming)
+ [使用案例：Apache Kafka Streams](#schema-registry-integrations-apache-kafka-streams)

## 使用案例：將結構描述登錄檔連線到 Amazon MSK 或 Apache Kafka
<a name="schema-registry-integrations-amazon-msk"></a>

假設您正在將資料寫入 Apache Kafka 主題，並且您可以按照下列步驟開始使用。

1. 建立具有至少一個主題的 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 或 Apache Kafka 叢集。如果是建立 Amazon MSK 叢集，您可以使用 AWS 管理主控台。請遵循以下說明：*Amazon Managed Streaming for Apache Kafka 開發人員指南*中的 [Amazon MSK 入門](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html)。

1. 遵循上述的[安裝 SerDe 程式庫](schema-registry-gs-serde.md)步驟。

1. 若要建立結構描述登錄檔、結構描述或結構描述版本，請依照這份文件的[結構描述登錄檔入門](schema-registry-gs.md)一節指示進行。

1. 啟動您的生產者和消費者以使用結構描述登錄檔來從/向 Amazon MSK 或 Apache Kafka 主題寫入和讀取記錄。範例生產者和消費者程式碼可以在 Serde 程式庫的[讀我檔案](https://github.com/awslabs/aws-glue-schema-registry/blob/master/README.md)找到。生產者上的結構描述登錄檔程式庫會自動序列化記錄，並使用結構描述版本 ID 裝飾記錄。

1. 如果已輸入此記錄的結構描述，或者如果開啟自動註冊，則結構描述將已在結構描述登錄檔中註冊。

1. 消費者使用 AWS Glue 結構描述登錄檔程式庫從 Amazon MSK 或 Apache Kafka 主題讀取，會自動從結構描述登錄檔查詢結構描述。

## 使用案例：將 Amazon Kinesis Data Streams 與 AWS Glue 結構描述登錄檔整合
<a name="schema-registry-integrations-kds"></a>

此整合要求您擁有現有的 Amazon Kinesis 資料串流。如需詳細資訊，請參閱 *Amazon Kinesis Data Streams 開發人員指南*中的 [Amazon Kinesis Data Streams 入門](https://docs.aws.amazon.com/streams/latest/dev/getting-started.html)。

您可以透過兩種方式與 Kinesis 資料串流中的資料互動。
+ 透過 Java 中的 Kinesis Producer Library (KPL) 和 Kinesis Client Library (KCL) 程式庫。不提供多語言支援。
+ 透過 適用於 Java 的 AWS SDK中提供的 `PutRecords`、`PutRecord` 以及 `GetRecords` Kinesis Data Streams API。

如果您目前使用 KPL/KCL 程式庫，我們建議您繼續使用該方法。有更新的 KCL 和 KPL 版本與結構描述登錄檔整合，如範例所示。否則，您可以使用範例程式碼來利用 AWS Glue 結構描述登錄檔 (如果直接使用 KDS API)。

結構描述登錄檔整合只適用於 KPL v0.14.2 或更高版本和 KCL v2.3 或更高版本。結構描述登錄檔與 JSON 資料格式整合適用於 KPL v0.14.8 或更高版本和 KCL v2.3.6 或更高版本。

### 使用 Kinesis SDK V2 與資料互動
<a name="schema-registry-integrations-kds-sdk-v2"></a>

本節說明如何使用 Kinesis SDK V2 與 Kinesis 互動

```
// Example JSON Record, you can construct a AVRO record also
private static final JsonDataWithSchema record = JsonDataWithSchema.builder(schemaString, payloadString);
private static final DataFormat dataFormat = DataFormat.JSON;

//Configurations for Schema Registry
GlueSchemaRegistryConfiguration gsrConfig = new GlueSchemaRegistryConfiguration("us-east-1");

GlueSchemaRegistrySerializer glueSchemaRegistrySerializer =
        new GlueSchemaRegistrySerializerImpl(awsCredentialsProvider, gsrConfig);
GlueSchemaRegistryDataFormatSerializer dataFormatSerializer =
        new GlueSchemaRegistrySerializerFactory().getInstance(dataFormat, gsrConfig);

Schema gsrSchema =
        new Schema(dataFormatSerializer.getSchemaDefinition(record), dataFormat.name(), "MySchema");

byte[] serializedBytes = dataFormatSerializer.serialize(record);

byte[] gsrEncodedBytes = glueSchemaRegistrySerializer.encode(streamName, gsrSchema, serializedBytes);

PutRecordRequest putRecordRequest = PutRecordRequest.builder()
        .streamName(streamName)
        .partitionKey("partitionKey")
        .data(SdkBytes.fromByteArray(gsrEncodedBytes))
        .build();
shardId = kinesisClient.putRecord(putRecordRequest)
        .get()
        .shardId();

GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer = new GlueSchemaRegistryDeserializerImpl(awsCredentialsProvider, gsrConfig);

GlueSchemaRegistryDataFormatDeserializer gsrDataFormatDeserializer =
        glueSchemaRegistryDeserializerFactory.getInstance(dataFormat, gsrConfig);

GetShardIteratorRequest getShardIteratorRequest = GetShardIteratorRequest.builder()
        .streamName(streamName)
        .shardId(shardId)
        .shardIteratorType(ShardIteratorType.TRIM_HORIZON)
        .build();

String shardIterator = kinesisClient.getShardIterator(getShardIteratorRequest)
        .get()
        .shardIterator();

GetRecordsRequest getRecordRequest = GetRecordsRequest.builder()
        .shardIterator(shardIterator)
        .build();
GetRecordsResponse recordsResponse = kinesisClient.getRecords(getRecordRequest)
        .get();

List<Object> consumerRecords = new ArrayList<>();
List<Record> recordsFromKinesis = recordsResponse.records();

for (int i = 0; i < recordsFromKinesis.size(); i++) {
    byte[] consumedBytes = recordsFromKinesis.get(i)
            .data()
            .asByteArray();

    Schema gsrSchema = glueSchemaRegistryDeserializer.getSchema(consumedBytes);
    Object decodedRecord = gsrDataFormatDeserializer.deserialize(ByteBuffer.wrap(consumedBytes),
                                                                    gsrSchema.getSchemaDefinition());
    consumerRecords.add(decodedRecord);
}
```

### 使用 KPL/KCL 程式庫與資料互動
<a name="schema-registry-integrations-kds-libraries"></a>

本節說明使用 KPL/KCL 程式庫將 Kinesis Data Streams 與結構描述登錄檔整合。如需使用 KPL/KCL 的詳細資訊，請參閱 *Amazon Kinesis Data Streams 開發人員指南*中的[使用 Amazon Kinesis Producer Library 開發生產者](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html)。

#### 在 KPL 中設定結構描述登錄檔
<a name="schema-registry-integrations-kds-libraries-kpl"></a>

1. 定義 AWS Glue 結構描述登錄檔中撰寫的資料的結構描述定義、資料格式和結構描述名稱。

1. 選用地設定 `GlueSchemaRegistryConfiguration` 物件。

1. 將結構描述物件傳遞給 `addUserRecord API`。

   ```
   private static final String SCHEMA_DEFINITION = "{"namespace": "example.avro",\n"
   + " "type": "record",\n"
   + " "name": "User",\n"
   + " "fields": [\n"
   + " {"name": "name", "type": "string"},\n"
   + " {"name": "favorite_number", "type": ["int", "null"]},\n"
   + " {"name": "favorite_color", "type": ["string", "null"]}\n"
   + " ]\n"
   + "}";
   
   KinesisProducerConfiguration config = new KinesisProducerConfiguration();
   config.setRegion("us-west-1")
   
   //[Optional] configuration for Schema Registry.
   
   GlueSchemaRegistryConfiguration schemaRegistryConfig =
   new GlueSchemaRegistryConfiguration("us-west-1");
   
   schemaRegistryConfig.setCompression(true);
   
   config.setGlueSchemaRegistryConfiguration(schemaRegistryConfig);
   
   ///Optional configuration ends.
   
   final KinesisProducer producer =
         new KinesisProducer(config);
   
   final ByteBuffer data = getDataToSend();
   
   com.amazonaws.services.schemaregistry.common.Schema gsrSchema =
       new Schema(SCHEMA_DEFINITION, DataFormat.AVRO.toString(), "demoSchema");
   
   ListenableFuture<UserRecordResult> f = producer.addUserRecord(
   config.getStreamName(), TIMESTAMP, Utils.randomExplicitHashKey(), data, gsrSchema);
   
   private static ByteBuffer getDataToSend() {
         org.apache.avro.Schema avroSchema =
           new org.apache.avro.Schema.Parser().parse(SCHEMA_DEFINITION);
   
         GenericRecord user = new GenericData.Record(avroSchema);
         user.put("name", "Emily");
         user.put("favorite_number", 32);
         user.put("favorite_color", "green");
   
         ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
         Encoder encoder = EncoderFactory.get().directBinaryEncoder(outBytes, null);
         new GenericDatumWriter<>(avroSchema).write(user, encoder);
         encoder.flush();
         return ByteBuffer.wrap(outBytes.toByteArray());
    }
   ```

#### 設定 Kinesis client library
<a name="schema-registry-integrations-kds-libraries-kcl"></a>

您將用 Java 開發 Kinesis Client Library 消費者。如需詳細資訊，請參閱 *Amazon Kinesis Data Streams 開發人員指南*中的[以 Java 開發 Kinesis Client Library 消費者](https://docs.aws.amazon.com/streams/latest/dev/kcl2-standard-consumer-java-example.html)。

1. 傳遞 `GlueSchemaRegistryConfiguration` 物件，建立 `GlueSchemaRegistryDeserializer` 的執行個體。

1. 傳遞 `GlueSchemaRegistryDeserializer` 至 `retrievalConfig.glueSchemaRegistryDeserializer`。

1. 透過呼叫 `kinesisClientRecord.getSchema()` 存取內送訊息的結構描述。

   ```
   GlueSchemaRegistryConfiguration schemaRegistryConfig =
       new GlueSchemaRegistryConfiguration(this.region.toString());
   
    GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer =
       new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), schemaRegistryConfig);
   
    RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient));
    retrievalConfig.glueSchemaRegistryDeserializer(glueSchemaRegistryDeserializer);
   
     Scheduler scheduler = new Scheduler(
               configsBuilder.checkpointConfig(),
               configsBuilder.coordinatorConfig(),
               configsBuilder.leaseManagementConfig(),
               configsBuilder.lifecycleConfig(),
               configsBuilder.metricsConfig(),
               configsBuilder.processorConfig(),
               retrievalConfig
           );
   
    public void processRecords(ProcessRecordsInput processRecordsInput) {
               MDC.put(SHARD_ID_MDC_KEY, shardId);
               try {
                   log.info("Processing {} record(s)",
                   processRecordsInput.records().size());
                   processRecordsInput.records()
                   .forEach(
                       r ->
                           log.info("Processed record pk: {} -- Seq: {} : data {} with schema: {}",
                           r.partitionKey(), r.sequenceNumber(), recordToAvroObj(r).toString(), r.getSchema()));
               } catch (Throwable t) {
                   log.error("Caught throwable while processing records. Aborting.");
                   Runtime.getRuntime().halt(1);
               } finally {
                   MDC.remove(SHARD_ID_MDC_KEY);
               }
    }
   
    private GenericRecord recordToAvroObj(KinesisClientRecord r) {
       byte[] data = new byte[r.data().remaining()];
       r.data().get(data, 0, data.length);
       org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser().parse(r.schema().getSchemaDefinition());
       DatumReader datumReader = new GenericDatumReader<>(schema);
   
       BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(data, 0, data.length, null);
       return (GenericRecord) datumReader.read(null, binaryDecoder);
    }
   ```

#### 使用 Kinesis Data Streams API 與資料互動
<a name="schema-registry-integrations-kds-apis"></a>

本節說明使用 Kinesis Data Streams API 將 Kinesis Data Streams 與結構描述登錄檔整合。

1. 更新這些 Maven 相依性：

   ```
   <dependencyManagement>
           <dependencies>
               <dependency>
                   <groupId>com.amazonaws</groupId>
                   <artifactId>aws-java-sdk-bom</artifactId>
                   <version>1.11.884</version>
                   <type>pom</type>
                   <scope>import</scope>
               </dependency>
           </dependencies>
       </dependencyManagement>
   
       <dependencies>
           <dependency>
               <groupId>com.amazonaws</groupId>
               <artifactId>aws-java-sdk-kinesis</artifactId>
           </dependency>
   
           <dependency>
               <groupId>software.amazon.glue</groupId>
               <artifactId>schema-registry-serde</artifactId>
               <version>1.1.5</version>
           </dependency>
   
           <dependency>
               <groupId>com.fasterxml.jackson.dataformat</groupId>
               <artifactId>jackson-dataformat-cbor</artifactId>
               <version>2.11.3</version>
           </dependency>
       </dependencies>
   ```

1. 在生產者中，使用 Kinesis Data Streams 中的 `PutRecords` 或 `PutRecord` API 新增結構描述標頭資訊。

   ```
   //The following lines add a Schema Header to the record
           com.amazonaws.services.schemaregistry.common.Schema awsSchema =
               new com.amazonaws.services.schemaregistry.common.Schema(schemaDefinition, DataFormat.AVRO.name(),
                   schemaName);
           GlueSchemaRegistrySerializerImpl glueSchemaRegistrySerializer =
               new GlueSchemaRegistrySerializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(getConfigs()));
           byte[] recordWithSchemaHeader =
               glueSchemaRegistrySerializer.encode(streamName, awsSchema, recordAsBytes);
   ```

1. 在生產者中，使用 `PutRecords` 或 `PutRecord` API，將記錄放入資料串流中。

1. 在消費者中，從標頭移除結構描述記錄，並序列化 Avro 結構描述記錄。

   ```
   //The following lines remove Schema Header from record
           GlueSchemaRegistryDeserializerImpl glueSchemaRegistryDeserializer =
               new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), getConfigs());
           byte[] recordWithSchemaHeaderBytes = new byte[recordWithSchemaHeader.remaining()];
           recordWithSchemaHeader.get(recordWithSchemaHeaderBytes, 0, recordWithSchemaHeaderBytes.length);
           com.amazonaws.services.schemaregistry.common.Schema awsSchema =
               glueSchemaRegistryDeserializer.getSchema(recordWithSchemaHeaderBytes);
           byte[] record = glueSchemaRegistryDeserializer.getData(recordWithSchemaHeaderBytes);
   
           //The following lines serialize an AVRO schema record
           if (DataFormat.AVRO.name().equals(awsSchema.getDataFormat())) {
               Schema avroSchema = new org.apache.avro.Schema.Parser().parse(awsSchema.getSchemaDefinition());
               Object genericRecord = convertBytesToRecord(avroSchema, record);
               System.out.println(genericRecord);
           }
   ```

#### 使用 Kinesis Data Streams API 與資料互動
<a name="schema-registry-integrations-kds-apis-reference"></a>

以下是使用 `PutRecords` 和 `GetRecords` API 的範例程式碼。

```
//Full sample code
import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializerImpl;
import com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistrySerializerImpl;
import com.amazonaws.services.schemaregistry.utils.AVROUtils;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.services.glue.model.DataFormat;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;


public class PutAndGetExampleWithEncodedData {
    static final String regionName = "us-east-2";
    static final String streamName = "testStream1";
    static final String schemaName = "User-Topic";
    static final String AVRO_USER_SCHEMA_FILE = "src/main/resources/user.avsc";
    KinesisApi kinesisApi = new KinesisApi();

    void runSampleForPutRecord() throws IOException {
        Object testRecord = getTestRecord();
        byte[] recordAsBytes = convertRecordToBytes(testRecord);
        String schemaDefinition = AVROUtils.getInstance().getSchemaDefinition(testRecord);

        //The following lines add a Schema Header to a record
        com.amazonaws.services.schemaregistry.common.Schema awsSchema =
            new com.amazonaws.services.schemaregistry.common.Schema(schemaDefinition, DataFormat.AVRO.name(),
                schemaName);
        GlueSchemaRegistrySerializerImpl glueSchemaRegistrySerializer =
            new GlueSchemaRegistrySerializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(regionName));
        byte[] recordWithSchemaHeader =
            glueSchemaRegistrySerializer.encode(streamName, awsSchema, recordAsBytes);

        //Use PutRecords api to pass a list of records
        kinesisApi.putRecords(Collections.singletonList(recordWithSchemaHeader), streamName, regionName);

        //OR
        //Use PutRecord api to pass single record
        //kinesisApi.putRecord(recordWithSchemaHeader, streamName, regionName);
    }

    byte[] runSampleForGetRecord() throws IOException {
        ByteBuffer recordWithSchemaHeader = kinesisApi.getRecords(streamName, regionName);

        //The following lines remove the schema registry header
        GlueSchemaRegistryDeserializerImpl glueSchemaRegistryDeserializer =
            new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(regionName));
        byte[] recordWithSchemaHeaderBytes = new byte[recordWithSchemaHeader.remaining()];
        recordWithSchemaHeader.get(recordWithSchemaHeaderBytes, 0, recordWithSchemaHeaderBytes.length);

        com.amazonaws.services.schemaregistry.common.Schema awsSchema =
            glueSchemaRegistryDeserializer.getSchema(recordWithSchemaHeaderBytes);

        byte[] record = glueSchemaRegistryDeserializer.getData(recordWithSchemaHeaderBytes);

        //The following lines serialize an AVRO schema record
        if (DataFormat.AVRO.name().equals(awsSchema.getDataFormat())) {
            Schema avroSchema = new org.apache.avro.Schema.Parser().parse(awsSchema.getSchemaDefinition());
            Object genericRecord = convertBytesToRecord(avroSchema, record);
            System.out.println(genericRecord);
        }

        return record;
    }

    private byte[] convertRecordToBytes(final Object record) throws IOException {
        ByteArrayOutputStream recordAsBytes = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().directBinaryEncoder(recordAsBytes, null);
        GenericDatumWriter datumWriter = new GenericDatumWriter<>(AVROUtils.getInstance().getSchema(record));
        datumWriter.write(record, encoder);
        encoder.flush();
        return recordAsBytes.toByteArray();
    }

    private GenericRecord convertBytesToRecord(Schema avroSchema, byte[] record) throws IOException {
        final GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(avroSchema);
        Decoder decoder = DecoderFactory.get().binaryDecoder(record, null);
        GenericRecord genericRecord = datumReader.read(null, decoder);
        return genericRecord;
    }

    private Map<String, String> getMetadata() {
        Map<String, String> metadata = new HashMap<>();
        metadata.put("event-source-1", "topic1");
        metadata.put("event-source-2", "topic2");
        metadata.put("event-source-3", "topic3");
        metadata.put("event-source-4", "topic4");
        metadata.put("event-source-5", "topic5");
        return metadata;
    }

    private GlueSchemaRegistryConfiguration getConfigs() {
        GlueSchemaRegistryConfiguration configs = new GlueSchemaRegistryConfiguration(regionName);
        configs.setSchemaName(schemaName);
        configs.setAutoRegistration(true);
        configs.setMetadata(getMetadata());
        return configs;
    }

    private Object getTestRecord() throws IOException {
        GenericRecord genericRecord;
        Schema.Parser parser = new Schema.Parser();
        Schema avroSchema = parser.parse(new File(AVRO_USER_SCHEMA_FILE));

        genericRecord = new GenericData.Record(avroSchema);
        genericRecord.put("name", "testName");
        genericRecord.put("favorite_number", 99);
        genericRecord.put("favorite_color", "red");

        return genericRecord;
    }
}
```

## 使用案例：Amazon Managed Service for Apache Flink
<a name="schema-registry-integrations-kinesis-data-analytics-apache-flink"></a>

Apache Flink 是一個流行的開源架構和分散式處理引擎，用於對未限制和有限制資料串流進行狀態計算。Amazon Managed Service for Apache Flink 是一種全受管 AWS 服務，可讓您建置和管理 Apache Flink 應用程式來處理串流資料。

開源 Apache Flink 提供一些來源和接收器。例如，預先定義的資料來源包括從檔案、目錄和通訊端讀取，以及從集合和迭代器擷取資料。Apache Flink DataStream Connector 為 Apache Flink 提供與各種第三方系統連接的程式碼，例如 Apache Kafka 或 Kinesis 做為來源和/或接收器。

如需詳細資訊，請參閱 [Amazon Kinesis Data Analytics 開發人員指南](https://docs.aws.amazon.com/kinesisanalytics/latest/java/what-is.html)。

### Apache Flink Kafka 連接器
<a name="schema-registry-integrations-kafka-connector"></a>

Apache Flink 提供 Apache Kafka 資料串流連接器，用於使用恰好一次的保證從 Kafka 主題讀取資料以及寫入資料至 Kafka 主題。Flink 的 Kafka 消費者 `FlinkKafkaConsumer` 提供從一個或多個 Kafka 主題的讀取存取權。Apache Flink 的 Kafka 生產者 `FlinkKafkaProducer` 允許寫入記錄串流到一個或多個 Kafka 主題。如需詳細資訊，請參閱 [Apache Kafka 連接器](https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html)。

### Apache Flink Kinesis 串流連接器
<a name="schema-registry-integrations-kinesis-connector"></a>

Kinesis Data Streams 連接器可存取 Amazon Kinesis Data Streams。`FlinkKinesisConsumer` 是完全一次性的平行串流資料來源，可訂閱相同 AWS 服務區域內的多個 Kinesis 串流，並在任務執行時透明地處理串流的重新分片。消費者的每個子工作都負責從多個 Kinesis 分片擷取資料記錄。每個子工作擷取的碎片數量將隨著碎片關閉並由 Kinesis 建立而改變。`FlinkKinesisProducer` 使用 Kinesis Producer Library (KPL) 將 Apache Flink 串流中的資料放入 Kinesis 串流中。如需詳細資訊，請參閱 [Amazon Kinesis Streams Connector](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kinesis.html)。

如需詳細資訊，請參閱 [AWS Glue 結構描述 GitHub 儲存庫](https://github.com/awslabs/aws-glue-schema-registry)。

### 與 Apache Flink 整合
<a name="schema-registry-integrations-apache-flink-integrate"></a>

結構描述登錄檔提供的 SerDes 程式庫與 Apache Flink 整合。若要使用 Apache Flink，您需要實作稱為 `GlueSchemaRegistryAvroSerializationSchema` 和 `GlueSchemaRegistryAvroDeserializationSchema` 的 [https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java](https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java) 和 [https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java](https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java) 介面，然後將介面插入 Apache Flink 連接器。

### 新增 AWS Glue 結構描述登錄檔相依性到 Apache Flink 應用程式
<a name="schema-registry-integrations-kinesis-data-analytics-dependencies"></a>

在 Apache Flink 應用程式中將整合相依性設定為 AWS Glue 結構描述登錄檔：

1. 將相依性新增到 `pom.xml` 檔案。

   ```
   <dependency>
       <groupId>software.amazon.glue</groupId>
       <artifactId>schema-registry-flink-serde</artifactId>
       <version>1.0.0</version>
   </dependency>
   ```

#### 將 Kafka 或 Amazon MSK 與 Apache Flink 整合
<a name="schema-registry-integrations-kda-integrate-msk"></a>

您可以使用 Managed Service for Apache Flink，搭配 Kafka 作為來源或搭配 Kafka 作為接收器。

**Kafka 作為來源**  
下圖顯示將 Kinesis Data Streams 與 Managed Service for Apache Flink 整合，並將 Kafka 作為來源。

![\[Kafka 作為來源。\]](http://docs.aws.amazon.com/zh_tw/glue/latest/dg/images/gsr-kafka-source.png)


**Kafka 做為接收器**  
下圖顯示將 Kinesis Data Streams 與 Managed Service for Apache Flink 整合，並將 Kafka 作為接收器。

![\[Kafka 做為接收器。\]](http://docs.aws.amazon.com/zh_tw/glue/latest/dg/images/gsr-kafka-sink.png)


若要整合 Kafka (或 Amazon MSK) 與 Managed Service for Apache Flink，搭配 Kafka 作為來源或搭配 Kafka 作為接收器，請對程式碼進行下列變更。將粗體程式碼區塊新增到類似區段中的各自程式碼中。

如果 Kafka 是來源，請使用還原序列化程式程式碼 (區塊 2)。如果 Kafka 是接收器，請使用序列化程式程式碼 (區塊 3)。

```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

String topic = "topic";
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

// block 1
Map<String, Object> configs = new HashMap<>();
configs.put(AWSSchemaRegistryConstants.AWS_REGION, "aws-region");
configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());

FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
    topic,
    // block 2
    GlueSchemaRegistryAvroDeserializationSchema.forGeneric(schema, configs),
    properties);

FlinkKafkaProducer<GenericRecord> producer = new FlinkKafkaProducer<>(
    topic,
    // block 3
    GlueSchemaRegistryAvroSerializationSchema.forGeneric(schema, topic, configs),
    properties);

DataStream<GenericRecord> stream = env.addSource(consumer);
stream.addSink(producer);
env.execute();
```

#### 將 Kinesis Data Streams 與 Apache Flink 整合
<a name="schema-registry-integrations-integrate-kds"></a>

您可以使用 Managed Service for Apache Flink，搭配 Kinesis Data Streams 作為來源或接收器。

**Kinesis Data Streams 做為來源**  
下圖顯示將 Kinesis Data Streams 與 Managed Service for Apache Flink 整合，並將 Kinesis Data Streams 作為來源。

![\[Kinesis Data Streams 做為來源。\]](http://docs.aws.amazon.com/zh_tw/glue/latest/dg/images/gsr-kinesis-source.png)


**Kinesis Data Streams 做為接收器**  
下圖顯示將 Kinesis Data Streams 與 Managed Service for Apache Flink 整合，並將 Kinesis Data Streams 作為接收器。

![\[Kinesis Data Streams 做為接收器。\]](http://docs.aws.amazon.com/zh_tw/glue/latest/dg/images/gsr-kinesis-sink.png)


若要將 Kinesis Data Streams 與 Managed Service for Apache Flink 整合，並將 Kinesis Data Streams 作為來源或將 Kinesis Data Streams 作為接收器，請對程式碼進行下列變更。將粗體程式碼區塊新增到類似區段中的各自程式碼中。

如果 Kinesis Data Streams 是來源，請使用還原序列化程式程式碼 (區塊 2)。如果 Kinesis Data Streams 是接收器，請使用序列化程式程式碼 (區塊 3)。

```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

String streamName = "stream";
Properties consumerConfig = new Properties();
consumerConfig.put(AWSConfigConstants.AWS_REGION, "aws-region");
consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

// block 1
Map<String, Object> configs = new HashMap<>();
configs.put(AWSSchemaRegistryConstants.AWS_REGION, "aws-region");
configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());

FlinkKinesisConsumer<GenericRecord> consumer = new FlinkKinesisConsumer<>(
    streamName,
    // block 2
    GlueSchemaRegistryAvroDeserializationSchema.forGeneric(schema, configs),
    properties);

FlinkKinesisProducer<GenericRecord> producer = new FlinkKinesisProducer<>(
    // block 3
    GlueSchemaRegistryAvroSerializationSchema.forGeneric(schema, topic, configs),
    properties);
producer.setDefaultStream(streamName);
producer.setDefaultPartition("0");

DataStream<GenericRecord> stream = env.addSource(consumer);
stream.addSink(producer);
env.execute();
```

## 使用案例：與 整合 AWS Lambda
<a name="schema-registry-integrations-aws-lambda"></a>

若要使用 AWS Lambda函數做為 Apache Kafka/Amazon MSK 取用者，並使用AWS Glue結構描述登錄檔還原序列化 Avro 編碼的訊息，請造訪 [MSK 實驗室頁面](https://amazonmsk-labs.workshop.aws/en/msklambda/gsrschemareg.html)。

## 使用案例： AWS Glue Data Catalog
<a name="schema-registry-integrations-aws-glue-data-catalog"></a>

AWS Glue 資料表支援您可以手動指定的結構描述，也可以參考 AWS Glue 結構描述登錄檔。結構描述登錄檔與 Data Catalog 整合，可讓您在建立或更新 Data Catalog 中的 AWS Glue 資料表或分割區時選用地使用儲存在結構描述登錄檔中的結構描述。若要識別結構描述登錄檔中的結構描述定義，您至少需要知道它所屬結構描述的 ARN。包含結構描述定義的結構描述版本，可以透過其 UUID 或版本號碼來參考。總有一個結構描述版本，即「最新」版本，可以在不知道其版本號碼或 UUID 的情況下查詢。

呼叫 `CreateTable` 或 `UpdateTable` 操作時，您將傳遞一個 `TableInput` 結構，其中包含 `StorageDescriptor`，它可能有一個新增到結構描述登錄檔中現有結構描述的 `SchemaReference`。同樣地，當您呼叫 `GetTable` 或 `GetPartition` API，回應可能包含結構描述和 `SchemaReference`。使用結構描述參考建立資料表或分割區時，Data Catalog 會嘗試擷取此結構描述參考的結構描述。如果它無法在結構描述登錄檔中找到結構描述，它會在 `GetTable` 回應中傳回空的結構描述；否則回應將同時具有結構描述和結構描述參考。

您可以在 AWS Glue 主控台中執行下列動作。

若要執行這些操作並建立、更新或檢視結構描述資訊，您必須向提供 `GetSchemaVersion` API 許可的呼叫使用者授予 IAM 角色。

### 新增資料表或更新資料表的結構描述
<a name="schema-registry-integrations-aws-glue-data-catalog-table"></a>

從現有的結構描述新增資料表，會將資料表繫結至特定的結構描述版本。註冊新的結構描述版本後，您可以從 AWS Glue 主控台 View table (檢視資料表) 頁面或使用 [UpdateTable 動作 (Python: update\$1table)](aws-glue-api-catalog-tables.md#aws-glue-api-catalog-tables-UpdateTable) API 更新此資料表定義。

#### 從現有的結構描述新增資料表
<a name="schema-registry-integrations-aws-glue-data-catalog-table-existing"></a>

您可以使用 AWS Glue 主控台或 `CreateTable` API，從登錄檔中的結構描述版本建立 AWS Glue 資料表。

**AWS Glue API**  
呼叫 `CreateTable` API 時，您將傳遞包含 `StorageDescriptor` 的 `TableInput`，它對於結構描述登錄檔中現有的結構描述會有一個 `SchemaReference`。

**AWS Glue 主控台**  
使用 AWS Glue 主控台建立資料表

1. 登入 AWS 管理主控台 並在 https：//[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\) 開啟 AWS Glue主控台。

1. 在導覽窗格中，於 **Data catalog** 下選擇 **Tables** (資料表)。

1. 在 **Add Tables (新增資料表)** 選單中，選擇 **Add table from existing schema (從現有結構描述新增資料表)**。

1. 根據《AWS Glue 開發人員指南》設定資料表屬性和資料存放區。

1. 在 **Choose a Glue schema (選擇 Glue 結構描述)** 頁面上，選取結構描述所在的 **Registry (登錄檔)**。

1. 選擇 **Schema name (結構描述名稱)**，然後選取要套用之結構描述的 **Version (版本)**。

1. 檢閱結構描述預覽，然後選擇 **Next (下一步)**。

1. 檢閱和建立資料表。

套用至資料表的結構描述和版本會顯示在資料表清單的 **Glue schema (Glue 結構描述)** 欄。您可以檢視資料表以查看更多詳細資訊。

#### 更新資料表的結構描述
<a name="schema-registry-integrations-aws-glue-data-catalog-table-updating"></a>

有新的結構描述版本可用時，您可能需要使用 [UpdateTable 動作 (Python: update\$1table)](aws-glue-api-catalog-tables.md#aws-glue-api-catalog-tables-UpdateTable) API 或 AWS Glue 主控台來更新資料表的結構描述。

**重要**  
更新具有手動指定之 AWS Glue 結構描述的現有資料表的結構描述時，結構描述登錄檔中參考的新結構描述可能不相容。這可能會導致您的任務失敗。

**AWS Glue API**  
呼叫 `UpdateTable` API 時，您將傳遞包含 `StorageDescriptor` 的 `TableInput`，它對於結構描述登錄檔中現有的結構描述會有一個 `SchemaReference`。

**AWS Glue 主控台**  
從 AWS Glue 主控台更新資料表的結構描述：

1. 登入 AWS 管理主控台 並在 https：//[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\) 開啟 AWS Glue主控台。

1. 在導覽窗格中，於 **Data catalog** 下選擇 **Tables** (資料表)。

1. 從資料表清單中檢視資料表。

1. 按一下 **Update schema (更新結構描述)**，通知您有關新版本的方塊。

1. 檢閱目前資料結構描述和新資料結構描述之間的差異。

1. 選擇 **Show all schema differences (顯示所有結構描述差異)** 以查看更多詳細資訊。

1. 選擇 **Save table (儲存資料表)** 以接受新的版本。

## 使用案例：AWS Glue 串流
<a name="schema-registry-integrations-aws-glue-streaming"></a>

AWS Glue 串流會消耗來自串流來源的資料，並在寫入輸出接收器之前執行 ETL 操作。輸入串流來源可以使用資料表來指定，也可以透過指定來源配置的方式直接指定。

在該結構描述存在於 AWS Glue 結構描述登錄檔的情況下，AWS Glue 串流支援針對串流來源所建立的 Data Catalog 資料表。您可以在 AWS Glue 結構描述登錄檔中建立結構描述，並使用此結構描述來建立具有串流來源的 AWS Glue 表格。此 AWS Glue 表格可以用來作為 AWS Glue 串流任務的輸入，以供還原序列化輸入串流中的資料之用。

請注意，當 AWS Glue 結構描述登錄檔中的結構描述變更時，您必須重新啟動 AWS Glue 串流任務需求以反映結構描述中的變更。

## 使用案例：Apache Kafka Streams
<a name="schema-registry-integrations-apache-kafka-streams"></a>

Apache Kafka Streams API 是用於處理和分析 Apache Kafka 中儲存資料的用戶端程式庫。本節介紹 Apache Kafka Streams 與 AWS Glue 結構描述登錄檔的整合，可讓您在資料串流應用程式上管理和強制執行結構描述。如需 Apache Kafka Streams 的詳細資訊，請參閱 [Apache Kafka Streams](https://kafka.apache.org/documentation/streams/)。

### 與 SerDes 程式庫整合
<a name="schema-registry-integrations-apache-kafka-streams-integrate"></a>

有一個 `GlueSchemaRegistryKafkaStreamsSerde` 類別，您可用於設定串流應用程式。

#### Kafka Streams 應用程式範例程式碼
<a name="schema-registry-integrations-apache-kafka-streams-application"></a>

在 Apache Kafka Streams 應用程式中使用 AWS Glue 結構描述登錄檔：

1. 設定 Kafka Streams 應用程式。

   ```
   final Properties props = new Properties();
       props.put(StreamsConfig.APPLICATION_ID_CONFIG, "avro-streams");
       props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
       props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
       props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
       props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, AWSKafkaAvroSerDe.class.getName());
       props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
   
       props.put(AWSSchemaRegistryConstants.AWS_REGION, "aws-region");
       props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
       props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
   	props.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.AVRO.name());
   ```

1. 從主題 avro-input 建立串流。

   ```
   StreamsBuilder builder = new StreamsBuilder();
   final KStream<String, GenericRecord> source = builder.stream("avro-input");
   ```

1. 處理資料記錄 (範例會篩選出 favorite\$1color 值為 pink 或 amount 值為 15 的記錄)。

   ```
   final KStream<String, GenericRecord> result = source
       .filter((key, value) -> !"pink".equals(String.valueOf(value.get("favorite_color"))));
       .filter((key, value) -> !"15.0".equals(String.valueOf(value.get("amount"))));
   ```

1. 將結果寫回主題 avro-output。

   ```
   result.to("avro-output");
   ```

1. 啟動 Apache Kafka Streams 應用程式。

   ```
   KafkaStreams streams = new KafkaStreams(builder.build(), props);
   streams.start();
   ```

#### 實作結果
<a name="schema-registry-integrations-apache-kafka-streams-results"></a>

這些結果顯示記錄篩選程序，其在步驟 3 中篩選出 favorite\$1color 為「pink」或值為「15.0」的記錄。

篩選前的記錄：

```
{"name": "Sansa", "favorite_number": 99, "favorite_color": "white"}
{"name": "Harry", "favorite_number": 10, "favorite_color": "black"}
{"name": "Hermione", "favorite_number": 1, "favorite_color": "red"}
{"name": "Ron", "favorite_number": 0, "favorite_color": "pink"}
{"name": "Jay", "favorite_number": 0, "favorite_color": "pink"}

{"id": "commute_1","amount": 3.5}
{"id": "grocery_1","amount": 25.5}
{"id": "entertainment_1","amount": 19.2}
{"id": "entertainment_2","amount": 105}
	{"id": "commute_1","amount": 15}
```

篩選後的記錄：

```
{"name": "Sansa", "favorite_number": 99, "favorite_color": "white"}
{"name": "Harry", "favorite_number": 10, "favorite_color": "black"}
{"name": "Hermione", "favorite_number": 1, "favorite_color": "red"}
{"name": "Ron", "favorite_number": 0, "favorite_color": "pink"}

{"id": "commute_1","amount": 3.5}
{"id": "grocery_1","amount": 25.5}
{"id": "entertainment_1","amount": 19.2}
{"id": "entertainment_2","amount": 105}
```

### 使用案例：Apache Kafka Connect
<a name="schema-registry-integrations-apache-kafka-connect"></a>

將 Apache Kafka Connect 與 AWS Glue 結構描述登錄檔整合，可讓您從連接器取得結構描述資訊。Apache Kafka 轉換器指定 Apache Kafka 中的資料的格式，以及如何將其轉換成 Apache Kafka Connect 資料。每個 Apache Kafka Connect 使用者將需要根據他們希望他們的資料在載入或儲存到 Apache Kafka 時的格式，設定這些轉換器。透過這種方式，您可以定義自己的轉換器，將 Apache Kafka Connect 資料轉換為 AWS Glue 結構描述登錄檔中使用的類型 (例如：Avro) 並利用我們的序列化程式註冊其結構描述並進行序列化。然後轉換器也能夠使用我們的還原序列化程式將從 Apache Kafka 接收到的資料還原序列化，並將其轉換回 Apache Kafka Connect 資料。範例工作流程圖如下所示。

![\[Apache Kafka Connect 工作流程。\]](http://docs.aws.amazon.com/zh_tw/glue/latest/dg/images/schema_reg_int_kafka_connect.png)


1. 安裝 `aws-glue-schema-registry` 專案，方法是複製 [AWS Glue 結構描述登錄檔的 Github 儲存庫](https://github.com/awslabs/aws-glue-schema-registry)。

   ```
   git clone git@github.com:awslabs/aws-glue-schema-registry.git
   cd aws-glue-schema-registry
   mvn clean install
   mvn dependency:copy-dependencies
   ```

1. 如果您打算在*獨立*模式中使用 Apache Kafka Connect，請使用此步驟的下列指示來更新 **connect-standalone.properties**。如果您打算在*分散式*模式中使用 Apache Kafka Connect，請使用相同的說明更新 **connect-avro-distributed.properties**。

   1. 將這些屬性也新增到 Apache Kafka 連接屬性檔案：

      ```
      key.converter.region=aws-region
      value.converter.region=aws-region
      key.converter.schemaAutoRegistrationEnabled=true
      value.converter.schemaAutoRegistrationEnabled=true
      key.converter.avroRecordType=GENERIC_RECORD
      value.converter.avroRecordType=GENERIC_RECORD
      ```

   1. 將下面的命令新增到 **kafka-run-class.sh** 的 **Launch mode (啟動模式)** 區段下：

      ```
      -cp $CLASSPATH:"<your AWS GlueSchema Registry base directory>/target/dependency/*"
      ```

1. 將下面的命令新增到 **kafka-run-class.sh** 的 **Launch mode (啟動模式)** 區段下

   ```
   -cp $CLASSPATH:"<your AWS GlueSchema Registry base directory>/target/dependency/*" 
   ```

   它應該如下所示：

   ```
   # Launch mode
   if [ "x$DAEMON_MODE" = "xtrue" ]; then
     nohup "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH:"/Users/johndoe/aws-glue-schema-registry/target/dependency/*" $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
   else
     exec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH:"/Users/johndoe/aws-glue-schema-registry/target/dependency/*" $KAFKA_OPTS "$@"
   fi
   ```

1. 如果使用 bash，執行下面的命令來在您的 bash\$1profile 中設定您的 CLASSPATH。對於任何其他 Shell，請相應地更新環境。

   ```
   echo 'export GSR_LIB_BASE_DIR=<>' >>~/.bash_profile
   echo 'export GSR_LIB_VERSION=1.0.0' >>~/.bash_profile
   echo 'export KAFKA_HOME=<your Apache Kafka installation directory>' >>~/.bash_profile
   echo 'export CLASSPATH=$CLASSPATH:$GSR_LIB_BASE_DIR/avro-kafkaconnect-converter/target/schema-registry-kafkaconnect-converter-$GSR_LIB_VERSION.jar:$GSR_LIB_BASE_DIR/common/target/schema-registry-common-$GSR_LIB_VERSION.jar:$GSR_LIB_BASE_DIR/avro-serializer-deserializer/target/schema-registry-serde-$GSR_LIB_VERSION.jar' >>~/.bash_profile
   source ~/.bash_profile
   ```

1. (選用) 如果您想要使用簡單的檔案來源進行測試，請複製檔案來源連接器。

   ```
   git clone https://github.com/mmolimar/kafka-connect-fs.git
   cd kafka-connect-fs/
   ```

   1. 在來源連接器組態下，將資料格式編輯為 Avro、檔案讀取器編輯為 `AvroFileReader` 並從您正在讀取的檔案路徑中更新範例 Avro 物件。例如：

      ```
      vim config/kafka-connect-fs.properties
      ```

      ```
      fs.uris=<path to a sample avro object>
      policy.regexp=^.*\.avro$
      file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.AvroFileReader
      ```

   1. 安裝來源連接器。

      ```
      mvn clean package
      echo "export CLASSPATH=\$CLASSPATH:\"\$(find target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')\"" >>~/.bash_profile
      source ~/.bash_profile
      ```

   1. 更新 `<your Apache Kafka installation directory>/config/connect-file-sink.properties` 下的接收器屬性會更新主題名稱和輸出檔案名稱。

      ```
      file=<output file full path>
      topics=<my topic>
      ```

1. 啟動來源連接器 (在此範例中，它是檔案來源連接器)。

   ```
   $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/kafka-connect-fs.properties
   ```

1. 執行接收器連接器 (在此範例中，它是檔案接收器連接器)。

   ```
   $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-file-sink.properties
   ```

   如需範例 Kafka Connect 用法，請參閱 [AWS Glue 結構描述登錄檔的 Github 儲存庫](https://github.com/awslabs/aws-glue-schema-registry/tree/master/integration-tests)中 integration-tests 資料夾下的 run-local-tests.sh 指令碼。

# 從第三方結構描述登錄檔移轉至 AWS Glue 結構描述登錄檔
<a name="schema-registry-integrations-migration"></a>

從第三方結構描述登錄檔移轉至 AWS Glue 結構描述登錄檔，對現有、目前的第三方結構描述登錄檔具有相依性。如果在使用第三方結構描述登錄檔傳送的 Apache Kafka 主題中有記錄，則消費者需要第三方結構描述登錄檔來還原序列化這些記錄。`AWSKafkaAvroDeserializer` 提供指定次要還原序列化程式類別的能力，此類別指向第三方還原序列化程式並用於還原序列化這些記錄。

淘汰第三方結構描述有兩個條件。首先，只有在使用第三方結構描述登錄檔的 Apache Kafka 主題中的記錄不再被任何消費者需要後，才會發生淘汰。其次，取決於針對這些主題指定的保留期間，Apache Kafka 主題的老化可能會導致淘汰。請注意，如果您有無限保留的主題，您仍然可以移轉至 AWS Glue 結構描述登錄檔，但您將無法淘汰第三方結構描述登錄檔。因應措施是，您可以使用應用程式或 Mirror Maker 2 讀取目前主題，並使用 AWS Glue 結構描述登錄檔產生新主題。

從第三方結構描述登錄檔移轉至 AWS Glue 結構描述登錄檔：

1. 建立 AWS Glue 結構描述登錄檔中的登錄檔，或使用預設登錄檔。

1. 停止消費者。修改它以包含 AWS Glue 結構描述登錄檔做為主要的還原序列化程式，並包含第三方結構描述登錄檔做為次要。
   + 設定消費者屬性。在此範例中，secondary\$1deserializer 設定為不同的還原序列化程式。行為如下：消費者從 Amazon MSK 擷取記錄，並首先嘗試使用 `AWSKafkaAvroDeserializer`。如果無法為 AWS Glue 結構描述登錄檔讀取包含 Avro 結構描述 ID 的魔術位元組結構描述，則 `AWSKafkaAvroDeserializer` 會嘗試使用 secondary\$1deserializer 中提供的還原序列化程式類別。次要還原序列化程式專屬的屬性也需要在消費者屬性中提供，例如 schema\$1registry\$1url\$1config 和 specific\$1avro\$1reader\$1config，如下所示。

     ```
     consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
     consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AWSKafkaAvroDeserializer.class.getName());
     consumerProps.setProperty(AWSSchemaRegistryConstants.AWS_REGION, KafkaClickstreamConsumer.gsrRegion);
     consumerProps.setProperty(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, KafkaAvroDeserializer.class.getName());
     consumerProps.setProperty(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "URL for third-party schema registry");
     consumerProps.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
     ```

1. 重新啟動消費者。

1. 停止生產者並將生產者指向 AWS Glue 結構描述登錄檔。

   1. 設定生產者屬性。在這個範例中，生產者將使用 default-registry 和自動註冊結構描述版本。

      ```
      producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
      producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AWSKafkaAvroSerializer.class.getName());
      producerProps.setProperty(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2");
      producerProps.setProperty(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName());
      producerProps.setProperty(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true");
      ```

1. (選用) 手動將現有的結構描述和結構描述版本從目前的第三方結構描述登錄檔移至 AWS Glue 結構描述登錄檔，無論是 AWS Glue 結構描述登錄檔中的預設登錄檔或 AWS Glue 結構描述登錄檔中的特定非預設登錄檔。這可以透過從第三方結構描述登錄檔匯出 JSON 格式的結構描述，並使用 AWS 管理主控台 或 在AWS Glue結構描述登錄檔中建立新的結構描述來完成 AWS CLI。

    如果您需要使用 AWS CLI 和 為新建立的結構描述版本啟用與先前結構描述版本的相容性檢查 AWS 管理主控台，或者生產者以開啟結構描述版本自動註冊的新結構描述傳送訊息時，此步驟可能很重要。

1. 啟動生產者。