

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# AWS Glue Connessioni in streaming
<a name="glue-streaming-connections"></a>

 Le seguenti sezioni forniscono informazioni su come utilizzare le connessioni in AWS Glue Streaming. 

**Topics**
+ [Utilizzo delle connessioni Kafka](#glue-streaming-kafka-connections)
+ [Utilizzo delle connessioni Kinesis](#glue-streaming-kinesis-connections)

## Utilizzo delle connessioni Kafka
<a name="glue-streaming-kafka-connections"></a>

È possibile usare una connessione Kafka per leggere e scrivere su flussi di dati Kafka utilizzando le informazioni archiviate in una tabella del Catalogo dati o fornendo informazioni per accedere direttamente al flusso di dati. La connessione supporta un cluster Kafka o un cluster Streaming gestito da Amazon per Apache Kafka. Puoi leggere le informazioni di Kafka in uno Spark DataFrame, quindi convertirle in un Glue. AWS DynamicFrame Puoi scrivere su Kafka DynamicFrames in formato JSON. Se si accede direttamente al flusso di dati, utilizzare queste opzioni per fornire le informazioni su come accedere al flusso di dati.

Se si utilizza `getCatalogSource` o `create_data_frame_from_catalog` per consumare i record da una sorgente di streaming Kafka oppure `getCatalogSink` o `write_dynamic_frame_from_catalog` per scrivere i record su Kafka, il processo avrà le informazioni sul database del Catalogo dati e sul nome della tabella, e potrà usarle per ottenere alcuni parametri di base per la lettura dall'origine di streaming Kafka. Se si utilizza `getSource`, `getCatalogSink`, `getSourceWithFormat`, `getSinkWithFormat`, `createDataFrameFromOptions`, `create_data_frame_from_options` o `write_dynamic_frame_from_catalog`, sarà necessario specificare questi parametri di base utilizzando le opzioni di connessione descritte qui.

È possibile specificare le opzioni di connessione per Kafka utilizzando gli argomenti per i metodi specificati nella classe `GlueContext` descritti di seguito.
+ Scala
  + `connectionOptions`: utilizza con `getSource`, `createDataFrameFromOptions` e `getSink` 
  + `additionalOptions`: utilizza con `getCatalogSource`, `getCatalogSink`
  + `options`: utilizza con `getSourceWithFormat`, `getSinkWithFormat`
+ Python
  + `connection_options`: utilizza con `create_data_frame_from_options`, `write_dynamic_frame_from_options`
  + `additional_options`: utilizza con `create_data_frame_from_catalog`, `write_dynamic_frame_from_catalog`
  + `options`: utilizza con `getSource`, `getSink`

Per osservazioni e restrizioni sui processi ETL dei flussi di dati, consultare la pagina [Streaming di note e restrizioni ETL](add-job-streaming.md#create-job-streaming-restrictions).

**Topics**
+ [Configurazione di Kafka](#glue-streaming-etl-connect-kafka-configure)
+ [Esempio: lettura di flussi da Kafka](#glue-streaming-etl-connect-kafka-read)
+ [Esempio: scrittura in flussi Kafka](#glue-streaming-etl-connect-kafka-write)
+ [Indicazioni di riferimento alle opzioni di connessione a Kafka](#glue-streaming-etl-connect-kafka)

### Configurazione di Kafka
<a name="glue-streaming-etl-connect-kafka-configure"></a>

Non ci sono AWS prerequisiti per la connessione agli stream di Kafka disponibili su Internet.

Puoi creare una connessione AWS Glue Kafka per gestire le tue credenziali di connessione. Per ulteriori informazioni, consulta [Creazione di una connessione AWS Glue per un flusso di dati Apache Kafka](add-job-streaming.md#create-conn-streaming). Nella configurazione del processo AWS Glue, fornisci *connectionName* una **connessione di rete aggiuntiva**, quindi, nella chiamata *connectionName* al metodo, fornisci il `connectionName` parametro.

In alcuni casi, è necessario configurare ulteriori prerequisiti:
+ Se utilizzi Streaming gestito da Amazon per Apache Kafka con l'autenticazione IAM, avrai bisogno di una configurazione appropriata di IAM.
+ Se utilizzi Streaming gestito da Amazon per Apache Kafka con un Amazon VPC, avrai bisogno di una configurazione appropriata di Amazon VPC. Dovrai creare una connessione AWS Glue che fornisca informazioni sulla connessione Amazon VPC. È necessaria la configurazione del lavoro per includere la connessione AWS Glue come **connessione di rete aggiuntiva**.

Per ulteriori informazioni sui prerequisiti dei processi ETL dei flussi di dati, consulta la pagina [Aggiunta di processi di streaming ETL in AWS Glue](add-job-streaming.md).

### Esempio: lettura di flussi da Kafka
<a name="glue-streaming-etl-connect-kafka-read"></a>

Usato in combinazione con [forEachBatch](aws-glue-api-crawler-pyspark-extensions-glue-context.md#aws-glue-api-crawler-pyspark-extensions-glue-context-forEachBatch).

Esempio per l'origine di streaming Kafka:

```
kafka_options =
    { "connectionName": "ConfluentKafka", 
      "topicName": "kafka-auth-topic", 
      "startingOffsets": "earliest", 
      "inferSchema": "true", 
      "classification": "json" 
    }
data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kafka", connection_options=kafka_options)
```

### Esempio: scrittura in flussi Kafka
<a name="glue-streaming-etl-connect-kafka-write"></a>

Esempi di scrittura in Kafka:

Esempio con il metodo `getSink`:

```
data_frame_datasource0 = 
glueContext.getSink(
	connectionType="kafka",
	connectionOptions={
		JsonOptions("""{
			"connectionName": "ConfluentKafka", 
			"classification": "json", 
			"topic": "kafka-auth-topic", 
			"typeOfData": "kafka"}
		""")}, 
	transformationContext="dataframe_ApacheKafka_node1711729173428")
	.getDataFrame()
```

Esempio con il metodo `write_dynamic_frame.from_options`:

```
kafka_options =
    { "connectionName": "ConfluentKafka", 
      "topicName": "kafka-auth-topic", 
      "classification": "json" 
    }
data_frame_datasource0 = glueContext.write_dynamic_frame.from_options(connection_type="kafka", connection_options=kafka_options)
```

### Indicazioni di riferimento alle opzioni di connessione a Kafka
<a name="glue-streaming-etl-connect-kafka"></a>

Per la lettura, utilizzare le seguenti opzioni di connessione con `"connectionType": "kafka"`:
+ `"bootstrap.servers"`(Obbligatorio) Un elenco di server di bootstrap URLs, ad esempio, come`b-1.vpc-test-2.o4q88o.c6.kafka.us-east-1.amazonaws.com:9094`. Questa opzione deve essere specificata nella chiamata API o definita nei metadati della tabella in catalogo dati.
+ `"security.protocol"`: (obbligatorio) Il protocollo utilizzato per comunicare con i broker. I valori possibili sono `"SSL"` o `"PLAINTEXT"`.
+ `"topicName"`: (obbligatorio) un elenco separato da virgole di argomenti a cui iscriversi. Devi specificare solo uno tra `"topicName"`, `"assign"` o `"subscribePattern"`.
+ `"assign"`: (obbligatorio) una stringa JSON che specifica il `TopicPartitions` specifico da utilizzare. Devi specificare solo uno tra `"topicName"`, `"assign"` o `"subscribePattern"`.

  Esempio: '\$1"topicA":[0,1],"topicB":[2,4]\$1'
+ `"subscribePattern"`: (obbligatorio) una stringa regex Java che identifichi l'elenco degli argomenti a cui effettuare la sottoscrizione. Devi specificare solo uno tra `"topicName"`, `"assign"` o `"subscribePattern"`.

  Esempio: 'topic.\$1'
+ `"classification"` (obbligatorio): il formato di file utilizzato dai dati nel record. Obbligatorio, a meno che non sia fornito tramite Catalogo dati.
+ `"delimiter"` (opzionale): il separatore di valori utilizzato quando `classification` è CSV. Il valore predefinito è “`,`”.
+ `"startingOffsets"`: (opzionale) la posizione di partenza nell'argomento Kafka da cui leggere i dati. I valori possibili sono `"earliest"` o `"latest"`. Il valore predefinito è `"latest"`.
+ `"startingTimestamp"`: (Facoltativo, supportato solo per AWS Glue versione 4.0 o successiva) Il timestamp del record nell'argomento Kafka da cui leggere i dati. Il valore possibile è una stringa timestamp in formato UTC nel modello `yyyy-mm-ddTHH:MM:SSZ`, dove `Z` rappresenta un offset del fuso orario UTC con un segno \$1/- (ad esempio: "2023-04-04T08:00:00-04:00").

  Nota: nell'elenco delle opzioni di connessione dello script di streaming AWS Glue può essere presente solo uno tra 'startingOffsets' o 'startingTimestamp', l'inclusione di entrambe queste proprietà comporterà un errore del lavoro.
+ `"endingOffsets"`: (opzionale) il punto di fine di una query batch. I valori possibili sono `"latest"` o una stringa JSON che specifica un offset finale per ogni `TopicPartition`.

  Per la stringa JSON, il formato è `{"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}`. Il valore `-1` come offset rappresenta `"latest"`.
+ `"pollTimeoutMs"`: (opzionale) il timeout in millisecondi per il polling dei dati da Kafka negli executor del processo Spark. Il valore predefinito è `600000`.
+ `"numRetries"`: (opzionale) i numero di tentativi prima di non riuscire a recuperare gli offset Kafka. Il valore predefinito è `3`.
+ `"retryIntervalMs"`: (opzionale) il tempo di attesa in millisecondi prima di riprovare a recuperare gli offset Kafka. Il valore predefinito è `10`.
+ `"maxOffsetsPerTrigger"`: (opzionale) il limite di velocità sul numero massimo di offset elaborati per intervallo di trigger. Il numero totale di offset specificato viene suddiviso proporzionalmente tra `topicPartitions` di diversi volumi. Il valore di default è null, il che significa che il consumer legge tutti gli offset fino all'ultimo offset noto.
+ `"minPartitions"`: (opzionale) il numero minimo desiderato di partizioni da leggere da Kafka. Il valore di default è null, il che significa che il numero di partizioni Spark è uguale al numero di partizioni Kafka.
+  `"includeHeaders"`: (opzionale) indica se includere le intestazioni Kafka. Quando l'opzione è impostata su "true", l'output dei dati conterrà una colonna aggiuntiva denominata "glue\$1streaming\$1kafka\$1headers" con tipo `Array[Struct(key: String, value: String)]`. Il valore di default è "false". Questa opzione è disponibile in AWS Glue versione 3.0 o successive. 
+ `"schema"`: (obbligatorio quando inferSchema è impostato su false) lo schema da utilizzare per elaborare il payload. Se la classificazione è `avro`, lo schema fornito dovrà essere nel formato dello schema Avro. Se la classificazione è `avro`, lo schema fornito dovrà essere nel formato dello schema DDL.

  Di seguito sono riportati alcuni esempi di schema.

------
#### [ Example in DDL schema format ]

  ```
  'column1' INT, 'column2' STRING , 'column3' FLOAT
  ```

------
#### [ Example in Avro schema format ]

  ```
  {
  "type":"array",
  "items":
  {
  "type":"record",
  "name":"test",
  "fields":
  [
    {
      "name":"_id",
      "type":"string"
    },
    {
      "name":"index",
      "type":
      [
        "int",
        "string",
        "float"
      ]
    }
  ]
  }
  }
  ```

------
+ `"inferSchema"`: (facoltativo) il valore di default è "false". Se impostato su "true", lo schema verrà rilevato in fase di runtime dal payload all'interno di `foreachbatch`.
+ `"avroSchema"`: (obsoleto) parametro utilizzato per specificare uno schema di dati Avro quando viene utilizzato il formato Avro. Questo parametro è obsoleto. Utilizzo del parametro `schema`.
+ `"addRecordTimestamp"`: (opzionale) Quando questa opzione è impostata su "true", l'output dei dati conterrà una colonna aggiuntiva denominata "\$1\$1src\$1timestamp" che indica l'ora in cui il record corrispondente è stato ricevuto dall'argomento. Il valore predefinito è "false". Questa opzione è supportata in AWS Glue versione 4.0 o successive.
+ `"emitConsumerLagMetrics"`: (Facoltativo) Quando l'opzione è impostata su «true», per ogni batch, emette le metriche relative alla durata compresa tra il record più vecchio ricevuto dall'argomento e l'ora in cui arriva. AWS Glue CloudWatch Il nome della metrica è «glue.driver.streaming. maxConsumerLagInMs». Il valore predefinito è "false". Questa opzione è supportata in AWS Glue versione 4.0 o successive.

Per la scrittura, utilizzare le seguenti opzioni di connessione con `"connectionType": "kafka"`:
+ `"connectionName"`(Obbligatorio) Nome della connessione AWS Glue utilizzata per connettersi al cluster Kafka (simile al codice sorgente Kafka).
+ `"topic"` (obbligatorio) Se esiste una colonna di argomento, il relativo valore viene utilizzato come argomento quando si scrive la riga specifica in Kafka, a meno che non sia impostata l'opzione di configurazione dell'argomento. In altre parole, l'opzione di configurazione `topic` sovrascrive la colonna dell'argomento.
+ `"partition"` (opzionale) Se viene specificato un numero di partizione valido, `partition` verrà utilizzato per l'invio del record.

  Se non viene specificata alcuna partizione ma è presente `key`, verrà scelta una partizione utilizzando un hash della chiave.

  Se non sono presenti né `key` né `partition`, verrà scelta una partizione in base al partizionamento permanente delle modifiche quando nella partizione vengono prodotti almeno batch.size byte.
+ `"key"` (opzionale) Utilizzato per il partizionamento se `partition` è null.
+ `"classification"` (opzionale) Il formato di file utilizzato dai dati nel record. Supportiamo solo JSON, CSV e Avro.

  Con il formato Avro, possiamo fornire un AvroSchema personalizzato con cui serializzare, ma bisogna considerare che tale schema deve essere fornito anche nell'origine per la deserializzazione. Altrimenti, per impostazione predefinita utilizza Apache per la serializzazione. AvroSchema 

Inoltre, è possibile eseguire il fine-tuning del sink Kafka in base alle esigenze aggiornando i [parametri di configurazione del produttore di Kafka](https://kafka.apache.org/documentation/#producerconfigs). Da notare che non esiste un elenco delle opzioni di connessione consentite, tutte le coppie chiave-valore vengono mantenute nel sink così come sono.

Tuttavia, esiste un piccolo elenco di opzioni di rifiuto che non avranno effetto. Per ulteriori informazioni, consulta [le configurazioni specifiche di Kafka](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html).

## Utilizzo delle connessioni Kinesis
<a name="glue-streaming-kinesis-connections"></a>

È possibile usare una connessione Kinesis per leggere e scrivere su flussi di dati Amazon Kinesis utilizzando le informazioni archiviate in una tabella del Catalogo dati o fornendo informazioni per accedere direttamente al flusso di dati. Puoi leggere le informazioni da Kinesis in Spark DataFrame, quindi convertirle in un Glue. AWS DynamicFrame Puoi DynamicFrames scrivere su Kinesis in formato JSON. Se accedi direttamente al flusso di dati, utilizza queste opzioni per fornire le informazioni su come accedere al flusso di dati.

Se si utilizza `getCatalogSource` o `create_data_frame_from_catalog` per consumare i registri da una sorgente di streaming Kinesis, il processo avrà le informazioni sul database catalogo dati e sul nome della tabella, e potrà usarle per ottenere alcuni parametri di base per la lettura dalla sorgente di streaming Kinesis. Se si utilizza `getSource`, `getSourceWithFormat`, `createDataFrameFromOptions` o `create_data_frame_from_options`, dovrai specificare questi parametri di base utilizzando le opzioni di connessione descritte qui.

È possibile specificare le opzioni di connessione per Kinesis utilizzando i seguenti argomenti per i metodi specificati nella classe `GlueContext`.
+ Scala
  + `connectionOptions`: utilizza con `getSource`, `createDataFrameFromOptions` e `getSink` 
  + `additionalOptions`: utilizza con `getCatalogSource`, `getCatalogSink`
  + `options`: utilizza con `getSourceWithFormat`, `getSinkWithFormat`
+ Python
  + `connection_options`: utilizza con `create_data_frame_from_options`, `write_dynamic_frame_from_options`
  + `additional_options`: utilizza con `create_data_frame_from_catalog`, `write_dynamic_frame_from_catalog`
  + `options`: utilizza con `getSource`, `getSink`

Per osservazioni e restrizioni sui processi ETL dei flussi di dati, consultare la pagina [Streaming di note e restrizioni ETL](add-job-streaming.md#create-job-streaming-restrictions).

### Configurazione di Kinesis
<a name="glue-streaming-etl-connect-kinesis-configure"></a>

Per connetterti a un flusso di dati Kinesis in un job AWS Glue Spark, avrai bisogno di alcuni prerequisiti:
+ In caso di lettura, il job AWS Glue deve disporre delle autorizzazioni IAM di livello di accesso Read per il flusso di dati Kinesis.
+ In fase di scrittura, il job AWS Glue deve disporre delle autorizzazioni IAM di livello di accesso Write per il flusso di dati Kinesis.

In alcuni casi, è necessario configurare ulteriori prerequisiti:
+ Se il tuo job AWS Glue è configurato con **connessioni di rete aggiuntive** (in genere per connettersi ad altri set di dati) e una di queste connessioni offre opzioni di **rete Amazon VPC**, questo indirizzerà il tuo lavoro a comunicare tramite Amazon VPC. In questo caso, per comunicare tramite Amazon VPC dovrai configurare anche il flusso di dati Kinesis. È possibile farlo creando un endpoint VPC di interfaccia tra l'Amazon VPC e il flusso di dati Kinesis. Per ulteriori informazioni, consulta la pagina [Using Amazon Kinesis Data Streams with Interface VPC Endpoints](https://docs.aws.amazon.com//streams/latest/dev/vpc.html).
+ Quando si specifica un flusso di dati Amazon Kinesis in un altro account, è necessario impostare i ruoli e le policy per consentire l'accesso multi-account. Per ulteriori informazioni, consultare [ Esempio: lettura da un flusso Kinesis in un account diverso](https://docs.aws.amazon.com/kinesisanalytics/latest/java/examples-cross.html).

Per ulteriori informazioni sui prerequisiti dei processi ETL dei flussi di dati, consultare la pagina [Aggiunta di processi di streaming ETL in AWS Glue](add-job-streaming.md).

### Lettura da Kinesis
<a name="glue-streaming-etl-connect-kinesis-read"></a>

#### Esempio: lettura da flussi Kinesis
<a name="section-etl-connect-kinesis-read"></a>

Usato in combinazione con [forEachBatch](aws-glue-api-crawler-pyspark-extensions-glue-context.md#aws-glue-api-crawler-pyspark-extensions-glue-context-forEachBatch).

Esempio per l'origine di streaming Amazon Kinesis:

```
kinesis_options =
   { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream",
     "startingPosition": "TRIM_HORIZON", 
     "inferSchema": "true", 
     "classification": "json" 
   }
data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)
```

### Scrittura su Kinesis
<a name="glue-streaming-etl-connect-kinesis-write"></a>

#### Esempio: scrittura su flussi Kinesis
<a name="section-etl-connect-kinesis-write"></a>

Usato in combinazione con [forEachBatch](aws-glue-api-crawler-pyspark-extensions-glue-context.md#aws-glue-api-crawler-pyspark-extensions-glue-context-forEachBatch). Il tuo DynamicFrame verrà scritto nello stream in formato JSON. Se il processo non riesce a scrivere dopo diversi tentativi, riporterà un errore. Per impostazione predefinita, ogni DynamicFrame record viene inviato allo stream Kinesis singolarmente. È possibile configurare questo comportamento utilizzando `aggregationEnabled` e i parametri associati. 

Esempio di scrittura su Amazon Kinesis da un processo di streaming:

------
#### [ Python ]

```
glueContext.write_dynamic_frame.from_options(
    frame=frameToWrite
    connection_type="kinesis",
    connection_options={
        "partitionKey": "part1",
        "streamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/streamName",
    }
)
```

------
#### [ Scala ]

```
glueContext.getSinkWithFormat(
                connectionType="kinesis",
                options=JsonOptions("""{
                    "streamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/streamName",
                    "partitionKey": "part1"
                }"""), 
           )
           .writeDynamicFrame(frameToWrite)
```

------

### Parametri di connessione Kinesis
<a name="glue-streaming-etl-connect-kinesis-connect"></a>

Indica le opzioni di connessione ad Amazon Kinesis Data Streams.

Utilizza le seguenti opzioni di connessione per le origini dati in streaming Kinesis: 
+ `"streamARN"`: (obbligatorio) utilizzato per la lettura/scrittura. L'ARN del flusso di dati di Kinesis.
+ `"classification"`: (obbligatorio per la lettura) utilizzato per la lettura. Il formato di file utilizzato dai dati nel record. Obbligatorio, a meno che non sia fornito tramite Catalogo dati.
+ `"streamName"`: (facoltativo) utilizzato per la lettura. Il nome di un flusso di dati Kinesis da cui leggere. Usato con `endpointUrl`.
+ `"endpointUrl"`: (facoltativo) utilizzato per la lettura. Predefinito: "https://kinesis.us-east-1.amazonaws.com». L' AWS endpoint del flusso Kinesis. Non è necessario modificarlo a meno che non ci si stia connettendo a una regione speciale.
+ `"partitionKey"`: (facoltativo) utilizzato per la scrittura. La chiave di partizione di Kinesis utilizzata per la produzione dei record.
+ `"delimiter"`: (facoltativo) utilizzato per la lettura. Il separatore di valori utilizzato quando `classification` è CSV. Il valore predefinito è “`,`”.
+ `"startingPosition"`: (opzionale) utilizzato per la lettura. La posizione di partenza nel flusso dei dati Kinesis da cui leggere i dati. I valori possibili sono `"latest"`, `"trim_horizon"`, `"earliest"` o una stringa di timestamp in formato UTC con il modello `yyyy-mm-ddTHH:MM:SSZ`, dove `Z` rappresenta uno scostamento del fuso orario UTC con un \$1/- (ad esempio: "2023-04-04T08:00:00-04:00"). Il valore predefinito è `"latest"`. Nota: la stringa Timestamp in formato UTC per `"startingPosition"` è supportata solo per AWS Glue versione 4.0 o successiva.
+ `"failOnDataLoss"`: (facoltativo) non è possibile eseguire il processo se una partizione attiva è mancante o scaduta. Il valore predefinito è `"false"`.
+ `"awsSTSRoleARN"`: (facoltativo) utilizzato per la lettura/scrittura. L'Amazon Resource Name (ARN) del ruolo da assumere utilizzando AWS Security Token Service (AWS STS). Questo ruolo deve disporre delle autorizzazioni per descrivere o leggere le operazioni dei registri per il flusso di dati Kinesis. Quando si accede a un flusso di dati in un altro account, è necessario utilizzare questo parametro. Usato in combinazione con `"awsSTSSessionName"`.
+ `"awsSTSSessionName"`: (facoltativo) utilizzato per la lettura/scrittura. Un identificatore della sessione che assume il ruolo usando AWS STS. Quando si accede a un flusso di dati in un altro account, è necessario utilizzare questo parametro. Usato in combinazione con `"awsSTSRoleARN"`.
+ `"awsSTSEndpoint"`: (Facoltativo) L' AWS STS endpoint da utilizzare quando ci si connette a Kinesis con un ruolo presunto. Ciò consente di utilizzare l' AWS STS endpoint regionale in un VPC, cosa non possibile con l'endpoint globale predefinito.
+ `"maxFetchTimeInMs"`: (opzionale) utilizzato per la lettura. Il tempo massimo impiegato affinché l'esecutore del processo legga i record della batch attuale dal flusso di dati Kinesis, specificato in millisecondi (ms). Entro questo periodo è possibile effettuate più chiamate API `GetRecords`. Il valore predefinito è `1000`.
+ `"maxFetchRecordsPerShard"`: (opzionale) utilizzato per la lettura. Il numero massimo di record da recuperare per shard nel flusso di dati Kinesis per microbatch. Nota: il client può superare questo limite se il processo di streaming ha già letto i record aggiuntivi da Kinesis (nella stessa chiamata get-records). Se `maxFetchRecordsPerShard` deve essere rigoroso, allora deve essere un multiplo di `maxRecordPerRead`. Il valore predefinito è `100000`.
+ `"maxRecordPerRead"`: (opzionale) utilizzato per la lettura. Il numero massimo di record da recuperare nel flusso di dati Kinesis in ciascuna operazione `getRecords`. Il valore predefinito è `10000`.
+ `"addIdleTimeBetweenReads"`: (opzionale) utilizzato per la lettura. Aggiunge un ritardo tra due operazioni consecutive `getRecords`. Il valore predefinito è `"False"`. Questa opzione è configurabile solo per Glue versione 2.0 e successive. 
+ `"idleTimeBetweenReadsInMs"`: (opzionale) utilizzato per la lettura. Il ritardo minimo tra due operazioni consecutive `getRecords`, specificato in ms. Il valore predefinito è `1000`. Questa opzione è configurabile solo per Glue versione 2.0 e successive. 
+ `"describeShardInterval"`: (opzionale) utilizzato per la lettura. L'intervallo di tempo minimo tra due chiamate API `ListShards` affinché lo script consideri il resharding. Per ulteriori informazioni, consultare [Strategie per il resharding](https://docs.aws.amazon.com//streams/latest/dev/kinesis-using-sdk-java-resharding-strategies.html) nella *Guida per gli sviluppatori di Amazon Kinesis Data Streams*. Il valore predefinito è `1s`.
+ `"numRetries"`: (opzionale) utilizzato per la lettura. Il numero massimo di tentativi per le richieste API Kinesis Data Streams. Il valore predefinito è `3`.
+ `"retryIntervalMs"`: (opzionale) utilizzato per la lettura. Il periodo di raffreddamento (specificato in ms) prima di riprovare la chiamata API Kinesis Data Streams. Il valore predefinito è `1000`.
+ `"maxRetryIntervalMs"`: (opzionale) utilizzato per la lettura. Il periodo di raffreddamento (specificato in ms) tra due tentativi di chiamata API Kinesis Data Streams. Il valore predefinito è `10000`.
+ `"avoidEmptyBatches"`: (opzionale) utilizzato per la lettura. Impedisce la creazione di un processo microbatch vuoto controllando la presenza di dati non letti nel flusso dei dati Kinesis prima che il batch venga avviato. Il valore predefinito è `"False"`.
+ `"schema"`: (obbligatorio quando inferSchema è impostato su falso) utilizzato per la lettura. Lo schema da utilizzare per elaborare il payload. Se la classificazione è `avro`, lo schema fornito dovrà essere nel formato dello schema Avro. Se la classificazione è `avro`, lo schema fornito dovrà essere nel formato dello schema DDL.

  Di seguito sono riportati alcuni esempi di schema.

------
#### [ Example in DDL schema format ]

  ```
  `column1` INT, `column2` STRING , `column3` FLOAT
  ```

------
#### [ Example in Avro schema format ]

  ```
  {
    "type":"array",
    "items":
    {
      "type":"record",
      "name":"test",
      "fields":
      [
        {
          "name":"_id",
          "type":"string"
        },
        {
          "name":"index",
          "type":
          [
            "int",
            "string",
            "float"
          ]
        }
      ]
    }
  }
  ```

------
+ `"inferSchema"`: (opzionale) utilizzato per la lettura. Il valore predefinito è "false". Se impostato su "true", lo schema verrà rilevato in fase di runtime dal payload all'interno di `foreachbatch`.
+ `"avroSchema"`: (obsoleto) utilizzato per la lettura. Parametro utilizzato per specificare uno schema di dati Avro quando viene utilizzato il formato Avro. Questo parametro è obsoleto. Utilizzo del parametro `schema`.
+ `"addRecordTimestamp"`: (opzionale) utilizzato per la lettura. Quando questa opzione è impostata su "true", l'output dei dati conterrà una colonna aggiuntiva denominata "\$1\$1src\$1timestamp" che indica l'ora in cui il record corrispondente è stato ricevuto dal flusso. Il valore predefinito è "false". Questa opzione è supportata in AWS Glue versione 4.0 o successive.
+ `"emitConsumerLagMetrics"`: (opzionale) utilizzato per la lettura. Quando l'opzione è impostata su «true», per ogni batch emette le metriche relative alla durata compresa tra il record più vecchio ricevuto dallo stream e il momento in cui arriva a. AWS Glue CloudWatch Il nome della metrica è «glue.driver.streaming. maxConsumerLagInMs». Il valore predefinito è "false". Questa opzione è supportata in AWS Glue versione 4.0 o successive.
+ `"fanoutConsumerARN"`: (opzionale) utilizzato per la lettura. L'ARN di un consumatore di un flusso Kinesis per il flusso specificato in `streamARN`. Utilizzato per abilitare la modalità di fan-out avanzato per la connessione Kinesis. Per ulteriori informazioni sull'utilizzo di un flusso Kinesis con fan-out avanzato, consultare la pagina [Utilizzo del fan-out avanzato nei processi di flussi di dati Kinesis](aws-glue-programming-etl-connect-kinesis-efo.md).
+ `"recordMaxBufferedTime"`: (opzionale) utilizzato per la scrittura. Predefinito: 1000 (ms). Tempo massimo di memorizzazione nel buffer di un record in attesa di essere scritto.
+ `"aggregationEnabled"`: (opzionale) utilizzato per la scrittura. Default: true. Speciifica se i record devono essere aggregati prima di inviarli a Kinesis.
+ `"aggregationMaxSize"`: (opzionale) utilizzato per la scrittura. Impostazione predefinita: 51200 (byte). Se un record è superiore a questo limite, ignorerà l'aggregatore. Ricorda che Kinesis impone un limite di 50 KB alla dimensione del record. Se imposti questo valore oltre i 50 KB, i record di grandi dimensioni verranno rifiutati da Kinesis.
+ `"aggregationMaxCount"`: (facoltativo) utilizzato per la scrittura. Predefinito: 4294967295. Numero massimo di voci da inserire in un record aggregato.
+ `"producerRateLimit"`: (facoltativo) utilizzato per la scrittura. Predefinito: 150 (%). Limita la velocità di trasmissione effettiva per partizione inviata da un singolo produttore (ad esempio, il tuo processo), come percentuale del limite di backend.
+ `"collectionMaxCount"`: (facoltativo) utilizzato per la scrittura. Predefinito: 500. Numero massimo di articoli da imballare in una PutRecords richiesta. 
+ `"collectionMaxSize"`: (facoltativo) utilizzato per la scrittura. Impostazione predefinita: 5242880 (byte). Quantità massima di dati da inviare con una PutRecords richiesta.