

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Erste Schritte mit der Schemaregistrierung
<a name="schema-registry-gs"></a>

Die folgenden Abschnitte vermitteln einen Überblick und führen Sie durch das Einrichten und Verwenden der Schema Registry. Informationen zu den Konzepten und Komponenten der Schemaregistrierung finden Sie unter [AWS Glue-Schemaregistrierung](schema-registry.md).

**Topics**
+ [SerDe Bibliotheken installieren](schema-registry-gs-serde.md)
+ [Integration mit AWS Glue Schema Registry](schema-registry-integrations.md)
+ [Migration von einer Drittanbieter-Schemaregistrierung zu AWS Glue Schema Registry](schema-registry-integrations-migration.md)

# SerDe Bibliotheken installieren
<a name="schema-registry-gs-serde"></a>

Die SerDe Bibliotheken bieten ein Framework für die Serialisierung und Deserialisierung von Daten. 

Sie installieren den Open-Source-Serializer für Ihre Anwendungen, die Daten erzeugen (zusammen die „Serializer“). Der Serializer übernimmt Serialisierung, Komprimierung und Interaktion mit der Schema Registry. Der Serializer extrahiert das Schema automatisch aus einem Datensatz, der in ein mit der Schema Registry kompatibles Ziel wie Amazon MSK geschrieben wird. Ebenso installieren Sie den Open-Source-Deserializer auf Ihren Anwendungen, die Daten verbrauchen.

# Java-Implementierung
<a name="schema-registry-gs-serde-java"></a>

**Anmerkung**  
Bevor Sie die folgenden Schritte ausführen, müssen Sie einen aktiven Amazon-MSK- ( Amazon Managed Streaming für Apache Kafka ) oder Apache-Kafka-Cluster haben. Ihre Produzenten und Verbraucher müssen Java 8 oder höher nutzen.

So installieren Sie die Bibliotheken auf Produzenten und Verbrauchern:

1. Fügen Sie diese Abhängigkeit in den pom.xml-Dateien von Produzenten und Verbrauchern über den folgenden Code hinzu:

   ```
   <dependency>
       <groupId>software.amazon.glue</groupId>
       <artifactId>schema-registry-serde</artifactId>
       <version>1.1.5</version>
   </dependency>
   ```

   Alternativ können Sie das [AWS Glue-Schema-Registry-Github-Repository](https://github.com/awslabs/aws-glue-schema-registry) klonen.

1. Richten Sie Ihre Produzenten mit den folgenden Eigenschaften ein:

   ```
   props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // Can replace StringSerializer.class.getName()) with any other key serializer that you may use
   props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());
   props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2");
   properties.put(AWSSchemaRegistryConstants.DATA_FORMAT, "JSON"); // OR "AVRO"
   ```

   Wenn keine Schemas vorhanden sind, muss die automatische Registrierung aktiviert werden (nächster Schritt). Wenn Sie ein Schema haben, das Sie anwenden möchten, ersetzen Sie „my-schema“ durch Ihren Schemanamen. Auch der „registry-name“ muss angegeben werden, wenn die automatische Registrierung des Schemas deaktiviert ist. Wenn das Schema unter der „default-registry“ erstellt wird, kann der Registrierungsname weggelassen werden.

1. (Optional) Legen Sie eine dieser optionalen Produzenteneigenschaften fest. Detaillierte Beschreibungen der Eigenschaften finden Sie in [der ReadMe Datei](https://github.com/awslabs/aws-glue-schema-registry/blob/master/README.md).

   ```
   props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true"); // If not passed, uses "false"
   props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "my-schema"); // If not passed, uses transport name (topic name in case of Kafka, or stream name in case of Kinesis Data Streams)
   props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry"); // If not passed, uses "default-registry"
   props.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000 (24 Hours)
   props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200
   props.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL); // Pass a compatibility mode. If not passed, uses Compatibility.BACKWARD
   props.put(AWSSchemaRegistryConstants.DESCRIPTION, "This registry is used for several purposes."); // If not passed, constructs a description
   props.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, AWSSchemaRegistryConstants.COMPRESSION.ZLIB); // If not passed, records are sent uncompressed
   ```

   Die automatische Registrierung registriert die Schemaversion unter der Standardregistrierung („default-registry“). Wenn im vorherigen Schritt kein `SCHEMA_NAME` angegeben wird, wird der Themenname als `SCHEMA_NAME` verwendet. 

   Weitere Informationen zu Kompatibilitätsmodi finden Sie unter [Schema-Versioning und -Kompatibilität](schema-registry.md#schema-registry-compatibility).

1. Richten Sie Ihre Verbraucher mit den folgenden Eigenschaften ein:

   ```
   props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
   props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
   props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2"); // Pass an AWS-Region
   props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); // Only required for AVRO data format
   ```

1. (Optional) Legen Sie diese optionalen Verbrauchereigenschaften fest. Detaillierte Eigenschaftsbeschreibungen finden Sie [in der ReadMe Datei](https://github.com/awslabs/aws-glue-schema-registry/blob/master/README.md).

   ```
   properties.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000
   props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200
   props.put(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, "com.amazonaws.services.schemaregistry.deserializers.external.ThirdPartyDeserializer"); // For migration fall back scenario
   ```

# C\$1-Implementierung
<a name="schema-registry-gs-serde-csharp"></a>

**Anmerkung**  
Bevor Sie die folgenden Schritte ausführen, müssen Sie einen aktiven Amazon-MSK- ( Amazon Managed Streaming für Apache Kafka ) oder Apache-Kafka-Cluster haben. Ihre Produzenten und Verbraucher müssen auf .NET 8.0 oder höher laufen.

## Installation
<a name="schema-registry-gs-serde-csharp-install"></a>

Installieren Sie für C\$1-Anwendungen das AWS Glue Schema SerDe NuGet Registry-Paket mit einer der folgenden Methoden:

**.NET-CLI:**  
Verwenden Sie den folgenden Befehl, um das Paket zu installieren:

```
dotnet add package Aws.Glue.SchemaRegistry --version 1.0.0-<rid>
```

wo `<rid>` könnte sein`1.0.0-linux-x64`, `1.0.0-linux-musl-x64` oder `1.0.0-linux-arm64`

**PackageReference (in deiner.csproj-Datei):**  
Fügen Sie Ihrer Projektdatei Folgendes hinzu:

```
<PackageReference Include="Aws.Glue.SchemaRegistry" Version="1.0.0-<rid>" />
```

wo `<rid>` könnte sein`1.0.0-linux-x64`, `1.0.0-linux-musl-x64` oder `1.0.0-linux-arm64`

## Konfiguration der Konfigurationsdatei
<a name="schema-registry-gs-serde-csharp-config"></a>

Erstellen Sie eine Datei mit Konfigurationseigenschaften (z. B.`gsr-config.properties`) mit den erforderlichen Einstellungen:

**Minimale Konfiguration:**  
Im Folgenden wird ein Beispiel für eine minimale Konfiguration gezeigt:

```
region=us-east-1
registry.name=default-registry
dataFormat=AVRO
schemaAutoRegistrationEnabled=true
```

## Verwenden der C\$1 Glue Schema-Clientbibliothek für Kafka SerDes
<a name="schema-registry-gs-serde-csharp-kafka"></a>

**Beispiel für die Verwendung des Serializers:**  
Das folgende Beispiel zeigt, wie der Serializer verwendet wird:

```
private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>";
var protobufSerializer = new GlueSchemaRegistryKafkaSerializer(PROTOBUF_CONFIG_PATH);
var serialized = protobufSerializer.Serialize(message, message.Descriptor.FullName);
// send serialized bytes to Kafka using producer.Produce(serialized)
```

**Beispiel für die Verwendung des Deserializers:**  
Das folgende Beispiel zeigt, wie der Deserializer verwendet wird:

```
private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>";
var dataConfig = new GlueSchemaRegistryDataFormatConfiguration(
    new Dictionary<string, dynamic>
    {
        {
            GlueSchemaRegistryConstants.ProtobufMessageDescriptor, message.Descriptor
        }
    }
);
var protobufDeserializer = new GlueSchemaRegistryKafkaDeserializer(PROTOBUF_CONFIG_PATH, dataConfig);

// read message from Kafka using serialized = consumer.Consume()
var deserializedObject = protobufDeserializer.Deserialize(message.Descriptor.FullName, serialized);
```

## Verwenden der C\$1 Glue-Schema-Clientbibliothek mit for KafkaFlow SerDes
<a name="schema-registry-gs-serde-csharp-kafkaflow"></a>

**Beispiel für die Verwendung des Serializers:**  
Das folgende Beispiel zeigt, wie Sie KafkaFlow mit dem Serializer konfigurieren:

```
services.AddKafka(kafka => kafka
    .UseConsoleLog()
    .AddCluster(cluster => cluster
        .WithBrokers(new[] { "localhost:9092" })
        .AddProducer<CustomerProducer>(producer => producer
            .DefaultTopic("customer-events")
            .AddMiddlewares(m => m
                .AddSerializer<GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>>(
                    () => new GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>("config/gsr-config.properties")
                )
            )
        )
    )
);
```

**Beispiel für die Verwendung des Deserializers:**  
Das folgende Beispiel zeigt, wie die Konfiguration KafkaFlow mit dem Deserializer durchgeführt wird:

```
.AddConsumer(consumer => consumer
    .Topic("customer-events")
    .WithGroupId("customer-group")
    .WithBufferSize(100)
    .WithWorkersCount(10)
    .AddMiddlewares(middlewares => middlewares
        .AddDeserializer<GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>>(
            () => new GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>("config/gsr-config.properties")
        )
        .AddTypedHandlers(h => h.AddHandler<CustomerHandler>())
    )
)
```

## Optionale Producer-Eigenschaften
<a name="schema-registry-gs-serde-csharp-optional"></a>

Sie können Ihre Konfigurationsdatei um zusätzliche optionale Eigenschaften erweitern:

```
# Auto-registration (if not passed, uses "false")
schemaAutoRegistrationEnabled=true

# Schema name (if not passed, uses topic name)
schema.name=my-schema

# Registry name (if not passed, uses "default-registry")
registry.name=my-registry

# Cache settings
cacheTimeToLiveMillis=86400000
cacheSize=200

# Compatibility mode (if not passed, uses BACKWARD)
compatibility=FULL

# Registry description
description=This registry is used for several purposes.

# Compression (if not passed, records are sent uncompressed)
compressionType=ZLIB
```

## Unterstützte Datenformate
<a name="schema-registry-gs-serde-supported-formats"></a>

Sowohl Java- als auch C\$1-Implementierungen unterstützen dieselben Datenformate:
+ *AVRO*: Apache Avro-Binärformat
+ *JSON: JSON-Schemaformat*
+ *PROTOBUF: Format* für Protokollpuffer

## Hinweise
<a name="schema-registry-gs-serde-csharp-notes"></a>
+ [Um mit der Bibliothek zu beginnen, besuchen Sie bitte https://www.nuget. org/packages/AWS.Kleber. SchemaRegistry](https://www.nuget.org/packages/AWS.Glue.SchemaRegistry)
+ Der Quellcode ist verfügbar unter: [https://github.com/awslabs/aws-glue-schema-registry](https://github.com/awslabs/aws-glue-schema-registry)

# Erstellen einer Registrierung
<a name="schema-registry-gs3"></a>

Sie können die Standardregistrierung verwenden oder mit der AWS Glue OR-Konsole so viele neue Registrierungen wie nötig erstellen. AWS Glue APIs 

**AWS Glue APIs**  
Sie können diese Schritte verwenden, um diese Aufgabe mithilfe von auszuführen. AWS Glue APIs

Um die Registry AWS CLI für das AWS Glue Schema verwenden zu können APIs, stellen Sie sicher, dass Sie Ihre Version AWS CLI auf die neueste Version aktualisieren.

 Um eine neue Registrierung hinzuzufügen, verwenden Sie die [CreateRegistry Aktion (Python: create\$1registry)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-CreateRegistry)-API. Geben Sie `RegistryName` als Name der zu erstellenden Registrierung an (maximale Länge 255 Zeichen, nur Buchstaben, Zahlen, Bindestriche, Unterstriche, Dollarzeichen oder Hashzeichen). 

Geben Sie `Description` als Zeichenfolge mit maximal 2048 Bytes an, entsprechend dem [mehrzeiligen Zeichenfolgenmuster für URI-Adressen](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-common.html#aws-glue-api-common-_string-patterns). 

Geben Sie optional einen oder mehrere `Tags` als Map-Array für Schlüssel-Wert-Paare Ihrer Registrierung an.

```
aws glue create-registry --registry-name registryName1 --description description
```

Wenn Ihre Registrierung erstellt wurde, wird ihr ein Amazon-Ressourcenname (ARN) zugewiesen, den Sie im Abschnitt `RegistryArn` der API-Antwort anzeigen können. Nachdem Sie nun eine Registrierung erstellt haben, erstellen Sie ein oder mehrere Schemata für diese Registrierung.

**AWS Glue-Konsole**  
In der AWS Glue-Konsole eine neue Registrierung hinzufügen

1. Melden Sie sich bei der an AWS-Managementkonsole und öffnen Sie die AWS Glue Konsole unter [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\).

1. Klicken Sie im Navigationsbereich unter **Data Catalog** auf **Schema Registries (Schemaregistrierungen)**.

1. Wählen Sie **Add registry (Registrierung hinzufügen)**.

1. Geben Sie einen **Registry name (Registrierungsname)** aus Buchstaben, Ziffern, Bindestrichen oder Unterstrichen ein. Dieser Name kann nicht geändert werden.

1. Geben Sie eine **Description (Beschreibung)** (optional) für die Registrierung ein.

1. Wenden Sie optional einen oder mehrere Tags auf Ihre Registrierung an. Klicken Sie auf **Add new Tag (Neuen Tag hinzufügen)** und geben Sie einen **Tag-Schlüssel** und optional einen **Tag-Wert** ein.

1. Wählen Sie **Add registry (Registrierung hinzufügen)**.

![\[Beispiel für das Erstellen einer Registrierung.\]](http://docs.aws.amazon.com/de_de/glue/latest/dg/images/schema_reg_create_registry.png)


Wenn Ihre Registrierung erstellt wird, wird ihr ein Amazon-Ressourcenname (ARN) zugewiesen, den Sie anzeigen können, indem Sie die Registrierung aus der Liste in **Schema Registries (Schemaregistrierungen)** auswählen. Nachdem Sie nun eine Registrierung erstellt haben, erstellen Sie ein oder mehrere Schemata für diese Registrierung.

# Umgang mit einem bestimmten Datensatz (JAVA POJO) für JSON
<a name="schema-registry-gs-json-java-pojo"></a>

Sie können ein Plain-Old-Java-Objekt (POJO) verwenden und das Objekt als Datensatz übergeben. Dies ist ähnlich dem Konzept eines bestimmten Datensatzes in AVRO. Das [mbknor-jackson-jsonschema](https://github.com/mbknor/mbknor-jackson-jsonSchema)kann ein JSON-Schema für das übergebene POJO generieren. Diese Bibliothek kann auch zusätzliche Informationen in das JSON-Schema einfügen.

Die AWS Glue-Schema-Registry-Bibliothek verwendet das injizierte Feld „className“ im Schema, um einen vollständig klassifizierten Klassennamen bereitzustellen. Das Feld „className“ wird vom Deserializer verwendet, um in ein Objekt dieser Klasse zu deserialisieren.

```
 Example class :

@JsonSchemaDescription("This is a car")
@JsonSchemaTitle("Simple Car Schema")
@Builder
@AllArgsConstructor
@EqualsAndHashCode
// Fully qualified class name to be added to an additionally injected property
// called className for deserializer to determine which class to deserialize
// the bytes into
@JsonSchemaInject(
        strings = {@JsonSchemaString(path = "className",
                value = "com.amazonaws.services.schemaregistry.integrationtests.generators.Car")}
)
// List of annotations to help infer JSON Schema are defined by https://github.com/mbknor/mbknor-jackson-jsonSchema
public class Car {
    @JsonProperty(required = true)
    private String make;

    @JsonProperty(required = true)
    private String model;

    @JsonSchemaDefault("true")
    @JsonProperty
    public boolean used;

    @JsonSchemaInject(ints = {@JsonSchemaInt(path = "multipleOf", value = 1000)})
    @Max(200000)
    @JsonProperty
    private int miles;

    @Min(2000)
    @JsonProperty
    private int year;

    @JsonProperty
    private Date purchaseDate;

    @JsonProperty
    @JsonFormat(shape = JsonFormat.Shape.NUMBER)
    private Date listedDate;

    @JsonProperty
    private String[] owners;

    @JsonProperty
    private Collection<Float> serviceChecks;

    // Empty constructor is required by Jackson to deserialize bytes
    // into an Object of this class
    public Car() {}
}
```

# Erstellen eines Schemas
<a name="schema-registry-gs4"></a>

Sie können ein Schema mit der AWS Glue APIs oder der AWS Glue Konsole erstellen. 

**AWS Glue APIs**  
Sie können diese Schritte verwenden, um diese Aufgabe mithilfe von auszuführen AWS Glue APIs.

Um ein neues Schema hinzuzufügen, verwenden Sie die [CreateSchema Aktion (Python: create\$1schema)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-CreateSchema)-API.

Geben Sie eine `RegistryId`-Struktur an, um eine Registrierung für das Schema anzugeben. Oder lassen Sie die `RegistryId` weg, um die Standardregistrierung zu verwenden.

Geben Sie einen `SchemaName` bestehend aus Buchstaben, Ziffern, Bindestrichen oder Unterstrichen an, und geben Sie als `DataFormat` **AVRO** oder **JSON** an. Ist das `DataFormat` einmal für ein Schema festgelegt, ist nicht änderbar.

Geben Sie einen `Compatibility`-Modus an:
+ *Backward (Abwärts) (empfohlen)* – Der Verbraucher kann sowohl die aktuelle als auch die vorherige Version lesen.
+ *Backward all (Abwärts alle)* – Der Verbraucher kann die aktuelle und alle früheren Versionen lesen.
+ *Forward (Aufwärts)* – Der Verbraucher kann sowohl die aktuelle als auch die nachfolgende Version lesen.
+ *Forward all (Aufwärts alle)* – Der Verbraucher kann sowohl die aktuelle als auch alle nachfolgenden Versionen lesen.
+ *Full (Voll)* – Kombination aus „Backward“ (Abwärts) und „Forward“ (Aufwärts).
+ *Full all (Voll alle)* – Kombination aus „Backward all“ (Abwärts alle) und „Forward all“ (Aufwärts alle).
+ *None (Keine)* – Es werden keine Kompatibilitätsprüfungen durchgeführt.
+ *Disabled (Deaktiviert)* – Verhindert das Versioning für dieses Schema.

Geben Sie optional `Tags` für Ihr Schema an. 

Geben Sie eine `SchemaDefinition` an, um das Schema im Avro-, JSON-oder Protobuf-Datenformat zu definieren. Informationen finden Sie in den Beispielen.

Für Avro-Datenformat:

```
aws glue create-schema --registry-id RegistryName="registryName1" --schema-name testschema --compatibility NONE --data-format AVRO --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}"
```

```
aws glue create-schema --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName1" --schema-name testschema --compatibility NONE --data-format AVRO  --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}"
```

Für JSON-Datenformat:

```
aws glue create-schema --registry-id RegistryName="registryName" --schema-name testSchemaJson --compatibility NONE --data-format JSON --schema-definition "{\"$schema\": \"http://json-schema.org/draft-07/schema#\",\"type\":\"object\",\"properties\":{\"f1\":{\"type\":\"string\"}}}"
```

```
aws glue create-schema --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName" --schema-name testSchemaJson --compatibility NONE --data-format JSON --schema-definition "{\"$schema\": \"http://json-schema.org/draft-07/schema#\",\"type\":\"object\",\"properties\":{\"f1\":{\"type\":\"string\"}}}"
```

Für das Protobuf-Datenformat:

```
aws glue create-schema --registry-id RegistryName="registryName" --schema-name testSchemaProtobuf --compatibility NONE --data-format PROTOBUF --schema-definition "syntax = \"proto2\";package org.test;message Basic { optional int32 basic = 1;}"
```

```
aws glue create-schema --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName" --schema-name testSchemaProtobuf --compatibility NONE --data-format PROTOBUF --schema-definition "syntax = \"proto2\";package org.test;message Basic { optional int32 basic = 1;}"
```

**AWS Glue-Konsole**  
Ein neues Schema mithilfe der AWS Glue-Konsole hinzufügen:

1. Melden Sie sich bei der AWS Management Console an und öffnen Sie die AWS Glue Konsole unter [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\).

1. Klicken Sie im Navigationsbereich unter **Data Catalog** auf **Schemas (Schemata)**.

1. Klicken Sie auf **Add Schema (Schema hinzufügen)**.

1. Geben Sie einen **Schemanamen** an, bestehend aus Buchstaben, Zahlen, Bindestrichen, Unterstrichen, Dollarzeichen oder Hashmarks. Dieser Name kann nicht geändert werden.

1. Wählen Sie im Dropdown-Menü die **Registry**, in der das Schema gespeichert wird. Die übergeordnete Registrierung kann nach der Erstellung nicht geändert werden.

1. Lassen Sie das **Data format (Datenformat)** als *Apache Avro* oder *JSON*. Dieses Format gilt für alle Versionen dieses Schemas.

1. Wählen Sie einen **Compatibility mode (Kompatibilitätsmodus)**.
   + *Backward (Abwärts) (empfohlen)* – Receiver kann sowohl aktuelle als auch frühere Versionen lesen.
   + *Backward All (Abwärts alle)* – Receiver kann aktuelle und alle früheren Versionen lesen.
   + *Vorwärts* – Absender kann sowohl aktuelle als auch frühere Versionen schreiben.
   + *Vorwärts alle* – Absender kann sowohl aktuelle als auch alle früheren Versionen schreiben.
   + *Full (Voll)* – Kombination aus „Backward“ (Abwärts) und „Forward“ (Aufwärts).
   + *Full all (Voll alle)* – Kombination aus „Backward all“ (Abwärts alle) und „Forward all“ (Aufwärts alle).
   + *Keine* – Es werden keine Kompatibilitätsprüfungen durchgeführt.
   + *Disabled (Deaktiviert)* – Verhindert das Versioning für dieses Schema.

1. Geben Sie eine optionale **Beschreibung** für die Registrierung ein (max. 250 Zeichen).  
![\[Beispiel für das Erstellen eines Schemas.\]](http://docs.aws.amazon.com/de_de/glue/latest/dg/images/schema_reg_create_schema.png)

1. Wenden Sie optional ein oder mehrere Tags auf Ihr Schema an. Klicken Sie auf **Add new Tag (Neuen Tag hinzufügen)** und geben Sie einen **Tag-Schlüssel** und optional einen **Tag-Wert** ein.

1. Geben Sie in das Feld **Erste Schemaversion** das ursprüngliche Schema ein oder fügen Sie es ein.

   Informationen zum Avro-Format finden Sie unter [Arbeiten mit dem Avro-Datenformat](#schema-registry-avro).

   Informationen zum JSON-Format finden Sie unter [Arbeiten mit dem JSON-Datenformat](#schema-registry-json).

1. Wählen Sie alternativ die Option **Add metadata (Metadaten hinzufügen)**, um Versionsmetadaten hinzuzufügen und Ihre Schemaversion mit Anmerkungen zu versehen oder zu klassifizieren.

1. Klicken Sie auf **Create schema and version (Schema und Version erstellen)**.

![\[Beispiel für das Erstellen eines Schemas.\]](http://docs.aws.amazon.com/de_de/glue/latest/dg/images/schema_reg_create_schema2.png)


Das Schema wird erstellt und in der Liste unter **Schemas (Schemata)** angezeigt.

## Arbeiten mit dem Avro-Datenformat
<a name="schema-registry-avro"></a>

Avro bietet Datenserialisierung und Datenaustausch. Avro speichert die Datendefinition im JSON-Format, wodurch es leicht zu lesen und zu interpretieren ist. Die Daten selbst werden im Binärformat gespeichert.

Informationen zum Definieren eines Apache-Avro-Schemas finden Sie unter [Apache Avro-Spezifikationen](http://avro.apache.org/docs/current/spec.html).

## Arbeiten mit dem JSON-Datenformat
<a name="schema-registry-json"></a>

Daten können im JSON-Format serialisiert werden. Das [JSON-Schemaformat](https://json-schema.org/) definiert den Standard für das Format des JSON-Schemas.

# Aktualisieren eines Schemas oder einer Registrierung
<a name="schema-registry-gs5"></a>

Nach der Erstellung können Sie Schemata, Schemaversionen und die Registrierung bearbeiten.

## Aktualisieren einer Registrierung
<a name="schema-registry-gs5a"></a>

Sie können eine Registrierung mithilfe der AWS Glue APIs oder der AWS Glue Konsole aktualisieren. Der Name einer vorhandenen Registrierung kann nicht bearbeitet werden. Sie können die Beschreibung für eine Registrierung bearbeiten.

**AWS Glue APIs**  
Um eine vorhandene Registrierung zu aktualisieren, verwenden Sie die [UpdateRegistry Aktion (Python: update\$1registry)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-UpdateRegistry)-API.

Wählen Sie eine `RegistryId`-Struktur, um die Registrierung anzugeben, die Sie aktualisieren möchten. Übergeben Sie eine `Description`, um die Beschreibung für eine Registrierung zu ändern.

```
aws glue update-registry --description updatedDescription --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName1"
```

**AWS Glue-Konsole**  
Eine Registrierung mithilfe der AWS Glue-Konsole aktualisieren:

1. Melden Sie sich bei der an AWS-Managementkonsole und öffnen Sie die AWS Glue Konsole unter [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\).

1. Klicken Sie im Navigationsbereich unter **Data Catalog** auf **Schema Registries (Schemaregistrierungen)**.

1. Wählen Sie eine Registrierung aus der Liste der Registrierungen, indem Sie das entsprechende Kontrollkästchen aktivieren.

1. Klicken Sie im Menü **Action (Aktion)** auf **Edit registry (Registrierung bearbeiten)**.

# Aktualisieren eines Schemas
<a name="schema-registry-gs5b"></a>

Sie können die Beschreibung und die Kompatibilitätseinstellung für ein Schema aktualisieren.

Um ein vorhandenes Schema zu aktualisieren, verwenden Sie die [UpdateSchema Aktion (Python: update\$1schema)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-UpdateSchema)-API.

Wählen Sie eine `SchemaId`-Struktur, um das Schema anzugeben, die Sie aktualisieren möchten. Eine `VersionNumber` oder `Compatibility` muss angegeben werden.

Codebeispiel 11:

```
aws glue update-schema --description testDescription --schema-id SchemaName="testSchema1",RegistryName="registryName1" --schema-version-number LatestVersion=true --compatibility NONE
```

```
aws glue update-schema --description testDescription --schema-id SchemaArn="arn:aws:glue:us-east-2:901234567890:schema/registryName1/testSchema1" --schema-version-number LatestVersion=true --compatibility NONE
```

# Eine Schemaversion hinzufügen
<a name="schema-registry-gs5c"></a>

Wenn Sie eine Schemaversion hinzufügen, müssen Sie die Versionen vergleichen, um sicherzustellen, dass das neue Schema akzeptiert wird.

Um eine neue Version zu einem vorhandenen Schema hinzuzufügen, verwenden Sie die [RegisterSchemaVersion Aktion (Python: register\$1schema\$1version)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-RegisterSchemaVersion)-API.

Wählen Sie eine `SchemaId`-Struktur, um das Schema anzugeben, für das Sie eine Version hinzufügen möchten, und eine `SchemaDefinition`, um das Schema zu definieren.

Codebeispiel 12:

```
aws glue register-schema-version --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}" --schema-id SchemaArn="arn:aws:glue:us-east-1:901234567890:schema/registryName/testschema"
```

```
aws glue register-schema-version --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}" --schema-id SchemaName="testschema",RegistryName="testregistry"
```

1. Melden Sie sich bei an AWS-Managementkonsole und öffnen Sie die AWS Glue Konsole unter [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\).

1. Klicken Sie im Navigationsbereich unter **Data Catalog** auf **Schemas (Schemata)**.

1. Wählen Sie das Schema aus der Liste der Schemata aus, indem Sie das entsprechende Kontrollkästchen aktivieren.

1. Wählen Sie ein oder mehrere Schemata in der Liste aus, indem Sie die Kontrollkästchen aktivieren.

1. Wählen Sie im Menü **Action (Aktion)** die Option **Register a new version (Neue Version registrieren)**.

1. Geben Sie das neue Schema in das Feld **New version (Neue Version)** ein oder fügen Sie es ein.

1. Klicken Sie auf **Compare with previous version (Vergleich mit vorheriger Version)**, um Unterschiede zur vorherigen Schemaversion anzuzeigen.

1. Wählen Sie alternativ die Option **Add metadata (Metadaten hinzufügen)**, um Versionsmetadaten hinzuzufügen und Ihre Schemaversion mit Anmerkungen zu versehen oder zu klassifizieren. Geben Sie den **Schlüssel** und optional einen **Wert** ein.

1. Klicken Sie auf **Register version (Version registrieren)**.

![\[Eine Schemaversion hinzufügen.\]](http://docs.aws.amazon.com/de_de/glue/latest/dg/images/schema_reg_add_schema_version.png)


Die Version des Schemas (der Schemata) wird in der Liste der Versionen angezeigt. Wenn die Version den Kompatibilitätsmodus geändert hat, wird die Version als Checkpoint markiert.

## Beispiel für einen Vergleich von Schemaversionen
<a name="schema-registry-gs5c1"></a>

Wenn Sie entscheiden, die Option **Compare with previous version (Vergleich mit vorheriger Version)** zu verwenden, werden die vorherigen und die neuen Versionen zusammen angezeigt. Geänderte Informationen werden wie folgt hervorgehoben:
+ *Gelb*: Kennzeichnet geänderte Informationen.
+ *Grün*: Kennzeichnet Inhalte, die in der neuesten Version hinzugefügt wurden.
+ *Rot*: Kennzeichnet Inhalte, die in der neuesten Version entfernt wurden.

Sie können auch einen Vergleich mit früheren Versionen durchführen.

![\[Beispiel für einen Vergleich von Schemaversionen.\]](http://docs.aws.amazon.com/de_de/glue/latest/dg/images/schema_reg_version_comparison.png)


# Ein Schema oder eine Registrierung löschen
<a name="schema-registry-gs7"></a>

Das Löschen eines Schemas, einer Schemaversion oder einer Registrierung ist eine permanente Aktion, die nicht rückgängig gemacht werden kann.

## Ein Schema löschen
<a name="schema-registry-gs7a"></a>

Möglicherweise möchten Sie ein Schema löschen, wenn es nicht mehr in einer Registrierung verwendet werden soll, indem Sie die AWS-Managementkonsole oder die [DeleteSchema Aktion (Python: delete\$1schema)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteSchema) API verwenden.

Das Löschen eines oder mehrerer Schemata ist ein dauerhafter Vorgang, der nicht rückgängig gemacht werden kann. Stellen Sie sicher, dass das Schema oder die Schemata nicht mehr benötigt werden.

Um ein Schema aus der Registrierung zu löschen, rufen Sie die [DeleteSchema Aktion (Python: delete\$1schema)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteSchema)-API auf und geben die `SchemaId`-Struktur an, um das Schema zu identifizieren.

Zum Beispiel:

```
aws glue delete-schema --schema-id SchemaArn="arn:aws:glue:us-east-2:901234567890:schema/registryName1/schemaname"
```

```
aws glue delete-schema --schema-id SchemaName="TestSchema6-deleteschemabyname",RegistryName="default-registry"
```

**AWS Glue-Konsole**  
Ein Schema aus der AWS Glue-Konsole löschen:

1. Melden Sie sich bei der an AWS-Managementkonsole und öffnen Sie die AWS Glue Konsole unter [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\).

1. Klicken Sie im Navigationsbereich unter **Data Catalog** auf **Schema Registries (Schemaregistrierungen)**.

1. Wählen Sie die Registrierung, die Ihr Schema enthält, aus der Liste der Registrierungen aus.

1. Wählen Sie ein oder mehrere Schemata in der Liste aus, indem Sie die Kontrollkästchen aktivieren.

1. Wählen Sie im Menü **Actions (Aktionen)** die Option **Delete (Löschen)**.

1. Geben Sie **Delete** in das Feld ein, um den Löschvorgang zu bestätigen.

1. Wählen Sie **Löschen** aus.

Die von Ihnen angegebenen Schemata werden aus der Registrierung gelöscht.

## Eine Schemaversion löschen
<a name="schema-registry-gs7b"></a>

Da sich Schemas in der Registrierung ansammeln, möchten Sie möglicherweise unerwünschte Schemaversionen mithilfe der AWS-Managementkonsole oder der [DeleteSchemaVersions Aktion (Python: delete\$1schema\$1versions)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteSchemaVersions) API löschen. Das Löschen einer oder mehrerer Schemaversionen ist ein dauerhafter Vorgang, der nicht rückgängig gemacht werden kann. Stellen Sie sicher, dass die Schemaversionen nicht mehr benötigt werden.

Beachten Sie beim Löschen von Schemaversionen die folgenden Einschränkungen:
+ Sie können keine Version mit Checkpoint löschen.
+ Die Anzahl zusammenhängender Versionen darf nicht mehr als 25 betragen.
+ Die neueste Schemaversion darf sich nicht im Status „Ausstehend“ befinden.

Geben Sie die `SchemaId`-Struktur an, um das Schema zu identifizieren, und geben Sie `Versions` als Bereich von Versionen an, die gelöscht werden sollen. Weitere Informationen zum Angeben einer Version oder eines Versionsbereichs finden Sie unter [DeleteRegistry Aktion (Python: delete\$1registry)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteRegistry). Die von Ihnen angegebenen Schemaversionen werden aus der Registrierung gelöscht.

Das Aufrufen der [ListSchemaVersions Aktion (Python: list\$1schema\$1versions)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-ListSchemaVersions)-API nach diesem Aufruf listet den Status der gelöschten Versionen auf.

Beispiel:

```
aws glue delete-schema-versions --schema-id SchemaName="TestSchema6",RegistryName="default-registry" --versions "1-1"
```

```
aws glue delete-schema-versions --schema-id SchemaArn="arn:aws:glue:us-east-2:901234567890:schema/default-registry/TestSchema6-NON-Existent" --versions "1-1"
```

1. Melden Sie sich bei der an AWS-Managementkonsole und öffnen Sie die AWS Glue Konsole unter [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\).

1. Klicken Sie im Navigationsbereich unter **Data Catalog** auf **Schema Registries (Schemaregistrierungen)**.

1. Wählen Sie die Registrierung, die Ihr Schema enthält, aus der Liste der Registrierungen aus.

1. Wählen Sie ein oder mehrere Schemata in der Liste aus, indem Sie die Kontrollkästchen aktivieren.

1. Wählen Sie im Menü **Actions (Aktionen)** die Option **Delete (Löschen)**.

1. Geben Sie **Delete** in das Feld ein, um den Löschvorgang zu bestätigen.

1. Wählen Sie **Löschen** aus.

Die von Ihnen angegebenen Schemaversionen werden aus der Registrierung gelöscht.

# Eine Registrierung löschen
<a name="schema-registry-gs7c"></a>

Unter Umständen möchten Sie eine Registrierung löschen, wenn die darin enthaltenen Schemata nicht mehr unter dieser Registrierung organisiert werden sollen. Sie müssen diese Schemata einer anderen Registrierung zuweisen.

Das Löschen einer oder mehrerer Registrierungen ist ein dauerhafter Vorgang, der nicht rückgängig gemacht werden kann. Stellen Sie sicher, dass die Registrierung(en) nicht mehr benötigt wird/werden.

Die Standardregistrierung kann mit der AWS CLI gelöscht werden.

**AWS Glue-API**  
Um die gesamte Registrierung einschließlich Schema und aller Schemaversionen zu löschen, rufen Sie die [DeleteRegistry Aktion (Python: delete\$1registry)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteRegistry)-API auf. Geben Sie eine `RegistryId`-Struktur an, um die Registrierung zu identifizieren.

Zum Beispiel:

```
aws glue delete-registry --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName1"
```

```
aws glue delete-registry --registry-id RegistryName="TestRegistry-deletebyname"
```

Um den Status des Löschvorgangs zu erhalten, können Sie die `GetRegistry`-API nach dem asynchronen Aufruf aufrufen.

**AWS Glue-Konsole**  
Eine Registrierung aus der AWS Glue-Konsole löschen:

1. Melden Sie sich bei an AWS-Managementkonsole und öffnen Sie die AWS Glue Konsole unter [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\).

1. Klicken Sie im Navigationsbereich unter **Data Catalog** auf **Schema Registries (Schemaregistrierungen)**.

1. Wählen Sie eine Registrierung aus der Liste, indem Sie das entsprechende Kontrollkästchen aktivieren.

1. Klicken Sie im Menü **Action (Aktion)** auf **Delete registry (Registrierung löschen)**.

1. Geben Sie **Delete** in das Feld ein, um den Löschvorgang zu bestätigen.

1. Wählen Sie **Löschen** aus.

Die ausgewählten Registrierungen werden aus AWS Glue gelöscht.

## IAM-Beispiele für Serialisierer
<a name="schema-registry-gs1"></a>

**Anmerkung**  
AWS verwaltete Richtlinien gewähren die erforderlichen Berechtigungen für allgemeine Anwendungsfälle. Informationen zur Verwendung verwalteter Richtlinien zum Verwalten der Schemaregistrierung finden Sie unter [AWS verwaltete (vordefinierte) Richtlinien für AWS Glue](security-iam-awsmanpol.md#access-policy-examples-aws-managed). 

Für Serialisierer sollten Sie eine minimale Richtlinie wie unten erstellen, um eine Möglichkeit zu haben, die `schemaVersionId` für eine bestimmte Schemadefinition zu finden. Achten Sie darauf, dass Sie Leserechte für die Registrierung haben, um die Schemata in der Registrierung zu lesen. Sie können die Registrierungen, die gelesen werden können, mit der `Resource`-Klausel einschränken.

Codebeispiel 13:

```
{
    "Sid" : "GetSchemaByDefinition",
    "Effect" : "Allow",
    "Action" :
	[
        "glue:GetSchemaByDefinition"
    ],
        "Resource" : ["arn:aws:glue:us-east-2:012345678:registry/registryname-1",
                      "arn:aws:glue:us-east-2:012345678:schema/registryname-1/schemaname-1",
                      "arn:aws:glue:us-east-2:012345678:schema/registryname-1/schemaname-2"
                     ]
}
```

Sie können Produzenten auch erlauben, neue Schemata und Versionen zu erstellen, indem Sie die folgenden zusätzlichen Methoden einbeziehen. Beachten Sie, dass Sie in der Lage sein sollten, die Registrierung anhand add/remove/evolve der darin enthaltenen Schemas zu überprüfen. Sie können die Registrierungen, die inspiziert werden können, mit der `Resource`-Klausel einschränken.

Codebeispiel 14:

```
{
    "Sid" : "RegisterSchemaWithMetadata",
    "Effect" : "Allow",
    "Action" :
	[
        "glue:GetSchemaByDefinition",
        "glue:CreateSchema",
        "glue:RegisterSchemaVersion",
        "glue:PutSchemaVersionMetadata",
    ],
    "Resource" : ["arn:aws:glue:aws-region:123456789012:registry/registryname-1",
                  "arn:aws:glue:aws-region:123456789012:schema/registryname-1/schemaname-1",
                  "arn:aws:glue:aws-region:123456789012:schema/registryname-1/schemaname-2"
                 ]
}
```

## IAM-Beispiele für Deserialisierer
<a name="schema-registry-gs1b"></a>

Für Deserialisierer (Verbraucherseite) sollten Sie eine ähnliche Richtlinie wie unten erstellen, damit der Deserializer das Schema aus der Schemaregistrierung zur Deserialisierung abrufen kann. Beachten Sie, dass Sie in der Lage sein sollten, die Registrierung zu überprüfen, um Schemata darin abzurufen.

Codebeispiel 15:

```
{
    "Sid" : "GetSchemaVersion",
    "Effect" : "Allow",
    "Action" :
	[
        "glue:GetSchemaVersion"
    ],
    "Resource" : ["*"]
}
```

## Private Konnektivität mit AWS PrivateLink
<a name="schema-registry-gs-private"></a>

Sie können AWS PrivateLink es verwenden, um die VPC Ihres Datenproduzenten zu verbinden, AWS Glue indem Sie einen VPC-Schnittstellen-Endpunkt für definieren. AWS Glue Wenn Sie einen VPC-Schnittstellenendpunkt verwenden, findet die Kommunikation zwischen Ihrer VPC und AWS Glue vollständig innerhalb des AWS -Netzwerks statt. Weitere Informationen finden Sie unter [Verwenden von AWS Glue mit VPC-Endpunkten](https://docs.aws.amazon.com/glue/latest/dg/vpc-endpoint.html).

# Zugreifen auf CloudWatch Amazon-Metriken
<a name="schema-registry-gs-monitoring"></a>

 CloudWatch Amazon-Metriken sind im Rahmen CloudWatch des kostenlosen Kontingents verfügbar. Sie können in der CloudWatch Konsole auf diese Messwerte zugreifen. Zu den Metriken auf API-Ebene gehören CreateSchema (Erfolg und Latenz) GetSchemaByDefinition, (Erfolg und Latenz), GetSchemaVersion (Erfolg und Latenz), RegisterSchemaVersion (Erfolg und Latenz), PutSchemaVersionMetadata (Erfolg und Latenz). Zu den Metriken auf Ressourcenebene gehört die Registrierung. ThrottledByLimit,. SchemaVersion ThrottledByLimit, SchemaVersion. Größe.

# CloudFormation Beispielvorlage für die Schemaregistrierung
<a name="schema-registry-integrations-cfn"></a>

Im Folgenden finden Sie eine Beispielvorlage für die Erstellung von Schemaregistrierungsressourcen in CloudFormation. Um diesen Stack in Ihrem Konto zu erstellen, kopieren Sie die obige Vorlage in eine Datei `SampleTemplate.yaml` und führen Sie dann den folgenden Befehl aus:

```
aws cloudformation create-stack --stack-name ABCSchemaRegistryStack --template-body "'cat SampleTemplate.yaml'"
```

In diesem Beispiel verwenden wir die `AWS::Glue::Registry`, um eine Registrierung zu erstellen, `AWS::Glue::Schema` um ein Schema zu erstellen, `AWS::Glue::SchemaVersion` um eine Schemaversion zu erstellen und `AWS::Glue::SchemaVersionMetadata` um die Schemaversion mit Metadaten zu füllen. 

```
Description: "A sample CloudFormation template for creating Schema Registry resources."
Resources:
  ABCRegistry:
    Type: "AWS::Glue::Registry"
    Properties:
      Name: "ABCSchemaRegistry"
      Description: "ABC Corp. Schema Registry"
      Tags:
        Project: "Foo"
  ABCSchema:
    Type: "AWS::Glue::Schema"
    Properties:
      Registry:
        Arn: !Ref ABCRegistry
      Name: "TestSchema"
      Compatibility: "NONE"
      DataFormat: "AVRO"
      SchemaDefinition: >
        {"namespace":"foo.avro","type":"record","name":"user","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":"int"}]}
      Tags:
        Project: "Foo"
  SecondSchemaVersion:
    Type: "AWS::Glue::SchemaVersion"
    Properties:
      Schema:
        SchemaArn: !Ref ABCSchema
      SchemaDefinition: >
        {"namespace":"foo.avro","type":"record","name":"user","fields":[{"name":"status","type":"string", "default":"ON"}, {"name":"name","type":"string"},{"name":"favorite_number","type":"int"}]}
  FirstSchemaVersionMetadata:
    Type: "AWS::Glue::SchemaVersionMetadata"
    Properties:
      SchemaVersionId: !GetAtt ABCSchema.InitialSchemaVersionId
      Key: "Application"
      Value: "Kinesis"
  SecondSchemaVersionMetadata:
    Type: "AWS::Glue::SchemaVersionMetadata"
    Properties:
      SchemaVersionId: !Ref SecondSchemaVersion
      Key: "Application"
      Value: "Kinesis"
```

# Integration mit AWS Glue Schema Registry
<a name="schema-registry-integrations"></a>

In diesen Abschnitten werden Integrationen mit der AWS Glue-Schemaregistrierung beschrieben. Die Beispiele in diesem Abschnitt zeigen ein Schema mit AVRO-Datenformat. Weitere Beispiele, einschließlich Schemas mit JSON-Datenformat, finden Sie in den Integrationstests und ReadMe Informationen im Open-Source-Repository [AWS GlueSchema Registry](https://github.com/awslabs/aws-glue-schema-registry).

**Topics**
+ [Anwendungsfall: Verbinden der Schema Registry mit Amazon MSK oder Apache Kafka](#schema-registry-integrations-amazon-msk)
+ [Anwendungsfall: Integrieren von Amazon Kinesis Data Streams mit AWS Glue Schema Registry](#schema-registry-integrations-kds)
+ [Amazon Managed Service für Apache Flink](#schema-registry-integrations-kinesis-data-analytics-apache-flink)
+ [Anwendungsfall: Integration mit AWS Lambda](#schema-registry-integrations-aws-lambda)
+ [Anwendungsfall: AWS Glue Data Catalog](#schema-registry-integrations-aws-glue-data-catalog)
+ [Anwendungsfall: AWS Glue Streaming](#schema-registry-integrations-aws-glue-streaming)
+ [Anwendungsfall: Apache Kafka Streams](#schema-registry-integrations-apache-kafka-streams)

## Anwendungsfall: Verbinden der Schema Registry mit Amazon MSK oder Apache Kafka
<a name="schema-registry-integrations-amazon-msk"></a>

Nehmen wir an, Sie schreiben Daten in ein Apache-Kafka-Thema, und Sie können diese Schritte ausführen, um loszulegen.

1. Erstellen Sie einen Amazon Managed Streaming für Apache Kafka (Amazon MSK) oder Apache Kafka-Cluster mit mindestens einem Thema. Wenn Sie einen Amazon-MSK-Cluster erstellen, können Sie die AWS-Managementkonsole verwenden. Folgen Sie den Anweisungen unter [Erste Schritte mit Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html) im *Entwicklerhandbuch für Amazon Managed Streaming for Apache Kafka*.

1. Folgen Sie dem Schritt [SerDe Bibliotheken installieren](schema-registry-gs-serde.md) oben.

1. Um Schemaregistrierungen, Schemata oder Schemaversionen zu erstellen, befolgen Sie die Anweisungen im Abschnitt [Erste Schritte mit der Schemaregistrierung](schema-registry-gs.md) dieses Dokuments.

1. Starten Sie Ihre Produzenten und Verbraucher, die Schema Registry zum Schreiben und Lesen von Datensätzen to/from zum Thema Amazon MSK oder Apache Kafka zu verwenden. Beispielcode für Produzenten und Verbraucher finden Sie in [der ReadMe Datei aus den](https://github.com/awslabs/aws-glue-schema-registry/blob/master/README.md) Serde-Bibliotheken. Die Schema-Registry-Bibliothek des Produzenten serialisiert den Datensatz automatisch und versieht den Datensatz mit einer Schemaversions-ID.

1. Wenn das Schema dieses Datensatzes eingegeben wurde oder die automatische Registrierung aktiviert ist, ist das Schema in der Schema Registry registriert.

1. Der Verbraucher, der mit der AWS Glue-Schema-Registry-Bibliothek aus dem Amazon-MSK- oder Apache-Kafka-Thema liest, sucht das Schema automatisch aus der Schema Registry.

## Anwendungsfall: Integrieren von Amazon Kinesis Data Streams mit AWS Glue Schema Registry
<a name="schema-registry-integrations-kds"></a>

Diese Integration erfordert, dass Sie einen vorhandenen Amazon Kinesis Data Stream haben. Weitere Informationen finden Sie unter [Erste Schritte mit Amazon Kinesis Data Streams](https://docs.aws.amazon.com/streams/latest/dev/getting-started.html) im *Entwicklerhandbuch für Amazon Kinesis Data Streams*.

Es gibt zwei Möglichkeiten, mit Daten in einem Kinesis-Datenstrom zu interagieren.
+ Über die Bibliotheken Kinesis Producer Library (KPL) und Kinesis Client Library (KCL) in Java. Mehrsprachige Unterstützung wird nicht bereitgestellt.
+ Durch die`PutRecords`,`PutRecord`, und `GetRecords` Kinesis Data Streams, die in der APIs AWS SDK für Java verfügbar sind.

Wenn Sie die KPL/KCL Bibliotheken derzeit verwenden, empfehlen wir, diese Methode weiterhin zu verwenden. Es gibt aktualisierte KCL- und KPL-Versionen mit integrierter Schema Registry, wie in den Beispielen gezeigt. Andernfalls können Sie den Beispielcode verwenden, um die AWS Glue Schema Registry zu nutzen, wenn Sie das KDS APIs direkt verwenden.

Die Integration der Schema Registry ist mit KPL v0.14.2 oder höher und mit KCL v2.3 oder höher verfügbar. Die Integration der Schema Registry mit dem JSON-Datenformat ist mit KPL v0.14.8 oder höher und mit KCL v2.3.6 oder höher verfügbar.

### Interaktion mit Daten mit Kinesis SDK V2
<a name="schema-registry-integrations-kds-sdk-v2"></a>

In diesem Abschnitt wird die Interaktion mit Kinesis mithilfe des Kinesis SDK V2 beschrieben

```
// Example JSON Record, you can construct a AVRO record also
private static final JsonDataWithSchema record = JsonDataWithSchema.builder(schemaString, payloadString);
private static final DataFormat dataFormat = DataFormat.JSON;

//Configurations for Schema Registry
GlueSchemaRegistryConfiguration gsrConfig = new GlueSchemaRegistryConfiguration("us-east-1");

GlueSchemaRegistrySerializer glueSchemaRegistrySerializer =
        new GlueSchemaRegistrySerializerImpl(awsCredentialsProvider, gsrConfig);
GlueSchemaRegistryDataFormatSerializer dataFormatSerializer =
        new GlueSchemaRegistrySerializerFactory().getInstance(dataFormat, gsrConfig);

Schema gsrSchema =
        new Schema(dataFormatSerializer.getSchemaDefinition(record), dataFormat.name(), "MySchema");

byte[] serializedBytes = dataFormatSerializer.serialize(record);

byte[] gsrEncodedBytes = glueSchemaRegistrySerializer.encode(streamName, gsrSchema, serializedBytes);

PutRecordRequest putRecordRequest = PutRecordRequest.builder()
        .streamName(streamName)
        .partitionKey("partitionKey")
        .data(SdkBytes.fromByteArray(gsrEncodedBytes))
        .build();
shardId = kinesisClient.putRecord(putRecordRequest)
        .get()
        .shardId();

GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer = new GlueSchemaRegistryDeserializerImpl(awsCredentialsProvider, gsrConfig);

GlueSchemaRegistryDataFormatDeserializer gsrDataFormatDeserializer =
        glueSchemaRegistryDeserializerFactory.getInstance(dataFormat, gsrConfig);

GetShardIteratorRequest getShardIteratorRequest = GetShardIteratorRequest.builder()
        .streamName(streamName)
        .shardId(shardId)
        .shardIteratorType(ShardIteratorType.TRIM_HORIZON)
        .build();

String shardIterator = kinesisClient.getShardIterator(getShardIteratorRequest)
        .get()
        .shardIterator();

GetRecordsRequest getRecordRequest = GetRecordsRequest.builder()
        .shardIterator(shardIterator)
        .build();
GetRecordsResponse recordsResponse = kinesisClient.getRecords(getRecordRequest)
        .get();

List<Object> consumerRecords = new ArrayList<>();
List<Record> recordsFromKinesis = recordsResponse.records();

for (int i = 0; i < recordsFromKinesis.size(); i++) {
    byte[] consumedBytes = recordsFromKinesis.get(i)
            .data()
            .asByteArray();

    Schema gsrSchema = glueSchemaRegistryDeserializer.getSchema(consumedBytes);
    Object decodedRecord = gsrDataFormatDeserializer.deserialize(ByteBuffer.wrap(consumedBytes),
                                                                    gsrSchema.getSchemaDefinition());
    consumerRecords.add(decodedRecord);
}
```

### Interaktion mit Daten mithilfe der Bibliotheken KPL/KCL
<a name="schema-registry-integrations-kds-libraries"></a>

In diesem Abschnitt wird die Integration von Kinesis Data Streams mit Schema Registry mithilfe der KPL/KCL Bibliotheken beschrieben. Weitere Informationen zur Verwendung von KPL/KCL finden Sie unter [Entwickeln von Amazon Kinesis Datenstreams-Produzenten mit der Kinesis Producer Library](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) im *Entwicklerhandbuch für Amazon Kinesis Data Streams*.

#### Einrichten der Schema Registry in KPL
<a name="schema-registry-integrations-kds-libraries-kpl"></a>

1. Definieren Sie die Schemadefinition für die Daten, das Datenformat und den Schemanamen, die in der AWS Glue Schema Registry erstellt wurden

1. Konfigurieren Sie optional das `GlueSchemaRegistryConfiguration`-Objekt.

1. Übergeben Sie das Schemaobjekt an die `addUserRecord API`.

   ```
   private static final String SCHEMA_DEFINITION = "{"namespace": "example.avro",\n"
   + " "type": "record",\n"
   + " "name": "User",\n"
   + " "fields": [\n"
   + " {"name": "name", "type": "string"},\n"
   + " {"name": "favorite_number", "type": ["int", "null"]},\n"
   + " {"name": "favorite_color", "type": ["string", "null"]}\n"
   + " ]\n"
   + "}";
   
   KinesisProducerConfiguration config = new KinesisProducerConfiguration();
   config.setRegion("us-west-1")
   
   //[Optional] configuration for Schema Registry.
   
   GlueSchemaRegistryConfiguration schemaRegistryConfig =
   new GlueSchemaRegistryConfiguration("us-west-1");
   
   schemaRegistryConfig.setCompression(true);
   
   config.setGlueSchemaRegistryConfiguration(schemaRegistryConfig);
   
   ///Optional configuration ends.
   
   final KinesisProducer producer =
         new KinesisProducer(config);
   
   final ByteBuffer data = getDataToSend();
   
   com.amazonaws.services.schemaregistry.common.Schema gsrSchema =
       new Schema(SCHEMA_DEFINITION, DataFormat.AVRO.toString(), "demoSchema");
   
   ListenableFuture<UserRecordResult> f = producer.addUserRecord(
   config.getStreamName(), TIMESTAMP, Utils.randomExplicitHashKey(), data, gsrSchema);
   
   private static ByteBuffer getDataToSend() {
         org.apache.avro.Schema avroSchema =
           new org.apache.avro.Schema.Parser().parse(SCHEMA_DEFINITION);
   
         GenericRecord user = new GenericData.Record(avroSchema);
         user.put("name", "Emily");
         user.put("favorite_number", 32);
         user.put("favorite_color", "green");
   
         ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
         Encoder encoder = EncoderFactory.get().directBinaryEncoder(outBytes, null);
         new GenericDatumWriter<>(avroSchema).write(user, encoder);
         encoder.flush();
         return ByteBuffer.wrap(outBytes.toByteArray());
    }
   ```

#### Einrichten der Kinesis Client-Bibliothek
<a name="schema-registry-integrations-kds-libraries-kcl"></a>

Sie entwickeln Ihre Kinesis Client Library-Verbraucher in Java. Weitere Informationen finden Sie unter [Entwickeln eines Kinesis Client Library-Verbrauchers in Java](https://docs.aws.amazon.com/streams/latest/dev/kcl2-standard-consumer-java-example.html) im *Entwicklerhandbuch zu Amazon Kinesis Data Streams*.

1. Erstellen Sie eine Instance von `GlueSchemaRegistryDeserializer` durch Übergeben eines `GlueSchemaRegistryConfiguration`-Objekts.

1. Übergeben Sie den `GlueSchemaRegistryDeserializer` an `retrievalConfig.glueSchemaRegistryDeserializer`.

1. Greifen Sie auf das Schema eingehender Nachrichten zu, indem Sie `kinesisClientRecord.getSchema()` aufrufen.

   ```
   GlueSchemaRegistryConfiguration schemaRegistryConfig =
       new GlueSchemaRegistryConfiguration(this.region.toString());
   
    GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer =
       new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), schemaRegistryConfig);
   
    RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient));
    retrievalConfig.glueSchemaRegistryDeserializer(glueSchemaRegistryDeserializer);
   
     Scheduler scheduler = new Scheduler(
               configsBuilder.checkpointConfig(),
               configsBuilder.coordinatorConfig(),
               configsBuilder.leaseManagementConfig(),
               configsBuilder.lifecycleConfig(),
               configsBuilder.metricsConfig(),
               configsBuilder.processorConfig(),
               retrievalConfig
           );
   
    public void processRecords(ProcessRecordsInput processRecordsInput) {
               MDC.put(SHARD_ID_MDC_KEY, shardId);
               try {
                   log.info("Processing {} record(s)",
                   processRecordsInput.records().size());
                   processRecordsInput.records()
                   .forEach(
                       r ->
                           log.info("Processed record pk: {} -- Seq: {} : data {} with schema: {}",
                           r.partitionKey(), r.sequenceNumber(), recordToAvroObj(r).toString(), r.getSchema()));
               } catch (Throwable t) {
                   log.error("Caught throwable while processing records. Aborting.");
                   Runtime.getRuntime().halt(1);
               } finally {
                   MDC.remove(SHARD_ID_MDC_KEY);
               }
    }
   
    private GenericRecord recordToAvroObj(KinesisClientRecord r) {
       byte[] data = new byte[r.data().remaining()];
       r.data().get(data, 0, data.length);
       org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser().parse(r.schema().getSchemaDefinition());
       DatumReader datumReader = new GenericDatumReader<>(schema);
   
       BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(data, 0, data.length, null);
       return (GenericRecord) datumReader.read(null, binaryDecoder);
    }
   ```

#### Interaktion mit Daten mithilfe der Kinesis Data Streams APIs
<a name="schema-registry-integrations-kds-apis"></a>

In diesem Abschnitt wird die Integration von Kinesis Data Streams mit Schema Registry mithilfe der Kinesis Data Streams beschrieben. APIs

1. Aktualisieren Sie diese Maven-Abhängigkeiten:

   ```
   <dependencyManagement>
           <dependencies>
               <dependency>
                   <groupId>com.amazonaws</groupId>
                   <artifactId>aws-java-sdk-bom</artifactId>
                   <version>1.11.884</version>
                   <type>pom</type>
                   <scope>import</scope>
               </dependency>
           </dependencies>
       </dependencyManagement>
   
       <dependencies>
           <dependency>
               <groupId>com.amazonaws</groupId>
               <artifactId>aws-java-sdk-kinesis</artifactId>
           </dependency>
   
           <dependency>
               <groupId>software.amazon.glue</groupId>
               <artifactId>schema-registry-serde</artifactId>
               <version>1.1.5</version>
           </dependency>
   
           <dependency>
               <groupId>com.fasterxml.jackson.dataformat</groupId>
               <artifactId>jackson-dataformat-cbor</artifactId>
               <version>2.11.3</version>
           </dependency>
       </dependencies>
   ```

1. Fügen Sie im Produzenten Schema-Header-Informationen mithilfe der `PutRecords`- oder `PutRecord`-API in Kinesis Data Streams hinzu.

   ```
   //The following lines add a Schema Header to the record
           com.amazonaws.services.schemaregistry.common.Schema awsSchema =
               new com.amazonaws.services.schemaregistry.common.Schema(schemaDefinition, DataFormat.AVRO.name(),
                   schemaName);
           GlueSchemaRegistrySerializerImpl glueSchemaRegistrySerializer =
               new GlueSchemaRegistrySerializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(getConfigs()));
           byte[] recordWithSchemaHeader =
               glueSchemaRegistrySerializer.encode(streamName, awsSchema, recordAsBytes);
   ```

1. Verwenden Sie im Produzenten die `PutRecords`- oder `PutRecord`-API, um den Datensatz in den Datenstrom einzufügen.

1. Entfernen Sie im Verbraucher den Schemadatensatz aus dem Header und serialisieren Sie einen Avro-Schemadatensatz.

   ```
   //The following lines remove Schema Header from record
           GlueSchemaRegistryDeserializerImpl glueSchemaRegistryDeserializer =
               new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), getConfigs());
           byte[] recordWithSchemaHeaderBytes = new byte[recordWithSchemaHeader.remaining()];
           recordWithSchemaHeader.get(recordWithSchemaHeaderBytes, 0, recordWithSchemaHeaderBytes.length);
           com.amazonaws.services.schemaregistry.common.Schema awsSchema =
               glueSchemaRegistryDeserializer.getSchema(recordWithSchemaHeaderBytes);
           byte[] record = glueSchemaRegistryDeserializer.getData(recordWithSchemaHeaderBytes);
   
           //The following lines serialize an AVRO schema record
           if (DataFormat.AVRO.name().equals(awsSchema.getDataFormat())) {
               Schema avroSchema = new org.apache.avro.Schema.Parser().parse(awsSchema.getSchemaDefinition());
               Object genericRecord = convertBytesToRecord(avroSchema, record);
               System.out.println(genericRecord);
           }
   ```

#### Interaktion mit Daten mithilfe der Kinesis Data Streams APIs
<a name="schema-registry-integrations-kds-apis-reference"></a>

Im Folgenden finden Sie einen Beispielcode für die Verwendung von `PutRecords` und `GetRecords` APIs.

```
//Full sample code
import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializerImpl;
import com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistrySerializerImpl;
import com.amazonaws.services.schemaregistry.utils.AVROUtils;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.services.glue.model.DataFormat;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;


public class PutAndGetExampleWithEncodedData {
    static final String regionName = "us-east-2";
    static final String streamName = "testStream1";
    static final String schemaName = "User-Topic";
    static final String AVRO_USER_SCHEMA_FILE = "src/main/resources/user.avsc";
    KinesisApi kinesisApi = new KinesisApi();

    void runSampleForPutRecord() throws IOException {
        Object testRecord = getTestRecord();
        byte[] recordAsBytes = convertRecordToBytes(testRecord);
        String schemaDefinition = AVROUtils.getInstance().getSchemaDefinition(testRecord);

        //The following lines add a Schema Header to a record
        com.amazonaws.services.schemaregistry.common.Schema awsSchema =
            new com.amazonaws.services.schemaregistry.common.Schema(schemaDefinition, DataFormat.AVRO.name(),
                schemaName);
        GlueSchemaRegistrySerializerImpl glueSchemaRegistrySerializer =
            new GlueSchemaRegistrySerializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(regionName));
        byte[] recordWithSchemaHeader =
            glueSchemaRegistrySerializer.encode(streamName, awsSchema, recordAsBytes);

        //Use PutRecords api to pass a list of records
        kinesisApi.putRecords(Collections.singletonList(recordWithSchemaHeader), streamName, regionName);

        //OR
        //Use PutRecord api to pass single record
        //kinesisApi.putRecord(recordWithSchemaHeader, streamName, regionName);
    }

    byte[] runSampleForGetRecord() throws IOException {
        ByteBuffer recordWithSchemaHeader = kinesisApi.getRecords(streamName, regionName);

        //The following lines remove the schema registry header
        GlueSchemaRegistryDeserializerImpl glueSchemaRegistryDeserializer =
            new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(regionName));
        byte[] recordWithSchemaHeaderBytes = new byte[recordWithSchemaHeader.remaining()];
        recordWithSchemaHeader.get(recordWithSchemaHeaderBytes, 0, recordWithSchemaHeaderBytes.length);

        com.amazonaws.services.schemaregistry.common.Schema awsSchema =
            glueSchemaRegistryDeserializer.getSchema(recordWithSchemaHeaderBytes);

        byte[] record = glueSchemaRegistryDeserializer.getData(recordWithSchemaHeaderBytes);

        //The following lines serialize an AVRO schema record
        if (DataFormat.AVRO.name().equals(awsSchema.getDataFormat())) {
            Schema avroSchema = new org.apache.avro.Schema.Parser().parse(awsSchema.getSchemaDefinition());
            Object genericRecord = convertBytesToRecord(avroSchema, record);
            System.out.println(genericRecord);
        }

        return record;
    }

    private byte[] convertRecordToBytes(final Object record) throws IOException {
        ByteArrayOutputStream recordAsBytes = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().directBinaryEncoder(recordAsBytes, null);
        GenericDatumWriter datumWriter = new GenericDatumWriter<>(AVROUtils.getInstance().getSchema(record));
        datumWriter.write(record, encoder);
        encoder.flush();
        return recordAsBytes.toByteArray();
    }

    private GenericRecord convertBytesToRecord(Schema avroSchema, byte[] record) throws IOException {
        final GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(avroSchema);
        Decoder decoder = DecoderFactory.get().binaryDecoder(record, null);
        GenericRecord genericRecord = datumReader.read(null, decoder);
        return genericRecord;
    }

    private Map<String, String> getMetadata() {
        Map<String, String> metadata = new HashMap<>();
        metadata.put("event-source-1", "topic1");
        metadata.put("event-source-2", "topic2");
        metadata.put("event-source-3", "topic3");
        metadata.put("event-source-4", "topic4");
        metadata.put("event-source-5", "topic5");
        return metadata;
    }

    private GlueSchemaRegistryConfiguration getConfigs() {
        GlueSchemaRegistryConfiguration configs = new GlueSchemaRegistryConfiguration(regionName);
        configs.setSchemaName(schemaName);
        configs.setAutoRegistration(true);
        configs.setMetadata(getMetadata());
        return configs;
    }

    private Object getTestRecord() throws IOException {
        GenericRecord genericRecord;
        Schema.Parser parser = new Schema.Parser();
        Schema avroSchema = parser.parse(new File(AVRO_USER_SCHEMA_FILE));

        genericRecord = new GenericData.Record(avroSchema);
        genericRecord.put("name", "testName");
        genericRecord.put("favorite_number", 99);
        genericRecord.put("favorite_color", "red");

        return genericRecord;
    }
}
```

## Amazon Managed Service für Apache Flink
<a name="schema-registry-integrations-kinesis-data-analytics-apache-flink"></a>

Apache Flink ist ein beliebtes Open-Source-Framework und eine verteilte Verarbeitungs-Engine für statusbehaftete Berechnungen über unbegrenzte und begrenzte Datenströme. Amazon Managed Service für Apache Flink ist ein vollständig verwalteter AWS Service, mit dem Sie Apache Flink-Anwendungen zur Verarbeitung von Streaming-Daten erstellen und verwalten können.

Open Source Apache Flink bietet eine Reihe von Quellen und Senken. Vordefinierte Datenquellen umfassen beispielsweise das Lesen von Dateien, Verzeichnissen und Sockets sowie das Aufnehmen von Daten aus Sammlungen und Iteratoren. Apache Flink DataStream Connectors bieten Code für Apache Flink als Schnittstelle zu verschiedenen Systemen von Drittanbietern, wie Apache Kafka oder Kinesis als Quellsenken. and/or 

Weitere Informationen finden Sie im [Amazon Kinesis Data Analytics-Entwicklerhandbuch](https://docs.aws.amazon.com/kinesisanalytics/latest/java/what-is.html).

### Apache Flink Kafka Connector
<a name="schema-registry-integrations-kafka-connector"></a>

Apache Flink bietet einen Apache-Kafka-Datenstrom-Konnektor für das Lesen von Daten aus Kafka-Themen und das Schreiben von Daten in Kafka-Themen mit Genau-Einmal-Garantie. Der Kafka-Verbraucher von Fink, `FlinkKafkaConsumer`, bietet Zugriff auf das Lesen aus einem oder mehreren Kafka-Themen. Der Apache-Flink-Kafka-Produzent `FlinkKafkaProducer` ermöglicht das Schreiben eines Streams von Datensätzen zu einem oder mehreren Kafka-Themen. Weitere Informationen finden Sie unter [Kinesis Kafka Konnektor](https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html).

### Apache Flink Kinesis Streams Connector
<a name="schema-registry-integrations-kinesis-connector"></a>

Der Kinesis Data Stream Connector bietet Zugriff auf Amazon Kinesis Data Streams. Die `FlinkKinesisConsumer` ist eine exakt einmal parallel Streaming-Datenquelle, die mehrere Kinesis-Streams innerhalb derselben AWS Serviceregion abonniert und das Re-Sharding von Streams transparent handhaben kann, während der Job ausgeführt wird. Jede Unteraufgabe des Verbrauchers ist für das Abrufen von Datensätzen aus mehreren Kinesis-Shards verantwortlich. Die Anzahl der Shards, die von jeder Unteraufgabe abgerufen werden, ändert sich, wenn Shards geschlossen und von Kinesis erstellt werden. Der `FlinkKinesisProducer` verwendet die Kinesis Producer Library (KPL), um Daten aus einem Apache-Flink-Stream in einen Kinesis-Stream zu übertragen. Weitere Informationen finden Sie unter [Amazon Kinesis Streams Connector](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kinesis.html).

Weitere Informationen finden Sie unter [AWS Glue-Schema-Github-Repository](https://github.com/awslabs/aws-glue-schema-registry).

### Integration mit Apache Flink
<a name="schema-registry-integrations-apache-flink-integrate"></a>

Die mit Schema Registry bereitgestellte SerDes Bibliothek ist in Apache Flink integriert. Um mit Apache Flink zu arbeiten, müssen Sie die Schnittstellen [https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java](https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java) und [https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java](https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java) namens `GlueSchemaRegistryAvroSerializationSchema` und `GlueSchemaRegistryAvroDeserializationSchema` implementieren, die Sie in Apache-Flink-Konnektoren einbinden können.

### Hinzufügen einer AWS Glue Schema Registry Dependency zur Apache-Flink-Anwendung
<a name="schema-registry-integrations-kinesis-data-analytics-dependencies"></a>

Integrationsabhängigkeiten für AWS Glue Schema Registry in der Apache-Flink-Anwendung einrichten:

1. Fügen Sie die Abhängigkeit zur `pom.xml`-Datei hinzu.

   ```
   <dependency>
       <groupId>software.amazon.glue</groupId>
       <artifactId>schema-registry-flink-serde</artifactId>
       <version>1.0.0</version>
   </dependency>
   ```

#### Integration von Kafka oder Amazon MSK mit Apache Flink
<a name="schema-registry-integrations-kda-integrate-msk"></a>

Sie können Managed Service für Apache Flink für Apache Flink mit Kafka als Quelle oder Kafka als Senke verwenden.

**Kafka als Quelle**  
Das folgende Diagramm zeigt die Integration von Kinesis Data Streams mit Managed Service für Apache Flink für Apache Flink, mit Kafka als Quelle.

![\[Kafka als Quelle.\]](http://docs.aws.amazon.com/de_de/glue/latest/dg/images/gsr-kafka-source.png)


**Kafka als Senke**  
Das folgende Diagramm zeigt die Integration von Kinesis Data Streams mit Managed Service für Apache Flink für Apache Flink, mit Kafka als Senke.

![\[Kafka als Senke.\]](http://docs.aws.amazon.com/de_de/glue/latest/dg/images/gsr-kafka-sink.png)


Um Kafka (oder Amazon MSK) in Managed Service für Apache Flink für Apache Flink zu integrieren, mit Kafka als Quelle oder Kafka als Senke, nehmen Sie die folgenden Codeänderungen vor. Fügen Sie in den entsprechenden Abschnitten die fett formatierten Codeblöcke zu Ihrem jeweiligen Code hinzu.

Wenn Kafka die Quelle ist, verwenden Sie den Deserialisierer-Code (Block 2). Wenn Kafka die Senke ist, verwenden Sie den Serialisierer-Code (Block 3).

```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

String topic = "topic";
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

// block 1
Map<String, Object> configs = new HashMap<>();
configs.put(AWSSchemaRegistryConstants.AWS_REGION, "aws-region");
configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());

FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
    topic,
    // block 2
    GlueSchemaRegistryAvroDeserializationSchema.forGeneric(schema, configs),
    properties);

FlinkKafkaProducer<GenericRecord> producer = new FlinkKafkaProducer<>(
    topic,
    // block 3
    GlueSchemaRegistryAvroSerializationSchema.forGeneric(schema, topic, configs),
    properties);

DataStream<GenericRecord> stream = env.addSource(consumer);
stream.addSink(producer);
env.execute();
```

#### Integrieren von Kinesis Data Streams in Apache Flink
<a name="schema-registry-integrations-integrate-kds"></a>

Sie können Managed Service für Apache Flink für Apache Flink mit Kinesis Data Streams als Quelle oder Senke verwenden.

**Kinesis Data Streams als Quelle**  
Das folgende Diagramm zeigt die Integration von Kinesis Data Streams mit Managed Service für Apache Flink für Apache Flink, mit Kinesis Data Streams als Quelle.

![\[Kinesis Data Streams als Quelle.\]](http://docs.aws.amazon.com/de_de/glue/latest/dg/images/gsr-kinesis-source.png)


**Kinesis Data Streams als Senke**  
Das folgende Diagramm zeigt die Integration von Kinesis Data Streams mit Managed Service für Apache Flink für Apache Flink, mit Kinesis Data Streams als Senke.

![\[Kinesis Data Streams als Senke.\]](http://docs.aws.amazon.com/de_de/glue/latest/dg/images/gsr-kinesis-sink.png)


Um Kinesis Data Streams mit Managed Service für Apache Flink für Apache Flink zu integrieren, mit Kinesis Data Streams als Quelle oder Kinesis Data Streams als Senke, nehmen Sie die folgenden Codeänderungen vor. Fügen Sie in den entsprechenden Abschnitten die fett formatierten Codeblöcke zu Ihrem jeweiligen Code hinzu.

Wenn Kinesis Data Streams die Quelle ist, verwenden Sie den Deserialisierer-Code (Block 2). Wenn Kinesis Data Streams die Senke ist, verwenden Sie den Serialisierer-Code (Block 3).

```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

String streamName = "stream";
Properties consumerConfig = new Properties();
consumerConfig.put(AWSConfigConstants.AWS_REGION, "aws-region");
consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

// block 1
Map<String, Object> configs = new HashMap<>();
configs.put(AWSSchemaRegistryConstants.AWS_REGION, "aws-region");
configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());

FlinkKinesisConsumer<GenericRecord> consumer = new FlinkKinesisConsumer<>(
    streamName,
    // block 2
    GlueSchemaRegistryAvroDeserializationSchema.forGeneric(schema, configs),
    properties);

FlinkKinesisProducer<GenericRecord> producer = new FlinkKinesisProducer<>(
    // block 3
    GlueSchemaRegistryAvroSerializationSchema.forGeneric(schema, topic, configs),
    properties);
producer.setDefaultStream(streamName);
producer.setDefaultPartition("0");

DataStream<GenericRecord> stream = env.addSource(consumer);
stream.addSink(producer);
env.execute();
```

## Anwendungsfall: Integration mit AWS Lambda
<a name="schema-registry-integrations-aws-lambda"></a>

[Um eine AWS Lambda Funktion als Apache Kafka/Amazon MSK-Consumer zu verwenden und AVRO-kodierte Nachrichten mithilfe von AWS Glue Schema Registry zu deserialisieren, besuchen Sie die MSK Labs-Seite.](https://amazonmsk-labs.workshop.aws/en/msklambda/gsrschemareg.html)

## Anwendungsfall: AWS Glue Data Catalog
<a name="schema-registry-integrations-aws-glue-data-catalog"></a>

AWS Glue-Tabellen unterstützen Schemata, die Sie manuell oder durch Verweis auf die AWS Glue Schema Registry angeben können. Die Schema Registry wird in den Data Catalog integriert, damit Sie optional Schemata verwenden können, die in der Schema Registry gespeichert sind, wenn Sie AWS Glue-Tabellen oder -Partitionen im Data Catalog erstellen oder aktualisieren. Um eine Schemadefinition in der Schema Registry zu identifizieren, müssen Sie mindestens den ARN des Schemas kennen, zu dem diese gehört. Eine Schemaversion eines Schemas, die eine Schemadefinition enthält, kann durch seine UUID oder Versionsnummer referenziert werden. Es gibt immer eine Schemaversion, die „neueste“ Version, die nachgeschlagen werden kann, ohne die Versionsnummer oder UUID zu kennen.

Bei Aufrufen der `CreateTable`- oder `UpdateTable`-Operationen übergeben Sie eine `TableInput`-Struktur mit einem `StorageDescriptor`, der unter Umständen eine `SchemaReference` auf ein vorhandenes Schema in der Schema Registry hat. In ähnlicher Weise kann die Antwort `GetPartition` APIs, wenn Sie `GetTable` oder aufrufen, das Schema und die enthalten`SchemaReference`. Wenn eine Tabelle oder Partition mit Schemareferenzen erstellt wurde, versucht der Data Catalog, das Schema für diese Schemareferenz abzurufen. Falls er das Schema in der Schema Registry nicht findet, wird ein leeres Schema in der `GetTable`-Antwort zurückgegeben. Andernfalls enthält die Antwort sowohl das Schema als auch die Schemareferenz.

Sie können die folgenden Aktionen auch in der AWS Glue-Konsole ausführen.

Um diese Vorgänge durchzuführen und die Schemainformationen zu erstellen, zu aktualisieren oder anzuzeigen, müssen Sie dem aufrufenden Benutzer eine IAM-Rolle zuweisen, die Berechtigungen für die `GetSchemaVersion`-API bereitstellt.

### Hinzufügen einer Tabelle oder Aktualisieren des Schemas für eine Tabelle
<a name="schema-registry-integrations-aws-glue-data-catalog-table"></a>

Das Hinzufügen einer neuen Tabelle aus einem vorhandenen Schema bindet die Tabelle an eine bestimmte Schemaversion. Sobald neue Schemaversionen registriert wurden, können Sie diese Tabellendefinition auf der Seite Tabelle anzeigen in der AWS Glue-Konsole oder mithilfe der [UpdateTable Aktion (Python: update\$1table)](aws-glue-api-catalog-tables.md#aws-glue-api-catalog-tables-UpdateTable)-API aktualisieren.

#### Hinzufügen einer Tabelle aus einem vorhandenen Schema
<a name="schema-registry-integrations-aws-glue-data-catalog-table-existing"></a>

Sie können eine AWS Glue-Tabelle aus einer Schemaversion in der Registrierung mithilfe der AWS Glue-Konsole oder der `CreateTable`-API erstellen.

**AWS Glue-API**  
Bei Aufrufen der `CreateTable`-API übergeben Sie eine `TableInput` mit einem `StorageDescriptor` und einer `SchemaReference` zu einem vorhandenen Schema in der Schema Registry.

**AWS Glue-Konsole**  
Eine Tabelle mithilfe der AWS Glue-Konsole erstellen:

1. Melden Sie sich bei der an AWS-Managementkonsole und öffnen Sie die AWS Glue Konsole unter [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\).

1. Wählen Sie im Navigationsbereich unter **Data Catalog** die Option **Tables (Tabellen)**.

1. Wählen Sie im Menü **Add Tables (Tabellen hinzufügen)** die Option **Add table from existing schema (Tabelle aus vorhandenem Schema hinzufügen)**.

1. Konfigurieren Sie die Tabelleneigenschaften und den Datenspeicher gemäß dem AWS Glue-Entwicklerhandbuch.

1. Wählen Sie auf der Seite **Choose a Glue schema (Glue-Schema wählen)** die **Registry**, in dem sich das Schema befindet.

1. Wählen Sie den **Schema name (Schemaname)** und wählen Sie die **Version** des anzuwendenden Schemas.

1. Überprüfen Sie die Schemavorschau und klicken Sie auf **Next (Weiter)**.

1. Überprüfen und erstellen Sie die Tabelle.

Das Schema und die Version, die auf die Tabelle angewendet werden, werden in der Spalte **Glue-Schema** in der Liste der Tabellen angezeigt. Sie können die Tabelle anzeigen, um weitere Details zu sehen.

#### Aktualisieren des Schemas für eine Tabelle
<a name="schema-registry-integrations-aws-glue-data-catalog-table-updating"></a>

Wenn eine neue Schemaversion verfügbar wird, können Sie das Schema einer Tabelle mit der [UpdateTable Aktion (Python: update\$1table)](aws-glue-api-catalog-tables.md#aws-glue-api-catalog-tables-UpdateTable)-API oder der AWS Glue-Konsole aktualisieren. 

**Wichtig**  
Beim Aktualisieren des Schemas für eine Tabelle, für die ein AWS Glue-Schema manuell angegeben wurde, ist das neue Schema, auf das in der Schema Registry verwiesen wird, möglicherweise inkompatibel. Dies kann dazu führen, dass Ihre Aufträge fehlschlagen.

**AWS Glue-API**  
Bei Aufrufen der `UpdateTable`-API übergeben Sie eine `TableInput` mit einem `StorageDescriptor` und einer `SchemaReference` zu einem vorhandenen Schema in der Schema Registry.

**AWS Glue-Konsole**  
Das Schema für eine Tabelle aus der AWS Glue-Konsole aktualisieren

1. Melden Sie sich bei der an AWS-Managementkonsole und öffnen Sie die AWS Glue Konsole unter [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\).

1. Wählen Sie im Navigationsbereich unter **Data Catalog** die Option **Tables (Tabellen)**.

1. Zeigen Sie die Tabelle aus der Liste der Tabellen an.

1. Klicken Sie auf **Update schema (Schema aktualisieren)** in dem Feld, das Sie über die neue Version informiert.

1. Überprüfen Sie die Unterschiede zwischen dem aktuellen und dem neuen Schema.

1. Klicken Sie auf **Show all schema differences (Alle Schemaunterschiede anzeigen)**, um weitere Details zu sehen.

1. Klicken Sie auf **Save table (Tabelle speichern)**, um die neue Version zu akzeptieren.

## Anwendungsfall: AWS Glue Streaming
<a name="schema-registry-integrations-aws-glue-streaming"></a>

AWS Glue Streaming verbraucht Daten aus Streaming-Quellen und führt ETL-Operationen durch, bevor sie in eine Ausgabe-Sink geschrieben werden. Die Eingabe-Streaming-Quelle kann mit einer Datentabelle oder direkt durch Angabe der Quellkonfiguration angegeben werden.

AWS Glue Streaming unterstützt eine Datenkatalog-Tabelle für die Streaming-Quelle, die mit dem Schema imAWS Glue Schema Registry erstellt wurde. Sie können ein Schema im AWS Glue Schema Registry erstellen und eine AWS Glue-Tabelle mit einer Streaming-Quelle erstellen, die dieses Schema verwendet. Diese AWS Glue-Tabelle kann als Eingabe für einen AWS Glue-Streaming-Auftrag zum Deserialisieren von Daten im Eingabe-Stream verwendet werden.

Wenn sich das Schema im AWS Glue Schema Registry ändert, müssen Sie den AWS Glue-Streaming-Auftrag neu starten, damit die Änderungen im Schema berücksichtigt werden.

## Anwendungsfall: Apache Kafka Streams
<a name="schema-registry-integrations-apache-kafka-streams"></a>

Die Apache Kafka Streams API ist eine Client-Bibliothek zur Verarbeitung und Analyse von Daten, die in Apache Kafka gespeichert sind. Dieser Abschnitt beschreibt die Integration von Apache Kafka Streams mit der AWS Glue Schema Registry, mit der Sie Schemata in Ihren Datenstreaming-Anwendungen verwalten und durchsetzen können. Weitere Informationen zu Apache Kafka Streams finden Sie unter [Apache Kafka Streams](https://kafka.apache.org/documentation/streams/).

### Integration mit den SerDes Bibliotheken
<a name="schema-registry-integrations-apache-kafka-streams-integrate"></a>

Es gibt eine `GlueSchemaRegistryKafkaStreamsSerde`-Klasse, mit der Sie eine Streams-Anwendung konfigurieren können.

#### Beispielcode für die Kafka-Streams-Anwendung
<a name="schema-registry-integrations-apache-kafka-streams-application"></a>

So verwenden Sie die AWS Glue Schema Registry innerhalb einer Apache-Kafka-Streams-Anwendung:

1. Konfigurieren Sie die Kafka-Streams-Anwendung.

   ```
   final Properties props = new Properties();
       props.put(StreamsConfig.APPLICATION_ID_CONFIG, "avro-streams");
       props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
       props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
       props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
       props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, AWSKafkaAvroSerDe.class.getName());
       props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
   
       props.put(AWSSchemaRegistryConstants.AWS_REGION, "aws-region");
       props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
       props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
   	props.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.AVRO.name());
   ```

1. Erstellen Sie einen Stream aus dem Thema avro-input.

   ```
   StreamsBuilder builder = new StreamsBuilder();
   final KStream<String, GenericRecord> source = builder.stream("avro-input");
   ```

1. Verarbeiten Sie die Datensätze (das Beispiel filtert die Datensätze heraus, deren Wert favorite\$1color pink ist oder bei denen der Wert von „amount“ 15 ist).

   ```
   final KStream<String, GenericRecord> result = source
       .filter((key, value) -> !"pink".equals(String.valueOf(value.get("favorite_color"))));
       .filter((key, value) -> !"15.0".equals(String.valueOf(value.get("amount"))));
   ```

1. Schreiben Sie die Ergebnisse zurück in das Thema avro-output.

   ```
   result.to("avro-output");
   ```

1. Starten Sie die Apache-Kafka-Streams-Anwendung.

   ```
   KafkaStreams streams = new KafkaStreams(builder.build(), props);
   streams.start();
   ```

#### Ergebnisse der Implementierung
<a name="schema-registry-integrations-apache-kafka-streams-results"></a>

Diese Ergebnisse zeigen den Filtervorgang von Datensätzen, die in Schritt 3 als favorite\$1color mit „pink“ oder „15.0“ herausgefiltert wurden.

Datensätze vor dem Filtern:

```
{"name": "Sansa", "favorite_number": 99, "favorite_color": "white"}
{"name": "Harry", "favorite_number": 10, "favorite_color": "black"}
{"name": "Hermione", "favorite_number": 1, "favorite_color": "red"}
{"name": "Ron", "favorite_number": 0, "favorite_color": "pink"}
{"name": "Jay", "favorite_number": 0, "favorite_color": "pink"}

{"id": "commute_1","amount": 3.5}
{"id": "grocery_1","amount": 25.5}
{"id": "entertainment_1","amount": 19.2}
{"id": "entertainment_2","amount": 105}
	{"id": "commute_1","amount": 15}
```

Datensätze nach dem Filtern:

```
{"name": "Sansa", "favorite_number": 99, "favorite_color": "white"}
{"name": "Harry", "favorite_number": 10, "favorite_color": "black"}
{"name": "Hermione", "favorite_number": 1, "favorite_color": "red"}
{"name": "Ron", "favorite_number": 0, "favorite_color": "pink"}

{"id": "commute_1","amount": 3.5}
{"id": "grocery_1","amount": 25.5}
{"id": "entertainment_1","amount": 19.2}
{"id": "entertainment_2","amount": 105}
```

### Anwendungsfall: Apache Kafka Connect
<a name="schema-registry-integrations-apache-kafka-connect"></a>

Durch die Integration von Apache Kafka Connect mit der AWS Glue Schema Registry können Sie Schemainformationen von Konnektoren abrufen. Die Apache-Kafka-Konverter geben das Format der Daten in Apache Kafka an, und wie diese in Apache-Kafka-Connect-Daten übersetzt werden. Jeder Apache-Kafka-Connect-Benutzer muss diese Konverter basierend auf dem Format konfigurieren, in dem seine Daten geladen oder in Apache Kafka gespeichert werden sollen. Auf diese Weise können Sie eigene Konverter definieren, um Apache-Kafka-Connect-Daten in den Typ zu übersetzen, der in AWS Glue Schema Registry verwendet wird (zum Beispiel: Avro), und unseren Serializer nutzen, um das Schema zu registrieren und die Serialisierung durchzuführen. Dann können Konverter auch unseren Deserializer verwenden, um die von Apache Kafka empfangenen Daten zu deserialisieren und wieder in Apache-Kafka-Connect-Daten zu konvertieren. Ein Beispiel für ein Workflow-Diagramm ist unten angegeben.

![\[Apache-Kafka-Connect-Workflow.\]](http://docs.aws.amazon.com/de_de/glue/latest/dg/images/schema_reg_int_kafka_connect.png)


1. Installieren Sie das `aws-glue-schema-registry`-Projekt durch Klonen des [Github-Repository für die AWS GlueSchema Registry](https://github.com/awslabs/aws-glue-schema-registry).

   ```
   git clone git@github.com:awslabs/aws-glue-schema-registry.git
   cd aws-glue-schema-registry
   mvn clean install
   mvn dependency:copy-dependencies
   ```

1. Wenn Sie planen, Apache Kafka Connect im *Standalone-Modus* zu verwenden, aktualisieren Sie die **connect-standalone.properties** mit der untenstehenden Anleitung für diesen Schritt. Wenn Sie Apache Kafka Connect im *verteilten* Modus verwenden möchten, aktualisieren Sie **connect-avro-distributed.properties** mit denselben Anweisungen.

   1. Fügen Sie diese Eigenschaften auch der Apache-Kafka-Connect-Properties-Datei hinzu:

      ```
      key.converter.region=aws-region
      value.converter.region=aws-region
      key.converter.schemaAutoRegistrationEnabled=true
      value.converter.schemaAutoRegistrationEnabled=true
      key.converter.avroRecordType=GENERIC_RECORD
      value.converter.avroRecordType=GENERIC_RECORD
      ```

   1. **Fügen Sie den folgenden Befehl zum Abschnitt **Startmodus** unter kafka-run-class .sh hinzu:**

      ```
      -cp $CLASSPATH:"<your AWS GlueSchema Registry base directory>/target/dependency/*"
      ```

1. **Fügen Sie den folgenden Befehl zum Abschnitt **Startmodus** unter kafka-run-class .sh hinzu**

   ```
   -cp $CLASSPATH:"<your AWS GlueSchema Registry base directory>/target/dependency/*" 
   ```

   Das sollte wie folgt aussehen:

   ```
   # Launch mode
   if [ "x$DAEMON_MODE" = "xtrue" ]; then
     nohup "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH:"/Users/johndoe/aws-glue-schema-registry/target/dependency/*" $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
   else
     exec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH:"/Users/johndoe/aws-glue-schema-registry/target/dependency/*" $KAFKA_OPTS "$@"
   fi
   ```

1. Wenn Sie bash verwenden, führen Sie die folgenden Befehle aus, um Ihren CLASSPATH in Ihrem bash\$1profile einzurichten. Aktualisieren Sie die Umgebung für jede andere Shell entsprechend.

   ```
   echo 'export GSR_LIB_BASE_DIR=<>' >>~/.bash_profile
   echo 'export GSR_LIB_VERSION=1.0.0' >>~/.bash_profile
   echo 'export KAFKA_HOME=<your Apache Kafka installation directory>' >>~/.bash_profile
   echo 'export CLASSPATH=$CLASSPATH:$GSR_LIB_BASE_DIR/avro-kafkaconnect-converter/target/schema-registry-kafkaconnect-converter-$GSR_LIB_VERSION.jar:$GSR_LIB_BASE_DIR/common/target/schema-registry-common-$GSR_LIB_VERSION.jar:$GSR_LIB_BASE_DIR/avro-serializer-deserializer/target/schema-registry-serde-$GSR_LIB_VERSION.jar' >>~/.bash_profile
   source ~/.bash_profile
   ```

1. (Optional) Wenn Sie mit einer einfachen Dateiquelle testen möchten, klonen Sie den Dateiquellen-Konnektor.

   ```
   git clone https://github.com/mmolimar/kafka-connect-fs.git
   cd kafka-connect-fs/
   ```

   1. Ändern Sie unter der Konfiguration des Quellen-Konnektors das Datenformat auf Avro, den Datei-Reader auf`AvroFileReader` und aktualisieren Sie ein Beispiel-Avro-Objekt aus dem Dateipfad, aus dem Sie lesen. Zum Beispiel:

      ```
      vim config/kafka-connect-fs.properties
      ```

      ```
      fs.uris=<path to a sample avro object>
      policy.regexp=^.*\.avro$
      file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.AvroFileReader
      ```

   1. Installieren Sie den Quellen-Konnektor.

      ```
      mvn clean package
      echo "export CLASSPATH=\$CLASSPATH:\"\$(find target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')\"" >>~/.bash_profile
      source ~/.bash_profile
      ```

   1. Aktualisieren Sie die Senkeneigenschaften unter `<your Apache Kafka installation directory>/config/connect-file-sink.properties`, aktualisieren Sie den Namen des Themas und den Dateinamen.

      ```
      file=<output file full path>
      topics=<my topic>
      ```

1. Starten Sie den Quellen-Konnektor (in diesem Beispiel handelt es sich um einen Dateiquellen-Konnektor).

   ```
   $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/kafka-connect-fs.properties
   ```

1. Führen Sie den Quellen-Konnektor aus (in diesem Beispiel handelt es sich um einen Dateiquellen-Konnektor).

   ```
   $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-file-sink.properties
   ```

   Ein Beispiel für die Verwendung von Kafka Connect finden Sie im Skript run-local-tests .sh im Ordner integration-tests im [Github-Repository für die Schema Registry](https://github.com/awslabs/aws-glue-schema-registry/tree/master/integration-tests). AWS Glue

# Migration von einer Drittanbieter-Schemaregistrierung zu AWS Glue Schema Registry
<a name="schema-registry-integrations-migration"></a>

Bei der Migration von der Schemaregistrierung eines Drittanbieters zur AWS Glue Schema Registry gibt es eine Abhängigkeit von der vorhandenen, aktuellen Drittanbieter-Schemaregistrierung. Wenn in einem Apache-Kafka-Thema Datensätze vorhanden sind, die mithilfe einer Drittanbieter-Schemaregistrierung gesendet wurden, benötigen Verbraucher diese Schemaregistrierung, um die Datensätze zu deserialisieren. Der `AWSKafkaAvroDeserializer` bietet die Möglichkeit, eine sekundäre Deserializer-Klasse anzugeben, die auf den Drittanbieter-Deserializer verweist und zum Deserialisieren dieser Datensätze verwendet wird.

Es gibt zwei Kriterien für das Ausmustern eines Drittanbieter-Schemas. Erstens kann das Ausmustern nur erfolgen, wenn Datensätze in Apache-Kafka-Themen, die die Drittanbieter-Schemaregistrierung verwenden, nicht mehr von und für Verbraucher benötigt werden. Zweitens kann der Ruhestand durch Alterung der Apache-Kafka-Themen erfolgen, abhängig von der für diese Themen festgelegten Aufbewahrungsfrist. Beachten Sie, dass bei Themen mit unendlicher Aufbewahrung dennoch auf die AWS Glue Schema Registry migrieren können, aber die Schemaregistrierung des Drittanbieters nicht ausmustern können. Als Lösung können Sie eine Anwendung oder Mirror Maker 2 verwenden, um aus dem aktuellen Thema zu lesen und ein neues Thema mit der AWS Glue Schema Registry zu erstellen.

Von der Schemaregistrierung eines Drittanbieters zur AWS Glue Schema Registry migrieren:

1. Erstellen Sie eine Registrierung in der AWS Glue Schema Registry oder verwenden Sie die Standardregistrierung.

1. Stoppen Sie den Konsumenten. Ändern Sie diesen, um die AWS Glue Schema Registry als primären Deserialisierer zu verwenden und die Schemaregistrierung eines Drittanbieters als sekundären. 
   + Legen Sie die Verbrauchereigenschaften fest. In diesem Beispiel wird der secondary\$1deserializer auf einen anderen Deserializer gesetzt. Das Verhalten ist wie folgt: Der Verbraucher ruft Datensätze aus Amazon MSK ab und versucht zunächst den `AWSKafkaAvroDeserializer`. Wenn er nicht in der Lage ist, das magische Byte mit der Avro Schema-ID für das AWS Glue-Schema-Registry-Schema zu lesen, versucht `AWSKafkaAvroDeserializer`, die Deserializer-Klasse im secondary\$1deserializer zu verwenden. Die für den sekundären Deserializer spezifischen Eigenschaften müssen auch in den Verbrauchereigenschaften bereitgestellt werden, z. B. in den Eigenschaften schema\$1registry\$1url\$1config und specific\$1avro\$1reader\$1config, wie unten dargestellt.

     ```
     consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
     consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AWSKafkaAvroDeserializer.class.getName());
     consumerProps.setProperty(AWSSchemaRegistryConstants.AWS_REGION, KafkaClickstreamConsumer.gsrRegion);
     consumerProps.setProperty(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, KafkaAvroDeserializer.class.getName());
     consumerProps.setProperty(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "URL for third-party schema registry");
     consumerProps.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
     ```

1. Starten Sie den Verbraucher neu.

1. Stoppen Sie den Produzenten und verweisen Sie den Produzenten auf die AWS Glue Schema Registry.

   1. Legen Sie die Produzenteneigenschaften fest. In diesem Beispiel verwendet der Produzent die Schemaversionen default-registry und auto register.

      ```
      producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
      producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AWSKafkaAvroSerializer.class.getName());
      producerProps.setProperty(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2");
      producerProps.setProperty(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName());
      producerProps.setProperty(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true");
      ```

1. (Optional) Verschieben Sie vorhandene Schemata und Schemaversionen manuell aus der aktuellen Drittanbieter-Schemaregistrierung in die AWS Glue Schema Registry, entweder in die Standardregistrierung in AWS Glue Schema Registry oder in eine bestimmte nicht standardmäßige Registrierung in AWS Glue Schema Registry. Dies kann erreicht werden, indem Schemas aus den Schemaregistern von Drittanbietern im JSON-Format exportiert und neue Schemas in AWS Glue Schema Registry mithilfe von oder erstellt werden. AWS-Managementkonsole AWS CLI

    Dieser Schritt kann wichtig sein, wenn Sie Kompatibilitätsprüfungen mit früheren Schemaversionen für neu erstellte Schemaversionen mit AWS CLI und dem aktivieren müssen AWS-Managementkonsole, oder wenn Produzenten Nachrichten mit einem neuen Schema senden, bei dem die automatische Registrierung von Schemaversionen aktiviert ist.

1. Starten Sie den Produzenten.