

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Kinesis
<a name="emr-kinesis"></a>

Les clusters Amazon EMR peuvent lire et traiter directement les flux Amazon Kinesis à l'aide d'outils courants de l'écosystème Hadoop tels que Hive, Pig MapReduce, l'API de streaming Hadoop et Cascading. Vous pouvez également joindre les données en temps réel d'Amazon Kinesis aux données existantes sur Amazon S3, Amazon DynamoDB et HDFS dans un cluster en cours d'exécution. Vous pouvez charger directement les données d'Amazon EMR sur Amazon S3 ou DynamoDB pour les activités de post-traitement. Pour plus d’informations sur les caractéristiques principales du service Amazon Kinesis et ses tarifs, voir la page [Amazon Kinesis](https://aws.amazon.com//kinesis).

## Que puis-je faire avec l'intégration d'Amazon EMR et Amazon Kinesis ?
<a name="kinesis-use-cases"></a>

 L'intégration entre Amazon EMR et Amazon Kinesis facilite considérablement certains scénarios, par exemple : 
+ **Analyse de journaux de streaming** – Vous pouvez analyser les journaux web de vos flux de diffusion continus afin de générer la liste des 10 principaux types d'erreurs par région, par navigateur et par domaine d'accès à intervalles de quelques minutes. 
+ **Engagement client** – Vous pouvez écrire des requêtes associant des données de parcours provenant d'Amazon Kinesis à des informations sur une campagne publicitaire stockées dans une table DynamoDB, dans le but d'identifier les catégories de publicité les plus efficaces parmi celles affichées sur des sites web donnés. 
+ **Requêtes interactives ad-hoc** – Vous pouvez charger périodiquement des données provenant de flux Amazon Kinesis dans HDFS et les mettre à disposition sous forme d'une table Impala locale permettant des requêtes analytiques, rapides et interactives.

## Analyse de points de contrôle des flux Amazon Kinesis
<a name="kinesis-checkpoint"></a>

Les utilisateurs peuvent exécuter des analyses régulières par lots de flux Amazon Kinesis dans ce que l'on appelle des *itérations*. Étant donné que les enregistrements de données de flux Amazon Kinesis sont récupérés à l'aide d'un numéro de séquence, des limites d'itérations sont définies par des numéros de début et de fin de séquences qu'Amazon EMR stocke dans une table DynamoDB. Par exemple, quand `iteration0` se termine, elle stocke le numéro de fin de séquence dans DynamoDB afin que lorsque la tâche `iteration1` commence, elle puisse extraire les données suivantes du flux. Ce mappage d'itérations de données de flux s'appelle un *point de contrôle*. Pour plus d'informations, consultez [Connecteur Kinesis](https://aws.amazon.com/elasticmapreduce/faqs/#kinesis-connector).

Si une itération a été vérifiée et que la tâche a échoué lors du traitement d'une itération, Amazon EMR tente de retraiter les enregistrements de cette itération. 

Les points de contrôle sont une fonctionnalité qui vous permet de : 
+ Commencer le traitement de données après un numéro de séquence traité par une requête précédente qui s'est exécutée sur le même flux de données et nom logique
+ Retraiter le même lot de données à partir de Kinesis qui a été traité par une requête précédente

 Pour activer les points de contrôle, définissez le paramètre `kinesis.checkpoint.enabled` sur `true` dans vos scripts. En outre, configurez les paramètres suivants :


| Paramètre de configuration | Description | 
| --- | --- | 
| kinesis.checkpoint.metastore.table.name | Nom de la table DynamoDB où seront stockées les informations de point de contrôle | 
| kinesis.checkpoint.metastore.hash.key.name | Nom de la clé de hachage pour la table DynamoDB | 
| kinesis.checkpoint.metastore.hash.range.name | Nom de la clé de plage pour la table DynamoDB | 
| kinesis.checkpoint.logical.name | Un nom logique pour le traitement actuel | 
| kinesis.checkpoint.iteration.no | Numéro de l'itération pour le traitement associé au nom logique | 
| kinesis.rerun.iteration.without.wait | Valeur booléenne qui indique si une itération ayant échoué peut être exécutée à nouveau sans attente l'expiration ; la valeur par défaut est false | 

### Recommandations d'IOPS provisionnés pour les tables Amazon DynamoDB
<a name="kinesis-checkpoint-DDB"></a>

Le connecteur Amazon EMR pour Amazon Kinesis utilise la base de données DynamoDB comme support pour les métadonnées de pointage. Vous devez créer une table dans DynamoDB avant de consommer des données dans un flux Amazon Kinesis avec un cluster Amazon EMR à intervalles contrôlés. La table doit se trouver dans la même région que votre cluster Amazon EMR. Voici des recommandations générales pour le nombre d'IOPS que vous devez mettre en service pour vos tables DynamoDB. `j` est le nombre maximum de tâches Hadoop (avec une combinaison de nombre d'itérations\$1nom logique différente) pouvant s'exécuter simultanément et `s` est le nombre maximum de partitions que toute tâche traitera :

Pour **Read Capacity Units (Unités de capacité en lecture)** : `j`\$1`s`/`5`

Pour **Write Capacity Units (Unités de capacité en écriture)** : `j`\$1`s`

## Considérations sur les performances
<a name="performance"></a>

Le débit de la partition Amazon Kinesis est directement proportionnel à la taille de l'instance des nœuds dans les clusters Amazon EMR et à la taille de l'enregistrement dans le flux. Nous vous recommandons d'utiliser des instances m5.xlarge ou plus importantes sur les nœuds maîtres et principaux.

## Planifiez une analyse Amazon Kinesis avec Amazon EMR
<a name="schedule"></a>

Lorsque vous analysez des données sur un flux Amazon Kinesis actif, limité par des délais d'attente et une durée maximale pour l'itération, il est important que vous exécutiez l'analyse fréquemment pour collecter régulièrement des détails à partir du flux. Il existe plusieurs façons d'exécuter ces scripts et requêtes à intervalles réguliers ; nous vous conseillons d'utiliser AWS Data Pipeline pour les tâches répétitives de ce type. Pour plus d'informations, consultez [AWS Data Pipeline PigActivity](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-pigactivity.html)et consultez [AWS Data Pipeline HiveActivity](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-hiveactivity.html)le *Guide du AWS Data Pipeline développeur*.

# Migration du connecteur Spark Kinesis vers le SDK 2.x pour Amazon EMR 7.0
<a name="migrating-spark-kinesis"></a>

Le AWS SDK fournit un ensemble complet de bibliothèques permettant d'interagir avec APIs les services de AWS cloud computing, tels que la gestion des informations d'identification, la connexion aux services S3 et Kinesis. Le connecteur Spark Kinesis est utilisé pour consommer les données des flux de données Kinesis, et les données reçues sont transformées et traitées dans le moteur d’exécution de Spark. Actuellement, ce connecteur est basé sur les versions 1.x du AWS SDK et Kinesis-client-library (KCL). 

Dans le cadre de la migration vers le AWS SDK 2.x, le connecteur Spark Kinesis est également mis à jour en conséquence pour fonctionner avec le SDK 2.x. Dans la version 7.0 d’Amazon EMR, Spark contient la mise à niveau vers le SDK 2.x, qui n’est pas encore disponible dans la version communautaire d’Apache Spark. Si vous utilisez le connecteur Spark Kinesis sur une version antérieure à la version 7.0, migrez les codes de vos applications pour qu’ils s’exécutent sur le SDK 2.x avant de migrer vers Amazon EMR 7.0.

## Guides de migration
<a name="migrating-spark-kinesis-migration-guides"></a>

Cette section décrit les étapes de migration d’une application vers la version mise à niveau du connecteur Spark Kinesis. Il inclut des guides de migration vers la bibliothèque client Kinesis (KCL) 2.x, des fournisseurs AWS d'informations d'identification et des clients de AWS service dans le SDK 2.x. AWS À titre de référence, il inclut également un exemple de [WordCount](https://github.com/apache/spark/blob/v3.5.0/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala)programme utilisant le connecteur Kinesis.

**Topics**
+ [Migration de KCL de la version 1.x vers la version 2.x](#migrating-spark-kinesis-KCL-from-1.x-to-2.x)
+ [Migration des fournisseurs AWS d'informations d'identification du AWS SDK 1.x vers le SDK 2.x](#migrating-spark-kinesis-creds-from-1.x-to-2.x)
+ [Migration des clients AWS de service du AWS SDK 1.x vers le SDK 2.x](#migrating-spark-kinesis-service-from-1.x-to-2.x)
+ [Exemples de code pour les applications de streaming](#migrating-spark-kinesis-streaming-examples)
+ [Considérations relatives à l’utilisation de la version mise à niveau du connecteur Spark Kinesis](#migrating-spark-kinesis-considerations)

### Migration de KCL de la version 1.x vers la version 2.x
<a name="migrating-spark-kinesis-KCL-from-1.x-to-2.x"></a>
+ **Niveau et dimensions des métriques dans `KinesisInputDStream`**

  Lorsque vous instanciez un `KinesisInputDStream`, vous pouvez contrôler le niveau et les dimensions des métriques du flux. L’exemple suivant montre comment personnaliser ces paramètres avec KCL 1.x :

  ```
  import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
  import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
   
  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
    .build()
  ```

  Dans KCL 2.x, ces paramètres de configuration ont des noms de package différents. Pour migrer vers la version 2.x :

  1. Remplacez les instructions d’importation pour `com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration` et `com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel` par `software.amazon.kinesis.metrics.MetricsLevel` et `software.amazon.kinesis.metrics.MetricsUtil` respectivement.

     ```
     // import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
     import software.amazon.kinesis.metrics.MetricsLevel
      
     // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
     import software.amazon.kinesis.metrics.MetricsUtil
     ```

  1. Remplacez la ligne `metricsEnabledDimensionsKinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet` par `metricsEnabledDimensionsSet(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)`.

  Vous trouverez ci-dessous une version mise à jour du `KinesisInputDStream` avec un niveau de métrique et des dimensions de métriques personnalisés :

  ```
  import software.amazon.kinesis.metrics.MetricsLevel
  import software.amazon.kinesis.metrics.MetricsUtil
   
  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
    .build()
  ```
+ Fonction de gestionnaire de messages dans `KinesisInputDStream`

  Lors de l’instanciation d’un `KinesisInputDStream`, vous pouvez également spécifier une « fonction de gestionnaire de messages » qui prend un enregistrement Kinesis et renvoie un objet générique T, au cas où vous souhaiteriez utiliser d’autres données incluses dans un enregistrement, comme une clé de partition.

  Dans KCL 1.x, la signature de la fonction de gestionnaire de messages est `Record => T`, où Record correspond à `com.amazonaws.services.kinesis.model.Record`. Dans KCL 2.x, la signature du gestionnaire est remplacée par :`KinesisClientRecord => T`, where is. KinesisClientRecord `software.amazon.kinesis.retrieval.KinesisClientRecord` 

  Voici un exemple de spécification de gestionnaire de messages dans KCL 1.x :

  ```
  import com.amazonaws.services.kinesis.model.Record
   
   
  def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_ONLY)
    .buildWithMessageHandler(addFive)
  ```

  Pour migrer le gestionnaire de messages :

  1. Remplacez l’instruction d’importation `com.amazonaws.services.kinesis.model.Record` par `software.amazon.kinesis.retrieval.KinesisClientRecord`.

     ```
     // import com.amazonaws.services.kinesis.model.Record
     import software.amazon.kinesis.retrieval.KinesisClientRecord
     ```

  1. Mettez à jour la signature de la méthode du gestionnaire de messages.

     ```
     //def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
     def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
     ```

  Voici un exemple mis à jour de spécification de gestionnaire de messages dans KCL 2.x :

  ```
  import software.amazon.kinesis.retrieval.KinesisClientRecord
   
   
  def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_ONLY)
    .buildWithMessageHandler(addFive)
  ```

  Pour plus d’informations sur la migration de KCL 1.x vers KCL 2.x, voir la rubrique [Migration des consommateurs de KCL 1.x vers KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html).

### Migration des fournisseurs AWS d'informations d'identification du AWS SDK 1.x vers le SDK 2.x
<a name="migrating-spark-kinesis-creds-from-1.x-to-2.x"></a>

Les fournisseurs d'informations d'identification sont utilisés pour obtenir des AWS informations d'identification pour les interactions avec AWS. Il existe plusieurs modifications d’interface et de classe liées aux fournisseurs d’informations d’identification dans le SDK 2.x. Ces modifications peuvent être consultées [ici](https://github.com/aws/aws-sdk-java-v2/blob/master/docs/LaunchChangelog.md#122-client-credentials). Le connecteur Spark Kinesis a défini une interface (`org.apache.spark.streaming.kinesis.SparkAWSCredentials`) et des classes d'implémentation qui renvoient la version 1.x des fournisseurs d'informations d' AWS identification. Ces fournisseurs d’informations d’identification sont nécessaires lors de l’initialisation des clients Kinesis. Par exemple, si vous utilisez cette méthode `SparkAWSCredentials.provider` dans les applications, vous devrez mettre à jour les codes pour utiliser la version 2.x des fournisseurs AWS d'informations d'identification.

Voici un exemple d'utilisation des fournisseurs d'informations d'identification dans le AWS SDK 1.x :

```
import org.apache.spark.streaming.kinesis.SparkAWSCredentials
import com.amazonaws.auth.AWSCredentialsProvider
 
val basicSparkCredentials = SparkAWSCredentials.builder
    .basicCredentials("accessKey", "secretKey")
    .build()
                                     
val credentialProvider = basicSparkCredentials.provider
assert(credentialProvider.isInstanceOf[AWSCredentialsProvider], "Type should be AWSCredentialsProvider")
```

**Pour migrer vers la version 2.x du SDK :**

1. Remplacez l’instruction d’importation `com.amazonaws.auth.AWSCredentialsProvider` par `software.amazon.awssdk.auth.credentials.AwsCredentialsProvider`.

   ```
   //import com.amazonaws.auth.AWSCredentialsProvider
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
   ```

1. Mettez à jour les autres codes qui utilisent cette classe. 

   ```
   import org.apache.spark.streaming.kinesis.SparkAWSCredentials
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
    
   val basicSparkCredentials = SparkAWSCredentials.builder
       .basicCredentials("accessKey", "secretKey")
       .build()
                                             
   val credentialProvider = basicSparkCredentials.provider
   assert (credentialProvider.isInstanceOf[AwsCredentialsProvider], "Type should be AwsCredentialsProvider")
   ```

### Migration des clients AWS de service du AWS SDK 1.x vers le SDK 2.x
<a name="migrating-spark-kinesis-service-from-1.x-to-2.x"></a>

AWS les clients de service ont des noms de package différents dans la version 2.x (c'est-à-dire`software.amazon.awssdk`), alors que le SDK 1.x les utilise. `com.amazonaws` Pour plus d’informations sur les modifications apportées aux clients, voir [cette page](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-whats-different.html). Si vous utilisez ces clients de service dans les codes, vous devez migrer les clients en conséquence.

Voici un exemple de création d’un client dans la version 1.x du SDK :

```
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.document.DynamoDB
 
AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
```

**Pour migrer vers la version 2.x :**

1. Modifiez les instructions d’importation pour les clients de service. Prenons l’exemple des clients DynamoDB. Remplacez `com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient` ou `com.amazonaws.services.dynamodbv2.document.DynamoDB` par `software.amazon.awssdk.services.dynamodb.DynamoDbClient`.

   ```
   // import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
   // import com.amazonaws.services.dynamodbv2.document.DynamoDB
   import software.amazon.awssdk.services.dynamodb.DynamoDbClient
   ```

1. Mettez à jour les codes qui initialisent les clients.

   ```
   // AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
   // AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
    
   DynamoDbClient ddbClient = DynamoDbClient.create();
   DynamoDbClient ddbClient = DynamoDbClient.builder().build();
   ```

   Pour plus d'informations sur la migration du AWS SDK de la version 1.x vers la version 2.x, voir [Quelles sont les différences entre le SDK pour AWS Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-whats-different.html) 1.x et 2.x ?

### Exemples de code pour les applications de streaming
<a name="migrating-spark-kinesis-streaming-examples"></a>

```
import java.net.URI
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.http.apache.ApacheHttpClient
import software.amazon.awssdk.services.kinesis.KinesisClient
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest
import software.amazon.awssdk.regions.Region
import software.amazon.kinesis.metrics.{MetricsLevel, MetricsUtil}
 
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
import org.apache.spark.streaming.kinesis.KinesisInputDStream
 
 
object KinesisWordCountASLSDKV2 {
 
  def main(args: Array[String]): Unit = {
    val appName = "demo-app"
    val streamName = "demo-kinesis-test"
    val endpointUrl = "https://kinesis.us-west-2.amazonaws.com"
    val regionName = "us-west-2"
 
    // Determine the number of shards from the stream using the low-level Kinesis Client
    // from the AWS Java SDK.
    val credentialsProvider = DefaultCredentialsProvider.create
    require(credentialsProvider.resolveCredentials() != null,
      "No AWS credentials found. Please specify credentials using one of the methods specified " +
        "in https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html")
    val kinesisClient = KinesisClient.builder()
      .credentialsProvider(credentialsProvider)
      .region(Region.US_WEST_2)
      .endpointOverride(URI.create(endpointUrl))
      .httpClientBuilder(ApacheHttpClient.builder())
      .build()
    val describeStreamRequest = DescribeStreamRequest.builder()
      .streamName(streamName)
      .build()
    val numShards = kinesisClient.describeStream(describeStreamRequest)
      .streamDescription
      .shards
      .size
 
 
    // In this example, we are going to create 1 Kinesis Receiver/input DStream for each shard.
    // This is not a necessity; if there are less receivers/DStreams than the number of shards,
    // then the shards will be automatically distributed among the receivers and each receiver
    // will receive data from multiple shards.
    val numStreams = numShards
 
    // Spark Streaming batch interval
    val batchInterval = Milliseconds(2000)
 
    // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information
    // on sequence number of records that have been received. Same as batchInterval for this
    // example.
    val kinesisCheckpointInterval = batchInterval
 
    // Setup the SparkConfig and StreamingContext
    val sparkConfig = new SparkConf().setAppName("KinesisWordCountASLSDKV2")
    val ssc = new StreamingContext(sparkConfig, batchInterval)
 
    // Create the Kinesis DStreams
    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisInputDStream.builder
        .streamingContext(ssc)
        .streamName(streamName)
        .endpointUrl(endpointUrl)
        .regionName(regionName)
        .initialPosition(new Latest())
        .checkpointAppName(appName)
        .checkpointInterval(kinesisCheckpointInterval)
        .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
        .metricsLevel(MetricsLevel.DETAILED)
        .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
        .build()
    }
 
    // Union all the streams
    val unionStreams = ssc.union(kinesisStreams)
 
    // Convert each line of Array[Byte] to String, and split into words
    val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))
 
    // Map each word to a (word, 1) tuple so we can reduce by key to count the words
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
 
    // Print the first 10 wordCounts
    wordCounts.print()
 
    // Start the streaming context and await termination
    ssc.start()
    ssc.awaitTermination()
  }
}
```

### Considérations relatives à l’utilisation de la version mise à niveau du connecteur Spark Kinesis
<a name="migrating-spark-kinesis-considerations"></a>
+ Si vos applications utilisent la `Kinesis-producer-library` avec une version du JDK antérieure à la version 11, des exceptions peuvent se produire, comme `java.lang.NoClassDefFoundError: javax/xml/bind/DatatypeConverter`. Cela est dû au fait que EMR 7.0 est fourni avec le JDK 17 par défaut et que les modules J2EE ont été supprimés des bibliothèques standard depuis Java 11. Il est possible de résoudre le problème en ajoutant la dépendance suivante dans le fichier pom. Remplacez la version de la bibliothèque par une autre selon vos besoins.

  ```
  <dependency>
        <groupId>javax.xml.bind</groupId>
        <artifactId>jaxb-api</artifactId>
        <version>${jaxb-api.version}</version>
      </dependency>
  ```
+ Le fichier jar du connecteur Spark Kinesis se trouve à l’emplacement suivant après la création d’un cluster EMR : `/usr/lib/spark/connector/lib/`.