

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Démarrage avec le registre de schémas
<a name="schema-registry-gs"></a>

Les sections suivantes fournissent une présentation et vous expliquent comment configurer et utiliser le registre de schémas. Pour plus d’informations sur les concepts et les composants du registre de schémas, consultez [Registre de schémas AWS Glue](schema-registry.md).

**Topics**
+ [Installation de SerDe bibliothèques](schema-registry-gs-serde.md)
+ [Intégration au registre de schémas AWS Glue](schema-registry-integrations.md)
+ [Migration d'un registre de schémas tiers vers le registre de schémas AWS Glue](schema-registry-integrations-migration.md)

# Installation de SerDe bibliothèques
<a name="schema-registry-gs-serde"></a>

Les SerDe bibliothèques fournissent un cadre pour la sérialisation et la désérialisation des données. 

Vous allez installer le sérialiseur open source pour vos applications produisant des données (collectivement les « sérialiseurs »). Le sérialiseur gère la sérialisation, la compression et l'interaction avec le registre de schémas. Le sérialiseur extrait automatiquement le schéma d'un enregistrement en cours d'écriture vers une destination compatible avec le registre de schémas, telle qu'Amazon MSK. De même, vous allez installer le désérialiseur open source sur vos applications consommant des données.

# Implémentation Java
<a name="schema-registry-gs-serde-java"></a>

**Note**  
Prérequis : avant d'effectuer les étapes suivantes, vous devez avoir un Amazon Managed Streaming pour Apache Kafka (Amazon MSK) ou un cluster Apache Kafka en cours d'exécution. Vos applications producteur et consommateur doivent utiliser Java 8 ou une version supérieure.

Pour installer les bibliothèques sur les applications producteur et consommateur :

1. Dans les fichiers pom.xml des applications producteur et consommateur, ajoutez cette dépendance via le code ci-dessous :

   ```
   <dependency>
       <groupId>software.amazon.glue</groupId>
       <artifactId>schema-registry-serde</artifactId>
       <version>1.1.5</version>
   </dependency>
   ```

   Vous pouvez également cloner le [référentiel Github du registre de schémas AWS Glue](https://github.com/awslabs/aws-glue-schema-registry).

1. Configurez vos applications producteur avec les propriétés requises suivantes :

   ```
   props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // Can replace StringSerializer.class.getName()) with any other key serializer that you may use
   props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());
   props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2");
   properties.put(AWSSchemaRegistryConstants.DATA_FORMAT, "JSON"); // OR "AVRO"
   ```

   S'il n'existe aucun schéma existant, l'enregistrement automatique doit être activé (étape suivante). Si vous avez un schéma que vous souhaitez appliquer, remplacez « my-schema » par le nom de votre schéma. La valeur « registry-name » doit également être fournie si l'enregistrement automatique du schéma est désactivé. Si le schéma est créé sous la valeur « default-registry », le nom du registre peut être omis.

1. (Facultatif) Définissez l'une de ces propriétés de producteur facultatives. Pour une description détaillée des propriétés, consultez [le ReadMe fichier](https://github.com/awslabs/aws-glue-schema-registry/blob/master/README.md).

   ```
   props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true"); // If not passed, uses "false"
   props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "my-schema"); // If not passed, uses transport name (topic name in case of Kafka, or stream name in case of Kinesis Data Streams)
   props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry"); // If not passed, uses "default-registry"
   props.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000 (24 Hours)
   props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200
   props.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL); // Pass a compatibility mode. If not passed, uses Compatibility.BACKWARD
   props.put(AWSSchemaRegistryConstants.DESCRIPTION, "This registry is used for several purposes."); // If not passed, constructs a description
   props.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, AWSSchemaRegistryConstants.COMPRESSION.ZLIB); // If not passed, records are sent uncompressed
   ```

   L'enregistrement automatique enregistre la version du schéma sous le registre par défaut (« default-registry »). Si une valeur `SCHEMA_NAME` n'est pas spécifiée à l'étape précédente, le nom de la rubrique est alors déduit comme étant `SCHEMA_NAME`. 

   Pour de plus amples informations sur les modes de compatibilité, veuillez consulter [Gestion des versions et compatibilité des schémas](schema-registry.md#schema-registry-compatibility).

1. Configurez vos applications consommateur avec les propriétés requises suivantes :

   ```
   props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
   props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
   props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2"); // Pass an Région AWS
   props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); // Only required for AVRO data format
   ```

1. (Facultatif) Définissez ces propriétés d'application consommateur facultatives. Pour une description détaillée des propriétés, consultez [le ReadMe fichier](https://github.com/awslabs/aws-glue-schema-registry/blob/master/README.md).

   ```
   properties.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000
   props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200
   props.put(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, "com.amazonaws.services.schemaregistry.deserializers.external.ThirdPartyDeserializer"); // For migration fall back scenario
   ```

# Implémentation en C\$1
<a name="schema-registry-gs-serde-csharp"></a>

**Note**  
Prérequis : avant d'effectuer les étapes suivantes, vous devez avoir un Amazon Managed Streaming pour Apache Kafka (Amazon MSK) ou un cluster Apache Kafka en cours d'exécution. Vos producteurs et consommateurs doivent utiliser .NET 8.0 ou une version ultérieure.

## Installation
<a name="schema-registry-gs-serde-csharp-install"></a>

Pour les applications C\$1, installez le SerDe NuGet package AWS Glue Schema Registry en utilisant l'une des méthodes suivantes :

**CLI .NET :**  
Utilisez la commande suivante pour installer le package :

```
dotnet add package Aws.Glue.SchemaRegistry --version 1.0.0-<rid>
```

où `<rid>` cela pourrait être`1.0.0-linux-x64`, `1.0.0-linux-musl-x64` ou `1.0.0-linux-arm64`

**PackageReference (dans votre fichier .csproj) :**  
Ajoutez ce qui suit à votre fichier de projet :

```
<PackageReference Include="Aws.Glue.SchemaRegistry" Version="1.0.0-<rid>" />
```

où `<rid>` cela pourrait être`1.0.0-linux-x64`, `1.0.0-linux-musl-x64` ou `1.0.0-linux-arm64`

## Configuration du fichier de configuration
<a name="schema-registry-gs-serde-csharp-config"></a>

Créez un fichier de propriétés de configuration (par exemple,`gsr-config.properties`) avec les paramètres requis :

**Configuration minimale :**  
Voici un exemple de configuration minimale :

```
region=us-east-1
registry.name=default-registry
dataFormat=AVRO
schemaAutoRegistrationEnabled=true
```

## Utilisation de la bibliothèque cliente C\$1 Glue Schema pour Kafka SerDes
<a name="schema-registry-gs-serde-csharp-kafka"></a>

**Exemple d'utilisation du sérialiseur :**  
L'exemple suivant montre comment utiliser le sérialiseur :

```
private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>";
var protobufSerializer = new GlueSchemaRegistryKafkaSerializer(PROTOBUF_CONFIG_PATH);
var serialized = protobufSerializer.Serialize(message, message.Descriptor.FullName);
// send serialized bytes to Kafka using producer.Produce(serialized)
```

**Exemple d'utilisation du désérialiseur :**  
L'exemple suivant montre comment utiliser le désérialiseur :

```
private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>";
var dataConfig = new GlueSchemaRegistryDataFormatConfiguration(
    new Dictionary<string, dynamic>
    {
        {
            GlueSchemaRegistryConstants.ProtobufMessageDescriptor, message.Descriptor
        }
    }
);
var protobufDeserializer = new GlueSchemaRegistryKafkaDeserializer(PROTOBUF_CONFIG_PATH, dataConfig);

// read message from Kafka using serialized = consumer.Consume()
var deserializedObject = protobufDeserializer.Deserialize(message.Descriptor.FullName, serialized);
```

## Utilisation de la bibliothèque cliente C\$1 Glue Schema avec for KafkaFlow SerDes
<a name="schema-registry-gs-serde-csharp-kafkaflow"></a>

**Exemple d'utilisation du sérialiseur :**  
L'exemple suivant montre comment configurer KafkaFlow avec le sérialiseur :

```
services.AddKafka(kafka => kafka
    .UseConsoleLog()
    .AddCluster(cluster => cluster
        .WithBrokers(new[] { "localhost:9092" })
        .AddProducer<CustomerProducer>(producer => producer
            .DefaultTopic("customer-events")
            .AddMiddlewares(m => m
                .AddSerializer<GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>>(
                    () => new GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>("config/gsr-config.properties")
                )
            )
        )
    )
);
```

**Exemple d'utilisation du désérialiseur :**  
L'exemple suivant montre comment configurer KafkaFlow avec le désérialiseur :

```
.AddConsumer(consumer => consumer
    .Topic("customer-events")
    .WithGroupId("customer-group")
    .WithBufferSize(100)
    .WithWorkersCount(10)
    .AddMiddlewares(middlewares => middlewares
        .AddDeserializer<GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>>(
            () => new GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>("config/gsr-config.properties")
        )
        .AddTypedHandlers(h => h.AddHandler<CustomerHandler>())
    )
)
```

## Propriétés facultatives du producteur
<a name="schema-registry-gs-serde-csharp-optional"></a>

Vous pouvez étendre votre fichier de configuration avec des propriétés facultatives supplémentaires :

```
# Auto-registration (if not passed, uses "false")
schemaAutoRegistrationEnabled=true

# Schema name (if not passed, uses topic name)
schema.name=my-schema

# Registry name (if not passed, uses "default-registry")
registry.name=my-registry

# Cache settings
cacheTimeToLiveMillis=86400000
cacheSize=200

# Compatibility mode (if not passed, uses BACKWARD)
compatibility=FULL

# Registry description
description=This registry is used for several purposes.

# Compression (if not passed, records are sent uncompressed)
compressionType=ZLIB
```

## Formats de données pris en charge
<a name="schema-registry-gs-serde-supported-formats"></a>

Les implémentations Java et C\$1 prennent en charge les mêmes formats de données :
+ *AVRO : format* binaire Apache Avro
+ *JSON* : format de schéma JSON
+ *PROTOBUF : format des* tampons de protocole

## Remarques
<a name="schema-registry-gs-serde-csharp-notes"></a>
+ Pour commencer à utiliser la bibliothèque, rendez-vous sur [https://www.nuget. org/packages/AWS.Colle. SchemaRegistry](https://www.nuget.org/packages/AWS.Glue.SchemaRegistry)
+ Le code source est disponible à l'adresse suivante : [https://github.com/awslabs/aws-glue-schema-registry](https://github.com/awslabs/aws-glue-schema-registry)

# Création d'un registre
<a name="schema-registry-gs3"></a>

Vous pouvez utiliser le registre par défaut ou créer autant de nouveaux registres que nécessaire à l'aide de la AWS Glue console AWS Glue APIs or.

**AWS Glue APIs**  
Vous pouvez suivre ces étapes pour effectuer cette tâche à l'aide du AWS Glue APIs.

Pour utiliser le AWS CLI registre des AWS Glue schémas APIs, assurez-vous de mettre à jour votre version AWS CLI à la plus récente.

 Pour ajouter un nouveau registre, utilisez l'API [CreateRegistry action (Python : create\$1registry)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-CreateRegistry). Spécifiez `RegistryName` comme nom du registre à créer, avec une longueur maximale de 255, contenant uniquement des lettres, des chiffres, des traits d'union, des traits de soulignement, le symbole dollar ou le dièse. 

Spécifiez une valeur `Description` comme une chaîne d’une longueur maximale de 2 048 octets, correspondant au [modèle de chaîne à plusieurs lignes d’adresse URI](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-common.html#aws-glue-api-common-_string-patterns). 

Vous pouvez également spécifier une ou plusieurs `Tags` pour votre registre, sous forme de tableau de mappage de paires clé-valeur.

```
aws glue create-registry --registry-name registryName1 --description description
```

Lorsque votre registre est créé, il se voit attribuer un Amazon Resource Name (ARN), que vous pouvez consulter dans le `RegistryArn` de la réponse de l'API. Maintenant que vous avez créé un registre, créez un ou plusieurs schémas pour ce registre.

**Console AWS Glue **  
Pour ajouter un nouveau registre dans la console AWS Glue :

1. Connectez-vous à la AWS Glue console AWS Management Console et ouvrez-la à l'adresse [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\).

1. Dans le panneau de navigation, sous **Data catalog (Catalogue de données)**, choisissez **Schema registries (Registres de schémas)**.

1. Choisissez **Add registry (Ajouter un registre)**.

1. Saisissez un **nom du registre** à donner au registre, composé de lettres, de chiffres, de traits d'union et de traits de soulignement. Ce nom ne peut pas être modifié.

1. Saisissez une **description** (facultatif) pour le registre.

1. Si vous le souhaitez, appliquez une ou plusieurs balises à votre registre. Choisissez **Add new tag (Ajouter une nouvelle balise)** et spécifiez une **clé de balise**, et éventuellement une **valeur de balise**.

1. Choisissez **Add registry (Ajouter un registre)**.

![\[Exemple de création d'un registre.\]](http://docs.aws.amazon.com/fr_fr/glue/latest/dg/images/schema_reg_create_registry.png)


Lorsque votre registre est créé, un Amazon Resource Name (ARN) lui est attribué, que vous pouvez consulter en choisissant le registre dans la liste **Schema registries (Registres de schémas)**. Maintenant que vous avez créé un registre, créez un ou plusieurs schémas pour ce registre.

# Traiter un enregistrement spécifique (JAVA POJO) pour JSON
<a name="schema-registry-gs-json-java-pojo"></a>

Vous pouvez utiliser un ancien objet Java simple (POJO) et transmettre l'objet en tant qu'enregistrement. Ceci est similaire à la notion d'enregistrement spécifique dans AVRO. Ils [mbknor-jackson-jsonschema](https://github.com/mbknor/mbknor-jackson-jsonSchema)peuvent générer un schéma JSON pour le POJO transmis. Cette bibliothèque peut également injecter des informations supplémentaires dans le schéma JSON.

La bibliothèque de registre de schémas AWS Glue utilise le champ « ClassName » injecté dans le schéma pour fournir un nom de classe entièrement classifié. Le champ « ClassName » est utilisé par le désérialiseur pour désérialiser dans un objet de cette classe.

```
 Example class :

@JsonSchemaDescription("This is a car")
@JsonSchemaTitle("Simple Car Schema")
@Builder
@AllArgsConstructor
@EqualsAndHashCode
// Fully qualified class name to be added to an additionally injected property
// called className for deserializer to determine which class to deserialize
// the bytes into
@JsonSchemaInject(
        strings = {@JsonSchemaString(path = "className",
                value = "com.amazonaws.services.schemaregistry.integrationtests.generators.Car")}
)
// List of annotations to help infer JSON Schema are defined by https://github.com/mbknor/mbknor-jackson-jsonSchema
public class Car {
    @JsonProperty(required = true)
    private String make;

    @JsonProperty(required = true)
    private String model;

    @JsonSchemaDefault("true")
    @JsonProperty
    public boolean used;

    @JsonSchemaInject(ints = {@JsonSchemaInt(path = "multipleOf", value = 1000)})
    @Max(200000)
    @JsonProperty
    private int miles;

    @Min(2000)
    @JsonProperty
    private int year;

    @JsonProperty
    private Date purchaseDate;

    @JsonProperty
    @JsonFormat(shape = JsonFormat.Shape.NUMBER)
    private Date listedDate;

    @JsonProperty
    private String[] owners;

    @JsonProperty
    private Collection<Float> serviceChecks;

    // Empty constructor is required by Jackson to deserialize bytes
    // into an Object of this class
    public Car() {}
}
```

# Création d'un schéma
<a name="schema-registry-gs4"></a>

Vous pouvez créer un schéma à l'aide de la console AWS Glue APIs ou de la AWS Glue console. 

**AWS Glue APIs**  
Vous pouvez suivre ces étapes pour effectuer cette tâche à l'aide du AWS Glue APIs.

Pour ajouter un nouveau schéma, utilisez l'API [CreateSchema action (Python : créer\$1schéma)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-CreateSchema).

Spécifiez une structure `RegistryId` pour indiquer un registre pour le schéma. Ou, omettez la valeur `RegistryId` pour utiliser le registre par défaut.

Spécifiez une valeur `SchemaName` composée de lettres, de chiffres, de traits d'union et de traits de soulignement, ainsi qu'une valeur `DataFormat` comme **AVRO** ou **JSON**. Une fois la valeur `DataFormat` définie sur un schéma, elle n'est pas modifiable.

Spécifiez un mode `Compatibility` :
+ *Backward (Descendant) (recommandé)* — L'application consommateur peut lire à la fois la version actuelle et précédente.
+ *Backward all (Descendant tout)* — L'application consommateur peut lire les versions actuelles et toutes les versions précédentes.
+ *Forward (Ascendant)* — L'application consommateur peut lire à la fois la version actuelle et la version ultérieure.
+ *Forward all (Ascendant tout)* — L'application consommateur peut lire à la fois les versions actuelles et les versions ultérieures.
+ *Full (Complet)* — Combinaison de Forward (Ascendant) et de Backward (Descendant).
+ *Full all (Complet tout)* — Combinaison de Backward all (Descendant tout) et de Forward all (Ascendant tout).
+ *None (Aucun)* — Aucune vérification de compatibilité n'est effectuée.
+ *Disabled (Désactivé)* — Empêche toute gestion des versions pour ce schéma.

Le cas échéant, spécifiez `Tags` pour votre schéma. 

Spécifiez une valeur `SchemaDefinition` pour définir le schéma au format de données Avro, JSON ou Protobuf. Veuillez consulter les exemples.

Pour le format de données Avro :

```
aws glue create-schema --registry-id RegistryName="registryName1" --schema-name testschema --compatibility NONE --data-format AVRO --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}"
```

```
aws glue create-schema --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName1" --schema-name testschema --compatibility NONE --data-format AVRO  --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}"
```

Pour le format de données JSON :

```
aws glue create-schema --registry-id RegistryName="registryName" --schema-name testSchemaJson --compatibility NONE --data-format JSON --schema-definition "{\"$schema\": \"http://json-schema.org/draft-07/schema#\",\"type\":\"object\",\"properties\":{\"f1\":{\"type\":\"string\"}}}"
```

```
aws glue create-schema --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName" --schema-name testSchemaJson --compatibility NONE --data-format JSON --schema-definition "{\"$schema\": \"http://json-schema.org/draft-07/schema#\",\"type\":\"object\",\"properties\":{\"f1\":{\"type\":\"string\"}}}"
```

Pour le format de données Protobuf :

```
aws glue create-schema --registry-id RegistryName="registryName" --schema-name testSchemaProtobuf --compatibility NONE --data-format PROTOBUF --schema-definition "syntax = \"proto2\";package org.test;message Basic { optional int32 basic = 1;}"
```

```
aws glue create-schema --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName" --schema-name testSchemaProtobuf --compatibility NONE --data-format PROTOBUF --schema-definition "syntax = \"proto2\";package org.test;message Basic { optional int32 basic = 1;}"
```

**Console AWS Glue **  
Pour ajouter un nouveau schéma à l'aide de la console AWS Glue :

1. Connectez-vous à la console AWS de gestion et ouvrez-la à l'AWS Glueadresse [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\).

1. Dans le panneau de navigation, sous **Data catalog (Catalogue de données)**, choisissez **Schemas (Schémas)**.

1. Choisissez **Add schema (Ajouter un schéma)**.

1. Saisissez un **nom de schéma**, composé de lettres, de chiffres, de traits d'union, de traits de soulignement, de symboles dollar ou de dièses. Ce nom ne peut pas être modifié.

1. Cliquez sur l'onglet **Registry (Registre)** où le schéma sera stocké à partir du menu déroulant. Le registre parent ne peut pas être modifié après la création.

1. Quitter le **format de données** en tant qu'*Apache Avro* ou *JSON*. Ce format s'applique à toutes les versions de ce schéma.

1. Choisissez un **mode de compatibilité**.
   + *Backward (Descendant) (recommandé)* — Le récepteur peut lire à la fois les versions actuelles et précédentes.
   + *Backward All (Descendant tout)* — Le récepteur peut lire les versions actuelles et toutes les versions précédentes.
   + *Forward (Ascendant)* — L'expéditeur peut écrire à la fois les versions actuelles et précédentes.
   + *Forward all (Ascendant tout)* — L'expéditeur peut écrire la version actuelle et toutes les versions précédentes.
   + *Full (Complet)* — Combinaison de Forward (Ascendant) et de Backward (Descendant).
   + *Full All (Complet tout)* — Combinaison de Backward All (Descendant tout) et de Forward All (Ascendant tout).
   + *None (Aucun)* — Aucune vérification de compatibilité n'est effectuée.
   + *Disabled (Désactivé)* — Empêche toute gestion des versions pour ce schéma.

1. Saisissez une **description** facultative pour le registre de 250 caractères maximum.  
![\[Exemple de création d'un schéma.\]](http://docs.aws.amazon.com/fr_fr/glue/latest/dg/images/schema_reg_create_schema.png)

1. Si vous le souhaitez, appliquez une ou plusieurs balises à votre schéma. Choisissez **Add new tag (Ajouter une nouvelle balise)** et spécifiez une **clé de balise**, et éventuellement une **valeur de balise**.

1. Dans la case **First schema version (Première version de schéma)**, saisissez ou collez votre schéma initial.

   Pour le format Avro, voir [Utilisation du format de données Avro](#schema-registry-avro)

   Pour le format JSON, voir [Utilisation du format de données JSON](#schema-registry-json)

1. Choisissez éventuellement **Add metadata (Ajouter des métadonnées)** pour ajouter des métadonnées de version afin d'annoter ou de classer votre version de schéma.

1. Choisissez **Create schema and version (Créer un schéma et une version)**.

![\[Exemple de création d'un schéma.\]](http://docs.aws.amazon.com/fr_fr/glue/latest/dg/images/schema_reg_create_schema2.png)


Le schéma est créé et apparaît dans la liste sous **Schemas (Schémas)**.

## Utilisation du format de données Avro
<a name="schema-registry-avro"></a>

Avro fournit des services de sérialisation et d'échange de données. Avro stocke la définition des données au format JSON, ce qui la rend facile à lire et à interpréter. Les données elles-mêmes sont stockées au format binaire.

Pour plus d'informations sur la définition d'un schéma Apache Avro, veuillez consulter la [spécification d'Apache Avro](http://avro.apache.org/docs/current/spec.html).

## Utilisation du format de données JSON
<a name="schema-registry-json"></a>

Les données peuvent être sérialisées au format JSON. Le [format de schéma JSON](https://json-schema.org/) définit la norme pour le format de schéma JSON.

# Mise à jour d'un schéma ou d'un registre
<a name="schema-registry-gs5"></a>

Une fois créés, vous pouvez modifier vos schémas, vos versions de schéma ou votre registre.

## Mise à jour d'un registre
<a name="schema-registry-gs5a"></a>

Vous pouvez mettre à jour un registre à l'aide de la console AWS Glue APIs ou de la AWS Glue console. Le nom d'un registre existant ne peut pas être modifié. Vous pouvez modifier la description d'un registre.

**AWS Glue APIs**  
Pour mettre à jour un registre existant, utilisez l'API [UpdateRegistry action (Python : update\$1registry)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-UpdateRegistry).

Spécifiez une structure `RegistryId` pour indiquer le registre que vous souhaitez mettre à jour. Transmettez une `Description` pour modifier la description d'un registre.

```
aws glue update-registry --description updatedDescription --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName1"
```

**Console AWS Glue **  
Pour mettre à jour un registre à l'aide de la console AWS Glue :

1. Connectez-vous à la AWS Glue console AWS Management Console et ouvrez-la à l'adresse [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\).

1. Dans le panneau de navigation, sous **Data catalog (Catalogue de données)**, choisissez **Schema registries (Registres de schémas)**.

1. Sélectionnez un registre dans la liste des registres, en cochant sa case.

1. Dans le menu **Action**, choisissez **Edit registry (Modifier un registre)**.

# Mise à jour d'un schéma
<a name="schema-registry-gs5b"></a>

Vous pouvez mettre à jour le paramètre de description ou de compatibilité pour un schéma.

Pour mettre à jour un schéma existant, utilisez l'API [UpdateSchema action (Python : update\$1schema)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-UpdateSchema).

Spécifiez une structure `SchemaId` pour indiquer le schéma que vous souhaitez mettre à jour. `VersionNumber` ou `Compatibility` doit être fourni.

Exemple de code 11 :

```
aws glue update-schema --description testDescription --schema-id SchemaName="testSchema1",RegistryName="registryName1" --schema-version-number LatestVersion=true --compatibility NONE
```

```
aws glue update-schema --description testDescription --schema-id SchemaArn="arn:aws:glue:us-east-2:901234567890:schema/registryName1/testSchema1" --schema-version-number LatestVersion=true --compatibility NONE
```

# Ajout d'une version de schéma
<a name="schema-registry-gs5c"></a>

Lorsque vous ajoutez une version de schéma, vous devez comparer les versions pour vous assurer que le nouveau schéma sera accepté.

Pour ajouter une nouvelle version à un schéma existant, utilisez l'API [RegisterSchemaVersion action (Python : register\$1schema\$1version)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-RegisterSchemaVersion).

Spécifiez une structure `SchemaId` pour indiquer le schéma pour lequel vous souhaitez ajouter une version, ainsi qu'une valeur `SchemaDefinition` pour définir le schéma.

Exemple de code 12 :

```
aws glue register-schema-version --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}" --schema-id SchemaArn="arn:aws:glue:us-east-1:901234567890:schema/registryName/testschema"
```

```
aws glue register-schema-version --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}" --schema-id SchemaName="testschema",RegistryName="testregistry"
```

1. Connectez-vous à la AWS Glue console AWS Management Console et ouvrez-la à l'adresse [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\).

1. Dans le panneau de navigation, sous **Data catalog (Catalogue de données)**, choisissez **Schemas (Schémas)**.

1. Sélectionnez le schéma dans la liste des schémas, en cochant sa case.

1. Sélectionnez un ou plusieurs schémas dans la liste, en cochant les cases.

1. Dans le menu **Action**, choisissez **Register new version (Enregistrer une nouvelle version)**.

1. Dans **New version (Nouvelle version)**, saisissez ou collez votre nouveau schéma.

1. Choisissez **Compare with previous version (Comparer avec la version précédente)** pour voir les différences avec la version de schéma précédente.

1. Choisissez éventuellement **Add metadata (Ajouter des métadonnées)** pour ajouter des métadonnées de version afin d'annoter ou de classer votre version de schéma. Saisissez une **clé** et une **valeur** facultative.

1. Choisissez **Register version (Version de registre)**.

![\[Ajout d'une version de schéma.\]](http://docs.aws.amazon.com/fr_fr/glue/latest/dg/images/schema_reg_add_schema_version.png)


La version du ou des schémas s'affiche dans la liste des versions. Si la version a changé le mode de compatibilité, la version sera marquée comme point de contrôle.

## Exemple de comparaison d'une version de schéma
<a name="schema-registry-gs5c1"></a>

Lorsque vous choisissez **Compare with previous version (Comparer avec la version précédente)**, vous verrez les versions précédentes et les nouvelles versions affichées ensemble. Les informations modifiées seront mises en évidence comme suit :
+ *Jaune* : indique les informations modifiées.
+ *Vert* : indique le contenu ajouté à la dernière version.
+ *Rouge* : indique le contenu supprimé dans la dernière version.

Vous pouvez également comparer les versions antérieures.

![\[Exemple de comparaison d'une version de schéma.\]](http://docs.aws.amazon.com/fr_fr/glue/latest/dg/images/schema_reg_version_comparison.png)


# Suppression d'un schéma ou d'un registre
<a name="schema-registry-gs7"></a>

La suppression d'un schéma, d'une version de schéma ou d'un registre est une action permanente qui ne peut pas être annulée.

## Suppression d'un schéma
<a name="schema-registry-gs7a"></a>

Vous souhaiterez peut-être supprimer un schéma lorsqu'il ne sera plus utilisé dans un registre, à l'aide de l' AWS Management Console API ou de l'[DeleteSchema action (Python : supprimer\$1schéma)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteSchema)API.

La suppression d'un ou plusieurs schémas est une action permanente qui ne peut pas être annulée. Assurez-vous que le ou les schémas ne sont plus nécessaires.

Pour supprimer un schéma du registre, appelez l'API [DeleteSchema action (Python : supprimer\$1schéma)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteSchema), en spécifiant la structure `SchemaId` pour identifier le schéma.

Par exemple :

```
aws glue delete-schema --schema-id SchemaArn="arn:aws:glue:us-east-2:901234567890:schema/registryName1/schemaname"
```

```
aws glue delete-schema --schema-id SchemaName="TestSchema6-deleteschemabyname",RegistryName="default-registry"
```

**Console AWS Glue **  
Pour supprimer un schéma de la console AWS Glue :

1. Connectez-vous à la AWS Glue console AWS Management Console et ouvrez-la à l'adresse [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\).

1. Dans le panneau de navigation, sous **Data catalog (Catalogue de données)**, choisissez **Schema registries (Registres de schémas)**.

1. Choisissez le registre qui contient votre schéma dans la liste des registres.

1. Sélectionnez un ou plusieurs schémas dans la liste, en cochant les cases.

1. Dans le menu **Action**, sélectionnez **Delete schema (Supprimer un schéma)**.

1. Saisissez le texte **Delete** dans le champ pour confirmer la suppression.

1. Choisissez **Delete** (Supprimer).

Le ou les schémas que vous avez spécifiés sont supprimés du registre.

## Suppression d'une version de schéma
<a name="schema-registry-gs7b"></a>

Au fur et à mesure que les schémas s'accumulent dans le registre, vous souhaiterez peut-être supprimer les versions de schéma indésirables à l'aide de l' AWS Management Console API ou de l'[DeleteSchemaVersions action (Python : delete\$1schema\$1versions)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteSchemaVersions)API. La suppression d'une ou de plusieurs versions de schéma est une action permanente qui ne peut pas être annulée. Assurez-vous que les versions de schéma ne sont plus nécessaires.

Lorsque vous supprimez des versions de schéma, veuillez tenir compte des contraintes suivantes :
+ Vous ne pouvez pas supprimer une version comportant un point de contrôle.
+ La gamme de versions contiguës ne peut pas dépasser 25.
+ La version de schéma la plus récente ne doit pas être dans un état en attente.

Spécifiez la structure `SchemaId` pour identifier le schéma et spécifiez `Versions` comme une plage de versions à supprimer. Pour plus d'informations sur la spécification d'une version ou d'une plage de versions, veuillez consulter [DeleteRegistry action (Python : supprimer\$1registre)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteRegistry). Les versions de schéma que vous avez spécifiées sont supprimées du registre.

En appelant l'API [ListSchemaVersions action (Python : list\$1schema\$1versions)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-ListSchemaVersions) après cet appel, vous obtiendrez la liste de l'état des versions supprimées.

Par exemple :

```
aws glue delete-schema-versions --schema-id SchemaName="TestSchema6",RegistryName="default-registry" --versions "1-1"
```

```
aws glue delete-schema-versions --schema-id SchemaArn="arn:aws:glue:us-east-2:901234567890:schema/default-registry/TestSchema6-NON-Existent" --versions "1-1"
```

1. Connectez-vous à la AWS Glue console AWS Management Console et ouvrez-la à l'adresse [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\).

1. Dans le panneau de navigation, sous **Data catalog (Catalogue de données)**, choisissez **Schema registries (Registres de schémas)**.

1. Choisissez le registre qui contient votre schéma dans la liste des registres.

1. Sélectionnez un ou plusieurs schémas dans la liste, en cochant les cases.

1. Dans le menu **Action**, sélectionnez **Delete schema (Supprimer un schéma)**.

1. Saisissez le texte **Delete** dans le champ pour confirmer la suppression.

1. Sélectionnez **Delete (Supprimer)**.

Les versions de schéma que vous avez spécifiées sont supprimées du registre.

# Suppression d'un registre
<a name="schema-registry-gs7c"></a>

Vous pouvez supprimer un registre lorsque les schémas qu'il contient ne doivent plus être organisés sous ce registre. Vous devrez réaffecter ces schémas à un autre registre.

La suppression d'un ou de plusieurs registres est une action permanente qui ne peut pas être annulée. Assurez-vous que le ou les registres ne sont plus requis.

Le registre par défaut peut être supprimé à l'aide de la AWS CLI.

**API AWS Glue**  
Pour supprimer l'intégralité du registre, y compris le schéma et toutes ses versions, appelez l'API [DeleteRegistry action (Python : supprimer\$1registre)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteRegistry). Spécifiez une structure `RegistryId` pour identifier le registre.

Par exemple :

```
aws glue delete-registry --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName1"
```

```
aws glue delete-registry --registry-id RegistryName="TestRegistry-deletebyname"
```

Pour obtenir le statut de l'opération de suppression, vous pouvez appeler l'API `GetRegistry` après l'appel asynchrone.

**Console AWS Glue **  
Pour supprimer un registre de la console AWS Glue :

1. Connectez-vous à la AWS Glue console AWS Management Console et ouvrez-la à l'adresse [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\).

1. Dans le panneau de navigation, sous **Data catalog (Catalogue de données)**, choisissez **Schema registries (Registres de schémas)**.

1. Sélectionnez un registre dans la liste en cochant une case.

1. Dans le menu **Action**, choisissez **Delete registry (Supprimer un registre)**.

1. Saisissez le texte **Delete** dans le champ pour confirmer la suppression.

1. Sélectionnez **Delete (Supprimer)**.

Les registres que vous avez sélectionnés sont supprimés de AWS Glue.

## Exemples d'IAM pour les sérialiseurs
<a name="schema-registry-gs1"></a>

**Note**  
AWS les politiques gérées accordent les autorisations nécessaires pour les cas d'utilisation courants. Pour plus d'informations sur l'utilisation des politiques gérées pour gérer le registre de schéma, veuillez consulter [AWS politiques gérées (prédéfinies) pour AWS Glue](security-iam-awsmanpol.md#access-policy-examples-aws-managed). 

Pour les sérialiseurs, vous devez créer une politique minimale similaire à celle ci-dessous pour vous donner la possibilité de trouver le `schemaVersionId` pour une définition de schéma donnée. Notez que vous devez disposer des autorisations de lecture sur le registre afin de lire les schémas dans le registre. Vous pouvez limiter les registres qui peuvent être lus à l'aide de la clause `Resource`.

Exemple de code 13 :

```
{
    "Sid" : "GetSchemaByDefinition",
    "Effect" : "Allow",
    "Action" :
	[
        "glue:GetSchemaByDefinition"
    ],
        "Resource" : ["arn:aws:glue:us-east-2:012345678:registry/registryname-1",
                      "arn:aws:glue:us-east-2:012345678:schema/registryname-1/schemaname-1",
                      "arn:aws:glue:us-east-2:012345678:schema/registryname-1/schemaname-2"
                     ]
}
```

En outre, vous pouvez également autoriser les applications producteur à créer des schémas et versions en incluant les méthodes supplémentaires suivantes. Notez que vous devriez être en mesure d'inspecter le registre afin de vérifier add/remove/evolve les schémas qu'il contient. Vous pouvez limiter les registres qui peuvent être inspectés à l'aide de la clause `Resource`.

Exemple de code 14 :

```
{
    "Sid" : "RegisterSchemaWithMetadata",
    "Effect" : "Allow",
    "Action" :
	[
        "glue:GetSchemaByDefinition",
        "glue:CreateSchema",
        "glue:RegisterSchemaVersion",
        "glue:PutSchemaVersionMetadata",
    ],
    "Resource" : ["arn:aws:glue:aws-region:123456789012:registry/registryname-1",
                  "arn:aws:glue:aws-region:123456789012:schema/registryname-1/schemaname-1",
                  "arn:aws:glue:aws-region:123456789012:schema/registryname-1/schemaname-2"
                 ]
}
```

## Exemples d'IAM pour les désérialiseurs
<a name="schema-registry-gs1b"></a>

Pour les désérialiseurs (côté application consommateur), vous devez créer une politique similaire à celle ci-dessous pour autoriser le désérialiseur à récupérer le schéma à partir du registre de schéma pour la désérialisation. Notez que vous devriez être en mesure d'inspecter le registre afin d'extraire les schémas qu'il contient.

Exemple de code 15 :

```
{
    "Sid" : "GetSchemaVersion",
    "Effect" : "Allow",
    "Action" :
	[
        "glue:GetSchemaVersion"
    ],
    "Resource" : ["*"]
}
```

## Connectivité privée à l'aide de AWS PrivateLink
<a name="schema-registry-gs-private"></a>

Vous pouvez l'utiliser AWS PrivateLink pour connecter le VPC de votre producteur de données AWS Glue en définissant un point de terminaison VPC d'interface pour. AWS Glue Lorsque vous utilisez un point de terminaison de VPC, la communication entre votre VPC et AWS Glue est gérée au sein du réseau AWS . Pour plus d'informations, veuillez consulter [Utilisation d'AWS Glue avec les points de terminaison d'un VPC](https://docs.aws.amazon.com/glue/latest/dg/vpc-endpoint.html).

# Accès aux CloudWatch métriques d'Amazon
<a name="schema-registry-gs-monitoring"></a>

Les CloudWatch statistiques Amazon sont disponibles dans le cadre CloudWatch de l'offre gratuite. Vous pouvez accéder à ces métriques dans la CloudWatch console. Les indicateurs au niveau de l'API incluent CreateSchema (succès et latence) GetSchemaByDefinition, (succès et latence), GetSchemaVersion (succès et latence), RegisterSchemaVersion (succès et latence), PutSchemaVersionMetadata (succès et latence). Les métriques au niveau des ressources incluent le registre. ThrottledByLimit, SchemaVersion. ThrottledByLimit, SchemaVersion .Taille.

# Exemple de CloudFormation modèle pour le registre de schémas
<a name="schema-registry-integrations-cfn"></a>

Voici un exemple de modèle pour créer des ressources de registre de schémas dans CloudFormation. Pour créer cette pile dans votre compte, copiez le modèle ci-dessus dans un fichier `SampleTemplate.yaml` et exécutez la commande suivante :

```
aws cloudformation create-stack --stack-name ABCSchemaRegistryStack --template-body "'cat SampleTemplate.yaml'"
```

Cet exemple utilise `AWS::Glue::Registry` pour créer un registre, `AWS::Glue::Schema` pour créer un schéma, `AWS::Glue::SchemaVersion` pour créer une version de schéma et `AWS::Glue::SchemaVersionMetadata` pour renseigner les métadonnées de version de schéma. 

```
Description: "A sample CloudFormation template for creating Schema Registry resources."
Resources:
  ABCRegistry:
    Type: "AWS::Glue::Registry"
    Properties:
      Name: "ABCSchemaRegistry"
      Description: "ABC Corp. Schema Registry"
      Tags:
        Project: "Foo"
  ABCSchema:
    Type: "AWS::Glue::Schema"
    Properties:
      Registry:
        Arn: !Ref ABCRegistry
      Name: "TestSchema"
      Compatibility: "NONE"
      DataFormat: "AVRO"
      SchemaDefinition: >
        {"namespace":"foo.avro","type":"record","name":"user","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":"int"}]}
      Tags:
        Project: "Foo"
  SecondSchemaVersion:
    Type: "AWS::Glue::SchemaVersion"
    Properties:
      Schema:
        SchemaArn: !Ref ABCSchema
      SchemaDefinition: >
        {"namespace":"foo.avro","type":"record","name":"user","fields":[{"name":"status","type":"string", "default":"ON"}, {"name":"name","type":"string"},{"name":"favorite_number","type":"int"}]}
  FirstSchemaVersionMetadata:
    Type: "AWS::Glue::SchemaVersionMetadata"
    Properties:
      SchemaVersionId: !GetAtt ABCSchema.InitialSchemaVersionId
      Key: "Application"
      Value: "Kinesis"
  SecondSchemaVersionMetadata:
    Type: "AWS::Glue::SchemaVersionMetadata"
    Properties:
      SchemaVersionId: !Ref SecondSchemaVersion
      Key: "Application"
      Value: "Kinesis"
```

# Intégration au registre de schémas AWS Glue
<a name="schema-registry-integrations"></a>

Ces sections décrivent les intégrations au registre de schémas AWS Glue. Les exemples présentés dans cette section montrent un schéma au format de données AVRO. Pour d'autres exemples, notamment des schémas au format de données JSON, consultez les tests d'intégration et les ReadMe informations dans le [référentiel open source du registre des AWS Glue schémas](https://github.com/awslabs/aws-glue-schema-registry).

**Topics**
+ [Cas d'utilisation : connexion du registre de schémas à Amazon MSK ou à Apache Kafka](#schema-registry-integrations-amazon-msk)
+ [Cas d'utilisation : intégration d'Amazon Kinesis Data Streams au registre de schémas AWS Glue](#schema-registry-integrations-kds)
+ [Cas d’utilisation : Service géré Amazon pour Apache Flink](#schema-registry-integrations-kinesis-data-analytics-apache-flink)
+ [Cas d'utilisation : intégration avec AWS Lambda](#schema-registry-integrations-aws-lambda)
+ [Cas d'utilisation : AWS Glue Data Catalog](#schema-registry-integrations-aws-glue-data-catalog)
+ [Cas d'utilisation : streaming AWS Glue](#schema-registry-integrations-aws-glue-streaming)
+ [Cas d'utilisation : Apache Kafka Streams](#schema-registry-integrations-apache-kafka-streams)

## Cas d'utilisation : connexion du registre de schémas à Amazon MSK ou à Apache Kafka
<a name="schema-registry-integrations-amazon-msk"></a>

Supposons que vous écrivez des données sur une rubrique Apache Kafka, et que vous pouvez suivre ces étapes pour commencer.

1. Créez un cluster Amazon Managed Streaming pour Apache Kafka (Amazon MSK) ou Apache Kafka avec au moins un sujet. Si vous créez un cluster Amazon MSK, vous pouvez utiliser la AWS Management Console. Suivez les instructions ci-après : [Mise en route avec Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html) dans le *Guide du développeur Amazon Managed Streaming for Apache Kafka*.

1. Suivez l'étape [Installation de SerDe bibliothèques](schema-registry-gs-serde.md) ci-dessus.

1. Pour créer des registres de schéma, des schémas ou des versions de schéma, suivez les instructions sous la section [Démarrage avec le registre de schémas](schema-registry-gs.md) de ce document.

1. Faites en sorte que vos producteurs et consommateurs utilisent le registre des schémas pour écrire et lire des enregistrements sur to/from le sujet Amazon MSK ou Apache Kafka. Des exemples de code producteur et consommateur se trouvent dans [le ReadMe fichier](https://github.com/awslabs/aws-glue-schema-registry/blob/master/README.md) des bibliothèques Serde. La bibliothèque du registre de schémas du producteur sérialisera automatiquement l'enregistrement et décorera l'enregistrement avec un ID de version de schéma.

1. Si le schéma de cet enregistrement a été saisi, ou si l'enregistrement automatique est activé, le schéma aura été enregistré dans le registre de schémas.

1. La lecture de l'application consommateur à partir de la rubrique Amazon MSK ou Apache Kafka, à l'aide de la bibliothèque du registre de schémas AWS Glue, recherche automatiquement le schéma à partir du registre de schémas.

## Cas d'utilisation : intégration d'Amazon Kinesis Data Streams au registre de schémas AWS Glue
<a name="schema-registry-integrations-kds"></a>

Cette intégration nécessite que vous ayez un flux de données Amazon Kinesis existant. Pour plus d'informations, veuillez consulter [Présentation des Amazon Kinesis Data Streams](https://docs.aws.amazon.com/streams/latest/dev/getting-started.html) dans le *Guide du développeur Amazon Kinesis Data Streams*.

Il existe deux façons d'interagir avec les données d'un flux de données Kinesis.
+ Via les bibliothèques Kinesis Producer Library (KPL) et Kinesis Client Library (KCL) en Java. La prise en charge multilingue n'est pas fournie.
+ Grâce au `PutRecords``PutRecord`, et aux `GetRecords` Kinesis Data APIs Streams disponibles dans le AWS SDK pour Java.

Si vous utilisez actuellement les KPL/KCL bibliothèques, nous vous recommandons de continuer à utiliser cette méthode. Il existe des versions KCL et KPL mises à jour avec le registre de schémas intégré, comme illustré dans les exemples. Sinon, vous pouvez utiliser l'exemple de code pour tirer parti du registre des AWS Glue schémas si vous utilisez APIs directement le KDS.

L'intégration du registre de schémas n'est disponible qu'avec KPL version 0.14.2 ou ultérieure et avec KCL version 2.3 ou ultérieure. L'intégration du registre de schémas avec le format de données JSON est disponible avec KPL version 0.14.8 ou ultérieure et avec KCL version 2.3.6 ou ultérieure.

### Interagir avec les données à l'aide du kit SDK Kinesis V2
<a name="schema-registry-integrations-kds-sdk-v2"></a>

Cette section décrit l'interaction avec Kinesis à l'aide du kit SDK Kinesis V2

```
// Example JSON Record, you can construct a AVRO record also
private static final JsonDataWithSchema record = JsonDataWithSchema.builder(schemaString, payloadString);
private static final DataFormat dataFormat = DataFormat.JSON;

//Configurations for Schema Registry
GlueSchemaRegistryConfiguration gsrConfig = new GlueSchemaRegistryConfiguration("us-east-1");

GlueSchemaRegistrySerializer glueSchemaRegistrySerializer =
        new GlueSchemaRegistrySerializerImpl(awsCredentialsProvider, gsrConfig);
GlueSchemaRegistryDataFormatSerializer dataFormatSerializer =
        new GlueSchemaRegistrySerializerFactory().getInstance(dataFormat, gsrConfig);

Schema gsrSchema =
        new Schema(dataFormatSerializer.getSchemaDefinition(record), dataFormat.name(), "MySchema");

byte[] serializedBytes = dataFormatSerializer.serialize(record);

byte[] gsrEncodedBytes = glueSchemaRegistrySerializer.encode(streamName, gsrSchema, serializedBytes);

PutRecordRequest putRecordRequest = PutRecordRequest.builder()
        .streamName(streamName)
        .partitionKey("partitionKey")
        .data(SdkBytes.fromByteArray(gsrEncodedBytes))
        .build();
shardId = kinesisClient.putRecord(putRecordRequest)
        .get()
        .shardId();

GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer = new GlueSchemaRegistryDeserializerImpl(awsCredentialsProvider, gsrConfig);

GlueSchemaRegistryDataFormatDeserializer gsrDataFormatDeserializer =
        glueSchemaRegistryDeserializerFactory.getInstance(dataFormat, gsrConfig);

GetShardIteratorRequest getShardIteratorRequest = GetShardIteratorRequest.builder()
        .streamName(streamName)
        .shardId(shardId)
        .shardIteratorType(ShardIteratorType.TRIM_HORIZON)
        .build();

String shardIterator = kinesisClient.getShardIterator(getShardIteratorRequest)
        .get()
        .shardIterator();

GetRecordsRequest getRecordRequest = GetRecordsRequest.builder()
        .shardIterator(shardIterator)
        .build();
GetRecordsResponse recordsResponse = kinesisClient.getRecords(getRecordRequest)
        .get();

List<Object> consumerRecords = new ArrayList<>();
List<Record> recordsFromKinesis = recordsResponse.records();

for (int i = 0; i < recordsFromKinesis.size(); i++) {
    byte[] consumedBytes = recordsFromKinesis.get(i)
            .data()
            .asByteArray();

    Schema gsrSchema = glueSchemaRegistryDeserializer.getSchema(consumedBytes);
    Object decodedRecord = gsrDataFormatDeserializer.deserialize(ByteBuffer.wrap(consumedBytes),
                                                                    gsrSchema.getSchemaDefinition());
    consumerRecords.add(decodedRecord);
}
```

### Interaction avec les données à l'aide KPL/KCL des bibliothèques
<a name="schema-registry-integrations-kds-libraries"></a>

Cette section décrit l'intégration de Kinesis Data Streams à Schema Registry à KPL/KCL l'aide des bibliothèques. Pour en savoir plus sur l'utilisation de KPL/KPC, veuillez consulter [Développement d'applications producteur à l'aide de la bibliothèque producteur Amazon Kinesis](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) dans le *Guide du développeur Amazon Kinesis Data Streams*..

#### Configuration du registre de schémas dans KPL
<a name="schema-registry-integrations-kds-libraries-kpl"></a>

1. Définissez la définition de schéma pour les données, le format de données et le nom de schéma créés dans le registre de schémas AWS Glue.

1. Le cas échéant, configurez l'objet `GlueSchemaRegistryConfiguration`.

1. Transmettez l'objet du schéma à `addUserRecord API`.

   ```
   private static final String SCHEMA_DEFINITION = "{"namespace": "example.avro",\n"
   + " "type": "record",\n"
   + " "name": "User",\n"
   + " "fields": [\n"
   + " {"name": "name", "type": "string"},\n"
   + " {"name": "favorite_number", "type": ["int", "null"]},\n"
   + " {"name": "favorite_color", "type": ["string", "null"]}\n"
   + " ]\n"
   + "}";
   
   KinesisProducerConfiguration config = new KinesisProducerConfiguration();
   config.setRegion("us-west-1")
   
   //[Optional] configuration for Schema Registry.
   
   GlueSchemaRegistryConfiguration schemaRegistryConfig =
   new GlueSchemaRegistryConfiguration("us-west-1");
   
   schemaRegistryConfig.setCompression(true);
   
   config.setGlueSchemaRegistryConfiguration(schemaRegistryConfig);
   
   ///Optional configuration ends.
   
   final KinesisProducer producer =
         new KinesisProducer(config);
   
   final ByteBuffer data = getDataToSend();
   
   com.amazonaws.services.schemaregistry.common.Schema gsrSchema =
       new Schema(SCHEMA_DEFINITION, DataFormat.AVRO.toString(), "demoSchema");
   
   ListenableFuture<UserRecordResult> f = producer.addUserRecord(
   config.getStreamName(), TIMESTAMP, Utils.randomExplicitHashKey(), data, gsrSchema);
   
   private static ByteBuffer getDataToSend() {
         org.apache.avro.Schema avroSchema =
           new org.apache.avro.Schema.Parser().parse(SCHEMA_DEFINITION);
   
         GenericRecord user = new GenericData.Record(avroSchema);
         user.put("name", "Emily");
         user.put("favorite_number", 32);
         user.put("favorite_color", "green");
   
         ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
         Encoder encoder = EncoderFactory.get().directBinaryEncoder(outBytes, null);
         new GenericDatumWriter<>(avroSchema).write(user, encoder);
         encoder.flush();
         return ByteBuffer.wrap(outBytes.toByteArray());
    }
   ```

#### Configuration de la bibliothèque client Kinesis
<a name="schema-registry-integrations-kds-libraries-kcl"></a>

Vous allez développer votre application consommateur de la bibliothèque client Kinesis en Java. Pour de plus amples informations, veuillez consulter [Développement d'une application consommateur de la bibliothèque client Kinesis en Java](https://docs.aws.amazon.com/streams/latest/dev/kcl2-standard-consumer-java-example.html) dans le *Guide du développeur Amazon Kinesis Data Streams*.

1. Créez une instance de `GlueSchemaRegistryDeserializer` en transmettant un objet `GlueSchemaRegistryConfiguration`.

1. Transmettez le `GlueSchemaRegistryDeserializer` à `retrievalConfig.glueSchemaRegistryDeserializer`.

1. Accédez au schéma des messages entrants en appelant `kinesisClientRecord.getSchema()`.

   ```
   GlueSchemaRegistryConfiguration schemaRegistryConfig =
       new GlueSchemaRegistryConfiguration(this.region.toString());
   
    GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer =
       new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), schemaRegistryConfig);
   
    RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient));
    retrievalConfig.glueSchemaRegistryDeserializer(glueSchemaRegistryDeserializer);
   
     Scheduler scheduler = new Scheduler(
               configsBuilder.checkpointConfig(),
               configsBuilder.coordinatorConfig(),
               configsBuilder.leaseManagementConfig(),
               configsBuilder.lifecycleConfig(),
               configsBuilder.metricsConfig(),
               configsBuilder.processorConfig(),
               retrievalConfig
           );
   
    public void processRecords(ProcessRecordsInput processRecordsInput) {
               MDC.put(SHARD_ID_MDC_KEY, shardId);
               try {
                   log.info("Processing {} record(s)",
                   processRecordsInput.records().size());
                   processRecordsInput.records()
                   .forEach(
                       r ->
                           log.info("Processed record pk: {} -- Seq: {} : data {} with schema: {}",
                           r.partitionKey(), r.sequenceNumber(), recordToAvroObj(r).toString(), r.getSchema()));
               } catch (Throwable t) {
                   log.error("Caught throwable while processing records. Aborting.");
                   Runtime.getRuntime().halt(1);
               } finally {
                   MDC.remove(SHARD_ID_MDC_KEY);
               }
    }
   
    private GenericRecord recordToAvroObj(KinesisClientRecord r) {
       byte[] data = new byte[r.data().remaining()];
       r.data().get(data, 0, data.length);
       org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser().parse(r.schema().getSchemaDefinition());
       DatumReader datumReader = new GenericDatumReader<>(schema);
   
       BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(data, 0, data.length, null);
       return (GenericRecord) datumReader.read(null, binaryDecoder);
    }
   ```

#### Interaction avec les données à l'aide des Kinesis Data Streams APIs
<a name="schema-registry-integrations-kds-apis"></a>

Cette section décrit l'intégration de Kinesis Data Streams à Schema Registry à l'aide des Kinesis Data Streams. APIs

1. Mettez à jour ces dépendances Maven :

   ```
   <dependencyManagement>
           <dependencies>
               <dependency>
                   <groupId>com.amazonaws</groupId>
                   <artifactId>aws-java-sdk-bom</artifactId>
                   <version>1.11.884</version>
                   <type>pom</type>
                   <scope>import</scope>
               </dependency>
           </dependencies>
       </dependencyManagement>
   
       <dependencies>
           <dependency>
               <groupId>com.amazonaws</groupId>
               <artifactId>aws-java-sdk-kinesis</artifactId>
           </dependency>
   
           <dependency>
               <groupId>software.amazon.glue</groupId>
               <artifactId>schema-registry-serde</artifactId>
               <version>1.1.5</version>
           </dependency>
   
           <dependency>
               <groupId>com.fasterxml.jackson.dataformat</groupId>
               <artifactId>jackson-dataformat-cbor</artifactId>
               <version>2.11.3</version>
           </dependency>
       </dependencies>
   ```

1. Dans l'application producteur, ajoutez des informations d'en-tête de schéma à l'aide de l'API `PutRecords` ou `PutRecord`dans Kinesis Data Streams.

   ```
   //The following lines add a Schema Header to the record
           com.amazonaws.services.schemaregistry.common.Schema awsSchema =
               new com.amazonaws.services.schemaregistry.common.Schema(schemaDefinition, DataFormat.AVRO.name(),
                   schemaName);
           GlueSchemaRegistrySerializerImpl glueSchemaRegistrySerializer =
               new GlueSchemaRegistrySerializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(getConfigs()));
           byte[] recordWithSchemaHeader =
               glueSchemaRegistrySerializer.encode(streamName, awsSchema, recordAsBytes);
   ```

1. Dans l'application producteur, utilisez l'API `PutRecords` ou `PutRecord` pour placer l'enregistrement dans le flux de données.

1. Dans l'application consommateur, supprimez l'enregistrement de schéma de l'en-tête et sérialisez un enregistrement de schéma Avro.

   ```
   //The following lines remove Schema Header from record
           GlueSchemaRegistryDeserializerImpl glueSchemaRegistryDeserializer =
               new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), getConfigs());
           byte[] recordWithSchemaHeaderBytes = new byte[recordWithSchemaHeader.remaining()];
           recordWithSchemaHeader.get(recordWithSchemaHeaderBytes, 0, recordWithSchemaHeaderBytes.length);
           com.amazonaws.services.schemaregistry.common.Schema awsSchema =
               glueSchemaRegistryDeserializer.getSchema(recordWithSchemaHeaderBytes);
           byte[] record = glueSchemaRegistryDeserializer.getData(recordWithSchemaHeaderBytes);
   
           //The following lines serialize an AVRO schema record
           if (DataFormat.AVRO.name().equals(awsSchema.getDataFormat())) {
               Schema avroSchema = new org.apache.avro.Schema.Parser().parse(awsSchema.getSchemaDefinition());
               Object genericRecord = convertBytesToRecord(avroSchema, record);
               System.out.println(genericRecord);
           }
   ```

#### Interaction avec les données à l'aide des Kinesis Data Streams APIs
<a name="schema-registry-integrations-kds-apis-reference"></a>

Voici un exemple de code pour utiliser le `PutRecords` et `GetRecords` APIs.

```
//Full sample code
import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializerImpl;
import com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistrySerializerImpl;
import com.amazonaws.services.schemaregistry.utils.AVROUtils;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.services.glue.model.DataFormat;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;


public class PutAndGetExampleWithEncodedData {
    static final String regionName = "us-east-2";
    static final String streamName = "testStream1";
    static final String schemaName = "User-Topic";
    static final String AVRO_USER_SCHEMA_FILE = "src/main/resources/user.avsc";
    KinesisApi kinesisApi = new KinesisApi();

    void runSampleForPutRecord() throws IOException {
        Object testRecord = getTestRecord();
        byte[] recordAsBytes = convertRecordToBytes(testRecord);
        String schemaDefinition = AVROUtils.getInstance().getSchemaDefinition(testRecord);

        //The following lines add a Schema Header to a record
        com.amazonaws.services.schemaregistry.common.Schema awsSchema =
            new com.amazonaws.services.schemaregistry.common.Schema(schemaDefinition, DataFormat.AVRO.name(),
                schemaName);
        GlueSchemaRegistrySerializerImpl glueSchemaRegistrySerializer =
            new GlueSchemaRegistrySerializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(regionName));
        byte[] recordWithSchemaHeader =
            glueSchemaRegistrySerializer.encode(streamName, awsSchema, recordAsBytes);

        //Use PutRecords api to pass a list of records
        kinesisApi.putRecords(Collections.singletonList(recordWithSchemaHeader), streamName, regionName);

        //OR
        //Use PutRecord api to pass single record
        //kinesisApi.putRecord(recordWithSchemaHeader, streamName, regionName);
    }

    byte[] runSampleForGetRecord() throws IOException {
        ByteBuffer recordWithSchemaHeader = kinesisApi.getRecords(streamName, regionName);

        //The following lines remove the schema registry header
        GlueSchemaRegistryDeserializerImpl glueSchemaRegistryDeserializer =
            new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(regionName));
        byte[] recordWithSchemaHeaderBytes = new byte[recordWithSchemaHeader.remaining()];
        recordWithSchemaHeader.get(recordWithSchemaHeaderBytes, 0, recordWithSchemaHeaderBytes.length);

        com.amazonaws.services.schemaregistry.common.Schema awsSchema =
            glueSchemaRegistryDeserializer.getSchema(recordWithSchemaHeaderBytes);

        byte[] record = glueSchemaRegistryDeserializer.getData(recordWithSchemaHeaderBytes);

        //The following lines serialize an AVRO schema record
        if (DataFormat.AVRO.name().equals(awsSchema.getDataFormat())) {
            Schema avroSchema = new org.apache.avro.Schema.Parser().parse(awsSchema.getSchemaDefinition());
            Object genericRecord = convertBytesToRecord(avroSchema, record);
            System.out.println(genericRecord);
        }

        return record;
    }

    private byte[] convertRecordToBytes(final Object record) throws IOException {
        ByteArrayOutputStream recordAsBytes = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().directBinaryEncoder(recordAsBytes, null);
        GenericDatumWriter datumWriter = new GenericDatumWriter<>(AVROUtils.getInstance().getSchema(record));
        datumWriter.write(record, encoder);
        encoder.flush();
        return recordAsBytes.toByteArray();
    }

    private GenericRecord convertBytesToRecord(Schema avroSchema, byte[] record) throws IOException {
        final GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(avroSchema);
        Decoder decoder = DecoderFactory.get().binaryDecoder(record, null);
        GenericRecord genericRecord = datumReader.read(null, decoder);
        return genericRecord;
    }

    private Map<String, String> getMetadata() {
        Map<String, String> metadata = new HashMap<>();
        metadata.put("event-source-1", "topic1");
        metadata.put("event-source-2", "topic2");
        metadata.put("event-source-3", "topic3");
        metadata.put("event-source-4", "topic4");
        metadata.put("event-source-5", "topic5");
        return metadata;
    }

    private GlueSchemaRegistryConfiguration getConfigs() {
        GlueSchemaRegistryConfiguration configs = new GlueSchemaRegistryConfiguration(regionName);
        configs.setSchemaName(schemaName);
        configs.setAutoRegistration(true);
        configs.setMetadata(getMetadata());
        return configs;
    }

    private Object getTestRecord() throws IOException {
        GenericRecord genericRecord;
        Schema.Parser parser = new Schema.Parser();
        Schema avroSchema = parser.parse(new File(AVRO_USER_SCHEMA_FILE));

        genericRecord = new GenericData.Record(avroSchema);
        genericRecord.put("name", "testName");
        genericRecord.put("favorite_number", 99);
        genericRecord.put("favorite_color", "red");

        return genericRecord;
    }
}
```

## Cas d’utilisation : Service géré Amazon pour Apache Flink
<a name="schema-registry-integrations-kinesis-data-analytics-apache-flink"></a>

Apache Flink est un cadre open source populaire et un moteur de traitement distribué pour les calculs avec état sur des flux de données sans limite et limités. Amazon Managed Service pour Apache Flink est un AWS service entièrement géré qui vous permet de créer et de gérer des applications Apache Flink pour traiter des données de streaming.

Apache Flink open source fournit un certain nombre de sources et de récepteurs. Par exemple, les sources de données prédéfinies incluent la lecture à partir de fichiers, de répertoires et de sockets, ainsi que l'ingestion de données à partir de collections et d'itérateurs. DataStream Les connecteurs Apache Flink fournissent du code permettant à Apache Flink de s'interfacer avec divers systèmes tiers, tels qu'Apache Kafka ou Kinesis, en tant que récepteurs de sources. and/or 

Pour plus d'informations, veuillez consulter le [Guide du développeur Amazon Kinesis Data Analytics](https://docs.aws.amazon.com/kinesisanalytics/latest/java/what-is.html).

### Connecteur Kafka Apache Flink
<a name="schema-registry-integrations-kafka-connector"></a>

Apache Flink fournit un connecteur de flux de données Apache Kafka pour lire et écrire des données sur des sujets Kafka avec des garanties en une seule fois. L'application consommateur Kafka de Flink, `FlinkKafkaConsumer`, permet d'accéder à la lecture d'une ou de plusieurs rubriques Kafka. L'application producteur kafka d'Apache Flink, `FlinkKafkaProducer`, permet d'écrire un flux d'enregistrements à une ou plusieurs rubriques Kafka. Pour de plus amples informations, veuillez consulter [Apache Kafka Connector](https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html).

### Connecteur de flux Kinesis Apache Flink
<a name="schema-registry-integrations-kinesis-connector"></a>

Le connecteur de flux de données Kinesis vous permet d'accéder à Amazon Kinesis Data Streams. `FlinkKinesisConsumer`Il s'agit d'une source de données de streaming parallèle qui s'abonne à plusieurs flux Kinesis au sein de la même région de AWS service et peut gérer de manière transparente le repartitionnement des flux pendant l'exécution de la tâche. Chaque sous-tâche de l'application consommateur est responsable de la récupération des enregistrements de données à partir de plusieurs partitions Kinesis. Le nombre de partitions récupérées par chaque sous-tâche change au fur et à mesure que les partitions sont fermées et créées par Kinesis. Le `FlinkKinesisProducer` utilise Kinesis Producer Library (KPL) pour placer les données d'un flux Apache Flink dans un flux Kinesis. Pour plus d'informations, veuillez consulter [Amazon Kinesis Streams Connector](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kinesis.html).

Pour plus d'informations, veuillez consulter le [Référentiel GitHub de schémas AWS Glue](https://github.com/awslabs/aws-glue-schema-registry).

### Intégration à Apache Flink
<a name="schema-registry-integrations-apache-flink-integrate"></a>

La SerDes bibliothèque fournie avec Schema Registry s'intègre à Apache Flink. Pour travailler avec Apache Flink, vous devez implémenter des interfaces [https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java](https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java) et [https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java](https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java) appelées `GlueSchemaRegistryAvroSerializationSchema` et `GlueSchemaRegistryAvroDeserializationSchema`, que vous pouvez brancher sur des connecteurs Apache Flink.

### Ajout d'une dépendance du registre de schémas AWS Glue dans l'application Apache Flink
<a name="schema-registry-integrations-kinesis-data-analytics-dependencies"></a>

Pour configurer les dépendances d'intégration sur le registre de schémas AWS Glue dans l'application Apache Flink :

1. Ajoutez la dépendance au fichier `pom.xml`.

   ```
   <dependency>
       <groupId>software.amazon.glue</groupId>
       <artifactId>schema-registry-flink-serde</artifactId>
       <version>1.0.0</version>
   </dependency>
   ```

#### Intégration de Kafka ou Amazon MSK à Apache Flink
<a name="schema-registry-integrations-kda-integrate-msk"></a>

Vous pouvez utiliser Service géré pour Apache Flink pour Apache Flink, avec Kafka comme source ou Kafka comme collecteur.

**Kafka en tant que source**  
Le diagramme suivant illustre l'intégration de Kinesis Data Streams Kinesis à Service géré pour Apache Flink pour Apache Flink, avec Kafka en tant que source.

![\[Kafka en tant que source.\]](http://docs.aws.amazon.com/fr_fr/glue/latest/dg/images/gsr-kafka-source.png)


**Kafka en tant que récepteur**  
Le diagramme suivant illustre l'intégration de Kinesis Data Streams Kinesis à Service géré pour Apache Flink pour Apache Flink, avec Kafka en tant que collecteur.

![\[Kafka en tant que récepteur.\]](http://docs.aws.amazon.com/fr_fr/glue/latest/dg/images/gsr-kafka-sink.png)


Pour intégrer Kafka (ou Amazon MSK) à Service géré pour Apache Flink pour Apache Flink, avec Kafka comme source ou Kafka comme collecteur, apportez les modifications de code ci-dessous. Ajoutez les blocs de code en gras à votre code respectif dans les sections analogues.

Si Kafka est la source, utilisez le code de désérialiseur (bloc 2). Si Kafka est le récepteur, utilisez le code de sérialiseur (bloc 3).

```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

String topic = "topic";
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

// block 1
Map<String, Object> configs = new HashMap<>();
configs.put(AWSSchemaRegistryConstants.AWS_REGION, "aws-region");
configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());

FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
    topic,
    // block 2
    GlueSchemaRegistryAvroDeserializationSchema.forGeneric(schema, configs),
    properties);

FlinkKafkaProducer<GenericRecord> producer = new FlinkKafkaProducer<>(
    topic,
    // block 3
    GlueSchemaRegistryAvroSerializationSchema.forGeneric(schema, topic, configs),
    properties);

DataStream<GenericRecord> stream = env.addSource(consumer);
stream.addSink(producer);
env.execute();
```

#### Intégration de Kinesis Data Streams à Apache Flink
<a name="schema-registry-integrations-integrate-kds"></a>

Vous pouvez utiliser Service géré pour Apache Flink pour Apache Flink, avec Data Streams comme source ou comme collecteur.

**Kinesis Data Streams en tant que source**  
Le diagramme suivant illustre l'intégration de Kinesis Data Streams Kinesis à Service géré pour Apache Flink pour Apache Flink, avec Kinesis Data Streams en tant que source.

![\[Kinesis Data Streams en tant que source.\]](http://docs.aws.amazon.com/fr_fr/glue/latest/dg/images/gsr-kinesis-source.png)


**Kinesis Data Streams en tant que récepteur**  
Le diagramme suivant illustre l'intégration de Kinesis Data Streams Kinesis à Service géré pour Apache Flink pour Apache Flink, avec Kinesis Data Streams en tant que collecteur.

![\[Kinesis Data Streams en tant que récepteur.\]](http://docs.aws.amazon.com/fr_fr/glue/latest/dg/images/gsr-kinesis-sink.png)


Pour intégrer Kinesis Data Streams à Service géré pour Apache Flink pour Apache Flink, avec Kinesis Data Streams en tant que source ou Kinesis Data Streams en tant que collecteur, apportez les modifications de code ci-dessous. Ajoutez les blocs de code en gras à votre code respectif dans les sections analogues.

Si Kinesis Data Streams est la source, utilisez le code de désérialiseur (bloc 2). Si Kinesis Data Streams est le récepteur, utilisez le code de sérialiseur (bloc 3).

```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

String streamName = "stream";
Properties consumerConfig = new Properties();
consumerConfig.put(AWSConfigConstants.AWS_REGION, "aws-region");
consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

// block 1
Map<String, Object> configs = new HashMap<>();
configs.put(AWSSchemaRegistryConstants.AWS_REGION, "aws-region");
configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());

FlinkKinesisConsumer<GenericRecord> consumer = new FlinkKinesisConsumer<>(
    streamName,
    // block 2
    GlueSchemaRegistryAvroDeserializationSchema.forGeneric(schema, configs),
    properties);

FlinkKinesisProducer<GenericRecord> producer = new FlinkKinesisProducer<>(
    // block 3
    GlueSchemaRegistryAvroSerializationSchema.forGeneric(schema, topic, configs),
    properties);
producer.setDefaultStream(streamName);
producer.setDefaultPartition("0");

DataStream<GenericRecord> stream = env.addSource(consumer);
stream.addSink(producer);
env.execute();
```

## Cas d'utilisation : intégration avec AWS Lambda
<a name="schema-registry-integrations-aws-lambda"></a>

[Pour utiliser une AWS Lambda fonction en tant que consommateur Apache Kafka/Amazon MSK et désérialiser les messages codés en AVRO à l'aide de AWS Glue Schema Registry, visitez la page MSK Labs.](https://amazonmsk-labs.workshop.aws/en/msklambda/gsrschemareg.html)

## Cas d'utilisation : AWS Glue Data Catalog
<a name="schema-registry-integrations-aws-glue-data-catalog"></a>

Les tables AWS Glue prennent en charge des schémas que vous pouvez spécifier manuellement ou par référence au registre de schémas AWS Glue. Le registre de schéma s'intègre au catalogue de données pour vous permettre d'utiliser en option des schémas stockés dans le registre de schémas lors de la création ou de la mise à jour de tables AWS Glue ou partitions dans le catalogue de données. Pour identifier une définition de schéma dans le registre de schémas, vous devez au minimum connaître l'ARN du schéma dont il fait partie. Une version de schéma d'un schéma, qui contient une définition de schéma, peut être référencée par son UUID ou son numéro de version. Il y a toujours une version de schéma, la « dernière » version, qui peut être recherchée sans connaître son numéro de version ou son UUID.

Lors de l'appel des opérations `CreateTable` ou `UpdateTable`, vous transmettrez une structure `TableInput` qui contient un `StorageDescriptor`, qui peut avoir une `SchemaReference` à un schéma existant dans le registre de schémas. De même, lorsque vous appelez le `GetTable` ou `GetPartition` APIs, la réponse peut contenir le schéma et le`SchemaReference`. Lorsqu'une table ou une partition a été créée à l'aide de références de schéma, le catalogue de données tente d'extraire le schéma pour cette référence de schéma. Dans le cas où il ne parvient pas à trouver le schéma dans le registre de schémas, il renvoie un schéma vide dans la réponse `GetTable` ; sinon la réponse aura à la fois le schéma et la référence du schéma.

Vous pouvez aussi effectuer les actions depuis la console AWS Glue.

Pour effectuer ces opérations et créer, mettre à jour ou afficher les informations de schéma, vous devez accorder un rôle IAM à l'appelant qui fournit les autorisations pour l'API `GetSchemaVersion`.

### Ajout d'une table ou mise à jour du schéma pour une table
<a name="schema-registry-integrations-aws-glue-data-catalog-table"></a>

L'ajout d'une nouvelle table à partir d'un schéma existant lie la table à une version de schéma spécifique. Une fois que les nouvelles versions de schéma sont enregistrées, vous pouvez mettre à jour cette définition de table à partir de la page Affichage de la table dans la console AWS Glue ou à l'aide de l'API [UpdateTable action (Python : update\$1table)](aws-glue-api-catalog-tables.md#aws-glue-api-catalog-tables-UpdateTable).

#### Ajout d'une table à partir d'un schéma existant
<a name="schema-registry-integrations-aws-glue-data-catalog-table-existing"></a>

Vous pouvez créer une table AWS Glue à partir d'une version de schéma dans le registre à l'aide de la console AWS Glue ou de l'API `CreateTable`.

**API AWS Glue**  
Lors de l'appel de l'API `CreateTable`, vous transmettrez une `TableInput` qui contient un `StorageDescriptor`, qui a une `SchemaReference` à un schéma existant dans le registre de schémas.

**Console AWS Glue **  
Pour créer une table à partir de la console AWS Glue :

1. Connectez-vous à la AWS Glue console AWS Management Console et ouvrez-la à l'adresse [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\).

1. Dans le panneau de navigation, sous **Data catalog (Catalogue de données)**, choisissez **Tables**.

1. Dans le menu **Add tables (Ajouter des tables)**, choisissez **Add table from existing schema (Ajouter une table à partir d'un schéma existant)**.

1. Configurez les propriétés de la table et le magasin de données selon le Guide du développeur AWS Glue.

1. Sur la page **Choisir un schéma de Glue**, sélectionnez le **registre** où réside le schéma.

1. Choisissez le **nom du schéma** et sélectionnez la **version** du schéma à appliquer.

1. Passez en revue la prévisualisation du schéma, puis choisissez **Next (Suivant)**.

1. Vérifiez et créez la table.

Le schéma et la version appliqués à la table s'affichent dans la colonne **Glue schema (Schéma Glue)** dans la liste des tables. Vous pouvez afficher le tableau pour voir plus de détails.

#### Mise à jour du schéma pour une table
<a name="schema-registry-integrations-aws-glue-data-catalog-table-updating"></a>

Lorsqu'une nouvelle version de schéma devient disponible, vous pouvez mettre à jour le schéma d'une table à l'aide de l'API [UpdateTable action (Python : update\$1table)](aws-glue-api-catalog-tables.md#aws-glue-api-catalog-tables-UpdateTable) ou de la console AWS Glue. 

**Important**  
Lors de la mise à jour du schéma d'une table existante dotée d'un schéma AWS Glue spécifié manuellement, le nouveau schéma référencé dans le registre de schémas peut être incompatible. Cela peut entraîner l'échec de vos tâches.

**API AWS Glue**  
Lors de l'appel de l'API `UpdateTable`, vous transmettrez une `TableInput` qui contient un `StorageDescriptor`, qui a une `SchemaReference` à un schéma existant dans le registre de schémas.

**Console AWS Glue **  
Pour mettre à jour le schéma pour une table à partir de la console AWS Glue :

1. Connectez-vous à la AWS Glue console AWS Management Console et ouvrez-la à l'adresse [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\).

1. Dans le panneau de navigation, sous **Data catalog (Catalogue de données)**, choisissez **Tables**.

1. Affichez la table à partir de la liste des tables.

1. Cliquez sur **Update schema (Mettre à jour le schéma)** dans la zone qui vous informe d'une nouvelle version.

1. Examinez les différences entre le schéma actuel et le nouveau.

1. Choisissez **Show all schema differences (Afficher toutes les différences du schéma)** pour plus de détails.

1. Choisissez **Save table (Enregistrer la table)** pour accepter la nouvelle version.

## Cas d'utilisation : streaming AWS Glue
<a name="schema-registry-integrations-aws-glue-streaming"></a>

Le streaming AWS Glue consomme des données provenant de sources de streaming et effectue des opérations d'ETL avant d'écrire dans un récepteur de sortie. La source de streaming d'entrée peut être spécifiée à l'aide d'une table de données ou directement en spécifiant la configuration de la source.

Le streaming AWS Glue prend en charge une table de catalogue de données pour la source de streaming créée avec le schéma présent dans le registre de schémas AWS Glue. Vous pouvez créer un schéma dans le registre de schémas AWS Glue et créer une table AWS Glue avec une source de streaming utilisant ce schéma. Cette table AWS Glue peut être utilisée comme entrée dans une tâche de streaming AWS Glue pour désérialiser les données dans le flux d'entrée.

Remarque : lorsque le schéma du registre de schémas AWS Glue change, vous devez redémarrer la tâche de streaming AWS Glue afin de refléter les modifications dans le schéma.

## Cas d'utilisation : Apache Kafka Streams
<a name="schema-registry-integrations-apache-kafka-streams"></a>

L'API Apache Kafka Streams est une bibliothèque client pour le traitement et l'analyse des données stockées dans Apache Kafka. Cette section décrit l'intégration d'Apache Kafka Streams au registre de schémas AWS Glue, qui vous permet de gérer et d'appliquer des schémas sur vos applications de streaming de données. Pour de plus amples informations sur Apache Kafka Streams, veuillez consulter [Apache Kafka Streams](https://kafka.apache.org/documentation/streams/).

### Intégration aux SerDes bibliothèques
<a name="schema-registry-integrations-apache-kafka-streams-integrate"></a>

Il existe une classe `GlueSchemaRegistryKafkaStreamsSerde` avec laquelle vous pouvez configurer une application Streams.

#### Exemple de code d'application Kafka Streams
<a name="schema-registry-integrations-apache-kafka-streams-application"></a>

Pour utiliser le registre de schémas AWS Glue dans une application Apache Kafka Streams :

1. Configurez l'application Kafka Streams.

   ```
   final Properties props = new Properties();
       props.put(StreamsConfig.APPLICATION_ID_CONFIG, "avro-streams");
       props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
       props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
       props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
       props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, AWSKafkaAvroSerDe.class.getName());
       props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
   
       props.put(AWSSchemaRegistryConstants.AWS_REGION, "aws-region");
       props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
       props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
   	props.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.AVRO.name());
   ```

1. Créez un flux à partir de la rubrique avro-input.

   ```
   StreamsBuilder builder = new StreamsBuilder();
   final KStream<String, GenericRecord> source = builder.stream("avro-input");
   ```

1. Traitez les enregistrements de données (l'exemple filtre les enregistrements dont la valeur de favorite\$1color (couleur favorite) est pink (rose) ou dont la valeur de amount (montant) est 15).

   ```
   final KStream<String, GenericRecord> result = source
       .filter((key, value) -> !"pink".equals(String.valueOf(value.get("favorite_color"))));
       .filter((key, value) -> !"15.0".equals(String.valueOf(value.get("amount"))));
   ```

1. Réécrivez les résultats dans la rubrique avro-output.

   ```
   result.to("avro-output");
   ```

1. Démarrez l'application Apache Kafka Streams.

   ```
   KafkaStreams streams = new KafkaStreams(builder.build(), props);
   streams.start();
   ```

#### Résultats de l'implémentation
<a name="schema-registry-integrations-apache-kafka-streams-results"></a>

Ces résultats montrent le processus de filtrage des enregistrements qui ont été filtrés à l'étape 3 sous la forme d'une valeur favorite\$1color (couleur favorite) « pink » (rose) ou d'une valeur de « 15.0 ».

Enregistrements avant le filtrage :

```
{"name": "Sansa", "favorite_number": 99, "favorite_color": "white"}
{"name": "Harry", "favorite_number": 10, "favorite_color": "black"}
{"name": "Hermione", "favorite_number": 1, "favorite_color": "red"}
{"name": "Ron", "favorite_number": 0, "favorite_color": "pink"}
{"name": "Jay", "favorite_number": 0, "favorite_color": "pink"}

{"id": "commute_1","amount": 3.5}
{"id": "grocery_1","amount": 25.5}
{"id": "entertainment_1","amount": 19.2}
{"id": "entertainment_2","amount": 105}
	{"id": "commute_1","amount": 15}
```

Enregistrements après filtrage :

```
{"name": "Sansa", "favorite_number": 99, "favorite_color": "white"}
{"name": "Harry", "favorite_number": 10, "favorite_color": "black"}
{"name": "Hermione", "favorite_number": 1, "favorite_color": "red"}
{"name": "Ron", "favorite_number": 0, "favorite_color": "pink"}

{"id": "commute_1","amount": 3.5}
{"id": "grocery_1","amount": 25.5}
{"id": "entertainment_1","amount": 19.2}
{"id": "entertainment_2","amount": 105}
```

### Cas d'utilisation : Apache Kafka Connect
<a name="schema-registry-integrations-apache-kafka-connect"></a>

L'intégration d'Apache Kafka Connect au registre de schémas AWS Glue vous permet d'obtenir des informations de schéma à partir des connecteurs. Les convertisseurs Apache Kafka spécifient le format des données dans Apache Kafka et comment les traduire en données Apache Kafka Connect. Chaque utilisateur d'Apache Kafka Connect devra configurer ces convertisseurs en fonction du format souhaité pour leurs données lorsqu'elles sont chargées depuis ou stockés dans Apache Kafka. De cette façon, vous pouvez définir vos propres convertisseurs pour traduire les données Apache Kafka Connect dans le type utilisé dans le registre de schémas AWS Glue (par exemple : Avro) et utiliser notre sérialiseur pour enregistrer son schéma et effectuer la sérialisation. Ensuite, les convertisseurs peuvent également utiliser notre désérialiseur pour désérialiser les données reçues d'Apache Kafka et les convertir en données Apache Kafka Connect. Un exemple de diagramme de flux de travail est présenté ci-dessous.

![\[Flux de travail Apache Kafka Connect.\]](http://docs.aws.amazon.com/fr_fr/glue/latest/dg/images/schema_reg_int_kafka_connect.png)


1. Installez le projet `aws-glue-schema-registry` en clonant le [référentiel Github pour le registre de schémas AWS Glue](https://github.com/awslabs/aws-glue-schema-registry).

   ```
   git clone git@github.com:awslabs/aws-glue-schema-registry.git
   cd aws-glue-schema-registry
   mvn clean install
   mvn dependency:copy-dependencies
   ```

1. Si vous prévoyez d'utiliser Apache Kafka Connect en mode *autonome*, mettez à jour **connect-standalone.properties** à l'aide des instructions suivantes pour cette étape. Si vous prévoyez d'utiliser Apache Kafka Connect en mode *distribué*, mettez à jour le **connect-avro-distributedfichier .properties** en suivant les mêmes instructions.

   1. Ajoutez également ces propriétés au fichier de propriétés de connexion Apache Kafka :

      ```
      key.converter.region=aws-region
      value.converter.region=aws-region
      key.converter.schemaAutoRegistrationEnabled=true
      value.converter.schemaAutoRegistrationEnabled=true
      key.converter.avroRecordType=GENERIC_RECORD
      value.converter.avroRecordType=GENERIC_RECORD
      ```

   1. Ajoutez la commande ci-dessous à la section **Mode de lancement** sous **kafka-run-class.sh** :

      ```
      -cp $CLASSPATH:"<your AWS GlueSchema Registry base directory>/target/dependency/*"
      ```

1. Ajoutez la commande ci-dessous à la section **Mode de lancement** sous **kafka-run-class.sh**

   ```
   -cp $CLASSPATH:"<your AWS GlueSchema Registry base directory>/target/dependency/*" 
   ```

   Elle doit ressembler à ce qui suit :

   ```
   # Launch mode
   if [ "x$DAEMON_MODE" = "xtrue" ]; then
     nohup "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH:"/Users/johndoe/aws-glue-schema-registry/target/dependency/*" $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
   else
     exec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH:"/Users/johndoe/aws-glue-schema-registry/target/dependency/*" $KAFKA_OPTS "$@"
   fi
   ```

1. Si vous utilisez bash, exécutez les commandes ci-dessous pour configurer votre CLASSPATH dans votre bash\$1profile. Pour tout autre shell, mettez à jour l'environnement en conséquence.

   ```
   echo 'export GSR_LIB_BASE_DIR=<>' >>~/.bash_profile
   echo 'export GSR_LIB_VERSION=1.0.0' >>~/.bash_profile
   echo 'export KAFKA_HOME=<your Apache Kafka installation directory>' >>~/.bash_profile
   echo 'export CLASSPATH=$CLASSPATH:$GSR_LIB_BASE_DIR/avro-kafkaconnect-converter/target/schema-registry-kafkaconnect-converter-$GSR_LIB_VERSION.jar:$GSR_LIB_BASE_DIR/common/target/schema-registry-common-$GSR_LIB_VERSION.jar:$GSR_LIB_BASE_DIR/avro-serializer-deserializer/target/schema-registry-serde-$GSR_LIB_VERSION.jar' >>~/.bash_profile
   source ~/.bash_profile
   ```

1. (Facultatif) Si vous souhaitez tester avec une source de fichier simple, clonez le connecteur de source de fichier.

   ```
   git clone https://github.com/mmolimar/kafka-connect-fs.git
   cd kafka-connect-fs/
   ```

   1. Sous la configuration du connecteur source, modifiez le format de données sur Avro, le lecteur de fichiers sur `AvroFileReader` et mettez à jour un exemple d'objet Avro à partir du chemin d'accès du fichier à partir duquel vous lisez. Par exemple :

      ```
      vim config/kafka-connect-fs.properties
      ```

      ```
      fs.uris=<path to a sample avro object>
      policy.regexp=^.*\.avro$
      file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.AvroFileReader
      ```

   1. Installez le connecteur source.

      ```
      mvn clean package
      echo "export CLASSPATH=\$CLASSPATH:\"\$(find target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')\"" >>~/.bash_profile
      source ~/.bash_profile
      ```

   1. Mettez à jour les propriétés du récepteur sous `<your Apache Kafka installation directory>/config/connect-file-sink.properties`, mettez à jour le nom de rubrique et le nom de fichier de sortie.

      ```
      file=<output file full path>
      topics=<my topic>
      ```

1. Démarrez le connecteur source (dans cet exemple, il s'agit d'un connecteur source de fichier).

   ```
   $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/kafka-connect-fs.properties
   ```

1. Exécutez le connecteur du récepteur (dans cet exemple, il s'agit d'un connecteur de récepteur de fichiers).

   ```
   $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-file-sink.properties
   ```

   Pour un exemple d'utilisation de Kafka Connect, regardez le script run-local-tests .sh situé dans le dossier integration-tests du [référentiel Github](https://github.com/awslabs/aws-glue-schema-registry/tree/master/integration-tests) pour le registre des schémas. AWS Glue

# Migration d'un registre de schémas tiers vers le registre de schémas AWS Glue
<a name="schema-registry-integrations-migration"></a>

La migration d'un registre de schémas tiers vers le registre de schémas AWS Glue a une dépendance sur le registre de schéma tiers existant et actuel. S'il existe des enregistrements dans une rubrique Apache Kafka qui ont été envoyés à l'aide d'un registre de schémas tiers, les applications consommateur ont besoin du registre de schémas tiers pour désérialiser ces enregistrements. Le `AWSKafkaAvroDeserializer` permet de spécifier une classe de désérialiseur secondaire qui pointe vers le désérialiseur tiers et qui est utilisée pour désérialiser ces enregistrements.

Il existe deux critères pour le retrait d'un schéma tiers. Tout d'abord, le retrait ne peut avoir lieu qu'après que les enregistrements dans les rubriques Apache Kafka utilisant le registre de schéma tiers ne soient plus requis par et pour les applications consommateur. Deuxièmement, le retrait peut se produire suite au vieillissement des rubriques Apache Kafka, en fonction de la période de rétention spécifiée pour ces rubriques. Notez que si vous avez des rubriques qui ont une rétention infinie, vous pouvez toujours migrer vers le registre de schémas AWS Glue, mais vous ne serez pas en mesure de retirer le registre de schémas tiers. Comme solution de contournement, vous pouvez utiliser une application ou Mirror Maker 2 pour lire à partir de la rubrique actuelle et produire vers une nouvelle rubrique à l'aide du registre de schémas AWS Glue.

Pour migrer d'un registre de schémas tiers vers le registre de schémas AWS Glue :

1. Créez un registre dans le registre de schémas AWS Glue ou utilisez le registre par défaut.

1. Arrêtez l'application consommateur. Modifiez-le pour inclure le registre de schémas AWS Glue en tant que désérialiseur principal et le registre de schéma tiers en tant que secondaire. 
   + Définissez les propriétés de l'application consommateur. Dans cet exemple, la valeur secondary\$1deserializer est définie sur un autre désérialiseur. Le comportement est le suivant : l'application consommateur récupère les enregistrements auprès d'Amazon MSK et essaie d'abord d'utiliser le `AWSKafkaAvroDeserializer`. S'il n'est pas en mesure de lire l'octet magique qui contient l'ID de schéma Avro pour le registre de schémas AWS Glue, le `AWSKafkaAvroDeserializer` essaie ensuite d'utiliser la classe du désérialiseur fournie dans la valeure secondary\$1deserializer. Les propriétés spécifiques au désérialiseur secondaire doivent également être fournies dans les propriétés de l'application consommateur, telles que schema\$1registry\$1url\$1config et specific\$1avro\$1reader\$1config, comme indiqué ci-dessous.

     ```
     consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
     consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AWSKafkaAvroDeserializer.class.getName());
     consumerProps.setProperty(AWSSchemaRegistryConstants.AWS_REGION, KafkaClickstreamConsumer.gsrRegion);
     consumerProps.setProperty(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, KafkaAvroDeserializer.class.getName());
     consumerProps.setProperty(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "URL for third-party schema registry");
     consumerProps.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
     ```

1. Redémarrez l'application consommateur.

1. Arrêtez l'application producteur et pointez-la vers le registre de schémas AWS Glue.

   1. Définissez les propriétés du producteur. Dans cet exemple, le producteur utilisera le registre par défaut et enregistrera automatiquement les versions de schéma.

      ```
      producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
      producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AWSKafkaAvroSerializer.class.getName());
      producerProps.setProperty(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2");
      producerProps.setProperty(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName());
      producerProps.setProperty(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true");
      ```

1. (Facultatif) Déplacez manuellement les schémas et les versions de schéma existants du registre de schémas tiers actuel vers le registre de schémas AWS Glue, soit vers le registre par défaut dans le registre de schémas AWS Glue ou vers un registre spécifique autre que celui par défaut dans le registre de schémas AWS Glue. Cela peut être fait en exportant des schémas à partir de registres de schémas tiers au format JSON et en créant de nouveaux schémas dans le registre de AWS Glue schémas à l'aide du ou du AWS Management Console . AWS CLI

    Cette étape peut être importante si vous devez activer les contrôles de compatibilité avec les versions de schéma précédentes pour les versions de schéma nouvellement créées à l'aide du AWS CLI et AWS Management Console, ou lorsque les producteurs envoient des messages avec un nouveau schéma avec l'enregistrement automatique des versions de schéma activé.

1. Démarrez l'application producteur.