

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# Spark Kinesis 커넥터를 Amazon EMR 7.0용 SDK 2.x로 마이그레이션
<a name="migrating-spark-kinesis"></a>

 AWS SDK는 자격 증명 관리, S3 및 Kinesis 서비스 연결과 같은 클라우드 컴퓨팅 서비스와 상호 작용할 수 있는 AWS 풍부한 APIs 및 라이브러리 세트를 제공합니다. Spark Kinesis 커넥터는 Kinesis Data Streams의 데이터를 소비하는 데 사용되며 수신된 데이터는 Spark의 실행 엔진에서 변환 및 처리됩니다. 현재이 커넥터는 1.x SDK AWS 및 Kinesis-client-library(KCL)를 기반으로 구축되었습니다.

 AWS SDK 2.x 마이그레이션의 일환으로 Spark Kinesis 커넥터도 SDK 2.x로 실행되도록 적절히 업데이트됩니다. Amazon EMR 7.0 릴리스의 Spark에는 Apache Spark의 커뮤니티 버전에서 아직 사용할 수 없는 SDK 2.x 업그레이드가 포함되어 있습니다. 7.0 미만의 릴리스에서 Spark Kinesis 커넥터를 사용하는 경우 Amazon EMR 7.0으로 마이그레이션하기 전에 SDK 2.x에서 실행되도록 애플리케이션 코드를 마이그레이션해야 합니다.

## 마이그레이션 안내서
<a name="migrating-spark-kinesis-migration-guides"></a>

이 섹션에서는 업그레이드된 Spark Kinesis 커넥터로 애플리케이션을 마이그레이션하는 단계를 설명합니다. 여기에는 AWS SDK 2.x의 Kinesis Client Library(KCL) 2.x, AWS 자격 증명 공급자 및 AWS 서비스 클라이언트로 마이그레이션하는 가이드가 포함되어 있습니다. 참고로 Kinesis 커넥터를 사용하는 샘플 [WordCount](https://github.com/apache/spark/blob/v3.5.0/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala) 프로그램도 포함되어 있습니다.

**Topics**
+ [1.x에서 2.x로 KCL 마이그레이션](#migrating-spark-kinesis-KCL-from-1.x-to-2.x)
+ [AWS SDK 1.x에서 2.x로 AWS 자격 증명 공급자 마이그레이션](#migrating-spark-kinesis-creds-from-1.x-to-2.x)
+ [AWS SDK 1.x에서 2.x로 AWS 서비스 클라이언트 마이그레이션](#migrating-spark-kinesis-service-from-1.x-to-2.x)
+ [스트리밍 애플리케이션의 코드 예제](#migrating-spark-kinesis-streaming-examples)
+ [업그레이드된 Spark Kinesis 커넥터 사용 시 고려 사항](#migrating-spark-kinesis-considerations)

### 1.x에서 2.x로 KCL 마이그레이션
<a name="migrating-spark-kinesis-KCL-from-1.x-to-2.x"></a>
+ **`KinesisInputDStream`의 지표 수준 및 차원**

  `KinesisInputDStream`을 인스턴스화할 때 스트림의 지표 수준 및 차원을 제어할 수 있습니다. 다음 예제는 KCL 1.x로 이러한 파라미터를 사용자 지정하는 방법을 보여줍니다.

  ```
  import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
  import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
   
  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
    .build()
  ```

  KCL 2.x에서는 이러한 구성 설정의 패키지 이름이 다릅니다. 2.x로 마이그레이션:

  1. 가져오기 명령문을 각각 `com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration` 및 `com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel`에서 `software.amazon.kinesis.metrics.MetricsLevel` 및 `software.amazon.kinesis.metrics.MetricsUtil`로 변경하세요.

     ```
     // import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
     import software.amazon.kinesis.metrics.MetricsLevel
      
     // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
     import software.amazon.kinesis.metrics.MetricsUtil
     ```

  1. `metricsEnabledDimensionsKinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet` 줄을 `metricsEnabledDimensionsSet(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)`로 바꿉니다.

  다음은 지표 수준 및 지표 차원을 사용자 지정한 `KinesisInputDStream`의 업데이트된 버전입니다.

  ```
  import software.amazon.kinesis.metrics.MetricsLevel
  import software.amazon.kinesis.metrics.MetricsUtil
   
  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
    .build()
  ```
+ `KinesisInputDStream`의 메시지 핸들러 함수

  파티션 키와 같이 레코드에 포함된 다른 데이터를 사용하려는 경우 `KinesisInputDStream`를 인스턴스화할 때 Kinesis Record를 사용하여 일반 객체 T를 반환하는 “메시지 핸들러 함수”를 제공할 수도 있습니다.

  KCL 1.x에서 메시지 핸들러 함수 서명은 `Record => T`이고. 여기서 Record는 `com.amazonaws.services.kinesis.model.Record`입니다. KCL 2.x에서는 핸들러의 서명이 `KinesisClientRecord => T`로 변경됩니다. 여기서 KinesisClientRecord는 `software.amazon.kinesis.retrieval.KinesisClientRecord`입니다.

  다음은 KCL 1.x에서 메시지 핸들러를 제공하는 예제입니다.

  ```
  import com.amazonaws.services.kinesis.model.Record
   
   
  def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_ONLY)
    .buildWithMessageHandler(addFive)
  ```

  메시지 핸들러를 마이그레이션하는 방법:

  1. `com.amazonaws.services.kinesis.model.Record`의 가져오기 명령문을 `software.amazon.kinesis.retrieval.KinesisClientRecord`로 변경하세요.

     ```
     // import com.amazonaws.services.kinesis.model.Record
     import software.amazon.kinesis.retrieval.KinesisClientRecord
     ```

  1. 메시지 핸들러의 메서드 서명을 업데이트하세요.

     ```
     //def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
     def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
     ```

  다음은 KCL 2.x에서 메시지 핸들러를 제공하는 업데이트된 예입니다.

  ```
  import software.amazon.kinesis.retrieval.KinesisClientRecord
   
   
  def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_ONLY)
    .buildWithMessageHandler(addFive)
  ```

  KCL 1.x에서 2.x로 마이그레이션에 대한 자세한 내용은 [KCL 1.x에서 KCL 2.x로 소비자 마이그레이션](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html)을 참조하세요.

### AWS SDK 1.x에서 2.x로 AWS 자격 증명 공급자 마이그레이션
<a name="migrating-spark-kinesis-creds-from-1.x-to-2.x"></a>

자격 증명 공급자는와 상호 작용하기 위한 AWS 자격 증명을 얻는 데 사용됩니다 AWS. SDK 2.x의 보안 인증 공급자와 관련된 몇 가지 인터페이스 및 클래스 변경 사항은 [여기](https://github.com/aws/aws-sdk-java-v2/blob/master/docs/LaunchChangelog.md#122-client-credentials)에서 확인할 수 있습니다. Spark Kinesis 커넥터는 1.x 버전의 AWS 자격 증명 공급자를 반환하는 인터페이스(`org.apache.spark.streaming.kinesis.SparkAWSCredentials`) 및 구현 클래스를 정의했습니다. Kinesis 클라이언트를 초기화할 때 이러한 보안 인증 공급자가 필요합니다. 예를 들어 `SparkAWSCredentials.provider` 애플리케이션에서 메서드를 사용하는 경우 2.x 버전의 AWS 자격 증명 공급자를 사용하도록 코드를 업데이트해야 합니다.

다음은 AWS SDK 1.x에서 자격 증명 공급자를 사용하는 예입니다.

```
import org.apache.spark.streaming.kinesis.SparkAWSCredentials
import com.amazonaws.auth.AWSCredentialsProvider
 
val basicSparkCredentials = SparkAWSCredentials.builder
    .basicCredentials("accessKey", "secretKey")
    .build()
                                     
val credentialProvider = basicSparkCredentials.provider
assert(credentialProvider.isInstanceOf[AWSCredentialsProvider], "Type should be AWSCredentialsProvider")
```

**SDK 2.x로 마이그레이션하는 방법:**

1. `com.amazonaws.auth.AWSCredentialsProvider`의 가져오기 명령문을 `software.amazon.awssdk.auth.credentials.AwsCredentialsProvider`로 변경

   ```
   //import com.amazonaws.auth.AWSCredentialsProvider
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
   ```

1. 이 클래스를 사용하는 나머지 코드를 업데이트하세요.

   ```
   import org.apache.spark.streaming.kinesis.SparkAWSCredentials
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
    
   val basicSparkCredentials = SparkAWSCredentials.builder
       .basicCredentials("accessKey", "secretKey")
       .build()
                                             
   val credentialProvider = basicSparkCredentials.provider
   assert (credentialProvider.isInstanceOf[AwsCredentialsProvider], "Type should be AwsCredentialsProvider")
   ```

### AWS SDK 1.x에서 2.x로 AWS 서비스 클라이언트 마이그레이션
<a name="migrating-spark-kinesis-service-from-1.x-to-2.x"></a>

AWS 서비스 클라이언트는 2.x(예: `software.amazon.awssdk`)에서 패키지 이름이 다른 반면, SDK 1.x는를 사용합니다`com.amazonaws`. 클라이언트 변경에 대한 자세한 내용은 [여기](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-whats-different.html)를 참조하세요. 코드에서 이러한 서비스 클라이언트를 사용하는 경우 그에 따라 클라이언트를 마이그레이션해야 합니다.

다음은 SDK 1.x에서 클라이언트를 생성하는 예제입니다.

```
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.document.DynamoDB
 
AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
```

**2.x로 마이그레이션:**

1. 서비스 클라이언트의 가져오기 명령문을 변경하세요. DynamoDB 클라이언트를 예로 들어 보겠습니다. `com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient` 또는 `com.amazonaws.services.dynamodbv2.document.DynamoDB`를 `software.amazon.awssdk.services.dynamodb.DynamoDbClient`로 변경해야 할 것입니다.

   ```
   // import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
   // import com.amazonaws.services.dynamodbv2.document.DynamoDB
   import software.amazon.awssdk.services.dynamodb.DynamoDbClient
   ```

1. 클라이언트를 초기화하는 코드를 업데이트하세요.

   ```
   // AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
   // AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
    
   DynamoDbClient ddbClient = DynamoDbClient.create();
   DynamoDbClient ddbClient = DynamoDbClient.builder().build();
   ```

    AWS SDK를 1.x에서 2.x로 마이그레이션하는 방법에 대한 자세한 내용은 [AWS SDK for Java 1.x와 2.x의 차이점을 참조하세요.](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-whats-different.html)

### 스트리밍 애플리케이션의 코드 예제
<a name="migrating-spark-kinesis-streaming-examples"></a>

```
import java.net.URI
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.http.apache.ApacheHttpClient
import software.amazon.awssdk.services.kinesis.KinesisClient
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest
import software.amazon.awssdk.regions.Region
import software.amazon.kinesis.metrics.{MetricsLevel, MetricsUtil}
 
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
import org.apache.spark.streaming.kinesis.KinesisInputDStream
 
 
object KinesisWordCountASLSDKV2 {
 
  def main(args: Array[String]): Unit = {
    val appName = "demo-app"
    val streamName = "demo-kinesis-test"
    val endpointUrl = "https://kinesis.us-west-2.amazonaws.com"
    val regionName = "us-west-2"
 
    // Determine the number of shards from the stream using the low-level Kinesis Client
    // from the AWS Java SDK.
    val credentialsProvider = DefaultCredentialsProvider.create
    require(credentialsProvider.resolveCredentials() != null,
      "No AWS credentials found. Please specify credentials using one of the methods specified " +
        "in https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html")
    val kinesisClient = KinesisClient.builder()
      .credentialsProvider(credentialsProvider)
      .region(Region.US_WEST_2)
      .endpointOverride(URI.create(endpointUrl))
      .httpClientBuilder(ApacheHttpClient.builder())
      .build()
    val describeStreamRequest = DescribeStreamRequest.builder()
      .streamName(streamName)
      .build()
    val numShards = kinesisClient.describeStream(describeStreamRequest)
      .streamDescription
      .shards
      .size
 
 
    // In this example, we are going to create 1 Kinesis Receiver/input DStream for each shard.
    // This is not a necessity; if there are less receivers/DStreams than the number of shards,
    // then the shards will be automatically distributed among the receivers and each receiver
    // will receive data from multiple shards.
    val numStreams = numShards
 
    // Spark Streaming batch interval
    val batchInterval = Milliseconds(2000)
 
    // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information
    // on sequence number of records that have been received. Same as batchInterval for this
    // example.
    val kinesisCheckpointInterval = batchInterval
 
    // Setup the SparkConfig and StreamingContext
    val sparkConfig = new SparkConf().setAppName("KinesisWordCountASLSDKV2")
    val ssc = new StreamingContext(sparkConfig, batchInterval)
 
    // Create the Kinesis DStreams
    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisInputDStream.builder
        .streamingContext(ssc)
        .streamName(streamName)
        .endpointUrl(endpointUrl)
        .regionName(regionName)
        .initialPosition(new Latest())
        .checkpointAppName(appName)
        .checkpointInterval(kinesisCheckpointInterval)
        .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
        .metricsLevel(MetricsLevel.DETAILED)
        .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
        .build()
    }
 
    // Union all the streams
    val unionStreams = ssc.union(kinesisStreams)
 
    // Convert each line of Array[Byte] to String, and split into words
    val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))
 
    // Map each word to a (word, 1) tuple so we can reduce by key to count the words
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
 
    // Print the first 10 wordCounts
    wordCounts.print()
 
    // Start the streaming context and await termination
    ssc.start()
    ssc.awaitTermination()
  }
}
```

### 업그레이드된 Spark Kinesis 커넥터 사용 시 고려 사항
<a name="migrating-spark-kinesis-considerations"></a>
+ 애플리케이션에서 JDK 버전 11 미만의 `Kinesis-producer-library`를 사용하는 경우 `java.lang.NoClassDefFoundError: javax/xml/bind/DatatypeConverter`와 같은 예외가 발생할 수 있습니다. 이는 EMR 7.0이 기본적으로 JDK 17과 함께 제공되고 Java 11 이상부터 J2EE 모듈이 표준 라이브러리에서 제거되었기 때문에 발생합니다. 이 문제는 pom 파일에 다음 종속성을 추가하여 해결할 수 있습니다. 상황에 맞게 라이브러리 버전을 교체합니다.

  ```
  <dependency>
        <groupId>javax.xml.bind</groupId>
        <artifactId>jaxb-api</artifactId>
        <version>${jaxb-api.version}</version>
      </dependency>
  ```
+ Spark Kinesis 커넥터 jar는 EMR 클러스터가 생성된 후 `/usr/lib/spark/connector/lib/` 경로에서 찾을 수 있습니다.