

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 結構描述登錄檔入門
<a name="schema-registry-gs"></a>

以下章節將提供概觀，並逐步引導您設定和使用結構描述登錄檔。如需結構描述登錄檔概念和要素的詳細資訊，請參閱[AWS Glue 結構描述登錄檔](schema-registry.md)。

**Topics**
+ [安裝 SerDe 程式庫](schema-registry-gs-serde.md)
+ [與 AWS Glue 結構描述登錄檔整合](schema-registry-integrations.md)
+ [從第三方結構描述登錄檔移轉至 AWS Glue 結構描述登錄檔](schema-registry-integrations-migration.md)

# 安裝 SerDe 程式庫
<a name="schema-registry-gs-serde"></a>

SerDe 程式庫提供序列化和還原序列化資料的架構。

您將為產生資料的應用程式安裝開源序列化程式 (統稱為「序列化程式」)。序列化程式會處理序列化、壓縮以及與結構描述登錄檔的互動。序列化程式會自動從寫入結構描述登錄檔相容目的地的記錄擷取結構描述，例如 Amazon MSK。同樣地，您將在使用資料的應用程式上安裝開源還原序列化程式。

# Java 實作
<a name="schema-registry-gs-serde-java"></a>

**注意**  
必要條件：在完成下列步驟前，您必須擁有執行中的 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 或 Apache Kafka 叢集。您的生產者和消費者需要在 Java 8 或更高版本上執行。

若要在生產者和消費者上安裝程式庫：

1. 在生產者和消費者的 pom.xml 檔案中，透過下面的程式碼新增此相依性：

   ```
   <dependency>
       <groupId>software.amazon.glue</groupId>
       <artifactId>schema-registry-serde</artifactId>
       <version>1.1.5</version>
   </dependency>
   ```

   或者，您也可以複製 [AWS Glue 結構描述登錄檔 Github 儲存庫](https://github.com/awslabs/aws-glue-schema-registry)。

1. 使用這些必要屬性設定您的生產者：

   ```
   props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // Can replace StringSerializer.class.getName()) with any other key serializer that you may use
   props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());
   props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2");
   properties.put(AWSSchemaRegistryConstants.DATA_FORMAT, "JSON"); // OR "AVRO"
   ```

   如果沒有現有的結構描述，則需要開啟自動註冊 (下一個步驟)。如果您確實有一個想套用的結構描述，那麼請用您的結構描述名稱替換「my-schema」。如果結構描述自動註冊關閉，則也必須提供「registry-name」。如果結構描述是在「default-registry」下建立的，則登錄檔名稱可以省略。

1. (選用) 設定這些選用生產者屬性中的任何一個。如需詳細的屬性說明，請參閱[讀我檔案](https://github.com/awslabs/aws-glue-schema-registry/blob/master/README.md)。

   ```
   props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true"); // If not passed, uses "false"
   props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "my-schema"); // If not passed, uses transport name (topic name in case of Kafka, or stream name in case of Kinesis Data Streams)
   props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry"); // If not passed, uses "default-registry"
   props.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000 (24 Hours)
   props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200
   props.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL); // Pass a compatibility mode. If not passed, uses Compatibility.BACKWARD
   props.put(AWSSchemaRegistryConstants.DESCRIPTION, "This registry is used for several purposes."); // If not passed, constructs a description
   props.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, AWSSchemaRegistryConstants.COMPRESSION.ZLIB); // If not passed, records are sent uncompressed
   ```

   自動註冊會在預設登錄檔 (「default-registry」) 下註冊結構描述版本。如果在上一個步驟中未指定 `SCHEMA_NAME`，則主題名稱會被推斷為 `SCHEMA_NAME`。

   如需相容性模式的詳細資訊，請參閱[結構描述版本控制和相容性](schema-registry.md#schema-registry-compatibility)。

1. 使用下列必要屬性設定您的消費者：

   ```
   props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
   props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
   props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2"); // Pass an AWS 區域
   props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); // Only required for AVRO data format
   ```

1. (選用) 設定這些選用的消費者屬性。如需詳細的屬性說明，請參閱[讀我檔案](https://github.com/awslabs/aws-glue-schema-registry/blob/master/README.md)。

   ```
   properties.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000
   props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200
   props.put(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, "com.amazonaws.services.schemaregistry.deserializers.external.ThirdPartyDeserializer"); // For migration fall back scenario
   ```

# C\$1 實作
<a name="schema-registry-gs-serde-csharp"></a>

**注意**  
必要條件：在完成下列步驟前，您必須擁有執行中的 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 或 Apache Kafka 叢集。您的生產者和消費者需要在 .NET 8.0 或更高版本上執行。

## 安裝
<a name="schema-registry-gs-serde-csharp-install"></a>

對於 C\$1 應用程式，請使用下列其中一種方法安裝 AWS Glue 結構描述登錄檔 SerDe NuGet 套件：

**.NET CLI：**  
使用下列命令來安裝套件：

```
dotnet add package Aws.Glue.SchemaRegistry --version 1.0.0-<rid>
```

其中 `<rid>` 可以是 `1.0.0-linux-x64`，`1.0.0-linux-musl-x64`或 `1.0.0-linux-arm64`

**PackageReference （在您的 .csproj 檔案中）：**  
將下列項目新增至您的專案檔案：

```
<PackageReference Include="Aws.Glue.SchemaRegistry" Version="1.0.0-<rid>" />
```

其中 `<rid>` 可以是 `1.0.0-linux-x64`，`1.0.0-linux-musl-x64`或 `1.0.0-linux-arm64`

## 組態檔案設定
<a name="schema-registry-gs-serde-csharp-config"></a>

使用必要的設定建立組態屬性檔案 （例如 `gsr-config.properties`)：

**最小組態：**  
以下顯示最低組態範例：

```
region=us-east-1
registry.name=default-registry
dataFormat=AVRO
schemaAutoRegistrationEnabled=true
```

## 針對 Kafka SerDes 使用 C\$1 Glue 結構描述用戶端程式庫
<a name="schema-registry-gs-serde-csharp-kafka"></a>

**序列化程式用量範例：**  
下列範例示範如何使用 序列化程式：

```
private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>";
var protobufSerializer = new GlueSchemaRegistryKafkaSerializer(PROTOBUF_CONFIG_PATH);
var serialized = protobufSerializer.Serialize(message, message.Descriptor.FullName);
// send serialized bytes to Kafka using producer.Produce(serialized)
```

**還原序列化程式用量範例：**  
下列範例示範如何使用還原序列化程式：

```
private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>";
var dataConfig = new GlueSchemaRegistryDataFormatConfiguration(
    new Dictionary<string, dynamic>
    {
        {
            GlueSchemaRegistryConstants.ProtobufMessageDescriptor, message.Descriptor
        }
    }
);
var protobufDeserializer = new GlueSchemaRegistryKafkaDeserializer(PROTOBUF_CONFIG_PATH, dataConfig);

// read message from Kafka using serialized = consumer.Consume()
var deserializedObject = protobufDeserializer.Deserialize(message.Descriptor.FullName, serialized);
```

## 搭配 KafkaFlow for SerDes 使用 C\$1 Glue 結構描述用戶端程式庫
<a name="schema-registry-gs-serde-csharp-kafkaflow"></a>

**序列化程式用量範例：**  
下列範例示範如何使用序列化程式設定 KafkaFlow：

```
services.AddKafka(kafka => kafka
    .UseConsoleLog()
    .AddCluster(cluster => cluster
        .WithBrokers(new[] { "localhost:9092" })
        .AddProducer<CustomerProducer>(producer => producer
            .DefaultTopic("customer-events")
            .AddMiddlewares(m => m
                .AddSerializer<GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>>(
                    () => new GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>("config/gsr-config.properties")
                )
            )
        )
    )
);
```

**還原序列化程式用量範例：**  
下列範例示範如何使用還原序列化程式設定 KafkaFlow：

```
.AddConsumer(consumer => consumer
    .Topic("customer-events")
    .WithGroupId("customer-group")
    .WithBufferSize(100)
    .WithWorkersCount(10)
    .AddMiddlewares(middlewares => middlewares
        .AddDeserializer<GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>>(
            () => new GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>("config/gsr-config.properties")
        )
        .AddTypedHandlers(h => h.AddHandler<CustomerHandler>())
    )
)
```

## 選用生產者屬性
<a name="schema-registry-gs-serde-csharp-optional"></a>

您可以使用其他選用屬性來擴展組態檔案：

```
# Auto-registration (if not passed, uses "false")
schemaAutoRegistrationEnabled=true

# Schema name (if not passed, uses topic name)
schema.name=my-schema

# Registry name (if not passed, uses "default-registry")
registry.name=my-registry

# Cache settings
cacheTimeToLiveMillis=86400000
cacheSize=200

# Compatibility mode (if not passed, uses BACKWARD)
compatibility=FULL

# Registry description
description=This registry is used for several purposes.

# Compression (if not passed, records are sent uncompressed)
compressionType=ZLIB
```

## 支援的資料格式
<a name="schema-registry-gs-serde-supported-formats"></a>

Java 和 C\$1 實作都支援相同的資料格式：
+ *AVRO*：Apache Avro 二進位格式
+ *JSON*：JSON 結構描述格式
+ *PROTOBUF*：通訊協定緩衝區格式

## 備註
<a name="schema-registry-gs-serde-csharp-notes"></a>
+ 若要開始使用程式庫，請造訪 https：//[https://www.nuget.org/packages/AWS.Glue.SchemaRegistry](https://www.nuget.org/packages/AWS.Glue.SchemaRegistry)
+ 來源碼可在 https：//[https://github.com/awslabs/aws-glue-schema-registry](https://github.com/awslabs/aws-glue-schema-registry) 取得

# 建立登錄檔
<a name="schema-registry-gs3"></a>

您可以使用預設登錄檔，或使用 AWS Glue API 或 AWS Glue 主控台建立任意數量的新登錄檔。

**AWS Glue API**  
您可以使用這些步驟，用 AWS Glue API 執行此任務。

若要將 AWS CLI 用於AWS Glue結構描述登錄檔 APIs，請務必將 更新 AWS CLI 為最新版本。

 若要新增登錄檔，請使用 [CreateRegistry 動作 (Python: create\$1registry)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-CreateRegistry) API。指定 `RegistryName` 作為要建立的登錄檔的名稱，最大長度為 255，只能包含字母、數字、連字號、底線、金額符號或井號。

將 `Description` 指定為字串，長度不可超過 2048 個位元組，需符合 [URI 位址多行字串模式](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-common.html#aws-glue-api-common-_string-patterns)。

(選用) 為登錄檔指定一個或多個 `Tags`，作為索引鍵-值對的映射陣列。

```
aws glue create-registry --registry-name registryName1 --description description
```

建立登錄檔後，系統會指派 Amazon Resource Name (ARN)，您可以在 API 回應的 `RegistryArn` 中檢視。現在您已建立登錄檔，請為該登錄檔建立一或多個結構描述。

**AWS Glue 主控台**  
新增新的登錄檔AWS Glue主控台：

1. 登入 AWS 管理主控台 並在 https：//[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\) 開啟 AWS Glue主控台。

1. 在導覽窗格的 **Data catalog** 下，選擇 **Schema registries** (結構描述登錄檔)。

1. 選擇 **Add registry (新增登錄檔)**。

1. 輸入**登錄名稱**，由字母、數字、連字號或底線組成。無法變更此名稱。

1. 輸入登錄檔的 **Description (描述)** (選用)。

1. 或者，將一或多個標籤套用到您的登錄檔。選擇 **Add new tag (新增標籤)**，然後指定 **Tag key (標籤鍵)** 以及選用的 **Tag value (標籤值)**。

1. 選擇 **Add registry (新增登錄檔)**。

![\[建立登錄檔的範例。\]](http://docs.aws.amazon.com/zh_tw/glue/latest/dg/images/schema_reg_create_registry.png)


建立登錄檔後，系統會指派 Amazon Resource Name (ARN)，您可以在 **Schema registries (結構描述登錄檔)** 的清單中選擇登錄檔來查看。現在您已建立登錄檔，請為該登錄檔建立一或多個結構描述。

# 處理 JSON 的特定記錄 (JAVA POJO)
<a name="schema-registry-gs-json-java-pojo"></a>

您可以使用純舊 Java 物件 (POJO) 並將該物件作為記錄傳遞。這類似於 AVRO 中的特定記錄的概念。[mbknor-jackson-jsonschema](https://github.com/mbknor/mbknor-jackson-jsonSchema) 可以為傳遞的 POJO 產生 JSON 結構描述。此程式庫也可以在 JSON 結構描述中注入其他資訊。

AWS Glue 結構描述登錄檔程式庫會使用注入的「className」欄位，在結構描述中提供完全分類的類別名稱。「className」欄位由還原序列化程式用於還原序列化為該類別的物件。

```
 Example class :

@JsonSchemaDescription("This is a car")
@JsonSchemaTitle("Simple Car Schema")
@Builder
@AllArgsConstructor
@EqualsAndHashCode
// Fully qualified class name to be added to an additionally injected property
// called className for deserializer to determine which class to deserialize
// the bytes into
@JsonSchemaInject(
        strings = {@JsonSchemaString(path = "className",
                value = "com.amazonaws.services.schemaregistry.integrationtests.generators.Car")}
)
// List of annotations to help infer JSON Schema are defined by https://github.com/mbknor/mbknor-jackson-jsonSchema
public class Car {
    @JsonProperty(required = true)
    private String make;

    @JsonProperty(required = true)
    private String model;

    @JsonSchemaDefault("true")
    @JsonProperty
    public boolean used;

    @JsonSchemaInject(ints = {@JsonSchemaInt(path = "multipleOf", value = 1000)})
    @Max(200000)
    @JsonProperty
    private int miles;

    @Min(2000)
    @JsonProperty
    private int year;

    @JsonProperty
    private Date purchaseDate;

    @JsonProperty
    @JsonFormat(shape = JsonFormat.Shape.NUMBER)
    private Date listedDate;

    @JsonProperty
    private String[] owners;

    @JsonProperty
    private Collection<Float> serviceChecks;

    // Empty constructor is required by Jackson to deserialize bytes
    // into an Object of this class
    public Car() {}
}
```

# 建立結構描述
<a name="schema-registry-gs4"></a>

您可以使用 AWS Glue API 或 AWS Glue 主控台建立結構描述。

**AWS Glue API**  
您可以使用這些步驟，用 AWS Glue API 執行此任務。

若要新增結構描述，請使用 [CreateSchema 動作 (Python: create\$1schema)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-CreateSchema) API。

指定 `RegistryId` 結構來指出結構描述的登錄檔。或者，省略 `RegistryId` 以使用預設登錄檔。

指定由字母、數字、連字號或底線組成的 `SchemaName`，以及指定 `DataFormat` 為 **AVRO** 或 **JSON**。在結構描述上設定 `DataFormat` 後就不可改變。

指定 `Compatibility` 模式：
+ *向後 (建議使用)* — 消費者可以同時讀取目前版本和先前版本。
+ *全部向後* — 消費者可以讀取目前版本和所有先前版本。
+ *向前* — 消費者可以讀取目前和後續版本。
+ *全部向前* — 消費者可以讀取目前版本和所有後續版本。
+ *完整* — 向後和向前的組合。
+ *完整全部* — 全部向後與全部向前的組合。
+ *無* — 不會執行相容性檢查。
+ *已停用* — 防止此結構描述的任何版本控制。

選用地指定結構描述的 `Tags`。

指定 `SchemaDefinition` 可定義 Avro、JSON 或 Protobuf 資料格式的結構描述。請參閱範例。

對於 Avro 資料格式：

```
aws glue create-schema --registry-id RegistryName="registryName1" --schema-name testschema --compatibility NONE --data-format AVRO --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}"
```

```
aws glue create-schema --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName1" --schema-name testschema --compatibility NONE --data-format AVRO  --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}"
```

對於 JSON 資料格式：

```
aws glue create-schema --registry-id RegistryName="registryName" --schema-name testSchemaJson --compatibility NONE --data-format JSON --schema-definition "{\"$schema\": \"http://json-schema.org/draft-07/schema#\",\"type\":\"object\",\"properties\":{\"f1\":{\"type\":\"string\"}}}"
```

```
aws glue create-schema --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName" --schema-name testSchemaJson --compatibility NONE --data-format JSON --schema-definition "{\"$schema\": \"http://json-schema.org/draft-07/schema#\",\"type\":\"object\",\"properties\":{\"f1\":{\"type\":\"string\"}}}"
```

對於 Protobuf 資料格式：

```
aws glue create-schema --registry-id RegistryName="registryName" --schema-name testSchemaProtobuf --compatibility NONE --data-format PROTOBUF --schema-definition "syntax = \"proto2\";package org.test;message Basic { optional int32 basic = 1;}"
```

```
aws glue create-schema --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName" --schema-name testSchemaProtobuf --compatibility NONE --data-format PROTOBUF --schema-definition "syntax = \"proto2\";package org.test;message Basic { optional int32 basic = 1;}"
```

**AWS Glue 主控台**  
若要使用 AWS Glue 主控台來新增結構描述：

1. 登入 AWS 管理主控台，並在 https：//[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\) 開啟AWS Glue主控台。

1. 在導覽窗格中，於 **Data catalog** 下選擇 **Schemas (Data Catalog)** (結構描述 Data Catalog)。

1. 選擇 **Add schema (新增結構描述)**。

1. 輸入 **Schema name (結構描述名稱)**，由字母、數字、連字號、底線、金額符號或井號組成。無法變更此名稱。

1. 選擇 **Registry (登錄檔)**，其中結構描述會從下拉式選單儲存。建立後，無法變更父登錄檔。

1. 將 **Data format (資料格式)** 保留為 *Apache Avro* 或 *JSON*。此格式會套用至此結構描述的所有版本。

1. 選擇 **Compatibility mode (相容性模式)**。
   + *向後 (建議使用)* — 接收者可以讀取目前和以前的版本。
   + *全部向後* — 接收者可以讀取目前和所有以前的版本。
   + *向前* — 傳送者可以寫入目前和先前的版本。
   + *全部向前* — 傳送者可以寫入目前和所有先前的版本。
   + *完整* — 向後和向前的組合。
   + *完整全部* — 全部向後與全部向前的組合。
   + *無* — 不執行相容性檢查。
   + *已停用* — 防止此結構描述的任何版本控制。

1. 為登錄檔輸入選用 **Description (描述)**，最多 250 個字元。  
![\[建立資料結構描述的範例。\]](http://docs.aws.amazon.com/zh_tw/glue/latest/dg/images/schema_reg_create_schema.png)

1. 或者，將一或多個標籤套用到您的結構描述。選擇 **Add new tag (新增標籤)**，然後指定 **Tag key (標籤鍵)** 以及選用的 **Tag value (標籤值)**。

1. 在 **First schema version (第一個結構描述版本)** 方塊中，輸入或貼上您的初始結構描述。

   如需 Avro 格式，請參閱[使用 Avro 資料格式](#schema-registry-avro)

   如需 JSON 格式的資訊，請參閱[使用 JSON 資料格式](#schema-registry-json)

1. 選用地選擇 **Add metadata (新增中繼資料)** 以新增版本中繼資料來標註或分類您的結構描述版本。

1. 選擇 **Create schema and version (建立結構描述和版本)**。

![\[建立資料結構描述的範例。\]](http://docs.aws.amazon.com/zh_tw/glue/latest/dg/images/schema_reg_create_schema2.png)


結構描述隨即建立，並顯示在 **Schemas (結構描述)** 清單下。

## 使用 Avro 資料格式
<a name="schema-registry-avro"></a>

Avro 提供資料序列化和資料交換服務。Avro 儲存在 JSON 格式的資料定義，使得它易於閱讀和解釋。資料本身會以二進位格式儲存。

如需定義 Apache Avro 結構描述的相關資訊，請參閱 [Apache Avro 規格](http://avro.apache.org/docs/current/spec.html)。

## 使用 JSON 資料格式
<a name="schema-registry-json"></a>

資料可以用 JSON 格式來序列化。[JSON 結構描述格式](https://json-schema.org/)定義了 JSON 結構描述格式的標準。

# 更新結構描述或登錄檔
<a name="schema-registry-gs5"></a>

建立後，您可以編輯結構描述、結構描述版本或登錄檔。

## 更新登錄檔
<a name="schema-registry-gs5a"></a>

您可以使用 AWS Glue API 或 AWS Glue 主控台更新登錄檔。無法編輯現有登錄檔的名稱。您可以編輯登錄檔的描述。

**AWS Glue API**  
若要更新現有的登錄檔，請使用 [UpdateRegistry 動作 (Python: update\$1registry)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-UpdateRegistry) API。

指定 `RegistryId` 結構，以指出您要更新的登錄檔。傳遞 `Description` 以變更登錄檔的描述。

```
aws glue update-registry --description updatedDescription --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName1"
```

**AWS Glue 主控台**  
使用 AWS Glue 主控台更新登錄檔：

1. 登入 AWS 管理主控台 並在 https：//[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\) 開啟 AWS Glue主控台。

1. 在導覽窗格的 **Data catalog** 下，選擇 **Schema registries** (結構描述登錄檔)。

1. 從登錄檔清單中選擇登錄檔，方法是勾選其方塊。

1. 在 **Actions (動作)** 選單上，選擇 **Edit registry (編輯登錄檔)**。

# 更新結構描述
<a name="schema-registry-gs5b"></a>

您可以更新結構描述的描述或相容性設定。

若要更新現有的結構描述，請使用 [UpdateSchema 動作 (Python: update\$1schema)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-UpdateSchema) API。

指定 `SchemaId` 結構描述，以指出您要更新的結構描述。必須提供其中一個 `VersionNumber` 或 `Compatibility`。

程式碼範例 11：

```
aws glue update-schema --description testDescription --schema-id SchemaName="testSchema1",RegistryName="registryName1" --schema-version-number LatestVersion=true --compatibility NONE
```

```
aws glue update-schema --description testDescription --schema-id SchemaArn="arn:aws:glue:us-east-2:901234567890:schema/registryName1/testSchema1" --schema-version-number LatestVersion=true --compatibility NONE
```

# 新增結構描述版本
<a name="schema-registry-gs5c"></a>

當您新增結構描述版本時，您需要比較版本以確保新結構描述可被接受。

若要將新版本新增到現有結構描述，請使用 [RegisterSchemaVersion 動作 (Python: register\$1schema\$1version)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-RegisterSchemaVersion) API。

指定 `SchemaId` 結構來指示您想要新增版本的結構描述，以及 `SchemaDefinition` 來定義結構描述。

程式碼範例 12：

```
aws glue register-schema-version --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}" --schema-id SchemaArn="arn:aws:glue:us-east-1:901234567890:schema/registryName/testschema"
```

```
aws glue register-schema-version --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}" --schema-id SchemaName="testschema",RegistryName="testregistry"
```

1. 登入 AWS 管理主控台 並在 https：//[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\) 開啟 AWS Glue主控台。

1. 在導覽窗格中，於 **Data catalog** 下選擇 **Schemas (Data Catalog)** (結構描述 Data Catalog)。

1. 從結構描述清單中選擇結構描述，方法是勾選其方塊。

1. 從清單中選擇一或多個結構描述，方法是勾選其方塊。

1. 在 **Action (動作)** 選單中，選擇 **Register new version (註冊新版本)**。

1. 在 **New version (新版本)** 方塊中，輸入或貼上新的結構描述。

1. 選擇 **Compare with previous version (與舊版本比較)** 以查看與以前的結構描述版本的差異。

1. 選用地選擇 **Add metadata (新增中繼資料)** 以新增版本中繼資料來標註或分類您的結構描述版本。輸入 **Key (索引鍵)** 和選用的 **Value (值)**。

1. 選擇 **Register version (註冊版本)**。

![\[新增結構描述版本。\]](http://docs.aws.amazon.com/zh_tw/glue/latest/dg/images/schema_reg_add_schema_version.png)


結構描述版本會顯示在版本清單中。如果版本變更了相容性模式，版本將被標記為檢查點。

## 結構描述版本比較的範例
<a name="schema-registry-gs5c1"></a>

當您選擇 **Compare with previous version (與舊版本比較)**，您會看到上一個版本和新版本一起顯示。變更的資訊將會反白顯示如下：
+ *黃色*：表示已變更的資訊。
+ *綠色*：表示在最新版本中新增的內容。
+ *紅色*：表示在最新版本中移除的內容。

您也可以與更舊版本進行比較。

![\[結構描述版本比較的範例。\]](http://docs.aws.amazon.com/zh_tw/glue/latest/dg/images/schema_reg_version_comparison.png)


# 刪除結構描述或登錄檔
<a name="schema-registry-gs7"></a>

刪除結構描述、結構描述版本或登錄檔是無法復原的永久動作。

## 刪除結構描述
<a name="schema-registry-gs7a"></a>

當結構描述不再用於登錄檔、使用 或 [DeleteSchema 動作 (Python: delete\$1schema)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteSchema) API 時 AWS 管理主控台，您可能想要將其刪除。

刪除一或多個結構描述是無法復原的永久動作。請確定已不再需要結構描述。

若要從登錄檔刪除結構描述，請呼叫 [DeleteSchema 動作 (Python: delete\$1schema)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteSchema) API, 指定 `SchemaId` 結構來識別結構描述。

例如：

```
aws glue delete-schema --schema-id SchemaArn="arn:aws:glue:us-east-2:901234567890:schema/registryName1/schemaname"
```

```
aws glue delete-schema --schema-id SchemaName="TestSchema6-deleteschemabyname",RegistryName="default-registry"
```

**AWS Glue 主控台**  
從 AWS Glue 主控台刪除結構描述：

1. 登入 AWS 管理主控台 並在 https：//[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\) 開啟 AWS Glue主控台。

1. 在導覽窗格的 **Data catalog** 下，選擇 **Schema registries** (結構描述登錄檔)。

1. 從登錄檔清單中選擇包含結構描述的登錄檔。

1. 從清單中選擇一或多個結構描述，方法是勾選其方塊。

1. 在 **Action (動作)** 選單中，選擇 **Delete schema (刪除結構描述)**。

1. 在欄位中輸入文字 **Delete** 以確認刪除。

1. 選擇 **刪除**。

您指定的結構描述會從登錄檔中刪除。

## 刪除結構描述版本
<a name="schema-registry-gs7b"></a>

當結構描述累積在登錄檔中時，您可能想要使用 或 [DeleteSchemaVersions 動作 (Python: delete\$1schema\$1versions)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteSchemaVersions) API AWS 管理主控台刪除不需要的結構描述版本。刪除一或多個結構描述版本是無法復原的永久動作。請確定不再需要結構描述版本。

刪除結構描述版本時，請注意以下限制：
+ 您無法刪除檢查點的版本。
+ 連續版本的範圍不能超過 25。
+ 最新結構描述版本不可處於待定狀態。

指定 `SchemaId` 結構來識別結構描述，並指定 `Versions` 作為要刪除的版本範圍。如需指定版本或版本範圍的詳細資訊，請參閱[DeleteRegistry 動作 (Python: delete\$1registry)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteRegistry)。您指定的結構描述版本會從登錄檔中刪除。

在此呼叫後呼叫 [ListSchemaVersions 動作 (Python: list\$1schema\$1versions)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-ListSchemaVersions) API 將列出已刪除版本的狀態。

例如：

```
aws glue delete-schema-versions --schema-id SchemaName="TestSchema6",RegistryName="default-registry" --versions "1-1"
```

```
aws glue delete-schema-versions --schema-id SchemaArn="arn:aws:glue:us-east-2:901234567890:schema/default-registry/TestSchema6-NON-Existent" --versions "1-1"
```

1. 登入 AWS 管理主控台 並在 https：//[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\) 開啟 AWS Glue主控台。

1. 在導覽窗格的 **Data catalog** 下，選擇 **Schema registries** (結構描述登錄檔)。

1. 從登錄檔清單中選擇包含結構描述的登錄檔。

1. 從清單中選擇一或多個結構描述，方法是勾選其方塊。

1. 在 **Action (動作)** 選單中，選擇 **Delete schema (刪除結構描述)**。

1. 在欄位中輸入文字 **Delete** 以確認刪除。

1. 選擇 **刪除**。

您指定的結構描述版本會從登錄檔中刪除。

# 刪除登錄檔
<a name="schema-registry-gs7c"></a>

當登錄檔包含的結構描述不應再組織在該登錄檔下時，您可能會想要刪除登錄檔。您將需要將這些結構描述重新指派到另一個登錄檔。

刪除一或多個登錄檔是無法復原的永久動作。請確定登錄檔不再需要。

預設登錄檔可以使用 AWS CLI來刪除。

**AWS Glue API**  
若要刪除整個登錄檔，包括結構描述及其所有版本，請呼叫 [DeleteRegistry 動作 (Python: delete\$1registry)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteRegistry) API。指定 `RegistryId` 結構描述以識別登錄檔。

例如：

```
aws glue delete-registry --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName1"
```

```
aws glue delete-registry --registry-id RegistryName="TestRegistry-deletebyname"
```

若要取得刪除操作的狀態，您可以在非同步呼叫之後呼叫 `GetRegistry` API。

**AWS Glue 主控台**  
若要從 AWS Glue 主控台刪除登錄檔：

1. 登入 AWS 管理主控台 並在 https：//[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\) 開啟 AWS Glue主控台。

1. 在導覽窗格的 **Data catalog** 下，選擇 **Schema registries** (結構描述登錄檔)。

1. 從清單中選擇登錄檔，方法是勾選其方塊。

1. 在 **Actions (動作)** 選單中，選擇 **Delete registry (刪除登錄檔)**。

1. 在欄位中輸入文字 **Delete** 以確認刪除。

1. 選擇 **刪除**。

您選取的登錄檔會從 AWS Glue 刪除。

## 序列化程式的 IAM 範例
<a name="schema-registry-gs1"></a>

**注意**  
AWS 受管政策會授予常見使用案例的必要許可。如需使用受管政策來管理結構描述登錄檔的相關資訊，請參閱[AWS 的 受管 （預先定義） 政策 AWS Glue](security-iam-awsmanpol.md#access-policy-examples-aws-managed)。

對於序列化程式，您應該建立類似於下面的最小政策，以便您能夠找到特定結構描述定義的 `schemaVersionId`。請注意，您應該有登錄檔的讀取許可，才能讀取登錄檔中的結構描述。您可以使用 `Resource` 子句，限制可以讀取的登錄檔。

程式碼範例 13：

```
{
    "Sid" : "GetSchemaByDefinition",
    "Effect" : "Allow",
    "Action" :
	[
        "glue:GetSchemaByDefinition"
    ],
        "Resource" : ["arn:aws:glue:us-east-2:012345678:registry/registryname-1",
                      "arn:aws:glue:us-east-2:012345678:schema/registryname-1/schemaname-1",
                      "arn:aws:glue:us-east-2:012345678:schema/registryname-1/schemaname-2"
                     ]
}
```

此外，您還可以允許生產者透過包括以下額外的方法來建立新的結構描述和版本。請注意，您應該能夠檢查登錄檔以新增/刪除/演變其中的結構描述。您可以使用 `Resource` 子句，限制可以檢查的登錄檔。

程式碼範例 14：

```
{
    "Sid" : "RegisterSchemaWithMetadata",
    "Effect" : "Allow",
    "Action" :
	[
        "glue:GetSchemaByDefinition",
        "glue:CreateSchema",
        "glue:RegisterSchemaVersion",
        "glue:PutSchemaVersionMetadata",
    ],
    "Resource" : ["arn:aws:glue:aws-region:123456789012:registry/registryname-1",
                  "arn:aws:glue:aws-region:123456789012:schema/registryname-1/schemaname-1",
                  "arn:aws:glue:aws-region:123456789012:schema/registryname-1/schemaname-2"
                 ]
}
```

## 還原序列化程式的 IAM 範例
<a name="schema-registry-gs1b"></a>

對於還原序列化程式 (消費者方面)，您應該建立類似下面的政策，以允許還原序列化程式從結構描述登錄檔中擷取結構描述以進行還原序列化。請注意，您應該能夠檢查登錄檔以擷取其中的結構描述。

程式碼範例 15：

```
{
    "Sid" : "GetSchemaVersion",
    "Effect" : "Allow",
    "Action" :
	[
        "glue:GetSchemaVersion"
    ],
    "Resource" : ["*"]
}
```

## 使用 的私有連線 AWS PrivateLink
<a name="schema-registry-gs-private"></a>

您可以使用 AWS Glue 透過定義 的介面 VPC 端點 AWS PrivateLink ，將資料生產者的 VPC 連線至 AWS Glue。使用 VPC 介面端點時，VPC 和 AWS Glue 之間的通訊完全在 AWS 網路中執行。如需詳細資訊，請參閱[使用 AWS Glue 搭配 VPC 端點](https://docs.aws.amazon.com/glue/latest/dg/vpc-endpoint.html)。

# 存取 Amazon CloudWatch 指標
<a name="schema-registry-gs-monitoring"></a>

Amazon CloudWatch 指標可作為 CloudWatch 的免費方案的一部分使用。您可以在 CloudWatch 主控台中存取這些指標。API 層級指標包括 CreateSchema (成功和延遲)、GetSchemaByDefinition (成功和延遲)、GetSchemaVersion (成功和延遲)、RegisterSchemaVersion (成功和延遲)、PutSchemaVersionMetadata (成功和延遲)。資源層級指標包括 Registry.ThrottledByLimit、SchemaVersion.ThrottledByLimit、SchemaVersion.Size。

# 結構描述登錄的範例 CloudFormation 範本
<a name="schema-registry-integrations-cfn"></a>

以下是在 中建立結構描述登錄檔資源的範例範本 CloudFormation。若要在您的帳戶中建立此堆疊，請將上述範本複製到檔案 `SampleTemplate.yaml`，然後執行下列命令：

```
aws cloudformation create-stack --stack-name ABCSchemaRegistryStack --template-body "'cat SampleTemplate.yaml'"
```

此範例使用 `AWS::Glue::Registry` 建立登錄檔、使用 `AWS::Glue::Schema` 建立結構描述、使用 `AWS::Glue::SchemaVersion` 建立結構描述版本，以及使用 `AWS::Glue::SchemaVersionMetadata` 填入結構描述版本中繼資料。

```
Description: "A sample CloudFormation template for creating Schema Registry resources."
Resources:
  ABCRegistry:
    Type: "AWS::Glue::Registry"
    Properties:
      Name: "ABCSchemaRegistry"
      Description: "ABC Corp. Schema Registry"
      Tags:
        Project: "Foo"
  ABCSchema:
    Type: "AWS::Glue::Schema"
    Properties:
      Registry:
        Arn: !Ref ABCRegistry
      Name: "TestSchema"
      Compatibility: "NONE"
      DataFormat: "AVRO"
      SchemaDefinition: >
        {"namespace":"foo.avro","type":"record","name":"user","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":"int"}]}
      Tags:
        Project: "Foo"
  SecondSchemaVersion:
    Type: "AWS::Glue::SchemaVersion"
    Properties:
      Schema:
        SchemaArn: !Ref ABCSchema
      SchemaDefinition: >
        {"namespace":"foo.avro","type":"record","name":"user","fields":[{"name":"status","type":"string", "default":"ON"}, {"name":"name","type":"string"},{"name":"favorite_number","type":"int"}]}
  FirstSchemaVersionMetadata:
    Type: "AWS::Glue::SchemaVersionMetadata"
    Properties:
      SchemaVersionId: !GetAtt ABCSchema.InitialSchemaVersionId
      Key: "Application"
      Value: "Kinesis"
  SecondSchemaVersionMetadata:
    Type: "AWS::Glue::SchemaVersionMetadata"
    Properties:
      SchemaVersionId: !Ref SecondSchemaVersion
      Key: "Application"
      Value: "Kinesis"
```

# 與 AWS Glue 結構描述登錄檔整合
<a name="schema-registry-integrations"></a>

這些章節說明與 AWS Glue 結構描述登錄檔的整合。這些章節中的範例顯示了 AVRO 資料格式的結構描述。如需更多範例 (包括具有 JSON 資料格式的結構描述)，請參閱[AWS Glue 結構描述登錄檔開源儲存庫](https://github.com/awslabs/aws-glue-schema-registry)中的整合測試和讀我資訊。

**Topics**
+ [使用案例：將結構描述登錄檔連線到 Amazon MSK 或 Apache Kafka](#schema-registry-integrations-amazon-msk)
+ [使用案例：將 Amazon Kinesis Data Streams 與 AWS Glue 結構描述登錄檔整合](#schema-registry-integrations-kds)
+ [使用案例：Amazon Managed Service for Apache Flink](#schema-registry-integrations-kinesis-data-analytics-apache-flink)
+ [使用案例：與 整合 AWS Lambda](#schema-registry-integrations-aws-lambda)
+ [使用案例： AWS Glue Data Catalog](#schema-registry-integrations-aws-glue-data-catalog)
+ [使用案例：AWS Glue 串流](#schema-registry-integrations-aws-glue-streaming)
+ [使用案例：Apache Kafka Streams](#schema-registry-integrations-apache-kafka-streams)

## 使用案例：將結構描述登錄檔連線到 Amazon MSK 或 Apache Kafka
<a name="schema-registry-integrations-amazon-msk"></a>

假設您正在將資料寫入 Apache Kafka 主題，並且您可以按照下列步驟開始使用。

1. 建立具有至少一個主題的 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 或 Apache Kafka 叢集。如果是建立 Amazon MSK 叢集，您可以使用 AWS 管理主控台。請遵循以下說明：*Amazon Managed Streaming for Apache Kafka 開發人員指南*中的 [Amazon MSK 入門](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html)。

1. 遵循上述的[安裝 SerDe 程式庫](schema-registry-gs-serde.md)步驟。

1. 若要建立結構描述登錄檔、結構描述或結構描述版本，請依照這份文件的[結構描述登錄檔入門](schema-registry-gs.md)一節指示進行。

1. 啟動您的生產者和消費者以使用結構描述登錄檔來從/向 Amazon MSK 或 Apache Kafka 主題寫入和讀取記錄。範例生產者和消費者程式碼可以在 Serde 程式庫的[讀我檔案](https://github.com/awslabs/aws-glue-schema-registry/blob/master/README.md)找到。生產者上的結構描述登錄檔程式庫會自動序列化記錄，並使用結構描述版本 ID 裝飾記錄。

1. 如果已輸入此記錄的結構描述，或者如果開啟自動註冊，則結構描述將已在結構描述登錄檔中註冊。

1. 消費者使用 AWS Glue 結構描述登錄檔程式庫從 Amazon MSK 或 Apache Kafka 主題讀取，會自動從結構描述登錄檔查詢結構描述。

## 使用案例：將 Amazon Kinesis Data Streams 與 AWS Glue 結構描述登錄檔整合
<a name="schema-registry-integrations-kds"></a>

此整合要求您擁有現有的 Amazon Kinesis 資料串流。如需詳細資訊，請參閱 *Amazon Kinesis Data Streams 開發人員指南*中的 [Amazon Kinesis Data Streams 入門](https://docs.aws.amazon.com/streams/latest/dev/getting-started.html)。

您可以透過兩種方式與 Kinesis 資料串流中的資料互動。
+ 透過 Java 中的 Kinesis Producer Library (KPL) 和 Kinesis Client Library (KCL) 程式庫。不提供多語言支援。
+ 透過 適用於 Java 的 AWS SDK中提供的 `PutRecords`、`PutRecord` 以及 `GetRecords` Kinesis Data Streams API。

如果您目前使用 KPL/KCL 程式庫，我們建議您繼續使用該方法。有更新的 KCL 和 KPL 版本與結構描述登錄檔整合，如範例所示。否則，您可以使用範例程式碼來利用 AWS Glue 結構描述登錄檔 (如果直接使用 KDS API)。

結構描述登錄檔整合只適用於 KPL v0.14.2 或更高版本和 KCL v2.3 或更高版本。結構描述登錄檔與 JSON 資料格式整合適用於 KPL v0.14.8 或更高版本和 KCL v2.3.6 或更高版本。

### 使用 Kinesis SDK V2 與資料互動
<a name="schema-registry-integrations-kds-sdk-v2"></a>

本節說明如何使用 Kinesis SDK V2 與 Kinesis 互動

```
// Example JSON Record, you can construct a AVRO record also
private static final JsonDataWithSchema record = JsonDataWithSchema.builder(schemaString, payloadString);
private static final DataFormat dataFormat = DataFormat.JSON;

//Configurations for Schema Registry
GlueSchemaRegistryConfiguration gsrConfig = new GlueSchemaRegistryConfiguration("us-east-1");

GlueSchemaRegistrySerializer glueSchemaRegistrySerializer =
        new GlueSchemaRegistrySerializerImpl(awsCredentialsProvider, gsrConfig);
GlueSchemaRegistryDataFormatSerializer dataFormatSerializer =
        new GlueSchemaRegistrySerializerFactory().getInstance(dataFormat, gsrConfig);

Schema gsrSchema =
        new Schema(dataFormatSerializer.getSchemaDefinition(record), dataFormat.name(), "MySchema");

byte[] serializedBytes = dataFormatSerializer.serialize(record);

byte[] gsrEncodedBytes = glueSchemaRegistrySerializer.encode(streamName, gsrSchema, serializedBytes);

PutRecordRequest putRecordRequest = PutRecordRequest.builder()
        .streamName(streamName)
        .partitionKey("partitionKey")
        .data(SdkBytes.fromByteArray(gsrEncodedBytes))
        .build();
shardId = kinesisClient.putRecord(putRecordRequest)
        .get()
        .shardId();

GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer = new GlueSchemaRegistryDeserializerImpl(awsCredentialsProvider, gsrConfig);

GlueSchemaRegistryDataFormatDeserializer gsrDataFormatDeserializer =
        glueSchemaRegistryDeserializerFactory.getInstance(dataFormat, gsrConfig);

GetShardIteratorRequest getShardIteratorRequest = GetShardIteratorRequest.builder()
        .streamName(streamName)
        .shardId(shardId)
        .shardIteratorType(ShardIteratorType.TRIM_HORIZON)
        .build();

String shardIterator = kinesisClient.getShardIterator(getShardIteratorRequest)
        .get()
        .shardIterator();

GetRecordsRequest getRecordRequest = GetRecordsRequest.builder()
        .shardIterator(shardIterator)
        .build();
GetRecordsResponse recordsResponse = kinesisClient.getRecords(getRecordRequest)
        .get();

List<Object> consumerRecords = new ArrayList<>();
List<Record> recordsFromKinesis = recordsResponse.records();

for (int i = 0; i < recordsFromKinesis.size(); i++) {
    byte[] consumedBytes = recordsFromKinesis.get(i)
            .data()
            .asByteArray();

    Schema gsrSchema = glueSchemaRegistryDeserializer.getSchema(consumedBytes);
    Object decodedRecord = gsrDataFormatDeserializer.deserialize(ByteBuffer.wrap(consumedBytes),
                                                                    gsrSchema.getSchemaDefinition());
    consumerRecords.add(decodedRecord);
}
```

### 使用 KPL/KCL 程式庫與資料互動
<a name="schema-registry-integrations-kds-libraries"></a>

本節說明使用 KPL/KCL 程式庫將 Kinesis Data Streams 與結構描述登錄檔整合。如需使用 KPL/KCL 的詳細資訊，請參閱 *Amazon Kinesis Data Streams 開發人員指南*中的[使用 Amazon Kinesis Producer Library 開發生產者](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html)。

#### 在 KPL 中設定結構描述登錄檔
<a name="schema-registry-integrations-kds-libraries-kpl"></a>

1. 定義 AWS Glue 結構描述登錄檔中撰寫的資料的結構描述定義、資料格式和結構描述名稱。

1. 選用地設定 `GlueSchemaRegistryConfiguration` 物件。

1. 將結構描述物件傳遞給 `addUserRecord API`。

   ```
   private static final String SCHEMA_DEFINITION = "{"namespace": "example.avro",\n"
   + " "type": "record",\n"
   + " "name": "User",\n"
   + " "fields": [\n"
   + " {"name": "name", "type": "string"},\n"
   + " {"name": "favorite_number", "type": ["int", "null"]},\n"
   + " {"name": "favorite_color", "type": ["string", "null"]}\n"
   + " ]\n"
   + "}";
   
   KinesisProducerConfiguration config = new KinesisProducerConfiguration();
   config.setRegion("us-west-1")
   
   //[Optional] configuration for Schema Registry.
   
   GlueSchemaRegistryConfiguration schemaRegistryConfig =
   new GlueSchemaRegistryConfiguration("us-west-1");
   
   schemaRegistryConfig.setCompression(true);
   
   config.setGlueSchemaRegistryConfiguration(schemaRegistryConfig);
   
   ///Optional configuration ends.
   
   final KinesisProducer producer =
         new KinesisProducer(config);
   
   final ByteBuffer data = getDataToSend();
   
   com.amazonaws.services.schemaregistry.common.Schema gsrSchema =
       new Schema(SCHEMA_DEFINITION, DataFormat.AVRO.toString(), "demoSchema");
   
   ListenableFuture<UserRecordResult> f = producer.addUserRecord(
   config.getStreamName(), TIMESTAMP, Utils.randomExplicitHashKey(), data, gsrSchema);
   
   private static ByteBuffer getDataToSend() {
         org.apache.avro.Schema avroSchema =
           new org.apache.avro.Schema.Parser().parse(SCHEMA_DEFINITION);
   
         GenericRecord user = new GenericData.Record(avroSchema);
         user.put("name", "Emily");
         user.put("favorite_number", 32);
         user.put("favorite_color", "green");
   
         ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
         Encoder encoder = EncoderFactory.get().directBinaryEncoder(outBytes, null);
         new GenericDatumWriter<>(avroSchema).write(user, encoder);
         encoder.flush();
         return ByteBuffer.wrap(outBytes.toByteArray());
    }
   ```

#### 設定 Kinesis client library
<a name="schema-registry-integrations-kds-libraries-kcl"></a>

您將用 Java 開發 Kinesis Client Library 消費者。如需詳細資訊，請參閱 *Amazon Kinesis Data Streams 開發人員指南*中的[以 Java 開發 Kinesis Client Library 消費者](https://docs.aws.amazon.com/streams/latest/dev/kcl2-standard-consumer-java-example.html)。

1. 傳遞 `GlueSchemaRegistryConfiguration` 物件，建立 `GlueSchemaRegistryDeserializer` 的執行個體。

1. 傳遞 `GlueSchemaRegistryDeserializer` 至 `retrievalConfig.glueSchemaRegistryDeserializer`。

1. 透過呼叫 `kinesisClientRecord.getSchema()` 存取內送訊息的結構描述。

   ```
   GlueSchemaRegistryConfiguration schemaRegistryConfig =
       new GlueSchemaRegistryConfiguration(this.region.toString());
   
    GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer =
       new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), schemaRegistryConfig);
   
    RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient));
    retrievalConfig.glueSchemaRegistryDeserializer(glueSchemaRegistryDeserializer);
   
     Scheduler scheduler = new Scheduler(
               configsBuilder.checkpointConfig(),
               configsBuilder.coordinatorConfig(),
               configsBuilder.leaseManagementConfig(),
               configsBuilder.lifecycleConfig(),
               configsBuilder.metricsConfig(),
               configsBuilder.processorConfig(),
               retrievalConfig
           );
   
    public void processRecords(ProcessRecordsInput processRecordsInput) {
               MDC.put(SHARD_ID_MDC_KEY, shardId);
               try {
                   log.info("Processing {} record(s)",
                   processRecordsInput.records().size());
                   processRecordsInput.records()
                   .forEach(
                       r ->
                           log.info("Processed record pk: {} -- Seq: {} : data {} with schema: {}",
                           r.partitionKey(), r.sequenceNumber(), recordToAvroObj(r).toString(), r.getSchema()));
               } catch (Throwable t) {
                   log.error("Caught throwable while processing records. Aborting.");
                   Runtime.getRuntime().halt(1);
               } finally {
                   MDC.remove(SHARD_ID_MDC_KEY);
               }
    }
   
    private GenericRecord recordToAvroObj(KinesisClientRecord r) {
       byte[] data = new byte[r.data().remaining()];
       r.data().get(data, 0, data.length);
       org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser().parse(r.schema().getSchemaDefinition());
       DatumReader datumReader = new GenericDatumReader<>(schema);
   
       BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(data, 0, data.length, null);
       return (GenericRecord) datumReader.read(null, binaryDecoder);
    }
   ```

#### 使用 Kinesis Data Streams API 與資料互動
<a name="schema-registry-integrations-kds-apis"></a>

本節說明使用 Kinesis Data Streams API 將 Kinesis Data Streams 與結構描述登錄檔整合。

1. 更新這些 Maven 相依性：

   ```
   <dependencyManagement>
           <dependencies>
               <dependency>
                   <groupId>com.amazonaws</groupId>
                   <artifactId>aws-java-sdk-bom</artifactId>
                   <version>1.11.884</version>
                   <type>pom</type>
                   <scope>import</scope>
               </dependency>
           </dependencies>
       </dependencyManagement>
   
       <dependencies>
           <dependency>
               <groupId>com.amazonaws</groupId>
               <artifactId>aws-java-sdk-kinesis</artifactId>
           </dependency>
   
           <dependency>
               <groupId>software.amazon.glue</groupId>
               <artifactId>schema-registry-serde</artifactId>
               <version>1.1.5</version>
           </dependency>
   
           <dependency>
               <groupId>com.fasterxml.jackson.dataformat</groupId>
               <artifactId>jackson-dataformat-cbor</artifactId>
               <version>2.11.3</version>
           </dependency>
       </dependencies>
   ```

1. 在生產者中，使用 Kinesis Data Streams 中的 `PutRecords` 或 `PutRecord` API 新增結構描述標頭資訊。

   ```
   //The following lines add a Schema Header to the record
           com.amazonaws.services.schemaregistry.common.Schema awsSchema =
               new com.amazonaws.services.schemaregistry.common.Schema(schemaDefinition, DataFormat.AVRO.name(),
                   schemaName);
           GlueSchemaRegistrySerializerImpl glueSchemaRegistrySerializer =
               new GlueSchemaRegistrySerializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(getConfigs()));
           byte[] recordWithSchemaHeader =
               glueSchemaRegistrySerializer.encode(streamName, awsSchema, recordAsBytes);
   ```

1. 在生產者中，使用 `PutRecords` 或 `PutRecord` API，將記錄放入資料串流中。

1. 在消費者中，從標頭移除結構描述記錄，並序列化 Avro 結構描述記錄。

   ```
   //The following lines remove Schema Header from record
           GlueSchemaRegistryDeserializerImpl glueSchemaRegistryDeserializer =
               new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), getConfigs());
           byte[] recordWithSchemaHeaderBytes = new byte[recordWithSchemaHeader.remaining()];
           recordWithSchemaHeader.get(recordWithSchemaHeaderBytes, 0, recordWithSchemaHeaderBytes.length);
           com.amazonaws.services.schemaregistry.common.Schema awsSchema =
               glueSchemaRegistryDeserializer.getSchema(recordWithSchemaHeaderBytes);
           byte[] record = glueSchemaRegistryDeserializer.getData(recordWithSchemaHeaderBytes);
   
           //The following lines serialize an AVRO schema record
           if (DataFormat.AVRO.name().equals(awsSchema.getDataFormat())) {
               Schema avroSchema = new org.apache.avro.Schema.Parser().parse(awsSchema.getSchemaDefinition());
               Object genericRecord = convertBytesToRecord(avroSchema, record);
               System.out.println(genericRecord);
           }
   ```

#### 使用 Kinesis Data Streams API 與資料互動
<a name="schema-registry-integrations-kds-apis-reference"></a>

以下是使用 `PutRecords` 和 `GetRecords` API 的範例程式碼。

```
//Full sample code
import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializerImpl;
import com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistrySerializerImpl;
import com.amazonaws.services.schemaregistry.utils.AVROUtils;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.services.glue.model.DataFormat;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;


public class PutAndGetExampleWithEncodedData {
    static final String regionName = "us-east-2";
    static final String streamName = "testStream1";
    static final String schemaName = "User-Topic";
    static final String AVRO_USER_SCHEMA_FILE = "src/main/resources/user.avsc";
    KinesisApi kinesisApi = new KinesisApi();

    void runSampleForPutRecord() throws IOException {
        Object testRecord = getTestRecord();
        byte[] recordAsBytes = convertRecordToBytes(testRecord);
        String schemaDefinition = AVROUtils.getInstance().getSchemaDefinition(testRecord);

        //The following lines add a Schema Header to a record
        com.amazonaws.services.schemaregistry.common.Schema awsSchema =
            new com.amazonaws.services.schemaregistry.common.Schema(schemaDefinition, DataFormat.AVRO.name(),
                schemaName);
        GlueSchemaRegistrySerializerImpl glueSchemaRegistrySerializer =
            new GlueSchemaRegistrySerializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(regionName));
        byte[] recordWithSchemaHeader =
            glueSchemaRegistrySerializer.encode(streamName, awsSchema, recordAsBytes);

        //Use PutRecords api to pass a list of records
        kinesisApi.putRecords(Collections.singletonList(recordWithSchemaHeader), streamName, regionName);

        //OR
        //Use PutRecord api to pass single record
        //kinesisApi.putRecord(recordWithSchemaHeader, streamName, regionName);
    }

    byte[] runSampleForGetRecord() throws IOException {
        ByteBuffer recordWithSchemaHeader = kinesisApi.getRecords(streamName, regionName);

        //The following lines remove the schema registry header
        GlueSchemaRegistryDeserializerImpl glueSchemaRegistryDeserializer =
            new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(regionName));
        byte[] recordWithSchemaHeaderBytes = new byte[recordWithSchemaHeader.remaining()];
        recordWithSchemaHeader.get(recordWithSchemaHeaderBytes, 0, recordWithSchemaHeaderBytes.length);

        com.amazonaws.services.schemaregistry.common.Schema awsSchema =
            glueSchemaRegistryDeserializer.getSchema(recordWithSchemaHeaderBytes);

        byte[] record = glueSchemaRegistryDeserializer.getData(recordWithSchemaHeaderBytes);

        //The following lines serialize an AVRO schema record
        if (DataFormat.AVRO.name().equals(awsSchema.getDataFormat())) {
            Schema avroSchema = new org.apache.avro.Schema.Parser().parse(awsSchema.getSchemaDefinition());
            Object genericRecord = convertBytesToRecord(avroSchema, record);
            System.out.println(genericRecord);
        }

        return record;
    }

    private byte[] convertRecordToBytes(final Object record) throws IOException {
        ByteArrayOutputStream recordAsBytes = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().directBinaryEncoder(recordAsBytes, null);
        GenericDatumWriter datumWriter = new GenericDatumWriter<>(AVROUtils.getInstance().getSchema(record));
        datumWriter.write(record, encoder);
        encoder.flush();
        return recordAsBytes.toByteArray();
    }

    private GenericRecord convertBytesToRecord(Schema avroSchema, byte[] record) throws IOException {
        final GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(avroSchema);
        Decoder decoder = DecoderFactory.get().binaryDecoder(record, null);
        GenericRecord genericRecord = datumReader.read(null, decoder);
        return genericRecord;
    }

    private Map<String, String> getMetadata() {
        Map<String, String> metadata = new HashMap<>();
        metadata.put("event-source-1", "topic1");
        metadata.put("event-source-2", "topic2");
        metadata.put("event-source-3", "topic3");
        metadata.put("event-source-4", "topic4");
        metadata.put("event-source-5", "topic5");
        return metadata;
    }

    private GlueSchemaRegistryConfiguration getConfigs() {
        GlueSchemaRegistryConfiguration configs = new GlueSchemaRegistryConfiguration(regionName);
        configs.setSchemaName(schemaName);
        configs.setAutoRegistration(true);
        configs.setMetadata(getMetadata());
        return configs;
    }

    private Object getTestRecord() throws IOException {
        GenericRecord genericRecord;
        Schema.Parser parser = new Schema.Parser();
        Schema avroSchema = parser.parse(new File(AVRO_USER_SCHEMA_FILE));

        genericRecord = new GenericData.Record(avroSchema);
        genericRecord.put("name", "testName");
        genericRecord.put("favorite_number", 99);
        genericRecord.put("favorite_color", "red");

        return genericRecord;
    }
}
```

## 使用案例：Amazon Managed Service for Apache Flink
<a name="schema-registry-integrations-kinesis-data-analytics-apache-flink"></a>

Apache Flink 是一個流行的開源架構和分散式處理引擎，用於對未限制和有限制資料串流進行狀態計算。Amazon Managed Service for Apache Flink 是一種全受管 AWS 服務，可讓您建置和管理 Apache Flink 應用程式來處理串流資料。

開源 Apache Flink 提供一些來源和接收器。例如，預先定義的資料來源包括從檔案、目錄和通訊端讀取，以及從集合和迭代器擷取資料。Apache Flink DataStream Connector 為 Apache Flink 提供與各種第三方系統連接的程式碼，例如 Apache Kafka 或 Kinesis 做為來源和/或接收器。

如需詳細資訊，請參閱 [Amazon Kinesis Data Analytics 開發人員指南](https://docs.aws.amazon.com/kinesisanalytics/latest/java/what-is.html)。

### Apache Flink Kafka 連接器
<a name="schema-registry-integrations-kafka-connector"></a>

Apache Flink 提供 Apache Kafka 資料串流連接器，用於使用恰好一次的保證從 Kafka 主題讀取資料以及寫入資料至 Kafka 主題。Flink 的 Kafka 消費者 `FlinkKafkaConsumer` 提供從一個或多個 Kafka 主題的讀取存取權。Apache Flink 的 Kafka 生產者 `FlinkKafkaProducer` 允許寫入記錄串流到一個或多個 Kafka 主題。如需詳細資訊，請參閱 [Apache Kafka 連接器](https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html)。

### Apache Flink Kinesis 串流連接器
<a name="schema-registry-integrations-kinesis-connector"></a>

Kinesis Data Streams 連接器可存取 Amazon Kinesis Data Streams。`FlinkKinesisConsumer` 是完全一次性的平行串流資料來源，可訂閱相同 AWS 服務區域內的多個 Kinesis 串流，並在任務執行時透明地處理串流的重新分片。消費者的每個子工作都負責從多個 Kinesis 分片擷取資料記錄。每個子工作擷取的碎片數量將隨著碎片關閉並由 Kinesis 建立而改變。`FlinkKinesisProducer` 使用 Kinesis Producer Library (KPL) 將 Apache Flink 串流中的資料放入 Kinesis 串流中。如需詳細資訊，請參閱 [Amazon Kinesis Streams Connector](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kinesis.html)。

如需詳細資訊，請參閱 [AWS Glue 結構描述 GitHub 儲存庫](https://github.com/awslabs/aws-glue-schema-registry)。

### 與 Apache Flink 整合
<a name="schema-registry-integrations-apache-flink-integrate"></a>

結構描述登錄檔提供的 SerDes 程式庫與 Apache Flink 整合。若要使用 Apache Flink，您需要實作稱為 `GlueSchemaRegistryAvroSerializationSchema` 和 `GlueSchemaRegistryAvroDeserializationSchema` 的 [https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java](https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java) 和 [https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java](https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java) 介面，然後將介面插入 Apache Flink 連接器。

### 新增 AWS Glue 結構描述登錄檔相依性到 Apache Flink 應用程式
<a name="schema-registry-integrations-kinesis-data-analytics-dependencies"></a>

在 Apache Flink 應用程式中將整合相依性設定為 AWS Glue 結構描述登錄檔：

1. 將相依性新增到 `pom.xml` 檔案。

   ```
   <dependency>
       <groupId>software.amazon.glue</groupId>
       <artifactId>schema-registry-flink-serde</artifactId>
       <version>1.0.0</version>
   </dependency>
   ```

#### 將 Kafka 或 Amazon MSK 與 Apache Flink 整合
<a name="schema-registry-integrations-kda-integrate-msk"></a>

您可以使用 Managed Service for Apache Flink，搭配 Kafka 作為來源或搭配 Kafka 作為接收器。

**Kafka 作為來源**  
下圖顯示將 Kinesis Data Streams 與 Managed Service for Apache Flink 整合，並將 Kafka 作為來源。

![\[Kafka 作為來源。\]](http://docs.aws.amazon.com/zh_tw/glue/latest/dg/images/gsr-kafka-source.png)


**Kafka 做為接收器**  
下圖顯示將 Kinesis Data Streams 與 Managed Service for Apache Flink 整合，並將 Kafka 作為接收器。

![\[Kafka 做為接收器。\]](http://docs.aws.amazon.com/zh_tw/glue/latest/dg/images/gsr-kafka-sink.png)


若要整合 Kafka (或 Amazon MSK) 與 Managed Service for Apache Flink，搭配 Kafka 作為來源或搭配 Kafka 作為接收器，請對程式碼進行下列變更。將粗體程式碼區塊新增到類似區段中的各自程式碼中。

如果 Kafka 是來源，請使用還原序列化程式程式碼 (區塊 2)。如果 Kafka 是接收器，請使用序列化程式程式碼 (區塊 3)。

```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

String topic = "topic";
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

// block 1
Map<String, Object> configs = new HashMap<>();
configs.put(AWSSchemaRegistryConstants.AWS_REGION, "aws-region");
configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());

FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
    topic,
    // block 2
    GlueSchemaRegistryAvroDeserializationSchema.forGeneric(schema, configs),
    properties);

FlinkKafkaProducer<GenericRecord> producer = new FlinkKafkaProducer<>(
    topic,
    // block 3
    GlueSchemaRegistryAvroSerializationSchema.forGeneric(schema, topic, configs),
    properties);

DataStream<GenericRecord> stream = env.addSource(consumer);
stream.addSink(producer);
env.execute();
```

#### 將 Kinesis Data Streams 與 Apache Flink 整合
<a name="schema-registry-integrations-integrate-kds"></a>

您可以使用 Managed Service for Apache Flink，搭配 Kinesis Data Streams 作為來源或接收器。

**Kinesis Data Streams 做為來源**  
下圖顯示將 Kinesis Data Streams 與 Managed Service for Apache Flink 整合，並將 Kinesis Data Streams 作為來源。

![\[Kinesis Data Streams 做為來源。\]](http://docs.aws.amazon.com/zh_tw/glue/latest/dg/images/gsr-kinesis-source.png)


**Kinesis Data Streams 做為接收器**  
下圖顯示將 Kinesis Data Streams 與 Managed Service for Apache Flink 整合，並將 Kinesis Data Streams 作為接收器。

![\[Kinesis Data Streams 做為接收器。\]](http://docs.aws.amazon.com/zh_tw/glue/latest/dg/images/gsr-kinesis-sink.png)


若要將 Kinesis Data Streams 與 Managed Service for Apache Flink 整合，並將 Kinesis Data Streams 作為來源或將 Kinesis Data Streams 作為接收器，請對程式碼進行下列變更。將粗體程式碼區塊新增到類似區段中的各自程式碼中。

如果 Kinesis Data Streams 是來源，請使用還原序列化程式程式碼 (區塊 2)。如果 Kinesis Data Streams 是接收器，請使用序列化程式程式碼 (區塊 3)。

```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

String streamName = "stream";
Properties consumerConfig = new Properties();
consumerConfig.put(AWSConfigConstants.AWS_REGION, "aws-region");
consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

// block 1
Map<String, Object> configs = new HashMap<>();
configs.put(AWSSchemaRegistryConstants.AWS_REGION, "aws-region");
configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());

FlinkKinesisConsumer<GenericRecord> consumer = new FlinkKinesisConsumer<>(
    streamName,
    // block 2
    GlueSchemaRegistryAvroDeserializationSchema.forGeneric(schema, configs),
    properties);

FlinkKinesisProducer<GenericRecord> producer = new FlinkKinesisProducer<>(
    // block 3
    GlueSchemaRegistryAvroSerializationSchema.forGeneric(schema, topic, configs),
    properties);
producer.setDefaultStream(streamName);
producer.setDefaultPartition("0");

DataStream<GenericRecord> stream = env.addSource(consumer);
stream.addSink(producer);
env.execute();
```

## 使用案例：與 整合 AWS Lambda
<a name="schema-registry-integrations-aws-lambda"></a>

若要使用 AWS Lambda函數做為 Apache Kafka/Amazon MSK 取用者，並使用AWS Glue結構描述登錄檔還原序列化 Avro 編碼的訊息，請造訪 [MSK 實驗室頁面](https://amazonmsk-labs.workshop.aws/en/msklambda/gsrschemareg.html)。

## 使用案例： AWS Glue Data Catalog
<a name="schema-registry-integrations-aws-glue-data-catalog"></a>

AWS Glue 資料表支援您可以手動指定的結構描述，也可以參考 AWS Glue 結構描述登錄檔。結構描述登錄檔與 Data Catalog 整合，可讓您在建立或更新 Data Catalog 中的 AWS Glue 資料表或分割區時選用地使用儲存在結構描述登錄檔中的結構描述。若要識別結構描述登錄檔中的結構描述定義，您至少需要知道它所屬結構描述的 ARN。包含結構描述定義的結構描述版本，可以透過其 UUID 或版本號碼來參考。總有一個結構描述版本，即「最新」版本，可以在不知道其版本號碼或 UUID 的情況下查詢。

呼叫 `CreateTable` 或 `UpdateTable` 操作時，您將傳遞一個 `TableInput` 結構，其中包含 `StorageDescriptor`，它可能有一個新增到結構描述登錄檔中現有結構描述的 `SchemaReference`。同樣地，當您呼叫 `GetTable` 或 `GetPartition` API，回應可能包含結構描述和 `SchemaReference`。使用結構描述參考建立資料表或分割區時，Data Catalog 會嘗試擷取此結構描述參考的結構描述。如果它無法在結構描述登錄檔中找到結構描述，它會在 `GetTable` 回應中傳回空的結構描述；否則回應將同時具有結構描述和結構描述參考。

您可以在 AWS Glue 主控台中執行下列動作。

若要執行這些操作並建立、更新或檢視結構描述資訊，您必須向提供 `GetSchemaVersion` API 許可的呼叫使用者授予 IAM 角色。

### 新增資料表或更新資料表的結構描述
<a name="schema-registry-integrations-aws-glue-data-catalog-table"></a>

從現有的結構描述新增資料表，會將資料表繫結至特定的結構描述版本。註冊新的結構描述版本後，您可以從 AWS Glue 主控台 View table (檢視資料表) 頁面或使用 [UpdateTable 動作 (Python: update\$1table)](aws-glue-api-catalog-tables.md#aws-glue-api-catalog-tables-UpdateTable) API 更新此資料表定義。

#### 從現有的結構描述新增資料表
<a name="schema-registry-integrations-aws-glue-data-catalog-table-existing"></a>

您可以使用 AWS Glue 主控台或 `CreateTable` API，從登錄檔中的結構描述版本建立 AWS Glue 資料表。

**AWS Glue API**  
呼叫 `CreateTable` API 時，您將傳遞包含 `StorageDescriptor` 的 `TableInput`，它對於結構描述登錄檔中現有的結構描述會有一個 `SchemaReference`。

**AWS Glue 主控台**  
使用 AWS Glue 主控台建立資料表

1. 登入 AWS 管理主控台 並在 https：//[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\) 開啟 AWS Glue主控台。

1. 在導覽窗格中，於 **Data catalog** 下選擇 **Tables** (資料表)。

1. 在 **Add Tables (新增資料表)** 選單中，選擇 **Add table from existing schema (從現有結構描述新增資料表)**。

1. 根據《AWS Glue 開發人員指南》設定資料表屬性和資料存放區。

1. 在 **Choose a Glue schema (選擇 Glue 結構描述)** 頁面上，選取結構描述所在的 **Registry (登錄檔)**。

1. 選擇 **Schema name (結構描述名稱)**，然後選取要套用之結構描述的 **Version (版本)**。

1. 檢閱結構描述預覽，然後選擇 **Next (下一步)**。

1. 檢閱和建立資料表。

套用至資料表的結構描述和版本會顯示在資料表清單的 **Glue schema (Glue 結構描述)** 欄。您可以檢視資料表以查看更多詳細資訊。

#### 更新資料表的結構描述
<a name="schema-registry-integrations-aws-glue-data-catalog-table-updating"></a>

有新的結構描述版本可用時，您可能需要使用 [UpdateTable 動作 (Python: update\$1table)](aws-glue-api-catalog-tables.md#aws-glue-api-catalog-tables-UpdateTable) API 或 AWS Glue 主控台來更新資料表的結構描述。

**重要**  
更新具有手動指定之 AWS Glue 結構描述的現有資料表的結構描述時，結構描述登錄檔中參考的新結構描述可能不相容。這可能會導致您的任務失敗。

**AWS Glue API**  
呼叫 `UpdateTable` API 時，您將傳遞包含 `StorageDescriptor` 的 `TableInput`，它對於結構描述登錄檔中現有的結構描述會有一個 `SchemaReference`。

**AWS Glue 主控台**  
從 AWS Glue 主控台更新資料表的結構描述：

1. 登入 AWS 管理主控台 並在 https：//[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\) 開啟 AWS Glue主控台。

1. 在導覽窗格中，於 **Data catalog** 下選擇 **Tables** (資料表)。

1. 從資料表清單中檢視資料表。

1. 按一下 **Update schema (更新結構描述)**，通知您有關新版本的方塊。

1. 檢閱目前資料結構描述和新資料結構描述之間的差異。

1. 選擇 **Show all schema differences (顯示所有結構描述差異)** 以查看更多詳細資訊。

1. 選擇 **Save table (儲存資料表)** 以接受新的版本。

## 使用案例：AWS Glue 串流
<a name="schema-registry-integrations-aws-glue-streaming"></a>

AWS Glue 串流會消耗來自串流來源的資料，並在寫入輸出接收器之前執行 ETL 操作。輸入串流來源可以使用資料表來指定，也可以透過指定來源配置的方式直接指定。

在該結構描述存在於 AWS Glue 結構描述登錄檔的情況下，AWS Glue 串流支援針對串流來源所建立的 Data Catalog 資料表。您可以在 AWS Glue 結構描述登錄檔中建立結構描述，並使用此結構描述來建立具有串流來源的 AWS Glue 表格。此 AWS Glue 表格可以用來作為 AWS Glue 串流任務的輸入，以供還原序列化輸入串流中的資料之用。

請注意，當 AWS Glue 結構描述登錄檔中的結構描述變更時，您必須重新啟動 AWS Glue 串流任務需求以反映結構描述中的變更。

## 使用案例：Apache Kafka Streams
<a name="schema-registry-integrations-apache-kafka-streams"></a>

Apache Kafka Streams API 是用於處理和分析 Apache Kafka 中儲存資料的用戶端程式庫。本節介紹 Apache Kafka Streams 與 AWS Glue 結構描述登錄檔的整合，可讓您在資料串流應用程式上管理和強制執行結構描述。如需 Apache Kafka Streams 的詳細資訊，請參閱 [Apache Kafka Streams](https://kafka.apache.org/documentation/streams/)。

### 與 SerDes 程式庫整合
<a name="schema-registry-integrations-apache-kafka-streams-integrate"></a>

有一個 `GlueSchemaRegistryKafkaStreamsSerde` 類別，您可用於設定串流應用程式。

#### Kafka Streams 應用程式範例程式碼
<a name="schema-registry-integrations-apache-kafka-streams-application"></a>

在 Apache Kafka Streams 應用程式中使用 AWS Glue 結構描述登錄檔：

1. 設定 Kafka Streams 應用程式。

   ```
   final Properties props = new Properties();
       props.put(StreamsConfig.APPLICATION_ID_CONFIG, "avro-streams");
       props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
       props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
       props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
       props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, AWSKafkaAvroSerDe.class.getName());
       props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
   
       props.put(AWSSchemaRegistryConstants.AWS_REGION, "aws-region");
       props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
       props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
   	props.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.AVRO.name());
   ```

1. 從主題 avro-input 建立串流。

   ```
   StreamsBuilder builder = new StreamsBuilder();
   final KStream<String, GenericRecord> source = builder.stream("avro-input");
   ```

1. 處理資料記錄 (範例會篩選出 favorite\$1color 值為 pink 或 amount 值為 15 的記錄)。

   ```
   final KStream<String, GenericRecord> result = source
       .filter((key, value) -> !"pink".equals(String.valueOf(value.get("favorite_color"))));
       .filter((key, value) -> !"15.0".equals(String.valueOf(value.get("amount"))));
   ```

1. 將結果寫回主題 avro-output。

   ```
   result.to("avro-output");
   ```

1. 啟動 Apache Kafka Streams 應用程式。

   ```
   KafkaStreams streams = new KafkaStreams(builder.build(), props);
   streams.start();
   ```

#### 實作結果
<a name="schema-registry-integrations-apache-kafka-streams-results"></a>

這些結果顯示記錄篩選程序，其在步驟 3 中篩選出 favorite\$1color 為「pink」或值為「15.0」的記錄。

篩選前的記錄：

```
{"name": "Sansa", "favorite_number": 99, "favorite_color": "white"}
{"name": "Harry", "favorite_number": 10, "favorite_color": "black"}
{"name": "Hermione", "favorite_number": 1, "favorite_color": "red"}
{"name": "Ron", "favorite_number": 0, "favorite_color": "pink"}
{"name": "Jay", "favorite_number": 0, "favorite_color": "pink"}

{"id": "commute_1","amount": 3.5}
{"id": "grocery_1","amount": 25.5}
{"id": "entertainment_1","amount": 19.2}
{"id": "entertainment_2","amount": 105}
	{"id": "commute_1","amount": 15}
```

篩選後的記錄：

```
{"name": "Sansa", "favorite_number": 99, "favorite_color": "white"}
{"name": "Harry", "favorite_number": 10, "favorite_color": "black"}
{"name": "Hermione", "favorite_number": 1, "favorite_color": "red"}
{"name": "Ron", "favorite_number": 0, "favorite_color": "pink"}

{"id": "commute_1","amount": 3.5}
{"id": "grocery_1","amount": 25.5}
{"id": "entertainment_1","amount": 19.2}
{"id": "entertainment_2","amount": 105}
```

### 使用案例：Apache Kafka Connect
<a name="schema-registry-integrations-apache-kafka-connect"></a>

將 Apache Kafka Connect 與 AWS Glue 結構描述登錄檔整合，可讓您從連接器取得結構描述資訊。Apache Kafka 轉換器指定 Apache Kafka 中的資料的格式，以及如何將其轉換成 Apache Kafka Connect 資料。每個 Apache Kafka Connect 使用者將需要根據他們希望他們的資料在載入或儲存到 Apache Kafka 時的格式，設定這些轉換器。透過這種方式，您可以定義自己的轉換器，將 Apache Kafka Connect 資料轉換為 AWS Glue 結構描述登錄檔中使用的類型 (例如：Avro) 並利用我們的序列化程式註冊其結構描述並進行序列化。然後轉換器也能夠使用我們的還原序列化程式將從 Apache Kafka 接收到的資料還原序列化，並將其轉換回 Apache Kafka Connect 資料。範例工作流程圖如下所示。

![\[Apache Kafka Connect 工作流程。\]](http://docs.aws.amazon.com/zh_tw/glue/latest/dg/images/schema_reg_int_kafka_connect.png)


1. 安裝 `aws-glue-schema-registry` 專案，方法是複製 [AWS Glue 結構描述登錄檔的 Github 儲存庫](https://github.com/awslabs/aws-glue-schema-registry)。

   ```
   git clone git@github.com:awslabs/aws-glue-schema-registry.git
   cd aws-glue-schema-registry
   mvn clean install
   mvn dependency:copy-dependencies
   ```

1. 如果您打算在*獨立*模式中使用 Apache Kafka Connect，請使用此步驟的下列指示來更新 **connect-standalone.properties**。如果您打算在*分散式*模式中使用 Apache Kafka Connect，請使用相同的說明更新 **connect-avro-distributed.properties**。

   1. 將這些屬性也新增到 Apache Kafka 連接屬性檔案：

      ```
      key.converter.region=aws-region
      value.converter.region=aws-region
      key.converter.schemaAutoRegistrationEnabled=true
      value.converter.schemaAutoRegistrationEnabled=true
      key.converter.avroRecordType=GENERIC_RECORD
      value.converter.avroRecordType=GENERIC_RECORD
      ```

   1. 將下面的命令新增到 **kafka-run-class.sh** 的 **Launch mode (啟動模式)** 區段下：

      ```
      -cp $CLASSPATH:"<your AWS GlueSchema Registry base directory>/target/dependency/*"
      ```

1. 將下面的命令新增到 **kafka-run-class.sh** 的 **Launch mode (啟動模式)** 區段下

   ```
   -cp $CLASSPATH:"<your AWS GlueSchema Registry base directory>/target/dependency/*" 
   ```

   它應該如下所示：

   ```
   # Launch mode
   if [ "x$DAEMON_MODE" = "xtrue" ]; then
     nohup "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH:"/Users/johndoe/aws-glue-schema-registry/target/dependency/*" $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
   else
     exec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH:"/Users/johndoe/aws-glue-schema-registry/target/dependency/*" $KAFKA_OPTS "$@"
   fi
   ```

1. 如果使用 bash，執行下面的命令來在您的 bash\$1profile 中設定您的 CLASSPATH。對於任何其他 Shell，請相應地更新環境。

   ```
   echo 'export GSR_LIB_BASE_DIR=<>' >>~/.bash_profile
   echo 'export GSR_LIB_VERSION=1.0.0' >>~/.bash_profile
   echo 'export KAFKA_HOME=<your Apache Kafka installation directory>' >>~/.bash_profile
   echo 'export CLASSPATH=$CLASSPATH:$GSR_LIB_BASE_DIR/avro-kafkaconnect-converter/target/schema-registry-kafkaconnect-converter-$GSR_LIB_VERSION.jar:$GSR_LIB_BASE_DIR/common/target/schema-registry-common-$GSR_LIB_VERSION.jar:$GSR_LIB_BASE_DIR/avro-serializer-deserializer/target/schema-registry-serde-$GSR_LIB_VERSION.jar' >>~/.bash_profile
   source ~/.bash_profile
   ```

1. (選用) 如果您想要使用簡單的檔案來源進行測試，請複製檔案來源連接器。

   ```
   git clone https://github.com/mmolimar/kafka-connect-fs.git
   cd kafka-connect-fs/
   ```

   1. 在來源連接器組態下，將資料格式編輯為 Avro、檔案讀取器編輯為 `AvroFileReader` 並從您正在讀取的檔案路徑中更新範例 Avro 物件。例如：

      ```
      vim config/kafka-connect-fs.properties
      ```

      ```
      fs.uris=<path to a sample avro object>
      policy.regexp=^.*\.avro$
      file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.AvroFileReader
      ```

   1. 安裝來源連接器。

      ```
      mvn clean package
      echo "export CLASSPATH=\$CLASSPATH:\"\$(find target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')\"" >>~/.bash_profile
      source ~/.bash_profile
      ```

   1. 更新 `<your Apache Kafka installation directory>/config/connect-file-sink.properties` 下的接收器屬性會更新主題名稱和輸出檔案名稱。

      ```
      file=<output file full path>
      topics=<my topic>
      ```

1. 啟動來源連接器 (在此範例中，它是檔案來源連接器)。

   ```
   $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/kafka-connect-fs.properties
   ```

1. 執行接收器連接器 (在此範例中，它是檔案接收器連接器)。

   ```
   $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-file-sink.properties
   ```

   如需範例 Kafka Connect 用法，請參閱 [AWS Glue 結構描述登錄檔的 Github 儲存庫](https://github.com/awslabs/aws-glue-schema-registry/tree/master/integration-tests)中 integration-tests 資料夾下的 run-local-tests.sh 指令碼。

# 從第三方結構描述登錄檔移轉至 AWS Glue 結構描述登錄檔
<a name="schema-registry-integrations-migration"></a>

從第三方結構描述登錄檔移轉至 AWS Glue 結構描述登錄檔，對現有、目前的第三方結構描述登錄檔具有相依性。如果在使用第三方結構描述登錄檔傳送的 Apache Kafka 主題中有記錄，則消費者需要第三方結構描述登錄檔來還原序列化這些記錄。`AWSKafkaAvroDeserializer` 提供指定次要還原序列化程式類別的能力，此類別指向第三方還原序列化程式並用於還原序列化這些記錄。

淘汰第三方結構描述有兩個條件。首先，只有在使用第三方結構描述登錄檔的 Apache Kafka 主題中的記錄不再被任何消費者需要後，才會發生淘汰。其次，取決於針對這些主題指定的保留期間，Apache Kafka 主題的老化可能會導致淘汰。請注意，如果您有無限保留的主題，您仍然可以移轉至 AWS Glue 結構描述登錄檔，但您將無法淘汰第三方結構描述登錄檔。因應措施是，您可以使用應用程式或 Mirror Maker 2 讀取目前主題，並使用 AWS Glue 結構描述登錄檔產生新主題。

從第三方結構描述登錄檔移轉至 AWS Glue 結構描述登錄檔：

1. 建立 AWS Glue 結構描述登錄檔中的登錄檔，或使用預設登錄檔。

1. 停止消費者。修改它以包含 AWS Glue 結構描述登錄檔做為主要的還原序列化程式，並包含第三方結構描述登錄檔做為次要。
   + 設定消費者屬性。在此範例中，secondary\$1deserializer 設定為不同的還原序列化程式。行為如下：消費者從 Amazon MSK 擷取記錄，並首先嘗試使用 `AWSKafkaAvroDeserializer`。如果無法為 AWS Glue 結構描述登錄檔讀取包含 Avro 結構描述 ID 的魔術位元組結構描述，則 `AWSKafkaAvroDeserializer` 會嘗試使用 secondary\$1deserializer 中提供的還原序列化程式類別。次要還原序列化程式專屬的屬性也需要在消費者屬性中提供，例如 schema\$1registry\$1url\$1config 和 specific\$1avro\$1reader\$1config，如下所示。

     ```
     consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
     consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AWSKafkaAvroDeserializer.class.getName());
     consumerProps.setProperty(AWSSchemaRegistryConstants.AWS_REGION, KafkaClickstreamConsumer.gsrRegion);
     consumerProps.setProperty(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, KafkaAvroDeserializer.class.getName());
     consumerProps.setProperty(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "URL for third-party schema registry");
     consumerProps.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
     ```

1. 重新啟動消費者。

1. 停止生產者並將生產者指向 AWS Glue 結構描述登錄檔。

   1. 設定生產者屬性。在這個範例中，生產者將使用 default-registry 和自動註冊結構描述版本。

      ```
      producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
      producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AWSKafkaAvroSerializer.class.getName());
      producerProps.setProperty(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2");
      producerProps.setProperty(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName());
      producerProps.setProperty(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true");
      ```

1. (選用) 手動將現有的結構描述和結構描述版本從目前的第三方結構描述登錄檔移至 AWS Glue 結構描述登錄檔，無論是 AWS Glue 結構描述登錄檔中的預設登錄檔或 AWS Glue 結構描述登錄檔中的特定非預設登錄檔。這可以透過從第三方結構描述登錄檔匯出 JSON 格式的結構描述，並使用 AWS 管理主控台 或 在AWS Glue結構描述登錄檔中建立新的結構描述來完成 AWS CLI。

    如果您需要使用 AWS CLI 和 為新建立的結構描述版本啟用與先前結構描述版本的相容性檢查 AWS 管理主控台，或者生產者以開啟結構描述版本自動註冊的新結構描述傳送訊息時，此步驟可能很重要。

1. 啟動生產者。