

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Migration des Spark-Kinesis-Konnektors zu SDK 2.x für Amazon EMR 7.0
<a name="migrating-spark-kinesis"></a>

Das AWS SDK bietet eine Vielzahl von Bibliotheken für die Interaktion mit AWS Cloud-Computing-Diensten, z. B. die Verwaltung von Anmeldeinformationen APIs und die Verbindung zu S3- und Kinesis-Diensten. Der Spark-Kinesis-Konnektor wird verwendet, um Daten aus Kinesis Data Streams zu verarbeiten, und die empfangenen Daten werden in der Ausführungs-Engine von Spark transformiert und verarbeitet. Derzeit baut dieser Konnektor auf Version 1.x von AWS SDK und Kinesis-client-library (KCL) auf. 

Im Rahmen der AWS SDK 2.x-Migration wird auch der Spark Kinesis-Connector entsprechend aktualisiert, sodass er mit dem SDK 2.x ausgeführt werden kann. In der Amazon-EMR-Version 7.0 enthält Spark das SDK-2.x-Upgrade, das in der Community-Version von Apache Spark noch nicht verfügbar ist. Wenn Sie den Spark-Kinesis-Konnektor aus einer Version unter 7.0 verwenden, müssen Sie Ihre Anwendungscodes für die Ausführung in SDK 2.x migrieren, bevor Sie zu Amazon EMR 7.0 migrieren können.

## Migrationshandbücher
<a name="migrating-spark-kinesis-migration-guides"></a>

In diesem Abschnitt werden die Schritte zur Migration einer Anwendung zum aktualisierten Spark-Kinesis-Konnektor beschrieben. Es enthält Anleitungen für die Migration zur Kinesis Client Library (KCL) 2.x, AWS Anmeldeinformationsanbieter und AWS Service-Clients in SDK 2.x. AWS Als Referenz enthält es auch ein [WordCount](https://github.com/apache/spark/blob/v3.5.0/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala)Beispielprogramm, das den Kinesis-Konnektor verwendet.

**Topics**
+ [KCL von 1.x zu 2.x migrieren](#migrating-spark-kinesis-KCL-from-1.x-to-2.x)
+ [Anbieter von AWS Anmeldeinformationen von AWS SDK 1.x auf 2.x migrieren](#migrating-spark-kinesis-creds-from-1.x-to-2.x)
+ [Migration von AWS Service-Clients von AWS SDK 1.x auf 2.x](#migrating-spark-kinesis-service-from-1.x-to-2.x)
+ [Codebeispiele für Streaming-Anwendungen](#migrating-spark-kinesis-streaming-examples)
+ [Überlegungen zur Verwendung des aktualisierten Spark-Kinesis-Konnektors](#migrating-spark-kinesis-considerations)

### KCL von 1.x zu 2.x migrieren
<a name="migrating-spark-kinesis-KCL-from-1.x-to-2.x"></a>
+ **Ebene und Dimensionen der Metriken in `KinesisInputDStream`**

  Wenn Sie einen `KinesisInputDStream` instanziieren, können Sie die Metrikebene und die Dimensionen für den Stream steuern. Das folgende Beispiel zeigt, wie Sie diese Parameter mit KCL 1.x anpassen können:

  ```
  import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
  import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
   
  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
    .build()
  ```

  In KCL 2.x haben diese Konfigurationseinstellungen andere Paketnamen. Für die Migration zu 2.x:

  1. Ändern Sie die Importanweisungen für `com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration` und `com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel` zu `software.amazon.kinesis.metrics.MetricsLevel` bzw. `software.amazon.kinesis.metrics.MetricsUtil`.

     ```
     // import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
     import software.amazon.kinesis.metrics.MetricsLevel
      
     // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
     import software.amazon.kinesis.metrics.MetricsUtil
     ```

  1. Ersetzen Sie die Zeile `metricsEnabledDimensionsKinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet` durch `metricsEnabledDimensionsSet(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)`

  Im Folgenden finden Sie eine aktualisierte Version von `KinesisInputDStream` mit benutzerdefinierten Metrikebene und Metrikdimensionen:

  ```
  import software.amazon.kinesis.metrics.MetricsLevel
  import software.amazon.kinesis.metrics.MetricsUtil
   
  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
    .build()
  ```
+ Meldungshandler-Funktion in `KinesisInputDStream`

  Bei der Instanziierung eines `KinesisInputDStream` können Sie auch eine „Meldungshandler-Funktion“ angeben, die einen Kinesis-Datensatz verwendet und ein generisches Objekt T zurückgibt, falls Sie andere in einem Datensatz enthaltene Daten wie den Partitionsschlüssel verwenden möchten.

  In KCL 1.x lautet die Signatur der Meldungshandler-Funktion: `Record => T`, wobei Record für `com.amazonaws.services.kinesis.model.Record` steht. In KCL 2.x wurde die Signatur des Handlers in:`KinesisClientRecord => T`, where is geändert. KinesisClientRecord `software.amazon.kinesis.retrieval.KinesisClientRecord` 

  Es folgt ein Beispiel für die Bereitstellung eines Meldungshandlers in KCL 1.x:

  ```
  import com.amazonaws.services.kinesis.model.Record
   
   
  def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_ONLY)
    .buildWithMessageHandler(addFive)
  ```

  Für die Migration des Meldungshandlers:

  1. Ändern Sie die Importanweisung für `com.amazonaws.services.kinesis.model.Record` zu `software.amazon.kinesis.retrieval.KinesisClientRecord`.

     ```
     // import com.amazonaws.services.kinesis.model.Record
     import software.amazon.kinesis.retrieval.KinesisClientRecord
     ```

  1. Aktualisieren Sie die Methodensignatur des Meldungshandlers.

     ```
     //def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
     def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
     ```

  Es folgt ein aktualisiertes Beispiel für die Bereitstellung eines Meldungshandlers in KCL 2.x:

  ```
  import software.amazon.kinesis.retrieval.KinesisClientRecord
   
   
  def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_ONLY)
    .buildWithMessageHandler(addFive)
  ```

  Weitere Informationen zur Migration von KCL 1.x zu 2.x finden Sie unter [Migrieren von Verbrauchern von KCL 1.x zu KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html).

### Anbieter von AWS Anmeldeinformationen von AWS SDK 1.x auf 2.x migrieren
<a name="migrating-spark-kinesis-creds-from-1.x-to-2.x"></a>

Anbieter von Anmeldeinformationen werden verwendet, um AWS Anmeldeinformationen für Interaktionen mit zu erhalten. AWS In SDK 2.x gibt es mehrere Schnittstellen- und Klassenänderungen im Zusammenhang mit den Anbietern von Anmeldeinformationen, die Sie [hier finden](https://github.com/aws/aws-sdk-java-v2/blob/master/docs/LaunchChangelog.md#122-client-credentials). Der Spark Kinesis-Konnektor hat eine Schnittstelle (`org.apache.spark.streaming.kinesis.SparkAWSCredentials`) und Implementierungsklassen definiert, die die Version 1.x von AWS Credential Providern zurückgeben. Diese Anbieter von Anmeldeinformationen werden bei der Initialisierung von Kinesis-Clients benötigt. Wenn Sie die Methode beispielsweise `SparkAWSCredentials.provider` in den Anwendungen verwenden, müssten Sie die Codes aktualisieren, um die 2.x-Version der Credential Provider nutzen zu können. AWS 

Im Folgenden finden Sie ein Beispiel für die Verwendung der Anmeldeinformationsanbieter in AWS SDK 1.x:

```
import org.apache.spark.streaming.kinesis.SparkAWSCredentials
import com.amazonaws.auth.AWSCredentialsProvider
 
val basicSparkCredentials = SparkAWSCredentials.builder
    .basicCredentials("accessKey", "secretKey")
    .build()
                                     
val credentialProvider = basicSparkCredentials.provider
assert(credentialProvider.isInstanceOf[AWSCredentialsProvider], "Type should be AWSCredentialsProvider")
```

**Für die Migration zu 2.x:**

1. Ändern Sie die Importanweisung für `com.amazonaws.auth.AWSCredentialsProvider` zu `software.amazon.awssdk.auth.credentials.AwsCredentialsProvider`

   ```
   //import com.amazonaws.auth.AWSCredentialsProvider
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
   ```

1. Aktualisieren Sie die verbleibenden Codes, die diese Klasse verwenden. 

   ```
   import org.apache.spark.streaming.kinesis.SparkAWSCredentials
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
    
   val basicSparkCredentials = SparkAWSCredentials.builder
       .basicCredentials("accessKey", "secretKey")
       .build()
                                             
   val credentialProvider = basicSparkCredentials.provider
   assert (credentialProvider.isInstanceOf[AwsCredentialsProvider], "Type should be AwsCredentialsProvider")
   ```

### Migration von AWS Service-Clients von AWS SDK 1.x auf 2.x
<a name="migrating-spark-kinesis-service-from-1.x-to-2.x"></a>

AWS Service-Clients haben in 2.x unterschiedliche Paketnamen (d. h.`software.amazon.awssdk`), wohingegen das SDK 1.x verwendet. `com.amazonaws` Weitere Informationen über die Änderungen in dieser Version finden Sie [hier](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-whats-different.html). Wenn Sie diese Service-Clients in den Codes verwenden, müssten Sie die Clients entsprechend migrieren.

Im Folgenden finden Sie ein Beispiel für die Erstellung eines Clients in SDK 1.x:

```
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.document.DynamoDB
 
AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
```

**Für die Migration zu 2.x:**

1. Ändern Sie die Importanweisungen für Service-Clients. Nehmen wir als Beispiel DynamoDB-Clients. Sie müssten `com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient` oder `com.amazonaws.services.dynamodbv2.document.DynamoDB` zu `software.amazon.awssdk.services.dynamodb.DynamoDbClient` ändern.

   ```
   // import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
   // import com.amazonaws.services.dynamodbv2.document.DynamoDB
   import software.amazon.awssdk.services.dynamodb.DynamoDbClient
   ```

1. Die Codes aktualisieren, die die Clients initialisieren

   ```
   // AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
   // AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
    
   DynamoDbClient ddbClient = DynamoDbClient.create();
   DynamoDbClient ddbClient = DynamoDbClient.builder().build();
   ```

   Weitere Informationen zur Migration des AWS SDK von 1.x auf 2.x finden Sie unter [Was ist der Unterschied zwischen dem AWS SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-whats-different.html) 1.x und 2.x

### Codebeispiele für Streaming-Anwendungen
<a name="migrating-spark-kinesis-streaming-examples"></a>

```
import java.net.URI
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.http.apache.ApacheHttpClient
import software.amazon.awssdk.services.kinesis.KinesisClient
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest
import software.amazon.awssdk.regions.Region
import software.amazon.kinesis.metrics.{MetricsLevel, MetricsUtil}
 
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
import org.apache.spark.streaming.kinesis.KinesisInputDStream
 
 
object KinesisWordCountASLSDKV2 {
 
  def main(args: Array[String]): Unit = {
    val appName = "demo-app"
    val streamName = "demo-kinesis-test"
    val endpointUrl = "https://kinesis.us-west-2.amazonaws.com"
    val regionName = "us-west-2"
 
    // Determine the number of shards from the stream using the low-level Kinesis Client
    // from the AWS Java SDK.
    val credentialsProvider = DefaultCredentialsProvider.create
    require(credentialsProvider.resolveCredentials() != null,
      "No AWS credentials found. Please specify credentials using one of the methods specified " +
        "in https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html")
    val kinesisClient = KinesisClient.builder()
      .credentialsProvider(credentialsProvider)
      .region(Region.US_WEST_2)
      .endpointOverride(URI.create(endpointUrl))
      .httpClientBuilder(ApacheHttpClient.builder())
      .build()
    val describeStreamRequest = DescribeStreamRequest.builder()
      .streamName(streamName)
      .build()
    val numShards = kinesisClient.describeStream(describeStreamRequest)
      .streamDescription
      .shards
      .size
 
 
    // In this example, we are going to create 1 Kinesis Receiver/input DStream for each shard.
    // This is not a necessity; if there are less receivers/DStreams than the number of shards,
    // then the shards will be automatically distributed among the receivers and each receiver
    // will receive data from multiple shards.
    val numStreams = numShards
 
    // Spark Streaming batch interval
    val batchInterval = Milliseconds(2000)
 
    // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information
    // on sequence number of records that have been received. Same as batchInterval for this
    // example.
    val kinesisCheckpointInterval = batchInterval
 
    // Setup the SparkConfig and StreamingContext
    val sparkConfig = new SparkConf().setAppName("KinesisWordCountASLSDKV2")
    val ssc = new StreamingContext(sparkConfig, batchInterval)
 
    // Create the Kinesis DStreams
    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisInputDStream.builder
        .streamingContext(ssc)
        .streamName(streamName)
        .endpointUrl(endpointUrl)
        .regionName(regionName)
        .initialPosition(new Latest())
        .checkpointAppName(appName)
        .checkpointInterval(kinesisCheckpointInterval)
        .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
        .metricsLevel(MetricsLevel.DETAILED)
        .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
        .build()
    }
 
    // Union all the streams
    val unionStreams = ssc.union(kinesisStreams)
 
    // Convert each line of Array[Byte] to String, and split into words
    val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))
 
    // Map each word to a (word, 1) tuple so we can reduce by key to count the words
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
 
    // Print the first 10 wordCounts
    wordCounts.print()
 
    // Start the streaming context and await termination
    ssc.start()
    ssc.awaitTermination()
  }
}
```

### Überlegungen zur Verwendung des aktualisierten Spark-Kinesis-Konnektors
<a name="migrating-spark-kinesis-considerations"></a>
+ Wenn Ihre Anwendungen `Kinesis-producer-library` mit einer JDK-Version unter 11 verwenden, kann es zu Ausnahmen wie `java.lang.NoClassDefFoundError: javax/xml/bind/DatatypeConverter` kommen. Dies liegt daran, dass EMR 7.0 standardmäßig mit JDK 17 geliefert wird und J2EE-Module seit Java 11\$1 aus den Standardbibliotheken entfernt wurden. Dies könnte behoben werden, indem die folgende Abhängigkeit zur POM-Datei hinzugefügt wird. Ersetzen Sie die Bibliotheksversion nach Bedarf durch eine passende.

  ```
  <dependency>
        <groupId>javax.xml.bind</groupId>
        <artifactId>jaxb-api</artifactId>
        <version>${jaxb-api.version}</version>
      </dependency>
  ```
+ Das Spark-Kinesis-Konnektor-JAR befindet sich nach der Erstellung eines EMR-Clusters unter diesem Pfad: `/usr/lib/spark/connector/lib/`