

# AWS Glue 架构注册表
<a name="schema-registry"></a>

**注意**  
以下区域的 AWS Glue 控制台不支持 AWS Glue 架构注册表：中东（阿联酋）。

AWS Glue 架构注册表让您可以集中发现、控制和演变数据流架构。*架构*定义了数据记录的结构和格式。借助 AWS Glue 架构注册表，您可以使用与 Apache Kafka、[Amazon Managed Streaming for Apache Kafka](https://aws.amazon.com/msk/)、[Amazon Kinesis Data Streams](https://aws.amazon.com/kinesis/data-streams/)、[适用于 Apache Flink 的亚马逊托管服务](https://aws.amazon.com/kinesis/data-analytics/)和 [AWS Lambda](https://aws.amazon.com/lambda/) 的方便集成，在数据流应用程序上管理和实施架构。

架构注册表支持 AVRO（v1.11.4）数据格式、采用适用于架构（规范 Draft-04、Draft-06 和 Draft-07）的 [JSON 架构格式](https://json-schema.org/)并使用 [Everit 库](https://github.com/everit-org/json-schema)进行 JSON 架构验证的 JSON 数据格式、协议缓冲区（Protobuf）版本 proto2 和 proto3（不支持 `extensions` 或 `groups`）以及 Java 语言支持，对其他数据格式和语言的支持也将推出。支持的功能包括兼容性、通过元数据获取架构、架构自动注册、IAM 兼容性，以及可选的的 ZLIB 压缩（用来减少存储和数据传输）。架构注册表属于无服务器服务，可以免费使用。

将架构用作创建者和使用者之间的数据格式合同，这样可以改进数据治理，提高数据质量，并使数据使用者能够适应兼容的上游更改。

架构注册表允许分散的系统共享序列化和反序列化的架构。例如，假设您拥有数据创建者和使用者。创建者在发布数据时知道架构。架构注册表为某些系统（如 Amazon MSK 或 Apache Kafka）提供序列化程序和反序列化程序。

 有关更多信息，请参阅 [架构注册表的工作原理](schema-registry-works.md)。

**Topics**
+ [架构](#schema-registry-schemas)
+ [注册表](#schema-registry-registries)
+ [架构版本控制和兼容性](#schema-registry-compatibility)
+ [开源 Serde 库](#schema-registry-serde-libraries)
+ [架构注册表的配额](#schema-registry-quotas)
+ [架构注册表的工作原理](schema-registry-works.md)
+ [架构注册表入门](schema-registry-gs.md)

## 架构
<a name="schema-registry-schemas"></a>

*架构*定义了数据记录的结构和格式。架构是用于可靠数据发布、使用或存储的版本化规范。

在 Avro 的此示例架构中，格式和结构由布局和字段名称定义，字段名称的格式由数据类型（例如，`string`、`int`）定义。

```
{
    "type": "record",
    "namespace": "ABC_Organization",
    "name": "Employee",
    "fields": [
        {
            "name": "Name",
            "type": "string"
        },
        {
            "name": "Age",
            "type": "int"
        },
        {
            "name": "address",
            "type": {
                "type": "record",
                "name": "addressRecord",
                "fields": [
                    {
                        "name": "street",
                        "type": "string"
                    },
                    {
                        "name": "zipcode",
                        "type": "int" 
                    }
                ]
            }
        }
    ]
}
```

在此示例 JSON 架构 Draft-07 中，格式由 [JSON 架构组织](https://json-schema.org/)定义。

```
{
	"$id": "https://example.com/person.schema.json",
	"$schema": "http://json-schema.org/draft-07/schema#",
	"title": "Person",
	"type": "object",
	"properties": {
		"firstName": {
			"type": "string",
			"description": "The person's first name."
		},
		"lastName": {
			"type": "string",
			"description": "The person's last name."
		},
		"age": {
			"description": "Age in years which must be equal to or greater than zero.",
			"type": "integer",
			"minimum": 0
		}
	}
}
```

在此示例中，Protobuf 的格式由[协议缓冲区语言版本 2（proto2）](https://developers.google.com/protocol-buffers/docs/reference/proto2-spec)定义。

```
syntax = "proto2";

package tutorial;

option java_multiple_files = true;
option java_package = "com.example.tutorial.protos";
option java_outer_classname = "AddressBookProtos";

message Person {
  optional string name = 1;
  optional int32 id = 2;
  optional string email = 3;

  enum PhoneType {
    MOBILE = 0;
    HOME = 1;
    WORK = 2;
  }

  message PhoneNumber {
    optional string number = 1;
    optional PhoneType type = 2 [default = HOME];
  }

  repeated PhoneNumber phones = 4;
}

message AddressBook {
  repeated Person people = 1;
}
```

## 注册表
<a name="schema-registry-registries"></a>

*注册表*是架构的逻辑容器。注册表允许您组织架构以及管理应用程序的访问控制。注册表具有 Amazon Resource Name（ARN），允许您组织和设置注册表中架构操作的不同访问权限。

您可以根据需要使用默认注册表或创建任意数量的新注册表。


**AWS Glue 架构注册表层次结构**  

|  | 
| --- |
|  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/glue/latest/dg/schema-registry.html)  | 
|  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/glue/latest/dg/schema-registry.html)  | 
|  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/glue/latest/dg/schema-registry.html)  | 
|  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/glue/latest/dg/schema-registry.html)  | 

## 架构版本控制和兼容性
<a name="schema-registry-compatibility"></a>

每个架构都可以有多个版本。版本控制由应用于架构的兼容性规则控制。在新架构版本的注册请求成功之前，架构注册表将根据此规则检查请求。

标记为检查点的架构版本用于确定注册架构新版本的兼容性。首次创建架构时，默认检查点将是第一个版本。随着架构发展为更多版本，您可以使用 CLI/SDK 将检查点更改为架构版本，使用遵守一组约束条件的 `UpdateSchema` API。在控制台中，编辑架构定义或兼容性模式会默认将检查点更改为最新版本。

兼容性模式允许您控制架构随时间发展或不能发展的方式。这些模式构成了生成和使用数据的应用程序之间的契约。将架构的新版本提交到注册表时，应用于架构名称的兼容性规则将用于确定是否可以接受新版本。有 8 种兼容性模式：NONE、DISABLED、BACKWARD、BACKWARD\$1ALL、FORWARD、FORWARD\$1ALL、FULL、FULL\$1ALL。

在 Avro 数据格式中，字段可能是可选字段或必填字段。可选字段是指 `Type` 包含 null 的字段。必填字段不会将 null 视为 `Type`。

在 Protobuf 数据格式中，字段可以是可选字段（包括重复字段），也可以在 proto2 语法中为必填字段，而在 proto3 语法中，所有字段均为可选字段（包括重复字段）。所有兼容性规则均基于对协议缓冲区规范以及来自 [Google 协议缓冲区文档](https://developers.google.com/protocol-buffers/docs/overview#updating)指南的了解。
+ *NONE*：不适用兼容模式。您可以在开发场景或者在不知道要应用于架构的兼容性模式时使用此选项。无需进行兼容性检查，接受添加的任何新版本。
+ *DISABLED*：此兼容性选项可防止对特定架构进行版本控制。无法添加新版本。
+ *BACKWARD*：建议使用此兼容性选项，因为它允许使用者读取架构的最新版本和上一个版本。当您需要删除字段或添加可选字段时，您可以使用此选项检查所有先前架构版本的兼容性。BACKWD 的典型使用案例是针对最近的架构创建应用程序时。

**AVRO**  
例如，假定您有一个由名字（必填）、姓氏（必填）、电子邮件地址（必填）和电话号码（可选）定义的架构。

  如果您的下个架构版本删除了必填的电子邮件地址字段，这意味着注册成功。BACKWARD 兼容性要求使用者能够读取当前和以前的架构版本。您的使用者将能够读取新架构，因为旧邮件中的额外电子邮件地址字段将被忽略。

  如果您有建议的新架构版本添加了必填字段（例如邮政编码），这将无法成功注册 BACKWARD 兼容性。新版本的使用者将无法在架构更改之前读取旧邮件，因为他们缺少必需的邮政编码字段。但是，如果在新架构中将邮政编码字段设置为可选，则建议的版本将成功注册，因为使用者可以在没有可选邮政编码字段的情况下读取旧架构。

**JSON**  
例如，假定您具有由名字（可选）、姓氏（可选）、电子邮件地址（可选）和电话号码（可选）定义的架构版本。

  如果您的下一个架构版本添加了可选的电话号码属性，则只要原始架构版本将 `additionalProperties` 字段设置为 False 以禁止任何附加属性，就能成功注册。BACKWARD 兼容性要求使用者能够读取当前和以前的架构版本。您的使用者将能够读取不存在电话号码属性的原始架构生成的数据。

  如果您有建议的新架构版本添加了可选的电话号码属性，则当原始架构版本将 `additionalProperties` 字段设置为 True，即允许任何附加属性时，这样将无法成功注册 BACKWARD 兼容性。新版本的使用者无法在架构更改之前读取旧邮件，因为他们无法读取具有不同类型的电话号码属性的数据，例如字符串而不是数字。

**PROTOBUF**  
例如，假设您有一个由 Message（邮件）`Person` 定义的架构版本，其中 proto2 语法下包含 `first name`（必填字段）、`last name`（必填字段）、`email`（必填字段）和 `phone number`（可选字段）。

  与 AVRO 使用场景相似，如果您的下个架构版本删除了必填的 `email` 字段，这意味着注册成功。BACKWARD 兼容性要求使用者能够读取当前和以前的架构版本。您的使用者将能够读取新架构，因为旧邮件中的额外 `email` 字段将被忽略。

  如果您有建议的新架构版本添加了必填字段（例如 `zip code`），这将无法成功注册 BACKWARD 兼容性。新版本的使用者将无法在架构更改之前读取旧邮件，因为他们缺少必需的 `zip code` 字段。但是，如果在新架构中将 `zip code` 字段设置为可选，则建议的版本将成功注册，因为使用者可以在没有可选 `zip code` 字段的情况下读取旧架构。

  对于 gRPC 使用案例，添加新的 RPC 服务或 RPC 方法是一种向后兼容更改。例如，假设您拥有由 RPC 服务 `MyService`（包含两种 RPC 方法 `Foo` 和 `Bar`）定义的架构版本。

  如果您的下一个架构版本添加了名为 `Baz` 的新 RPC 方法，则这会成功注册。您的使用者将能够读取由原始架构根据 BACKWARD 兼容性生成的数据，因为新添加的 RPC 方法 `Baz` 是可选的。

  如果您有建议的新架构版本删除了现有的 PC 方法 `Foo`，这将无法成功注册 BACKWARD 兼容性。新版本的使用者无法读取架构更改之前的旧邮件，因为他们无法通过 gRPC 应用程序中不存在的 RPC 方法 `Foo` 了解和读取数据。
+ *BACKWARD\$1ALL*：此兼容性选项允许使用者读取架构的最新版本和所有先前版本。当您需要删除字段或添加可选字段时，您可以使用此选项检查所有先前架构版本的兼容性。
+ *FORWARD*：此兼容性选项允许使用者读取当前和下一个架构版本，但不一定是更高版本。当您添加字段或删除可选字段时，您可以使用此选项检查上一个架构版本的兼容性。FORDE 的一个典型使用案例是，您的应用程序是为之前的架构创建的，并且应该能够处理最新架构。

**AVRO**  
例如，假定您有一个由名字（必填）、姓氏（必填）、电子邮件地址（可选）定义的架构。

  如果您有新的架构版本添加了必填字段（例如电话号码），这将成功注册。FORWARD 兼容性要求使用者能够使用先前版本读取新架构生成的数据。

  如果您有建议的架构版本删除了必填名字字段，这将无法成功注册 FORWARD 兼容性。先前版本的使用者将无法读取建议的架构，因为他们缺少必填的名称字段。但是，如果名字字段最初是可选的，则建议的新架构将成功注册，因为使用者可以根据没有可选名字字段的新架构读取数据。

**JSON**  
例如，假定您具有由名字（可选）、姓氏（可选）、电子邮件地址（可选）和电话号码（可选）定义的架构版本。

  如果您的新架构版本删除了可选的电话号码属性，则只要新架构版本将 `additionalProperties` 字段设置为 False，不允许任何附加属性，则这样就会成功注册。FORWARD 兼容性要求使用者能够使用先前版本读取新架构生成的数据。

  如果您有建议的架构版本删除了可选的电话号码属性，则当新架构版本将 `additionalProperties` 字段设置为 True，即允许任何附加属性时，这样将无法成功注册 FORWARD 兼容性。先前版本的使用者无法读取建议的架构，因为他们可能有不同类型的电话号码属性，例如字符串而不是数字。

**PROTOBUF**  
例如，假设您有一个由 Message（邮件）`Person` 定义的架构版本，其中 proto2 语法下包含 `first name`（必填字段）、`last name`（必填字段）和 `email`（可选字段）。

  与 AVRO 使用场景相似，如果您有新的架构版本添加了必填字段（例如 `phone number`），这将成功注册。FORWARD 兼容性要求使用者能够使用先前版本读取新架构生成的数据。

  如果您有建议的架构版本删除了必填 `first name` 字段，这将无法成功注册 FORWARD 兼容性。先前版本的使用者将无法读取建议的架构，因为他们缺少必填的 `first name` 字段。但是，如果 `first name` 字段最初是可选的，则建议的新架构将成功注册，因为使用者可以根据没有可选 `first name` 字段的新架构读取数据。

  对于 gRPC 使用案例，删除 RPC 服务或 RPC 方法是一种向前兼容更改。例如，假设您拥有由 RPC 服务 `MyService`（包含两种 RPC 方法 `Foo` 和 `Bar`）定义的架构版本。

  如果您的下一个架构版本删除了名为 `Foo` 的现有 RPC 方法，这将根据 FORWARD 兼容性成功注册，因为使用者可以使用先前版本读取新架构生成的数据。如果您有建议的架构版本添加了 RPC 方法 `Baz`，这将无法成功注册 FORWARD 兼容性。先前版本的使用者将无法读取建议的架构，因为他们缺少 RPC 方法 `Baz`。
+ *FORWARD\$1ALL*：此兼容性选项允许使用者读取任何新注册架构的创建者写入的数据。当您需要添加字段或删除可选字段以及检查所有先前架构版本的兼容性时，您可以使用此选项。
+ *FULL*：此兼容性选项允许使用者读取创建者使用先前版本或下一版本的架构写入的数据，但不是早期版本或更高版本的创建器写入的数据。当您添加或删除可选字段时，您可以使用此选项检查上一个架构版本的兼容性。
+ *FULL\$1ALL*：此兼容性选项允许创建者读取使用所有架构先前版本的创建器写入的数据。当您添加或删除可选字段时，您可以使用此选项检查所有先前架构版本的兼容性。

## 开源 Serde 库
<a name="schema-registry-serde-libraries"></a>

AWS 提供开源 Serde 库，作为序列化和反序列化数据的框架。这些库的开源设计允许通用的开源应用程序和框架，以在其项目中支持这些库。

有关 Serde 库工作原理的更多详细信息，请参阅[架构注册表的工作原理](schema-registry-works.md)。

## 架构注册表的配额
<a name="schema-registry-quotas"></a>

配额，也称为 AWS 中的限制，是 AWS 账户中资源、操作和项目的最大值。以下是 AWS Glue 中架构注册表的软限制。

**架构版本的元数据键值对数**  
每个 AWS 区域的每个 SchemaVersion 最多可以有 10 个键值对。

您可以使用 [QuerySchemaVersionMetadata 操作（Python：query\$1schema\$1version\$1metadata）](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-QuerySchemaVersionMetadata) 或者 [PutSchemaVersionMetadata 操作（Python：put\$1schema\$1version\$1metadata）](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-PutSchemaVersionMetadata) API 查看或设置键值元数据对。

以下是 AWS Glue 中架构注册表的硬限制。

**注册表**  
此账户在每个 AWS 区域最多可以有 100 个注册表。

**SchemaVersion**  
此账户在每个 AWS 区域最多可以有 1 万个架构版本。

每个新架构都会创建一个新的架构版本，因此假设每个架构只有一个版本，则理论上每个区域每个账户最多可有 1 万个架构。

**架构负载**  
架构负载的大小限制为 170KB。

# 架构注册表的工作原理
<a name="schema-registry-works"></a>

本节介绍架构注册表中序列化和反序列化过程的工作原理。

1. 注册架构：如果注册表中尚不存在架构，则可以使用与目标名称类似的架构名称（例如，test\$1topic、test\$1stream、prod\$1firehose）注册架构，或者创建者可以为架构提供自定义名称。创建者还可以在创建架构时将键值对（例如 source: msk\$1kafka\$1topic\$1A）作为元数据添加到架构，或者将 AWS 标签添加到架构。注册架构后，架构注册表会将架构版本 ID 返回到序列化程序。如果架构存在，但序列化程序使用的是不存在的新版本，则架构注册表将检查架构引用兼容性规则，以确保兼容新版本，然后再将其注册为新版本。

   有两种架构注册方法：手动注册和自动注册。您可以通过 AWS Glue 控制台或 CLI/SDK 手动注册架构。

   在序列化程序设置中打开自动注册时，将执行架构自动注册。如果创建者配置中未提供 `REGISTRY_NAME`，则自动注册将在默认注册表（default-registry）下注册新架构版本。请参阅[安装 SerDe 库](schema-registry-gs-serde.md)，了解有关指定自动注册属性的信息。

1. 序列化程序根据架构验证数据记录：当生成数据的应用程序注册了其架构时，架构注册表序列化程序将验证应用程序生成的记录是否使用与已注册架构匹配的字段和数据类型进行结构化。如果记录架构与注册架构不匹配，则序列化程序将返回异常，并且应用程序将无法将记录传递到目标。

   如果不存在架构，并且架构名称不是通过创建者配置提供，则将使用与主题名称（如果是 Apache Kafka 或 Amazon MSK）或流名称（如果是 Kinesis Data Streams）类似的名称创建架构。

   每条记录都有架构定义和数据。根据架构注册表中的现有架构和版本查询架构定义。

   默认情况下，创建者缓存架构定义和已注册架构的架构版本 ID。如果记录的架构版本定义与缓存中的可用内容不匹配，则创建者将尝试使用架构注册表验证架构。如果架构版本有效，则其版本 ID 和定义将在创建器上本地缓存。

   您可以在[安装 SerDe 库](schema-registry-gs-serde.md)的步骤 3 中的可选创建器属性内调整默认缓存周期（24 小时）。

1. 序列化和传递记录：如果记录符合架构，则序列化程序将使用架构版本 ID 装饰每条记录，根据选定的数据格式（AVRO、JSON、Protobuf，或即将推出的其他格式）序列化记录，压缩记录（可选的创建器配置），并将其传送到目标。

1. 使用者反序列化数据：读取此数据的使用者使用架构注册表反序列化程序库，该库从记录负载解析架构版本 ID。

1. 反序列化程序可能会从模式注册表请求架构：如果这是反序列化程序第一次看到具有特定架构版本 ID 的记录，则使用架构版本 ID，反序列化程序将从架构注册表请求架构并在使用器上本地缓存架构。如果架构注册表无法反序列化记录，则使用者可以录入记录中的数据，然后继续或停止应用程序。

1. 反序列化程序使用架构反序列化记录：当反序列化程序从架构注册表中检索架构版本 ID 时，反序列化程序将解压缩记录（如果创建器发送的记录已压缩），并使用架构反序列化记录。现在，应用程序处理记录。

**注意**  
加密：您的客户端通过 API 调用与架构注册表通信，这些调用使用 HTTPS 上的 TLS 加密对传输中的数据进行加密。存储于架构注册表中的架构始终使用服务托管 AWS Key Management Service（AWS KMS）密钥。

**注意**  
用户授权：架构注册表支持基于身份的 IAM policy。

# 架构注册表入门
<a name="schema-registry-gs"></a>

以下部分概述和演示了如何设置和使用架构注册表。有关架构注册表概念和组件的信息，请参阅[AWS Glue 架构注册表](schema-registry.md)。

**Topics**
+ [安装 SerDe 库](schema-registry-gs-serde.md)
+ [与 AWS Glue 架构注册表集成](schema-registry-integrations.md)
+ [从第三方架构注册表迁移到 AWS Glue 架构注册表](schema-registry-integrations-migration.md)

# 安装 SerDe 库
<a name="schema-registry-gs-serde"></a>

Serde 库提供用于序列化和反序列化数据的框架。

您将为生成数据的应用程序（统称为“序列化程序”）安装开源序列化程序。序列化程序处理序列化、压缩以及与架构注册表的交互。序列化程序自动从正在写入架构注册表兼容目标（如 Amazon MSK）的记录中提取架构。同样，您将在使用数据的应用程序上安装开源反序列化程序。

# Java 实现
<a name="schema-registry-gs-serde-java"></a>

**注意**  
先决条件：完成以下步骤之前，您需要先运行 Amazon Managed Streaming for Apache Kafka（Amazon MSK）或 Apache Kafka 集群。您的创建器和使用器需要在 Java 8 或更高版本上运行。

在创建器和使用器上安装库：

1. 在创建器和使用器的 pom.xml 文件中，通过下面的代码添加此依赖关系：

   ```
   <dependency>
       <groupId>software.amazon.glue</groupId>
       <artifactId>schema-registry-serde</artifactId>
       <version>1.1.5</version>
   </dependency>
   ```

   或者，您可以克隆 [AWS Glue 架构注册表](https://github.com/awslabs/aws-glue-schema-registry)。

1. 使用这些必填属性设置您的创建器：

   ```
   props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // Can replace StringSerializer.class.getName()) with any other key serializer that you may use
   props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());
   props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2");
   properties.put(AWSSchemaRegistryConstants.DATA_FORMAT, "JSON"); // OR "AVRO"
   ```

   如果没有现有架构，则需要打开自动注册（下一步）。如果您确实有要应用的架构，则将“my-schema”替换为您的架构名称。此外，如果架构自动注册处于关闭状态，则必须提供“registry-name”。如果架构在“default-registry”下创建，则可以省略注册表名称。

1. （可选）设置这些可选的创建器属性。有关详细属性说明，请参阅[自述文件](https://github.com/awslabs/aws-glue-schema-registry/blob/master/README.md)。

   ```
   props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true"); // If not passed, uses "false"
   props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "my-schema"); // If not passed, uses transport name (topic name in case of Kafka, or stream name in case of Kinesis Data Streams)
   props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry"); // If not passed, uses "default-registry"
   props.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000 (24 Hours)
   props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200
   props.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL); // Pass a compatibility mode. If not passed, uses Compatibility.BACKWARD
   props.put(AWSSchemaRegistryConstants.DESCRIPTION, "This registry is used for several purposes."); // If not passed, constructs a description
   props.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, AWSSchemaRegistryConstants.COMPRESSION.ZLIB); // If not passed, records are sent uncompressed
   ```

   自动注册会在默认注册表（“default-registry”）下注册架构版本。如果上一步未指定 `SCHEMA_NAME`，则主题名称被推断为 `SCHEMA_NAME`。

   有关兼容性模式的更多信息，请参阅[架构版本控制和兼容性](schema-registry.md#schema-registry-compatibility)。

1. 使用这些必填属性设置您的使用器：

   ```
   props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
   props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
   props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2"); // Pass an AWS 区域
   props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); // Only required for AVRO data format
   ```

1. （可选）设置这些可选的使用器属性。有关详细属性说明，请参阅[自述文件](https://github.com/awslabs/aws-glue-schema-registry/blob/master/README.md)。

   ```
   properties.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000
   props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200
   props.put(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, "com.amazonaws.services.schemaregistry.deserializers.external.ThirdPartyDeserializer"); // For migration fall back scenario
   ```

# C\$1 实现
<a name="schema-registry-gs-serde-csharp"></a>

**注意**  
先决条件：完成以下步骤之前，您需要先运行 Amazon Managed Streaming for Apache Kafka（Amazon MSK）或 Apache Kafka 集群。您的生产者和使用者需要在 .NET 8.0 或更高版本上运行。

## 安装
<a name="schema-registry-gs-serde-csharp-install"></a>

对于 C\$1 应用程序，请使用以下方法之一安装 AWS Glue 架构注册表 SerDe NuGet 软件包：

**.NET CLI：**  
使用以下命令安装软件包：

```
dotnet add package Aws.Glue.SchemaRegistry --version 1.0.0-<rid>
```

其中，`<rid>` 可能为 `1.0.0-linux-x64`、`1.0.0-linux-musl-x64` 或 `1.0.0-linux-arm64`

**PackageReference（在您的 .csproj 文件中）：**  
请将以下内容添加至您的项目文件：

```
<PackageReference Include="Aws.Glue.SchemaRegistry" Version="1.0.0-<rid>" />
```

其中，`<rid>` 可能为 `1.0.0-linux-x64`、`1.0.0-linux-musl-x64` 或 `1.0.0-linux-arm64`

## 配置文件设置
<a name="schema-registry-gs-serde-csharp-config"></a>

使用所需设置创建配置属性文件（例如 `gsr-config.properties`）：

**最低配置：**  
下面显示了一个最低配置示例：

```
region=us-east-1
registry.name=default-registry
dataFormat=AVRO
schemaAutoRegistrationEnabled=true
```

## 在 Kafka SerDes 上使用 C\$1 Glue 架构客户端库
<a name="schema-registry-gs-serde-csharp-kafka"></a>

**序列化器用法示例：**  
以下示例显示了如何使用序列化器：

```
private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>";
var protobufSerializer = new GlueSchemaRegistryKafkaSerializer(PROTOBUF_CONFIG_PATH);
var serialized = protobufSerializer.Serialize(message, message.Descriptor.FullName);
// send serialized bytes to Kafka using producer.Produce(serialized)
```

**反序列化器用法示例：**  
以下示例显示了如何使用反序列化器：

```
private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>";
var dataConfig = new GlueSchemaRegistryDataFormatConfiguration(
    new Dictionary<string, dynamic>
    {
        {
            GlueSchemaRegistryConstants.ProtobufMessageDescriptor, message.Descriptor
        }
    }
);
var protobufDeserializer = new GlueSchemaRegistryKafkaDeserializer(PROTOBUF_CONFIG_PATH, dataConfig);

// read message from Kafka using serialized = consumer.Consume()
var deserializedObject = protobufDeserializer.Deserialize(message.Descriptor.FullName, serialized);
```

## 在 SerDes 的 KafkaFlow 中使用 C\$1 Glue 架构客户端库
<a name="schema-registry-gs-serde-csharp-kafkaflow"></a>

**序列化器用法示例：**  
以下示例显示了如何使用序列化器配置 KafkaFlow：

```
services.AddKafka(kafka => kafka
    .UseConsoleLog()
    .AddCluster(cluster => cluster
        .WithBrokers(new[] { "localhost:9092" })
        .AddProducer<CustomerProducer>(producer => producer
            .DefaultTopic("customer-events")
            .AddMiddlewares(m => m
                .AddSerializer<GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>>(
                    () => new GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>("config/gsr-config.properties")
                )
            )
        )
    )
);
```

**反序列化器用法示例：**  
以下示例显示了如何使用反序列化器配置 KafkaFlow：

```
.AddConsumer(consumer => consumer
    .Topic("customer-events")
    .WithGroupId("customer-group")
    .WithBufferSize(100)
    .WithWorkersCount(10)
    .AddMiddlewares(middlewares => middlewares
        .AddDeserializer<GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>>(
            () => new GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>("config/gsr-config.properties")
        )
        .AddTypedHandlers(h => h.AddHandler<CustomerHandler>())
    )
)
```

## 可选生产者属性
<a name="schema-registry-gs-serde-csharp-optional"></a>

您可以使用其他可选属性扩展配置文件：

```
# Auto-registration (if not passed, uses "false")
schemaAutoRegistrationEnabled=true

# Schema name (if not passed, uses topic name)
schema.name=my-schema

# Registry name (if not passed, uses "default-registry")
registry.name=my-registry

# Cache settings
cacheTimeToLiveMillis=86400000
cacheSize=200

# Compatibility mode (if not passed, uses BACKWARD)
compatibility=FULL

# Registry description
description=This registry is used for several purposes.

# Compression (if not passed, records are sent uncompressed)
compressionType=ZLIB
```

## 支持的数据格式
<a name="schema-registry-gs-serde-supported-formats"></a>

Java 和 C\$1 实现都支持相同的数据格式：
+ *AVRO*：Apache Avro 二进制格式
+ *JSON*：JSON 架构格式
+ *PROTOBUF*：协议缓冲区格式

## 备注
<a name="schema-registry-gs-serde-csharp-notes"></a>
+ 要开始使用库，请访问 [https://www.nuget.org/packages/AWS.Glue.SchemaRegistry](https://www.nuget.org/packages/AWS.Glue.SchemaRegistry)
+ 源代码可在以下网址获得：[https://github.com/awslabs/aws-glue-schema-registry](https://github.com/awslabs/aws-glue-schema-registry)

# 创建注册表
<a name="schema-registry-gs3"></a>

您可以使用默认注册表，也可以使用 AWS Glue API 或 AWS Glue 控制台创建任意数量的新注册表。

**AWS Glue API**  
您可以按照以下步骤使用 AWS Glue API 执行此任务。

要将 AWS CLI 用于 AWS Glue 架构注册表 API，请确保将 AWS CLI 更新到最新版本。

 要添加新注册表，请使用 [CreateRegistry 操作（Python：create\$1registry）](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-CreateRegistry) API。将 `RegistryName` 指定为要创建的注册表的名称，其最大长度为 255 个字符，且只能包含字母、数字、连字符、下划线、美元符号或哈希标记。

将 `Description` 指定为字符串，其长度不超过 2048 个字节，与 [URI 地址多行字符串模式](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-common.html#aws-glue-api-common-_string-patterns)匹配。

（可选）为注册表指定一个或多个 `Tags`，作为键值对的映射数组。

```
aws glue create-registry --registry-name registryName1 --description description
```

您的注册表创建后，会被分配一个 Amazon Resource Name（ARN），您可以在API 响应的 `RegistryArn` 中查看。现在，您已创建注册表，请为该注册表创建一个或多个架构。

**AWS Glue 控制台**  
在 AWS Glue 控制台中添加新注册表：

1. 登录 AWS 管理控制台，然后打开 AWS Glue 控制台，网址为：[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\)。

1. 在导航窗格中的 **Data catalog (数据目录)** 下面，选择 **Schema registries (架构注册表)**。

1. 选择 **Add registry (添加注册表)**。

1. 为注册表输入 **Registry name (注册表名称)**，包含字母、数字、连字符或下划线。此名称不能更改。

1. 为注册表输入 **Description (说明)**（可选）。

1. 或者，将一个或多个标签应用到您的注册表。选择 **Add new tag (添加新标签)**，指定 **Tag key (标签键)** 和 **Tag value (标签值)**（可选）。

1. 选择 **Add registry (添加注册表)**。

![\[注册表创建示例。\]](http://docs.aws.amazon.com/zh_cn/glue/latest/dg/images/schema_reg_create_registry.png)


创建注册表后，系统会为其分配一个 Amazon Resource Name（ARN），您可以从 **Schema registries (架构注册表)** 列表中选择注册表进行查看。现在，您已创建注册表，请为该注册表创建一个或多个架构。

# 处理 JSON 的特定记录（JAVO POJO）
<a name="schema-registry-gs-json-java-pojo"></a>

您可以使用普通旧 Java 对象（POJO）并将该对象作为记录传递。这类似于 AVRO 中特定记录的概念。[mbknor-jackson-jsonschema](https://github.com/mbknor/mbknor-jackson-jsonSchema) 可以为传递的 POJO 生成 JSON 架构。此库还可以在 JSON 架构中注入其他信息。

AWS Glue 架构注册表库使用架构中注入的“className”字段提供完全分类的类名称。反序列化程序使用“className”字段，反序列化为该类的对象。

```
 Example class :

@JsonSchemaDescription("This is a car")
@JsonSchemaTitle("Simple Car Schema")
@Builder
@AllArgsConstructor
@EqualsAndHashCode
// Fully qualified class name to be added to an additionally injected property
// called className for deserializer to determine which class to deserialize
// the bytes into
@JsonSchemaInject(
        strings = {@JsonSchemaString(path = "className",
                value = "com.amazonaws.services.schemaregistry.integrationtests.generators.Car")}
)
// List of annotations to help infer JSON Schema are defined by https://github.com/mbknor/mbknor-jackson-jsonSchema
public class Car {
    @JsonProperty(required = true)
    private String make;

    @JsonProperty(required = true)
    private String model;

    @JsonSchemaDefault("true")
    @JsonProperty
    public boolean used;

    @JsonSchemaInject(ints = {@JsonSchemaInt(path = "multipleOf", value = 1000)})
    @Max(200000)
    @JsonProperty
    private int miles;

    @Min(2000)
    @JsonProperty
    private int year;

    @JsonProperty
    private Date purchaseDate;

    @JsonProperty
    @JsonFormat(shape = JsonFormat.Shape.NUMBER)
    private Date listedDate;

    @JsonProperty
    private String[] owners;

    @JsonProperty
    private Collection<Float> serviceChecks;

    // Empty constructor is required by Jackson to deserialize bytes
    // into an Object of this class
    public Car() {}
}
```

# 创建架构
<a name="schema-registry-gs4"></a>

您可以使用 AWS Glue API 或 AWS Glue 控制台创建架构。

**AWS Glue API**  
您可以按照以下步骤使用 AWS Glue API 执行此任务。

要添加新架构，请使用 [CreateSchema 操作（Python：create\$1schema）](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-CreateSchema) API。

指定 `RegistryId` 结构，指示架构的注册表。或者，忽略 `RegistryId`，使用默认注册表。

指定 `SchemaName`（包含字母、数字、连字符或下划线）和 `DataFormat` 为 **AVRO** 或者 **JSON**。`DataFormat` 一旦在架构上设置，便不可更改。

指定 `Compatibility` 模式：
+ *向后兼容（推荐）*– 使用者可以读取当前版本和先前版本。
+ *向后兼容全部* – 使用者可以读取当前版本和所有先前版本。
+ *向前兼容* – 使用者可以读取当前版本和后续版本。
+ *向前兼容全部* – 使用者可以读取当前版本和所有后续版本。
+ *完整兼容* – 向后兼容和向前兼容的组合。
+ *完整兼容全部* – 向后兼容全部和向前兼容全部的组合。
+ *无* – 不执行兼容性检查。
+ *已禁用* – 防止对此架构进行任何版本控制。

（可选）为您的架构指定 `Tags`。

指定 `SchemaDefinition`，以 Avro、JSON 或 Protobuf 数据格式定义架构。请参阅示例。

对于 Avro 数据格式：

```
aws glue create-schema --registry-id RegistryName="registryName1" --schema-name testschema --compatibility NONE --data-format AVRO --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}"
```

```
aws glue create-schema --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName1" --schema-name testschema --compatibility NONE --data-format AVRO  --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}"
```

对于 JSON 数据格式：

```
aws glue create-schema --registry-id RegistryName="registryName" --schema-name testSchemaJson --compatibility NONE --data-format JSON --schema-definition "{\"$schema\": \"http://json-schema.org/draft-07/schema#\",\"type\":\"object\",\"properties\":{\"f1\":{\"type\":\"string\"}}}"
```

```
aws glue create-schema --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName" --schema-name testSchemaJson --compatibility NONE --data-format JSON --schema-definition "{\"$schema\": \"http://json-schema.org/draft-07/schema#\",\"type\":\"object\",\"properties\":{\"f1\":{\"type\":\"string\"}}}"
```

对于 Protobuf 数据格式：

```
aws glue create-schema --registry-id RegistryName="registryName" --schema-name testSchemaProtobuf --compatibility NONE --data-format PROTOBUF --schema-definition "syntax = \"proto2\";package org.test;message Basic { optional int32 basic = 1;}"
```

```
aws glue create-schema --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName" --schema-name testSchemaProtobuf --compatibility NONE --data-format PROTOBUF --schema-definition "syntax = \"proto2\";package org.test;message Basic { optional int32 basic = 1;}"
```

**AWS Glue 控制台**  
使用 AWS Glue 控制台添加新架构

1. 登录 AWS 管理控制台，然后通过以下网址打开 AWS Glue 控制台：[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\)。

1. 在导航窗格的 **Data catalog (数据目录)** 下，选择 **Schemas (架构)**。

1. 选择 **Add schema (添加架构)**。

1. 输入 **Schema name (架构名称)**，包含字母、数字、连字符、下划线、美元符号或哈希标记。此名称不能更改。

1. 选择 **Registry (注册表)**，这是架构在下拉菜单中的存储位置。父注册表在创建后无法更改。

1. 将 **Data format (数据格式)** 保留为 *Apache Avro* 或者 *JSON*。此格式适用于此架构的所有版本。

1. 选择 **Compatibility mode (兼容性模式)**。
   + *向后兼容（推荐）*– 接收者可以读取当前版本和先前版本。
   + *向后兼容全部* – 接收者可以读取当前版本和所有先前版本。
   + *向前兼容* – 发件人可以写入当前版本和先前版本。
   + *向前兼容全部* – 发件人可以写入当前版本和所有先前版本。
   + *完整兼容* – 向后兼容和向前兼容的组合。
   + *完整兼容全部* – 向后兼容全部和向前兼容全部的组合。
   + *无* – 不执行兼容性检查。
   + *已禁用* – 防止对此架构进行任何版本控制。

1. 为注册表输入可选 **Description (说明)**，最多 250 个字符。  
![\[架构创建示例。\]](http://docs.aws.amazon.com/zh_cn/glue/latest/dg/images/schema_reg_create_schema.png)

1. （可选）将一个或多个标签应用于架构。选择 **Add new tag (添加新标签)**，指定 **Tag key (标签键)** 和 **Tag value (标签值)**（可选）。

1. 在 **First schema version (第一个架构版本)** 框中，输入或粘贴您的初始架构。

   有关 Avro 格式的信息，请参阅[使用 Avro 数据格式](#schema-registry-avro)

   有关 JSON 格式的信息，请参阅[使用 JSON 数据格式](#schema-registry-json)

1. （可选）选择 **Add metadata (添加元数据)** 添加版本元数据，对架构版本进行注释或分类。

1. 选择 **Create schema and version (创建架构和版本)**。

![\[架构创建示例。\]](http://docs.aws.amazon.com/zh_cn/glue/latest/dg/images/schema_reg_create_schema2.png)


该架构在 **Schemas (架构)** 下面的列表中创建并显示。

## 使用 Avro 数据格式
<a name="schema-registry-avro"></a>

Avro 提供数据序列化和数据交换服务。Avro 以 JSON 格式存储数据定义，使其易于阅读和解释。数据本身以二进制格式存储。

有关定义 Apache Avro 架构的信息，请参阅 [Apache Avro 规范](http://avro.apache.org/docs/current/spec.html)。

## 使用 JSON 数据格式
<a name="schema-registry-json"></a>

数据可以使用 JSON 格式进行序列化。[JSON 架构格式](https://json-schema.org/)定义了 JSON 架构格式的标准。

# 更新架构或注册表
<a name="schema-registry-gs5"></a>

创建后，您可以编辑架构、架构版本或注册表。

## 更新注册表
<a name="schema-registry-gs5a"></a>

您可以使用 AWS Glue API 或 AWS Glue 控制台。无法编辑现有注册表的名称。您可以编辑注册表的说明。

**AWS Glue API**  
要更新现有注册表，请使用 [UpdateRegistry 操作（Python：update\$1registry）](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-UpdateRegistry) API。

指定 `RegistryId` 结构，指示要更新的注册表。传递 `Description`，更改注册表的说明。

```
aws glue update-registry --description updatedDescription --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName1"
```

**AWS Glue 控制台**  
使用 AWS Glue 控制台更新注册表

1. 登录 AWS 管理控制台，然后打开 AWS Glue 控制台，网址为：[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\)。

1. 在导航窗格中的 **Data catalog (数据目录)** 下面，选择 **Schema registries (架构注册表)**。

1. 从注册表列表中选择一个注册表，方法是选中该注册表的复选框。

1. 在 **Action (操作)** 菜单中，选择 **Edit registry (编辑注册表)**。

# 更新架构
<a name="schema-registry-gs5b"></a>

您可以更新架构的说明或兼容性设置。

要更新现有架构，请使用 [UpdateSchema 操作（Python：update\$1schema）](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-UpdateSchema) API。

指定 `SchemaId` 结构，指示要更新的架构。必须提供 `VersionNumber` 或 `Compatibility` 中的一个。

代码示例 11：

```
aws glue update-schema --description testDescription --schema-id SchemaName="testSchema1",RegistryName="registryName1" --schema-version-number LatestVersion=true --compatibility NONE
```

```
aws glue update-schema --description testDescription --schema-id SchemaArn="arn:aws:glue:us-east-2:901234567890:schema/registryName1/testSchema1" --schema-version-number LatestVersion=true --compatibility NONE
```

# 添加架构版本
<a name="schema-registry-gs5c"></a>

添加架构版本时，您需要比较版本，确保接受新架构。

要为现有架构添加新版本，请使用 [RegisterSchemaVersion 操作（Python：register\$1schema\$1version）](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-RegisterSchemaVersion) API。

指定 `SchemaId` 结构，指示要为其添加版本的架构，以及指定 `SchemaDefinition` 以定义架构。

代码示例 12：

```
aws glue register-schema-version --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}" --schema-id SchemaArn="arn:aws:glue:us-east-1:901234567890:schema/registryName/testschema"
```

```
aws glue register-schema-version --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}" --schema-id SchemaName="testschema",RegistryName="testregistry"
```

1. 登录 AWS 管理控制台，然后打开 AWS Glue 控制台，网址为：[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\)。

1. 在导航窗格的 **Data catalog (数据目录)** 下，选择 **Schemas (架构)**。

1. 从方案列表中选择一个方案，方法是选择该方案对应的框。

1. 从列表中选择一个或多个方案，方法是选择相关方案对应的框。

1. 在 **Action (操作)** 菜单中，选择 **Register new version (注册新版本)**。

1. 在 **New version (新版本)** 框中，输入或粘贴新架构。

1. 选择 **Compare with previous version (与先前版本比较)**，查看与先前架构版本的差异。

1. （可选）选择 **Add metadata (添加元数据)** 添加版本元数据，对架构版本进行注释或分类。输入 **Key (密钥)** 和可选 **Value (值)**。

1. 选择 **Register version (注册版本)**。

![\[添加架构版本。\]](http://docs.aws.amazon.com/zh_cn/glue/latest/dg/images/schema_reg_add_schema_version.png)


架构版本在版本列表中显示。如果版本更改了兼容性模式，则该版本将标记为检查点。

## 架构版本比较示例
<a name="schema-registry-gs5c1"></a>

当您选择 **Compare with previous version (与先前版本比较)** 时，您将看到先前版本和新版本一起显示。更改的信息将如下突出显示：
+ *黄色*：表示已更改的信息。
+ *绿色*：表示在最新版本中添加的内容。
+ *红色*：表示在最新版本中删除的内容。

您还可以与早期版本进行比较。

![\[架构版本比较示例。\]](http://docs.aws.amazon.com/zh_cn/glue/latest/dg/images/schema_reg_version_comparison.png)


# 删除架构或注册表
<a name="schema-registry-gs7"></a>

删除架构、架构版本或注册表是永久性操作，无法撤消。

## 删除架构
<a name="schema-registry-gs7a"></a>

如果架构不再在注册表中使用，您可能希望删除该架构，请使用 AWS 管理控制台 或 [DeleteSchema 操作（Python：delete\$1schema）](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteSchema) API。

删除一个或多个架构是永久性操作，无法撤消。确保不再需要该架构。

要从注册表中删除架构，请调用 [DeleteSchema 操作（Python：delete\$1schema）](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteSchema) API，指定 `SchemaId` 结构以标识架构。

例如：

```
aws glue delete-schema --schema-id SchemaArn="arn:aws:glue:us-east-2:901234567890:schema/registryName1/schemaname"
```

```
aws glue delete-schema --schema-id SchemaName="TestSchema6-deleteschemabyname",RegistryName="default-registry"
```

**AWS Glue 控制台**  
从 AWS Glue 控制台删除架构：

1. 登录 AWS 管理控制台，然后打开 AWS Glue 控制台，网址为：[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\)。

1. 在导航窗格中的 **Data catalog (数据目录)** 下面，选择 **Schema registries (架构注册表)**。

1. 从注册表列表中选择包含您的架构的注册表。

1. 从列表中选择一个或多个方案，方法是选择相关方案对应的框。

1. 在 **Action (操作)** 菜单中，选择 **Delete schema (删除架构)**。

1. 在字段中输入文本 **Delete** 以确认删除。

1. 选择**删除**。

您指定的架构将从注册表中删除。

## 删除架构版本
<a name="schema-registry-gs7b"></a>

随着架构在注册表中累积，您可能需要使用 AWS 管理控制台 或 [DeleteSchemaVersions 操作（Python：delete\$1table\$1version）](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteSchemaVersions) API 删除不需要的架构。删除一个或多个架构版本是永久性操作，无法撤消。确保不再需要该架构版本。

删除架构版本时，请注意以下约束：
+ 您无法删除检查点版本。
+ 连续版本的范围不能超过 25。
+ 最新架构版本不得处于待处理状态。

指定 `SchemaId` 结构以标识架构，并指定 `Versions` 为要删除的版本范围。有关指定版本或版本范围的更多信息，请参阅[DeleteRegistry 操作（Python：delete\$1registry）](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteRegistry)。您指定的架构版本将从注册表中删除。

此调用后调用的 [ListSchemaVersions 操作（Python：list\$1schema \$1version）](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-ListSchemaVersions) API 将列出已删除版本的状态。

例如：

```
aws glue delete-schema-versions --schema-id SchemaName="TestSchema6",RegistryName="default-registry" --versions "1-1"
```

```
aws glue delete-schema-versions --schema-id SchemaArn="arn:aws:glue:us-east-2:901234567890:schema/default-registry/TestSchema6-NON-Existent" --versions "1-1"
```

1. 登录 AWS 管理控制台，然后打开 AWS Glue 控制台，网址为：[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\)。

1. 在导航窗格中的 **Data catalog (数据目录)** 下面，选择 **Schema registries (架构注册表)**。

1. 从注册表列表中选择包含您的架构的注册表。

1. 从列表中选择一个或多个方案，方法是选择相关方案对应的框。

1. 在 **Action (操作)** 菜单中，选择 **Delete schema (删除架构)**。

1. 在字段中输入文本 **Delete** 以确认删除。

1. 选择**删除**。

您指定的架构版本将从注册表中删除。

# 删除注册表
<a name="schema-registry-gs7c"></a>

当注册表包含的架构不再在该注册表下进行组织时，您可能需要删除该注册表。您需要将这些架构重新分配给另一个注册表。

删除一个或多个注册表是永久性操作，无法撤消。确保不再需要该注册表。

默认注册表可以使用 AWS CLI 删除。

**AWS Glue API**  
要删除整个注册表（包括架构及其所有版本），请调用 [DeleteRegistry 操作（Python：delete\$1registry）](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteRegistry) API。指定 `RegistryId` 结构以标识注册表。

例如：

```
aws glue delete-registry --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName1"
```

```
aws glue delete-registry --registry-id RegistryName="TestRegistry-deletebyname"
```

要获取删除操作的状态，您可以在异步调用后调用 `GetRegistry` API。

**AWS Glue 控制台**  
从 AWS Glue 控制台删除注册表：

1. 登录 AWS 管理控制台，然后打开 AWS Glue 控制台，网址为：[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\)。

1. 在导航窗格中的 **Data catalog (数据目录)** 下面，选择 **Schema registries (架构注册表)**。

1. 从列表中选择一个注册表，方法是选中相应的框。

1. 在 **Action (操作)** 菜单中，选择 **Delete registry (删除注册表)**。

1. 在字段中输入文本 **Delete** 以确认删除。

1. 选择**删除**。

您选择的注册表将从 AWS Glue 中删除。

## 用于序列化程序的 IAM 示例
<a name="schema-registry-gs1"></a>

**注意**  
AWS 托管策略将授予针对常用使用案例的必要权限。有关使用托管策略管理模式注册表的信息，请参阅[适用于 AWS Glue 的 AWS 托管（预定义）策略](security-iam-awsmanpol.md#access-policy-examples-aws-managed)。

对于序列化程序，您应该创建类似如下的最低策略，以便您能够找到针对指定架构定义的 `schemaVersionId`。请注意，您应该对注册表具有读取权限，以便读取注册表中的架构。您可以使用 `Resource` 子句，限制可以读取的注册表。

代码示例 13：

```
{
    "Sid" : "GetSchemaByDefinition",
    "Effect" : "Allow",
    "Action" :
	[
        "glue:GetSchemaByDefinition"
    ],
        "Resource" : ["arn:aws:glue:us-east-2:012345678:registry/registryname-1",
                      "arn:aws:glue:us-east-2:012345678:schema/registryname-1/schemaname-1",
                      "arn:aws:glue:us-east-2:012345678:schema/registryname-1/schemaname-2"
                     ]
}
```

此外，您还可以允许创建者包括以下额外的方法，以创建新的架构和版本。请注意，您应该能够检查注册表，以便添加/删除/演变其中的架构。您可以使用 `Resource` 子句，限制可以检查的注册表。

代码示例 14：

```
{
    "Sid" : "RegisterSchemaWithMetadata",
    "Effect" : "Allow",
    "Action" :
	[
        "glue:GetSchemaByDefinition",
        "glue:CreateSchema",
        "glue:RegisterSchemaVersion",
        "glue:PutSchemaVersionMetadata",
    ],
    "Resource" : ["arn:aws:glue:aws-region:123456789012:registry/registryname-1",
                  "arn:aws:glue:aws-region:123456789012:schema/registryname-1/schemaname-1",
                  "arn:aws:glue:aws-region:123456789012:schema/registryname-1/schemaname-2"
                 ]
}
```

## 用于反序列化程序的 IAM 示例
<a name="schema-registry-gs1b"></a>

对于反序列化程序（使用者端），您应创建类似如下的策略，以允许反序列化程序从架构注册表中获取架构以进行反序列化。请注意，您应该能够检查注册表，以便获取其中的架构。

代码示例 15：

```
{
    "Sid" : "GetSchemaVersion",
    "Effect" : "Allow",
    "Action" :
	[
        "glue:GetSchemaVersion"
    ],
    "Resource" : ["*"]
}
```

## 使用 AWS PrivateLink 的私有连接
<a name="schema-registry-gs-private"></a>

您可以使用 AWS PrivateLink 将您的数据创建者的 VPC 连接到 AWS Glue，方法是为 AWS Glue 定义接口 VPC 终端节点。当您使用 VPC 接口端点时，您的 VPC 与 AWS Glue 之间的通信完全在 AWS 网络内进行。有关更多信息，请参阅[将 AWS Glue 与接口 VPC 终端节点一起使用](https://docs.aws.amazon.com/glue/latest/dg/vpc-endpoint.html)。

# 访问 Amazon CloudWatch 指标
<a name="schema-registry-gs-monitoring"></a>

Amazon CloudWatch 指标可作为 CloudWatch 免费套餐的一部分提供。您可以在 CloudWatch 控制台中访问这些指标。API 级指标包括 CreateSchema（成功和延迟）GetSchemaByDefinition（成功和延迟）、GetSchemaVersion（成功和延迟）、RegisterSchemaVersion（成功和延迟）、PutSchemaVersionMetadata（成功和延迟）。资源级指标包括 Registry.ThrottledByLimit、SchemaVersion.ThrottledByLimit、SchemaVersion.Size。

# 架构注册表的示例 CloudFormation 模板
<a name="schema-registry-integrations-cfn"></a>

以下是一个示例模板，用于在 CloudFormation 中创建架构注册表。要在您的账户中创建此堆栈，请将上面的模板复制到文件 `SampleTemplate.yaml`，并运行以下命令：

```
aws cloudformation create-stack --stack-name ABCSchemaRegistryStack --template-body "'cat SampleTemplate.yaml'"
```

此示例使用 `AWS::Glue::Registry` 创建注册表，使用 `AWS::Glue::Schema` 创建架构，使用 `AWS::Glue::SchemaVersion` 创建架构版本，使用 `AWS::Glue::SchemaVersionMetadata` 填充架构版本元数据。

```
Description: "A sample CloudFormation template for creating Schema Registry resources."
Resources:
  ABCRegistry:
    Type: "AWS::Glue::Registry"
    Properties:
      Name: "ABCSchemaRegistry"
      Description: "ABC Corp. Schema Registry"
      Tags:
        Project: "Foo"
  ABCSchema:
    Type: "AWS::Glue::Schema"
    Properties:
      Registry:
        Arn: !Ref ABCRegistry
      Name: "TestSchema"
      Compatibility: "NONE"
      DataFormat: "AVRO"
      SchemaDefinition: >
        {"namespace":"foo.avro","type":"record","name":"user","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":"int"}]}
      Tags:
        Project: "Foo"
  SecondSchemaVersion:
    Type: "AWS::Glue::SchemaVersion"
    Properties:
      Schema:
        SchemaArn: !Ref ABCSchema
      SchemaDefinition: >
        {"namespace":"foo.avro","type":"record","name":"user","fields":[{"name":"status","type":"string", "default":"ON"}, {"name":"name","type":"string"},{"name":"favorite_number","type":"int"}]}
  FirstSchemaVersionMetadata:
    Type: "AWS::Glue::SchemaVersionMetadata"
    Properties:
      SchemaVersionId: !GetAtt ABCSchema.InitialSchemaVersionId
      Key: "Application"
      Value: "Kinesis"
  SecondSchemaVersionMetadata:
    Type: "AWS::Glue::SchemaVersionMetadata"
    Properties:
      SchemaVersionId: !Ref SecondSchemaVersion
      Key: "Application"
      Value: "Kinesis"
```

# 与 AWS Glue 架构注册表集成
<a name="schema-registry-integrations"></a>

这些章节介绍了与 AWS Glue 架构注册表集成的相关知识。本部分中的示例显示了具有 AVRO 数据格式的架构。有关更多示例（包括具有 JSON 数据格式的架构），请参阅 [AWS Glue 架构注册表开源存储库](https://github.com/awslabs/aws-glue-schema-registry)。

**Topics**
+ [使用案例：将架构注册表连接到 Amazon MSK 或 Apache Kafka](#schema-registry-integrations-amazon-msk)
+ [使用案例：将 Amazon Kinesis Data Streams 与 AWS Glue 架构注册表集成](#schema-registry-integrations-kds)
+ [应用场景：适用于 Apache Flink 的亚马逊托管服务](#schema-registry-integrations-kinesis-data-analytics-apache-flink)
+ [使用案例：与 AWS Lambda 集成](#schema-registry-integrations-aws-lambda)
+ [使用案例：AWS Glue Data Catalog](#schema-registry-integrations-aws-glue-data-catalog)
+ [使用案例：AWS Glue 流式传输](#schema-registry-integrations-aws-glue-streaming)
+ [使用案例：Apache Kafka Streams](#schema-registry-integrations-apache-kafka-streams)

## 使用案例：将架构注册表连接到 Amazon MSK 或 Apache Kafka
<a name="schema-registry-integrations-amazon-msk"></a>

假设您正在向 Apache Kafka 主题写入数据，您可以按照以下步骤操作。

1. 创建 Amazon Managed Streaming for Apache Kafka（Amazon MSK）或 Apache Kafka 集群，至少具有一个主题。如果创建 Amazon MSK 集群，您可以使用 AWS 管理控制台。遵循以下说明：请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》**中的[开始使用 Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html)。

1. 遵循上面的[安装 SerDe 库](schema-registry-gs-serde.md)步骤。

1. 要创建架构注册表、架构或架构版本，请按照本文档[架构注册表入门](schema-registry-gs.md)部分下面的说明操作。

1. 启动您的创建器和使用器，使用架构注册表将记录写入 Amazon MSK 或 Apache Kafka，或者从其中读取记录。您可从 Serde 库的[自述文件](https://github.com/awslabs/aws-glue-schema-registry/blob/master/README.md)中找到示例创建器和使用器代码。创建器上的架构注册表库将自动序列化记录，并使用架构版本 ID 装饰记录。

1. 如果已输入此记录的架构，或者启用了自动注册，则该架构将已在架构注册表中注册。

1. 如果使用器从 Amazon MSK 或 Apache Kafka 主题读取数据，使用 AWS Glue 架构注册表库，则该使用器将自动从架构注册表中查找架构。

## 使用案例：将 Amazon Kinesis Data Streams 与 AWS Glue 架构注册表集成
<a name="schema-registry-integrations-kds"></a>

此集成要求您拥有现有 Amazon Kinesis 数据流。有关更多信息，请参阅《Amazon Kinesis Data Streams 开发人员指南》**中的 [Amazon Kinesis Data Streams 入门](https://docs.aws.amazon.com/streams/latest/dev/getting-started.html)。

您可以通过两种方式与 Kinesis 数据流中的数据进行交互。
+ 通过 Java 中的 Kinesis Producer Library（KPL）和 Kinesis Client Library（KCL）库。不提供多语言支持。
+ 通过 适用于 Java 的 AWS SDK 中的 `PutRecords`、`PutRecord` 和 `GetRecords` Kinesis Data Streams API。

如果您当前使用的是 KPL/KCL 库，我们建议您继续使用该方法。有更新的 KCL 和 KPL 版本集成了架构注册表，如示例所示。否则，您可以使用示例代码来利用 AWS Glue 架构注册表（如果直接使用 KDS API）。

架构注册表集成仅适用于 KPL v0.14.2 或更高版本以及 KCL v2.3 或更高版本。架构注册表与 JSON 数据格式的集成仅适用于 KPL v0.14.8 或更高版本以及 KCL v2.3.6 或更高版本。

### 使用 Kinesis SDK V2 与数据进行交互
<a name="schema-registry-integrations-kds-sdk-v2"></a>

本部分介绍如何使用 Kinesis SDK V2 与 Kinesis 进行交互

```
// Example JSON Record, you can construct a AVRO record also
private static final JsonDataWithSchema record = JsonDataWithSchema.builder(schemaString, payloadString);
private static final DataFormat dataFormat = DataFormat.JSON;

//Configurations for Schema Registry
GlueSchemaRegistryConfiguration gsrConfig = new GlueSchemaRegistryConfiguration("us-east-1");

GlueSchemaRegistrySerializer glueSchemaRegistrySerializer =
        new GlueSchemaRegistrySerializerImpl(awsCredentialsProvider, gsrConfig);
GlueSchemaRegistryDataFormatSerializer dataFormatSerializer =
        new GlueSchemaRegistrySerializerFactory().getInstance(dataFormat, gsrConfig);

Schema gsrSchema =
        new Schema(dataFormatSerializer.getSchemaDefinition(record), dataFormat.name(), "MySchema");

byte[] serializedBytes = dataFormatSerializer.serialize(record);

byte[] gsrEncodedBytes = glueSchemaRegistrySerializer.encode(streamName, gsrSchema, serializedBytes);

PutRecordRequest putRecordRequest = PutRecordRequest.builder()
        .streamName(streamName)
        .partitionKey("partitionKey")
        .data(SdkBytes.fromByteArray(gsrEncodedBytes))
        .build();
shardId = kinesisClient.putRecord(putRecordRequest)
        .get()
        .shardId();

GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer = new GlueSchemaRegistryDeserializerImpl(awsCredentialsProvider, gsrConfig);

GlueSchemaRegistryDataFormatDeserializer gsrDataFormatDeserializer =
        glueSchemaRegistryDeserializerFactory.getInstance(dataFormat, gsrConfig);

GetShardIteratorRequest getShardIteratorRequest = GetShardIteratorRequest.builder()
        .streamName(streamName)
        .shardId(shardId)
        .shardIteratorType(ShardIteratorType.TRIM_HORIZON)
        .build();

String shardIterator = kinesisClient.getShardIterator(getShardIteratorRequest)
        .get()
        .shardIterator();

GetRecordsRequest getRecordRequest = GetRecordsRequest.builder()
        .shardIterator(shardIterator)
        .build();
GetRecordsResponse recordsResponse = kinesisClient.getRecords(getRecordRequest)
        .get();

List<Object> consumerRecords = new ArrayList<>();
List<Record> recordsFromKinesis = recordsResponse.records();

for (int i = 0; i < recordsFromKinesis.size(); i++) {
    byte[] consumedBytes = recordsFromKinesis.get(i)
            .data()
            .asByteArray();

    Schema gsrSchema = glueSchemaRegistryDeserializer.getSchema(consumedBytes);
    Object decodedRecord = gsrDataFormatDeserializer.deserialize(ByteBuffer.wrap(consumedBytes),
                                                                    gsrSchema.getSchemaDefinition());
    consumerRecords.add(decodedRecord);
}
```

### 使用 KPL/KCL 库与数据进行交互
<a name="schema-registry-integrations-kds-libraries"></a>

本部分介绍如何使用 KPL/KCL 库将 Kinesis Data Streams 与架构注册表集成。有关 KPL/KCL 使用的更多信息，请参阅*《Amazon Kinesis Data Streams 开发人员指南》*中的 [使用 Amazon Kinesis Producer Library 开发创建器](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html)。

#### 在 KPL 中设置架构注册表
<a name="schema-registry-integrations-kds-libraries-kpl"></a>

1. 定义 AWS Glue 架构注册表中编写的数据、数据格式和架构名称的架构定义。

1. （可选）配置 `GlueSchemaRegistryConfiguration` 对象。

1. 将架构对象传递到 `addUserRecord API`。

   ```
   private static final String SCHEMA_DEFINITION = "{"namespace": "example.avro",\n"
   + " "type": "record",\n"
   + " "name": "User",\n"
   + " "fields": [\n"
   + " {"name": "name", "type": "string"},\n"
   + " {"name": "favorite_number", "type": ["int", "null"]},\n"
   + " {"name": "favorite_color", "type": ["string", "null"]}\n"
   + " ]\n"
   + "}";
   
   KinesisProducerConfiguration config = new KinesisProducerConfiguration();
   config.setRegion("us-west-1")
   
   //[Optional] configuration for Schema Registry.
   
   GlueSchemaRegistryConfiguration schemaRegistryConfig =
   new GlueSchemaRegistryConfiguration("us-west-1");
   
   schemaRegistryConfig.setCompression(true);
   
   config.setGlueSchemaRegistryConfiguration(schemaRegistryConfig);
   
   ///Optional configuration ends.
   
   final KinesisProducer producer =
         new KinesisProducer(config);
   
   final ByteBuffer data = getDataToSend();
   
   com.amazonaws.services.schemaregistry.common.Schema gsrSchema =
       new Schema(SCHEMA_DEFINITION, DataFormat.AVRO.toString(), "demoSchema");
   
   ListenableFuture<UserRecordResult> f = producer.addUserRecord(
   config.getStreamName(), TIMESTAMP, Utils.randomExplicitHashKey(), data, gsrSchema);
   
   private static ByteBuffer getDataToSend() {
         org.apache.avro.Schema avroSchema =
           new org.apache.avro.Schema.Parser().parse(SCHEMA_DEFINITION);
   
         GenericRecord user = new GenericData.Record(avroSchema);
         user.put("name", "Emily");
         user.put("favorite_number", 32);
         user.put("favorite_color", "green");
   
         ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
         Encoder encoder = EncoderFactory.get().directBinaryEncoder(outBytes, null);
         new GenericDatumWriter<>(avroSchema).write(user, encoder);
         encoder.flush();
         return ByteBuffer.wrap(outBytes.toByteArray());
    }
   ```

#### 设置 Kinesis 客户端库
<a name="schema-registry-integrations-kds-libraries-kcl"></a>

您将在 Java 中开发 Kinesis Client Library 使用器 有关更多信息，请参阅*《Amazon Kinesis Data Streams 开发人员指南》*中的 [在 Java 中开发 Kinesis Client Library 使用器](https://docs.aws.amazon.com/streams/latest/dev/kcl2-standard-consumer-java-example.html)。

1. 传递 `GlueSchemaRegistryConfiguration` 对象以创建 `GlueSchemaRegistryDeserializer` 的实例。

1. 将 `GlueSchemaRegistryDeserializer` 传递到 `retrievalConfig.glueSchemaRegistryDeserializer`。

1. 调用 `kinesisClientRecord.getSchema()`，访问传入消息的架构。

   ```
   GlueSchemaRegistryConfiguration schemaRegistryConfig =
       new GlueSchemaRegistryConfiguration(this.region.toString());
   
    GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer =
       new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), schemaRegistryConfig);
   
    RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient));
    retrievalConfig.glueSchemaRegistryDeserializer(glueSchemaRegistryDeserializer);
   
     Scheduler scheduler = new Scheduler(
               configsBuilder.checkpointConfig(),
               configsBuilder.coordinatorConfig(),
               configsBuilder.leaseManagementConfig(),
               configsBuilder.lifecycleConfig(),
               configsBuilder.metricsConfig(),
               configsBuilder.processorConfig(),
               retrievalConfig
           );
   
    public void processRecords(ProcessRecordsInput processRecordsInput) {
               MDC.put(SHARD_ID_MDC_KEY, shardId);
               try {
                   log.info("Processing {} record(s)",
                   processRecordsInput.records().size());
                   processRecordsInput.records()
                   .forEach(
                       r ->
                           log.info("Processed record pk: {} -- Seq: {} : data {} with schema: {}",
                           r.partitionKey(), r.sequenceNumber(), recordToAvroObj(r).toString(), r.getSchema()));
               } catch (Throwable t) {
                   log.error("Caught throwable while processing records. Aborting.");
                   Runtime.getRuntime().halt(1);
               } finally {
                   MDC.remove(SHARD_ID_MDC_KEY);
               }
    }
   
    private GenericRecord recordToAvroObj(KinesisClientRecord r) {
       byte[] data = new byte[r.data().remaining()];
       r.data().get(data, 0, data.length);
       org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser().parse(r.schema().getSchemaDefinition());
       DatumReader datumReader = new GenericDatumReader<>(schema);
   
       BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(data, 0, data.length, null);
       return (GenericRecord) datumReader.read(null, binaryDecoder);
    }
   ```

#### 使用 Kinesis Data Streams API 与数据交互
<a name="schema-registry-integrations-kds-apis"></a>

本部分介绍如何使用 Kinesis Data Streams API 将 Kinesis Data Streams 与架构注册表集成。

1. 更新这些 Maven 依赖关系：

   ```
   <dependencyManagement>
           <dependencies>
               <dependency>
                   <groupId>com.amazonaws</groupId>
                   <artifactId>aws-java-sdk-bom</artifactId>
                   <version>1.11.884</version>
                   <type>pom</type>
                   <scope>import</scope>
               </dependency>
           </dependencies>
       </dependencyManagement>
   
       <dependencies>
           <dependency>
               <groupId>com.amazonaws</groupId>
               <artifactId>aws-java-sdk-kinesis</artifactId>
           </dependency>
   
           <dependency>
               <groupId>software.amazon.glue</groupId>
               <artifactId>schema-registry-serde</artifactId>
               <version>1.1.5</version>
           </dependency>
   
           <dependency>
               <groupId>com.fasterxml.jackson.dataformat</groupId>
               <artifactId>jackson-dataformat-cbor</artifactId>
               <version>2.11.3</version>
           </dependency>
       </dependencies>
   ```

1. 在创建器中，使用 Kinesis Data Streams 中的 `PutRecords` 或者 `PutRecord` API 添加架构标头信息。

   ```
   //The following lines add a Schema Header to the record
           com.amazonaws.services.schemaregistry.common.Schema awsSchema =
               new com.amazonaws.services.schemaregistry.common.Schema(schemaDefinition, DataFormat.AVRO.name(),
                   schemaName);
           GlueSchemaRegistrySerializerImpl glueSchemaRegistrySerializer =
               new GlueSchemaRegistrySerializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(getConfigs()));
           byte[] recordWithSchemaHeader =
               glueSchemaRegistrySerializer.encode(streamName, awsSchema, recordAsBytes);
   ```

1. 在创建器中，使用 `PutRecords` 或者 `PutRecord` API 将记录放入数据流。

1. 在使用器中，从标头中删除架构记录，然后序列化 Avro 架构记录。

   ```
   //The following lines remove Schema Header from record
           GlueSchemaRegistryDeserializerImpl glueSchemaRegistryDeserializer =
               new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), getConfigs());
           byte[] recordWithSchemaHeaderBytes = new byte[recordWithSchemaHeader.remaining()];
           recordWithSchemaHeader.get(recordWithSchemaHeaderBytes, 0, recordWithSchemaHeaderBytes.length);
           com.amazonaws.services.schemaregistry.common.Schema awsSchema =
               glueSchemaRegistryDeserializer.getSchema(recordWithSchemaHeaderBytes);
           byte[] record = glueSchemaRegistryDeserializer.getData(recordWithSchemaHeaderBytes);
   
           //The following lines serialize an AVRO schema record
           if (DataFormat.AVRO.name().equals(awsSchema.getDataFormat())) {
               Schema avroSchema = new org.apache.avro.Schema.Parser().parse(awsSchema.getSchemaDefinition());
               Object genericRecord = convertBytesToRecord(avroSchema, record);
               System.out.println(genericRecord);
           }
   ```

#### 使用 Kinesis Data Streams API 与数据交互
<a name="schema-registry-integrations-kds-apis-reference"></a>

以下是使用 `PutRecords` 和 `GetRecords` API 的示例代码。

```
//Full sample code
import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializerImpl;
import com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistrySerializerImpl;
import com.amazonaws.services.schemaregistry.utils.AVROUtils;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.services.glue.model.DataFormat;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;


public class PutAndGetExampleWithEncodedData {
    static final String regionName = "us-east-2";
    static final String streamName = "testStream1";
    static final String schemaName = "User-Topic";
    static final String AVRO_USER_SCHEMA_FILE = "src/main/resources/user.avsc";
    KinesisApi kinesisApi = new KinesisApi();

    void runSampleForPutRecord() throws IOException {
        Object testRecord = getTestRecord();
        byte[] recordAsBytes = convertRecordToBytes(testRecord);
        String schemaDefinition = AVROUtils.getInstance().getSchemaDefinition(testRecord);

        //The following lines add a Schema Header to a record
        com.amazonaws.services.schemaregistry.common.Schema awsSchema =
            new com.amazonaws.services.schemaregistry.common.Schema(schemaDefinition, DataFormat.AVRO.name(),
                schemaName);
        GlueSchemaRegistrySerializerImpl glueSchemaRegistrySerializer =
            new GlueSchemaRegistrySerializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(regionName));
        byte[] recordWithSchemaHeader =
            glueSchemaRegistrySerializer.encode(streamName, awsSchema, recordAsBytes);

        //Use PutRecords api to pass a list of records
        kinesisApi.putRecords(Collections.singletonList(recordWithSchemaHeader), streamName, regionName);

        //OR
        //Use PutRecord api to pass single record
        //kinesisApi.putRecord(recordWithSchemaHeader, streamName, regionName);
    }

    byte[] runSampleForGetRecord() throws IOException {
        ByteBuffer recordWithSchemaHeader = kinesisApi.getRecords(streamName, regionName);

        //The following lines remove the schema registry header
        GlueSchemaRegistryDeserializerImpl glueSchemaRegistryDeserializer =
            new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(regionName));
        byte[] recordWithSchemaHeaderBytes = new byte[recordWithSchemaHeader.remaining()];
        recordWithSchemaHeader.get(recordWithSchemaHeaderBytes, 0, recordWithSchemaHeaderBytes.length);

        com.amazonaws.services.schemaregistry.common.Schema awsSchema =
            glueSchemaRegistryDeserializer.getSchema(recordWithSchemaHeaderBytes);

        byte[] record = glueSchemaRegistryDeserializer.getData(recordWithSchemaHeaderBytes);

        //The following lines serialize an AVRO schema record
        if (DataFormat.AVRO.name().equals(awsSchema.getDataFormat())) {
            Schema avroSchema = new org.apache.avro.Schema.Parser().parse(awsSchema.getSchemaDefinition());
            Object genericRecord = convertBytesToRecord(avroSchema, record);
            System.out.println(genericRecord);
        }

        return record;
    }

    private byte[] convertRecordToBytes(final Object record) throws IOException {
        ByteArrayOutputStream recordAsBytes = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().directBinaryEncoder(recordAsBytes, null);
        GenericDatumWriter datumWriter = new GenericDatumWriter<>(AVROUtils.getInstance().getSchema(record));
        datumWriter.write(record, encoder);
        encoder.flush();
        return recordAsBytes.toByteArray();
    }

    private GenericRecord convertBytesToRecord(Schema avroSchema, byte[] record) throws IOException {
        final GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(avroSchema);
        Decoder decoder = DecoderFactory.get().binaryDecoder(record, null);
        GenericRecord genericRecord = datumReader.read(null, decoder);
        return genericRecord;
    }

    private Map<String, String> getMetadata() {
        Map<String, String> metadata = new HashMap<>();
        metadata.put("event-source-1", "topic1");
        metadata.put("event-source-2", "topic2");
        metadata.put("event-source-3", "topic3");
        metadata.put("event-source-4", "topic4");
        metadata.put("event-source-5", "topic5");
        return metadata;
    }

    private GlueSchemaRegistryConfiguration getConfigs() {
        GlueSchemaRegistryConfiguration configs = new GlueSchemaRegistryConfiguration(regionName);
        configs.setSchemaName(schemaName);
        configs.setAutoRegistration(true);
        configs.setMetadata(getMetadata());
        return configs;
    }

    private Object getTestRecord() throws IOException {
        GenericRecord genericRecord;
        Schema.Parser parser = new Schema.Parser();
        Schema avroSchema = parser.parse(new File(AVRO_USER_SCHEMA_FILE));

        genericRecord = new GenericData.Record(avroSchema);
        genericRecord.put("name", "testName");
        genericRecord.put("favorite_number", 99);
        genericRecord.put("favorite_color", "red");

        return genericRecord;
    }
}
```

## 应用场景：适用于 Apache Flink 的亚马逊托管服务
<a name="schema-registry-integrations-kinesis-data-analytics-apache-flink"></a>

Apache Flink 是一个热门的开源框架和分布式处理引擎，用于对无界和有界数据流进行有状态计算。适用于 Apache Flink 的亚马逊托管服务是一项完全托管式的 AWS 服务，可让您构建和管理 Apache Flink 应用程序以处理流式传输数据。

开源 Apache Flink 提供了大量的源和接收器。例如，预定义的数据源包括从文件、目录和套接字读取数据，以及从集合和迭代器中提取数据。Apache Flink DataStream 连接器为 Apache Flink 提供与各种第三方系统（如作为源和/或接收器的 Apache Kafka 或 Kinesis）接口的代码。

有关更多信息，请参阅 [Amazon Kinesis Data Analytics 开发人员指南](https://docs.aws.amazon.com/kinesisanalytics/latest/java/what-is.html)。

### Apache Flink Kafka 连接器
<a name="schema-registry-integrations-kafka-connector"></a>

Apache Flink 提供 Apache Kafka 数据流连接器，用于从 Kafka 主题读取数据并将数据写入其中，确切具有一次保证。Flink 的 Kafka 使用器 `FlinkKafkaConsumer` 提供从一个或多个 Kafka 主题读取数据的访问权限。Apache Flink 的 Kafka 创建器 `FlinkKafkaProducer` 允许将记录流写入一个或多个 Kafka 主题。有关更多信息，请参阅 [Apache Kafka 连接器](https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html)。

### Apache Flink Kinesis Streams 连接器
<a name="schema-registry-integrations-kinesis-connector"></a>

Kinesis 数据流连接器可让您访问 Amazon Kinesis Data Streams。`FlinkKinesisConsumer` 是确切一次的并行流数据源，它订阅同一 AWS 服务区域的多个 Kinesis 流，并且可以在任务运行时透明地处理流的重新分区。使用器的每个子任务负责从多个 Kinesis 分片中获取数据记录。随着分区关闭并由 Kinesis 创建，每个子任务获取的分片数量将发生变化。`FlinkKinesisProducer` 使用 Kinesis Producer Library（KPL）将来自 Apache Flink 流的数据放入 Kinesis 流。有关更多信息，请参阅 [Amazon Kinesis Streams 连接器](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kinesis.html)。

有关更多信息，请参阅 [AWS Glue GitHub 存储库](https://github.com/awslabs/aws-glue-schema-registry)。

### 与 Apache Flink 集成
<a name="schema-registry-integrations-apache-flink-integrate"></a>

架构注册表提供的 SerDes 库与 Apache Flink 集成。要使用 Apache Flink，您需要实施 [https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java](https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java) 和 [https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java](https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java) 接口（分别称为 `GlueSchemaRegistryAvroSerializationSchema` 和 `GlueSchemaRegistryAvroDeserializationSchema`），您可以将其插入 Apache Fink 连接器。

### 将 AWS Glue 架构注册表依赖关系添加到 Apache Flink 应用程序
<a name="schema-registry-integrations-kinesis-data-analytics-dependencies"></a>

将集成依赖项设置到 Apache Flink 应用程序中的 AWS Glue 架构注册表：

1. 将依赖项添加到 `pom.xml` 文件。

   ```
   <dependency>
       <groupId>software.amazon.glue</groupId>
       <artifactId>schema-registry-flink-serde</artifactId>
       <version>1.0.0</version>
   </dependency>
   ```

#### 将 Kafka 或 Amazon MSK 与 Apache Fink 集成
<a name="schema-registry-integrations-kda-integrate-msk"></a>

您可以使用 Managed Service for Apache Flink for Apache Flink，以 Kafka 作为源或接收器。

**以 Kafka 为源：**  
下图显示了将 Kinesis Data Streams 与 Managed Service for Apache Flink for Apache Flink 集成，以 Kafka 作为源。

![\[以 Kafka 为源。\]](http://docs.aws.amazon.com/zh_cn/glue/latest/dg/images/gsr-kafka-source.png)


**以 Kafka 为接收器**  
下图显示了将 Kinesis Data Streams 与 Managed Service for Apache Flink for Apache Flink 集成，以 Kafka 作为接收器。

![\[以 Kafka 为接收器。\]](http://docs.aws.amazon.com/zh_cn/glue/latest/dg/images/gsr-kafka-sink.png)


要将 Kafka（或者 Amazon MSK）与 Managed Service for Apache Flink for Apache Flink 集成，以 Kafka 作为源或接收器，请在下面进行代码更改。将粗体代码数据块添加到类似部分中相应的代码。

如果 Kafka 是源，则使用反序列化程序代码（数据块 2）。如果 Kafka 是接收器，则使用序列化程序代码（数据块 3）。

```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

String topic = "topic";
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

// block 1
Map<String, Object> configs = new HashMap<>();
configs.put(AWSSchemaRegistryConstants.AWS_REGION, "aws-region");
configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());

FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
    topic,
    // block 2
    GlueSchemaRegistryAvroDeserializationSchema.forGeneric(schema, configs),
    properties);

FlinkKafkaProducer<GenericRecord> producer = new FlinkKafkaProducer<>(
    topic,
    // block 3
    GlueSchemaRegistryAvroSerializationSchema.forGeneric(schema, topic, configs),
    properties);

DataStream<GenericRecord> stream = env.addSource(consumer);
stream.addSink(producer);
env.execute();
```

#### 将 Kinesis Data Streams 与 Apache Flink 集成
<a name="schema-registry-integrations-integrate-kds"></a>

您可以使用 Managed Service for Apache Flink for Apache Flink，以 Kinesis Data Streams 作为源或接收器。

**以 Kinesis Data Streams 为源**  
下图显示了将 Kinesis Data Streams 与 Managed Service for Apache Flink for Apache Flink 集成，以 Kinesis Data Streams 作为源。

![\[以 Kinesis Data Streams 为源。\]](http://docs.aws.amazon.com/zh_cn/glue/latest/dg/images/gsr-kinesis-source.png)


**以 Kinesis Data Streams 为接收器**  
下图显示了将 Kinesis Data Streams 与 Managed Service for Apache Flink for Apache Flink 集成，以 Kinesis Data Streams 作为接收器。

![\[以 Kinesis Data Streams 为接收器。\]](http://docs.aws.amazon.com/zh_cn/glue/latest/dg/images/gsr-kinesis-sink.png)


要将 Kinesis Data Streams 与 Managed Service for Apache Flink for Apache Flink 集成，以 Kinesis Data Streams 作为源或接收器，请在下面进行代码更改。将粗体代码数据块添加到类似部分中相应的代码。

如果 Kinesis Data Streams 是源，请使用反序列化程序代码（数据块 2）。如果 Kinesis Data Streams 是接收器，请使用序列化程序代码（数据块 3）。

```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

String streamName = "stream";
Properties consumerConfig = new Properties();
consumerConfig.put(AWSConfigConstants.AWS_REGION, "aws-region");
consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

// block 1
Map<String, Object> configs = new HashMap<>();
configs.put(AWSSchemaRegistryConstants.AWS_REGION, "aws-region");
configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());

FlinkKinesisConsumer<GenericRecord> consumer = new FlinkKinesisConsumer<>(
    streamName,
    // block 2
    GlueSchemaRegistryAvroDeserializationSchema.forGeneric(schema, configs),
    properties);

FlinkKinesisProducer<GenericRecord> producer = new FlinkKinesisProducer<>(
    // block 3
    GlueSchemaRegistryAvroSerializationSchema.forGeneric(schema, topic, configs),
    properties);
producer.setDefaultStream(streamName);
producer.setDefaultPartition("0");

DataStream<GenericRecord> stream = env.addSource(consumer);
stream.addSink(producer);
env.execute();
```

## 使用案例：与 AWS Lambda 集成
<a name="schema-registry-integrations-aws-lambda"></a>

要将 AWS Lambda 函数用作 Apache Kafka/Amazon MSK 使用器，并使用 AWS Glue 架构注册表反序列化 Avro 编码消息，请访问 [MSK Labs 页](https://amazonmsk-labs.workshop.aws/en/msklambda/gsrschemareg.html)。

## 使用案例：AWS Glue Data Catalog
<a name="schema-registry-integrations-aws-glue-data-catalog"></a>

AWS Glue 表支持您可以手动指定或通过引用 AWS Glue 架构注册表的架构。架构注册表与数据目录集成，以允许您在创建或更新数据目录中 AWS Glue 表或分区时选择性地使用存储于架构注册表中的架构。要标识架构注册表中的架构定义，您至少需要知道它所属的架构的 ARN。架构的架构版本（包含架构定义）可以通过其 UUID 或版本号引用。总有一个架构版本，即“最新”版本，可以在不知道其版本号或 UUID 的情况下查找。

当调用 `CreateTable` 或者 `UpdateTable` 操作时，您将传递一个 `TableInput` 结构，其中包含 `StorageDescriptor`，它可能有指向架构注册表中现有架构的 `SchemaReference`。同样，当您调用 `GetTable` 或者`GetPartition` API 时，响应可能包含架构和 `SchemaReference`。使用架构引用创建表或分区时，数据目录将尝试为此架构引用提取架构。如果无法在架构注册表中找到架构，它会在 `GetTable` 响应中返回空架构；否则响应将具有架构和架构引用。

您还可以在 AWS Glue 控制台中执行操作。

要执行这些操作并创建、更新或查看架构信息，您必须为调用用户授予允许 `GetSchemaVersion` API 的 IAM 角色权限。

### 添加表或更新表的架构
<a name="schema-registry-integrations-aws-glue-data-catalog-table"></a>

从现有架构添加新表会将表绑定到特定架构版本。注册新架构版本后，您可以从 AWS Glue 控制台的 View table (查看表) 页面或使用 [UpdateTable 操作（Python：update\$1table）](aws-glue-api-catalog-tables.md#aws-glue-api-catalog-tables-UpdateTable) API 更新表定义。

#### 从现有架构添加表
<a name="schema-registry-integrations-aws-glue-data-catalog-table-existing"></a>

您可以使用 AWS Glue 控制台或 `CreateTable` API 通过注册表中的架构版本创建 AWS Glue 表。

**AWS Glue API**  
当调用 `CreateTable` API 时，您将传递一个 `TableInput`，其中包含 `StorageDescriptor`，它可能有指向架构注册表中现有架构的 `SchemaReference`。

**AWS Glue 控制台**  
从 AWS Glue 控制台创建表：

1. 登录 AWS 管理控制台，然后打开 AWS Glue 控制台，网址为：[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\)。

1. 在导航窗格的 **Data catalog (数据目录)** 中，请选择 **Tables (表)**。

1. 在 **Add Tables (添加表)** 菜单中，选择 **Add table from existing schema (从现有架构添加表)**。

1. 根据 AWS Glue 开发人员指南配置表属性和数据存储。

1. 在 **Choose a Glue schema (选择 Glue 架构)** 页面上，选择架构所在的 **Registry (注册表)**。

1. 选择 **Schema name (架构名称)**，然后选择要应用的架构的 **Version (版本)**。

1. 查看架构预览，然后选择 **Next (下一步)**。

1. 审核和创建表。

应用于表的架构和版本将在表列表的 **Glue schema (Glue 架构)**列中显示。您可以查看表以了解更多详细信息。

#### 更新表架构
<a name="schema-registry-integrations-aws-glue-data-catalog-table-updating"></a>

当有新架构版本时，您可能需要使用 [UpdateTable 操作（Python：update\$1table）](aws-glue-api-catalog-tables.md#aws-glue-api-catalog-tables-UpdateTable) API 或 AWS Glue 控制台更新表架构。

**重要**  
当为已手动指定 AWS Glue 架构的现有表更新架构时，架构注册表中引用的新架构可能不兼容。这可能会导致您的任务失败。

**AWS Glue API**  
当调用 `UpdateTable` API 时，您将传递一个 `TableInput`，其中包含 `StorageDescriptor`，它可能有指向架构注册表中现有架构的 `SchemaReference`。

**AWS Glue 控制台**  
从 AWS Glue 控制台更新表的架构：

1. 登录 AWS 管理控制台，然后打开 AWS Glue 控制台，网址为：[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\)。

1. 在导航窗格的 **Data catalog (数据目录)** 中，请选择 **Tables (表)**。

1. 从表列表中查看表。

1. 在告知您新版本的框中单击 **Update schema (更新架构)**。

1. 查看当前架构和新架构之间的差异。

1. 选择 **Show all schema differences (显示所有架构差异)**，查看更多详细信息。

1. 选择 **Save table (保存表)** 以接受新版本。

## 使用案例：AWS Glue 流式传输
<a name="schema-registry-integrations-aws-glue-streaming"></a>

AWS Glue 流式传输会首先使用流式传输源的数据并执行 ETL 操作，然后再写入输出接收器。可以使用数据表或直接通过指定源配置来指定输入流式传输源。

AWS Glue 流式传输支持将利用 AWS Glue Schema 注册表中存在的 Schema 创建的数据目录表作为流式传输源。您可以在 AWS Glue Schema 注册表中创建一个 Schema 并利用此 Schema 创建一个带有流式传输源的 AWS Glue 表。此 AWS Glue 表可以用作 AWS Glue 流式传输任务的输入，以确保输入流中数据的反序列化。

这里需要注意的是，在 AWS Glue Schema 注册表中的 Schema 更改时，您需要重启 AWS Glue 流式传输任务以反映 Schema 中的更改。

## 使用案例：Apache Kafka Streams
<a name="schema-registry-integrations-apache-kafka-streams"></a>

Apache Kafka Streams API 是一个客户端库，用于处理和分析存储在 Apache Kafka 中的数据。本部分介绍 Apache Kafka Streams 与 AWS Glue 架构注册表的集成，允许您在数据流应用程序上管理和强制实施架构。有关 Apache Kafka Streams 的更多信息，请参阅 [Apache Kafka](https://kafka.apache.org/documentation/streams/)。

### 与 SerDes 库集成
<a name="schema-registry-integrations-apache-kafka-streams-integrate"></a>

有一个 `GlueSchemaRegistryKafkaStreamsSerde` 类，您可以使用其配置 Streams 应用程序。

#### Kafka Streams 应用程序示例代码
<a name="schema-registry-integrations-apache-kafka-streams-application"></a>

在 Apache Kafka Streams 应用程序内使用 AWS Glue 架构注册表：

1. 配置 Kafka Streams 应用程序。

   ```
   final Properties props = new Properties();
       props.put(StreamsConfig.APPLICATION_ID_CONFIG, "avro-streams");
       props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
       props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
       props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
       props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, AWSKafkaAvroSerDe.class.getName());
       props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
   
       props.put(AWSSchemaRegistryConstants.AWS_REGION, "aws-region");
       props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
       props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
   	props.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.AVRO.name());
   ```

1. 通过主题 avro-input 创建流。

   ```
   StreamsBuilder builder = new StreamsBuilder();
   final KStream<String, GenericRecord> source = builder.stream("avro-input");
   ```

1. 处理数据记录（该示例会筛选出 favorite\$1color 值为 pink 或金额值为 15 的记录）。

   ```
   final KStream<String, GenericRecord> result = source
       .filter((key, value) -> !"pink".equals(String.valueOf(value.get("favorite_color"))));
       .filter((key, value) -> !"15.0".equals(String.valueOf(value.get("amount"))));
   ```

1. 将结果写回主题 avro-output。

   ```
   result.to("avro-output");
   ```

1. 启动 Apache Kafka Streams 应用程序。

   ```
   KafkaStreams streams = new KafkaStreams(builder.build(), props);
   streams.start();
   ```

#### 实施结果
<a name="schema-registry-integrations-apache-kafka-streams-results"></a>

这些结果显示了记录筛选流程，这些记录在步骤 3 中筛选出，其 favorite\$1color 为“pink”或值为“15.0”。

筛选前的记录：

```
{"name": "Sansa", "favorite_number": 99, "favorite_color": "white"}
{"name": "Harry", "favorite_number": 10, "favorite_color": "black"}
{"name": "Hermione", "favorite_number": 1, "favorite_color": "red"}
{"name": "Ron", "favorite_number": 0, "favorite_color": "pink"}
{"name": "Jay", "favorite_number": 0, "favorite_color": "pink"}

{"id": "commute_1","amount": 3.5}
{"id": "grocery_1","amount": 25.5}
{"id": "entertainment_1","amount": 19.2}
{"id": "entertainment_2","amount": 105}
	{"id": "commute_1","amount": 15}
```

筛选后的记录：

```
{"name": "Sansa", "favorite_number": 99, "favorite_color": "white"}
{"name": "Harry", "favorite_number": 10, "favorite_color": "black"}
{"name": "Hermione", "favorite_number": 1, "favorite_color": "red"}
{"name": "Ron", "favorite_number": 0, "favorite_color": "pink"}

{"id": "commute_1","amount": 3.5}
{"id": "grocery_1","amount": 25.5}
{"id": "entertainment_1","amount": 19.2}
{"id": "entertainment_2","amount": 105}
```

### 使用案例：Apache Kafka Connect
<a name="schema-registry-integrations-apache-kafka-connect"></a>

Apache Kafka Connect 与 AWS Glue 架构注册表集成，支持您从连接器获取架构信息。Apache Kafka 转换器指定 Apache Kafka 内数据的格式以及如何将其转换为 Apache Kafka Connect 数据。每个 Apache Kafka Connect 用户都需要配置这些转换器，基于从 Apache Kafka 加载数据或将数据存储到 Apache Kafka 时期望数据采用的格式。通过这种方式，您可以定义自己的转换器，将 Apache Kafka Connect 数据转换为 AWS Glue 架构注册表（例如：Avro），并使用我们的序列化程序注册其架构并执行序列化。然后，转换器还能够使用我们的反序列化程序来反序列化从 Apache Kafka 接收的数据，并将其转换回 Apache Kafka Connect 数据。下面提供了一个示例工作流图。

![\[Apache Kafka Connect 工作流。\]](http://docs.aws.amazon.com/zh_cn/glue/latest/dg/images/schema_reg_int_kafka_connect.png)


1. 克隆[适用于 AWS Glue 架构注册表的 Github 存储库](https://github.com/awslabs/aws-glue-schema-registry)，安装 `aws-glue-schema-registry` 项目。

   ```
   git clone git@github.com:awslabs/aws-glue-schema-registry.git
   cd aws-glue-schema-registry
   mvn clean install
   mvn dependency:copy-dependencies
   ```

1. 如果您计划以*独立*模式使用 Apache Kafka Connect，请按照下面针对本步骤的说明更新 **connect-standalone.properties**。如果您计划以*分布式*模式使用 Apache Kafka Connect，请按照相同的说明更新 **connect-avro-distributed.properties**。

   1. 您还可以将这些属性添加到 Apache Kafka Connect 属性文件：

      ```
      key.converter.region=aws-region
      value.converter.region=aws-region
      key.converter.schemaAutoRegistrationEnabled=true
      value.converter.schemaAutoRegistrationEnabled=true
      key.converter.avroRecordType=GENERIC_RECORD
      value.converter.avroRecordType=GENERIC_RECORD
      ```

   1. 将下面的命令添加到 **kafka-run-class.sh** 下面的 **Launch mode (启动模式)**：

      ```
      -cp $CLASSPATH:"<your AWS GlueSchema Registry base directory>/target/dependency/*"
      ```

1. 将下面的命令添加到 **kafka-run-class.sh** 下面的 **Launch mode (启动模式)**

   ```
   -cp $CLASSPATH:"<your AWS GlueSchema Registry base directory>/target/dependency/*" 
   ```

   它应如下所示：

   ```
   # Launch mode
   if [ "x$DAEMON_MODE" = "xtrue" ]; then
     nohup "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH:"/Users/johndoe/aws-glue-schema-registry/target/dependency/*" $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
   else
     exec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH:"/Users/johndoe/aws-glue-schema-registry/target/dependency/*" $KAFKA_OPTS "$@"
   fi
   ```

1. 如果使用 Bash，请运行以下命令以在 bash\$1profile 中设置您的 CLASSPATH。对于任何其他 Shell，请相应地更新环境。

   ```
   echo 'export GSR_LIB_BASE_DIR=<>' >>~/.bash_profile
   echo 'export GSR_LIB_VERSION=1.0.0' >>~/.bash_profile
   echo 'export KAFKA_HOME=<your Apache Kafka installation directory>' >>~/.bash_profile
   echo 'export CLASSPATH=$CLASSPATH:$GSR_LIB_BASE_DIR/avro-kafkaconnect-converter/target/schema-registry-kafkaconnect-converter-$GSR_LIB_VERSION.jar:$GSR_LIB_BASE_DIR/common/target/schema-registry-common-$GSR_LIB_VERSION.jar:$GSR_LIB_BASE_DIR/avro-serializer-deserializer/target/schema-registry-serde-$GSR_LIB_VERSION.jar' >>~/.bash_profile
   source ~/.bash_profile
   ```

1. （可选）如果要使用简单的文件源进行测试，请克隆文件源连接器。

   ```
   git clone https://github.com/mmolimar/kafka-connect-fs.git
   cd kafka-connect-fs/
   ```

   1. 在源连接器配置下，将数据格式编辑为 Avro，将文件读取器编辑为 `AvroFileReader` 并从您正在读取的文件路径中更新示例 Avro 对象。例如：

      ```
      vim config/kafka-connect-fs.properties
      ```

      ```
      fs.uris=<path to a sample avro object>
      policy.regexp=^.*\.avro$
      file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.AvroFileReader
      ```

   1. 安装源连接器。

      ```
      mvn clean package
      echo "export CLASSPATH=\$CLASSPATH:\"\$(find target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')\"" >>~/.bash_profile
      source ~/.bash_profile
      ```

   1. 更新 `<your Apache Kafka installation directory>/config/connect-file-sink.properties` 下面的接收器属性，更新主题名称和输出文件名。

      ```
      file=<output file full path>
      topics=<my topic>
      ```

1. 启动源连接器（在本示例中，它是一个文件源连接器）。

   ```
   $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/kafka-connect-fs.properties
   ```

1. 运行接收器连接器（在本示例中，它是一个文件接收器连接器）。

   ```
   $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-file-sink.properties
   ```

   有关 Kafka Connect 用法的示例，请查看[适用于 AWS Glue 架构注册表的 Github 存储库](https://github.com/awslabs/aws-glue-schema-registry/tree/master/integration-tests)中 integration-tests 文件夹下的 run-local-tests.sh 脚本。

# 从第三方架构注册表迁移到 AWS Glue 架构注册表
<a name="schema-registry-integrations-migration"></a>

从第三方架构注册表到 AWS Glue 架构注册表的迁移依赖于现有的当前第三方架构注册表。如果 Apache Kafka 主题中存在使用第三方架构注册表发送的记录，则使用者需要第三方模式注册表来反序列化这些记录。`AWSKafkaAvroDeserializer` 支持指定辅助反序列化程序类，该类指向第三方反序列化程序并用于反序列化这些记录。

第三方架构的停用有两个条件。第一，仅当使用器不再需要使用第三方架构注册表的 Apache Kafka 主题中的记录，并且这些记录不再适用于这些使用器，则会发生停用。第二，当根据为这些主题指定的保留期限退出 Apache Kafka 主题时，会发生停用。请注意，如果您的主题具有无限保留期，您仍然可以迁移到 AWS Glue 架构注册表，但您将无法停用第三方架构注册表。作为一种解决方法，您可以使用应用程序或 Mirror Maker 2 从当前主题中读取并生成一个新主题，其中包含 AWS Glue 架构注册表。

从第三方架构注册表迁移到 AWS Glue 架构注册表

1. 在 AWS Glue 架构注册表中创建注册表，或者使用默认注册表。

1. 停止使用器。对其进行修改以包含 AWS Glue 架构注册表作为主要反序列化程序，第三方架构注册表作为辅助反序列化程序。
   + 设置使用器属性。在此示例中，secondary\$1deserializer 设置为不同的反序列化程序。行为如下所示：使用器从 Amazon MSK 检索记录，并首先尝试使用 `AWSKafkaAvroDeserializer`。如果无法读取包含 AWS Glue 架构注册表架构的 Avro 架构 ID 的幻字节，则 `AWSKafkaAvroDeserializer` 尝试使用 secondary\$1deserializer 中提供的反序列化程序类。使用器属性中还需要提供特定于辅助反序列化程序的属性，例如 schema\$1registry\$1url\$1config 和 specific\$1avro\$1reader\$1config，如下所示。

     ```
     consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
     consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AWSKafkaAvroDeserializer.class.getName());
     consumerProps.setProperty(AWSSchemaRegistryConstants.AWS_REGION, KafkaClickstreamConsumer.gsrRegion);
     consumerProps.setProperty(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, KafkaAvroDeserializer.class.getName());
     consumerProps.setProperty(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "URL for third-party schema registry");
     consumerProps.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
     ```

1. 重启使用器。

1. 停止创建器并将创建器指向 AWS Glue 架构注册表。

   1. 设置创建器属性。在此示例中，创建器将使用默认注册表和自动注册架构版本。

      ```
      producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
      producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AWSKafkaAvroSerializer.class.getName());
      producerProps.setProperty(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2");
      producerProps.setProperty(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName());
      producerProps.setProperty(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true");
      ```

1. （可选）手动将现有架构和架构版本从当前第三方架构注册表移动到 AWS Glue 架构注册表，即 AWS Glue 架构注册表中的默认注册表或 AWS Glue 架构注册表中的特定非默认注册表。以 JSON 格式从第三方架构注册表导出架构并使用 AWS 管理控制台 或 AWS CLI 在 AWS Glue 架构注册表中创建新架构。

    如果您需要为使用 AWS CLI 和 AWS 管理控制台 新创建的架构版本启用与先前架构版本的兼容性检查，或者在启用架构版本自动注册的情况下发送包含新架构的消息时，则此步骤可能非常重要。

1. 启动创建器。