

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Migrazione del connettore Spark Kinesis all'SDK 2.x per Amazon EMR 7.0
<a name="migrating-spark-kinesis"></a>

L' AWS SDK offre un ricco set di librerie per interagire con APIs i servizi di AWS cloud computing, come la gestione delle credenziali, la connessione ai servizi S3 e Kinesis. Il connettore Spark Kinesis viene utilizzato per consumare dati dal flusso di dati Kinesis e i dati ricevuti vengono trasformati ed elaborati nel motore di esecuzione di Spark. Attualmente questo connettore è basato su 1.x di AWS SDK e (KCL). Kinesis-client-library 

Come parte della migrazione all' AWS SDK 2.x, anche il connettore Spark Kinesis viene aggiornato di conseguenza per funzionare con l'SDK 2.x. Nella versione 7.0 di Amazon EMR, Spark contiene l'aggiornamento SDK 2.x che non è ancora disponibile nella versione community di Apache Spark. Se utilizzi il connettore Spark Kinesis da una versione precedente alla 7.0, devi effettuare la migrazione dei codici dell'applicazione per eseguirli su SDK 2.x prima di poterla effettuare su Amazon EMR 7.0.

## Guide alla migrazione
<a name="migrating-spark-kinesis-migration-guides"></a>

Questa sezione descrive i passaggi per eseguire la migrazione di un'applicazione al connettore Spark Kinesis aggiornato. Include guide per la migrazione alla Kinesis Client Library (KCL) 2.x AWS , fornitori di credenziali AWS e client di servizi in SDK 2.x. AWS A titolo di riferimento, include anche un [WordCount](https://github.com/apache/spark/blob/v3.5.0/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala)programma di esempio che utilizza il connettore Kinesis.

**Topics**
+ [

### Migrazione di KLC da 1.x a 2.x
](#migrating-spark-kinesis-KCL-from-1.x-to-2.x)
+ [

### Migrazione dei fornitori di AWS credenziali da SDK 1.x a 2.x AWS
](#migrating-spark-kinesis-creds-from-1.x-to-2.x)
+ [

### Migrazione dei client di AWS servizio da AWS SDK 1.x a 2.x
](#migrating-spark-kinesis-service-from-1.x-to-2.x)
+ [

### Esempi di codice per applicazioni di streaming
](#migrating-spark-kinesis-streaming-examples)
+ [

### Considerazioni sull'utilizzo del connettore Spark Kinesis aggiornato
](#migrating-spark-kinesis-considerations)

### Migrazione di KLC da 1.x a 2.x
<a name="migrating-spark-kinesis-KCL-from-1.x-to-2.x"></a>
+ **Parametri, livello e dimensioni in `KinesisInputDStream`**

  Quando crei un'istanza `KinesisInputDStream`, puoi controllare il livello e le dimensioni dei parametri per il flusso. L'esempio seguente mostra come personalizzare questi parametri con KCL 1.x:

  ```
  import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
  import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
   
  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
    .build()
  ```

  In KCL 2.x, queste impostazioni di configurazione hanno nomi di pacchetto diversi. Per eseguire la migrazione a 2.x:

  1. Modifica le istruzioni di importazione per `com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration` e `com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel` rispettivamente in `software.amazon.kinesis.metrics.MetricsLevel` e `software.amazon.kinesis.metrics.MetricsUtil`.

     ```
     // import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
     import software.amazon.kinesis.metrics.MetricsLevel
      
     // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
     import software.amazon.kinesis.metrics.MetricsUtil
     ```

  1. Sostituisci la riga `metricsEnabledDimensionsKinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet` con `metricsEnabledDimensionsSet(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)`

  Di seguito è riportata una versione aggiornata di `KinesisInputDStream` con livello e dimensioni dei parametri personalizzati:

  ```
  import software.amazon.kinesis.metrics.MetricsLevel
  import software.amazon.kinesis.metrics.MetricsUtil
   
  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
    .build()
  ```
+ Funzione del gestore di messaggi in `KinesisInputDStream`

  Quando crei un'istanza `KinesisInputDStream`, puoi anche fornire una "funzione del gestore di messaggi" che accetta un Kinesis Record e restituisce un oggetto generico T, nel caso in cui desideri utilizzare altri dati inclusi in un Record come la chiave di partizione.

  In KCL 1.x, la firma della funzione del gestore di messaggi è `Record => T`, dove Record è `com.amazonaws.services.kinesis.model.Record`. In KCL 2.x, la firma del gestore viene modificata in:`KinesisClientRecord => T`, where is. KinesisClientRecord `software.amazon.kinesis.retrieval.KinesisClientRecord` 

  Di seguito è riportato un esempio di fornitura di un gestore di messaggi in KCL 1.x:

  ```
  import com.amazonaws.services.kinesis.model.Record
   
   
  def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_ONLY)
    .buildWithMessageHandler(addFive)
  ```

  Per eseguire la migrazione del gestore di messaggi:

  1. Modifica l'istruzione di importazione per `com.amazonaws.services.kinesis.model.Record` in `software.amazon.kinesis.retrieval.KinesisClientRecord`.

     ```
     // import com.amazonaws.services.kinesis.model.Record
     import software.amazon.kinesis.retrieval.KinesisClientRecord
     ```

  1. Aggiorna la firma del metodo del gestore di messaggi.

     ```
     //def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
     def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
     ```

  Di seguito è riportato un esempio aggiornato di fornitura di un gestore di messaggi in KCL 2.x:

  ```
  import software.amazon.kinesis.retrieval.KinesisClientRecord
   
   
  def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_ONLY)
    .buildWithMessageHandler(addFive)
  ```

  Per ulteriori informazioni sulla migrazione da KCL 1.x a 2.x, consulta [Migrazione dei consumatori da KCL 1.x a KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html).

### Migrazione dei fornitori di AWS credenziali da SDK 1.x a 2.x AWS
<a name="migrating-spark-kinesis-creds-from-1.x-to-2.x"></a>

I fornitori di credenziali vengono utilizzati per ottenere credenziali per le interazioni con. AWS AWS Esistono diverse modifiche all'interfaccia e alla classe relative ai fornitori di credenziali in SDK 2.x, che sono disponibili [qui](https://github.com/aws/aws-sdk-java-v2/blob/master/docs/LaunchChangelog.md#122-client-credentials). Il connettore Spark Kinesis ha definito un'interfaccia `org.apache.spark.streaming.kinesis.SparkAWSCredentials` () e classi di implementazione che restituiscono la versione AWS 1.x dei provider di credenziali. Questi fornitori di credenziali sono necessari per inizializzare i client Kinesis. Ad esempio, se utilizzi il metodo `SparkAWSCredentials.provider` nelle applicazioni, dovrai aggiornare i codici per utilizzare la versione 2.x dei provider di credenziali. AWS 

Di seguito è riportato un esempio di utilizzo dei provider di credenziali in AWS SDK 1.x:

```
import org.apache.spark.streaming.kinesis.SparkAWSCredentials
import com.amazonaws.auth.AWSCredentialsProvider
 
val basicSparkCredentials = SparkAWSCredentials.builder
    .basicCredentials("accessKey", "secretKey")
    .build()
                                     
val credentialProvider = basicSparkCredentials.provider
assert(credentialProvider.isInstanceOf[AWSCredentialsProvider], "Type should be AWSCredentialsProvider")
```

**Per eseguire la migrazione a SDK 2.x:**

1. Modifica l'istruzione di importazione per `com.amazonaws.auth.AWSCredentialsProvider` in `software.amazon.awssdk.auth.credentials.AwsCredentialsProvider`

   ```
   //import com.amazonaws.auth.AWSCredentialsProvider
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
   ```

1. Aggiorna i codici rimanenti che utilizzano questa classe. 

   ```
   import org.apache.spark.streaming.kinesis.SparkAWSCredentials
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
    
   val basicSparkCredentials = SparkAWSCredentials.builder
       .basicCredentials("accessKey", "secretKey")
       .build()
                                             
   val credentialProvider = basicSparkCredentials.provider
   assert (credentialProvider.isInstanceOf[AwsCredentialsProvider], "Type should be AwsCredentialsProvider")
   ```

### Migrazione dei client di AWS servizio da AWS SDK 1.x a 2.x
<a name="migrating-spark-kinesis-service-from-1.x-to-2.x"></a>

AWS i client di servizio hanno nomi di pacchetto diversi in 2.x (cioè`software.amazon.awssdk`), mentre l'SDK 1.x lo utilizza. `com.amazonaws` Per ulteriori informazioni sulle modifiche del client, consulta [questa pagina](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-whats-different.html). Se utilizzi questi client del servizio nei codici, devi eseguire la migrazione dei client di conseguenza.

Di seguito è riportato un esempio di creazione di un client in SDK 1.x:

```
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.document.DynamoDB
 
AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
```

**Per eseguire la migrazione a 2.x:**

1. Modifica le istruzioni di importazione per i client del servizio. Prendiamo ad esempio i client DynamoDB. Devi cambiare `com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient` o `com.amazonaws.services.dynamodbv2.document.DynamoDB` in `software.amazon.awssdk.services.dynamodb.DynamoDbClient`.

   ```
   // import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
   // import com.amazonaws.services.dynamodbv2.document.DynamoDB
   import software.amazon.awssdk.services.dynamodb.DynamoDbClient
   ```

1. Aggiorna i codici che inizializzano i client

   ```
   // AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
   // AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
    
   DynamoDbClient ddbClient = DynamoDbClient.create();
   DynamoDbClient ddbClient = DynamoDbClient.builder().build();
   ```

   Per ulteriori informazioni sulla migrazione dell' AWS SDK da 1.x a 2.x, consulta [Cosa c'è di diverso tra SDK for AWS Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-whats-different.html) 1.x e 2.x

### Esempi di codice per applicazioni di streaming
<a name="migrating-spark-kinesis-streaming-examples"></a>

```
import java.net.URI
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.http.apache.ApacheHttpClient
import software.amazon.awssdk.services.kinesis.KinesisClient
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest
import software.amazon.awssdk.regions.Region
import software.amazon.kinesis.metrics.{MetricsLevel, MetricsUtil}
 
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
import org.apache.spark.streaming.kinesis.KinesisInputDStream
 
 
object KinesisWordCountASLSDKV2 {
 
  def main(args: Array[String]): Unit = {
    val appName = "demo-app"
    val streamName = "demo-kinesis-test"
    val endpointUrl = "https://kinesis.us-west-2.amazonaws.com"
    val regionName = "us-west-2"
 
    // Determine the number of shards from the stream using the low-level Kinesis Client
    // from the AWS Java SDK.
    val credentialsProvider = DefaultCredentialsProvider.create
    require(credentialsProvider.resolveCredentials() != null,
      "No AWS credentials found. Please specify credentials using one of the methods specified " +
        "in https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html")
    val kinesisClient = KinesisClient.builder()
      .credentialsProvider(credentialsProvider)
      .region(Region.US_WEST_2)
      .endpointOverride(URI.create(endpointUrl))
      .httpClientBuilder(ApacheHttpClient.builder())
      .build()
    val describeStreamRequest = DescribeStreamRequest.builder()
      .streamName(streamName)
      .build()
    val numShards = kinesisClient.describeStream(describeStreamRequest)
      .streamDescription
      .shards
      .size
 
 
    // In this example, we are going to create 1 Kinesis Receiver/input DStream for each shard.
    // This is not a necessity; if there are less receivers/DStreams than the number of shards,
    // then the shards will be automatically distributed among the receivers and each receiver
    // will receive data from multiple shards.
    val numStreams = numShards
 
    // Spark Streaming batch interval
    val batchInterval = Milliseconds(2000)
 
    // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information
    // on sequence number of records that have been received. Same as batchInterval for this
    // example.
    val kinesisCheckpointInterval = batchInterval
 
    // Setup the SparkConfig and StreamingContext
    val sparkConfig = new SparkConf().setAppName("KinesisWordCountASLSDKV2")
    val ssc = new StreamingContext(sparkConfig, batchInterval)
 
    // Create the Kinesis DStreams
    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisInputDStream.builder
        .streamingContext(ssc)
        .streamName(streamName)
        .endpointUrl(endpointUrl)
        .regionName(regionName)
        .initialPosition(new Latest())
        .checkpointAppName(appName)
        .checkpointInterval(kinesisCheckpointInterval)
        .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
        .metricsLevel(MetricsLevel.DETAILED)
        .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
        .build()
    }
 
    // Union all the streams
    val unionStreams = ssc.union(kinesisStreams)
 
    // Convert each line of Array[Byte] to String, and split into words
    val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))
 
    // Map each word to a (word, 1) tuple so we can reduce by key to count the words
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
 
    // Print the first 10 wordCounts
    wordCounts.print()
 
    // Start the streaming context and await termination
    ssc.start()
    ssc.awaitTermination()
  }
}
```

### Considerazioni sull'utilizzo del connettore Spark Kinesis aggiornato
<a name="migrating-spark-kinesis-considerations"></a>
+ Se le tue applicazioni utilizzano la versione `Kinesis-producer-library` con la versione di JDK precedente alla 11, potresti imbatterti in eccezioni come `java.lang.NoClassDefFoundError: javax/xml/bind/DatatypeConverter`. Ciò accade perché EMR 7.0 viene fornito con JDK 17 per impostazione predefinita e i moduli J2EE sono stati rimossi dalle librerie standard a partire da Java 11\$1. Questo problema può essere risolto aggiungendo la seguente dipendenza nel file pom. Sostituisci la versione della libreria con una versione che pensi possa essere più adeguata.

  ```
  <dependency>
        <groupId>javax.xml.bind</groupId>
        <artifactId>jaxb-api</artifactId>
        <version>${jaxb-api.version}</version>
      </dependency>
  ```
+ Il jar del connettore Spark Kinesis si trova in questo percorso dopo la creazione di un cluster EMR: `/usr/lib/spark/connector/lib/`