

# Getting started with schema registry
<a name="schema-registry-gs"></a>

The following sections provide an overview and walk you through setting up and using Schema Registry. For information about schema registry concepts and components, see [AWS Glue Schema registry](schema-registry.md).

**Topics**
+ [Installing SerDe Libraries](schema-registry-gs-serde.md)
+ [Integrating with AWS Glue Schema Registry](schema-registry-integrations.md)
+ [Migration from a third-party schema registry to AWS Glue Schema Registry](schema-registry-integrations-migration.md)

# Installing SerDe Libraries
<a name="schema-registry-gs-serde"></a>

The SerDe libraries provide a framework for serializing and deserializing data. 

You install the open source serializer on the applications that produce data (collectively, the "serializers"). The serializer handles serialization, compression, and interaction with the Schema Registry. It automatically extracts the schema from a record being written to a Schema Registry compatible destination, such as Amazon MSK. Likewise, you install the open source deserializer on the applications that consume data.

# Java Implementation
<a name="schema-registry-gs-serde-java"></a>

**Note**  
Prerequisites: Before completing the following steps, you need a running Amazon Managed Streaming for Apache Kafka (Amazon MSK) or Apache Kafka cluster. Your producers and consumers need to be running on Java 8 or above.

To install the libraries on producers and consumers:

1. In the `pom.xml` file of each producer and consumer, add the following dependency:

   ```
   <dependency>
       <groupId>software.amazon.glue</groupId>
       <artifactId>schema-registry-serde</artifactId>
       <version>1.1.5</version>
   </dependency>
   ```

   Alternatively, you can clone the [AWS Glue Schema Registry GitHub repository](https://github.com/awslabs/aws-glue-schema-registry).

1. Set up your producers with these required properties:

   ```
   props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // Can replace StringSerializer with any other key serializer that you use
   props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());
   props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2");
   props.put(AWSSchemaRegistryConstants.DATA_FORMAT, "JSON"); // OR "AVRO"
   ```

   If there are no existing schemas, turn on auto-registration (see the next step). If you already have a schema that you want to apply, replace "my-schema" with your schema name. You must also provide the registry name if schema auto-registration is off. If the schema was created under "default-registry", you can omit the registry name.

1. (Optional) Set any of these optional producer properties. For detailed property descriptions, see [the ReadMe file](https://github.com/awslabs/aws-glue-schema-registry/blob/master/README.md).

   ```
   props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true"); // If not passed, uses "false"
   props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "my-schema"); // If not passed, uses transport name (topic name in case of Kafka, or stream name in case of Kinesis Data Streams)
   props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry"); // If not passed, uses "default-registry"
   props.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000 (24 Hours)
   props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200
   props.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL); // Pass a compatibility mode. If not passed, uses Compatibility.BACKWARD
   props.put(AWSSchemaRegistryConstants.DESCRIPTION, "This registry is used for several purposes."); // If not passed, constructs a description
   props.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, AWSSchemaRegistryConstants.COMPRESSION.ZLIB); // If not passed, records are sent uncompressed
   ```

   Auto-registration registers the schema version under the default registry ("default-registry"). If a `SCHEMA_NAME` is not specified in the previous step, then the topic name is inferred as `SCHEMA_NAME`. 

   See [Schema versioning and compatibility](schema-registry.md#schema-registry-compatibility) for more information on compatibility modes.

1. Set up your consumers with these required properties:

   ```
   props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
   props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
   props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2"); // Pass an AWS Region
   props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); // Only required for AVRO data format
   ```

1. (Optional) Set these optional consumer properties. For detailed property descriptions, see [the ReadMe file](https://github.com/awslabs/aws-glue-schema-registry/blob/master/README.md).

   ```
   props.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000
   props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200
   props.put(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, "com.amazonaws.services.schemaregistry.deserializers.external.ThirdPartyDeserializer"); // For migration fall back scenario
   ```

# C# Implementation
<a name="schema-registry-gs-serde-csharp"></a>

**Note**  
Prerequisites: Before completing the following steps, you need a running Amazon Managed Streaming for Apache Kafka (Amazon MSK) or Apache Kafka cluster. Your producers and consumers need to be running on .NET 8.0 or above.

## Installation
<a name="schema-registry-gs-serde-csharp-install"></a>

For C# applications, install the AWS Glue Schema Registry SerDe NuGet package using one of the following methods:

**.NET CLI:**  
Use the following command to install the package:

```
dotnet add package Aws.Glue.SchemaRegistry --version 1.0.0-<rid>
```

where `<rid>` is one of `linux-x64`, `linux-musl-x64`, or `linux-arm64`, so that the full version is, for example, `1.0.0-linux-x64`.

**PackageReference (in your .csproj file):**  
Add the following to your project file:

```
<PackageReference Include="Aws.Glue.SchemaRegistry" Version="1.0.0-<rid>" />
```

where `<rid>` is one of `linux-x64`, `linux-musl-x64`, or `linux-arm64`, so that the full version is, for example, `1.0.0-linux-x64`.

## Configuration File Setup
<a name="schema-registry-gs-serde-csharp-config"></a>

Create a configuration properties file (e.g., `gsr-config.properties`) with the required settings:

**Minimal Configuration:**  
The following shows a minimal configuration example:

```
region=us-east-1
registry.name=default-registry
dataFormat=AVRO
schemaAutoRegistrationEnabled=true
```

## Using C# Glue Schema client library for Kafka SerDes
<a name="schema-registry-gs-serde-csharp-kafka"></a>

**Sample serializer usage:**  
The following example shows how to use the serializer:

```
private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>";
var protobufSerializer = new GlueSchemaRegistryKafkaSerializer(PROTOBUF_CONFIG_PATH);
var serialized = protobufSerializer.Serialize(message, message.Descriptor.FullName);
// send serialized bytes to Kafka using producer.Produce(serialized)
```

**Sample deserializer usage:**  
The following example shows how to use the deserializer:

```
private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>";
var dataConfig = new GlueSchemaRegistryDataFormatConfiguration(
    new Dictionary<string, dynamic>
    {
        {
            GlueSchemaRegistryConstants.ProtobufMessageDescriptor, message.Descriptor
        }
    }
);
var protobufDeserializer = new GlueSchemaRegistryKafkaDeserializer(PROTOBUF_CONFIG_PATH, dataConfig);

// read message from Kafka using serialized = consumer.Consume()
var deserializedObject = protobufDeserializer.Deserialize(message.Descriptor.FullName, serialized);
```

## Using C# Glue Schema client library with KafkaFlow for SerDes
<a name="schema-registry-gs-serde-csharp-kafkaflow"></a>

**Sample serializer usage:**  
The following example shows how to configure KafkaFlow with the serializer:

```
services.AddKafka(kafka => kafka
    .UseConsoleLog()
    .AddCluster(cluster => cluster
        .WithBrokers(new[] { "localhost:9092" })
        .AddProducer<CustomerProducer>(producer => producer
            .DefaultTopic("customer-events")
            .AddMiddlewares(m => m
                .AddSerializer<GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>>(
                    () => new GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>("config/gsr-config.properties")
                )
            )
        )
    )
);
```

**Sample deserializer usage:**  
The following example shows how to configure KafkaFlow with the deserializer:

```
.AddConsumer(consumer => consumer
    .Topic("customer-events")
    .WithGroupId("customer-group")
    .WithBufferSize(100)
    .WithWorkersCount(10)
    .AddMiddlewares(middlewares => middlewares
        .AddDeserializer<GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>>(
            () => new GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>("config/gsr-config.properties")
        )
        .AddTypedHandlers(h => h.AddHandler<CustomerHandler>())
    )
)
```

## Optional Producer Properties
<a name="schema-registry-gs-serde-csharp-optional"></a>

You can extend your configuration file with additional optional properties:

```
# Auto-registration (if not passed, uses "false")
schemaAutoRegistrationEnabled=true

# Schema name (if not passed, uses topic name)
schema.name=my-schema

# Registry name (if not passed, uses "default-registry")
registry.name=my-registry

# Cache settings
cacheTimeToLiveMillis=86400000
cacheSize=200

# Compatibility mode (if not passed, uses BACKWARD)
compatibility=FULL

# Registry description
description=This registry is used for several purposes.

# Compression (if not passed, records are sent uncompressed)
compressionType=ZLIB
```

## Supported Data Formats
<a name="schema-registry-gs-serde-supported-formats"></a>

Both Java and C# implementations support the same data formats:
+ *AVRO*: Apache Avro binary format
+ *JSON*: JSON Schema format
+ *PROTOBUF*: Protocol Buffers format

## Notes
<a name="schema-registry-gs-serde-csharp-notes"></a>
+ To get started with the library, please visit [https://www.nuget.org/packages/AWS.Glue.SchemaRegistry](https://www.nuget.org/packages/AWS.Glue.SchemaRegistry)
+ Source code is available at: [https://github.com/awslabs/aws-glue-schema-registry](https://github.com/awslabs/aws-glue-schema-registry)

# Creating a registry
<a name="schema-registry-gs3"></a>

You may use the default registry or create as many new registries as necessary using the AWS Glue APIs or AWS Glue console.

**AWS Glue APIs**  
You can use these steps to perform this task using the AWS Glue APIs.

To use the AWS CLI for the AWS Glue Schema Registry APIs, make sure to update your AWS CLI to the latest version.

To add a new registry, use the [CreateRegistry action (Python: create_registry)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-CreateRegistry) API. Specify `RegistryName` as the name of the registry to be created, with a maximum length of 255 characters, containing only letters, numbers, hyphens, underscores, dollar signs, or hash marks.

Specify a `Description` as a string not more than 2048 bytes long, matching the [URI address multi-line string pattern](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-common.html#aws-glue-api-common-_string-patterns).

Optionally, specify one or more `Tags` for your registry, as a map array of key-value pairs.

```
aws glue create-registry --registry-name registryName1 --description description
```

When your registry is created it is assigned an Amazon Resource Name (ARN), which you can view in the `RegistryArn` of the API response. Now that you've created a registry, create one or more schemas for that registry.

**AWS Glue console**  
To add a new registry in the AWS Glue console:

1. Sign in to the AWS Management Console and open the AWS Glue console at [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. In the navigation pane, under **Data catalog**, choose **Schema registries**.

1. Choose **Add registry**.

1. Enter a **Registry name** for the registry, consisting of letters, numbers, hyphens, or underscores. This name cannot be changed.

1. Enter a **Description** (optional) for the registry.

1. Optionally, apply one or more tags to your registry. Choose **Add new tag** and specify a **Tag key** and optionally a **Tag value**.

1. Choose **Add registry**.

![\[Example of creating a registry.\]](http://docs.aws.amazon.com/glue/latest/dg/images/schema_reg_create_registry.png)


When your registry is created it is assigned an Amazon Resource Name (ARN), which you can view by choosing the registry from the list in **Schema registries**. Now that you've created a registry, create one or more schemas for that registry.

# Dealing with a specific record (JAVA POJO) for JSON
<a name="schema-registry-gs-json-java-pojo"></a>

You can use a plain old Java object (POJO) and pass the object as a record. This is similar to the notion of a specific record in AVRO. The [mbknor-jackson-jsonschema](https://github.com/mbknor/mbknor-jackson-jsonSchema) library can generate a JSON schema for the POJO passed. This library can also inject additional information in the JSON schema.

The AWS Glue Schema Registry library uses the injected "className" field in the schema to provide a fully qualified class name. The deserializer uses the "className" field to deserialize the bytes into an object of that class.

```
// Example class

@JsonSchemaDescription("This is a car")
@JsonSchemaTitle("Simple Car Schema")
@Builder
@AllArgsConstructor
@EqualsAndHashCode
// Fully qualified class name to be added to an additionally injected property
// called className for deserializer to determine which class to deserialize
// the bytes into
@JsonSchemaInject(
        strings = {@JsonSchemaString(path = "className",
                value = "com.amazonaws.services.schemaregistry.integrationtests.generators.Car")}
)
// List of annotations to help infer JSON Schema are defined by https://github.com/mbknor/mbknor-jackson-jsonSchema
public class Car {
    @JsonProperty(required = true)
    private String make;

    @JsonProperty(required = true)
    private String model;

    @JsonSchemaDefault("true")
    @JsonProperty
    public boolean used;

    @JsonSchemaInject(ints = {@JsonSchemaInt(path = "multipleOf", value = 1000)})
    @Max(200000)
    @JsonProperty
    private int miles;

    @Min(2000)
    @JsonProperty
    private int year;

    @JsonProperty
    private Date purchaseDate;

    @JsonProperty
    @JsonFormat(shape = JsonFormat.Shape.NUMBER)
    private Date listedDate;

    @JsonProperty
    private String[] owners;

    @JsonProperty
    private Collection<Float> serviceChecks;

    // Empty constructor is required by Jackson to deserialize bytes
    // into an Object of this class
    public Car() {}
}
```

# Creating a schema
<a name="schema-registry-gs4"></a>

You can create a schema using the AWS Glue APIs or the AWS Glue console. 

**AWS Glue APIs**  
You can use these steps to perform this task using the AWS Glue APIs.

To add a new schema, use the [CreateSchema action (Python: create_schema)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-CreateSchema) API.

Specify a `RegistryId` structure to indicate a registry for the schema. Or, omit the `RegistryId` to use the default registry.

Specify a `SchemaName` consisting of letters, numbers, hyphens, or underscores, and a `DataFormat` of **AVRO**, **JSON**, or **PROTOBUF**. After `DataFormat` is set on a schema, it cannot be changed.

Specify a `Compatibility` mode:
+ *Backward (recommended)* — Consumer can read both current and previous version.
+ *Backward all* — Consumer can read current and all previous versions.
+ *Forward* — Consumer can read both current and subsequent version.
+ *Forward all* — Consumer can read both current and all subsequent versions.
+ *Full* — Combination of Backward and Forward.
+ *Full all* — Combination of Backward all and Forward all.
+ *None* — No compatibility checks are performed.
+ *Disabled* — Prevent any versioning for this schema.

Optionally, specify `Tags` for your schema. 

Specify a `SchemaDefinition` to define the schema in Avro, JSON, or Protobuf data format. See the examples.

For Avro data format:

```
aws glue create-schema --registry-id RegistryName="registryName1" --schema-name testschema --compatibility NONE --data-format AVRO --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}"
```

```
aws glue create-schema --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName1" --schema-name testschema --compatibility NONE --data-format AVRO  --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}"
```

For JSON data format:

```
aws glue create-schema --registry-id RegistryName="registryName" --schema-name testSchemaJson --compatibility NONE --data-format JSON --schema-definition "{\"\$schema\": \"http://json-schema.org/draft-07/schema#\",\"type\":\"object\",\"properties\":{\"f1\":{\"type\":\"string\"}}}"
```

```
aws glue create-schema --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName" --schema-name testSchemaJson --compatibility NONE --data-format JSON --schema-definition "{\"\$schema\": \"http://json-schema.org/draft-07/schema#\",\"type\":\"object\",\"properties\":{\"f1\":{\"type\":\"string\"}}}"
```

For Protobuf data format:

```
aws glue create-schema --registry-id RegistryName="registryName" --schema-name testSchemaProtobuf --compatibility NONE --data-format PROTOBUF --schema-definition "syntax = \"proto2\";package org.test;message Basic { optional int32 basic = 1;}"
```

```
aws glue create-schema --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName" --schema-name testSchemaProtobuf --compatibility NONE --data-format PROTOBUF --schema-definition "syntax = \"proto2\";package org.test;message Basic { optional int32 basic = 1;}"
```
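
Hand-escaping a schema definition for the command line is error-prone. As an alternative, the following Python sketch builds the parameters for a `CreateSchema` request and lets `json.dumps` produce the `SchemaDefinition` string. The helper name is hypothetical, and the uppercase enum spellings for the compatibility modes are assumed from the AWS Glue API reference; the sketch does not call AWS.

```python
import json

# Data formats named in this section.
DATA_FORMATS = {"AVRO", "JSON", "PROTOBUF"}
# Assumed API spellings of the compatibility modes listed above.
COMPATIBILITY_MODES = {
    "NONE", "DISABLED", "BACKWARD", "BACKWARD_ALL",
    "FORWARD", "FORWARD_ALL", "FULL", "FULL_ALL",
}


def build_create_schema_request(registry_name, schema_name, data_format,
                                compatibility, definition):
    """Return keyword arguments for a CreateSchema call."""
    if data_format not in DATA_FORMATS:
        raise ValueError(f"unsupported data format: {data_format!r}")
    if compatibility not in COMPATIBILITY_MODES:
        raise ValueError(f"unknown compatibility mode: {compatibility!r}")
    # Avro and JSON Schema definitions can be passed as dictionaries and are
    # serialized here; Protobuf definitions are passed through as text.
    if not isinstance(definition, str):
        definition = json.dumps(definition)
    return {
        "RegistryId": {"RegistryName": registry_name},
        "SchemaName": schema_name,
        "DataFormat": data_format,
        "Compatibility": compatibility,
        "SchemaDefinition": definition,
    }


avro_schema = {
    "type": "record",
    "name": "r1",
    "fields": [
        {"name": "f1", "type": "int"},
        {"name": "f2", "type": "string"},
    ],
}
request = build_create_schema_request(
    "registryName1", "testschema", "AVRO", "NONE", avro_schema
)
print(request["SchemaDefinition"])
```

With boto3 installed and credentials configured, the dictionary could be passed as `boto3.client("glue").create_schema(**request)`.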

**AWS Glue console**  
To add a new schema using the AWS Glue console:

1. Sign in to the AWS Management Console and open the AWS Glue console at [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. In the navigation pane, under **Data catalog**, choose **Schemas**.

1. Choose **Add schema**.

1. Enter a **Schema name**, consisting of letters, numbers, hyphens, underscores, dollar signs, or hashmarks. This name cannot be changed.

1. Choose the **Registry** where the schema will be stored from the drop-down menu. The parent registry cannot be changed post-creation.

1. For **Data format**, choose *Apache Avro* or *JSON*. This format applies to all versions of this schema.

1. Choose a **Compatibility mode**.
   + *Backward (recommended)* — receiver can read both current and previous versions.
   + *Backward All* — receiver can read current and all previous versions.
   + *Forward* — sender can write both current and previous versions.
   + *Forward All* — sender can write current and all previous versions.
   + *Full* — combination of Backward and Forward.
   + *Full All* — combination of Backward All and Forward All.
   + *None* — no compatibility checks performed.
   + *Disabled* — prevent any versioning for this schema.

1. Enter an optional **Description** for the schema of up to 250 characters.  
![\[Example of creating a schema.\]](http://docs.aws.amazon.com/glue/latest/dg/images/schema_reg_create_schema.png)

1. Optionally, apply one or more tags to your schema. Choose **Add new tag** and specify a **Tag key** and optionally a **Tag value**.

1. In the **First schema version** box, enter or paste your initial schema.

   For Avro format, see [Working with Avro data format](#schema-registry-avro).

   For JSON format, see [Working with JSON data format](#schema-registry-json).

1. Optionally, choose **Add metadata** to add version metadata to annotate or classify your schema version.

1. Choose **Create schema and version**.

![\[Example of creating a schema.\]](http://docs.aws.amazon.com/glue/latest/dg/images/schema_reg_create_schema2.png)


The schema is created and appears in the list under **Schemas**.

## Working with Avro data format
<a name="schema-registry-avro"></a>

Avro provides data serialization and data exchange services. Avro stores the data definition in JSON format making it easy to read and interpret. The data itself is stored in binary format.

For information on defining an Apache Avro schema, see the [Apache Avro specification](http://avro.apache.org/docs/current/spec.html).

## Working with JSON data format
<a name="schema-registry-json"></a>

Data can be serialized in JSON format. The [JSON Schema](https://json-schema.org/) specification defines the standard for describing the structure of JSON data.

# Updating a schema or registry
<a name="schema-registry-gs5"></a>

After they are created, you can edit your schemas, schema versions, or registries.

## Updating a registry
<a name="schema-registry-gs5a"></a>

You can update a registry using the AWS Glue APIs or the AWS Glue console. The name of an existing registry cannot be edited. You can edit the description for a registry.

**AWS Glue APIs**  
To update an existing registry, use the [UpdateRegistry action (Python: update_registry)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-UpdateRegistry) API.

Specify a `RegistryId` structure to indicate the registry that you want to update. Pass a `Description` to change the description for a registry.

```
aws glue update-registry --description updatedDescription --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName1"
```

**AWS Glue console**  
To update a registry using the AWS Glue console:

1. Sign in to the AWS Management Console and open the AWS Glue console at [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. In the navigation pane, under **Data catalog**, choose **Schema registries**.

1. Choose a registry from the list of registries by checking its box.

1. In the **Action** menu, choose **Edit registry**.

# Updating a schema
<a name="schema-registry-gs5b"></a>

You can update the description or compatibility setting for a schema.

To update an existing schema, use the [UpdateSchema action (Python: update_schema)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-UpdateSchema) API.

Specify a `SchemaId` structure to indicate the schema that you want to update. You must provide at least one of `SchemaVersionNumber` or `Compatibility`.

For example:

```
aws glue update-schema --description testDescription --schema-id SchemaName="testSchema1",RegistryName="registryName1" --schema-version-number LatestVersion=true --compatibility NONE
```

```
aws glue update-schema --description testDescription --schema-id SchemaArn="arn:aws:glue:us-east-2:901234567890:schema/registryName1/testSchema1" --schema-version-number LatestVersion=true --compatibility NONE
```
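
The rule that a schema version number or a compatibility mode must accompany the `SchemaId` can be enforced before the request is sent. The following Python sketch uses a hypothetical helper, `build_update_schema_request`, which only assembles the request parameters and does not call AWS.

```python
def build_update_schema_request(schema_name, registry_name, *,
                                compatibility=None, latest_version=False,
                                version_number=None, description=None):
    """Return keyword arguments for an UpdateSchema call."""
    has_version = latest_version or version_number is not None
    if compatibility is None and not has_version:
        raise ValueError("provide a schema version number or a compatibility mode")
    request = {
        "SchemaId": {"SchemaName": schema_name, "RegistryName": registry_name}
    }
    # Mirrors the --schema-version-number option used in the CLI examples.
    if latest_version:
        request["SchemaVersionNumber"] = {"LatestVersion": True}
    elif version_number is not None:
        request["SchemaVersionNumber"] = {"VersionNumber": version_number}
    if compatibility is not None:
        request["Compatibility"] = compatibility
    if description is not None:
        request["Description"] = description
    return request


request = build_update_schema_request(
    "testSchema1", "registryName1",
    compatibility="NONE", latest_version=True, description="testDescription",
)
print(request)
```

With boto3 installed and credentials configured, the dictionary could be passed as `boto3.client("glue").update_schema(**request)`.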

# Adding a schema version
<a name="schema-registry-gs5c"></a>

When you add a schema version, you will need to compare the versions to make sure the new schema will be accepted.

To add a new version to an existing schema, use the [RegisterSchemaVersion action (Python: register_schema_version)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-RegisterSchemaVersion) API.

Specify a `SchemaId` structure to indicate the schema for which you want to add a version, and a `SchemaDefinition` to define the schema.

For example:

```
aws glue register-schema-version --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}" --schema-id SchemaArn="arn:aws:glue:us-east-1:901234567890:schema/registryName/testschema"
```

```
aws glue register-schema-version --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}" --schema-id SchemaName="testschema",RegistryName="testregistry"
```

**AWS Glue console**  
To add a schema version using the AWS Glue console:

1. Sign in to the AWS Management Console and open the AWS Glue console at [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. In the navigation pane, under **Data catalog**, choose **Schemas**.

1. Choose the schema from the list of schemas by checking its box.

1. Choose one or more schemas from the list, by checking the boxes.

1. In the **Action** menu, choose **Register new version**.

1. In the **New version** box, enter or paste your new schema.

1. Choose **Compare with previous version** to see differences with the previous schema version.

1. Optionally, choose **Add metadata** to add version metadata to annotate or classify your schema version. Enter **Key** and optional **Value**.

1. Choose **Register version**.

![\[Adding a schema version.\]](http://docs.aws.amazon.com/glue/latest/dg/images/schema_reg_add_schema_version.png)


The new schema version appears in the list of versions. If the version changed the compatibility mode, the version is marked as a checkpoint.

## Example of a schema version comparison
<a name="schema-registry-gs5c1"></a>

When you choose to **Compare with previous version**, you will see the previous and new versions displayed together. Changed information will be highlighted as follows:
+ *Yellow*: indicates changed information.
+ *Green*: indicates content added in the latest version.
+ *Red*: indicates content removed in the latest version.

You can also compare against earlier versions.

![\[Example of a schema version comparison.\]](http://docs.aws.amazon.com/glue/latest/dg/images/schema_reg_version_comparison.png)
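
A similar comparison can be approximated locally before you register a version. The following Python sketch pretty-prints two JSON-based schema definitions (Avro or JSON Schema) and diffs them with the standard `difflib` module. It is only a textual diff under that assumption, not the compatibility check that the Schema Registry performs, and the function name is hypothetical.

```python
import difflib
import json


def diff_schema_versions(previous, new):
    """Return unified-diff lines between two JSON schema definition strings."""
    # Normalize both definitions so that only real changes show up in the diff.
    old_lines = json.dumps(json.loads(previous), indent=2, sort_keys=True).splitlines()
    new_lines = json.dumps(json.loads(new), indent=2, sort_keys=True).splitlines()
    return list(
        difflib.unified_diff(old_lines, new_lines, "previous", "new", lineterm="")
    )


v1 = json.dumps({
    "type": "record", "name": "r1",
    "fields": [{"name": "f1", "type": "int"}, {"name": "f2", "type": "string"}],
})
v2 = json.dumps({
    "type": "record", "name": "r1",
    "fields": [
        {"name": "f1", "type": "int"},
        {"name": "f2", "type": "string"},
        {"name": "f3", "type": "string", "default": ""},
    ],
})

# Lines starting with "+" were added in the new version, "-" were removed.
for line in diff_schema_versions(v1, v2):
    print(line)
```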


# Deleting a schema or registry
<a name="schema-registry-gs7"></a>

Deleting a schema, a schema version, or a registry is a permanent action that cannot be undone.

## Deleting a schema
<a name="schema-registry-gs7a"></a>

You may want to delete a schema when it will no longer be used within a registry, using the AWS Management Console, or the [DeleteSchema action (Python: delete_schema)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteSchema) API.

Deleting one or more schemas is a permanent action that cannot be undone. Make sure that the schema or schemas are no longer needed.

To delete a schema from the registry, call the [DeleteSchema action (Python: delete_schema)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteSchema) API, specifying the `SchemaId` structure to identify the schema.

For example:

```
aws glue delete-schema --schema-id SchemaArn="arn:aws:glue:us-east-2:901234567890:schema/registryName1/schemaname"
```

```
aws glue delete-schema --schema-id SchemaName="TestSchema6-deleteschemabyname",RegistryName="default-registry"
```

**AWS Glue console**  
To delete a schema from the AWS Glue console:

1. Sign in to the AWS Management Console and open the AWS Glue console at [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. In the navigation pane, under **Data catalog**, choose **Schema registries**.

1. Choose the registry that contains your schema from the list of registries.

1. Choose one or more schemas from the list, by checking the boxes.

1. In the **Action** menu, choose **Delete schema**.

1. Enter the text **Delete** in the field to confirm deletion.

1. Choose **Delete**.

The schema(s) you specified are deleted from the registry.

## Deleting a schema version
<a name="schema-registry-gs7b"></a>

As schemas accumulate in the registry, you may want to delete unwanted schema versions using the AWS Management Console, or the [DeleteSchemaVersions action (Python: delete_schema_versions)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteSchemaVersions) API. Deleting one or more schema versions is a permanent action that cannot be undone. Make sure that the schema versions are no longer needed.

When deleting schema versions, take note of the following constraints:
+ You cannot delete a check-pointed version.
+ The range of contiguous versions cannot be more than 25.
+ The latest schema version must not be in a pending state.

Specify the `SchemaId` structure to identify the schema, and specify `Versions` as a range of versions to delete. For more information on specifying a version or range of versions, see [DeleteSchemaVersions action (Python: delete_schema_versions)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteSchemaVersions). The schema versions you specified are deleted from the registry.

Calling the [ListSchemaVersions action (Python: list_schema_versions)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-ListSchemaVersions) API after this call lists the status of the deleted versions.

For example:

```
aws glue delete-schema-versions --schema-id SchemaName="TestSchema6",RegistryName="default-registry" --versions "1-1"
```

```
aws glue delete-schema-versions --schema-id SchemaArn="arn:aws:glue:us-east-2:901234567890:schema/default-registry/TestSchema6-NON-Existent" --versions "1-1"
```
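
Because a single range cannot cover more than 25 contiguous versions, a longer span has to be split across several calls. The following Python sketch builds `Versions` strings in the `first-last` form used above. It assumes the limit counts versions inclusively, and the helper names are hypothetical.

```python
MAX_VERSIONS_PER_RANGE = 25  # limit stated in the constraints above


def version_range(first, last):
    """Return a Versions string such as "1-3" after checking the limit."""
    if first < 1 or last < first:
        raise ValueError("expected 1 <= first <= last")
    if last - first + 1 > MAX_VERSIONS_PER_RANGE:
        raise ValueError("a range cannot cover more than 25 versions")
    return f"{first}-{last}"


def version_ranges(first, last):
    """Split a longer span into consecutive ranges of at most 25 versions."""
    ranges = []
    start = first
    while start <= last:
        end = min(start + MAX_VERSIONS_PER_RANGE - 1, last)
        ranges.append(version_range(start, end))
        start = end + 1
    return ranges


print(version_ranges(1, 60))  # ['1-25', '26-50', '51-60']
```

Each string could then be passed as the `--versions` value of a separate `delete-schema-versions` call. Check-pointed versions inside a range still cannot be deleted.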

**AWS Glue console**  
To delete a schema version from the AWS Glue console:

1. Sign in to the AWS Management Console and open the AWS Glue console at [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. In the navigation pane, under **Data catalog**, choose **Schema registries**.

1. Choose the registry that contains your schema from the list of registries.

1. Choose one or more schemas from the list, by checking the boxes.

1. In the **Action** menu, choose **Delete schema**.

1. Enter the text **Delete** in the field to confirm deletion.

1. Choose **Delete**.

The schema versions you specified are deleted from the registry.

# Deleting a registry
<a name="schema-registry-gs7c"></a>

You may want to delete a registry when the schemas it contains should no longer be organized under that registry. You will need to reassign those schemas to another registry.

Deleting one or more registries is a permanent action that cannot be undone. Make sure that the registry or registries are no longer needed.

The default registry can be deleted using the AWS CLI.

**AWS Glue API**  
To delete the entire registry including the schema and all of its versions, call the [DeleteRegistry action (Python: delete_registry)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteRegistry) API. Specify a `RegistryId` structure to identify the registry.

For example:

```
aws glue delete-registry --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName1"
```

```
aws glue delete-registry --registry-id RegistryName="TestRegistry-deletebyname"
```

To get the status of the delete operation, you can call the `GetRegistry` API after the asynchronous call.
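
Because the delete call is asynchronous, a client typically polls until the registry is gone. The following Python sketch shows one way to structure that loop. The `get_status` callable is supplied by the caller and is an assumption of this example: with boto3 it might wrap `get_registry`, return the `Status` field of the response, and return `None` once the service reports that the registry no longer exists.

```python
import time


def wait_for_registry_deletion(get_status, interval_seconds=2.0,
                               max_attempts=30, sleep=time.sleep):
    """Poll get_status() until it returns None; return True if that happened."""
    for attempt in range(max_attempts):
        if get_status() is None:
            return True
        # Do not sleep after the final attempt.
        if attempt + 1 < max_attempts:
            sleep(interval_seconds)
    return False


# Stand-in for the real status lookup: the registry reports DELETING twice
# and is then no longer found.
fake_statuses = iter(["DELETING", "DELETING", None])
deleted = wait_for_registry_deletion(
    lambda: next(fake_statuses), sleep=lambda seconds: None
)
print(deleted)  # True
```

Passing `sleep` as a parameter keeps the loop easy to test without real delays.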

**AWS Glue console**  
To delete a registry from the AWS Glue console:

1. Sign in to the AWS Management Console and open the AWS Glue console at [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. In the navigation pane, under **Data catalog**, choose **Schema registries**.

1. Choose a registry from the list, by checking a box.

1. In the **Action** menu, choose **Delete registry**.

1. Enter the text **Delete** in the field to confirm deletion.

1. Choose **Delete**.

The registries you selected are deleted from AWS Glue.

## IAM examples for serializers
<a name="schema-registry-gs1"></a>

**Note**  
AWS managed policies grant necessary permissions for common use cases. For information on using managed policies to manage the schema registry, see [AWS managed (predefined) policies for AWS Glue](security-iam-awsmanpol.md#access-policy-examples-aws-managed). 

For serializers, you should create a minimal policy similar to that below to give you the ability to find the `schemaVersionId` for a given schema definition. Note, you should have read permissions on the registry in order to read the schemas in the registry. You can limit the registries that can be read by using the `Resource` clause.

Code example 13:

```
{
    "Sid" : "GetSchemaByDefinition",
    "Effect" : "Allow",
    "Action" :
    [
        "glue:GetSchemaByDefinition"
    ],
    "Resource" : ["arn:aws:glue:us-east-2:012345678:registry/registryname-1",
                  "arn:aws:glue:us-east-2:012345678:schema/registryname-1/schemaname-1",
                  "arn:aws:glue:us-east-2:012345678:schema/registryname-1/schemaname-2"
                 ]
}
```
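The `Resource` entries above follow two ARN patterns: one for the registry and one for each schema inside it. As a rough sketch, the following helper assembles such a list from its parts. The class and method names are hypothetical, and the account ID in `main` is a placeholder.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative helper that builds the ARNs used in a policy's Resource clause. */
final class GlueSchemaArns {

    static String registryArn(String region, String accountId, String registry) {
        return String.format("arn:aws:glue:%s:%s:registry/%s", region, accountId, registry);
    }

    static String schemaArn(String region, String accountId, String registry, String schema) {
        return String.format("arn:aws:glue:%s:%s:schema/%s/%s", region, accountId, registry, schema);
    }

    /** Returns the registry ARN followed by one ARN per schema name. */
    static List<String> resources(String region, String accountId, String registry, String... schemas) {
        List<String> arns = new ArrayList<>();
        arns.add(registryArn(region, accountId, registry));
        for (String schema : schemas) {
            arns.add(schemaArn(region, accountId, registry, schema));
        }
        return arns;
    }

    public static void main(String[] args) {
        // Placeholder region, account ID, and names.
        resources("us-east-2", "123456789012", "registryname-1", "schemaname-1", "schemaname-2")
                .forEach(System.out::println);
    }
}
```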

You can also allow producers to create new schemas and versions by including the following additional actions. Note that you need to be able to inspect the registry in order to add, remove, or evolve the schemas inside it. You can limit the registries that can be inspected by using the `Resource` clause.

Code example 14:

```
{
    "Sid" : "RegisterSchemaWithMetadata",
    "Effect" : "Allow",
    "Action" :
    [
        "glue:GetSchemaByDefinition",
        "glue:CreateSchema",
        "glue:RegisterSchemaVersion",
        "glue:PutSchemaVersionMetadata"
    ],
    "Resource" : ["arn:aws:glue:aws-region:123456789012:registry/registryname-1",
                  "arn:aws:glue:aws-region:123456789012:schema/registryname-1/schemaname-1",
                  "arn:aws:glue:aws-region:123456789012:schema/registryname-1/schemaname-2"
                 ]
}

## IAM examples for deserializers
<a name="schema-registry-gs1b"></a>

For deserializers (consumer side), you should create a policy similar to that below to allow the deserializer to fetch the schema from the Schema Registry for deserialization. Note, you should be able to inspect the registry in order to fetch the schemas inside it.

Code example 15:

```
{
    "Sid" : "GetSchemaVersion",
    "Effect" : "Allow",
    "Action" :
    [
        "glue:GetSchemaVersion"
    ],
    "Resource" : ["*"]
}

## Private connectivity using AWS PrivateLink
<a name="schema-registry-gs-private"></a>

You can use AWS PrivateLink to connect your data producer’s VPC to AWS Glue by defining an interface VPC endpoint for AWS Glue. When you use a VPC interface endpoint, communication between your VPC and AWS Glue is conducted entirely within the AWS network. For more information, see [Using AWS Glue with VPC Endpoints](https://docs.aws.amazon.com/glue/latest/dg/vpc-endpoint.html).

# Accessing Amazon CloudWatch metrics
<a name="schema-registry-gs-monitoring"></a>

Amazon CloudWatch metrics are available as part of CloudWatch’s free tier. You can access these metrics in the CloudWatch console. API-level metrics include CreateSchema, GetSchemaByDefinition, GetSchemaVersion, RegisterSchemaVersion, and PutSchemaVersionMetadata, each reported for Success and Latency. Resource-level metrics include Registry.ThrottledByLimit, SchemaVersion.ThrottledByLimit, and SchemaVersion.Size.

# Sample CloudFormation template for schema registry
<a name="schema-registry-integrations-cfn"></a>

The following is a sample template for creating Schema Registry resources in CloudFormation. To create this stack in your account, copy the template shown later in this section into a file named `SampleTemplate.yaml`, and run the following command:

```
aws cloudformation create-stack --stack-name ABCSchemaRegistryStack --template-body file://SampleTemplate.yaml
```

This example uses `AWS::Glue::Registry` to create a registry, `AWS::Glue::Schema` to create a schema, `AWS::Glue::SchemaVersion` to create a schema version, and `AWS::Glue::SchemaVersionMetadata` to populate schema version metadata. 

```
Description: "A sample CloudFormation template for creating Schema Registry resources."
Resources:
  ABCRegistry:
    Type: "AWS::Glue::Registry"
    Properties:
      Name: "ABCSchemaRegistry"
      Description: "ABC Corp. Schema Registry"
      Tags:
        Project: "Foo"
  ABCSchema:
    Type: "AWS::Glue::Schema"
    Properties:
      Registry:
        Arn: !Ref ABCRegistry
      Name: "TestSchema"
      Compatibility: "NONE"
      DataFormat: "AVRO"
      SchemaDefinition: >
        {"namespace":"foo.avro","type":"record","name":"user","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":"int"}]}
      Tags:
        Project: "Foo"
  SecondSchemaVersion:
    Type: "AWS::Glue::SchemaVersion"
    Properties:
      Schema:
        SchemaArn: !Ref ABCSchema
      SchemaDefinition: >
        {"namespace":"foo.avro","type":"record","name":"user","fields":[{"name":"status","type":"string", "default":"ON"}, {"name":"name","type":"string"},{"name":"favorite_number","type":"int"}]}
  FirstSchemaVersionMetadata:
    Type: "AWS::Glue::SchemaVersionMetadata"
    Properties:
      SchemaVersionId: !GetAtt ABCSchema.InitialSchemaVersionId
      Key: "Application"
      Value: "Kinesis"
  SecondSchemaVersionMetadata:
    Type: "AWS::Glue::SchemaVersionMetadata"
    Properties:
      SchemaVersionId: !Ref SecondSchemaVersion
      Key: "Application"
      Value: "Kinesis"
```

# Integrating with AWS Glue Schema Registry
<a name="schema-registry-integrations"></a>

These sections describe integrations with AWS Glue Schema Registry. The examples in these sections show a schema with AVRO data format. For more examples, including schemas with JSON data format, see the integration tests and ReadMe information in the [AWS Glue Schema Registry open source repository](https://github.com/awslabs/aws-glue-schema-registry).

**Topics**
+ [Use case: Connecting Schema Registry to Amazon MSK or Apache Kafka](#schema-registry-integrations-amazon-msk)
+ [Use case: Integrating Amazon Kinesis Data Streams with the AWS Glue Schema Registry](#schema-registry-integrations-kds)
+ [Use case: Amazon Managed Service for Apache Flink](#schema-registry-integrations-kinesis-data-analytics-apache-flink)
+ [Use Case: Integration with AWS Lambda](#schema-registry-integrations-aws-lambda)
+ [Use case: AWS Glue Data Catalog](#schema-registry-integrations-aws-glue-data-catalog)
+ [Use case: AWS Glue streaming](#schema-registry-integrations-aws-glue-streaming)
+ [Use case: Apache Kafka Streams](#schema-registry-integrations-apache-kafka-streams)

## Use case: Connecting Schema Registry to Amazon MSK or Apache Kafka
<a name="schema-registry-integrations-amazon-msk"></a>

The following steps assume that you are writing data to an Apache Kafka topic.

1. Create an Amazon Managed Streaming for Apache Kafka (Amazon MSK) or Apache Kafka cluster with at least one topic. If creating an Amazon MSK cluster, you can use the AWS Management Console. Follow these instructions: [Getting Started Using Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html) in the *Amazon Managed Streaming for Apache Kafka Developer Guide*.

1. Follow the [Installing SerDe Libraries](schema-registry-gs-serde.md) step above.

1. To create schema registries, schemas, or schema versions, follow the instructions under the [Getting started with schema registry](schema-registry-gs.md) section of this document.

1. Start your producers and consumers to use the Schema Registry to write and read records to/from the Amazon MSK or Apache Kafka topic. Example producer and consumer code can be found in [the ReadMe file](https://github.com/awslabs/aws-glue-schema-registry/blob/master/README.md) from the Serde libraries. The Schema Registry library on the producer will automatically serialize the record and decorate the record with a schema version ID.

1. If you registered the schema of this record beforehand, or if auto-registration is turned on, the schema is registered in the Schema Registry.

1. The consumer reading from the Amazon MSK or Apache Kafka topic, using the AWS Glue Schema Registry library, will automatically look up the schema from the Schema Registry.

## Use case: Integrating Amazon Kinesis Data Streams with the AWS Glue Schema Registry
<a name="schema-registry-integrations-kds"></a>

This integration requires that you have an existing Amazon Kinesis data stream. For more information, see [Getting Started with Amazon Kinesis Data Streams](https://docs.aws.amazon.com/streams/latest/dev/getting-started.html) in the *Amazon Kinesis Data Streams Developer Guide*.

There are two ways that you can interact with data in a Kinesis data stream.
+ Through the Kinesis Producer Library (KPL) and Kinesis Client Library (KCL) libraries in Java. Multi-language support is not provided.
+ Through the `PutRecords`, `PutRecord`, and `GetRecords` Kinesis Data Streams APIs available in the AWS SDK for Java.

If you currently use the KPL/KCL libraries, we recommend continuing to use that method. There are updated KCL and KPL versions with Schema Registry integrated, as shown in the examples. Otherwise, you can use the sample code to leverage the AWS Glue Schema Registry if using the KDS APIs directly.

Schema Registry integration is only available with KPL v0.14.2 or later and with KCL v2.3 or later. Schema Registry integration with JSON data format is available with KPL v0.14.8 or later and with KCL v2.3.6 or later.
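If you want to verify these minimums in a build or startup check, the comparison can be sketched as follows. This is an illustration only: the class and method names are hypothetical, and it assumes version strings made up solely of dot-separated numbers (no `v` prefix or qualifiers).

```java
/** Illustrative check of KPL/KCL versions against the minimums listed above. */
final class LibraryVersionCheck {

    /** Compares dot-separated numeric versions; missing parts count as 0. */
    static int compareVersions(String a, String b) {
        String[] left = a.split("\\.");
        String[] right = b.split("\\.");
        int length = Math.max(left.length, right.length);
        for (int i = 0; i < length; i++) {
            int l = i < left.length ? Integer.parseInt(left[i]) : 0;
            int r = i < right.length ? Integer.parseInt(right[i]) : 0;
            if (l != r) {
                return Integer.compare(l, r);
            }
        }
        return 0;
    }

    /** KPL 0.14.2 or later and KCL 2.3 or later. */
    static boolean supportsSchemaRegistry(String kplVersion, String kclVersion) {
        return compareVersions(kplVersion, "0.14.2") >= 0
                && compareVersions(kclVersion, "2.3") >= 0;
    }

    /** KPL 0.14.8 or later and KCL 2.3.6 or later. */
    static boolean supportsJsonFormat(String kplVersion, String kclVersion) {
        return compareVersions(kplVersion, "0.14.8") >= 0
                && compareVersions(kclVersion, "2.3.6") >= 0;
    }

    public static void main(String[] args) {
        System.out.println(supportsSchemaRegistry("0.14.2", "2.3.0"));
        System.out.println(supportsJsonFormat("0.14.2", "2.3.0"));
    }
}
```

Comparing the parts as integers matters here: a plain string comparison would rank `0.14.10` below `0.14.8`.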

### Interacting with Data Using Kinesis SDK V2
<a name="schema-registry-integrations-kds-sdk-v2"></a>

This section describes interacting with Kinesis using the Kinesis SDK V2.

```
// Example JSON record; you can also construct an Avro record
private static final JsonDataWithSchema record = JsonDataWithSchema.builder(schemaString, payloadString).build();
private static final DataFormat dataFormat = DataFormat.JSON;

//Configurations for Schema Registry
GlueSchemaRegistryConfiguration gsrConfig = new GlueSchemaRegistryConfiguration("us-east-1");

GlueSchemaRegistrySerializer glueSchemaRegistrySerializer =
        new GlueSchemaRegistrySerializerImpl(awsCredentialsProvider, gsrConfig);
GlueSchemaRegistryDataFormatSerializer dataFormatSerializer =
        new GlueSchemaRegistrySerializerFactory().getInstance(dataFormat, gsrConfig);

Schema gsrSchema =
        new Schema(dataFormatSerializer.getSchemaDefinition(record), dataFormat.name(), "MySchema");

byte[] serializedBytes = dataFormatSerializer.serialize(record);

byte[] gsrEncodedBytes = glueSchemaRegistrySerializer.encode(streamName, gsrSchema, serializedBytes);

PutRecordRequest putRecordRequest = PutRecordRequest.builder()
        .streamName(streamName)
        .partitionKey("partitionKey")
        .data(SdkBytes.fromByteArray(gsrEncodedBytes))
        .build();
String shardId = kinesisClient.putRecord(putRecordRequest)
        .get()
        .shardId();

GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer = new GlueSchemaRegistryDeserializerImpl(awsCredentialsProvider, gsrConfig);

GlueSchemaRegistryDataFormatDeserializer gsrDataFormatDeserializer =
        new GlueSchemaRegistryDeserializerFactory().getInstance(dataFormat, gsrConfig);

GetShardIteratorRequest getShardIteratorRequest = GetShardIteratorRequest.builder()
        .streamName(streamName)
        .shardId(shardId)
        .shardIteratorType(ShardIteratorType.TRIM_HORIZON)
        .build();

String shardIterator = kinesisClient.getShardIterator(getShardIteratorRequest)
        .get()
        .shardIterator();

GetRecordsRequest getRecordRequest = GetRecordsRequest.builder()
        .shardIterator(shardIterator)
        .build();
GetRecordsResponse recordsResponse = kinesisClient.getRecords(getRecordRequest)
        .get();

List<Object> consumerRecords = new ArrayList<>();
List<Record> recordsFromKinesis = recordsResponse.records();

for (int i = 0; i < recordsFromKinesis.size(); i++) {
    byte[] consumedBytes = recordsFromKinesis.get(i)
            .data()
            .asByteArray();

    // Use a new name here: gsrSchema is already declared above for the producer side
    Schema consumedSchema = glueSchemaRegistryDeserializer.getSchema(consumedBytes);
    Object decodedRecord = gsrDataFormatDeserializer.deserialize(ByteBuffer.wrap(consumedBytes),
                                                                    consumedSchema.getSchemaDefinition());
    consumerRecords.add(decodedRecord);
}
```

### Interacting with data using the KPL/KCL libraries
<a name="schema-registry-integrations-kds-libraries"></a>

This section describes integrating Kinesis Data Streams with Schema Registry using the KPL/KCL libraries. For more information on using KPL/KCL, see [Developing Producers Using the Amazon Kinesis Producer Library](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) in the *Amazon Kinesis Data Streams Developer Guide*.

#### Setting up the Schema Registry in KPL
<a name="schema-registry-integrations-kds-libraries-kpl"></a>

1. Define the schema definition for the data, data format and schema name authored in the AWS Glue Schema Registry.

1. Optionally configure the `GlueSchemaRegistryConfiguration` object.

1. Pass the schema object to the `addUserRecord` API.

   ```
   private static final String SCHEMA_DEFINITION = "{\"namespace\": \"example.avro\",\n"
   + " \"type\": \"record\",\n"
   + " \"name\": \"User\",\n"
   + " \"fields\": [\n"
   + " {\"name\": \"name\", \"type\": \"string\"},\n"
   + " {\"name\": \"favorite_number\", \"type\": [\"int\", \"null\"]},\n"
   + " {\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]}\n"
   + " ]\n"
   + "}";
   
   KinesisProducerConfiguration config = new KinesisProducerConfiguration();
   config.setRegion("us-west-1");
   
   //[Optional] configuration for Schema Registry.
   
   GlueSchemaRegistryConfiguration schemaRegistryConfig =
   new GlueSchemaRegistryConfiguration("us-west-1");
   
   schemaRegistryConfig.setCompression(true);
   
   config.setGlueSchemaRegistryConfiguration(schemaRegistryConfig);
   
   ///Optional configuration ends.
   
   final KinesisProducer producer =
         new KinesisProducer(config);
   
   final ByteBuffer data = getDataToSend();
   
   com.amazonaws.services.schemaregistry.common.Schema gsrSchema =
       new Schema(SCHEMA_DEFINITION, DataFormat.AVRO.toString(), "demoSchema");
   
   ListenableFuture<UserRecordResult> f = producer.addUserRecord(
   config.getStreamName(), TIMESTAMP, Utils.randomExplicitHashKey(), data, gsrSchema);
   
   private static ByteBuffer getDataToSend() throws IOException {
         org.apache.avro.Schema avroSchema =
           new org.apache.avro.Schema.Parser().parse(SCHEMA_DEFINITION);
   
         GenericRecord user = new GenericData.Record(avroSchema);
         user.put("name", "Emily");
         user.put("favorite_number", 32);
         user.put("favorite_color", "green");
   
         ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
         Encoder encoder = EncoderFactory.get().directBinaryEncoder(outBytes, null);
         new GenericDatumWriter<>(avroSchema).write(user, encoder);
         encoder.flush();
         return ByteBuffer.wrap(outBytes.toByteArray());
    }
   ```

#### Setting up the Kinesis client library
<a name="schema-registry-integrations-kds-libraries-kcl"></a>

You will develop your Kinesis Client Library consumer in Java. For more information, see [Developing a Kinesis Client Library Consumer in Java](https://docs.aws.amazon.com/streams/latest/dev/kcl2-standard-consumer-java-example.html) in the *Amazon Kinesis Data Streams Developer Guide*.

1. Create an instance of `GlueSchemaRegistryDeserializer` by passing a `GlueSchemaRegistryConfiguration` object.

1. Pass the `GlueSchemaRegistryDeserializer` to `retrievalConfig.glueSchemaRegistryDeserializer`.

1. Access the schema of incoming messages by calling `kinesisClientRecord.schema()`.

   ```
   GlueSchemaRegistryConfiguration schemaRegistryConfig =
       new GlueSchemaRegistryConfiguration(this.region.toString());
   
    GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer =
       new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), schemaRegistryConfig);
   
    RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient));
    retrievalConfig.glueSchemaRegistryDeserializer(glueSchemaRegistryDeserializer);
   
     Scheduler scheduler = new Scheduler(
               configsBuilder.checkpointConfig(),
               configsBuilder.coordinatorConfig(),
               configsBuilder.leaseManagementConfig(),
               configsBuilder.lifecycleConfig(),
               configsBuilder.metricsConfig(),
               configsBuilder.processorConfig(),
               retrievalConfig
           );
   
    public void processRecords(ProcessRecordsInput processRecordsInput) {
               MDC.put(SHARD_ID_MDC_KEY, shardId);
               try {
                   log.info("Processing {} record(s)",
                   processRecordsInput.records().size());
                   processRecordsInput.records()
                   .forEach(
                       r ->
                           log.info("Processed record pk: {} -- Seq: {} : data {} with schema: {}",
                           r.partitionKey(), r.sequenceNumber(), recordToAvroObj(r).toString(), r.schema()));
               } catch (Throwable t) {
                   log.error("Caught throwable while processing records. Aborting.", t);
                   Runtime.getRuntime().halt(1);
               } finally {
                   MDC.remove(SHARD_ID_MDC_KEY);
               }
    }
   
    private GenericRecord recordToAvroObj(KinesisClientRecord r) {
       byte[] data = new byte[r.data().remaining()];
       r.data().get(data, 0, data.length);
       org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser().parse(r.schema().getSchemaDefinition());
       DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
   
       BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(data, 0, data.length, null);
       try {
           return datumReader.read(null, binaryDecoder);
       } catch (IOException e) {
           // read() declares IOException; rethrow unchecked so this method can be called from a lambda
           throw new UncheckedIOException(e);
       }
    }
   ```

#### Interacting with data using the Kinesis Data Streams APIs
<a name="schema-registry-integrations-kds-apis"></a>

This section describes integrating Kinesis Data Streams with Schema Registry using the Kinesis Data Streams APIs.

1. Update these Maven dependencies:

   ```
   <dependencyManagement>
           <dependencies>
               <dependency>
                   <groupId>com.amazonaws</groupId>
                   <artifactId>aws-java-sdk-bom</artifactId>
                   <version>1.11.884</version>
                   <type>pom</type>
                   <scope>import</scope>
               </dependency>
           </dependencies>
       </dependencyManagement>
   
       <dependencies>
           <dependency>
               <groupId>com.amazonaws</groupId>
               <artifactId>aws-java-sdk-kinesis</artifactId>
           </dependency>
   
           <dependency>
               <groupId>software.amazon.glue</groupId>
               <artifactId>schema-registry-serde</artifactId>
               <version>1.1.5</version>
           </dependency>
   
           <dependency>
               <groupId>com.fasterxml.jackson.dataformat</groupId>
               <artifactId>jackson-dataformat-cbor</artifactId>
               <version>2.11.3</version>
           </dependency>
       </dependencies>
   ```

1. In the producer, add schema header information using the `PutRecords` or `PutRecord` API in Kinesis Data Streams.

   ```
   //The following lines add a Schema Header to the record
           com.amazonaws.services.schemaregistry.common.Schema awsSchema =
               new com.amazonaws.services.schemaregistry.common.Schema(schemaDefinition, DataFormat.AVRO.name(),
                   schemaName);
           GlueSchemaRegistrySerializerImpl glueSchemaRegistrySerializer =
               new GlueSchemaRegistrySerializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(getConfigs()));
           byte[] recordWithSchemaHeader =
               glueSchemaRegistrySerializer.encode(streamName, awsSchema, recordAsBytes);
   ```

1. In the producer, use the `PutRecords` or `PutRecord` API to put the record into the data stream.

1. In the consumer, remove the schema header from the record, and deserialize the Avro record.

   ```
   //The following lines remove Schema Header from record
           GlueSchemaRegistryDeserializerImpl glueSchemaRegistryDeserializer =
               new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), getConfigs());
           byte[] recordWithSchemaHeaderBytes = new byte[recordWithSchemaHeader.remaining()];
           recordWithSchemaHeader.get(recordWithSchemaHeaderBytes, 0, recordWithSchemaHeaderBytes.length);
           com.amazonaws.services.schemaregistry.common.Schema awsSchema =
               glueSchemaRegistryDeserializer.getSchema(recordWithSchemaHeaderBytes);
           byte[] record = glueSchemaRegistryDeserializer.getData(recordWithSchemaHeaderBytes);
   
           //The following lines deserialize an AVRO schema record
           if (DataFormat.AVRO.name().equals(awsSchema.getDataFormat())) {
               Schema avroSchema = new org.apache.avro.Schema.Parser().parse(awsSchema.getSchemaDefinition());
               Object genericRecord = convertBytesToRecord(avroSchema, record);
               System.out.println(genericRecord);
           }
   ```
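To make the idea of a schema header concrete, the following self-contained sketch prepends a small header carrying a schema version ID to a payload and strips it again. It is not the library's implementation. The layout used here (one version byte, one compression byte, then a 16-byte schema version UUID) and the constant values are assumptions for illustration; in real code, rely on `encode`, `getSchema`, and `getData` as shown above.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.UUID;

/** Illustrative stand-in for the header handling that the SerDe library performs. */
final class SchemaHeaderSketch {
    // Assumed layout for this sketch only.
    static final byte HEADER_VERSION = 3;
    static final byte NO_COMPRESSION = 0;
    static final int HEADER_SIZE = 1 + 1 + 16;

    /** Prepends the header (version, compression flag, schema version ID) to the payload. */
    static byte[] encode(UUID schemaVersionId, byte[] payload) {
        ByteBuffer buffer = ByteBuffer.allocate(HEADER_SIZE + payload.length);
        buffer.put(HEADER_VERSION);
        buffer.put(NO_COMPRESSION);
        buffer.putLong(schemaVersionId.getMostSignificantBits());
        buffer.putLong(schemaVersionId.getLeastSignificantBits());
        buffer.put(payload);
        return buffer.array();
    }

    /** Reads the schema version ID back out of the header. */
    static UUID schemaVersionId(byte[] encoded) {
        ByteBuffer buffer = ByteBuffer.wrap(encoded);
        if (encoded.length < HEADER_SIZE || buffer.get() != HEADER_VERSION) {
            throw new IllegalArgumentException("Record does not start with the expected header");
        }
        buffer.get(); // skip the compression byte
        long mostSignificant = buffer.getLong();
        long leastSignificant = buffer.getLong();
        return new UUID(mostSignificant, leastSignificant);
    }

    /** Returns the payload with the header removed. */
    static byte[] payload(byte[] encoded) {
        schemaVersionId(encoded); // validates the header first
        return Arrays.copyOfRange(encoded, HEADER_SIZE, encoded.length);
    }

    public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        byte[] encoded = encode(id, "hello".getBytes());
        System.out.println(schemaVersionId(encoded).equals(id));
        System.out.println(new String(payload(encoded)));
    }
}
```

The consumer only needs the ID from the header to look up the schema definition, which is why the producer does not have to ship the full schema with every record.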

#### Interacting with data using the Kinesis Data Streams APIs
<a name="schema-registry-integrations-kds-apis-reference"></a>

The following is example code for using the `PutRecords` and `GetRecords` APIs.

```
//Full sample code
import com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration;
import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializerImpl;
import com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistrySerializerImpl;
import com.amazonaws.services.schemaregistry.utils.AVROUtils;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.services.glue.model.DataFormat;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;


public class PutAndGetExampleWithEncodedData {
    static final String regionName = "us-east-2";
    static final String streamName = "testStream1";
    static final String schemaName = "User-Topic";
    static final String AVRO_USER_SCHEMA_FILE = "src/main/resources/user.avsc";
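    // KinesisApi is a helper class, not shown here, that wraps the Kinesis PutRecord, PutRecords, and GetRecords calls
    KinesisApi kinesisApi = new KinesisApi();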

    void runSampleForPutRecord() throws IOException {
        Object testRecord = getTestRecord();
        byte[] recordAsBytes = convertRecordToBytes(testRecord);
        String schemaDefinition = AVROUtils.getInstance().getSchemaDefinition(testRecord);

        //The following lines add a Schema Header to a record
        com.amazonaws.services.schemaregistry.common.Schema awsSchema =
            new com.amazonaws.services.schemaregistry.common.Schema(schemaDefinition, DataFormat.AVRO.name(),
                schemaName);
        GlueSchemaRegistrySerializerImpl glueSchemaRegistrySerializer =
            new GlueSchemaRegistrySerializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(regionName));
        byte[] recordWithSchemaHeader =
            glueSchemaRegistrySerializer.encode(streamName, awsSchema, recordAsBytes);

        //Use PutRecords api to pass a list of records
        kinesisApi.putRecords(Collections.singletonList(recordWithSchemaHeader), streamName, regionName);

        //OR
        //Use PutRecord api to pass single record
        //kinesisApi.putRecord(recordWithSchemaHeader, streamName, regionName);
    }

    byte[] runSampleForGetRecord() throws IOException {
        ByteBuffer recordWithSchemaHeader = kinesisApi.getRecords(streamName, regionName);

        //The following lines remove the schema registry header
        GlueSchemaRegistryDeserializerImpl glueSchemaRegistryDeserializer =
            new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(regionName));
        byte[] recordWithSchemaHeaderBytes = new byte[recordWithSchemaHeader.remaining()];
        recordWithSchemaHeader.get(recordWithSchemaHeaderBytes, 0, recordWithSchemaHeaderBytes.length);

        com.amazonaws.services.schemaregistry.common.Schema awsSchema =
            glueSchemaRegistryDeserializer.getSchema(recordWithSchemaHeaderBytes);

        byte[] record = glueSchemaRegistryDeserializer.getData(recordWithSchemaHeaderBytes);

        //The following lines deserialize an AVRO schema record
        if (DataFormat.AVRO.name().equals(awsSchema.getDataFormat())) {
            Schema avroSchema = new org.apache.avro.Schema.Parser().parse(awsSchema.getSchemaDefinition());
            Object genericRecord = convertBytesToRecord(avroSchema, record);
            System.out.println(genericRecord);
        }

        return record;
    }

    private byte[] convertRecordToBytes(final Object record) throws IOException {
        ByteArrayOutputStream recordAsBytes = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().directBinaryEncoder(recordAsBytes, null);
        GenericDatumWriter datumWriter = new GenericDatumWriter<>(AVROUtils.getInstance().getSchema(record));
        datumWriter.write(record, encoder);
        encoder.flush();
        return recordAsBytes.toByteArray();
    }

    private GenericRecord convertBytesToRecord(Schema avroSchema, byte[] record) throws IOException {
        final GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(avroSchema);
        Decoder decoder = DecoderFactory.get().binaryDecoder(record, null);
        GenericRecord genericRecord = datumReader.read(null, decoder);
        return genericRecord;
    }

    private Map<String, String> getMetadata() {
        Map<String, String> metadata = new HashMap<>();
        metadata.put("event-source-1", "topic1");
        metadata.put("event-source-2", "topic2");
        metadata.put("event-source-3", "topic3");
        metadata.put("event-source-4", "topic4");
        metadata.put("event-source-5", "topic5");
        return metadata;
    }

    private GlueSchemaRegistryConfiguration getConfigs() {
        GlueSchemaRegistryConfiguration configs = new GlueSchemaRegistryConfiguration(regionName);
        configs.setSchemaName(schemaName);
        configs.setAutoRegistration(true);
        configs.setMetadata(getMetadata());
        return configs;
    }

    private Object getTestRecord() throws IOException {
        GenericRecord genericRecord;
        Schema.Parser parser = new Schema.Parser();
        Schema avroSchema = parser.parse(new File(AVRO_USER_SCHEMA_FILE));

        genericRecord = new GenericData.Record(avroSchema);
        genericRecord.put("name", "testName");
        genericRecord.put("favorite_number", 99);
        genericRecord.put("favorite_color", "red");

        return genericRecord;
    }
}
```

## Use case: Amazon Managed Service for Apache Flink
<a name="schema-registry-integrations-kinesis-data-analytics-apache-flink"></a>

Apache Flink is a popular open source framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Amazon Managed Service for Apache Flink is a fully managed AWS service that enables you to build and manage Apache Flink applications to process streaming data.

Open source Apache Flink provides a number of sources and sinks. For example, predefined data sources include reading from files, directories, and sockets, and ingesting data from collections and iterators. Apache Flink DataStream Connectors provide code for Apache Flink to interface with various third-party systems, such as Apache Kafka or Kinesis as sources and/or sinks.

For more information, see [Amazon Kinesis Data Analytics Developer Guide](https://docs.aws.amazon.com/kinesisanalytics/latest/java/what-is.html).

### Apache Flink Kafka connector
<a name="schema-registry-integrations-kafka-connector"></a>

Apache Flink provides an Apache Kafka data stream connector for reading data from and writing data to Kafka topics with exactly-once guarantees. Flink's Kafka consumer, `FlinkKafkaConsumer`, provides access to read from one or more Kafka topics. Apache Flink’s Kafka Producer, `FlinkKafkaProducer`, allows writing a stream of records to one or more Kafka topics. For more information, see [Apache Kafka Connector](https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html).

### Apache Flink Kinesis streams Connector
<a name="schema-registry-integrations-kinesis-connector"></a>

The Kinesis data stream connector provides access to Amazon Kinesis Data Streams. The `FlinkKinesisConsumer` is an exactly-once parallel streaming data source that subscribes to multiple Kinesis streams within the same AWS service region, and can transparently handle re-sharding of streams while the job is running. Each subtask of the consumer is responsible for fetching data records from multiple Kinesis shards. The number of shards fetched by each subtask will change as shards are closed and created by Kinesis. The `FlinkKinesisProducer` uses Kinesis Producer Library (KPL) to put data from an Apache Flink stream into a Kinesis stream. For more information, see [Amazon Kinesis Streams Connector](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kinesis.html).

For more information, see the [AWS Glue Schema Registry GitHub repository](https://github.com/awslabs/aws-glue-schema-registry).

### Integrating with Apache Flink
<a name="schema-registry-integrations-apache-flink-integrate"></a>

The SerDes library provided with Schema Registry integrates with Apache Flink. To work with Apache Flink, you use the library's implementations of the [https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java](https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java) and [https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java](https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java) interfaces, called `GlueSchemaRegistryAvroSerializationSchema` and `GlueSchemaRegistryAvroDeserializationSchema`, which you can plug into Apache Flink connectors.

### Adding an AWS Glue Schema Registry dependency into the Apache Flink application
<a name="schema-registry-integrations-kinesis-data-analytics-dependencies"></a>

To set up the integration dependencies to AWS Glue Schema Registry in the Apache Flink application:

1. Add the dependency to the `pom.xml` file.

   ```
   <dependency>
       <groupId>software.amazon.glue</groupId>
       <artifactId>schema-registry-flink-serde</artifactId>
       <version>1.0.0</version>
   </dependency>
   ```

#### Integrating Kafka or Amazon MSK with Apache Flink
<a name="schema-registry-integrations-kda-integrate-msk"></a>

You can use Managed Service for Apache Flink with Kafka as a source or Kafka as a sink.

**Kafka as a source**  
The following diagram shows integrating Kafka (or Amazon MSK) with Managed Service for Apache Flink, with Kafka as a source.

![\[Kafka as a source.\]](http://docs.aws.amazon.com/glue/latest/dg/images/gsr-kafka-source.png)


**Kafka as a sink**  
The following diagram shows integrating Kafka (or Amazon MSK) with Managed Service for Apache Flink, with Kafka as a sink.

![\[Kafka as a sink.\]](http://docs.aws.amazon.com/glue/latest/dg/images/gsr-kafka-sink.png)


To integrate Kafka (or Amazon MSK) with Managed Service for Apache Flink, with Kafka as a source or Kafka as a sink, make the code changes below. Add the code blocks marked with the `// block 1`, `// block 2`, and `// block 3` comments to the analogous sections of your code.

If Kafka is the source, then use the deserializer code (block 2). If Kafka is the sink, use the serializer code (block 3).

```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

String topic = "topic";
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

// block 1
Map<String, Object> configs = new HashMap<>();
configs.put(AWSSchemaRegistryConstants.AWS_REGION, "aws-region");
configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());

// schema is the org.apache.avro.Schema of the records, defined elsewhere in your application
FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
    topic,
    // block 2
    GlueSchemaRegistryAvroDeserializationSchema.forGeneric(schema, configs),
    properties);

FlinkKafkaProducer<GenericRecord> producer = new FlinkKafkaProducer<>(
    topic,
    // block 3
    GlueSchemaRegistryAvroSerializationSchema.forGeneric(schema, topic, configs),
    properties);

DataStream<GenericRecord> stream = env.addSource(consumer);
stream.addSink(producer);
env.execute();
```

#### Integrating Kinesis Data Streams with Apache Flink
<a name="schema-registry-integrations-integrate-kds"></a>

You can use Managed Service for Apache Flink with Kinesis Data Streams as a source or a sink.

**Kinesis Data Streams as a source**  
The following diagram shows integrating Kinesis Data Streams with Managed Service for Apache Flink, with Kinesis Data Streams as a source.

![\[Kinesis Data Streams as a source.\]](http://docs.aws.amazon.com/glue/latest/dg/images/gsr-kinesis-source.png)


**Kinesis Data Streams as a sink**  
The following diagram shows integrating Kinesis Data Streams with Managed Service for Apache Flink, with Kinesis Data Streams as a sink.

![\[Kinesis Data Streams as a sink.\]](http://docs.aws.amazon.com/glue/latest/dg/images/gsr-kinesis-sink.png)


To integrate Kinesis Data Streams with Managed Service for Apache Flink, with Kinesis Data Streams as a source or Kinesis Data Streams as a sink, make the code changes below. Add the code blocks marked with the `// block 1`, `// block 2`, and `// block 3` comments to the analogous sections of your code.

If Kinesis Data Streams is the source, use the deserializer code (block 2). If Kinesis Data Streams is the sink, use the serializer code (block 3).

```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

String streamName = "stream";
Properties consumerConfig = new Properties();
consumerConfig.put(AWSConfigConstants.AWS_REGION, "aws-region");
consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

// block 1
Map<String, Object> configs = new HashMap<>();
configs.put(AWSSchemaRegistryConstants.AWS_REGION, "aws-region");
configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());

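// schema is the org.apache.avro.Schema of the records, defined elsewhere in your application
FlinkKinesisConsumer<GenericRecord> consumer = new FlinkKinesisConsumer<>(
    streamName,
    // block 2
    GlueSchemaRegistryAvroDeserializationSchema.forGeneric(schema, configs),
    consumerConfig);

// The producer uses its own Properties object holding only the Region and credentials
Properties producerConfig = new Properties();
producerConfig.put(AWSConfigConstants.AWS_REGION, "aws-region");
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");

FlinkKinesisProducer<GenericRecord> producer = new FlinkKinesisProducer<>(
    // block 3
    GlueSchemaRegistryAvroSerializationSchema.forGeneric(schema, streamName, configs),
    producerConfig);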
producer.setDefaultStream(streamName);
producer.setDefaultPartition("0");

DataStream<GenericRecord> stream = env.addSource(consumer);
stream.addSink(producer);
env.execute();
```

## Use case: Integration with AWS Lambda
<a name="schema-registry-integrations-aws-lambda"></a>

To use an AWS Lambda function as an Apache Kafka/Amazon MSK consumer and deserialize Avro-encoded messages using AWS Glue Schema Registry, visit the [MSK Labs page](https://amazonmsk-labs.workshop.aws/en/msklambda/gsrschemareg.html).

## Use case: AWS Glue Data Catalog
<a name="schema-registry-integrations-aws-glue-data-catalog"></a>

AWS Glue tables support schemas that you can specify manually or by reference to the AWS Glue Schema Registry. The Schema Registry integrates with the Data Catalog to allow you to optionally use schemas stored in the Schema Registry when creating or updating AWS Glue tables or partitions in the Data Catalog. To identify a schema definition in the Schema Registry, at a minimum, you need to know the ARN of the schema it is part of. A schema version of a schema, which contains a schema definition, can be referenced by its UUID or version number. There is always one schema version, the "latest" version, that can be looked up without knowing its version number or UUID.

When calling the `CreateTable` or `UpdateTable` operations, you pass a `TableInput` structure that contains a `StorageDescriptor`, which may have a `SchemaReference` to an existing schema in the Schema Registry. Similarly, when you call the `GetTable` or `GetPartition` APIs, the response may contain the schema and the `SchemaReference`. When a table or partition was created using a schema reference, the Data Catalog tries to fetch the schema for this schema reference. If it is unable to find the schema in the Schema Registry, it returns an empty schema in the `GetTable` response; otherwise the response has both the schema and the schema reference.
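
The lookup rules above can be pictured with a small in-memory model. This is a minimal sketch and not the AWS Glue API: the classes `SchemaVersionEntry` and `SchemaReferenceSketch` and their method names are hypothetical, and the sketch assumes that the "latest" version is the one with the highest version number. It shows the three ways to pick a schema version (UUID, version number, or latest) and an empty result when nothing matches, analogous to the empty schema in the `GetTable` response.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical model of one registered schema version.
class SchemaVersionEntry {
    final String uuid;
    final long versionNumber;
    final String definition;

    SchemaVersionEntry(String uuid, long versionNumber, String definition) {
        this.uuid = uuid;
        this.versionNumber = versionNumber;
        this.definition = definition;
    }
}

// Hypothetical resolver over the versions of a single schema
// (the schema itself would be identified by its ARN).
class SchemaReferenceSketch {
    private final List<SchemaVersionEntry> versions;

    SchemaReferenceSketch(List<SchemaVersionEntry> versions) {
        this.versions = versions;
    }

    // Look up a version by its UUID; empty if no version has that UUID.
    Optional<SchemaVersionEntry> byUuid(String uuid) {
        return versions.stream().filter(v -> v.uuid.equals(uuid)).findFirst();
    }

    // Look up a version by its version number; empty if it does not exist.
    Optional<SchemaVersionEntry> byVersionNumber(long number) {
        return versions.stream().filter(v -> v.versionNumber == number).findFirst();
    }

    // The "latest" version needs neither a UUID nor a version number.
    // Assumption: latest means the highest version number.
    Optional<SchemaVersionEntry> latest() {
        return versions.stream().max(Comparator.comparingLong(v -> v.versionNumber));
    }

    public static void main(String[] args) {
        SchemaReferenceSketch schema = new SchemaReferenceSketch(Arrays.asList(
            new SchemaVersionEntry("uuid-1", 1, "{\"type\":\"string\"}"),
            new SchemaVersionEntry("uuid-2", 2, "{\"type\":\"long\"}")));
        System.out.println(schema.latest().get().versionNumber);   // prints 2
        System.out.println(schema.byVersionNumber(7).isPresent()); // prints false
    }
}
```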

You can also perform the actions from the AWS Glue console.

To perform these operations and create, update, or view the schema information, the calling user must have an IAM role that grants permissions for the `GetSchemaVersion` API.

### Adding a table or updating the schema for a table
<a name="schema-registry-integrations-aws-glue-data-catalog-table"></a>

Adding a new table from an existing schema binds the table to a specific schema version. Once new schema versions get registered, you can update this table definition from the View table page in the AWS Glue console or using the [UpdateTable action (Python: update_table)](aws-glue-api-catalog-tables.md#aws-glue-api-catalog-tables-UpdateTable) API.

#### Adding a table from an existing schema
<a name="schema-registry-integrations-aws-glue-data-catalog-table-existing"></a>

You can create an AWS Glue table from a schema version in the registry using the AWS Glue console or `CreateTable` API.

**AWS Glue API**  
When calling the `CreateTable` API, you will pass a `TableInput` that contains a `StorageDescriptor` which has a `SchemaReference` to an existing schema in the Schema Registry.

**AWS Glue console**  
To create a table from the AWS Glue console:

1. Sign in to the AWS Management Console and open the AWS Glue console at [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. In the navigation pane, under **Data catalog**, choose **Tables**.

1. In the **Add Tables** menu, choose **Add table from existing schema**.

1. Configure the table properties and data store per the AWS Glue Developer Guide.

1. In the **Choose a Glue schema** page, select the **Registry** where the schema resides.

1. Choose the **Schema name** and select the **Version** of the schema to apply.

1. Review the schema preview, and choose **Next**.

1. Review and create the table.

The schema and version applied to the table appear in the **Glue schema** column in the list of tables. You can view the table to see more details.

#### Updating the schema for a table
<a name="schema-registry-integrations-aws-glue-data-catalog-table-updating"></a>

When a new schema version becomes available, you may want to update a table's schema using the [UpdateTable action (Python: update_table)](aws-glue-api-catalog-tables.md#aws-glue-api-catalog-tables-UpdateTable) API or the AWS Glue console.

**Important**  
When updating the schema for an existing table that has an AWS Glue schema specified manually, the new schema referenced in the Schema Registry may be incompatible. This can result in your jobs failing.

**AWS Glue API**  
When calling the `UpdateTable` API, you will pass a `TableInput` that contains a `StorageDescriptor` which has a `SchemaReference` to an existing schema in the Schema Registry.

**AWS Glue console**  
To update the schema for a table from the AWS Glue console:

1. Sign in to the AWS Management Console and open the AWS Glue console at [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. In the navigation pane, under **Data catalog**, choose **Tables**.

1. View the table from the list of tables.

1. Choose **Update schema** in the box that informs you about a new version.

1. Review the differences between the current and new schema.

1. Choose **Show all schema differences** to see more details.

1. Choose **Save table** to accept the new version.

## Use case: AWS Glue streaming
<a name="schema-registry-integrations-aws-glue-streaming"></a>

AWS Glue streaming consumes data from streaming sources and performs ETL operations before writing to an output sink. The input streaming source can be specified using a Data Catalog table or directly by specifying the source configuration.

AWS Glue streaming supports a Data Catalog table for the streaming source created with the schema present in the AWS Glue Schema Registry. You can create a schema in the AWS Glue Schema Registry and create an AWS Glue table with a streaming source using this schema. This AWS Glue table can be used as an input to an AWS Glue streaming job for deserializing data in the input stream.

Note that when the schema in the AWS Glue Schema Registry changes, you need to restart the AWS Glue streaming job for it to reflect the changes in the schema.

## Use case: Apache Kafka Streams
<a name="schema-registry-integrations-apache-kafka-streams"></a>

The Apache Kafka Streams API is a client library for processing and analyzing data stored in Apache Kafka. This section describes the integration of Apache Kafka Streams with AWS Glue Schema Registry, which allows you to manage and enforce schemas on your data streaming applications. For more information on Apache Kafka Streams, see [Apache Kafka Streams](https://kafka.apache.org/documentation/streams/).

### Integrating with the SerDes Libraries
<a name="schema-registry-integrations-apache-kafka-streams-integrate"></a>

The SerDes library provides a `GlueSchemaRegistryKafkaStreamsSerde` class that you can use to configure a Kafka Streams application.

#### Kafka Streams application example code
<a name="schema-registry-integrations-apache-kafka-streams-application"></a>

To use the AWS Glue Schema Registry within an Apache Kafka Streams application:

1. Configure the Kafka Streams application.

   ```
   final Properties props = new Properties();
   props.put(StreamsConfig.APPLICATION_ID_CONFIG, "avro-streams");
   props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
   props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
   props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
   // Use the Schema Registry serde for record values
   props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GlueSchemaRegistryKafkaStreamsSerde.class.getName());
   props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
   
   // Schema Registry settings
   props.put(AWSSchemaRegistryConstants.AWS_REGION, "aws-region");
   props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
   props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
   props.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.AVRO.name());
   ```

1. Create a stream from the topic avro-input.

   ```
   StreamsBuilder builder = new StreamsBuilder();
   final KStream<String, GenericRecord> source = builder.stream("avro-input");
   ```

1. Process the data records (the example filters out records whose `favorite_color` is pink or whose `amount` is 15).

   ```
   final KStream<String, GenericRecord> result = source
       // Drop records whose favorite_color is "pink"
       .filter((key, value) -> !"pink".equals(String.valueOf(value.get("favorite_color"))))
       // Drop records whose amount is 15.0
       .filter((key, value) -> !"15.0".equals(String.valueOf(value.get("amount"))));
   ```

1. Write the results back to the topic avro-output.

   ```
   result.to("avro-output");
   ```

1. Start the Apache Kafka Streams application.

   ```
   KafkaStreams streams = new KafkaStreams(builder.build(), props);
   streams.start();
   ```

#### Implementation results
<a name="schema-registry-integrations-apache-kafka-streams-results"></a>

These results show the records before and after the filtering in step 3, which removes records with a `favorite_color` of "pink" or an `amount` of 15.0.

Records before filtering:

```
{"name": "Sansa", "favorite_number": 99, "favorite_color": "white"}
{"name": "Harry", "favorite_number": 10, "favorite_color": "black"}
{"name": "Hermione", "favorite_number": 1, "favorite_color": "red"}
{"name": "Ron", "favorite_number": 0, "favorite_color": "pink"}
{"name": "Jay", "favorite_number": 0, "favorite_color": "pink"}

{"id": "commute_1","amount": 3.5}
{"id": "grocery_1","amount": 25.5}
{"id": "entertainment_1","amount": 19.2}
{"id": "entertainment_2","amount": 105}
{"id": "commute_1","amount": 15}
```

Records after filtering:

```
{"name": "Sansa", "favorite_number": 99, "favorite_color": "white"}
{"name": "Harry", "favorite_number": 10, "favorite_color": "black"}
{"name": "Hermione", "favorite_number": 1, "favorite_color": "red"}

{"id": "commute_1","amount": 3.5}
{"id": "grocery_1","amount": 25.5}
{"id": "entertainment_1","amount": 19.2}
{"id": "entertainment_2","amount": 105}
```
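
The filter in step 3 can be checked without a Kafka cluster. The following is a minimal sketch, assuming plain `java.util.Map` objects stand in for Avro `GenericRecord` values and that `amount` is a floating-point field (so 15 renders as `"15.0"`). The class `FilterSketch` and its methods are hypothetical and not part of the SerDes library.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for the step 3 topology: Maps replace GenericRecord values.
class FilterSketch {

    // Same predicates as step 3: keep a record unless favorite_color is "pink"
    // or amount renders as "15.0". A missing field renders as "null" and is kept.
    static boolean keep(Map<String, Object> value) {
        return !"pink".equals(String.valueOf(value.get("favorite_color")))
            && !"15.0".equals(String.valueOf(value.get("amount")));
    }

    // Apply the predicate to every record, preserving input order.
    static List<Map<String, Object>> filter(List<Map<String, Object>> records) {
        return records.stream().filter(FilterSketch::keep).collect(Collectors.toList());
    }

    // Convenience builder for a two-field record.
    static Map<String, Object> record(String k1, Object v1, String k2, Object v2) {
        Map<String, Object> m = new HashMap<>();
        m.put(k1, v1);
        m.put(k2, v2);
        return m;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> input = Arrays.asList(
            record("name", "Sansa", "favorite_color", "white"),
            record("name", "Ron", "favorite_color", "pink"),
            record("id", "grocery_1", "amount", 25.5),
            record("id", "commute_1", "amount", 15.0));
        // Only the Sansa and grocery_1 records pass both predicates.
        filter(input).forEach(System.out::println);
    }
}
```

Chaining two `filter` calls, as in step 3, is equivalent to combining both conditions with `&&` as done here.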

## Use case: Apache Kafka Connect
<a name="schema-registry-integrations-apache-kafka-connect"></a>

The integration of Apache Kafka Connect with the AWS Glue Schema Registry enables you to get schema information from connectors. Apache Kafka converters specify the format of data within Apache Kafka and how to translate it into Apache Kafka Connect data. Every Apache Kafka Connect user needs to configure these converters based on the format they want their data in when it is loaded from or stored into Apache Kafka. In this way, you can define your own converters to translate Apache Kafka Connect data into the type used in the AWS Glue Schema Registry (for example, Avro) and use the AWS Glue Schema Registry serializer to register its schema and serialize the data. Converters can likewise use the AWS Glue Schema Registry deserializer to deserialize data received from Apache Kafka and convert it back into Apache Kafka Connect data. The following diagram shows an example workflow.

![\[Apache Kafka Connect workflow.\]](http://docs.aws.amazon.com/glue/latest/dg/images/schema_reg_int_kafka_connect.png)


1. Install the `aws-glue-schema-registry` project by cloning the [GitHub repository for the AWS Glue Schema Registry](https://github.com/awslabs/aws-glue-schema-registry).

   ```
   git clone git@github.com:awslabs/aws-glue-schema-registry.git
   cd aws-glue-schema-registry
   mvn clean install
   mvn dependency:copy-dependencies
   ```

1. If you plan on using Apache Kafka Connect in *Standalone* mode, update **connect-standalone.properties** using the instructions below for this step. If you plan on using Apache Kafka Connect in *Distributed* mode, update **connect-avro-distributed.properties** using the same instructions.

   1. Add these properties to the Apache Kafka Connect properties file:

      ```
      key.converter.region=aws-region
      value.converter.region=aws-region
      key.converter.schemaAutoRegistrationEnabled=true
      value.converter.schemaAutoRegistrationEnabled=true
      key.converter.avroRecordType=GENERIC_RECORD
      value.converter.avroRecordType=GENERIC_RECORD
      ```

1. Add the command below to the **Launch mode** section under **kafka-run-class.sh**:

   ```
   -cp $CLASSPATH:"<your AWS Glue Schema Registry base directory>/target/dependency/*"
   ```

   It should look like this:

   ```
   # Launch mode
   if [ "x$DAEMON_MODE" = "xtrue" ]; then
     nohup "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH:"/Users/johndoe/aws-glue-schema-registry/target/dependency/*" $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
   else
     exec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH:"/Users/johndoe/aws-glue-schema-registry/target/dependency/*" $KAFKA_OPTS "$@"
   fi
   ```

1. If using bash, run the following commands to set up your CLASSPATH in your `.bash_profile`. For any other shell, update the environment accordingly.

   ```
   echo 'export GSR_LIB_BASE_DIR=<>' >>~/.bash_profile
   echo 'export GSR_LIB_VERSION=1.0.0' >>~/.bash_profile
   echo 'export KAFKA_HOME=<your Apache Kafka installation directory>' >>~/.bash_profile
   echo 'export CLASSPATH=$CLASSPATH:$GSR_LIB_BASE_DIR/avro-kafkaconnect-converter/target/schema-registry-kafkaconnect-converter-$GSR_LIB_VERSION.jar:$GSR_LIB_BASE_DIR/common/target/schema-registry-common-$GSR_LIB_VERSION.jar:$GSR_LIB_BASE_DIR/avro-serializer-deserializer/target/schema-registry-serde-$GSR_LIB_VERSION.jar' >>~/.bash_profile
   source ~/.bash_profile
   ```

1. (Optional) If you want to test with a simple file source, then clone the file source connector.

   ```
   git clone https://github.com/mmolimar/kafka-connect-fs.git
   cd kafka-connect-fs/
   ```

   1. Under the source connector configuration, set the data format to Avro and the file reader to `AvroFileReader`, and set the file path to a sample Avro object to read. For example:

      ```
      vim config/kafka-connect-fs.properties
      ```

      ```
      fs.uris=<path to a sample avro object>
      policy.regexp=^.*\.avro$
      file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.AvroFileReader
      ```

   1. Install the source connector.

      ```
      mvn clean package
      echo "export CLASSPATH=\$CLASSPATH:\"\$(find target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')\"" >>~/.bash_profile
      source ~/.bash_profile
      ```

   1. Update the topic name and output file name in the sink properties file at `<your Apache Kafka installation directory>/config/connect-file-sink.properties`.

      ```
      file=<output file full path>
      topics=<my topic>
      ```

1. Start the Source Connector (in this example it is a file source connector).

   ```
   $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/kafka-connect-fs.properties
   ```

1. Run the Sink Connector (in this example it is a file sink connector).

   ```
   $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-file-sink.properties
   ```

   For an example of Kafka Connect usage, see the `run-local-tests.sh` script in the `integration-tests` folder of the [GitHub repository for the AWS Glue Schema Registry](https://github.com/awslabs/aws-glue-schema-registry/tree/master/integration-tests).
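
Step 2 sets each converter option twice, once for the key converter and once for the value converter, so a missing line is easy to overlook. The following is a minimal sketch of a check that uses only `java.util.Properties`. The class `ConverterPropsCheck` and its method are hypothetical helpers, not part of Apache Kafka or the AWS Glue Schema Registry library.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: reports converter settings from step 2 that are missing.
class ConverterPropsCheck {

    // The three option names that step 2 sets for each converter.
    static final List<String> SUFFIXES =
        Arrays.asList("region", "schemaAutoRegistrationEnabled", "avroRecordType");

    // Parses properties-file text and returns the missing keys,
    // for example "value.converter.region".
    static List<String> missingKeys(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> missing = new ArrayList<>();
        for (String prefix : Arrays.asList("key.converter.", "value.converter.")) {
            for (String suffix : SUFFIXES) {
                if (props.getProperty(prefix + suffix) == null) {
                    missing.add(prefix + suffix);
                }
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        String text = String.join("\n",
            "key.converter.region=aws-region",
            "value.converter.region=aws-region",
            "key.converter.schemaAutoRegistrationEnabled=true",
            "key.converter.avroRecordType=GENERIC_RECORD",
            "value.converter.avroRecordType=GENERIC_RECORD");
        // prints [value.converter.schemaAutoRegistrationEnabled]
        System.out.println(missingKeys(text));
    }
}
```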

# Migration from a third-party schema registry to AWS Glue Schema Registry
<a name="schema-registry-integrations-migration"></a>

The migration from a third-party schema registry to the AWS Glue Schema Registry has a dependency on the existing, current third-party schema registry. If there are records in an Apache Kafka topic which were sent using a third-party schema registry, consumers need the third-party schema registry to deserialize those records. The `AWSKafkaAvroDeserializer` provides the ability to specify a secondary deserializer class which points to the third-party deserializer and is used to deserialize those records.

There are two criteria for retiring a third-party schema registry. First, retirement can occur only after the records in Apache Kafka topics that were written using the third-party schema registry are no longer required by any consumers. Second, those records can age out of the Apache Kafka topics, depending on the retention period specified for those topics. Note that if you have topics with infinite retention, you can still migrate to the AWS Glue Schema Registry, but you will not be able to retire the third-party schema registry. As a workaround, you can use an application or MirrorMaker 2 to read from the current topic and produce to a new topic with the AWS Glue Schema Registry.

To migrate from a third-party schema registry to the AWS Glue Schema Registry:

1. Create a registry in the AWS Glue Schema Registry, or use the default registry.

1. Stop the consumer. Modify it to include AWS Glue Schema Registry as the primary deserializer, and the third-party schema registry as the secondary.
   + Set the consumer properties. In this example, the `secondary_deserializer` is set to a different deserializer. The behavior is as follows: the consumer retrieves records from Amazon MSK and first tries to use the `AWSKafkaAvroDeserializer`. If it is unable to read the magic byte that contains the Avro schema ID for the AWS Glue Schema Registry schema, the `AWSKafkaAvroDeserializer` then tries the deserializer class provided in `secondary_deserializer`. The properties specific to the secondary deserializer, such as `schema_registry_url_config` and `specific_avro_reader_config`, also need to be provided in the consumer properties, as shown below.

     ```
     consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
     consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AWSKafkaAvroDeserializer.class.getName());
     consumerProps.setProperty(AWSSchemaRegistryConstants.AWS_REGION, KafkaClickstreamConsumer.gsrRegion);
     consumerProps.setProperty(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, KafkaAvroDeserializer.class.getName());
     consumerProps.setProperty(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "URL for third-party schema registry");
     consumerProps.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
     ```

1. Restart the consumer.

1. Stop the producer and point the producer to the AWS Glue Schema Registry.

   1. Set the producer properties. In this example, the producer uses the default-registry and automatically registers schema versions.

      ```
      producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
      producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AWSKafkaAvroSerializer.class.getName());
      producerProps.setProperty(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2");
      producerProps.setProperty(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName());
      producerProps.setProperty(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true");
      ```

1. (Optional) Manually move existing schemas and schema versions from the current third-party schema registry to the AWS Glue Schema Registry, either to the default-registry in AWS Glue Schema Registry or to a specific non-default registry in AWS Glue Schema Registry. This can be done by exporting schemas from the third-party schema registries in JSON format and creating new schemas in AWS Glue Schema Registry using the AWS Management Console or the AWS CLI.

    This step may be important if you need to enable compatibility checks with previous schema versions for newly created schema versions using the AWS CLI and the AWS Management Console, or when producers send messages with a new schema with auto-registration of schema versions turned on.

1. Start the producer.
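
The primary and secondary deserializer behavior described in step 2 can be pictured as follows. This is a minimal sketch under stated assumptions, not the library's implementation: the class `FallbackDeserializerSketch`, the constant `PRIMARY_HEADER_BYTE`, and its value are hypothetical, and plain `Function<byte[], String>` objects stand in for the real deserializers.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical illustration of primary/secondary deserializer dispatch.
class FallbackDeserializerSketch {

    // Assumed marker for records written by the primary serializer (illustrative value only).
    static final byte PRIMARY_HEADER_BYTE = 3;

    private final Function<byte[], String> primary;
    private final Function<byte[], String> secondary;

    FallbackDeserializerSketch(Function<byte[], String> primary,
                               Function<byte[], String> secondary) {
        this.primary = primary;
        this.secondary = secondary;
    }

    // Use the primary deserializer when the first byte carries its marker;
    // otherwise hand the untouched bytes to the secondary deserializer.
    String deserialize(byte[] data) {
        if (data == null) {
            return null;
        }
        if (data.length > 0 && data[0] == PRIMARY_HEADER_BYTE) {
            return primary.apply(data);
        }
        return secondary.apply(data);
    }

    // Decodes everything after the one-byte header as UTF-8 text.
    static String body(byte[] data) {
        if (data.length <= 1) {
            return "";
        }
        return new String(data, 1, data.length - 1, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        FallbackDeserializerSketch d = new FallbackDeserializerSketch(
            bytes -> "primary:" + body(bytes),
            bytes -> "secondary:" + body(bytes));
        System.out.println(d.deserialize(new byte[] {3, 'n', 'e', 'w'})); // prints primary:new
        System.out.println(d.deserialize(new byte[] {0, 'o', 'l', 'd'})); // prints secondary:old
    }
}
```

Because the consumer is switched before the producer in the procedure above, it can read records written with either registry while the producer is still being migrated.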