

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Guida introduttiva al registro degli schemi
<a name="schema-registry-gs"></a>

Le sezioni seguenti offrono una panoramica e illustrano la configurazione e l'uso del registro degli schemi. Per informazioni su concetti e componenti del registro degli schemi, consultare [Registro dello schema di AWS Glue](schema-registry.md).

**Topics**
+ [Installazione delle SerDe librerie](schema-registry-gs-serde.md)
+ [Integrazione con il registro degli schemi di AWS Glue](schema-registry-integrations.md)
+ [Migrazione da un registro degli schemi di terze parti al registro degli schemi di AWS Glue](schema-registry-integrations-migration.md)

# Installazione delle SerDe librerie
<a name="schema-registry-gs-serde"></a>

Le SerDe librerie forniscono un framework per la serializzazione e la deserializzazione dei dati. 

Installerai il serializzatore open source per le applicazioni che producono dati (collettivamente i "serializzatori"). Il serializzatore gestisce la serializzazione, la compressione e l'interazione con il registro degli schemi. Il serializzatore estrae automaticamente lo schema da un record in fase di scrittura in una destinazione compatibile con il registro degli schemi, ad esempio Amazon MSK. Allo stesso modo, installerai il deserializzatore open source sulle applicazioni che consumano dati.

# implementazione Java
<a name="schema-registry-gs-serde-java"></a>

**Nota**  
Prerequisiti: prima di completare i passaggi riportati di seguito, dovrai disporre di un cluster in esecuzione Amazon Streaming gestito per Apache Kafka (Amazon MSK) o Apache Kafka. I produttori e i consumer devono essere in esecuzione su Java 8 o versione successiva.

Per installare le librerie su produttori e consumer:

1. All'interno dei file pom.xml dei produttori e dei consumer, aggiungi questa dipendenza tramite il codice qui sotto:

   ```
   <dependency>
       <groupId>software.amazon.glue</groupId>
       <artifactId>schema-registry-serde</artifactId>
       <version>1.1.5</version>
   </dependency>
   ```

   In alternativa, è possibile clonare il [repository Github del registro degli schemi di AWS Glue](https://github.com/awslabs/aws-glue-schema-registry).

1. Imposta i tuoi produttori con le seguenti proprietà obbligatorie:

   ```
   props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // Can replace StringSerializer.class.getName()) with any other key serializer that you may use
   props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());
   props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2");
   properties.put(AWSSchemaRegistryConstants.DATA_FORMAT, "JSON"); // OR "AVRO"
   ```

   Se non esistono schemi, è necessario attivare la registrazione automatica (passaggio successivo). Se si dispone di uno schema da applicare, sostituire "my-schema" con il nome dello schema. Se la registrazione automatica dello schema è disattivata deve essere fornito anche "registry-name". Se lo schema viene creato sotto "default-registry", il nome del registro può essere omesso.

1. (Facoltativo) Impostare una di queste proprietà facoltative del produttore. Per descrizioni dettagliate delle proprietà, [consultate il ReadMe file](https://github.com/awslabs/aws-glue-schema-registry/blob/master/README.md).

   ```
   props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true"); // If not passed, uses "false"
   props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "my-schema"); // If not passed, uses transport name (topic name in case of Kafka, or stream name in case of Kinesis Data Streams)
   props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry"); // If not passed, uses "default-registry"
   props.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000 (24 Hours)
   props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200
   props.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL); // Pass a compatibility mode. If not passed, uses Compatibility.BACKWARD
   props.put(AWSSchemaRegistryConstants.DESCRIPTION, "This registry is used for several purposes."); // If not passed, constructs a description
   props.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, AWSSchemaRegistryConstants.COMPRESSION.ZLIB); // If not passed, records are sent uncompressed
   ```

   La registrazione automatica registra la versione dello schema nel registro di default ("default-registry"). Se nel passaggio precedente non è stato specificato un `SCHEMA_NAME`, il nome dell'argomento viene dedotto come `SCHEMA_NAME`. 

   Per ulteriori informazioni sulle modalità di compatibilità, consulta [Controllo delle versioni e compatibilità degli schemi](schema-registry.md#schema-registry-compatibility).

1. Imposta i tuoi consumer con le seguenti proprietà obbligatorie:

   ```
   props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
   props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
   props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2"); // Pass an Regione AWS
   props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); // Only required for AVRO data format
   ```

1. (Facoltativo) Imposta queste proprietà facoltative del consumer. Per descrizioni dettagliate delle proprietà, [consultate il ReadMe file](https://github.com/awslabs/aws-glue-schema-registry/blob/master/README.md).

   ```
   properties.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000
   props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200
   props.put(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, "com.amazonaws.services.schemaregistry.deserializers.external.ThirdPartyDeserializer"); // For migration fall back scenario
   ```

# Implementazione C\$1
<a name="schema-registry-gs-serde-csharp"></a>

**Nota**  
Prerequisiti: prima di completare i passaggi riportati di seguito, dovrai disporre di un cluster in esecuzione Amazon Streaming gestito per Apache Kafka (Amazon MSK) o Apache Kafka. I produttori e i consumatori devono essere in esecuzione su.NET 8.0 o versioni successive.

## Installazione
<a name="schema-registry-gs-serde-csharp-install"></a>

Per le applicazioni C\$1, installa il SerDe NuGet pacchetto AWS Glue Schema Registry utilizzando uno dei seguenti metodi:

**.NET CLI:**  
Usa il seguente comando per installare il pacchetto:

```
dotnet add package Aws.Glue.SchemaRegistry --version 1.0.0-<rid>
```

dove `<rid>` potrebbe essere`1.0.0-linux-x64`, `1.0.0-linux-musl-x64` oppure `1.0.0-linux-arm64`

**PackageReference (nel tuo file.csproj):**  
Aggiungi quanto segue al tuo file di progetto:

```
<PackageReference Include="Aws.Glue.SchemaRegistry" Version="1.0.0-<rid>" />
```

dove `<rid>` potrebbe essere`1.0.0-linux-x64`, `1.0.0-linux-musl-x64` oppure `1.0.0-linux-arm64`

## Configurazione del file di configurazione
<a name="schema-registry-gs-serde-csharp-config"></a>

Crea un file delle proprietà di configurazione (ad esempio`gsr-config.properties`) con le impostazioni richieste:

**Configurazione minima:**  
Di seguito viene illustrato un esempio di configurazione minima:

```
region=us-east-1
registry.name=default-registry
dataFormat=AVRO
schemaAutoRegistrationEnabled=true
```

## Utilizzo della libreria client C\$1 Glue Schema per Kafka SerDes
<a name="schema-registry-gs-serde-csharp-kafka"></a>

**Esempio di utilizzo del serializzatore:**  
L'esempio seguente mostra come utilizzare il serializzatore:

```
private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>";
var protobufSerializer = new GlueSchemaRegistryKafkaSerializer(PROTOBUF_CONFIG_PATH);
var serialized = protobufSerializer.Serialize(message, message.Descriptor.FullName);
// send serialized bytes to Kafka using producer.Produce(serialized)
```

**Esempio di utilizzo del deserializzatore:**  
L'esempio seguente mostra come utilizzare il deserializzatore:

```
private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>";
var dataConfig = new GlueSchemaRegistryDataFormatConfiguration(
    new Dictionary<string, dynamic>
    {
        {
            GlueSchemaRegistryConstants.ProtobufMessageDescriptor, message.Descriptor
        }
    }
);
var protobufDeserializer = new GlueSchemaRegistryKafkaDeserializer(PROTOBUF_CONFIG_PATH, dataConfig);

// read message from Kafka using serialized = consumer.Consume()
var deserializedObject = protobufDeserializer.Deserialize(message.Descriptor.FullName, serialized);
```

## Utilizzo della libreria client C\$1 Glue Schema con for KafkaFlow SerDes
<a name="schema-registry-gs-serde-csharp-kafkaflow"></a>

**Esempio di utilizzo del serializzatore:**  
L'esempio seguente mostra come configurare KafkaFlow con il serializzatore:

```
services.AddKafka(kafka => kafka
    .UseConsoleLog()
    .AddCluster(cluster => cluster
        .WithBrokers(new[] { "localhost:9092" })
        .AddProducer<CustomerProducer>(producer => producer
            .DefaultTopic("customer-events")
            .AddMiddlewares(m => m
                .AddSerializer<GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>>(
                    () => new GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>("config/gsr-config.properties")
                )
            )
        )
    )
);
```

**Esempio di utilizzo del deserializzatore:**  
L'esempio seguente mostra come configurare KafkaFlow con il deserializzatore:

```
.AddConsumer(consumer => consumer
    .Topic("customer-events")
    .WithGroupId("customer-group")
    .WithBufferSize(100)
    .WithWorkersCount(10)
    .AddMiddlewares(middlewares => middlewares
        .AddDeserializer<GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>>(
            () => new GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>("config/gsr-config.properties")
        )
        .AddTypedHandlers(h => h.AddHandler<CustomerHandler>())
    )
)
```

## Proprietà opzionali del produttore
<a name="schema-registry-gs-serde-csharp-optional"></a>

È possibile estendere il file di configurazione con proprietà opzionali aggiuntive:

```
# Auto-registration (if not passed, uses "false")
schemaAutoRegistrationEnabled=true

# Schema name (if not passed, uses topic name)
schema.name=my-schema

# Registry name (if not passed, uses "default-registry")
registry.name=my-registry

# Cache settings
cacheTimeToLiveMillis=86400000
cacheSize=200

# Compatibility mode (if not passed, uses BACKWARD)
compatibility=FULL

# Registry description
description=This registry is used for several purposes.

# Compression (if not passed, records are sent uncompressed)
compressionType=ZLIB
```

## Formati di dati supportati
<a name="schema-registry-gs-serde-supported-formats"></a>

Entrambe le implementazioni Java e C\$1 supportano gli stessi formati di dati:
+ *AVRO*: formato binario Apache Avro
+ *JSON: formato dello schema JSON*
+ *PROTOBUF*: formato Protocol Buffer

## Note
<a name="schema-registry-gs-serde-csharp-notes"></a>
+ [Per iniziare a usare la libreria, visita https://www.nuget. org/packages/AWS.Colla. SchemaRegistry](https://www.nuget.org/packages/AWS.Glue.SchemaRegistry)
+ Il codice sorgente è disponibile all'indirizzo: [https://github.com/awslabs/aws-glue-schema-registry](https://github.com/awslabs/aws-glue-schema-registry)

# Creazione di un registro
<a name="schema-registry-gs3"></a>

È possibile utilizzare il registro predefinito o creare tutti i nuovi registri necessari utilizzando la console AWS Glue APIs oAWS Glue.

**AWS Glue APIs**  
È possibile utilizzare questi passaggi per eseguire questa operazione utilizzando il. AWS Glue APIs

Per utilizzare il AWS CLI registro degli AWS Glue schemi APIs, assicurati di aggiornare il tuo AWS CLI alla versione più recente.

 Per aggiungere un nuovo registro, utilizza l'API [CreateRegistry azione (Python: create\$1registry)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-CreateRegistry). Specifica `RegistryName` come nome del registro da creare, con una lunghezza massima di 255 caratteri e può contenere solo lettere, numeri, trattini, trattini bassi, simboli del dollaro o cancelletti. 

Specificare una `Description` come una stringa non superiore a 2048 byte di lunghezza, corrispondente al [modello di stringa su più righe dell'indirizzo URI](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-common.html#aws-glue-api-common-_string-patterns). 

Facoltativamente, puoi specificare uno o più `Tags` per il registro, come una matrice di mappe di coppie chiave-valore.

```
aws glue create-registry --registry-name registryName1 --description description
```

Al momento della creazione del registro, gli viene assegnato un Amazon Resource Name (ARN) che puoi visualizzare nel `RegistryArn` della risposta API. Dopo aver creato un registro, crea uno o più schemi per questo registro.

**Console AWS Glue**  
Per aggiungere un nuovo registro nella console AWS Glue:

1. Accedi a Console di gestione AWS e apri la AWS Glue console all'indirizzo [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\).

1. Nel pannello di navigazione, in **Data catalog (Catalogo dati)**, seleziona **Schema registries (Registri degli schemi)**.

1. Scegli **Add registry (Aggiungi registro)**.

1. Inserisci un **Registry name (Nome registro)** per il registro, composto da lettere, numeri, trattini o trattini bassi. Questo nome non può essere modificato.

1. Inserisci una **Description (Descrizione)** (facoltativo) per il registro.

1. Facoltativamente, applica uno o più tag al registro. Seleziona **Add new tag (Aggiungi nuovo tag)** e immetti una **Tag key (Chiave di tag)** e facoltativamente un **Tag value (Valore di tag)**.

1. Scegli **Add registry (Aggiungi registro)**.

![\[Esempio di creazione di un registro.\]](http://docs.aws.amazon.com/it_it/glue/latest/dg/images/schema_reg_create_registry.png)


Al momento della creazione del registro, gli viene assegnato un Amazon Resource Name (ARN) che puoi visualizzare selezionando il registro dall'elenco in **Schema registries (Registri degli schemi)**. Dopo aver creato un registro, crea uno o più schemi per questo registro.

# Utilizzo di un record specifico (JAVA POJO) per JSON
<a name="schema-registry-gs-json-java-pojo"></a>

È possibile utilizzare un POJO (Plain Old Java Object) e passare l'oggetto come record. È simile alla nozione di un record specifico in AVRO. [mbknor-jackson-jsonschema](https://github.com/mbknor/mbknor-jackson-jsonSchema)Può generare uno schema JSON per il POJO passato. Questa libreria può anche inserire informazioni aggiuntive nello schema JSON.

La libreria del registro degli schemi di AWS Glue utilizza il campo "className" inserito nello schema per fornire un nome di classe completamente classificato. Il campo "className" viene utilizzato dal deserializzatore per eseguire la deserializzazione in un oggetto di quella classe.

```
 Example class :

@JsonSchemaDescription("This is a car")
@JsonSchemaTitle("Simple Car Schema")
@Builder
@AllArgsConstructor
@EqualsAndHashCode
// Fully qualified class name to be added to an additionally injected property
// called className for deserializer to determine which class to deserialize
// the bytes into
@JsonSchemaInject(
        strings = {@JsonSchemaString(path = "className",
                value = "com.amazonaws.services.schemaregistry.integrationtests.generators.Car")}
)
// List of annotations to help infer JSON Schema are defined by https://github.com/mbknor/mbknor-jackson-jsonSchema
public class Car {
    @JsonProperty(required = true)
    private String make;

    @JsonProperty(required = true)
    private String model;

    @JsonSchemaDefault("true")
    @JsonProperty
    public boolean used;

    @JsonSchemaInject(ints = {@JsonSchemaInt(path = "multipleOf", value = 1000)})
    @Max(200000)
    @JsonProperty
    private int miles;

    @Min(2000)
    @JsonProperty
    private int year;

    @JsonProperty
    private Date purchaseDate;

    @JsonProperty
    @JsonFormat(shape = JsonFormat.Shape.NUMBER)
    private Date listedDate;

    @JsonProperty
    private String[] owners;

    @JsonProperty
    private Collection<Float> serviceChecks;

    // Empty constructor is required by Jackson to deserialize bytes
    // into an Object of this class
    public Car() {}
}
```

# Creazione di uno schema
<a name="schema-registry-gs4"></a>

È possibile creare uno schema utilizzando la AWS Glue APIs o la AWS Glue console. 

**AWS Glue APIs**  
È possibile utilizzare questi passaggi per eseguire questa operazione utilizzando il AWS Glue APIs.

Per aggiungere un nuovo schema, utilizza l'API [CreateSchema azione (Python: create\$1schema)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-CreateSchema).

Specifica una struttura `RegistryId` per indicare un registro per lo schema. Oppure, ometti il `RegistryId` per utilizzare il registro di default.

Specifica un `SchemaName` composto da lettere, numeri, trattini o trattini bassi e `DataFormat` come **AVRO** o **JSON**. Una volta impostato su uno schema, `DataFormat` non è modificabile.

Specifica una modalità di `Compatibility`:
+ *Backward (consigliato)*: il consumer può leggere sia la versione attuale che quella precedente.
+ *Backward all*: il consumer può leggere la versione attuale e tutte quelle precedenti.
+ *Forward*: il consumer può leggere sia la versione attuale che quella successiva.
+ *Forward all*: il consumer può leggere sia la versione attuale che tutte quelle successive.
+ *Full*: combinazione di Backward e Forward.
+ *Full all*: combinazione di Backward all e Forward all.
+ *None*: non vengono eseguiti controlli di compatibilità.
+ *Disabled*: impedisce il controllo delle versioni per questo schema.

Facoltativamente, specifica i `Tags` per lo schema. 

Specifica un valore di `SchemaDefinition` per definire lo schema in formato dati Avro, JSON o Protobuf. Consulta questi esempi.

Per il formato dati Avro:

```
aws glue create-schema --registry-id RegistryName="registryName1" --schema-name testschema --compatibility NONE --data-format AVRO --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}"
```

```
aws glue create-schema --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName1" --schema-name testschema --compatibility NONE --data-format AVRO  --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}"
```

Per il formato dati JSON:

```
aws glue create-schema --registry-id RegistryName="registryName" --schema-name testSchemaJson --compatibility NONE --data-format JSON --schema-definition "{\"$schema\": \"http://json-schema.org/draft-07/schema#\",\"type\":\"object\",\"properties\":{\"f1\":{\"type\":\"string\"}}}"
```

```
aws glue create-schema --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName" --schema-name testSchemaJson --compatibility NONE --data-format JSON --schema-definition "{\"$schema\": \"http://json-schema.org/draft-07/schema#\",\"type\":\"object\",\"properties\":{\"f1\":{\"type\":\"string\"}}}"
```

Per il formato dati Protobuf:

```
aws glue create-schema --registry-id RegistryName="registryName" --schema-name testSchemaProtobuf --compatibility NONE --data-format PROTOBUF --schema-definition "syntax = \"proto2\";package org.test;message Basic { optional int32 basic = 1;}"
```

```
aws glue create-schema --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName" --schema-name testSchemaProtobuf --compatibility NONE --data-format PROTOBUF --schema-definition "syntax = \"proto2\";package org.test;message Basic { optional int32 basic = 1;}"
```

**Console AWS Glue**  
Per aggiungere un nuovo schema utilizzando la console AWS Glue:

1. Accedi alla console di AWS gestione e apri la AWS Glue console all'indirizzo [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\).

1. Nel pannello di navigazione, in **Data catalog (Catalogo dati)**, seleziona **Schema (Schemi)**.

1. Seleziona **Aggiungi schema (Aggiungi schema)**.

1. Inserisci uno **Schema name (Nome schema)** composto da lettere, numeri, trattini, trattini bassi, simboli di dollaro o cancelletti. Questo nome non può essere modificato.

1. Seleziona il **registro** in cui lo schema verrà archiviato dal menu a discesa. Il registro padre non può essere modificato dopo la creazione.

1. Lascia **Data format (Formato dei dati)** come *Apache Avro* o *JSON*. Questo formato si applica a tutte le versioni di questo schema.

1. Scegli una **Compatibility mode (Modalità Compatibilità)**.
   + *Backward (consigliato)*: il ricevitore può leggere sia la versione attuale che quella precedente.
   + *Backward all*: il ricevitore può leggere la versione attuale e tutte quelle precedenti.
   + *Forward*: il mittente può scrivere sia la versione attuale che quelle precedenti.
   + *Forward All*: il mittente può scrivere sia la versione attuale che tutte quelle precedenti.
   + *Full*: combinazione di Backward e Forward.
   + *Full All*: combinazione di Backward All e Forward All.
   + *None*: non vengono eseguiti controlli di compatibilità.
   + *Disabled*: impedisce il controllo delle versioni per questo schema.

1. Immetti un a **Description (Descrizione)** facoltativa per il registro con un massimo di 250 caratteri.  
![\[Esempio di creazione di uno schema.\]](http://docs.aws.amazon.com/it_it/glue/latest/dg/images/schema_reg_create_schema.png)

1. Facoltativamente, applica uno o più tag allo schema. Seleziona **Add new tag (Aggiungi nuovo tag)** e immetti una **Tag key (Chiave di tag)** e facoltativamente un **Tag value (Valore di tag)**.

1. Immetti o incolla lo schema iniziale nella casella **First schema version (Prima versione dello schema)**.

   Per il formato Avro, consulta [Utilizzare il formato di dati Avro](#schema-registry-avro)

   Per il formato JSON, consulta [Utilizzare il formato di dati JSON](#schema-registry-json)

1. Facoltativamente, scegli **Add metadata (Aggiungi metadata)** per aggiungere metadati di versione per annotare o classificare la versione dello schema.

1. Scegli **Create schema and version (Crea schema e versione)**.

![\[Esempio di creazione di uno schema.\]](http://docs.aws.amazon.com/it_it/glue/latest/dg/images/schema_reg_create_schema2.png)


Lo schema viene creato e viene visualizzato nell'elenco sotto **Schemas (Schemi)**.

## Utilizzare il formato di dati Avro
<a name="schema-registry-avro"></a>

Avro fornisce servizi di serializzazione dei dati e scambio di dati. Avro memorizza la definizione dei dati in formato JSON semplificando la lettura e l'interpretazione. I dati stessi sono memorizzati in formato binario.

Per informazioni sulla definizione di uno schema Apache Avro, consulta la [specifica di Apache Avro](http://avro.apache.org/docs/current/spec.html).

## Utilizzare il formato di dati JSON
<a name="schema-registry-json"></a>

I dati possono essere serializzati con il formato JSON. Il [formato di schemi JSON](https://json-schema.org/) definisce lo standard per il formato di schemi JSON.

# Aggiornamento di uno schema o di un registro
<a name="schema-registry-gs5"></a>

Una volta creati, è possibile modificare gli schemi, le versioni degli schemi o il registro.

## Aggiornamento di un registro
<a name="schema-registry-gs5a"></a>

È possibile aggiornare un registro utilizzando AWS Glue APIs o la AWS Glue console. Il nome di un registro esistente non può essere modificato. È possibile modificare la descrizione di un registro.

**AWS Glue APIs**  
Per aggiornare un registro esistente, utilizza l'API [UpdateRegistry azione (Python: update\$1registry)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-UpdateRegistry).

Specifica una struttura `RegistryId` per indicare il registro da aggiornare. Passa una `Description` per modificare la descrizione di un registro.

```
aws glue update-registry --description updatedDescription --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName1"
```

**Console AWS Glue**  
Per aggiornare un registro utilizzando la console AWS Glue:

1. Accedi a Console di gestione AWS e apri la AWS Glue console all'indirizzo [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\).

1. Nel pannello di navigazione, in **Data catalog (Catalogo dati)**, seleziona **Schema registries (Registri degli schemi)**.

1. Scegli un registro dall'elenco dei registri selezionando la relativa casella.

1. Dal menu **Action (Operazioni)**, seleziona **Edit registry (Modifica registro)**.

# Aggiornamento di uno schema
<a name="schema-registry-gs5b"></a>

È possibile aggiornare la descrizione o l'impostazione di compatibilità per uno schema.

Per aggiornare uno schema esistente, utilizza l'API [UpdateSchema azione (Python: update\$1schema)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-UpdateSchema).

Specifica una struttura `SchemaId` per indicare lo schema da aggiornare. Deve essere fornito `VersionNumber` o `Compatibility`.

Esempio di codice 11:

```
aws glue update-schema --description testDescription --schema-id SchemaName="testSchema1",RegistryName="registryName1" --schema-version-number LatestVersion=true --compatibility NONE
```

```
aws glue update-schema --description testDescription --schema-id SchemaArn="arn:aws:glue:us-east-2:901234567890:schema/registryName1/testSchema1" --schema-version-number LatestVersion=true --compatibility NONE
```

# Aggiunta di una versione dello schema
<a name="schema-registry-gs5c"></a>

Quando si aggiunge una versione dello schema, è necessario confrontare le versioni per assicurarsi che il nuovo schema venga accettato.

Per aggiungere una nuova versione a uno schema esistente, utilizza l'API [RegisterSchemaVersion azione (Python: register\$1schema\$1version)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-RegisterSchemaVersion).

Specifica una struttura `SchemaId` per indicare lo schema per il quale si desidera aggiungere una versione e un valore di `SchemaDefinition` per definire lo schema.

Esempio di codice 12:

```
aws glue register-schema-version --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}" --schema-id SchemaArn="arn:aws:glue:us-east-1:901234567890:schema/registryName/testschema"
```

```
aws glue register-schema-version --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}" --schema-id SchemaName="testschema",RegistryName="testregistry"
```

1. Accedi a Console di gestione AWS e apri la AWS Glue console all'indirizzo [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\).

1. Nel pannello di navigazione, in **Data catalog (Catalogo dati)**, seleziona **Schema (Schemi)**.

1. Scegli lo schema dall'elenco degli schemi selezionando la relativa casella.

1. Seleziona uno o più schemi dall'elenco selezionando le caselle.

1. Nel menu **Action (Operazioni)**, seleziona **Register new version (Registra nuova versione)**.

1. Nella casella **New version (Nuova versione)**, immetti o incolla il nuovo schema.

1. Seleziona **Compare with previous version (Confronta con la versione precedente)** per visualizzare le differenze con la versione precedente dello schema.

1. Facoltativamente, scegli **Add metadata (Aggiungi metadata)** per aggiungere metadati di versione per annotare o classificare la versione dello schema. Inserisci **Key (Chiave)** e facoltativamente **Value (Valore)**.

1. Scegli **Register version (Registra versione)**.

![\[Aggiunta di una versione dello schema.\]](http://docs.aws.amazon.com/it_it/glue/latest/dg/images/schema_reg_add_schema_version.png)


La versione degli schemi viene visualizzata nell'elenco delle versioni. Se la versione ha modificato la modalità di compatibilità, la versione verrà contrassegnata come checkpoint.

## Esempio di confronto tra le versioni di uno schema
<a name="schema-registry-gs5c1"></a>

Selezionando **Compare with previous version (Confronta con la versione precedente)**, le versioni precedenti e quelle nuove verranno mostrate insieme. Le informazioni modificate saranno evidenziate come segue:
+ *Giallo*: indica le informazioni modificate.
+ *Verde*: indica il contenuto aggiunto nella versione più recente.
+ *Rosso*: indica il contenuto rimosso nella versione più recente.

È possibile eseguire il confronto anche con le versioni precedenti.

![\[Esempio di confronto tra le versioni di uno schema.\]](http://docs.aws.amazon.com/it_it/glue/latest/dg/images/schema_reg_version_comparison.png)


# Eliminazione di uno schema o di un registro
<a name="schema-registry-gs7"></a>

L'eliminazione di uno schema, di una versione dello schema o di un registro è un'azione permanente che non può essere annullata.

## Eliminazione di uno schema
<a name="schema-registry-gs7a"></a>

Potresti voler eliminare uno schema quando non verrà più utilizzato all'interno di un registro, utilizzando o l'[DeleteSchema azione (Python: delete\$1schema)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteSchema)API. Console di gestione AWS

L'eliminazione di uno o più schemi è un'azione permanente che non può essere annullata. Accertati che lo schema o gli schemi non siano più necessari.

Per eliminare uno schema dal registro, chiama l'API [DeleteSchema azione (Python: delete\$1schema)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteSchema), specificando la struttura `SchemaId` per identificare lo schema.

Ad esempio:

```
aws glue delete-schema --schema-id SchemaArn="arn:aws:glue:us-east-2:901234567890:schema/registryName1/schemaname"
```

```
aws glue delete-schema --schema-id SchemaName="TestSchema6-deleteschemabyname",RegistryName="default-registry"
```

**Console AWS Glue**  
Per eliminare uno schema dalla console AWS Glue:

1. Accedi a Console di gestione AWS e apri la AWS Glue console all'indirizzo [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\).

1. Nel pannello di navigazione, in **Data catalog (Catalogo dati)**, seleziona **Schema registries (Registri degli schemi)**.

1. Scegli il registro che contiene lo schema dall'elenco dei registri.

1. Seleziona uno o più schemi dall'elenco selezionando le caselle.

1. Dal menu **Action (Operazioni)**, scegli **Delete schema (Elimina schema)**.

1. Inserisci il testo **Delete** nel campo per confermare l'eliminazione.

1. Seleziona **Delete (Elimina)**.

Gli schemi specificati vengono eliminati dal registro.

## Eliminazione di una versione dello schema
<a name="schema-registry-gs7b"></a>

Man mano che gli schemi si accumulano nel registro, potresti voler eliminare le versioni indesiderate dello schema utilizzando Console di gestione AWS, o l'[DeleteSchemaVersions azione (Python: delete\$1schema\$1versions)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteSchemaVersions)API. L'eliminazione di una o più versioni degli schemi è un'azione permanente che non può essere annullata. Accertati che le versioni degli schemi non siano più necessarie.

Durante l'eliminazione delle versioni degli schemi, tieni presente i seguenti vincoli:
+ Non è possibile eliminare una versione con segno di spunta.
+ L'intervallo di versioni contigue non può essere superiore a 25.
+ La versione più recente non deve trovarsi nello stato in sospeso.

Specifica la struttura `SchemaId` per identificare lo schema e specifica `Versions` come intervallo di versioni da eliminare. Per ulteriori informazioni sulla specifica di una versione o un intervallo di versioni, consulta [DeleteRegistry azione (Python: delete\$1registry)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteRegistry). Le versioni degli schemi specificate vengono eliminate dal registro.

Chiamare l'API [ListSchemaVersions azione (Python: list\$1schema\$1versions)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-ListSchemaVersions) dopo questa chiamata elencherà lo stato delle versioni eliminate.

Esempio:

```
aws glue delete-schema-versions --schema-id SchemaName="TestSchema6",RegistryName="default-registry" --versions "1-1"
```

```
aws glue delete-schema-versions --schema-id SchemaArn="arn:aws:glue:us-east-2:901234567890:schema/default-registry/TestSchema6-NON-Existent" --versions "1-1"
```

1. Accedi a Console di gestione AWS e apri la AWS Glue console all'indirizzo. [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\)

1. Nel pannello di navigazione, in **Data catalog (Catalogo dati)**, seleziona **Schema registries (Registri degli schemi)**.

1. Scegli il registro che contiene lo schema dall'elenco dei registri.

1. Seleziona uno o più schemi dall'elenco selezionando le caselle.

1. Dal menu **Action (Operazioni)**, scegli **Delete schema (Elimina schema)**.

1. Inserisci il testo **Delete** nel campo per confermare l'eliminazione.

1. Seleziona **Delete (Elimina)**.

Le versioni degli schemi specificate vengono eliminate dal registro.

# Eliminazione di un registro
<a name="schema-registry-gs7c"></a>

È possibile eliminare un registro quando gli schemi in esso contenuti non devono più essere organizzati in tale registro. Sarà necessario riassegnare tali schemi a un altro registro.

L'eliminazione di uno o più registri è un'azione permanente che non può essere annullata. Accertati che il registro o i registri non siano più necessari.

Il registro predefinito può essere eliminato utilizzando AWS CLI.

**API AWS Glue**  
Per eliminare l'intero registro, inclusi gli schemi e tutte le relative versioni, chiama l'API [DeleteRegistry azione (Python: delete\$1registry)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteRegistry). Specifica una struttura `RegistryId` per identificare lo schema.

Ad esempio:

```
aws glue delete-registry --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName1"
```

```
aws glue delete-registry --registry-id RegistryName="TestRegistry-deletebyname"
```

Per ottenere lo stato dell'operazione di eliminazione, è possibile chiamare l'API `GetRegistry` dopo la chiamata asincrona.

**Console AWS Glue**  
Per eliminare un registro dalla console AWS Glue:

1. Accedi a Console di gestione AWS e apri la AWS Glue console all'indirizzo [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\).

1. Nel pannello di navigazione, in **Data catalog (Catalogo dati)**, seleziona **Schema registries (Registri degli schemi)**.

1. Scegli un registro dall'elenco selezionando una casella.

1. Dal menu **Action (Operazioni)**, seleziona **Delete registry (Elimina registro)**.

1. Inserisci il testo **Delete** nel campo per confermare l'eliminazione.

1. Seleziona **Delete (Elimina)**.

I registri selezionati vengono eliminati da AWS Glue.

## Esempi di IAM per i serializzatori
<a name="schema-registry-gs1"></a>

**Nota**  
AWS le politiche gestite concedono le autorizzazioni necessarie per i casi d'uso comuni. Per informazioni sull'utilizzo delle policy gestite per gestire il registro degli schemi, consulta [AWS politiche gestite (predefinite) per AWS Glue](security-iam-awsmanpol.md#access-policy-examples-aws-managed). 

Per i serializzatori, è necessario creare una policy minima simile a quella riportata di seguito per avere la possibilità di trovare lo `schemaVersionId` per una determinata definizione dello schema. Nota, per leggere gli schemi nel registro è necessario disporre delle autorizzazioni di lettura. È possibile limitare i registri che possono essere letti utilizzando la clausola `Resource`.

Esempio di codice 13:

```
{
    "Sid" : "GetSchemaByDefinition",
    "Effect" : "Allow",
    "Action" :
	[
        "glue:GetSchemaByDefinition"
    ],
        "Resource" : ["arn:aws:glue:us-east-2:012345678:registry/registryname-1",
                      "arn:aws:glue:us-east-2:012345678:schema/registryname-1/schemaname-1",
                      "arn:aws:glue:us-east-2:012345678:schema/registryname-1/schemaname-2"
                     ]
}
```

Inoltre, è possibile consentire ai produttori di creare nuovi schemi e versioni includendo i seguenti metodi aggiuntivi. Nota, dovresti essere in grado di ispezionare il registro per verificare gli schemi add/remove/evolve al suo interno. È possibile limitare i registri che possono essere ispezionati utilizzando la clausola `Resource`.

Esempio di codice 14:

```
{
    "Sid" : "RegisterSchemaWithMetadata",
    "Effect" : "Allow",
    "Action" :
	[
        "glue:GetSchemaByDefinition",
        "glue:CreateSchema",
        "glue:RegisterSchemaVersion",
        "glue:PutSchemaVersionMetadata",
    ],
    "Resource" : ["arn:aws:glue:aws-region:123456789012:registry/registryname-1",
                  "arn:aws:glue:aws-region:123456789012:schema/registryname-1/schemaname-1",
                  "arn:aws:glue:aws-region:123456789012:schema/registryname-1/schemaname-2"
                 ]
}
```

## Esempi di IAM per i deserializzatori
<a name="schema-registry-gs1b"></a>

Per i deserializzatori (lato consumer), è necessario creare una policy simile a quella riportata di seguito per consentire al deserializzatore di recuperare lo schema dal registro degli schemi per la deserializzazione. Nota, devi essere in grado di ispezionare il registro per recuperare gli schemi al suo interno.

Esempio di codice 15:

```
{
    "Sid" : "GetSchemaVersion",
    "Effect" : "Allow",
    "Action" :
	[
        "glue:GetSchemaVersion"
    ],
    "Resource" : ["*"]
}
```

## Connettività privata tramite AWS PrivateLink
<a name="schema-registry-gs-private"></a>

Puoi utilizzarlo AWS PrivateLink per connettere il VPC del tuo produttore di dati AWS Glue definendo un'interfaccia per l'endpoint VPC. AWS Glue Quando utilizzi un endpoint VPC di interfaccia, la comunicazione tra il VPC e AWS Glue avviene completamente all'interno della rete AWS . Per ulteriori informazioni, consulta la pagina relativa all'[utilizzo di AWS Glue con endpoint VPC](https://docs.aws.amazon.com/glue/latest/dg/vpc-endpoint.html).

# Accesso ai CloudWatch parametri di Amazon
<a name="schema-registry-gs-monitoring"></a>

 CloudWatch I parametri di Amazon sono disponibili come parte CloudWatch del piano gratuito. Puoi accedere a queste metriche nella CloudWatch console. Le metriche a livello di API includono CreateSchema (Successo e latenza), (Successo e latenza) GetSchemaByDefinition, (Successo e latenza), GetSchemaVersion (Successo e latenza), RegisterSchemaVersion (Successo e latenza). PutSchemaVersionMetadata Le metriche a livello di risorsa includono Registry. ThrottledByLimit, SchemaVersion. ThrottledByLimit, SchemaVersion .Taglia.

# CloudFormation Modello di esempio per il registro degli schemi
<a name="schema-registry-integrations-cfn"></a>

Di seguito è riportato un modello di esempio per la creazione di risorse del registro degli schemi in CloudFormation. Per creare questo stack nell'account, copia il modello sopra in un file `SampleTemplate.yaml` ed esegui il comando seguente:

```
aws cloudformation create-stack --stack-name ABCSchemaRegistryStack --template-body "'cat SampleTemplate.yaml'"
```

Questo esempio usa `AWS::Glue::Registry` per creare un registro, `AWS::Glue::Schema` per creare uno schema, `AWS::Glue::SchemaVersion`per creare una versione dello schema e `AWS::Glue::SchemaVersionMetadata` per popolare i metadati della versione dello schema. 

```
Description: "A sample CloudFormation template for creating Schema Registry resources."
Resources:
  ABCRegistry:
    Type: "AWS::Glue::Registry"
    Properties:
      Name: "ABCSchemaRegistry"
      Description: "ABC Corp. Schema Registry"
      Tags:
        Project: "Foo"
  ABCSchema:
    Type: "AWS::Glue::Schema"
    Properties:
      Registry:
        Arn: !Ref ABCRegistry
      Name: "TestSchema"
      Compatibility: "NONE"
      DataFormat: "AVRO"
      SchemaDefinition: >
        {"namespace":"foo.avro","type":"record","name":"user","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":"int"}]}
      Tags:
        Project: "Foo"
  SecondSchemaVersion:
    Type: "AWS::Glue::SchemaVersion"
    Properties:
      Schema:
        SchemaArn: !Ref ABCSchema
      SchemaDefinition: >
        {"namespace":"foo.avro","type":"record","name":"user","fields":[{"name":"status","type":"string", "default":"ON"}, {"name":"name","type":"string"},{"name":"favorite_number","type":"int"}]}
  FirstSchemaVersionMetadata:
    Type: "AWS::Glue::SchemaVersionMetadata"
    Properties:
      SchemaVersionId: !GetAtt ABCSchema.InitialSchemaVersionId
      Key: "Application"
      Value: "Kinesis"
  SecondSchemaVersionMetadata:
    Type: "AWS::Glue::SchemaVersionMetadata"
    Properties:
      SchemaVersionId: !Ref SecondSchemaVersion
      Key: "Application"
      Value: "Kinesis"
```

# Integrazione con il registro degli schemi di AWS Glue
<a name="schema-registry-integrations"></a>

Queste sezioni descrivono le integrazioni con il registro degli schemi di AWS Glue. Gli esempi riportati in questa sezione mostrano uno schema con formato di dati AVRO. [Per altri esempi, inclusi schemi con formato di dati JSON, consulta i test di integrazione e ReadMe le informazioni nel repository open source di Schema Registry. AWS Glue](https://github.com/awslabs/aws-glue-schema-registry)

**Topics**
+ [Caso d'uso: connessione del registro degli schemi ad Amazon MSK o Apache Kafka](#schema-registry-integrations-amazon-msk)
+ [Caso d'uso: integrazione del flusso di dati Amazon Kinesis con il registro degli schemi di AWS Glue](#schema-registry-integrations-kds)
+ [Caso d'uso: Servizio gestito da Amazon per Apache Flink](#schema-registry-integrations-kinesis-data-analytics-apache-flink)
+ [Caso d'uso: integrazione con AWS Lambda](#schema-registry-integrations-aws-lambda)
+ [Caso d'uso: AWS Glue Data Catalog](#schema-registry-integrations-aws-glue-data-catalog)
+ [Caso d'uso: streaming AWS Glue](#schema-registry-integrations-aws-glue-streaming)
+ [Caso d'uso: flussi Apache Kafka](#schema-registry-integrations-apache-kafka-streams)

## Caso d'uso: connessione del registro degli schemi ad Amazon MSK o Apache Kafka
<a name="schema-registry-integrations-amazon-msk"></a>

Supponiamo che tu stia scrivendo dati su un argomento Apache Kafka e che possa seguire questi passaggi per iniziare.

1. Crea un cluster Amazon Streaming gestito per Apache Kafka (Amazon MSK) o Apache Kafka con almeno un argomento. Se si crea un cluster Amazon MSK, è possibile utilizzare il Console di gestione AWS. Segui queste istruzioni: [Nozioni di base per l'uso di Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html) nella *Guida per gli sviluppatori di Amazon Managed Streaming for Apache Kafka*.

1. Segui il passaggio [Installazione delle SerDe librerie](schema-registry-gs-serde.md) sopra.

1. Per creare registri dello schema, schemi o versioni, segui le istruzioni riportate nella sezione [Guida introduttiva al registro degli schemi](schema-registry-gs.md) di questo documento.

1. Inizia i tuoi produttori e consumatori a utilizzare lo Schema Registry per scrivere e leggere record to/from sull'argomento Amazon MSK o Apache Kafka. È possibile trovare esempi di codice per produttori e consumatori nel [ ReadMe file delle librerie Serde](https://github.com/awslabs/aws-glue-schema-registry/blob/master/README.md). La libreria del registro degli schemi del produttore serializzerà automaticamente il record e aggiungerà un ID della versione dello schema al record.

1. Se lo schema di questo record è stato inserito o se la registrazione automatica è attivata, lo schema risulterà registrato nel registro degli schemi.

1. Il consumer che legge dall'argomento Amazon MSK o Apache Kafka, utilizzando la libreria del registro degli schemi di AWS Glue, cercherà automaticamente lo schema dal registro degli schemi.

## Caso d'uso: integrazione del flusso di dati Amazon Kinesis con il registro degli schemi di AWS Glue
<a name="schema-registry-integrations-kds"></a>

Per questa integrazione è necessario disporre di un flusso dei dati Amazon Kinesis. Per ulteriori informazioni, consulta [Nozioni di base su Amazon Kinesis Data Streams](https://docs.aws.amazon.com/streams/latest/dev/getting-started.html) nella *Guida per gli sviluppatori di Amazon Kinesis Data Streams*.

Esistono due modi per interagire con i dati in un flusso dei dati Kinesis.
+ Tramite le librerie Kinesis Producer Library (KPL) e Kinesis Client Library (KCL) in Java. Il supporto multilingue non viene fornito.
+ Tramite `PutRecords``PutRecord`, e `GetRecords` Kinesis APIs Data Streams disponibile in. AWS SDK per Java

Se attualmente utilizzi le KPL/KCL librerie, ti consigliamo di continuare a utilizzare quel metodo. Come mostrato negli esempi, esistono versioni KCL e KPL aggiornate con il registro degli schemi integrato. Altrimenti, puoi usare il codice di esempio per sfruttare lo AWS Glue Schema Registry se usi direttamente il KDS. APIs 

L'integrazione del registro degli schemi è disponibile solo con KPL v0.14.2 o versioni successive e con KCL v2.3 o versioni successive. L'integrazione del registro degli schemi con il formato di dati JSON è disponibile solo con KPL v0.14.8 o versioni successive e con KCL v2.3.6 o versioni successive.

### Interazione con i dati utilizzando Kinesis SDK V2
<a name="schema-registry-integrations-kds-sdk-v2"></a>

Questa sezione descrive l'interazione con Kinesis utilizzando Kinesis SDK V2

```
// Example JSON Record, you can construct a AVRO record also
private static final JsonDataWithSchema record = JsonDataWithSchema.builder(schemaString, payloadString);
private static final DataFormat dataFormat = DataFormat.JSON;

//Configurations for Schema Registry
GlueSchemaRegistryConfiguration gsrConfig = new GlueSchemaRegistryConfiguration("us-east-1");

GlueSchemaRegistrySerializer glueSchemaRegistrySerializer =
        new GlueSchemaRegistrySerializerImpl(awsCredentialsProvider, gsrConfig);
GlueSchemaRegistryDataFormatSerializer dataFormatSerializer =
        new GlueSchemaRegistrySerializerFactory().getInstance(dataFormat, gsrConfig);

Schema gsrSchema =
        new Schema(dataFormatSerializer.getSchemaDefinition(record), dataFormat.name(), "MySchema");

byte[] serializedBytes = dataFormatSerializer.serialize(record);

byte[] gsrEncodedBytes = glueSchemaRegistrySerializer.encode(streamName, gsrSchema, serializedBytes);

PutRecordRequest putRecordRequest = PutRecordRequest.builder()
        .streamName(streamName)
        .partitionKey("partitionKey")
        .data(SdkBytes.fromByteArray(gsrEncodedBytes))
        .build();
shardId = kinesisClient.putRecord(putRecordRequest)
        .get()
        .shardId();

GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer = new GlueSchemaRegistryDeserializerImpl(awsCredentialsProvider, gsrConfig);

GlueSchemaRegistryDataFormatDeserializer gsrDataFormatDeserializer =
        glueSchemaRegistryDeserializerFactory.getInstance(dataFormat, gsrConfig);

GetShardIteratorRequest getShardIteratorRequest = GetShardIteratorRequest.builder()
        .streamName(streamName)
        .shardId(shardId)
        .shardIteratorType(ShardIteratorType.TRIM_HORIZON)
        .build();

String shardIterator = kinesisClient.getShardIterator(getShardIteratorRequest)
        .get()
        .shardIterator();

GetRecordsRequest getRecordRequest = GetRecordsRequest.builder()
        .shardIterator(shardIterator)
        .build();
GetRecordsResponse recordsResponse = kinesisClient.getRecords(getRecordRequest)
        .get();

List<Object> consumerRecords = new ArrayList<>();
List<Record> recordsFromKinesis = recordsResponse.records();

for (int i = 0; i < recordsFromKinesis.size(); i++) {
    byte[] consumedBytes = recordsFromKinesis.get(i)
            .data()
            .asByteArray();

    Schema gsrSchema = glueSchemaRegistryDeserializer.getSchema(consumedBytes);
    Object decodedRecord = gsrDataFormatDeserializer.deserialize(ByteBuffer.wrap(consumedBytes),
                                                                    gsrSchema.getSchemaDefinition());
    consumerRecords.add(decodedRecord);
}
```

### Interazione con i dati utilizzando le librerie KPL/KCL
<a name="schema-registry-integrations-kds-libraries"></a>

Questa sezione descrive l'integrazione di Kinesis Data Streams con Schema Registry utilizzando le librerie. KPL/KCL Per ulteriori informazioni sull'utilizzo di KPL/KCL, consulta [Sviluppo di produttori utilizzando Amazon Kinesis Producer Library](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) nella *Guida per gli sviluppatori di Amazon Kinesis Data Streams*.

#### Impostazione del registro degli schemi in KPL
<a name="schema-registry-integrations-kds-libraries-kpl"></a>

1. Definisci la definizione dello schema per i dati, il formato dei dati e il nome dello schema creati nel registro degli schemi di AWS Glue.

1. Facoltativamente, puoi configurare l'oggetto `GlueSchemaRegistryConfiguration`.

1. Trasferisci l'oggetto dello schema a `addUserRecord API`.

   ```
   private static final String SCHEMA_DEFINITION = "{"namespace": "example.avro",\n"
   + " "type": "record",\n"
   + " "name": "User",\n"
   + " "fields": [\n"
   + " {"name": "name", "type": "string"},\n"
   + " {"name": "favorite_number", "type": ["int", "null"]},\n"
   + " {"name": "favorite_color", "type": ["string", "null"]}\n"
   + " ]\n"
   + "}";
   
   KinesisProducerConfiguration config = new KinesisProducerConfiguration();
   config.setRegion("us-west-1")
   
   //[Optional] configuration for Schema Registry.
   
   GlueSchemaRegistryConfiguration schemaRegistryConfig =
   new GlueSchemaRegistryConfiguration("us-west-1");
   
   schemaRegistryConfig.setCompression(true);
   
   config.setGlueSchemaRegistryConfiguration(schemaRegistryConfig);
   
   ///Optional configuration ends.
   
   final KinesisProducer producer =
         new KinesisProducer(config);
   
   final ByteBuffer data = getDataToSend();
   
   com.amazonaws.services.schemaregistry.common.Schema gsrSchema =
       new Schema(SCHEMA_DEFINITION, DataFormat.AVRO.toString(), "demoSchema");
   
   ListenableFuture<UserRecordResult> f = producer.addUserRecord(
   config.getStreamName(), TIMESTAMP, Utils.randomExplicitHashKey(), data, gsrSchema);
   
   private static ByteBuffer getDataToSend() {
         org.apache.avro.Schema avroSchema =
           new org.apache.avro.Schema.Parser().parse(SCHEMA_DEFINITION);
   
         GenericRecord user = new GenericData.Record(avroSchema);
         user.put("name", "Emily");
         user.put("favorite_number", 32);
         user.put("favorite_color", "green");
   
         ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
         Encoder encoder = EncoderFactory.get().directBinaryEncoder(outBytes, null);
         new GenericDatumWriter<>(avroSchema).write(user, encoder);
         encoder.flush();
         return ByteBuffer.wrap(outBytes.toByteArray());
    }
   ```

#### Impostazione della libreria client di Kinesis
<a name="schema-registry-integrations-kds-libraries-kcl"></a>

Svilupperai un consumer Kinesis Client Library in Java. Per ulteriori informazioni su, consulta [Sviluppo di app Consumer Kinesis Client Library in Java](https://docs.aws.amazon.com/streams/latest/dev/kcl2-standard-consumer-java-example.html) nella *Guida per gli sviluppatori di Amazon Kinesis Data Streams*.

1. Crea un'istanza di `GlueSchemaRegistryDeserializer` passando un oggetto `GlueSchemaRegistryConfiguration`.

1. Passa `GlueSchemaRegistryDeserializer` a `retrievalConfig.glueSchemaRegistryDeserializer`.

1. Accedi allo schema dei messaggi in arrivo chiamando `kinesisClientRecord.getSchema()`.

   ```
   GlueSchemaRegistryConfiguration schemaRegistryConfig =
       new GlueSchemaRegistryConfiguration(this.region.toString());
   
    GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer =
       new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), schemaRegistryConfig);
   
    RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient));
    retrievalConfig.glueSchemaRegistryDeserializer(glueSchemaRegistryDeserializer);
   
     Scheduler scheduler = new Scheduler(
               configsBuilder.checkpointConfig(),
               configsBuilder.coordinatorConfig(),
               configsBuilder.leaseManagementConfig(),
               configsBuilder.lifecycleConfig(),
               configsBuilder.metricsConfig(),
               configsBuilder.processorConfig(),
               retrievalConfig
           );
   
    public void processRecords(ProcessRecordsInput processRecordsInput) {
               MDC.put(SHARD_ID_MDC_KEY, shardId);
               try {
                   log.info("Processing {} record(s)",
                   processRecordsInput.records().size());
                   processRecordsInput.records()
                   .forEach(
                       r ->
                           log.info("Processed record pk: {} -- Seq: {} : data {} with schema: {}",
                           r.partitionKey(), r.sequenceNumber(), recordToAvroObj(r).toString(), r.getSchema()));
               } catch (Throwable t) {
                   log.error("Caught throwable while processing records. Aborting.");
                   Runtime.getRuntime().halt(1);
               } finally {
                   MDC.remove(SHARD_ID_MDC_KEY);
               }
    }
   
    private GenericRecord recordToAvroObj(KinesisClientRecord r) {
       byte[] data = new byte[r.data().remaining()];
       r.data().get(data, 0, data.length);
       org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser().parse(r.schema().getSchemaDefinition());
       DatumReader datumReader = new GenericDatumReader<>(schema);
   
       BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(data, 0, data.length, null);
       return (GenericRecord) datumReader.read(null, binaryDecoder);
    }
   ```

#### Interazione con i dati utilizzando Kinesis Data Streams APIs
<a name="schema-registry-integrations-kds-apis"></a>

Questa sezione descrive l'integrazione di Kinesis Data Streams con Schema Registry utilizzando Kinesis Data Streams. APIs

1. Aggiorna queste dipendenze di Maven:

   ```
   <dependencyManagement>
           <dependencies>
               <dependency>
                   <groupId>com.amazonaws</groupId>
                   <artifactId>aws-java-sdk-bom</artifactId>
                   <version>1.11.884</version>
                   <type>pom</type>
                   <scope>import</scope>
               </dependency>
           </dependencies>
       </dependencyManagement>
   
       <dependencies>
           <dependency>
               <groupId>com.amazonaws</groupId>
               <artifactId>aws-java-sdk-kinesis</artifactId>
           </dependency>
   
           <dependency>
               <groupId>software.amazon.glue</groupId>
               <artifactId>schema-registry-serde</artifactId>
               <version>1.1.5</version>
           </dependency>
   
           <dependency>
               <groupId>com.fasterxml.jackson.dataformat</groupId>
               <artifactId>jackson-dataformat-cbor</artifactId>
               <version>2.11.3</version>
           </dependency>
       </dependencies>
   ```

1. Nel produttore, aggiungi le informazioni sull'intestazione dello schema utilizzando il l'API `PutRecords` o `PutRecord` in Kinesis Data Streams.

   ```
   //The following lines add a Schema Header to the record
           com.amazonaws.services.schemaregistry.common.Schema awsSchema =
               new com.amazonaws.services.schemaregistry.common.Schema(schemaDefinition, DataFormat.AVRO.name(),
                   schemaName);
           GlueSchemaRegistrySerializerImpl glueSchemaRegistrySerializer =
               new GlueSchemaRegistrySerializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(getConfigs()));
           byte[] recordWithSchemaHeader =
               glueSchemaRegistrySerializer.encode(streamName, awsSchema, recordAsBytes);
   ```

1. Nel produttore, utilizza l'API `PutRecords` o `PutRecord` per inserire il record nel flusso dei dati.

1. Nel consumer, rimuovi il record dello schema dall'intestazione e serializza un record dello schema Avro.

   ```
   //The following lines remove Schema Header from record
           GlueSchemaRegistryDeserializerImpl glueSchemaRegistryDeserializer =
               new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), getConfigs());
           byte[] recordWithSchemaHeaderBytes = new byte[recordWithSchemaHeader.remaining()];
           recordWithSchemaHeader.get(recordWithSchemaHeaderBytes, 0, recordWithSchemaHeaderBytes.length);
           com.amazonaws.services.schemaregistry.common.Schema awsSchema =
               glueSchemaRegistryDeserializer.getSchema(recordWithSchemaHeaderBytes);
           byte[] record = glueSchemaRegistryDeserializer.getData(recordWithSchemaHeaderBytes);
   
           //The following lines serialize an AVRO schema record
           if (DataFormat.AVRO.name().equals(awsSchema.getDataFormat())) {
               Schema avroSchema = new org.apache.avro.Schema.Parser().parse(awsSchema.getSchemaDefinition());
               Object genericRecord = convertBytesToRecord(avroSchema, record);
               System.out.println(genericRecord);
           }
   ```

#### Interazione con i dati utilizzando Kinesis Data Streams APIs
<a name="schema-registry-integrations-kds-apis-reference"></a>

Di seguito è riportato un codice di esempio per l'utilizzo di and. `PutRecords` `GetRecords` APIs

```
//Full sample code
import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializerImpl;
import com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistrySerializerImpl;
import com.amazonaws.services.schemaregistry.utils.AVROUtils;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.services.glue.model.DataFormat;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;


public class PutAndGetExampleWithEncodedData {
    static final String regionName = "us-east-2";
    static final String streamName = "testStream1";
    static final String schemaName = "User-Topic";
    static final String AVRO_USER_SCHEMA_FILE = "src/main/resources/user.avsc";
    KinesisApi kinesisApi = new KinesisApi();

    void runSampleForPutRecord() throws IOException {
        Object testRecord = getTestRecord();
        byte[] recordAsBytes = convertRecordToBytes(testRecord);
        String schemaDefinition = AVROUtils.getInstance().getSchemaDefinition(testRecord);

        //The following lines add a Schema Header to a record
        com.amazonaws.services.schemaregistry.common.Schema awsSchema =
            new com.amazonaws.services.schemaregistry.common.Schema(schemaDefinition, DataFormat.AVRO.name(),
                schemaName);
        GlueSchemaRegistrySerializerImpl glueSchemaRegistrySerializer =
            new GlueSchemaRegistrySerializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(regionName));
        byte[] recordWithSchemaHeader =
            glueSchemaRegistrySerializer.encode(streamName, awsSchema, recordAsBytes);

        //Use PutRecords api to pass a list of records
        kinesisApi.putRecords(Collections.singletonList(recordWithSchemaHeader), streamName, regionName);

        //OR
        //Use PutRecord api to pass single record
        //kinesisApi.putRecord(recordWithSchemaHeader, streamName, regionName);
    }

    byte[] runSampleForGetRecord() throws IOException {
        ByteBuffer recordWithSchemaHeader = kinesisApi.getRecords(streamName, regionName);

        //The following lines remove the schema registry header
        GlueSchemaRegistryDeserializerImpl glueSchemaRegistryDeserializer =
            new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(regionName));
        byte[] recordWithSchemaHeaderBytes = new byte[recordWithSchemaHeader.remaining()];
        recordWithSchemaHeader.get(recordWithSchemaHeaderBytes, 0, recordWithSchemaHeaderBytes.length);

        com.amazonaws.services.schemaregistry.common.Schema awsSchema =
            glueSchemaRegistryDeserializer.getSchema(recordWithSchemaHeaderBytes);

        byte[] record = glueSchemaRegistryDeserializer.getData(recordWithSchemaHeaderBytes);

        //The following lines serialize an AVRO schema record
        if (DataFormat.AVRO.name().equals(awsSchema.getDataFormat())) {
            Schema avroSchema = new org.apache.avro.Schema.Parser().parse(awsSchema.getSchemaDefinition());
            Object genericRecord = convertBytesToRecord(avroSchema, record);
            System.out.println(genericRecord);
        }

        return record;
    }

    private byte[] convertRecordToBytes(final Object record) throws IOException {
        ByteArrayOutputStream recordAsBytes = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().directBinaryEncoder(recordAsBytes, null);
        GenericDatumWriter datumWriter = new GenericDatumWriter<>(AVROUtils.getInstance().getSchema(record));
        datumWriter.write(record, encoder);
        encoder.flush();
        return recordAsBytes.toByteArray();
    }

    private GenericRecord convertBytesToRecord(Schema avroSchema, byte[] record) throws IOException {
        final GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(avroSchema);
        Decoder decoder = DecoderFactory.get().binaryDecoder(record, null);
        GenericRecord genericRecord = datumReader.read(null, decoder);
        return genericRecord;
    }

    private Map<String, String> getMetadata() {
        Map<String, String> metadata = new HashMap<>();
        metadata.put("event-source-1", "topic1");
        metadata.put("event-source-2", "topic2");
        metadata.put("event-source-3", "topic3");
        metadata.put("event-source-4", "topic4");
        metadata.put("event-source-5", "topic5");
        return metadata;
    }

    private GlueSchemaRegistryConfiguration getConfigs() {
        GlueSchemaRegistryConfiguration configs = new GlueSchemaRegistryConfiguration(regionName);
        configs.setSchemaName(schemaName);
        configs.setAutoRegistration(true);
        configs.setMetadata(getMetadata());
        return configs;
    }

    private Object getTestRecord() throws IOException {
        GenericRecord genericRecord;
        Schema.Parser parser = new Schema.Parser();
        Schema avroSchema = parser.parse(new File(AVRO_USER_SCHEMA_FILE));

        genericRecord = new GenericData.Record(avroSchema);
        genericRecord.put("name", "testName");
        genericRecord.put("favorite_number", 99);
        genericRecord.put("favorite_color", "red");

        return genericRecord;
    }
}
```

## Caso d'uso: Servizio gestito da Amazon per Apache Flink
<a name="schema-registry-integrations-kinesis-data-analytics-apache-flink"></a>

Apache Flink è un diffuso framework open source e motore di elaborazione distribuito per calcoli con stato su flussi dei dati illimitati e limitati. Amazon Managed Service per Apache Flink è un AWS servizio completamente gestito che consente di creare e gestire applicazioni Apache Flink per elaborare dati di streaming.

Apache Flink open source fornisce una serie di origini e sink. Ad esempio, le origini dati predefinite includono la lettura da file, directory e socket e l'inserimento di dati da raccolte e iteratori. I DataStream connettori Apache Flink forniscono codice per consentire ad Apache Flink di interfacciarsi con vari sistemi di terze parti, come Apache Kafka o Kinesis, come sorgenti. and/or 

Per ulteriori informazioni, consulta la [Guida per sviluppatori di Amazon Kinesis Data Analytics](https://docs.aws.amazon.com/kinesisanalytics/latest/java/what-is.html).

### Connettore Apache Flink Kafka
<a name="schema-registry-integrations-kafka-connector"></a>

Apache Flink fornisce un connettore per flusso dei dati Apache Kafka per la lettura e la scrittura di dati su argomenti Kafka con garanzie exactly-once. Il consumer Kafka di Flink, `FlinkKafkaConsumer`, fornisce l'accesso alla lettura da uno o più argomenti di Kafka. Il produttore Kafka di Apache Flink, `FlinkKafkaProducer`, consente di scrivere un flusso di record su uno o più argomenti Kafka. Per ulteriori informazioni, consulta [Apache Kafka Connector](https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html).

### Connettore di flussi Apache Flink Kinesis
<a name="schema-registry-integrations-kinesis-connector"></a>

Il connettore del flusso dei dati Kinesis consente di accedere ai Amazon Kinesis Data Streams. `FlinkKinesisConsumer`Si tratta di una fonte di dati di streaming parallela che utilizza una sola volta per abbonarsi a più flussi Kinesis all'interno della stessa area di AWS servizio e può gestire in modo trasparente il ripartizionamento dei flussi mentre il processo è in esecuzione. Ogni sottoattività del consumer è responsabile del recupero dei record di dati da più partizioni Kinesis. Il numero di partizioni recuperate da ogni sottoattività cambierà man mano che le partizioni vengono chiuse e create da Kinesis. `FlinkKinesisProducer` utilizza Kinesis Producer Library (KPL) per inserire i dati da un flusso Apache Flink in un flusso Kinesis. Per ulteriori informazioni, consulta [Amazon Kinesis Streams Connector](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kinesis.html).

Per ulteriori informazioni, consulta il [AWS Gluerepository Github di schema ](https://github.com/awslabs/aws-glue-schema-registry).

### Integrazione con Apache Flink
<a name="schema-registry-integrations-apache-flink-integrate"></a>

La SerDes libreria fornita con Schema Registry si integra con Apache Flink. Per utilizzare Apache Flink, sarà necessario implementare le interfacce [https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java](https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java) e [https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java](https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java) denominate `GlueSchemaRegistryAvroSerializationSchema` e `GlueSchemaRegistryAvroDeserializationSchema`, che possono essere collegate ai connettori Apache Flink.

### Aggiunta di una dipendenza del registro degli schemi di AWS Glue nell'applicazione Apache Flink
<a name="schema-registry-integrations-kinesis-data-analytics-dependencies"></a>

Per impostare le dipendenze di integrazione sul registro degli schemi di AWS Glue nell'applicazione Apache Flink:

1. Aggiungi la dipendenza al file `pom.xml`.

   ```
   <dependency>
       <groupId>software.amazon.glue</groupId>
       <artifactId>schema-registry-flink-serde</artifactId>
       <version>1.0.0</version>
   </dependency>
   ```

#### Integrazione di Kafka o Amazon MSK con Apache Flink
<a name="schema-registry-integrations-kda-integrate-msk"></a>

È possibile usare Servizio gestito da Amazon per Apache Flink per Apache Flink con Kafka come origine o Kafka come sink.

**Kafka come fonte**  
Il diagramma seguente mostra l'integrazione di Flusso di dati Kinesis con Servizio gestito da Amazon per Apache Flink per Apache Flink, con Kafka come origine.

![\[Kafka come fonte.\]](http://docs.aws.amazon.com/it_it/glue/latest/dg/images/gsr-kafka-source.png)


**Kafka come sink**  
Il diagramma seguente mostra l'integrazione di Flusso di dati Kinesis con Servizio gestito da Amazon per Apache Flink per Apache Flink, con Kafka come sink.

![\[Kafka come sink.\]](http://docs.aws.amazon.com/it_it/glue/latest/dg/images/gsr-kafka-sink.png)


Per integrare Kafka (o Amazon MSK) con Servizio gestito da Amazon per Apache Flink per Apache Flink, con Kafka come origine o Kafka come sink, apporta le modifiche al codice riportate di seguito. Aggiungi i blocchi di codice in grassetto al codice corrispondente nelle sezioni analoghe.

Se Kafka è la fonte, utilizza il codice deserializzatore (blocco 2). Se Kafka è il sink, utilizza il codice serializzatore (blocco 3).

```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

String topic = "topic";
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

// block 1
Map<String, Object> configs = new HashMap<>();
configs.put(AWSSchemaRegistryConstants.AWS_REGION, "aws-region");
configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());

FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
    topic,
    // block 2
    GlueSchemaRegistryAvroDeserializationSchema.forGeneric(schema, configs),
    properties);

FlinkKafkaProducer<GenericRecord> producer = new FlinkKafkaProducer<>(
    topic,
    // block 3
    GlueSchemaRegistryAvroSerializationSchema.forGeneric(schema, topic, configs),
    properties);

DataStream<GenericRecord> stream = env.addSource(consumer);
stream.addSink(producer);
env.execute();
```

#### Integrazione di Kinesis Data Streams con Apache Flink
<a name="schema-registry-integrations-integrate-kds"></a>

È possibile utilizzare Servizio gestito da Amazon per Apache Flink per Apache Flink con Flusso di dati Kinesis come origine o sink.

**Kinesis Data Streams come fonte**  
Il diagramma seguente mostra l'integrazione di Flusso di dati Kinesis con Servizio gestito per Apache Flink per Apache Flink, con Flusso di dati Kinesis come origine.

![\[Kinesis Data Streams come fonte.\]](http://docs.aws.amazon.com/it_it/glue/latest/dg/images/gsr-kinesis-source.png)


**Kinesis Data Streams come sink**  
Il diagramma seguente mostra l'integrazione di Flusso di dati Kinesis con Servizio gestito per Apache Flink per Apache Flink, con Flusso di dati Kinesis come sink.

![\[Kinesis Data Streams come sink.\]](http://docs.aws.amazon.com/it_it/glue/latest/dg/images/gsr-kinesis-sink.png)


Per integrare Flusso di dati Kinesis con Servizio gestito per Apache Flink per Apache Flink, con Flusso di dati Kinesis come origine o Flusso di dati Kinesis come sink, apporta le modifiche al codice riportate di seguito. Aggiungi i blocchi di codice in grassetto al codice corrispondente nelle sezioni analoghe.

Se Kinesis Data Streams è l'origine, utilizza il codice deserializzatore (blocco 2). Se Kinesis Data Streams il sink, utilizza il codice serializzatore (blocco 3).

```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

String streamName = "stream";
Properties consumerConfig = new Properties();
consumerConfig.put(AWSConfigConstants.AWS_REGION, "aws-region");
consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

// block 1
Map<String, Object> configs = new HashMap<>();
configs.put(AWSSchemaRegistryConstants.AWS_REGION, "aws-region");
configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());

FlinkKinesisConsumer<GenericRecord> consumer = new FlinkKinesisConsumer<>(
    streamName,
    // block 2
    GlueSchemaRegistryAvroDeserializationSchema.forGeneric(schema, configs),
    properties);

FlinkKinesisProducer<GenericRecord> producer = new FlinkKinesisProducer<>(
    // block 3
    GlueSchemaRegistryAvroSerializationSchema.forGeneric(schema, topic, configs),
    properties);
producer.setDefaultStream(streamName);
producer.setDefaultPartition("0");

DataStream<GenericRecord> stream = env.addSource(consumer);
stream.addSink(producer);
env.execute();
```

## Caso d'uso: integrazione con AWS Lambda
<a name="schema-registry-integrations-aws-lambda"></a>

[Per utilizzare una AWS Lambda funzione come consumatore Apache Kafka/Amazon MSK e deserializzare i messaggi con codifica AVRO utilizzando AWS Glue Schema Registry, visita la pagina MSK Labs.](https://amazonmsk-labs.workshop.aws/en/msklambda/gsrschemareg.html)

## Caso d'uso: AWS Glue Data Catalog
<a name="schema-registry-integrations-aws-glue-data-catalog"></a>

Le tabelle AWS Glue supportano gli schemi che è possibile specificare manualmente o facendo riferimento al registro degli schemi di AWS Glue. Il registro degli schemi si integra con il catalogo dati per consentire l'utilizzo facoltativo degli schemi memorizzati nel registro degli schemi durante la creazione o l'aggiornamento di tabelle o partizioni AWS Glue nel catalogo dati. Per identificare una definizione dello schema nel registro degli schemi, è necessario conoscere almeno l'ARN dello schema di cui fa parte. Una versione di uno schema, che contiene una definizione dello schema, può essere referenziata dal relativo UUID o numero di versione. C'è sempre una versione dello schema, la versione "più recente", che può essere cercata senza conoscere il suo numero di versione o UUID.

Chiamando le operazioni `CreateTable` o `UpdateTable`, passerai una struttura `TableInput` che contiene un `StorageDescriptor`, che può avere un `SchemaReference` a uno schema esistente nel registro degli schemi. Allo stesso modo, quando si chiama `GetTable` o `GetPartition` APIs, la risposta può contenere lo schema e il`SchemaReference`. Quando una tabella o una partizione è stata creata utilizzando riferimenti allo schema, il catalogo dati tenterà di recuperare lo schema per questo riferimento. Nel caso in cui non sia in grado di trovare lo schema nel registro degli schemi, restituisce uno schema vuoto nella risposta `GetTable`; in caso contrario, la risposta conterrà sia lo schema che il riferimento allo schema.

Puoi eseguire le seguenti operazioni dalla console AWS Glue.

Per eseguire queste operazioni e creare, aggiornare o visualizzare le informazioni sullo schema, è necessario assegnare un ruolo IAM all'utente chiamante che fornisce le autorizzazioni per l'API `GetSchemaVersion`.

### Aggiunta di una tabella o aggiornamento dello schema di una tabella
<a name="schema-registry-integrations-aws-glue-data-catalog-table"></a>

L'aggiunta di una nuova tabella da uno schema esistente associa la tabella a una versione specifica dello schema. Una volta registrate le nuove versioni dello schema, è possibile aggiornare la definizione della tabella dalla pagina View table (Visualizza tabella) nella console AWS Glue o utilizzando l'API [UpdateTable azione (Python: update\$1table)](aws-glue-api-catalog-tables.md#aws-glue-api-catalog-tables-UpdateTable).

#### Aggiunta di una tabella da uno schema esistente
<a name="schema-registry-integrations-aws-glue-data-catalog-table-existing"></a>

Puoi creare una tabella AWS Glue da una versione dello schema nel registro utilizzando la console AWS Glue o l'API `CreateTable`.

**API AWS Glue**  
Chiamando l'API `CreateTable`, passerai un `TableInput` che contiene una `StorageDescriptor`, che può avere un `SchemaReference` a uno schema esistente nel registro degli schemi.

**Console AWS Glue**  
Per creare una tabella dalla consoleAWS Glue:

1. Accedi a Console di gestione AWS e apri la AWS Glue console all'indirizzo [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\).

1. Nel pannello di navigazione, in **Data catalog (Catalogo dati)**, seleziona **Tables (Tabelle)**.

1. Nel menu **Add Tables (Aggiungi tabelle)**, scegli **Add table from existing schema (Aggiungi tabella da schema esistente)**.

1. Configura le proprietà della tabella e il datastore come indicato nella Guida per gli sviluppatori di AWS Glue.

1. Nella pagina **Choose a Glue schema (Scegli uno schema di Glue)**, seleziona il **Registry (Registro)** in cui si trova lo schema.

1. Scegli **Schema name (Nome schema)** e seleziona la **Version (Versione)** dello schema da applicare.

1. Esamina l'anteprima dello schema e scegli **Next (Successivo)**.

1. Rivedi e crea la tabella.

Lo schema e la versione applicati alla tabella vengono visualizzati nella colonna **Glue schema (Schema di Glue)** nell'elenco delle tabelle. È possibile visualizzare la tabella per vedere ulteriori dettagli.

#### Aggiornamento dello schema per una tabella
<a name="schema-registry-integrations-aws-glue-data-catalog-table-updating"></a>

Quando una nuova versione dello schema diventa disponibile, si consiglia di aggiornare lo schema di una tabella utilizzando l'API [UpdateTable azione (Python: update\$1table)](aws-glue-api-catalog-tables.md#aws-glue-api-catalog-tables-UpdateTable) o la console AWS Glue. 

**Importante**  
Aggiornando lo schema per una tabella esistente con uno schema AWS Glue specificato manualmente, il nuovo schema a cui si fa riferimento nel registro degli schemi potrebbe essere incompatibile. Questo potrebbe comportare la non riuscita dei processi.

**API AWS Glue**  
Chiamando l'API `UpdateTable`, passerai un `TableInput` che contiene una `StorageDescriptor`, che può avere un `SchemaReference` a uno schema esistente nel registro degli schemi.

**Console AWS Glue**  
Per aggiornare lo schema di una tabella dalla console AWS Glue:

1. Accedi a Console di gestione AWS e apri la AWS Glue console all'indirizzo [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\).

1. Nel pannello di navigazione, in **Data catalog (Catalogo dati)**, seleziona **Tables (Tabelle)**.

1. Visualizza la tabella nell'elenco delle tabelle.

1. Fai clic su **Update schema (Aggiorna schema)** nella casella che ti informa di una nuova versione.

1. Esamina le differenze tra lo schema attuale e quello nuovo.

1. Scegli **Show all schema differences (Mostra tutte le differenze di schema)** per ulteriori dettagli.

1. Scegli **Save table (Salva tabella)** per accettare la nuova versione.

## Caso d'uso: streaming AWS Glue
<a name="schema-registry-integrations-aws-glue-streaming"></a>

Lo streaming AWS Glue consuma dati provenienti da fonti di streaming ed esegue operazioni ETL prima di scrivere su un sink di output. L'origine di streaming di input può essere specificata utilizzando una tabella dati o direttamente specificando la configurazione di origine.

Lo streaming AWS Glue supporta una tabella del catalogo dati per la sorgente di streaming creata con lo schema presente nel Registro degli schemi di AWS Glue. È possibile creare uno schema nel Registro degli schemi di AWS Glue e creare una tabella AWS Glue con una sorgente di streaming utilizzando questo schema. Questa tabella AWS Glue può essere utilizzata come input per un processo di streaming AWS Glue per la deserializzazione dei dati nel flusso di input.

Un punto da notare qui è quando lo schema nel Registro degli schemi AWS Glue cambia, è necessario riavviare il processo di streaming AWS Glue in modo da riflettere i cambiamenti nello schema.

## Caso d'uso: flussi Apache Kafka
<a name="schema-registry-integrations-apache-kafka-streams"></a>

L'API di Apache Kafka Streams è una libreria client per l'elaborazione e l'analisi dei dati memorizzati in Apache Kafka. Questa sezione descrive l'integrazione di Apache Kafka Streams con il registro degli schemi di AWS Glue, che consente di gestire e applicare gli schemi sulle applicazioni di streaming di dati. Per ulteriori informazioni su Apache Kafka Streams, consulta [Apache Kafka Streams](https://kafka.apache.org/documentation/streams/).

### Integrazione con le librerie SerDes
<a name="schema-registry-integrations-apache-kafka-streams-integrate"></a>

Esiste una classe di `GlueSchemaRegistryKafkaStreamsSerde` con cui è possibile configurare un'applicazione Streams.

#### Codice di esempio di applicazione di flussi Kafka
<a name="schema-registry-integrations-apache-kafka-streams-application"></a>

Per utilizzare il registro degli schemi di AWS Glue all'interno di un'applicazione Apache Kafka Streams:

1. Configura l'applicazione Kafka Streams.

   ```
   final Properties props = new Properties();
       props.put(StreamsConfig.APPLICATION_ID_CONFIG, "avro-streams");
       props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
       props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
       props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
       props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, AWSKafkaAvroSerDe.class.getName());
       props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
   
       props.put(AWSSchemaRegistryConstants.AWS_REGION, "aws-region");
       props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
       props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
   	props.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.AVRO.name());
   ```

1. Crea un flusso dall'argomento avro-input.

   ```
   StreamsBuilder builder = new StreamsBuilder();
   final KStream<String, GenericRecord> source = builder.stream("avro-input");
   ```

1. Elabora i record di dati (nell'esempio vengono filtrati i record il cui valore favorite\$1color è rosa o in cui il valore amount è 15).

   ```
   final KStream<String, GenericRecord> result = source
       .filter((key, value) -> !"pink".equals(String.valueOf(value.get("favorite_color"))));
       .filter((key, value) -> !"15.0".equals(String.valueOf(value.get("amount"))));
   ```

1. Scrivi i risultati nell'argomento avro-output.

   ```
   result.to("avro-output");
   ```

1. Avvia l'applicazione Apache Kafka Streams.

   ```
   KafkaStreams streams = new KafkaStreams(builder.build(), props);
   streams.start();
   ```

#### Risultati dell'implementazione
<a name="schema-registry-integrations-apache-kafka-streams-results"></a>

Questi risultati mostrano il processo di filtraggio dei record che sono stati filtrati nel passaggio 3 in base a favorite\$1color come "rosa" o valore come "15.0".

Record prima del filtraggio:

```
{"name": "Sansa", "favorite_number": 99, "favorite_color": "white"}
{"name": "Harry", "favorite_number": 10, "favorite_color": "black"}
{"name": "Hermione", "favorite_number": 1, "favorite_color": "red"}
{"name": "Ron", "favorite_number": 0, "favorite_color": "pink"}
{"name": "Jay", "favorite_number": 0, "favorite_color": "pink"}

{"id": "commute_1","amount": 3.5}
{"id": "grocery_1","amount": 25.5}
{"id": "entertainment_1","amount": 19.2}
{"id": "entertainment_2","amount": 105}
	{"id": "commute_1","amount": 15}
```

Record dopo il filtraggio:

```
{"name": "Sansa", "favorite_number": 99, "favorite_color": "white"}
{"name": "Harry", "favorite_number": 10, "favorite_color": "black"}
{"name": "Hermione", "favorite_number": 1, "favorite_color": "red"}
{"name": "Ron", "favorite_number": 0, "favorite_color": "pink"}

{"id": "commute_1","amount": 3.5}
{"id": "grocery_1","amount": 25.5}
{"id": "entertainment_1","amount": 19.2}
{"id": "entertainment_2","amount": 105}
```

### Caso d'uso: Apache Kafka Connect
<a name="schema-registry-integrations-apache-kafka-connect"></a>

L'integrazione di Apache Kafka Connect con il registro degli schemi di AWS Glue consente di ottenere informazioni sullo schema dai connettori. I convertitori Apache Kafka specificano il formato dei dati all'interno di Apache Kafka e come tradurli in dati Apache Kafka Connect. Ogni utente di Apache Kafka Connect dovrà configurare questi convertitori in base al formato desiderato dei dati quando vengono caricati o memorizzati in Apache Kafka. In questo modo, è possibile definire i propri convertitori per tradurre i dati di Apache Kafka Connect nel tipo utilizzato nel registro degli schemi di AWS Glue (ad esempio: Avro) e utilizzare il nostro serializzatore per registrare il suo schema ed eseguire la serializzazione. I convertitori sono anche in grado di utilizzare il nostro deserializzatore per deserializzare i dati ricevuti da Apache Kafka e convertirli nuovamente in dati Apache Kafka Connect. Di seguito è riportato un diagramma di flusso di lavoro di esempio.

![\[Flusso di lavoro Apache Kafka Connect.\]](http://docs.aws.amazon.com/it_it/glue/latest/dg/images/schema_reg_int_kafka_connect.png)


1. Installare del progetto `aws-glue-schema-registry` con la clonazione del [repository Github per il registro degli schemi di AWS Glue](https://github.com/awslabs/aws-glue-schema-registry).

   ```
   git clone git@github.com:awslabs/aws-glue-schema-registry.git
   cd aws-glue-schema-registry
   mvn clean install
   mvn dependency:copy-dependencies
   ```

1. Se hai intenzione di utilizzare Apache Kafka Connect in modalità *standalone*, aggiorna **connect-standalone.properties** seguendo le istruzioni di seguito per questo passaggio. Se prevedi di utilizzare Apache Kafka Connect *in* modalità distribuita, **connect-avro-distributedaggiorna** .properties seguendo le stesse istruzioni.

   1. Aggiungi queste proprietà anche al file delle proprietà di Apache Kafka connect:

      ```
      key.converter.region=aws-region
      value.converter.region=aws-region
      key.converter.schemaAutoRegistrationEnabled=true
      value.converter.schemaAutoRegistrationEnabled=true
      key.converter.avroRecordType=GENERIC_RECORD
      value.converter.avroRecordType=GENERIC_RECORD
      ```

   1. **Aggiungi il comando seguente alla sezione **Launch mode** in .sh: kafka-run-class**

      ```
      -cp $CLASSPATH:"<your AWS GlueSchema Registry base directory>/target/dependency/*"
      ```

1. **Aggiungi il comando seguente alla sezione **Launch mode in** .sh kafka-run-class**

   ```
   -cp $CLASSPATH:"<your AWS GlueSchema Registry base directory>/target/dependency/*" 
   ```

   L'URL dovrebbe essere simile a questo:

   ```
   # Launch mode
   if [ "x$DAEMON_MODE" = "xtrue" ]; then
     nohup "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH:"/Users/johndoe/aws-glue-schema-registry/target/dependency/*" $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
   else
     exec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH:"/Users/johndoe/aws-glue-schema-registry/target/dependency/*" $KAFKA_OPTS "$@"
   fi
   ```

1. Se usi bash, esegui i seguenti comandi per configurare il tuo CLASSPATH nel bash\$1profile. Per qualsiasi altra shell, aggiorna l'ambiente di conseguenza.

   ```
   echo 'export GSR_LIB_BASE_DIR=<>' >>~/.bash_profile
   echo 'export GSR_LIB_VERSION=1.0.0' >>~/.bash_profile
   echo 'export KAFKA_HOME=<your Apache Kafka installation directory>' >>~/.bash_profile
   echo 'export CLASSPATH=$CLASSPATH:$GSR_LIB_BASE_DIR/avro-kafkaconnect-converter/target/schema-registry-kafkaconnect-converter-$GSR_LIB_VERSION.jar:$GSR_LIB_BASE_DIR/common/target/schema-registry-common-$GSR_LIB_VERSION.jar:$GSR_LIB_BASE_DIR/avro-serializer-deserializer/target/schema-registry-serde-$GSR_LIB_VERSION.jar' >>~/.bash_profile
   source ~/.bash_profile
   ```

1. (Facoltativo) Se vuoi eseguire il test con un'origine file semplice, clona il connettore dell'origine file.

   ```
   git clone https://github.com/mmolimar/kafka-connect-fs.git
   cd kafka-connect-fs/
   ```

   1. Sotto la configurazione del connettore di origine, modifica il formato dei dati in Avro, il lettore di file in `AvroFileReader` e aggiorna un oggetto Avro di esempio dal percorso del file da cui stai leggendo. Ad esempio:

      ```
      vim config/kafka-connect-fs.properties
      ```

      ```
      fs.uris=<path to a sample avro object>
      policy.regexp=^.*\.avro$
      file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.AvroFileReader
      ```

   1. Installa il connettore di origine.

      ```
      mvn clean package
      echo "export CLASSPATH=\$CLASSPATH:\"\$(find target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')\"" >>~/.bash_profile
      source ~/.bash_profile
      ```

   1. Aggiorna le proprietà del sink in `<your Apache Kafka installation directory>/config/connect-file-sink.properties`, aggiorna il nome dell'argomento e il nome del file in uscita.

      ```
      file=<output file full path>
      topics=<my topic>
      ```

1. Avvia il connettore di origine (in questo esempio si tratta di un connettore dell'origine file).

   ```
   $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/kafka-connect-fs.properties
   ```

1. Esegui il connettore sink (in questo esempio si tratta di un connettore sink di file).

   ```
   $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-file-sink.properties
   ```

   Per un esempio di utilizzo di Kafka Connect, guarda lo run-local-tests script.sh nella cartella integration-tests nel repository [Github](https://github.com/awslabs/aws-glue-schema-registry/tree/master/integration-tests) per lo Schema Registry. AWS Glue

# Migrazione da un registro degli schemi di terze parti al registro degli schemi di AWS Glue
<a name="schema-registry-integrations-migration"></a>

La migrazione da un registro degli schemi di terze parti al registro degli schemi di AWS Glue ha una dipendenza sull'attuale registro degli schemi di terze parti esistente. Se in un argomento Apache Kafka sono presenti record che sono stati inviati utilizzando un registro degli schemi di terze parti, i consumer hanno bisogno di questo registro per deserializzare tali record. `AWSKafkaAvroDeserializer` offre la possibilità di specificare una classe di deserializzatore secondario che punta al deserializzatore di terze parti e viene utilizzato per deserializzare questi record.

Esistono due criteri per il ritiro di uno schema di terze parti. Innanzitutto, il ritiro può avvenire solo dopo che i record negli argomenti Apache Kafka che utilizzano il registro degli schemi di terze parti non sono più richiesti da e per tutti i consumer. In secondo luogo, il ritiro può avvenire quando gli argomenti di Apache Kafka diventano datati, a seconda del periodo di conservazione specificato per tali argomenti. Tieni presente che se disponi di argomenti con tempo di conservazione infinito, è comunque possibile eseguire la migrazione al registro degli schemi di AWS Glue, ma non sarà possibile ritirare il registro degli schemi di terze parti. Come soluzione alternativa, è possibile utilizzare un'applicazione o Mirror Maker 2 per leggere l'argomento corrente e generarlo in un nuovo argomento nel registro degli schemi di AWS Glue.

Per eseguire la migrazione da un registro degli schemi di terze parti al registro degli schemi di AWS Glue:

1. Crea un registro nel registro degli schemi di AWS Glue o utilizza il registro di default.

1. Arresta il consumer. Modificalo per includere il registro degli schemi di AWS Glue come deserializzatore primario e il registro degli schemi di terze parti come secondario. 
   + Imposta le proprietà del consumer. In questo esempio, il secondary\$1deserializer è impostato su un deserializzatore diverso. Il comportamento è il seguente: il consumer recupera i record da Amazon MSK e tenta di utilizzare prima `AWSKafkaAvroDeserializer`. Se non è in grado di leggere il magic byte che contiene l'ID dello schema Avro per il registro degli schemi di AWS Glue, `AWSKafkaAvroDeserializer` tenta quindi di utilizzare la classe di deserializzatore fornita nel secondary\$1deserializer. Le proprietà specifiche del deserializzatore secondario devono anche essere fornite nelle proprietà del consumer, come schema\$1registry\$1url\$1config e specific\$1avro\$1reader\$1config, come illustrato di seguito.

     ```
     consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
     consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AWSKafkaAvroDeserializer.class.getName());
     consumerProps.setProperty(AWSSchemaRegistryConstants.AWS_REGION, KafkaClickstreamConsumer.gsrRegion);
     consumerProps.setProperty(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, KafkaAvroDeserializer.class.getName());
     consumerProps.setProperty(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "URL for third-party schema registry");
     consumerProps.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
     ```

1. Riavvia il consumer.

1. Arresta il produttore e puntalo al registro degli schemi di AWS Glue.

   1. Imposta le proprietà del produttore. In questo esempio, il produttore utilizzerà il registro di default ed eseguirà la registrazione automatica delle versioni degli schemi.

      ```
      producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
      producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AWSKafkaAvroSerializer.class.getName());
      producerProps.setProperty(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2");
      producerProps.setProperty(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName());
      producerProps.setProperty(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true");
      ```

1. (Facoltativo) Sposta manualmente gli schemi e le versioni degli schemi esistenti dall'attuale registro degli schemi di terze parti al registro degli schemi di AWS Glue, nel registro di default nel registro degli schemi di AWS Glue o in un registro specifico non predefinito nel registro degli schemi di AWS Glue. Ciò può essere fatto esportando schemi dai registri di schemi di terze parti in formato JSON e creando nuovi schemi nel AWS Glue registro degli schemi utilizzando o il. Console di gestione AWS AWS CLI

    Questo passaggio può essere importante se è necessario abilitare i controlli di compatibilità con le versioni precedenti dello schema per le versioni dello schema appena create utilizzando AWS CLI and the Console di gestione AWS, o quando i produttori inviano messaggi con un nuovo schema con la registrazione automatica delle versioni dello schema attivata.

1. Avvia il produttore.