

# スキーマレジストリの開始方法
<a name="schema-registry-gs"></a>

以下のセクションでは、Schema Registry のセットアップと使用に関する概要とチュートリアルを示します。スキーマレジストリの概念および各コンポーネントの詳細については、「[AWS Glue スキーマレジストリ](schema-registry.md)」を参照してください。

**Topics**
+ [SerDe ライブラリのインストール](schema-registry-gs-serde.md)
+ [AWS Glue Schema Registry との統合](schema-registry-integrations.md)
+ [サードパーティー製のスキーマレジストリから AWS Glue Schema Registry への移行](schema-registry-integrations-migration.md)

# SerDe ライブラリのインストール
<a name="schema-registry-gs-serde"></a>

SerDe ライブラリは、データのシリアル化と非シリアル化のためのフレームワークを提供します。

データを生成するアプリケーション (総称してシリアライザ) 用の、オープンソースのシリアライザをインストールします。シリアライザは、シリアル化、圧縮、および Schema Registry とのやり取りを処理します。シリアライザは、Schema Registry 対応の送信先 (Amazon MSK など) に書き込まれるレコードから、スキーマを自動的に抽出します。同様に、データを利用するアプリケーションには、オープンソースのデシリアライザをインストールします。

# Java 実装
<a name="schema-registry-gs-serde-java"></a>

**注記**  
前提条件: 次のステップを実行する前に、Amazon Managed Streaming for Apache Kafka (Amazon MSK) または Apache Kafka クラスターを起動しておく必要があります。使用するプロデューサーとコンシューマーは、Java 8 以上で実行する必要があります。

プロデューサとコンシューマにライブラリをインストールするには

1. プロデューサとコンシューマ両方の pom.xml ファイルの中で、以下のコードにより依存関係を追加します。

   ```
   <dependency>
       <groupId>software.amazon.glue</groupId>
       <artifactId>schema-registry-serde</artifactId>
       <version>1.1.5</version>
   </dependency>
   ```

   または、[AWS Glue Schema Registry GitHub リポジトリ](https://github.com/awslabs/aws-glue-schema-registry)からクローンを作成することもできます。

1. 次の必須プロパティを使用してプロデューサをセットアップします。

   ```
   props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // Can replace StringSerializer.class.getName()) with any other key serializer that you may use
   props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());
   props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2");
   properties.put(AWSSchemaRegistryConstants.DATA_FORMAT, "JSON"); // OR "AVRO"
   ```

   既存のスキーマがない場合は、自動登録を有効にします (次のステップ)。適用できるスキーマがある場合は、「my-schema」をそのスキーマ名に置き換えます。また、スキーマの自動登録が無効になっている場合は、「registry-name」を指定する必要もあります。スキーマが「default-registry」の下に作成されている場合は、レジストリ名を省略できます。

1. (オプション) これらのオプションのプロデューサープロパティのいずれかを設定します。プロパティの詳細については、[ReadMe ファイル](https://github.com/awslabs/aws-glue-schema-registry/blob/master/README.md) を参照してください。

   ```
   props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true"); // If not passed, uses "false"
   props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "my-schema"); // If not passed, uses transport name (topic name in case of Kafka, or stream name in case of Kinesis Data Streams)
   props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry"); // If not passed, uses "default-registry"
   props.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000 (24 Hours)
   props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200
   props.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL); // Pass a compatibility mode. If not passed, uses Compatibility.BACKWARD
   props.put(AWSSchemaRegistryConstants.DESCRIPTION, "This registry is used for several purposes."); // If not passed, constructs a description
   props.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, AWSSchemaRegistryConstants.COMPRESSION.ZLIB); // If not passed, records are sent uncompressed
   ```

   自動登録では、スキーマのバージョンがデフォルトのレジストリ (default-registry) に登録されます。`SCHEMA_NAME` が前のステップで指定されていない場合、トピック名は `SCHEMA_NAME` として推定されます。

   互換モードの詳細については、[スキーマのバージョニングと互換性](schema-registry.md#schema-registry-compatibility) を参照してください。

1. 以下の必須プロパティを使用してコンシューマを設定します。

   ```
   props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
   props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
   props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2"); // Pass an AWS リージョン
   props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); // Only required for AVRO data format
   ```

1. (オプション) これらのオプションのコンシューマプロパティを設定します。プロパティの詳細については、[ReadMe ファイル](https://github.com/awslabs/aws-glue-schema-registry/blob/master/README.md) を参照してください。

   ```
   properties.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000
   props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200
   props.put(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, "com.amazonaws.services.schemaregistry.deserializers.external.ThirdPartyDeserializer"); // For migration fall back scenario
   ```

# C\$1 実装
<a name="schema-registry-gs-serde-csharp"></a>

**注記**  
前提条件: 次のステップを実行する前に、Amazon Managed Streaming for Apache Kafka (Amazon MSK) または Apache Kafka クラスターを起動しておく必要があります。使用するプロデューサーとコンシューマーは、.NET 8.0 以上で実行する必要があります。

## インストール
<a name="schema-registry-gs-serde-csharp-install"></a>

C\$1 アプリケーションの場合は、次のいずれかの方法を使用して AWS Glue Schema Registry SerDe NuGet パッケージをインストールします。

**.NET CLI:**  
パッケージをインストールするには、次のコマンドを使用します。

```
dotnet add package Aws.Glue.SchemaRegistry --version 1.0.0-<rid>
```

ここで、`<rid>` は `1.0.0-linux-x64`、`1.0.0-linux-musl-x64` または `1.0.0-linux-arm64` です。

**PackageReference (.csproj ファイル内):**  
プロジェクトファイルに次の内容を追加します。

```
<PackageReference Include="Aws.Glue.SchemaRegistry" Version="1.0.0-<rid>" />
```

ここで、`<rid>` は `1.0.0-linux-x64`、`1.0.0-linux-musl-x64` または `1.0.0-linux-arm64` です。

## 設定ファイルのセットアップ
<a name="schema-registry-gs-serde-csharp-config"></a>

必要な設定で設定プロパティファイル (`gsr-config.properties` など) を作成します。

**最小限の設定:**  
以下に最小限の設定例を示します。

```
region=us-east-1
registry.name=default-registry
dataFormat=AVRO
schemaAutoRegistrationEnabled=true
```

## Kafka SerDes 用の C\$1 Glue スキーマクライアントライブラリの使用
<a name="schema-registry-gs-serde-csharp-kafka"></a>

**シリアライザーの使用例:**  
次の例は、シリアライザーの使用方法を示しています。

```
private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>";
var protobufSerializer = new GlueSchemaRegistryKafkaSerializer(PROTOBUF_CONFIG_PATH);
var serialized = protobufSerializer.Serialize(message, message.Descriptor.FullName);
// send serialized bytes to Kafka using producer.Produce(serialized)
```

**デシリアライザーの使用例:**  
次の例は、デシリアライザーの使用方法を示しています。

```
private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>";
var dataConfig = new GlueSchemaRegistryDataFormatConfiguration(
    new Dictionary<string, dynamic>
    {
        {
            GlueSchemaRegistryConstants.ProtobufMessageDescriptor, message.Descriptor
        }
    }
);
var protobufDeserializer = new GlueSchemaRegistryKafkaDeserializer(PROTOBUF_CONFIG_PATH, dataConfig);

// read message from Kafka using serialized = consumer.Consume()
var deserializedObject = protobufDeserializer.Deserialize(message.Descriptor.FullName, serialized);
```

## SerDes 用の KafkaFlow での C\$1 Glue スキーマクライアントライブラリの使用
<a name="schema-registry-gs-serde-csharp-kafkaflow"></a>

**シリアライザーの使用例:**  
次の例は、シリアライザーを使用して KafkaFlow を設定する方法を示しています。

```
services.AddKafka(kafka => kafka
    .UseConsoleLog()
    .AddCluster(cluster => cluster
        .WithBrokers(new[] { "localhost:9092" })
        .AddProducer<CustomerProducer>(producer => producer
            .DefaultTopic("customer-events")
            .AddMiddlewares(m => m
                .AddSerializer<GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>>(
                    () => new GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>("config/gsr-config.properties")
                )
            )
        )
    )
);
```

**デシリアライザーの使用例:**  
次の例は、デシリアライザーを使用して KafkaFlow を設定する方法を示しています。

```
.AddConsumer(consumer => consumer
    .Topic("customer-events")
    .WithGroupId("customer-group")
    .WithBufferSize(100)
    .WithWorkersCount(10)
    .AddMiddlewares(middlewares => middlewares
        .AddDeserializer<GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>>(
            () => new GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>("config/gsr-config.properties")
        )
        .AddTypedHandlers(h => h.AddHandler<CustomerHandler>())
    )
)
```

## オプションのプロデューサープロパティ
<a name="schema-registry-gs-serde-csharp-optional"></a>

追加のオプションプロパティを使用して、設定ファイルを拡張できます。

```
# Auto-registration (if not passed, uses "false")
schemaAutoRegistrationEnabled=true

# Schema name (if not passed, uses topic name)
schema.name=my-schema

# Registry name (if not passed, uses "default-registry")
registry.name=my-registry

# Cache settings
cacheTimeToLiveMillis=86400000
cacheSize=200

# Compatibility mode (if not passed, uses BACKWARD)
compatibility=FULL

# Registry description
description=This registry is used for several purposes.

# Compression (if not passed, records are sent uncompressed)
compressionType=ZLIB
```

## サポートされている日付書式
<a name="schema-registry-gs-serde-supported-formats"></a>

Java と C\$1 の両方の実装は、同じデータ形式をサポートしています。
+ *AVRO*: Apache Avro バイナリ形式
+ *JSON*: JSON スキーマ形式
+ *PROTOBUF*: プロトコルバッファ形式

## 注意事項
<a name="schema-registry-gs-serde-csharp-notes"></a>
+ ライブラリの使用を開始するには、[https://www.nuget.org/packages/AWS.Glue.SchemaRegistry](https://www.nuget.org/packages/AWS.Glue.SchemaRegistry) にアクセスしてください
+ ソースコードは、[https://github.com/awslabs/aws-glue-schema-registry](https://github.com/awslabs/aws-glue-schema-registry) で入手できます

# レジストリを作成する
<a name="schema-registry-gs3"></a>

AWS Glue API または AWS Glue コンソールを使用して、デフォルトのレジストリを使用することも、必要な数の新しいレジストリを作成することもできます。

**AWS Glue API**  
ここでの手順により、AWS Glue API を使用しながら対象のタスクを実行できます。

AWS Glue Schema Registry API で AWS CLI を使用するには、最新バージョンの AWS CLI 使用する必要が有ります。

 新しいレジストリを追加するには、[CreateRegistry アクション (Python: create\$1registry)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-CreateRegistry) APIを使用します。`RegistryName` では、作成するレジストリの名前を指定します。この文字数は最大 255 までで、文字、数字、ハイフン、アンダースコア、ドル記号、およびハッシュ記号のみ使用できます。

2,048 バイト以下で「[URI アドレスの複数行の文字列パターン](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-common.html#aws-glue-api-common-_string-patterns)」に一致する文字列として `Description` を指定します。

オプションで、キーと値のペアのマップ配列として、1 つ以上の `Tags` をレジストリに指定します。

```
aws glue create-registry --registry-name registryName1 --description description
```

作成されたレジストリには、Amazon リソースネーム (ARN) が割り当てられます。これは、`RegistryArn` API 応答により表示することが可能です。この段階でレジストリ作成が完了しているので、そのレジストリのために 1 つ以上のスキーマを作成します。

**AWS Glue コンソール**  
AWS Glue コンソールで新しいレジストリを追加するには

1. AWS マネジメントコンソールにサインインし、AWS Glue コンソール ([https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\)) を開きます。

1. ナビゲーションペインの [**Data catalog**] (データカタログ) で、[**Schema registries**] (スキーマレジストリ) をクリックします。

1. [**Add registry**] (レジストリを追加) をクリックします。

1. [**Registry name**] (レジストリ名) に、文字、数字、ハイフン、アンダースコアを含むレジストリの名前を入力します。この名前は変更できません。

1. レジストリの [**Description**] (説明) を入力します　(オプション)。

1. オプションで、1 つ以上のタグをレジストリに適用します。[**Add new tag**] (新しいタグを追加) を選択し、[**Tag key**] (タグキー) とオプションの [**Tag value**] (タグ値) を指定します。

1. [**Add registry**] (レジストリを追加) をクリックします。

![\[レジストリの作成例。\]](http://docs.aws.amazon.com/ja_jp/glue/latest/dg/images/schema_reg_create_registry.png)


作成されたレジストリには、Amazon リソースネーム (ARN) が割り当てられます。これは、[**Schema registries**] (スキーマレジストリ) のリストから選択して表示することが可能です。この段階でレジストリ作成が完了しているので、そのレジストリのために 1 つ以上のスキーマを作成します。

# JSON の特定のレコード (JAVA POJO) 処理する
<a name="schema-registry-gs-json-java-pojo"></a>

従来の単純な Java オブジェクト (POJO) を使用して、オブジェクトをレコードとして渡すことができます。これは、AVRO における特定のレコードの概念と類似しています。[mbknor-jackson-jsonschema](https://github.com/mbknor/mbknor-jackson-jsonSchema) により、渡された POJO の JSON スキーマを生成できます。また、このライブラリでは、JSON スキーマに追加情報を挿入することもできます。

AWS Glue Schema Registry ライブラリは、スキーマに注入された「className」フィールドを使用して、完全に分類されたクラス名を提供します。「className」フィールドは、そのクラスのオブジェクト内での非シリアル化のために、デシリアライザによって使用されます。

```
 Example class :

@JsonSchemaDescription("This is a car")
@JsonSchemaTitle("Simple Car Schema")
@Builder
@AllArgsConstructor
@EqualsAndHashCode
// Fully qualified class name to be added to an additionally injected property
// called className for deserializer to determine which class to deserialize
// the bytes into
@JsonSchemaInject(
        strings = {@JsonSchemaString(path = "className",
                value = "com.amazonaws.services.schemaregistry.integrationtests.generators.Car")}
)
// List of annotations to help infer JSON Schema are defined by https://github.com/mbknor/mbknor-jackson-jsonSchema
public class Car {
    @JsonProperty(required = true)
    private String make;

    @JsonProperty(required = true)
    private String model;

    @JsonSchemaDefault("true")
    @JsonProperty
    public boolean used;

    @JsonSchemaInject(ints = {@JsonSchemaInt(path = "multipleOf", value = 1000)})
    @Max(200000)
    @JsonProperty
    private int miles;

    @Min(2000)
    @JsonProperty
    private int year;

    @JsonProperty
    private Date purchaseDate;

    @JsonProperty
    @JsonFormat(shape = JsonFormat.Shape.NUMBER)
    private Date listedDate;

    @JsonProperty
    private String[] owners;

    @JsonProperty
    private Collection<Float> serviceChecks;

    // Empty constructor is required by Jackson to deserialize bytes
    // into an Object of this class
    public Car() {}
}
```

# スキーマの作成
<a name="schema-registry-gs4"></a>

スキーマは、AWS Glue API または AWS Glue コンソールを使用して作成できます。

**AWS Glue API**  
ここでの手順により、AWS Glue API を使用しながら対象のタスクを実行できます。

新しいスキーマを追加するには [CreateSchema アクション (Python: create\$1schema)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-CreateSchema) API を使用します。

`RegistryId` 構造体を使用して、スキーマのレジストリを指定します。または、`RegistryId` を省略すると、デフォルトのレジストリが使用されます。

文字、数字、ハイフン、アンダースコアを使用し、`DataFormat` として **AVRO** または **JSON** を設定しながら `SchemaName` を指定します。一度スキーマに設定された `DataFormat` は変更できません。

`Compatibility` モードを指定する。
+ *Backward (推奨)* – コンシューマは、現在のバージョンと 1 つ前のバージョンの両方を読み取ることができます。
+ *Backward all* – コンシューマは、現在のバージョンと過去のすべてのバージョンを読み取ることができます。
+ *Forward* – コンシューマは、現在のバージョンと次に続くバージョンの両方を読み取ることができます。
+ *Forward all* – コンシューマは、現在のバージョンと後続のすべてのバージョンの両方を読み取ることができます。
+ *Full* – Backward all と Forward all を組み合わせたモードです。
+ *Full all* – Backward all と Forward all を組み合わせたモードです。
+ *None* – 互換性チェックは実行されません。
+ *Disabled* – このスキーマのバージョニングを抑止します。

オプションで、スキーマに `Tags` を指定します。

`SchemaDefinition` を指定することで、スキーマのデータ形式を Avro、JSON、もしくは Protobuf として定義します。例を参照してください。

Avro データ形式の場合:

```
aws glue create-schema --registry-id RegistryName="registryName1" --schema-name testschema --compatibility NONE --data-format AVRO --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}"
```

```
aws glue create-schema --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName1" --schema-name testschema --compatibility NONE --data-format AVRO  --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}"
```

JSON データ形式の場合:

```
aws glue create-schema --registry-id RegistryName="registryName" --schema-name testSchemaJson --compatibility NONE --data-format JSON --schema-definition "{\"$schema\": \"http://json-schema.org/draft-07/schema#\",\"type\":\"object\",\"properties\":{\"f1\":{\"type\":\"string\"}}}"
```

```
aws glue create-schema --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName" --schema-name testSchemaJson --compatibility NONE --data-format JSON --schema-definition "{\"$schema\": \"http://json-schema.org/draft-07/schema#\",\"type\":\"object\",\"properties\":{\"f1\":{\"type\":\"string\"}}}"
```

Protobuf データ形式の場合:

```
aws glue create-schema --registry-id RegistryName="registryName" --schema-name testSchemaProtobuf --compatibility NONE --data-format PROTOBUF --schema-definition "syntax = \"proto2\";package org.test;message Basic { optional int32 basic = 1;}"
```

```
aws glue create-schema --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName" --schema-name testSchemaProtobuf --compatibility NONE --data-format PROTOBUF --schema-definition "syntax = \"proto2\";package org.test;message Basic { optional int32 basic = 1;}"
```

**AWS Glue コンソール**  
AWS Glue コンソールを使用して新しいスキーマを追加するには

1. AWS マネジメントコンソールにサインインし、AWS Glue コンソール ([https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\)) を開きます。

1. ナビゲーションペインの [**Data catalog**] (データカタログ) で、[**Schemas**] (スキーマ) をクリックします。

1. [**Add schema**] (スキーマを追加) をクリックします。

1. [**Schema name**] (スキーマ名) に、文字、数字、ハイフン、アンダースコア、ドル記号、ハッシュマークで構成された名前を入力します。この名前は変更できません。

1. [**Registry**] (レジストリ) ドロップダウンメニューから、スキーマの保存先となるレジストリを選択します。作成後は、親レジストリを変更することはできません。

1. [**Data format**] (データ形式) は、「*Apache Avro*」または「*JSON*」のままにしておきます。この形式は、このスキーマのすべてのバージョンに適用されます。

1. [**Compatibility mode**] (互換モード) をクリックします。
   + *Backward (推奨)* – レシーバーは、現在のバージョンと 1 つ前のバージョンの両方を読み取ることができます。
   + *Backward All* – レシーバーは、現在および過去のすべてのバージョンを読み取ることができます。
   + *Forward* – センダーは、現在のバージョンと 1 つ前のバージョンの両方に書き込むことができます。
   + *Forward All* – センダーは、現在のバージョンと過去のすべてのバージョンに書き込むことができます。
   + *Full* – Backward と Forward を組み合わせたモードです。
   + *Full All* – Backward All と Forward All を組み合わせたモードです。
   + *None* – 互換性チェックは実行されません。
   + *Disabled* – このスキーマのバージョニングを抑止します。

1. [**Description**] (説明) に、レジストリのための説明 (オプション) を、最大 250 文字で入力します。  
![\[スキーマの作成例。\]](http://docs.aws.amazon.com/ja_jp/glue/latest/dg/images/schema_reg_create_schema.png)

1. オプションで、1 つ以上のタグをスキーマに適用します。[**Add new tag**] (新しいタグを追加) を選択し、[**Tag key**] (タグキー) とオプションの [**Tag value**] (タグ値) を指定します。

1. [**First schema バージョニング**] (最初のスキーマバージョン) ボックスに、初期スキーマを入力するか貼り付けます。

   Avro 形式については「[Avro データ形式での作業](#schema-registry-avro)」を参照

   JSON 形式については「[JSON データ形式での操作](#schema-registry-json)」を参照

1. 必要に応じて、[**Add metadata**] (メタデータを追加) をクリックして、スキーマバージョンの注釈付けや分類を行うためのバージョンメタデータを追加します。

1. [**Create schema and version**] (スキーマとバージョンを作成する) をクリックします。

![\[スキーマの作成例。\]](http://docs.aws.amazon.com/ja_jp/glue/latest/dg/images/schema_reg_create_schema2.png)


スキーマが作成され、[**Schemas**] (スキーマ) の下に一覧表示されます。

## Avro データ形式での作業
<a name="schema-registry-avro"></a>

Avro では、データのシリアル化とデータ交換サービスが利用できます。Avro は、可読性と解釈しやすさのために、データ定義が JSON 形式で格納されます。データ自体はバイナリ形式で保存されます。

Apache Avro スキーマの定義については、「[Apache Avro specification](http://avro.apache.org/docs/current/spec.html)」を参照してください。

## JSON データ形式での操作
<a name="schema-registry-json"></a>

JSON 形式では、データをシリアル化できます。JSON スキーマ形式の標準は、「[JSON Schema format](https://json-schema.org/)」で定義されています。

# スキーマまたはレジストリの更新
<a name="schema-registry-gs5"></a>

作成したスキーマ、スキーマバージョン、またはレジストリは、編集することができます。

## レジストリの更新
<a name="schema-registry-gs5a"></a>

レジストリは、AWS Glue API または AWS Glue コンソールを使用して更新できます。既存のレジストリの名前は変更できません。レジストリの説明は編集が可能です。

**AWS Glue API**  
既存のレジストリを更新するには、[UpdateRegistry アクション (Python: update\$1registry)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-UpdateRegistry) API を使用します。

`RegistryId` 構造体を使用して、更新するレジストリを指定します。レジストリの説明を変更するには、`Description` を渡します。

```
aws glue update-registry --description updatedDescription --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName1"
```

**AWS Glue コンソール**  
AWS Glue コンソールを使用してレジストリを更新するには

1. AWS マネジメントコンソールにサインインし、AWS Glue コンソール ([https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\)) を開きます。

1. ナビゲーションペインの [**Data catalog**] (データカタログ) で、[**Schema registries**] (スキーマレジストリ) をクリックします。

1. レジストリの一覧から、そのチェックボックスをオンにして、レジストリを選択します。

1. [**Action**] (アクション) メニューで、[**Edit registry**] (レジストリの編集) を選択します。

# スキーマの更新
<a name="schema-registry-gs5b"></a>

スキーマでは、説明または互換性設定の更新が行えます。

既存のスキーマを更新するには、[UpdateSchema アクション (Python: update\$1schema)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-UpdateSchema) API を使用します。

`SchemaId` 構造体を使用して、更新するスキーマを指定します。`VersionNumber` または `Compatibility` のいずれかを指定する必要があります。

コード例 11:

```
aws glue update-schema --description testDescription --schema-id SchemaName="testSchema1",RegistryName="registryName1" --schema-version-number LatestVersion=true --compatibility NONE
```

```
aws glue update-schema --description testDescription --schema-id SchemaArn="arn:aws:glue:us-east-2:901234567890:schema/registryName1/testSchema1" --schema-version-number LatestVersion=true --compatibility NONE
```

# スキーマバージョンの追加。
<a name="schema-registry-gs5c"></a>

スキーマバージョンを追加する際は、そのバージョンを比較して、新しいスキーマが受け入れられることを確認する必要があります。

既存のスキーマに新しいバージョンを追加するには、[RegisterSchemaVersion アクション (Python: register\$1schema\$1version)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-RegisterSchemaVersion) API を使用します。

`SchemaId` 構造体を使用して、バージョンを追加するスキーマを指定し、`SchemaDefinition` によりスキーマを定義します。

コード例 12:

```
aws glue register-schema-version --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}" --schema-id SchemaArn="arn:aws:glue:us-east-1:901234567890:schema/registryName/testschema"
```

```
aws glue register-schema-version --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}" --schema-id SchemaName="testschema",RegistryName="testregistry"
```

1. AWS マネジメントコンソール にサインインし、AWS Glue コンソール ([https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\)) を開きます。

1. ナビゲーションペインの [**Data catalog**] (データカタログ) で、[**Schemas**] (スキーマ) をクリックします。

1. チェックボックスをオンにして、スキーマのリストからスキーマを選択します。

1. チェックボックスをクリックして、リストから 1 つ以上のスキーマを選択します。

1. [**Action**] (アクションメニュー) から、[**Register new version**] (新しいバージョンを登録) を選択します。

1. [**New version**] (新しいバージョン) ボックスに、新しいスキーマを入力または貼り付けます。

1. **[Compare with previous version]** (過去のバージョンと比較) をクリックして、以前のスキーマのバージョンとの違いを確認します。

1. 必要に応じて、[**Add metadata**] (メタデータを追加) をクリックして、スキーマバージョンの注釈付けや分類を行うためのバージョンメタデータを追加します。[**Key**] (キー) および (オプションの) [**Value**] (値) を入力します。

1. [**Register version**] (バージョンの登録) をクリックします。

![\[スキーマバージョンの追加。\]](http://docs.aws.amazon.com/ja_jp/glue/latest/dg/images/schema_reg_add_schema_version.png)


バージョンの一覧の中に、スキーマのバージョンが表示されます。バージョンで互換モードが変更された場合、そのバージョンはチェックポイントとしてマークされます。

## スキーマのバージョン比較の例。
<a name="schema-registry-gs5c1"></a>

[**Compare with previous version**] (過去のバージョンと比較) をクリックすると、以前のバージョンと新しいバージョンが一緒に表示されます。変更された情報は、次のように強調表示されます。
+ *黄色*: 変更された情報を示します。
+ *緑*: 最新バージョンで追加されたコンテンツを示します。
+ *赤*: 最新バージョンで削除されたコンテンツを示します。

より古いバージョンと比較することも可能です。

![\[スキーマのバージョン比較の例。\]](http://docs.aws.amazon.com/ja_jp/glue/latest/dg/images/schema_reg_version_comparison.png)


# スキーマまたはレジストリの削除
<a name="schema-registry-gs7"></a>

スキーマ、スキーマバージョン、またはレジストリの削除は永続的な操作であり、元に戻すことはできません。

## スキーマの削除
<a name="schema-registry-gs7a"></a>

レジストリ内で使用する必要がなくなったスキーマは、AWS マネジメントコンソール または [DeleteSchema アクション (Python: delete\$1schema)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteSchema) API を使用して削除することができます。。

1 つ以上のスキーマを削除することは永続的なアクションであり、元に戻すことはできません。(1 つあるいは複数の) スキーマが不要になったことを確認します。

レジストリからスキーマを削除するには、`SchemaId` 構造体により対象のスキーマを特定しながら [DeleteSchema アクション (Python: delete\$1schema)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteSchema) API を呼び出します。

例: 

```
aws glue delete-schema --schema-id SchemaArn="arn:aws:glue:us-east-2:901234567890:schema/registryName1/schemaname"
```

```
aws glue delete-schema --schema-id SchemaName="TestSchema6-deleteschemabyname",RegistryName="default-registry"
```

**AWS Glue コンソール**  
AWS Glue コンソールからスキーマを削除するには

1. AWS マネジメントコンソールにサインインし、AWS Glue コンソール ([https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\)) を開きます。

1. ナビゲーションペインの [**Data catalog**] (データカタログ) で、[**Schema registries**] (スキーマレジストリ) をクリックします。

1. レジストリのリストから、自分のスキーマを含むレジストリを選択します。

1. チェックボックスをクリックして、リストから 1 つ以上のスキーマを選択します。

1. [**Action**] (アクション) メニューで、[**Delete schema**] (スキーマの削除) をクリックします。

1. フィールドに「**Delete**」というテキストを入力して、削除を確定します。

1. **[削除]** を選択します。

指定した (1 つ以上の) スキーマがレジストリから削除されます。

## スキーマバージョンの削除
<a name="schema-registry-gs7b"></a>

スキーマはレジストリに蓄積されるので、不要なスキーマバージョンは、AWS マネジメントコンソール または [DeleteSchemaVersions アクション (Python: delete\$1schema\$1versions)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteSchemaVersions) API を使用して削除できます。1 つ以上のスキーマバージョンを削除することは永続的なアクションであり、元に戻すことはできません。そのスキーマバージョンが不要であることを確認します。

スキーマのバージョンを削除する場合は、以下の制約に注意してください。
+ チェックポイントとなっているバージョンを削除することはできません。
+ 25 を超えて連続するバージョンの範囲を削除することはできません。
+ 最新のスキーマバージョンが保留状態にある場合は、削除は行えません。

`SchemaId` 構造体を使用してスキーマを指定し、削除するバージョンの範囲を `Versions` で指定します。バージョンまたはバージョンの範囲の指定の詳細については、「[DeleteRegistry アクション (Python: delete\$1registry)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteRegistry)」を参照してください。指定したスキーマバージョンがレジストリから削除されます。

この呼び出しの後に [ListSchemaVersions アクション (Python: list\$1schema\$1versions)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-ListSchemaVersions) API を呼び出すと、削除されたバージョンのステータスが一覧表示されます。

例: 

```
aws glue delete-schema-versions --schema-id SchemaName="TestSchema6",RegistryName="default-registry" --versions "1-1"
```

```
aws glue delete-schema-versions --schema-id SchemaArn="arn:aws:glue:us-east-2:901234567890:schema/default-registry/TestSchema6-NON-Existent" --versions "1-1"
```

1. AWS マネジメントコンソールにサインインし、AWS Glue コンソール ([https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\)) を開きます。

1. ナビゲーションペインの [**Data catalog**] (データカタログ) で、[**Schema registries**] (スキーマレジストリ) をクリックします。

1. レジストリのリストから、自分のスキーマを含むレジストリを選択します。

1. チェックボックスをクリックして、リストから 1 つ以上のスキーマを選択します。

1. [**Action**] (アクション) メニューで、[**Delete schema**] (スキーマの削除) をクリックします。

1. フィールドに「**Delete**」というテキストを入力して、削除を確定します。

1. **[削除]** を選択します。

指定したスキーマバージョンがレジストリから削除されます。

# レジストリの削除
<a name="schema-registry-gs7c"></a>

レジストリに含まれるスキーマの整理の必要性がなくなった場合は、そのレジストリを削除することができます。これらのスキーマは、別のレジストリに再割り当てする必要があります。

1 つ以上のレジストリを削除することは永続的なアクションであり、元に戻すことはできません。(1 つもしくは複数の) レジストリが不要であることを確認します。

デフォルトのレジストリは、AWS CLI を使用して削除できます。

**AWS Glue API**  
レジストリ全体を、登録されたスキーマとそのすべてのバージョンとともに削除するには、[DeleteRegistry アクション (Python: delete\$1registry)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteRegistry) API を呼び出します。`RegistryId` 構造体を使用し、レジストリを特定します。

例: 

```
aws glue delete-registry --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName1"
```

```
aws glue delete-registry --registry-id RegistryName="TestRegistry-deletebyname"
```

削除オペレーションのステータスを取得するには、非同期呼び出し後に `GetRegistry` API を呼び出します。

**AWS Glue コンソール**  
AWS Glue コンソールからレジストリの削除を行うには

1. AWS マネジメントコンソールにサインインし、AWS Glue コンソール ([https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\)) を開きます。

1. ナビゲーションペインの [**Data catalog**] (データカタログ) で、[**Schema registries**] (スキーマレジストリ) をクリックします。

1. チェックボックスをオンにして、リストからレジストリを選択します。

1. [**Action**] (アクション) メニューで、[**Delete registry**] (レジストリの削除) を選択します。

1. フィールドに「**Delete**」というテキストを入力して、削除を確定します。

1. **[削除]** を選択します。

選択したレジストリが AWS Glue から削除されます。

## シリアライザ用の IAM の例
<a name="schema-registry-gs1"></a>

**注記**  
AWS 管理ポリシーは、一般的ユースケースに必要なアクセス許可を付与します。管理ポリシーを使用してスキーマのレジストリを管理する方法については、「[AWS Glue の AWS マネージド (事前定義) ポリシー](security-iam-awsmanpol.md#access-policy-examples-aws-managed)」を参照してください。

シリアライザの場合、以下と同様の最小限のポリシーを作成して、特定のスキーマ定義のために `schemaVersionId` を検索できるようにします。レジストリ内のスキーマを読み取るには、そのレジストリに対する読み取り許可が必要であることに注意してください。読み取り可能なレジストリは、`Resource` 句を使用して制限できます。

コード例 13:

```
{
    "Sid" : "GetSchemaByDefinition",
    "Effect" : "Allow",
    "Action" :
	[
        "glue:GetSchemaByDefinition"
    ],
        "Resource" : ["arn:aws:glue:us-east-2:012345678:registry/registryname-1",
                      "arn:aws:glue:us-east-2:012345678:schema/registryname-1/schemaname-1",
                      "arn:aws:glue:us-east-2:012345678:schema/registryname-1/schemaname-2"
                     ]
}
```

さらに、以下の新しいメソッドを追加して、プロデューサに対し新しいスキーマとバージョンの作成を許可することもできます。レジストリ内でスキーマを追加/削除/拡大させるためには、そのレジストリを調査できることが必要です。調査することが可能なレジストリは、`Resource` 句を使用して制限できます。

コード例 14:

```
{
    "Sid" : "RegisterSchemaWithMetadata",
    "Effect" : "Allow",
    "Action" :
	[
        "glue:GetSchemaByDefinition",
        "glue:CreateSchema",
        "glue:RegisterSchemaVersion",
        "glue:PutSchemaVersionMetadata",
    ],
    "Resource" : ["arn:aws:glue:aws-region:123456789012:registry/registryname-1",
                  "arn:aws:glue:aws-region:123456789012:schema/registryname-1/schemaname-1",
                  "arn:aws:glue:aws-region:123456789012:schema/registryname-1/schemaname-2"
                 ]
}
```

## デシリアライザー用の IAM の例
<a name="schema-registry-gs1b"></a>

デシリアライザ (コンシューマ側) の場合、以下のようなポリシーを作成する必要があります。これにより、非シリアル化のために Schema Registry からスキーマを取得することを、デシリアライザに対し許可します。レジストリ内のスキーマを取得するためには、そのレジストリを調査することが許可されている必要があります。

コード例 15:

```
{
    "Sid" : "GetSchemaVersion",
    "Effect" : "Allow",
    "Action" :
	[
        "glue:GetSchemaVersion"
    ],
    "Resource" : ["*"]
}
```

## AWS PrivateLink を使用したプライベート接続
<a name="schema-registry-gs-private"></a>

AWS Glue でインターフェイス VPC エンドポイントを定義しながら AWS PrivateLinkを使用すると、データのプロデユーサの VPC を AWS Glue に接続することができます。VPC インターフェイスエンドポイントにより、AWS ネットワーク内全体で VPC と AWS Glue 間の通信を処理します。詳細については、「[Using AWS Glue with VPC Endpoints](https://docs.aws.amazon.com/glue/latest/dg/vpc-endpoint.html)」を参照してください。

# Amazon CloudWatch メトリクスへのアクセス
<a name="schema-registry-gs-monitoring"></a>

Amazon CloudWatch メトリクスは、CloudWatch の無料利用枠の一部として利用できます。これらのメトリクスは CloudWatch コンソールでアクセスできます。APIレベルのメトリクスとしては、CreateSchema (Success および Latency)、GetSchemaByDefinition (Success および Latency)、GetSchemaVersion (Success および Latency)、RegisterSchemaVersion (Success および Latency)、PutSchemaVersionMetadata (Success および Latency) があります。リソースレベルのメトリクスには、Registry.ThrottledByLimit、SchemaVersion.ThrottledByLimit、SchemaVersion.Size があります。

# スキーマレジストリの CloudFormation テンプレート例
<a name="schema-registry-integrations-cfn"></a>

以下に、CloudFormation で Schema Registry リソースを作成するためのテンプレート例を示します。アカウントにこのスタックを作成するには、上記のテンプレートを `SampleTemplate.yaml` ファイルにコピーした上で、次のコマンドを実行します。

```
aws cloudformation create-stack --stack-name ABCSchemaRegistryStack --template-body "'cat SampleTemplate.yaml'"
```

この例では、レジストリを作成するために `AWS::Glue::Registry` を、スキーマを作成するために `AWS::Glue::Schema` を、スキーマバージョンを作成するために `AWS::Glue::SchemaVersion` を使用し、`AWS::Glue::SchemaVersionMetadata` によりスキーマバージョンのメタデータを記述しています。

```
Description: "A sample CloudFormation template for creating Schema Registry resources."
Resources:
  ABCRegistry:
    Type: "AWS::Glue::Registry"
    Properties:
      Name: "ABCSchemaRegistry"
      Description: "ABC Corp. Schema Registry"
      Tags:
        Project: "Foo"
  ABCSchema:
    Type: "AWS::Glue::Schema"
    Properties:
      Registry:
        Arn: !Ref ABCRegistry
      Name: "TestSchema"
      Compatibility: "NONE"
      DataFormat: "AVRO"
      SchemaDefinition: >
        {"namespace":"foo.avro","type":"record","name":"user","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":"int"}]}
      Tags:
        Project: "Foo"
  SecondSchemaVersion:
    Type: "AWS::Glue::SchemaVersion"
    Properties:
      Schema:
        SchemaArn: !Ref ABCSchema
      SchemaDefinition: >
        {"namespace":"foo.avro","type":"record","name":"user","fields":[{"name":"status","type":"string", "default":"ON"}, {"name":"name","type":"string"},{"name":"favorite_number","type":"int"}]}
  FirstSchemaVersionMetadata:
    Type: "AWS::Glue::SchemaVersionMetadata"
    Properties:
      SchemaVersionId: !GetAtt ABCSchema.InitialSchemaVersionId
      Key: "Application"
      Value: "Kinesis"
  SecondSchemaVersionMetadata:
    Type: "AWS::Glue::SchemaVersionMetadata"
    Properties:
      SchemaVersionId: !Ref SecondSchemaVersion
      Key: "Application"
      Value: "Kinesis"
```

# AWS Glue Schema Registry との統合
<a name="schema-registry-integrations"></a>

これらのセクションでは、AWS Glue スキーマレジストリとの統合について説明します。このセクションの例では、AVRO データ形式のスキーマを使用します。JSON データ形式のスキーマなど、その他の例については、「[AWS Glue Schema Registry open source repository](https://github.com/awslabs/aws-glue-schema-registry)」で、統合テストと ReadMe に関する情報をご覧ください。

**Topics**
+ [ユースケース: Schema Registry を Amazon MSK または Apache Kafka に接続する](#schema-registry-integrations-amazon-msk)
+ [ユースケース: Amazon Kinesis Data Streams と AWS Glue Schema Registry との統合](#schema-registry-integrations-kds)
+ [Amazon Managed Service for Apache Flink のユースケース](#schema-registry-integrations-kinesis-data-analytics-apache-flink)
+ [ユースケース: AWS Lambda との統合](#schema-registry-integrations-aws-lambda)
+ [ユースケース: AWS Glue Data Catalog](#schema-registry-integrations-aws-glue-data-catalog)
+ [ユースケース: AWS Glue ストリーミング](#schema-registry-integrations-aws-glue-streaming)
+ [ユースケース: Apache Kafka ストリーム](#schema-registry-integrations-apache-kafka-streams)

## ユースケース: Schema Registry を Amazon MSK または Apache Kafka に接続する
<a name="schema-registry-integrations-amazon-msk"></a>

Apache Kafka トピックにデータを書き込む場合には、以下の手順に従い作業を開始します。

1. Amazon Managed Streaming for Apache Kafka(Amazon MSK) または Apache Kafka のクラスターを作成し、少なくとも 1 つのトピックを含めます。Amazon MSK クラスターを作成する場合は、AWS マネジメントコンソール を使用します。「*Amazon Managed Streaming for Apache Kafka デベロッパーガイド*」の「[Getting Started Using Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html)」にある手順に従います。

1. 上記の [SerDe ライブラリのインストール](schema-registry-gs-serde.md) ステップを実行します。

1. スキーマのレジストリ、スキーマ、またはスキーマバージョンを作成するには、このドキュメントにある [スキーマレジストリの開始方法](schema-registry-gs.md) セクションの手順に従います。

1. Amazon MSK または Apache Kafka のトピックとの間で、レコードの書き込みや読み取りを行うために、Schema Registry を使用してプロデューサとコンシューマを起動します。プロデューサとコンシューマのコード例は、Serdeライブラリの [ReadMe ファイル](https://github.com/awslabs/aws-glue-schema-registry/blob/master/README.md)から入手できます。プロデューサの Schema Registry ライブラリは、レコードを自動的にシリアル化し、スキーマバージョン ID でそのレコードを修飾します。

1. このレコードにスキーマが入力済みの場合、または自動登録が有効になっている場合には、スキーマが Schema Registry に登録されます。

1. AWS Glue Schema Registry ライブラリを使用して、Amazon MSK または Apache Kafka のトピックからスキーマの読み取りを行うコンシューマは、自動的に Schema Registry からスキーマを検索します。

## ユースケース: Amazon Kinesis Data Streams と AWS Glue Schema Registry との統合
<a name="schema-registry-integrations-kds"></a>

この統合には、既存の Amazon Kinesis データストリーム が必要です。詳細については、「*Amazon Kinesis Data Streams デベロッパーガイド*」の「[Getting Started with Amazon Kinesis Data Streams](https://docs.aws.amazon.com/streams/latest/dev/getting-started.html)」を参照してください。

Kinesis データストリームでは、データの操作用に以下の 2 つの方法があります。
+ Java の Kinesis Producer Library (KPL) および Kinesis Client Library (KCL) ライブラリを使用します。多言語サポートは提供されていません。
+ AWS SDK for Java に用意されている `PutRecords`、`PutRecord`、および `GetRecords` Kinesis Data Streams API を使用します。

現在、KPL/KCL ライブラリを使用中であれば、そのメソッドを引き続き使用することをお勧めします。ここでの例に示すように、Schema Registry が統合済みの、更新された KCL および KPL バージョンを使用できます。それ以外で、KDS API を直接使用している場合には、サンプルコードを通じて AWS Glue Schema Registry を利用します。

Schema Registry との統合は、KPL v0.14.2 以降と KCL v2.3 以降でのみ使用できます。JSON データ形式による Schema Registry との統合は、KPL v0.14.8 以降および KCL v2.3.6 以降で使用できます。

### Kinesis SDK V2 を使用したデータの操作
<a name="schema-registry-integrations-kds-sdk-v2"></a>

このセクションでは、Kinesis SDK V2 による Kinesis の操作について説明します。

```
// Example JSON Record, you can construct a AVRO record also
private static final JsonDataWithSchema record = JsonDataWithSchema.builder(schemaString, payloadString);
private static final DataFormat dataFormat = DataFormat.JSON;

//Configurations for Schema Registry
GlueSchemaRegistryConfiguration gsrConfig = new GlueSchemaRegistryConfiguration("us-east-1");

GlueSchemaRegistrySerializer glueSchemaRegistrySerializer =
        new GlueSchemaRegistrySerializerImpl(awsCredentialsProvider, gsrConfig);
GlueSchemaRegistryDataFormatSerializer dataFormatSerializer =
        new GlueSchemaRegistrySerializerFactory().getInstance(dataFormat, gsrConfig);

Schema gsrSchema =
        new Schema(dataFormatSerializer.getSchemaDefinition(record), dataFormat.name(), "MySchema");

byte[] serializedBytes = dataFormatSerializer.serialize(record);

byte[] gsrEncodedBytes = glueSchemaRegistrySerializer.encode(streamName, gsrSchema, serializedBytes);

PutRecordRequest putRecordRequest = PutRecordRequest.builder()
        .streamName(streamName)
        .partitionKey("partitionKey")
        .data(SdkBytes.fromByteArray(gsrEncodedBytes))
        .build();
shardId = kinesisClient.putRecord(putRecordRequest)
        .get()
        .shardId();

GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer = new GlueSchemaRegistryDeserializerImpl(awsCredentialsProvider, gsrConfig);

GlueSchemaRegistryDataFormatDeserializer gsrDataFormatDeserializer =
        glueSchemaRegistryDeserializerFactory.getInstance(dataFormat, gsrConfig);

GetShardIteratorRequest getShardIteratorRequest = GetShardIteratorRequest.builder()
        .streamName(streamName)
        .shardId(shardId)
        .shardIteratorType(ShardIteratorType.TRIM_HORIZON)
        .build();

String shardIterator = kinesisClient.getShardIterator(getShardIteratorRequest)
        .get()
        .shardIterator();

GetRecordsRequest getRecordRequest = GetRecordsRequest.builder()
        .shardIterator(shardIterator)
        .build();
GetRecordsResponse recordsResponse = kinesisClient.getRecords(getRecordRequest)
        .get();

List<Object> consumerRecords = new ArrayList<>();
List<Record> recordsFromKinesis = recordsResponse.records();

for (int i = 0; i < recordsFromKinesis.size(); i++) {
    byte[] consumedBytes = recordsFromKinesis.get(i)
            .data()
            .asByteArray();

    Schema gsrSchema = glueSchemaRegistryDeserializer.getSchema(consumedBytes);
    Object decodedRecord = gsrDataFormatDeserializer.deserialize(ByteBuffer.wrap(consumedBytes),
                                                                    gsrSchema.getSchemaDefinition());
    consumerRecords.add(decodedRecord);
}
```

### KPL/KCL ライブラリを使用したデータの操作
<a name="schema-registry-integrations-kds-libraries"></a>

このセクションでは、KPL/KCL ライブラリを使用しての Kinesis Data Streams と Schema Registry の統合について説明します。KPL/KCL の使用方法については、「*Amazon Kinesis Data Streams デベロッパーガイド*」の「[Developing Producers Using the Amazon Kinesis Producer Library](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html)」を参照してください。

#### KPL で Schema Registry を設定する
<a name="schema-registry-integrations-kds-libraries-kpl"></a>

1. AWS Glue Schema Registryで作成したデータ、データ形式、スキーマ名のスキーマ定義を行います。

1. 必要に応じて、`GlueSchemaRegistryConfiguration` オブジェクトも構成します。

1. `addUserRecord API` にスキーマオブジェクトを渡します。

   ```
   private static final String SCHEMA_DEFINITION = "{"namespace": "example.avro",\n"
   + " "type": "record",\n"
   + " "name": "User",\n"
   + " "fields": [\n"
   + " {"name": "name", "type": "string"},\n"
   + " {"name": "favorite_number", "type": ["int", "null"]},\n"
   + " {"name": "favorite_color", "type": ["string", "null"]}\n"
   + " ]\n"
   + "}";
   
   KinesisProducerConfiguration config = new KinesisProducerConfiguration();
   config.setRegion("us-west-1")
   
   //[Optional] configuration for Schema Registry.
   
   GlueSchemaRegistryConfiguration schemaRegistryConfig =
   new GlueSchemaRegistryConfiguration("us-west-1");
   
   schemaRegistryConfig.setCompression(true);
   
   config.setGlueSchemaRegistryConfiguration(schemaRegistryConfig);
   
   ///Optional configuration ends.
   
   final KinesisProducer producer =
         new KinesisProducer(config);
   
   final ByteBuffer data = getDataToSend();
   
   com.amazonaws.services.schemaregistry.common.Schema gsrSchema =
       new Schema(SCHEMA_DEFINITION, DataFormat.AVRO.toString(), "demoSchema");
   
   ListenableFuture<UserRecordResult> f = producer.addUserRecord(
   config.getStreamName(), TIMESTAMP, Utils.randomExplicitHashKey(), data, gsrSchema);
   
   private static ByteBuffer getDataToSend() {
         org.apache.avro.Schema avroSchema =
           new org.apache.avro.Schema.Parser().parse(SCHEMA_DEFINITION);
   
         GenericRecord user = new GenericData.Record(avroSchema);
         user.put("name", "Emily");
         user.put("favorite_number", 32);
         user.put("favorite_color", "green");
   
         ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
         Encoder encoder = EncoderFactory.get().directBinaryEncoder(outBytes, null);
         new GenericDatumWriter<>(avroSchema).write(user, encoder);
         encoder.flush();
         return ByteBuffer.wrap(outBytes.toByteArray());
    }
   ```

#### Kinesis Client Library のセットアップ
<a name="schema-registry-integrations-kds-libraries-kcl"></a>

Kinesis Client Library コンシューマーを、Java により構築します。詳細については、「*Amazon Kinesis Data Streams デベロッパーガイド*」の「[Developing a Kinesis Client Library Consumer in Java](https://docs.aws.amazon.com/streams/latest/dev/kcl2-standard-consumer-java-example.html)」を参照してください。

1. `GlueSchemaRegistryConfiguration` オブジェクトを渡すことで `GlueSchemaRegistryDeserializer` インスタンスを作成します。

1. `GlueSchemaRegistryDeserializer` を `retrievalConfig.glueSchemaRegistryDeserializer` に渡します。

1. `kinesisClientRecord.getSchema()` を呼び出して、受信メッセージのスキーマにアクセスします。

   ```
   GlueSchemaRegistryConfiguration schemaRegistryConfig =
       new GlueSchemaRegistryConfiguration(this.region.toString());
   
    GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer =
       new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), schemaRegistryConfig);
   
    RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient));
    retrievalConfig.glueSchemaRegistryDeserializer(glueSchemaRegistryDeserializer);
   
     Scheduler scheduler = new Scheduler(
               configsBuilder.checkpointConfig(),
               configsBuilder.coordinatorConfig(),
               configsBuilder.leaseManagementConfig(),
               configsBuilder.lifecycleConfig(),
               configsBuilder.metricsConfig(),
               configsBuilder.processorConfig(),
               retrievalConfig
           );
   
    public void processRecords(ProcessRecordsInput processRecordsInput) {
               MDC.put(SHARD_ID_MDC_KEY, shardId);
               try {
                   log.info("Processing {} record(s)",
                   processRecordsInput.records().size());
                   processRecordsInput.records()
                   .forEach(
                       r ->
                           log.info("Processed record pk: {} -- Seq: {} : data {} with schema: {}",
                           r.partitionKey(), r.sequenceNumber(), recordToAvroObj(r).toString(), r.getSchema()));
               } catch (Throwable t) {
                   log.error("Caught throwable while processing records. Aborting.");
                   Runtime.getRuntime().halt(1);
               } finally {
                   MDC.remove(SHARD_ID_MDC_KEY);
               }
    }
   
    private GenericRecord recordToAvroObj(KinesisClientRecord r) {
       byte[] data = new byte[r.data().remaining()];
       r.data().get(data, 0, data.length);
       org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser().parse(r.schema().getSchemaDefinition());
       DatumReader datumReader = new GenericDatumReader<>(schema);
   
       BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(data, 0, data.length, null);
       return (GenericRecord) datumReader.read(null, binaryDecoder);
    }
   ```

#### Kinesis Data Streams API を使用したデータの操作
<a name="schema-registry-integrations-kds-apis"></a>

このセクションでは、Kinesis Data Streams API を使用しての、Kinesis Data Streams と Schema Registry の統合について説明します。

1. Maven の以下の依存関係を更新します。

   ```
   <dependencyManagement>
           <dependencies>
               <dependency>
                   <groupId>com.amazonaws</groupId>
                   <artifactId>aws-java-sdk-bom</artifactId>
                   <version>1.11.884</version>
                   <type>pom</type>
                   <scope>import</scope>
               </dependency>
           </dependencies>
       </dependencyManagement>
   
       <dependencies>
           <dependency>
               <groupId>com.amazonaws</groupId>
               <artifactId>aws-java-sdk-kinesis</artifactId>
           </dependency>
   
           <dependency>
               <groupId>software.amazon.glue</groupId>
               <artifactId>schema-registry-serde</artifactId>
               <version>1.1.5</version>
           </dependency>
   
           <dependency>
               <groupId>com.fasterxml.jackson.dataformat</groupId>
               <artifactId>jackson-dataformat-cbor</artifactId>
               <version>2.11.3</version>
           </dependency>
       </dependencies>
   ```

1. `PutRecords` または Kinesis Data Streams の `PutRecord` API を使用しながら、プロデューサ内にスキーマヘッダー情報を追加します。

   ```
   //The following lines add a Schema Header to the record
           com.amazonaws.services.schemaregistry.common.Schema awsSchema =
               new com.amazonaws.services.schemaregistry.common.Schema(schemaDefinition, DataFormat.AVRO.name(),
                   schemaName);
           GlueSchemaRegistrySerializerImpl glueSchemaRegistrySerializer =
               new GlueSchemaRegistrySerializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(getConfigs()));
           byte[] recordWithSchemaHeader =
               glueSchemaRegistrySerializer.encode(streamName, awsSchema, recordAsBytes);
   ```

1. プロデューサ内で `PutRecords` または `PutRecord` API を使用して、レコードをデータストリームに配置します。

1. コンシューマ内で、ヘッダーからスキーマレコードを削除し、Avro スキーマレコードをシリアル化します。

   ```
   //The following lines remove Schema Header from record
           GlueSchemaRegistryDeserializerImpl glueSchemaRegistryDeserializer =
               new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), getConfigs());
           byte[] recordWithSchemaHeaderBytes = new byte[recordWithSchemaHeader.remaining()];
           recordWithSchemaHeader.get(recordWithSchemaHeaderBytes, 0, recordWithSchemaHeaderBytes.length);
           com.amazonaws.services.schemaregistry.common.Schema awsSchema =
               glueSchemaRegistryDeserializer.getSchema(recordWithSchemaHeaderBytes);
           byte[] record = glueSchemaRegistryDeserializer.getData(recordWithSchemaHeaderBytes);
   
           //The following lines serialize an AVRO schema record
           if (DataFormat.AVRO.name().equals(awsSchema.getDataFormat())) {
               Schema avroSchema = new org.apache.avro.Schema.Parser().parse(awsSchema.getSchemaDefinition());
               Object genericRecord = convertBytesToRecord(avroSchema, record);
               System.out.println(genericRecord);
           }
   ```

#### Kinesis Data Streams API を使用したデータの操作
<a name="schema-registry-integrations-kds-apis-reference"></a>

以下に、`PutRecords` および `GetRecords` API を使用するコード例を示します。

```
//Full sample code
import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializerImpl;
import com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistrySerializerImpl;
import com.amazonaws.services.schemaregistry.utils.AVROUtils;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.services.glue.model.DataFormat;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;


public class PutAndGetExampleWithEncodedData {
    static final String regionName = "us-east-2";
    static final String streamName = "testStream1";
    static final String schemaName = "User-Topic";
    static final String AVRO_USER_SCHEMA_FILE = "src/main/resources/user.avsc";
    KinesisApi kinesisApi = new KinesisApi();

    void runSampleForPutRecord() throws IOException {
        Object testRecord = getTestRecord();
        byte[] recordAsBytes = convertRecordToBytes(testRecord);
        String schemaDefinition = AVROUtils.getInstance().getSchemaDefinition(testRecord);

        //The following lines add a Schema Header to a record
        com.amazonaws.services.schemaregistry.common.Schema awsSchema =
            new com.amazonaws.services.schemaregistry.common.Schema(schemaDefinition, DataFormat.AVRO.name(),
                schemaName);
        GlueSchemaRegistrySerializerImpl glueSchemaRegistrySerializer =
            new GlueSchemaRegistrySerializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(regionName));
        byte[] recordWithSchemaHeader =
            glueSchemaRegistrySerializer.encode(streamName, awsSchema, recordAsBytes);

        //Use PutRecords api to pass a list of records
        kinesisApi.putRecords(Collections.singletonList(recordWithSchemaHeader), streamName, regionName);

        //OR
        //Use PutRecord api to pass single record
        //kinesisApi.putRecord(recordWithSchemaHeader, streamName, regionName);
    }

    byte[] runSampleForGetRecord() throws IOException {
        ByteBuffer recordWithSchemaHeader = kinesisApi.getRecords(streamName, regionName);

        //The following lines remove the schema registry header
        GlueSchemaRegistryDeserializerImpl glueSchemaRegistryDeserializer =
            new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(regionName));
        byte[] recordWithSchemaHeaderBytes = new byte[recordWithSchemaHeader.remaining()];
        recordWithSchemaHeader.get(recordWithSchemaHeaderBytes, 0, recordWithSchemaHeaderBytes.length);

        com.amazonaws.services.schemaregistry.common.Schema awsSchema =
            glueSchemaRegistryDeserializer.getSchema(recordWithSchemaHeaderBytes);

        byte[] record = glueSchemaRegistryDeserializer.getData(recordWithSchemaHeaderBytes);

        //The following lines serialize an AVRO schema record
        if (DataFormat.AVRO.name().equals(awsSchema.getDataFormat())) {
            Schema avroSchema = new org.apache.avro.Schema.Parser().parse(awsSchema.getSchemaDefinition());
            Object genericRecord = convertBytesToRecord(avroSchema, record);
            System.out.println(genericRecord);
        }

        return record;
    }

    private byte[] convertRecordToBytes(final Object record) throws IOException {
        ByteArrayOutputStream recordAsBytes = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().directBinaryEncoder(recordAsBytes, null);
        GenericDatumWriter datumWriter = new GenericDatumWriter<>(AVROUtils.getInstance().getSchema(record));
        datumWriter.write(record, encoder);
        encoder.flush();
        return recordAsBytes.toByteArray();
    }

    private GenericRecord convertBytesToRecord(Schema avroSchema, byte[] record) throws IOException {
        final GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(avroSchema);
        Decoder decoder = DecoderFactory.get().binaryDecoder(record, null);
        GenericRecord genericRecord = datumReader.read(null, decoder);
        return genericRecord;
    }

    private Map<String, String> getMetadata() {
        Map<String, String> metadata = new HashMap<>();
        metadata.put("event-source-1", "topic1");
        metadata.put("event-source-2", "topic2");
        metadata.put("event-source-3", "topic3");
        metadata.put("event-source-4", "topic4");
        metadata.put("event-source-5", "topic5");
        return metadata;
    }

    private GlueSchemaRegistryConfiguration getConfigs() {
        GlueSchemaRegistryConfiguration configs = new GlueSchemaRegistryConfiguration(regionName);
        configs.setSchemaName(schemaName);
        configs.setAutoRegistration(true);
        configs.setMetadata(getMetadata());
        return configs;
    }

    private Object getTestRecord() throws IOException {
        GenericRecord genericRecord;
        Schema.Parser parser = new Schema.Parser();
        Schema avroSchema = parser.parse(new File(AVRO_USER_SCHEMA_FILE));

        genericRecord = new GenericData.Record(avroSchema);
        genericRecord.put("name", "testName");
        genericRecord.put("favorite_number", 99);
        genericRecord.put("favorite_color", "red");

        return genericRecord;
    }
}
```

## Amazon Managed Service for Apache Flink のユースケース
<a name="schema-registry-integrations-kinesis-data-analytics-apache-flink"></a>

Apache Flinkは、無制限および制限付きのデータストリームに対するステートフルな計算に広く使用されている、オープンソースフレームワークの分散処理エンジンです。Amazon Managed Service for Apache Flink は、ストリーミングデータを処理するため、Apache Flink アプリケーションを構築して管理できるようにする完全マネージド型の AWS サービスです。

オープンソースの Apache Flink では、多数のソースとシンクを利用できます。例えば、事前定義済みのデータソースには、ファイル、ディレクトリ、およびソケットからの読み込みや、コレクションとイテレータからのデータの取り込みなどが含まれています。Apache Flink DataStream Connector は、Apache Flinkが、Apache Kafka や Kinesis などの各種サードパーティー製システムと、ソースおよび/またはシンクとしてインターフェースするためのコードを提供します。

詳細については、「[Amazon Kinesis Data Analytics デベロッパーガイド](https://docs.aws.amazon.com/kinesisanalytics/latest/java/what-is.html)」を参照してください。

### Apache Flink Kafka Connector
<a name="schema-registry-integrations-kafka-connector"></a>

Apache Flinkは、Kafka のトピックに対するデータの読み取りおよび書き込みを、正確に一度で行えるようにするための、Apache Kafka データストリームのコネクタを提供します。Flink の Kafka コンシューマ `FlinkKafkaConsumer` では、1 つ以上の Kafka トピックから読み取りを行うアクセスが提供されます。Apache Flink の Kafka プロデューサ `FlinkKafkaProducer` では、1 つ以上の Kafka トピックに対しレコードのストリームを書き込むことができます。詳細については、「[Apache Kafka Connector](https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html)」を参照してください。

### Apache Flink Kinesis Streams Connector
<a name="schema-registry-integrations-kinesis-connector"></a>

Kinesis データストリームのコネクタは、Amazon Kinesis Data Streams へのアクセスを提供します。並列ストリーミングデータソース `FlinkKinesisConsumer` は、同じAWS のサービスリージョン内で複数の Kinesis ストリームにサブスクライブされ、ジョブの実行中にストリームの再シャーディングを透過的に (確実に 1 回で) 処理できます。コンシューマーの各サブタスクが、複数の Kinesis シャードからのデータレコードの取得を受け持ちます。各サブタスクによって取得されるシャードの数は、Kinesis によってシャードが閉じられ、また作成されるたびに変化します。`FlinkKinesisProducer` は Kinesis Producer Library (KPL) を使用して、Kinesis ストリーム内に Apache Flink ストリームからのデータを配置します。詳細については、「[Amazon Kinesis Streams Connector](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kinesis.html)」を参照してください。

詳細については、[AWS Glue のGitHub リポジトリ](https://github.com/awslabs/aws-glue-schema-registry)を参照してください。

### Apache Flink との統合
<a name="schema-registry-integrations-apache-flink-integrate"></a>

Schema Registry で提供されている SerDes ライブラリは、Apache Flink と統合されています。Apache Flink を使用するには、[https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java](https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java) および [https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java](https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java) のインターフェイス (`GlueSchemaRegistryAvroSerializationSchema` および `GlueSchemaRegistryAvroDeserializationSchema`) を実装する必要があります。これらは、Apache Flink コネクタにプラグインして使用します。

### Apache Flink アプリケーションへの AWS Glue Schema Registry の依存関係の追加
<a name="schema-registry-integrations-kinesis-data-analytics-dependencies"></a>

Apache Flink アプリケーション内で AWS Glue Schema Registry との統合の依存関係をセットアップするには

1. `pom.xml` ファイルに依存関係を追加します。

   ```
   <dependency>
       <groupId>software.amazon.glue</groupId>
       <artifactId>schema-registry-flink-serde</artifactId>
       <version>1.0.0</version>
   </dependency>
   ```

#### Kafka または Amazon MSK を Apache Flink と統合する
<a name="schema-registry-integrations-kda-integrate-msk"></a>

Kafka をソースまたはシンクとしながら、Apache Flink 対応の Managed Service for Apache Flink を使用できます。

**Kafka をソースとする場合**  
次の図は、Kafka をソースとしながら、Kinesis Data Streams と Apache Flink 対応の Managed Service for Apache Flink を統合した様子です。

![\[Kafka をソースとする場合。\]](http://docs.aws.amazon.com/ja_jp/glue/latest/dg/images/gsr-kafka-source.png)


**Kafka をシンクとする場合**  
次の図は、Kafka をシンクとしながら、Kinesis Data Streams と Apache Flink 対応の Managed Service for Apache Flink を統合した様子です。

![\[Kafka をシンクとする場合。\]](http://docs.aws.amazon.com/ja_jp/glue/latest/dg/images/gsr-kafka-sink.png)


Kafka をソースまたはシンクとしながら、Kafka (または Amazon MSK) を Apache Flink 対応の Managed Service for Apache Flink と統合するには、以下のコード変更を行います。太字で示されたコードブロックを、類似するセクション内の対応するコードにそれぞれ追加します。

Kafka をソースとする場合は、デシリアライザ用コード (ブロック 2) を使用します。Kafka をシンクとする場合は、シリアライザコード (ブロック 3) を使用します。

```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

String topic = "topic";
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

// block 1
Map<String, Object> configs = new HashMap<>();
configs.put(AWSSchemaRegistryConstants.AWS_REGION, "aws-region");
configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());

FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
    topic,
    // block 2
    GlueSchemaRegistryAvroDeserializationSchema.forGeneric(schema, configs),
    properties);

FlinkKafkaProducer<GenericRecord> producer = new FlinkKafkaProducer<>(
    topic,
    // block 3
    GlueSchemaRegistryAvroSerializationSchema.forGeneric(schema, topic, configs),
    properties);

DataStream<GenericRecord> stream = env.addSource(consumer);
stream.addSink(producer);
env.execute();
```

#### Kinesis Data Streams と Apache Flink との統合
<a name="schema-registry-integrations-integrate-kds"></a>

Kinesis Data Streams をソースまたはシンクとしながら、Apache Flink 対応の Managed Service for Apache Flink を使用できます。

**Kinesis Data Streams をソースとする場合**  
次の図は、Kinesis Data Streams をソースとしながら、Kinesis Data Streams と Apache Flink 対応の Managed Service for Apache Flink を統合した様子です。

![\[Kinesis Data Streams をソースとする場合。\]](http://docs.aws.amazon.com/ja_jp/glue/latest/dg/images/gsr-kinesis-source.png)


**Kinesis Data Streams をシンクとする場合**  
次の図は、Kinesis Data Streams をシンクとしながら、Kinesis Data Streams と Apache Flink 対応の Managed Service for Apache Flink を統合した様子です。

![\[Kinesis Data Streams をシンクとする場合。\]](http://docs.aws.amazon.com/ja_jp/glue/latest/dg/images/gsr-kinesis-sink.png)


Kinesis Data Streams をソースまたはシンクとしながら、Kinesis Data Streams と Apache Flink 対応の Managed Service for Apache Flink を統合するには、以下のコード変更を行います。太字で示されたコードブロックを、類似するセクション内の対応するコードにそれぞれ追加します。

Kinesis Data Streams をソースとする場合は、デシリアライザコード (ブロック 2) を使用します。Kinesis Data Streams をシンクとする場合は、シリアライザコード (ブロック 3) を使用します。

```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

String streamName = "stream";
Properties consumerConfig = new Properties();
consumerConfig.put(AWSConfigConstants.AWS_REGION, "aws-region");
consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

// block 1
Map<String, Object> configs = new HashMap<>();
configs.put(AWSSchemaRegistryConstants.AWS_REGION, "aws-region");
configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());

FlinkKinesisConsumer<GenericRecord> consumer = new FlinkKinesisConsumer<>(
    streamName,
    // block 2
    GlueSchemaRegistryAvroDeserializationSchema.forGeneric(schema, configs),
    properties);

FlinkKinesisProducer<GenericRecord> producer = new FlinkKinesisProducer<>(
    // block 3
    GlueSchemaRegistryAvroSerializationSchema.forGeneric(schema, topic, configs),
    properties);
producer.setDefaultStream(streamName);
producer.setDefaultPartition("0");

DataStream<GenericRecord> stream = env.addSource(consumer);
stream.addSink(producer);
env.execute();
```

## ユースケース: AWS Lambda との統合
<a name="schema-registry-integrations-aws-lambda"></a>

AWS Lambda 関数を Apache Kafka/Amazon MSK のコンシューマーとして使用し、Avro でエンコードされたメッセージを AWS Glue Schema Registry により非シリアル化するには、[MSK ラボのページ](https://amazonmsk-labs.workshop.aws/en/msklambda/gsrschemareg.html)をご覧ください。

## ユースケース: AWS Glue Data Catalog
<a name="schema-registry-integrations-aws-glue-data-catalog"></a>

AWS Glue テーブルは、手動による指定、または AWS Glue Schema Registry への参照によって指定できる、スキーマをサポートしています。AWS Glue テーブルまたは Data Catalog のパーティションを作成または更新する際に、オプションで Schema Registry に格納されているスキーマを使用できるように、Schema Registry には Data Catalog が統合されています。Schema Registry のスキーマ定義を特定するには、少なくとも、対象となるスキーマの ARN を知る必要があります。スキーマ定義を含むスキーマのスキーマバージョンは、UUID またはバージョン番号により参照が可能です。スキーマバージョンの中でも「最新」バージョンについては、バージョン番号または UUID を把握しなくても常に参照することができます。

`CreateTable` または `UpdateTable` オペレーションの呼び出し時は、Schema Registry 内の既存のスキーマに対する `TableInput` を指定するために `SchemaReference` 構造体 (`StorageDescriptor` を含む) を渡します。同様に、`GetTable` または `GetPartition` API を呼び出す場合は、そのレスポンスにスキーマと `SchemaReference` が含まれます。スキーマ参照を使用してテーブルまたはパーティションが作成されると、Data Catalog はこのスキーマ参照のスキーマ取得を試みます。Schema Registry 内にスキーマが見つからない場合は、`GetTable` レスポンスで空のスキーマを返します。それ以外では、このレスポンスにスキーマとスキーマ参照の両方が出力されます。

また、AWS Glue コンソールからアクションを実行することも可能です。

これらのオペレーションを実行し、スキーマ情報を作成、更新、表示するには、呼び出しユーザーに、`GetSchemaVersion` API へのアクセス権限を付与する、IAM ロールを付与する必要があります。

### テーブルの追加またはテーブルのスキーマの更新
<a name="schema-registry-integrations-aws-glue-data-catalog-table"></a>

既存のスキーマから新しいテーブルを追加すると、そのテーブルは特定のスキーマバージョンにバインドされます。新しいスキーマバージョンの登録が完了すると、このテーブル定義が、AWS Glue コンソールの [View table] (テーブルの表示) ページ、もしくは [UpdateTable アクション (Python: update\$1table)](aws-glue-api-catalog-tables.md#aws-glue-api-catalog-tables-UpdateTable) API を使用して更新できるようになります。

#### 既存のスキーマからのテーブルの追加
<a name="schema-registry-integrations-aws-glue-data-catalog-table-existing"></a>

AWS Glue コンソールまたは `CreateTable` API を使用して、レジストリ内のスキーマバージョンから AWS Glue を作成できます。

**AWS Glue API**  
`CreateTable` API を呼び出す際に、(`StorageDescriptor` に`SchemaReference` が指定されている) `TableInput` を、スキーマレジストリの既存のスキーマに追加します。

**AWS Glue コンソール**  
AWS Glue コンソールを使用してテーブルを作成するには

1. AWS マネジメントコンソール にサインインし、AWS Glue コンソール ([https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\)) を開きます。

1. ナビゲーションペインの [**Data catalog**] (データカタログ) で、[**Tables**] (テーブル) をクリックします。

1. [**Add Tables**] (テーブルの追加) メニューで、[**Add table from existing schema**] (既存のスキーマからテーブルを追加する) をクリックします。

1. テーブルのプロパティとデータストアを、AWS Glue デベロッパーガイドに沿って設定します。

1. [**Choose a Glue schema**] (Glue スキーマの選択) ページで、スキーマが置かれている [**Registry**] (レジストリ) を選択します。

1. [**Schema name**] (スキーマ名) をクリックし、適用するスキーマの [**Version**] (バージョン) を選択します。

1. スキーマのプレビューを確認し、[**Next**] (次へ) をクリックします。

1. テーブルを確認し、作成します。

作成したテーブルに適用されたスキーマとバージョンは、テーブルの一覧内で [**Glue schema**] (Glue スキーマ) 列に表示されます。テーブルを表示すると、さらに詳細を確認できます。

#### テーブルのスキーマの更新
<a name="schema-registry-integrations-aws-glue-data-catalog-table-updating"></a>

新しいスキーマバージョンが使用可能になったら、テーブルのスキーマを [UpdateTable アクション (Python: update\$1table)](aws-glue-api-catalog-tables.md#aws-glue-api-catalog-tables-UpdateTable) API または AWS Glue コンソールにより更新することができます 

**重要**  
手動で指定された AWS Glue スキーマを含む既存のテーブル用にスキーマを更新する場合、Schema Registry で参照される新しいスキーマは互換性を持たない可能性があります。この場合、ジョブが失敗することがあります。

**AWS Glue API**  
`UpdateTable` API を呼び出す際に、(`StorageDescriptor` に`SchemaReference` が指定されている) `TableInput` を、スキーマレジストリの既存のスキーマに追加します。

**AWS Glue コンソール**  
AWS Glue コンソールからテーブルのスキーマを更新するには

1. AWS マネジメントコンソール にサインインし、AWS Glue コンソール ([https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\)) を開きます。

1. ナビゲーションペインの [**Data catalog**] (データカタログ) で、[**Tables**] (テーブル) をクリックします。

1. テーブルの一覧でテーブルを表示します。

1. 新しいバージョンの情報が表示されたボックスで、[**Update schema**] (スキーマの更新) をクリックします。

1. 現在のスキーマと更新後のスキーマの違いを確認します。

1. さらに詳細を表示するには、[**Show all schema differences**] (スキーマの違いをすべて表示) をクリックします。

1. [**Save table**] (テーブルを保存) をクリックし、新しいバージョンを受け入れます。

## ユースケース: AWS Glue ストリーミング
<a name="schema-registry-integrations-aws-glue-streaming"></a>

AWS Glue ストリーミングは、ストリーミングソースからのデータを消費し、出力シンクに書き込む前に ETL オペレーションを実行します。入力ストリーミングソースは、データテーブルを使用して指定するか、ソース構成を指定して直接指定することができます。

AWS Glue ストリーミングは、AWS Glue スキーマレジストリに存在するスキーマで作成されたストリーミングソースのデータカタログテーブルをサポートします。AWS Glue スキーマレジストリにスキーマを作成し、そのスキーマを使用してストリーミングソースで AWS Glue テーブルを作成できます。この AWS Glue テーブルは、 AWS Glue ストリーミングジョブへの入力として使用し、入力ストリーミングのデータを逆シリアル化することができます。

注意すべき点は、AWS Glue スキーマレジスト内のスキーマが変化した場合、AWS Glue ストリーミングジョブを再度開始して、スキーマの変更を反映させる必要があることです。

## ユースケース: Apache Kafka ストリーム
<a name="schema-registry-integrations-apache-kafka-streams"></a>

Apache Kafka Streams APIは、Apache Kafka に格納されているデータを処理・分析するためのクライアントライブラリです。このセクションでは、Apache Kafka Streams と AWS Glue Schema Registry の統合について説明します。これにより、データストリーミングアプリケーションのスキーマを、管理および適用できるようになります。Apache Kafka Streams の詳細については、「[Apache Kafka Streams](https://kafka.apache.org/documentation/streams/)」を参照してください。

### SerDes ライブラリとの統合
<a name="schema-registry-integrations-apache-kafka-streams-integrate"></a>

`GlueSchemaRegistryKafkaStreamsSerde` クラスにより、Streams のアプリケーションを設定できます。

#### Kafka Streams アプリケーションのコード例
<a name="schema-registry-integrations-apache-kafka-streams-application"></a>

Apache Kafka Streams アプリケーション内で AWS Glue Schema Registry を使用するには

1. Kafka Streams アプリケーションを設定します。

   ```
   final Properties props = new Properties();
       props.put(StreamsConfig.APPLICATION_ID_CONFIG, "avro-streams");
       props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
       props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
       props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
       props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, AWSKafkaAvroSerDe.class.getName());
       props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
   
       props.put(AWSSchemaRegistryConstants.AWS_REGION, "aws-region");
       props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
       props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
   	props.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.AVRO.name());
   ```

1. トピック avro-input からストリームを作成します。

   ```
   StreamsBuilder builder = new StreamsBuilder();
   final KStream<String, GenericRecord> source = builder.stream("avro-input");
   ```

1. データレコードを処理します (favorite\$1color の値がピンクであるか、値が 15 となっているレコードの除外など)。

   ```
   final KStream<String, GenericRecord> result = source
       .filter((key, value) -> !"pink".equals(String.valueOf(value.get("favorite_color"))));
       .filter((key, value) -> !"15.0".equals(String.valueOf(value.get("amount"))));
   ```

1. トピック avro-output に結果を書き込みます。

   ```
   result.to("avro-output");
   ```

1. Apache Kafka Streams アプリケーションを起動します。

   ```
   KafkaStreams streams = new KafkaStreams(builder.build(), props);
   streams.start();
   ```

#### 実装結果
<a name="schema-registry-integrations-apache-kafka-streams-results"></a>

以下の結果は、ステップ 3 において (favorite\$1color が「pink」であるか値が「15.0」であるために) 除外されたレコードに関するフィルタリング処理を示しています。

フィルタリング前のレコード:

```
{"name": "Sansa", "favorite_number": 99, "favorite_color": "white"}
{"name": "Harry", "favorite_number": 10, "favorite_color": "black"}
{"name": "Hermione", "favorite_number": 1, "favorite_color": "red"}
{"name": "Ron", "favorite_number": 0, "favorite_color": "pink"}
{"name": "Jay", "favorite_number": 0, "favorite_color": "pink"}

{"id": "commute_1","amount": 3.5}
{"id": "grocery_1","amount": 25.5}
{"id": "entertainment_1","amount": 19.2}
{"id": "entertainment_2","amount": 105}
	{"id": "commute_1","amount": 15}
```

フィルタリング後のレコード:

```
{"name": "Sansa", "favorite_number": 99, "favorite_color": "white"}
{"name": "Harry", "favorite_number": 10, "favorite_color": "black"}
{"name": "Hermione", "favorite_number": 1, "favorite_color": "red"}
{"name": "Ron", "favorite_number": 0, "favorite_color": "pink"}

{"id": "commute_1","amount": 3.5}
{"id": "grocery_1","amount": 25.5}
{"id": "entertainment_1","amount": 19.2}
{"id": "entertainment_2","amount": 105}
```

### ユースケース: Apache Kafka Connect
<a name="schema-registry-integrations-apache-kafka-connect"></a>

Apache Kafka Connect と AWS Glue Schema Registry を統合することで、コネクタからスキーマ情報を取得できるようになります。Apache Kafka のコンバータにより、Apache Kafka 内のデータ形式と、Apache Kafka Connect データへの変換方法を指定します。すべての Apache Kafka Connect ユーザーは、これらのコンバータを Apache Kafka との間でロードまたは保存する際に、データに適用する形式に基づいた設定を行う必要があります。これにより、Apache Kafka Connect データを AWS Glue Schema Registry で使用する型 (例: Avro) に変換する独自のコンバータを定義し、さらにシリアライザを使用してスキーマを登録しシリアル化を実行します。その後コンバータはデシリアライザを使用して、Apache Kafka から受信したデータを逆シリアル化し、元の Apache Kafka Connect データに変換することができます。ワークフローの例を以下の図に示します。

![\[Apache Kafka Connect ワークフロー。\]](http://docs.aws.amazon.com/ja_jp/glue/latest/dg/images/schema_reg_int_kafka_connect.png)


1. [AWS Glue Schema Registry 用 Githubリポジトリ](https://github.com/awslabs/aws-glue-schema-registry)をクローンして、`aws-glue-schema-registry` プロジェクトをインストールします。

   ```
   git clone git@github.com:awslabs/aws-glue-schema-registry.git
   cd aws-glue-schema-registry
   mvn clean install
   mvn dependency:copy-dependencies
   ```

1. Apache Kafka Connect を *Standalone* モードで使用する予定の場合、このステップで以下に示した手順を使用して、**connect-standalone.properties** を更新します。Apache Kafka Connect を *Distributed* モードで使用する予定の場合は、同じ手順により **connect-avro-distributed.properties** を更新します。

   1. Apache Kafka の接続プロパティファイルにも、これらのプロパティを追加します。

      ```
      key.converter.region=aws-region
      value.converter.region=aws-region
      key.converter.schemaAutoRegistrationEnabled=true
      value.converter.schemaAutoRegistrationEnabled=true
      key.converter.avroRecordType=GENERIC_RECORD
      value.converter.avroRecordType=GENERIC_RECORD
      ```

   1. 以下のコマンドを、[**kafka-run-class.sh**] の下の [**Launch mode**] (起動モード) セクションに追加します。

      ```
      -cp $CLASSPATH:"<your AWS GlueSchema Registry base directory>/target/dependency/*"
      ```

1. [**kafka-run-class.sh**] の下の [**Launch mode**] (起動モード) セクションに以下のコマンドを追加する

   ```
   -cp $CLASSPATH:"<your AWS GlueSchema Registry base directory>/target/dependency/*" 
   ```

   プリンシパルは以下のようになります。

   ```
   # Launch mode
   if [ "x$DAEMON_MODE" = "xtrue" ]; then
     nohup "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH:"/Users/johndoe/aws-glue-schema-registry/target/dependency/*" $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
   else
     exec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH:"/Users/johndoe/aws-glue-schema-registry/target/dependency/*" $KAFKA_OPTS "$@"
   fi
   ```

1. bash を使用している場合は、以下のコマンドを実行して、bash\$1profile で CLASSPATH を設定します。他のシェルの場合は、それに応じて環境を更新します。

   ```
   echo 'export GSR_LIB_BASE_DIR=<>' >>~/.bash_profile
   echo 'export GSR_LIB_VERSION=1.0.0' >>~/.bash_profile
   echo 'export KAFKA_HOME=<your Apache Kafka installation directory>' >>~/.bash_profile
   echo 'export CLASSPATH=$CLASSPATH:$GSR_LIB_BASE_DIR/avro-kafkaconnect-converter/target/schema-registry-kafkaconnect-converter-$GSR_LIB_VERSION.jar:$GSR_LIB_BASE_DIR/common/target/schema-registry-common-$GSR_LIB_VERSION.jar:$GSR_LIB_BASE_DIR/avro-serializer-deserializer/target/schema-registry-serde-$GSR_LIB_VERSION.jar' >>~/.bash_profile
   source ~/.bash_profile
   ```

1. (オプション) 単純なファイルをソースとして使用しテストを行う場合は、ファイルソースコネクタのクローンを作成します。

   ```
   git clone https://github.com/mmolimar/kafka-connect-fs.git
   cd kafka-connect-fs/
   ```

   1. ソースコネクタの設定で、データ形式を Avro に、ファイルリーダーを `AvroFileReader` に変更します。さらに、読み込んでいるファイルパスからサンプルの Avro オブジェクトを更新します。例: 

      ```
      vim config/kafka-connect-fs.properties
      ```

      ```
      fs.uris=<path to a sample avro object>
      policy.regexp=^.*\.avro$
      file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.AvroFileReader
      ```

   1. ソースコネクタをインストールします。

      ```
      mvn clean package
      echo "export CLASSPATH=\$CLASSPATH:\"\$(find target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')\"" >>~/.bash_profile
      source ~/.bash_profile
      ```

   1. `<your Apache Kafka installation directory>/config/connect-file-sink.properties` のシンクのプロパティを更新し、トピック名と出力ファイル名を更新します。

      ```
      file=<output file full path>
      topics=<my topic>
      ```

1. Source Connector (この例では、ソースファイルのコネクタ) を起動します。

   ```
   $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/kafka-connect-fs.properties
   ```

1. Sink Connector (この例では、シンクファイルのコネクタ) を実行します。

   ```
   $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-file-sink.properties
   ```

   Kafka Connect の使用例については、[AWS Glue Schema Registry 用 Githubリポジトリ](https://github.com/awslabs/aws-glue-schema-registry/tree/master/integration-tests)の、integration-tests フォルダにある run-local-tests.sh スクリプトでご確認ください。

# サードパーティー製のスキーマレジストリから AWS Glue Schema Registry への移行
<a name="schema-registry-integrations-migration"></a>

サードパーティー製スキーマレジストリから AWS Glue Schema Registry への移行作業は、その時点での既存のサードパーティー製スキーマレジストリにより異なります。サードパーティーのスキーマレジストリを使用して送信された Apache Kafka トピック内にレコードがある場合、コンシューマはサードパーティーのスキーマレジストリを使用して、それらのレコードを非シリアル化する必要があります。`AWSKafkaAvroDeserializer` には、セカンダリのデシリアライザクラスを定義する機能があります。このクラスは、サードパーティーのデシリアライザを特定し、対象のレコードを非シリアル化するために使用できます、

サードパーティー製スキーマには、使用停止に関して 2 つの基準があります。1 つ目の基準では、サードパーティー製スキーマレジストリを使用する Apache Kafka トピックのレコードを必要とするコンシューマが、なくなった後にのみ使用停止となります。2 つ目の基準では、トピックに指定された保持期間に応じ、Apache Kafka トピックの試用期間が終了することで使用停止となります。無制限の保持期間を持つトピックの場合は、AWS Glue Schema Registry への移行は可能ですが、サードパーティー製スキーマレジストリを使用停止にすることはできません。この回避策としては、アプリケーションまたは Mirror Maker 2 により現在のトピックを読み取り、AWS Glue Schema Registry を使用する新しいトピックとして作成し直すことができます。

サードパーティー製スキーマレジストリから AWS Glue Schema Registry へ移行するには

1. AWS Glue Schema Registry にレジストリを作成するか、デフォルトのレジストリを使用します。

1. コンシューマを停止します。AWS Glue Schema Registry をプライマリのデシリアライザとして含めるように、コンシューマを変更します、サードパーティーのスキーマレジストリをセカンダリに変更します。
   + コンシューマのプロパティを設定します。この例では、secondary\$1decerializer は別のデシリアライザに設定されています。動作は次のとおりです。コンシューマは Amazon MSK からレコードを取得し、最初に `AWSKafkaAvroDeserializer` の使用を試みます。AWS Glue Schema Registry のスキーマの、Avro スキーマ ID を含むマジックバイトを読み取ることができない場合、`AWSKafkaAvroDeserializer`は、secondary\$1deserializer で指定されるデシリアライザクラスの使用を試みます。セカンダリのデシリアライザに固有のプロパティは、以下に示すように、schema\$1registry\$1url\$1config や specific\$1avro\$1reader\$1config などのコンシューマプロパティでも指定する必要があります。

     ```
     consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
     consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AWSKafkaAvroDeserializer.class.getName());
     consumerProps.setProperty(AWSSchemaRegistryConstants.AWS_REGION, KafkaClickstreamConsumer.gsrRegion);
     consumerProps.setProperty(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, KafkaAvroDeserializer.class.getName());
     consumerProps.setProperty(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "URL for third-party schema registry");
     consumerProps.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
     ```

1. コンシューマを再起動します。

1. プロデューサを停止し、そのプロデューサで AWS Glue Schema Registry を指定します。

   1. プロデューサのプロパティを設定します。この例のプロデューサでは、デフォルトのレジストリと自動登録スキーマバージョンを使用しています。

      ```
      producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
      producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AWSKafkaAvroSerializer.class.getName());
      producerProps.setProperty(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2");
      producerProps.setProperty(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName());
      producerProps.setProperty(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true");
      ```

1. (オプション) 既存のスキーマおよびスキーマバージョンを、現在のサードパーティー製スキーマレジストリから AWS Glue Schema Registry (AWS Glue Schema Registry のデフォルトレジストリ、または AWS Glue Schema Registry 内の特定の非デフォルトレジストリ) に手動で移行します。これには、サードパーティーのスキーマレジストリから JSON 形式でスキーマをエクスポートし、AWS マネジメントコンソール または AWS CLI を使用しながら AWS Glue Schema Registry の新しいスキーマを作成します。

    このステップは、AWS CLI および AWS マネジメントコンソール を使用して、新しく作成されたスキーマバージョンについて、以前のスキーマバージョンとの互換性チェックを有効にする必要がある場合、またはスキーマバージョンの自動登録を有効にした新しいスキーマで、プロデユーサがメッセージを送信する場合に重要です。

1. プロデューサを起動します。