

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Migration du connecteur Spark Kinesis vers le SDK 2.x pour Amazon EMR 7.0
<a name="migrating-spark-kinesis"></a>

Le AWS SDK fournit un ensemble complet de bibliothèques permettant d'interagir avec APIs les services de AWS cloud computing, tels que la gestion des informations d'identification, la connexion aux services S3 et Kinesis. Le connecteur Spark Kinesis est utilisé pour consommer les données des flux de données Kinesis, et les données reçues sont transformées et traitées dans le moteur d’exécution de Spark. Actuellement, ce connecteur est basé sur les versions 1.x du AWS SDK et Kinesis-client-library (KCL). 

Dans le cadre de la migration vers le AWS SDK 2.x, le connecteur Spark Kinesis est également mis à jour en conséquence pour fonctionner avec le SDK 2.x. Dans la version 7.0 d’Amazon EMR, Spark contient la mise à niveau vers le SDK 2.x, qui n’est pas encore disponible dans la version communautaire d’Apache Spark. Si vous utilisez le connecteur Spark Kinesis sur une version antérieure à la version 7.0, migrez les codes de vos applications pour qu’ils s’exécutent sur le SDK 2.x avant de migrer vers Amazon EMR 7.0.

## Guides de migration
<a name="migrating-spark-kinesis-migration-guides"></a>

Cette section décrit les étapes de migration d’une application vers la version mise à niveau du connecteur Spark Kinesis. Il inclut des guides de migration vers la bibliothèque client Kinesis (KCL) 2.x, des fournisseurs AWS d'informations d'identification et des clients de AWS service dans le SDK 2.x. AWS À titre de référence, il inclut également un exemple de [WordCount](https://github.com/apache/spark/blob/v3.5.0/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala)programme utilisant le connecteur Kinesis.

**Topics**
+ [Migration de KCL de la version 1.x vers la version 2.x](#migrating-spark-kinesis-KCL-from-1.x-to-2.x)
+ [Migration des fournisseurs AWS d'informations d'identification du AWS SDK 1.x vers le SDK 2.x](#migrating-spark-kinesis-creds-from-1.x-to-2.x)
+ [Migration des clients AWS de service du AWS SDK 1.x vers le SDK 2.x](#migrating-spark-kinesis-service-from-1.x-to-2.x)
+ [Exemples de code pour les applications de streaming](#migrating-spark-kinesis-streaming-examples)
+ [Considérations relatives à l’utilisation de la version mise à niveau du connecteur Spark Kinesis](#migrating-spark-kinesis-considerations)

### Migration de KCL de la version 1.x vers la version 2.x
<a name="migrating-spark-kinesis-KCL-from-1.x-to-2.x"></a>
+ **Niveau et dimensions des métriques dans `KinesisInputDStream`**

  Lorsque vous instanciez un `KinesisInputDStream`, vous pouvez contrôler le niveau et les dimensions des métriques du flux. L’exemple suivant montre comment personnaliser ces paramètres avec KCL 1.x :

  ```
  import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
  import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
   
  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
    .build()
  ```

  Dans KCL 2.x, ces paramètres de configuration ont des noms de package différents. Pour migrer vers la version 2.x :

  1. Remplacez les instructions d’importation pour `com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration` et `com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel` par `software.amazon.kinesis.metrics.MetricsLevel` et `software.amazon.kinesis.metrics.MetricsUtil` respectivement.

     ```
     // import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
     import software.amazon.kinesis.metrics.MetricsLevel
      
     // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
     import software.amazon.kinesis.metrics.MetricsUtil
     ```

  1. Remplacez la ligne `metricsEnabledDimensionsKinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet` par `metricsEnabledDimensionsSet(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)`.

  Vous trouverez ci-dessous une version mise à jour du `KinesisInputDStream` avec un niveau de métrique et des dimensions de métriques personnalisés :

  ```
  import software.amazon.kinesis.metrics.MetricsLevel
  import software.amazon.kinesis.metrics.MetricsUtil
   
  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
    .build()
  ```
+ Fonction de gestionnaire de messages dans `KinesisInputDStream`

  Lors de l’instanciation d’un `KinesisInputDStream`, vous pouvez également spécifier une « fonction de gestionnaire de messages » qui prend un enregistrement Kinesis et renvoie un objet générique T, au cas où vous souhaiteriez utiliser d’autres données incluses dans un enregistrement, comme une clé de partition.

  Dans KCL 1.x, la signature de la fonction de gestionnaire de messages est `Record => T`, où Record correspond à `com.amazonaws.services.kinesis.model.Record`. Dans KCL 2.x, la signature du gestionnaire est remplacée par :`KinesisClientRecord => T`, where is. KinesisClientRecord `software.amazon.kinesis.retrieval.KinesisClientRecord` 

  Voici un exemple de spécification de gestionnaire de messages dans KCL 1.x :

  ```
  import com.amazonaws.services.kinesis.model.Record
   
   
  def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_ONLY)
    .buildWithMessageHandler(addFive)
  ```

  Pour migrer le gestionnaire de messages :

  1. Remplacez l’instruction d’importation `com.amazonaws.services.kinesis.model.Record` par `software.amazon.kinesis.retrieval.KinesisClientRecord`.

     ```
     // import com.amazonaws.services.kinesis.model.Record
     import software.amazon.kinesis.retrieval.KinesisClientRecord
     ```

  1. Mettez à jour la signature de la méthode du gestionnaire de messages.

     ```
     //def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
     def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
     ```

  Voici un exemple mis à jour de spécification de gestionnaire de messages dans KCL 2.x :

  ```
  import software.amazon.kinesis.retrieval.KinesisClientRecord
   
   
  def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_ONLY)
    .buildWithMessageHandler(addFive)
  ```

  Pour plus d’informations sur la migration de KCL 1.x vers KCL 2.x, voir la rubrique [Migration des consommateurs de KCL 1.x vers KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html).

### Migration des fournisseurs AWS d'informations d'identification du AWS SDK 1.x vers le SDK 2.x
<a name="migrating-spark-kinesis-creds-from-1.x-to-2.x"></a>

Les fournisseurs d'informations d'identification sont utilisés pour obtenir des AWS informations d'identification pour les interactions avec AWS. Il existe plusieurs modifications d’interface et de classe liées aux fournisseurs d’informations d’identification dans le SDK 2.x. Ces modifications peuvent être consultées [ici](https://github.com/aws/aws-sdk-java-v2/blob/master/docs/LaunchChangelog.md#122-client-credentials). Le connecteur Spark Kinesis a défini une interface (`org.apache.spark.streaming.kinesis.SparkAWSCredentials`) et des classes d'implémentation qui renvoient la version 1.x des fournisseurs d'informations d' AWS identification. Ces fournisseurs d’informations d’identification sont nécessaires lors de l’initialisation des clients Kinesis. Par exemple, si vous utilisez cette méthode `SparkAWSCredentials.provider` dans les applications, vous devrez mettre à jour les codes pour utiliser la version 2.x des fournisseurs AWS d'informations d'identification.

Voici un exemple d'utilisation des fournisseurs d'informations d'identification dans le AWS SDK 1.x :

```
import org.apache.spark.streaming.kinesis.SparkAWSCredentials
import com.amazonaws.auth.AWSCredentialsProvider
 
val basicSparkCredentials = SparkAWSCredentials.builder
    .basicCredentials("accessKey", "secretKey")
    .build()
                                     
val credentialProvider = basicSparkCredentials.provider
assert(credentialProvider.isInstanceOf[AWSCredentialsProvider], "Type should be AWSCredentialsProvider")
```

**Pour migrer vers la version 2.x du SDK :**

1. Remplacez l’instruction d’importation `com.amazonaws.auth.AWSCredentialsProvider` par `software.amazon.awssdk.auth.credentials.AwsCredentialsProvider`.

   ```
   //import com.amazonaws.auth.AWSCredentialsProvider
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
   ```

1. Mettez à jour les autres codes qui utilisent cette classe. 

   ```
   import org.apache.spark.streaming.kinesis.SparkAWSCredentials
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
    
   val basicSparkCredentials = SparkAWSCredentials.builder
       .basicCredentials("accessKey", "secretKey")
       .build()
                                             
   val credentialProvider = basicSparkCredentials.provider
   assert (credentialProvider.isInstanceOf[AwsCredentialsProvider], "Type should be AwsCredentialsProvider")
   ```

### Migration des clients AWS de service du AWS SDK 1.x vers le SDK 2.x
<a name="migrating-spark-kinesis-service-from-1.x-to-2.x"></a>

AWS les clients de service ont des noms de package différents dans la version 2.x (c'est-à-dire`software.amazon.awssdk`), alors que le SDK 1.x les utilise. `com.amazonaws` Pour plus d’informations sur les modifications apportées aux clients, voir [cette page](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-whats-different.html). Si vous utilisez ces clients de service dans les codes, vous devez migrer les clients en conséquence.

Voici un exemple de création d’un client dans la version 1.x du SDK :

```
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.document.DynamoDB
 
AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
```

**Pour migrer vers la version 2.x :**

1. Modifiez les instructions d’importation pour les clients de service. Prenons l’exemple des clients DynamoDB. Remplacez `com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient` ou `com.amazonaws.services.dynamodbv2.document.DynamoDB` par `software.amazon.awssdk.services.dynamodb.DynamoDbClient`.

   ```
   // import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
   // import com.amazonaws.services.dynamodbv2.document.DynamoDB
   import software.amazon.awssdk.services.dynamodb.DynamoDbClient
   ```

1. Mettez à jour les codes qui initialisent les clients.

   ```
   // AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
   // AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
    
   DynamoDbClient ddbClient = DynamoDbClient.create();
   DynamoDbClient ddbClient = DynamoDbClient.builder().build();
   ```

   Pour plus d'informations sur la migration du AWS SDK de la version 1.x vers la version 2.x, voir [Quelles sont les différences entre le SDK pour AWS Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-whats-different.html) 1.x et 2.x ?

### Exemples de code pour les applications de streaming
<a name="migrating-spark-kinesis-streaming-examples"></a>

```
import java.net.URI
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.http.apache.ApacheHttpClient
import software.amazon.awssdk.services.kinesis.KinesisClient
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest
import software.amazon.awssdk.regions.Region
import software.amazon.kinesis.metrics.{MetricsLevel, MetricsUtil}
 
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
import org.apache.spark.streaming.kinesis.KinesisInputDStream
 
 
object KinesisWordCountASLSDKV2 {
 
  def main(args: Array[String]): Unit = {
    val appName = "demo-app"
    val streamName = "demo-kinesis-test"
    val endpointUrl = "https://kinesis.us-west-2.amazonaws.com"
    val regionName = "us-west-2"
 
    // Determine the number of shards from the stream using the low-level Kinesis Client
    // from the AWS Java SDK.
    val credentialsProvider = DefaultCredentialsProvider.create
    require(credentialsProvider.resolveCredentials() != null,
      "No AWS credentials found. Please specify credentials using one of the methods specified " +
        "in https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html")
    val kinesisClient = KinesisClient.builder()
      .credentialsProvider(credentialsProvider)
      .region(Region.US_WEST_2)
      .endpointOverride(URI.create(endpointUrl))
      .httpClientBuilder(ApacheHttpClient.builder())
      .build()
    val describeStreamRequest = DescribeStreamRequest.builder()
      .streamName(streamName)
      .build()
    val numShards = kinesisClient.describeStream(describeStreamRequest)
      .streamDescription
      .shards
      .size
 
 
    // In this example, we are going to create 1 Kinesis Receiver/input DStream for each shard.
    // This is not a necessity; if there are less receivers/DStreams than the number of shards,
    // then the shards will be automatically distributed among the receivers and each receiver
    // will receive data from multiple shards.
    val numStreams = numShards
 
    // Spark Streaming batch interval
    val batchInterval = Milliseconds(2000)
 
    // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information
    // on sequence number of records that have been received. Same as batchInterval for this
    // example.
    val kinesisCheckpointInterval = batchInterval
 
    // Setup the SparkConfig and StreamingContext
    val sparkConfig = new SparkConf().setAppName("KinesisWordCountASLSDKV2")
    val ssc = new StreamingContext(sparkConfig, batchInterval)
 
    // Create the Kinesis DStreams
    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisInputDStream.builder
        .streamingContext(ssc)
        .streamName(streamName)
        .endpointUrl(endpointUrl)
        .regionName(regionName)
        .initialPosition(new Latest())
        .checkpointAppName(appName)
        .checkpointInterval(kinesisCheckpointInterval)
        .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
        .metricsLevel(MetricsLevel.DETAILED)
        .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
        .build()
    }
 
    // Union all the streams
    val unionStreams = ssc.union(kinesisStreams)
 
    // Convert each line of Array[Byte] to String, and split into words
    val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))
 
    // Map each word to a (word, 1) tuple so we can reduce by key to count the words
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
 
    // Print the first 10 wordCounts
    wordCounts.print()
 
    // Start the streaming context and await termination
    ssc.start()
    ssc.awaitTermination()
  }
}
```

### Considérations relatives à l’utilisation de la version mise à niveau du connecteur Spark Kinesis
<a name="migrating-spark-kinesis-considerations"></a>
+ Si vos applications utilisent la `Kinesis-producer-library` avec une version du JDK antérieure à la version 11, des exceptions peuvent se produire, comme `java.lang.NoClassDefFoundError: javax/xml/bind/DatatypeConverter`. Cela est dû au fait que EMR 7.0 est fourni avec le JDK 17 par défaut et que les modules J2EE ont été supprimés des bibliothèques standard depuis Java 11. Il est possible de résoudre le problème en ajoutant la dépendance suivante dans le fichier pom. Remplacez la version de la bibliothèque par une autre selon vos besoins.

  ```
  <dependency>
        <groupId>javax.xml.bind</groupId>
        <artifactId>jaxb-api</artifactId>
        <version>${jaxb-api.version}</version>
      </dependency>
  ```
+ Le fichier jar du connecteur Spark Kinesis se trouve à l’emplacement suivant après la création d’un cluster EMR : `/usr/lib/spark/connector/lib/`.