

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# Kinesis
<a name="emr-kinesis"></a>

Amazon EMR 클러스터는 Hive, Pig, MapReduce, Hadoop Streaming API, Cascading 등 Hadoop 에코시스템에서 익숙한 도구를 사용해 Amazon Kinesis 스트림을 직접 읽고 처리할 수 있습니다. 실행 중인 클러스터에서 Amazon S3, Amazon DynamoDB 및 HDFS의 기존 데이터와 Amazon Kinesis의 실시간 데이터를 조인할 수도 있습니다. 사후 처리 활동을 위해 Amazon EMR에서 Amazon S3 또는 DynamoDB로 데이터를 직접 로드할 수 있습니다. Amazon Kinesis 서비스 주요 기능 및 요금에 대한 자세한 내용은 [Amazon Kinesis](https://aws.amazon.com//kinesis) 페이지를 참조하세요.

## Amazon EMR과 Amazon Kinesis 통합으로 어떤 작업을 수행할 수 있나요?
<a name="kinesis-use-cases"></a>

 다음과 같은 특정 시나리오는 Amazon EMR과 Amazon Kinesis를 통합할 경우 훨씬 쉬워집니다.
+ **스트리밍 로그 분석** - 스트리밍 웹 로그를 분석하여 리전, 브라우저 및 액세스 도메인에 따라 몇 분마다 상위 10개의 오류 유형 목록을 생성할 수 있습니다.
+ **고객 참여** - Amazon Kinesis의 클릭스트림 데이터를 DynamoDB 테이블에 저장된 광고 캠페인 정보와 조인하여 특정 웹 사이트에 표시되는 가장 효과적인 광고 카테고리를 식별하는 쿼리를 작성할 수 있습니다.
+ **애드혹 대화형 쿼리** - Amazon Kinesis 스트림에서 HDFS로 주기적으로 데이터를 로드하고 빠른 대화형 분석 쿼리를 위한 로컬 Impala 테이블로 데이터를 사용하게 할 수 있습니다.

## Amazon Kinesis 스트림의 분석 검사
<a name="kinesis-checkpoint"></a>

사용자는 Amazon Kinesis 스트림에 대해 *반복*이라고 하는 주기적인 배치 분석을 실행할 수 있습니다. Amazon Kinesis 스트림 데이터 레코드는 시퀀스 번호를 사용하여 검색되기 때문에 반복 경계는 Amazon EMR이 DynamoDB 테이블에 저장하는 시작 및 종료 시퀀스 번호로 정의됩니다. 예를 들어, `iteration0`이 끝나면 DynamoDB에 종료 시퀀스 번호를 저장하므로 `iteration1` 작업이 시작되면 스트림에서 후속 데이터를 검색할 수 있습니다. 스트림 데이터의 이 반복 매핑을 *검사*라고 합니다. 자세한 내용은 [Kinesis 커넥터](https://aws.amazon.com/elasticmapreduce/faqs/#kinesis-connector)를 참조하세요.

반복을 검사했지만 반복을 처리하는 작업에 실패한 경우 Amazon EMR은 해당 반복의 레코드 재처리를 시도합니다.

검사는 다음을 수행할 수 있는 기능입니다.
+ 동일한 스트림 및 논리 이름에서 실행된 이전 쿼리에서 시퀀스 번호가 처리된 후에 데이터 처리를 시작합니다.
+ 이전 쿼리에서 처리한 Kinesis의 동일한 데이터 배치를 재처리합니다.

 검사를 활성화하려면 스크립트에서 `kinesis.checkpoint.enabled` 파라미터를 `true`로 설정합니다. 또한 다음 파라미터를 구성합니다.


| 구성 설정 | 설명 | 
| --- | --- | 
| kinesis.checkpoint.metastore.table.name | 검사 정보가 저장되는 DynamoDB 테이블 이름 | 
| kinesis.checkpoint.metastore.hash.key.name | DynamoDB 테이블의 해시 키 이름 | 
| kinesis.checkpoint.metastore.hash.range.name | DynamoDB 테이블의 범위 키 이름 | 
| kinesis.checkpoint.logical.name | 현재 처리의 논리적 이름 | 
| kinesis.checkpoint.iteration.no | 논리적 이름과 연관된 처리의 반복 번호 | 
| kinesis.rerun.iteration.without.wait | 실패한 반복이 시간 초과를 기다리지 않고 다시 실행할 수 있는지 여부를 나타내는 부울 값입니다. 기본값은 false입니다. | 

### Amazon DynamoDB 테이블의 프로비저닝된 IOPS 권장 사항
<a name="kinesis-checkpoint-DDB"></a>

Amazon Kinesis용 Amazon EMR 커넥터는 메타데이터 검사를 위한 백업으로 DynamoDB 데이터베이스를 사용합니다. 검사 간격으로 Amazon EMR 클러스터가 포함된 Amazon Kinesis 스트림의 데이터를 사용하기 전에 DynamoDB에서 테이블을 생성해야 합니다. 테이블은 Amazon EMR 클러스터와 같은 리전에 있어야 합니다. 다음은 DynamoDB 테이블에 대해 프로비저닝해야 하는 IOPS 수에 대한 일반적인 권장사항입니다. `j`는 동시에 실행할 수 있는 Hadoop 작업의 최대 수(다른 논리적 이름\$1반복 수 조합 포함)이고 `s`는 작업이 처리하는 샤드의 최대 수입니다.

**읽기 용량 유닛**: `j`\$1`s`/`5`

**쓰기 용량 유닛**: `j`\$1`s`

## 성능 고려 사항
<a name="performance"></a>

Amazon Kinesis 샤드 처리량은 Amazon EMR 클러스터를 구성하는 노드의 인스턴스 크기 및 스트림의 레코드 크기에 정비례합니다. 프라이머리 및 코어 노드에는 m5.xlarge 또는 더 큰 인스턴스를 사용하는 것이 좋습니다.

## Amazon EMR을 사용하여 Amazon Kinesis 분석 예약
<a name="schedule"></a>

제한 시간과 최대 반복 기간 제한이 있는 활성 Amazon Kinesis 스트림에서 데이터를 분석하는 경우, 분석을 자주 실행하여 스트림에서 정기적으로 세부 정보를 수집해야 합니다. 이러한 스크립트와 쿼리를 주기적으로 실행하는 방법에는 여러 가지가 있지만 이처럼 반복적인 작업에는 AWS Data Pipeline 을 사용하는 것이 좋습니다. 자세한 내용은 *AWS Data Pipeline 개발자 안내서*에서 [AWS Data Pipeline PigActivity](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-pigactivity.html) 및 [AWS Data Pipeline HiveActivity](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-hiveactivity.html)를 참조하세요.

# Spark Kinesis 커넥터를 Amazon EMR 7.0용 SDK 2.x로 마이그레이션
<a name="migrating-spark-kinesis"></a>

 AWS SDK는 자격 증명 관리, S3 및 Kinesis 서비스 연결과 같은 클라우드 컴퓨팅 서비스와 상호 작용할 수 있는 AWS 풍부한 APIs 및 라이브러리 세트를 제공합니다. Spark Kinesis 커넥터는 Kinesis Data Streams의 데이터를 소비하는 데 사용되며 수신된 데이터는 Spark의 실행 엔진에서 변환 및 처리됩니다. 현재이 커넥터는 1.x SDK AWS 및 Kinesis-client-library(KCL)를 기반으로 구축되었습니다.

 AWS SDK 2.x 마이그레이션의 일환으로 Spark Kinesis 커넥터도 SDK 2.x로 실행되도록 적절히 업데이트됩니다. Amazon EMR 7.0 릴리스의 Spark에는 Apache Spark의 커뮤니티 버전에서 아직 사용할 수 없는 SDK 2.x 업그레이드가 포함되어 있습니다. 7.0 미만의 릴리스에서 Spark Kinesis 커넥터를 사용하는 경우 Amazon EMR 7.0으로 마이그레이션하기 전에 SDK 2.x에서 실행되도록 애플리케이션 코드를 마이그레이션해야 합니다.

## 마이그레이션 안내서
<a name="migrating-spark-kinesis-migration-guides"></a>

이 섹션에서는 업그레이드된 Spark Kinesis 커넥터로 애플리케이션을 마이그레이션하는 단계를 설명합니다. 여기에는 AWS SDK 2.x의 Kinesis Client Library(KCL) 2.x, AWS 자격 증명 공급자 및 AWS 서비스 클라이언트로 마이그레이션하는 가이드가 포함되어 있습니다. 참고로 Kinesis 커넥터를 사용하는 샘플 [WordCount](https://github.com/apache/spark/blob/v3.5.0/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala) 프로그램도 포함되어 있습니다.

**Topics**
+ [1.x에서 2.x로 KCL 마이그레이션](#migrating-spark-kinesis-KCL-from-1.x-to-2.x)
+ [AWS SDK 1.x에서 2.x로 AWS 자격 증명 공급자 마이그레이션](#migrating-spark-kinesis-creds-from-1.x-to-2.x)
+ [AWS SDK 1.x에서 2.x로 AWS 서비스 클라이언트 마이그레이션](#migrating-spark-kinesis-service-from-1.x-to-2.x)
+ [스트리밍 애플리케이션의 코드 예제](#migrating-spark-kinesis-streaming-examples)
+ [업그레이드된 Spark Kinesis 커넥터 사용 시 고려 사항](#migrating-spark-kinesis-considerations)

### 1.x에서 2.x로 KCL 마이그레이션
<a name="migrating-spark-kinesis-KCL-from-1.x-to-2.x"></a>
+ **`KinesisInputDStream`의 지표 수준 및 차원**

  `KinesisInputDStream`을 인스턴스화할 때 스트림의 지표 수준 및 차원을 제어할 수 있습니다. 다음 예제는 KCL 1.x로 이러한 파라미터를 사용자 지정하는 방법을 보여줍니다.

  ```
  import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
  import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
   
  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
    .build()
  ```

  KCL 2.x에서는 이러한 구성 설정의 패키지 이름이 다릅니다. 2.x로 마이그레이션:

  1. 가져오기 명령문을 각각 `com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration` 및 `com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel`에서 `software.amazon.kinesis.metrics.MetricsLevel` 및 `software.amazon.kinesis.metrics.MetricsUtil`로 변경하세요.

     ```
     // import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
     import software.amazon.kinesis.metrics.MetricsLevel
      
     // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
     import software.amazon.kinesis.metrics.MetricsUtil
     ```

  1. `metricsEnabledDimensionsKinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet` 줄을 `metricsEnabledDimensionsSet(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)`로 바꿉니다.

  다음은 지표 수준 및 지표 차원을 사용자 지정한 `KinesisInputDStream`의 업데이트된 버전입니다.

  ```
  import software.amazon.kinesis.metrics.MetricsLevel
  import software.amazon.kinesis.metrics.MetricsUtil
   
  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
    .build()
  ```
+ `KinesisInputDStream`의 메시지 핸들러 함수

  파티션 키와 같이 레코드에 포함된 다른 데이터를 사용하려는 경우 `KinesisInputDStream`를 인스턴스화할 때 Kinesis Record를 사용하여 일반 객체 T를 반환하는 “메시지 핸들러 함수”를 제공할 수도 있습니다.

  KCL 1.x에서 메시지 핸들러 함수 서명은 `Record => T`이고. 여기서 Record는 `com.amazonaws.services.kinesis.model.Record`입니다. KCL 2.x에서는 핸들러의 서명이 `KinesisClientRecord => T`로 변경됩니다. 여기서 KinesisClientRecord는 `software.amazon.kinesis.retrieval.KinesisClientRecord`입니다.

  다음은 KCL 1.x에서 메시지 핸들러를 제공하는 예제입니다.

  ```
  import com.amazonaws.services.kinesis.model.Record
   
   
  def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_ONLY)
    .buildWithMessageHandler(addFive)
  ```

  메시지 핸들러를 마이그레이션하는 방법:

  1. `com.amazonaws.services.kinesis.model.Record`의 가져오기 명령문을 `software.amazon.kinesis.retrieval.KinesisClientRecord`로 변경하세요.

     ```
     // import com.amazonaws.services.kinesis.model.Record
     import software.amazon.kinesis.retrieval.KinesisClientRecord
     ```

  1. 메시지 핸들러의 메서드 서명을 업데이트하세요.

     ```
     //def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
     def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
     ```

  다음은 KCL 2.x에서 메시지 핸들러를 제공하는 업데이트된 예입니다.

  ```
  import software.amazon.kinesis.retrieval.KinesisClientRecord
   
   
  def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_ONLY)
    .buildWithMessageHandler(addFive)
  ```

  KCL 1.x에서 2.x로 마이그레이션에 대한 자세한 내용은 [KCL 1.x에서 KCL 2.x로 소비자 마이그레이션](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html)을 참조하세요.

### AWS SDK 1.x에서 2.x로 AWS 자격 증명 공급자 마이그레이션
<a name="migrating-spark-kinesis-creds-from-1.x-to-2.x"></a>

자격 증명 공급자는와 상호 작용하기 위한 AWS 자격 증명을 얻는 데 사용됩니다 AWS. SDK 2.x의 보안 인증 공급자와 관련된 몇 가지 인터페이스 및 클래스 변경 사항은 [여기](https://github.com/aws/aws-sdk-java-v2/blob/master/docs/LaunchChangelog.md#122-client-credentials)에서 확인할 수 있습니다. Spark Kinesis 커넥터는 1.x 버전의 AWS 자격 증명 공급자를 반환하는 인터페이스(`org.apache.spark.streaming.kinesis.SparkAWSCredentials`) 및 구현 클래스를 정의했습니다. Kinesis 클라이언트를 초기화할 때 이러한 보안 인증 공급자가 필요합니다. 예를 들어 `SparkAWSCredentials.provider` 애플리케이션에서 메서드를 사용하는 경우 2.x 버전의 AWS 자격 증명 공급자를 사용하도록 코드를 업데이트해야 합니다.

다음은 AWS SDK 1.x에서 자격 증명 공급자를 사용하는 예입니다.

```
import org.apache.spark.streaming.kinesis.SparkAWSCredentials
import com.amazonaws.auth.AWSCredentialsProvider
 
val basicSparkCredentials = SparkAWSCredentials.builder
    .basicCredentials("accessKey", "secretKey")
    .build()
                                     
val credentialProvider = basicSparkCredentials.provider
assert(credentialProvider.isInstanceOf[AWSCredentialsProvider], "Type should be AWSCredentialsProvider")
```

**SDK 2.x로 마이그레이션하는 방법:**

1. `com.amazonaws.auth.AWSCredentialsProvider`의 가져오기 명령문을 `software.amazon.awssdk.auth.credentials.AwsCredentialsProvider`로 변경

   ```
   //import com.amazonaws.auth.AWSCredentialsProvider
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
   ```

1. 이 클래스를 사용하는 나머지 코드를 업데이트하세요.

   ```
   import org.apache.spark.streaming.kinesis.SparkAWSCredentials
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
    
   val basicSparkCredentials = SparkAWSCredentials.builder
       .basicCredentials("accessKey", "secretKey")
       .build()
                                             
   val credentialProvider = basicSparkCredentials.provider
   assert (credentialProvider.isInstanceOf[AwsCredentialsProvider], "Type should be AwsCredentialsProvider")
   ```

### AWS SDK 1.x에서 2.x로 AWS 서비스 클라이언트 마이그레이션
<a name="migrating-spark-kinesis-service-from-1.x-to-2.x"></a>

AWS 서비스 클라이언트는 2.x(예: `software.amazon.awssdk`)에서 패키지 이름이 다른 반면, SDK 1.x는를 사용합니다`com.amazonaws`. 클라이언트 변경에 대한 자세한 내용은 [여기](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-whats-different.html)를 참조하세요. 코드에서 이러한 서비스 클라이언트를 사용하는 경우 그에 따라 클라이언트를 마이그레이션해야 합니다.

다음은 SDK 1.x에서 클라이언트를 생성하는 예제입니다.

```
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.document.DynamoDB
 
AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
```

**2.x로 마이그레이션:**

1. 서비스 클라이언트의 가져오기 명령문을 변경하세요. DynamoDB 클라이언트를 예로 들어 보겠습니다. `com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient` 또는 `com.amazonaws.services.dynamodbv2.document.DynamoDB`를 `software.amazon.awssdk.services.dynamodb.DynamoDbClient`로 변경해야 할 것입니다.

   ```
   // import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
   // import com.amazonaws.services.dynamodbv2.document.DynamoDB
   import software.amazon.awssdk.services.dynamodb.DynamoDbClient
   ```

1. 클라이언트를 초기화하는 코드를 업데이트하세요.

   ```
   // AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
   // AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
    
   DynamoDbClient ddbClient = DynamoDbClient.create();
   DynamoDbClient ddbClient = DynamoDbClient.builder().build();
   ```

    AWS SDK를 1.x에서 2.x로 마이그레이션하는 방법에 대한 자세한 내용은 [AWS SDK for Java 1.x와 2.x의 차이점을 참조하세요.](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-whats-different.html)

### 스트리밍 애플리케이션의 코드 예제
<a name="migrating-spark-kinesis-streaming-examples"></a>

```
import java.net.URI
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.http.apache.ApacheHttpClient
import software.amazon.awssdk.services.kinesis.KinesisClient
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest
import software.amazon.awssdk.regions.Region
import software.amazon.kinesis.metrics.{MetricsLevel, MetricsUtil}
 
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
import org.apache.spark.streaming.kinesis.KinesisInputDStream
 
 
object KinesisWordCountASLSDKV2 {
 
  def main(args: Array[String]): Unit = {
    val appName = "demo-app"
    val streamName = "demo-kinesis-test"
    val endpointUrl = "https://kinesis.us-west-2.amazonaws.com"
    val regionName = "us-west-2"
 
    // Determine the number of shards from the stream using the low-level Kinesis Client
    // from the AWS Java SDK.
    val credentialsProvider = DefaultCredentialsProvider.create
    require(credentialsProvider.resolveCredentials() != null,
      "No AWS credentials found. Please specify credentials using one of the methods specified " +
        "in https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html")
    val kinesisClient = KinesisClient.builder()
      .credentialsProvider(credentialsProvider)
      .region(Region.US_WEST_2)
      .endpointOverride(URI.create(endpointUrl))
      .httpClientBuilder(ApacheHttpClient.builder())
      .build()
    val describeStreamRequest = DescribeStreamRequest.builder()
      .streamName(streamName)
      .build()
    val numShards = kinesisClient.describeStream(describeStreamRequest)
      .streamDescription
      .shards
      .size
 
 
    // In this example, we are going to create 1 Kinesis Receiver/input DStream for each shard.
    // This is not a necessity; if there are less receivers/DStreams than the number of shards,
    // then the shards will be automatically distributed among the receivers and each receiver
    // will receive data from multiple shards.
    val numStreams = numShards
 
    // Spark Streaming batch interval
    val batchInterval = Milliseconds(2000)
 
    // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information
    // on sequence number of records that have been received. Same as batchInterval for this
    // example.
    val kinesisCheckpointInterval = batchInterval
 
    // Setup the SparkConfig and StreamingContext
    val sparkConfig = new SparkConf().setAppName("KinesisWordCountASLSDKV2")
    val ssc = new StreamingContext(sparkConfig, batchInterval)
 
    // Create the Kinesis DStreams
    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisInputDStream.builder
        .streamingContext(ssc)
        .streamName(streamName)
        .endpointUrl(endpointUrl)
        .regionName(regionName)
        .initialPosition(new Latest())
        .checkpointAppName(appName)
        .checkpointInterval(kinesisCheckpointInterval)
        .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
        .metricsLevel(MetricsLevel.DETAILED)
        .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
        .build()
    }
 
    // Union all the streams
    val unionStreams = ssc.union(kinesisStreams)
 
    // Convert each line of Array[Byte] to String, and split into words
    val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))
 
    // Map each word to a (word, 1) tuple so we can reduce by key to count the words
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
 
    // Print the first 10 wordCounts
    wordCounts.print()
 
    // Start the streaming context and await termination
    ssc.start()
    ssc.awaitTermination()
  }
}
```

### 업그레이드된 Spark Kinesis 커넥터 사용 시 고려 사항
<a name="migrating-spark-kinesis-considerations"></a>
+ 애플리케이션에서 JDK 버전 11 미만의 `Kinesis-producer-library`를 사용하는 경우 `java.lang.NoClassDefFoundError: javax/xml/bind/DatatypeConverter`와 같은 예외가 발생할 수 있습니다. 이는 EMR 7.0이 기본적으로 JDK 17과 함께 제공되고 Java 11 이상부터 J2EE 모듈이 표준 라이브러리에서 제거되었기 때문에 발생합니다. 이 문제는 pom 파일에 다음 종속성을 추가하여 해결할 수 있습니다. 상황에 맞게 라이브러리 버전을 교체합니다.

  ```
  <dependency>
        <groupId>javax.xml.bind</groupId>
        <artifactId>jaxb-api</artifactId>
        <version>${jaxb-api.version}</version>
      </dependency>
  ```
+ Spark Kinesis 커넥터 jar는 EMR 클러스터가 생성된 후 `/usr/lib/spark/connector/lib/` 경로에서 찾을 수 있습니다.