

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Migrasi konektor Spark Kinesis ke SDK 2.x untuk Amazon EMR 7.0
<a name="migrating-spark-kinesis"></a>

 AWS SDK menyediakan seperangkat APIs dan pustaka yang kaya untuk berinteraksi dengan layanan komputasi AWS awan, seperti mengelola kredensil, menghubungkan ke layanan S3 dan Kinesis. Konektor Spark Kinesis digunakan untuk mengkonsumsi data dari Kinesis Data Streams, dan data yang diterima diubah dan diproses di mesin eksekusi Spark. Saat ini konektor ini dibangun di atas 1.x AWS SDK dan Kinesis-client-library (KCL). 

Sebagai bagian dari migrasi AWS SDK 2.x, konektor Spark Kinesis juga diperbarui sesuai untuk dijalankan dengan SDK 2.x. Dalam rilis Amazon EMR 7.0, Spark berisi upgrade SDK 2.x yang belum tersedia di versi komunitas Apache Spark. Jika Anda menggunakan konektor Spark Kinesis dari rilis yang lebih rendah dari 7.0, Anda harus memigrasikan kode aplikasi agar berjalan di SDK 2.x sebelum dapat bermigrasi ke Amazon EMR 7.0.

## Panduan migrasi
<a name="migrating-spark-kinesis-migration-guides"></a>

Bagian ini menjelaskan langkah-langkah untuk memigrasikan aplikasi ke konektor Kinesis Spark yang ditingkatkan. Ini mencakup panduan untuk bermigrasi ke Kinesis Client Library (KCL) 2.x AWS , penyedia kredensi, AWS dan klien layanan di SDK 2.x. AWS Sebagai referensi, ini juga mencakup [WordCount](https://github.com/apache/spark/blob/v3.5.0/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala)program sampel yang menggunakan konektor Kinesis.

**Topics**
+ [

### Migrasi KCL dari 1.x ke 2.x
](#migrating-spark-kinesis-KCL-from-1.x-to-2.x)
+ [

### Memigrasi penyedia AWS kredensional dari AWS SDK 1.x ke 2.x
](#migrating-spark-kinesis-creds-from-1.x-to-2.x)
+ [

### Memigrasi klien AWS layanan dari AWS SDK 1.x ke 2.x
](#migrating-spark-kinesis-service-from-1.x-to-2.x)
+ [

### Contoh kode untuk aplikasi streaming
](#migrating-spark-kinesis-streaming-examples)
+ [

### Pertimbangan saat menggunakan konektor Kinesis Spark yang ditingkatkan
](#migrating-spark-kinesis-considerations)

### Migrasi KCL dari 1.x ke 2.x
<a name="migrating-spark-kinesis-KCL-from-1.x-to-2.x"></a>
+ **Tingkat dan dimensi metrik di `KinesisInputDStream`**

  Saat membuat instance`KinesisInputDStream`, Anda dapat mengontrol level dan dimensi metrik untuk aliran. Contoh berikut menunjukkan bagaimana Anda dapat menyesuaikan parameter ini dengan KCL 1.x:

  ```
  import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
  import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
   
  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
    .build()
  ```

  Di KCL 2.x, pengaturan konfigurasi ini memiliki nama paket yang berbeda. Untuk bermigrasi ke 2.x:

  1. Ubah pernyataan impor untuk `com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration` dan `com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel` ke `software.amazon.kinesis.metrics.MetricsLevel` dan `software.amazon.kinesis.metrics.MetricsUtil` masing-masing.

     ```
     // import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
     import software.amazon.kinesis.metrics.MetricsLevel
      
     // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
     import software.amazon.kinesis.metrics.MetricsUtil
     ```

  1. Ganti garis `metricsEnabledDimensionsKinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet` dengan `metricsEnabledDimensionsSet(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)`

  Berikut ini adalah versi terbaru dari `KinesisInputDStream` dengan tingkat metrik dan dimensi metrik yang disesuaikan:

  ```
  import software.amazon.kinesis.metrics.MetricsLevel
  import software.amazon.kinesis.metrics.MetricsUtil
   
  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
    .build()
  ```
+ Fungsi penangan pesan di `KinesisInputDStream`

  Saat membuat instance`KinesisInputDStream`, Anda juga dapat menyediakan “fungsi penangan pesan” yang mengambil Rekaman Kinesis dan mengembalikan objek generik T, jika Anda ingin menggunakan data lain yang disertakan dalam Rekaman seperti kunci partisi.

  Di KCL 1.x, tanda tangan fungsi penangan pesan adalah:`Record => T`, di mana Record berada. `com.amazonaws.services.kinesis.model.Record` Di KCL 2.x, tanda tangan handler diubah menjadi:`KinesisClientRecord => T`, where is. KinesisClientRecord `software.amazon.kinesis.retrieval.KinesisClientRecord` 

  Berikut ini adalah contoh penyediaan handler pesan di KCL 1.x:

  ```
  import com.amazonaws.services.kinesis.model.Record
   
   
  def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_ONLY)
    .buildWithMessageHandler(addFive)
  ```

  Untuk memigrasikan penangan pesan:

  1. Ubah pernyataan impor `com.amazonaws.services.kinesis.model.Record` menjadi`software.amazon.kinesis.retrieval.KinesisClientRecord`.

     ```
     // import com.amazonaws.services.kinesis.model.Record
     import software.amazon.kinesis.retrieval.KinesisClientRecord
     ```

  1. Perbarui tanda tangan metode handler pesan.

     ```
     //def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
     def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
     ```

  Berikut ini adalah contoh terbaru dari menyediakan handler pesan di KCL 2.x:

  ```
  import software.amazon.kinesis.retrieval.KinesisClientRecord
   
   
  def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_ONLY)
    .buildWithMessageHandler(addFive)
  ```

  Untuk informasi selengkapnya tentang migrasi dari KCL 1.x ke 2.x, lihat [Migrasi Konsumen dari KCL 1.x ke KCL](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html) 2.x.

### Memigrasi penyedia AWS kredensional dari AWS SDK 1.x ke 2.x
<a name="migrating-spark-kinesis-creds-from-1.x-to-2.x"></a>

Penyedia kredensial digunakan untuk mendapatkan AWS kredensil untuk interaksi dengan. AWS[Ada beberapa perubahan antarmuka dan kelas yang terkait dengan penyedia kredensional di SDK 2.x, yang dapat ditemukan di sini.](https://github.com/aws/aws-sdk-java-v2/blob/master/docs/LaunchChangelog.md#122-client-credentials) Konektor Spark Kinesis telah mendefinisikan interface `org.apache.spark.streaming.kinesis.SparkAWSCredentials` () dan kelas implementasi yang mengembalikan versi AWS 1.x dari penyedia kredensi. Penyedia kredensi ini diperlukan saat menginisialisasi klien Kinesis. Misalnya, jika Anda menggunakan metode `SparkAWSCredentials.provider` dalam aplikasi, Anda perlu memperbarui kode untuk menggunakan penyedia AWS kredensi versi 2.x.

Berikut ini adalah contoh penggunaan penyedia kredensi di AWS SDK 1.x:

```
import org.apache.spark.streaming.kinesis.SparkAWSCredentials
import com.amazonaws.auth.AWSCredentialsProvider
 
val basicSparkCredentials = SparkAWSCredentials.builder
    .basicCredentials("accessKey", "secretKey")
    .build()
                                     
val credentialProvider = basicSparkCredentials.provider
assert(credentialProvider.isInstanceOf[AWSCredentialsProvider], "Type should be AWSCredentialsProvider")
```

**Untuk bermigrasi ke SDK 2.x:**

1. Ubah pernyataan impor `com.amazonaws.auth.AWSCredentialsProvider` menjadi `software.amazon.awssdk.auth.credentials.AwsCredentialsProvider`

   ```
   //import com.amazonaws.auth.AWSCredentialsProvider
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
   ```

1. Perbarui kode yang tersisa yang menggunakan kelas ini. 

   ```
   import org.apache.spark.streaming.kinesis.SparkAWSCredentials
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
    
   val basicSparkCredentials = SparkAWSCredentials.builder
       .basicCredentials("accessKey", "secretKey")
       .build()
                                             
   val credentialProvider = basicSparkCredentials.provider
   assert (credentialProvider.isInstanceOf[AwsCredentialsProvider], "Type should be AwsCredentialsProvider")
   ```

### Memigrasi klien AWS layanan dari AWS SDK 1.x ke 2.x
<a name="migrating-spark-kinesis-service-from-1.x-to-2.x"></a>

AWS klien layanan memiliki nama paket yang berbeda di 2.x (yaitu`software.amazon.awssdk`). sedangkan SDK 1.x menggunakan. `com.amazonaws` Untuk informasi lebih lanjut tentang perubahan klien, lihat [di sini](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-whats-different.html). Jika Anda menggunakan klien layanan ini dalam kode, Anda perlu memigrasikan klien yang sesuai.

Berikut ini adalah contoh membuat klien di SDK 1.x:

```
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.document.DynamoDB
 
AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
```

**Untuk bermigrasi ke 2.x:**

1. Ubah pernyataan impor untuk klien layanan. Ambil klien DynamoDB sebagai contoh. Anda akan perlu untuk mengubah `com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient` atau `com.amazonaws.services.dynamodbv2.document.DynamoDB` untuk`software.amazon.awssdk.services.dynamodb.DynamoDbClient`.

   ```
   // import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
   // import com.amazonaws.services.dynamodbv2.document.DynamoDB
   import software.amazon.awssdk.services.dynamodb.DynamoDbClient
   ```

1. Perbarui kode yang menginisialisasi klien

   ```
   // AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
   // AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
    
   DynamoDbClient ddbClient = DynamoDbClient.create();
   DynamoDbClient ddbClient = DynamoDbClient.builder().build();
   ```

   Untuk informasi selengkapnya tentang migrasi AWS SDK dari 1.x ke 2.x, lihat [Apa yang berbeda antara SDK for AWS Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-whats-different.html) 1.x dan 2.x

### Contoh kode untuk aplikasi streaming
<a name="migrating-spark-kinesis-streaming-examples"></a>

```
import java.net.URI
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.http.apache.ApacheHttpClient
import software.amazon.awssdk.services.kinesis.KinesisClient
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest
import software.amazon.awssdk.regions.Region
import software.amazon.kinesis.metrics.{MetricsLevel, MetricsUtil}
 
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
import org.apache.spark.streaming.kinesis.KinesisInputDStream
 
 
object KinesisWordCountASLSDKV2 {
 
  def main(args: Array[String]): Unit = {
    val appName = "demo-app"
    val streamName = "demo-kinesis-test"
    val endpointUrl = "https://kinesis.us-west-2.amazonaws.com"
    val regionName = "us-west-2"
 
    // Determine the number of shards from the stream using the low-level Kinesis Client
    // from the AWS Java SDK.
    val credentialsProvider = DefaultCredentialsProvider.create
    require(credentialsProvider.resolveCredentials() != null,
      "No AWS credentials found. Please specify credentials using one of the methods specified " +
        "in https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html")
    val kinesisClient = KinesisClient.builder()
      .credentialsProvider(credentialsProvider)
      .region(Region.US_WEST_2)
      .endpointOverride(URI.create(endpointUrl))
      .httpClientBuilder(ApacheHttpClient.builder())
      .build()
    val describeStreamRequest = DescribeStreamRequest.builder()
      .streamName(streamName)
      .build()
    val numShards = kinesisClient.describeStream(describeStreamRequest)
      .streamDescription
      .shards
      .size
 
 
    // In this example, we are going to create 1 Kinesis Receiver/input DStream for each shard.
    // This is not a necessity; if there are less receivers/DStreams than the number of shards,
    // then the shards will be automatically distributed among the receivers and each receiver
    // will receive data from multiple shards.
    val numStreams = numShards
 
    // Spark Streaming batch interval
    val batchInterval = Milliseconds(2000)
 
    // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information
    // on sequence number of records that have been received. Same as batchInterval for this
    // example.
    val kinesisCheckpointInterval = batchInterval
 
    // Setup the SparkConfig and StreamingContext
    val sparkConfig = new SparkConf().setAppName("KinesisWordCountASLSDKV2")
    val ssc = new StreamingContext(sparkConfig, batchInterval)
 
    // Create the Kinesis DStreams
    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisInputDStream.builder
        .streamingContext(ssc)
        .streamName(streamName)
        .endpointUrl(endpointUrl)
        .regionName(regionName)
        .initialPosition(new Latest())
        .checkpointAppName(appName)
        .checkpointInterval(kinesisCheckpointInterval)
        .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
        .metricsLevel(MetricsLevel.DETAILED)
        .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
        .build()
    }
 
    // Union all the streams
    val unionStreams = ssc.union(kinesisStreams)
 
    // Convert each line of Array[Byte] to String, and split into words
    val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))
 
    // Map each word to a (word, 1) tuple so we can reduce by key to count the words
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
 
    // Print the first 10 wordCounts
    wordCounts.print()
 
    // Start the streaming context and await termination
    ssc.start()
    ssc.awaitTermination()
  }
}
```

### Pertimbangan saat menggunakan konektor Kinesis Spark yang ditingkatkan
<a name="migrating-spark-kinesis-considerations"></a>
+ Jika aplikasi Anda menggunakan versi `Kinesis-producer-library` With JDK lebih rendah dari 11, Anda mungkin mengalami pengecualian seperti. `java.lang.NoClassDefFoundError: javax/xml/bind/DatatypeConverter` Ini terjadi karena EMR 7.0 hadir dengan JDK 17 secara default dan modul J2EE telah dihapus dari pustaka standar sejak Java 11\$1. Ini bisa diperbaiki dengan menambahkan ketergantungan berikut dalam file pom. Ganti versi perpustakaan dengan satu sesuai keinginan Anda.

  ```
  <dependency>
        <groupId>javax.xml.bind</groupId>
        <artifactId>jaxb-api</artifactId>
        <version>${jaxb-api.version}</version>
      </dependency>
  ```
+ Stoples konektor Spark Kinesis dapat ditemukan di bawah jalur ini setelah cluster EMR dibuat: `/usr/lib/spark/connector/lib/`