

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# Kinesis
<a name="emr-kinesis"></a>

Amazon EMR 叢集可使用 Hive、Pig、MapReduce、Hadoop Streaming API 和 Cascading 等 Hadoop 生態系統內常見的工具，直接讀取與處理 Amazon Kinesis 串流。另外，您也可以將 Amazon Kinesis 的即時資料與執行中叢集內的 Amazon S3、Amazon DynamoDB 和 HDFS 上的現有資料互相聯結。您可以直接將資料從 Amazon EMR 載入 Amazon S3 或 DynamoDB，以供後續處理活動使用。如需有關 Amazon Kinesis 服務特色和定價的資訊，請參閱 [Amazon Kinesis](https://aws.amazon.com//kinesis) 頁面。

## 我可以使用 Amazon EMR 和 Amazon Kinesis 整合做什麼？
<a name="kinesis-use-cases"></a>

 在某些情況下，Amazon EMR 與 Amazon Kinesis 之間的整合能讓操作變得更加輕鬆，例如：
+ **分析日誌串流** – 您可以分析 Web 日誌的串流，如此系統每隔幾分鐘便會根據區域、瀏覽器和存取域產生 10 大錯誤類型清單。
+ **客戶參與度** – 您可以撰寫查詢，然後聯結 Amazon Kinesis 的點擊串流資料與儲存在 DynamoDB 資料表內的廣告活動資訊，藉此找出特定網站上成效最佳的廣告類別。
+ **臨機互動式查詢** – 您能夠定期從 Amazon Kinesis 串流將資料載入至 HDFS，並將該資料作為本機 Impala 資料表，以達到快速的互動式分析查詢。

## Amazon Kinesis 串流的檢查點分析
<a name="kinesis-checkpoint"></a>

使用者能定期執行 Amazon Kinesis 串流的批次分析，這稱為*反覆運算*。系統會使用序號來擷取 Amazon Kinesis 串流資料記錄，因此反覆運算範圍是由 Amazon EMR 儲存在 DynamoDB 資料表內的開始和結束序號所定義。例如，當 `iteration0` 結束時，其會將結束序號儲存在 DynamoDB 中。如此一來，`iteration1` 作業開始時，就能從串流中擷取序列資料。在串流資料中，這種疊代映射稱為*設置檢查點*。如需詳細資訊，請參閱 [Kinesis 連接器](https://aws.amazon.com/elasticmapreduce/faqs/#kinesis-connector)。

如果對反覆運算設定了檢查點且作業處理反覆運算失敗，則 Amazon EMR 會嘗試重新處理該反覆運算中的記錄。

設置檢查點的功能能夠：
+ 開始處理在上一次在相同串流和邏輯名稱上執行的查詢所處理過的序號後的資料
+ 重新處理 Kinesis 先前由更早的查詢處理過的相同資料批次

 若要啟用設置檢查點功能，請在指令碼中將 `kinesis.checkpoint.enabled` 參數設為 `true`。還可以設定以下參數：


| 組態設定 | Description | 
| --- | --- | 
| kinesis.checkpoint.metastore.table.name | 將儲存檢查點資訊的 DynamoDB 資料表名稱 | 
| kinesis.checkpoint.metastore.hash.key.name | DynamoDB 資料表的雜湊金鑰名稱 | 
| kinesis.checkpoint.metastore.hash.range.name | DynamoDB 資料表的範圍金鑰名稱 | 
| kinesis.checkpoint.logical.name | 目前處理中的邏輯名稱 | 
| kinesis.checkpoint.iteration.no | 邏輯名稱相關的處理疊代數目 | 
| kinesis.rerun.iteration.without.wait | 布林值，表示是否可以不等待逾時及重新執行失敗的疊代，預設值為 false | 

### 建議 Amazon DynamoDB 資料表使用的佈建 IOPS
<a name="kinesis-checkpoint-DDB"></a>

Amazon Kinesis 的 Amazon EMR 連接器會使用 DynamoDB 資料庫來備份檢查點的中繼資料。您必須在 DynamoDB 內建立資料表，才能在檢查點間隔中以 Amazon EMR 叢集取用 Amazon Kinesis 串流的資料。資料表必須與您的 Amazon EMR 叢集位於相同的區域中。一般而言，建議您參考下方資訊來設定 DynamoDB 資料表佈建的 IOPS 數量。`j` 表示可同時執行的 Hadoop 作業數量上限 (使用不同的邏輯名稱 \$1 反覆運算數目組合)，而 `s` 則是所有作業能處理的碎片最大數量：

若為 **Read Capacity Units (讀取容量單位)**：`j`\$1`s`/`5`

若為 **Write Capacity Units (寫入容量單位)**：`j`\$1`s`

## 效能考量
<a name="performance"></a>

Amazon Kinesis 碎片輸送量會與 Amazon EMR 叢集內的節點執行個體大小和串流中的記錄大小成正比。建議在主節點和核心節點上使用 m5.xlarge 或更大的執行個體。

## 使用 Amazon EMR 排程 Amazon Kinesis 分析
<a name="schedule"></a>

當您分析使用中的 Amazon Kinesis 串流上的資料時，會受到任何反覆運算的逾時和最大期間的限制，因此定期執行分析相當重要，如此才能從串流中定期收集詳細資訊。有很多方式能夠以定期間隔執行這類指令碼和查詢，我們推薦使用 AWS Data Pipeline 處理這類重複的任務。如需詳細資訊，請參閱《AWS Data Pipeline 開發人員指南》**中的 [AWS Data Pipeline PigActivity](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-pigactivity.html) 和 [AWS Data Pipeline HiveActivity](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-hiveactivity.html)。

# 將 Spark Kinesis 連接器遷移至適用於 Amazon EMR 7.0 的 SDK 2.x
<a name="migrating-spark-kinesis"></a>

 AWS 開發套件提供一組豐富的 APIs 和程式庫，可與雲端運算服務互動 AWS ，例如管理登入資料、連線至 S3 和 Kinesis 服務。Spark Kinesis 連接器可用於取用 Kinesis Data Streams 中的資料，接收到的資料會在 Spark 的執行引擎中經過轉換和處理。此連接器目前建置在 AWS SDK 和 Kinesis-client-library (KCL) 的 1.x 之上。

作為 AWS SDK 2.x 遷移的一部分，Spark Kinesis 連接器也會隨之更新，以使用 SDK 2.x 執行。在 Amazon EMR 7.0 版本中，Spark 包含尚不可用於 Apache Spark 社群版本中的 SDK 2.x 升級。如果您使用版本低於 7.0 版的 Spark Kinesis 連接器，必須先遷移應用程式程式碼以在 SDK 2.x 上執行，才能遷移到 Amazon EMR 7.0。

## 遷移指南
<a name="migrating-spark-kinesis-migration-guides"></a>

本節說明將應用程式遷移至升級後之 Spark Kinesis 連接器的步驟。它包含遷移至 AWS SDK 2.x 中 Kinesis Client Library (KCL) 2.x、 AWS 憑證提供者 AWS 和服務用戶端的指南。作為參考，其中也包括使用 Kinesis 連接器的 [WordCount](https://github.com/apache/spark/blob/v3.5.0/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala) 範本計畫。

**Topics**
+ [將 KCL 從 1.x 遷移到 2.x](#migrating-spark-kinesis-KCL-from-1.x-to-2.x)
+ [將 AWS 登入資料提供者從 AWS SDK 1.x 遷移至 2.x](#migrating-spark-kinesis-creds-from-1.x-to-2.x)
+ [將 AWS 服務用戶端從 AWS SDK 1.x 遷移至 2.x](#migrating-spark-kinesis-service-from-1.x-to-2.x)
+ [串流應用程式的程式碼範例](#migrating-spark-kinesis-streaming-examples)
+ [使用升級後的 Spark Kinesis 連接器時的考量事項](#migrating-spark-kinesis-considerations)

### 將 KCL 從 1.x 遷移到 2.x
<a name="migrating-spark-kinesis-KCL-from-1.x-to-2.x"></a>
+ **`KinesisInputDStream` 中的指標層級和維度**

  在您具現化 `KinesisInputDStream` 時，可控制串流的指標層級和維度。以下範例示範如何使用 KCL 1.x 自訂這些參數：

  ```
  import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
  import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
   
  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
    .build()
  ```

  在 KCL 2.x 中，這些組態設定會有不同的套件名稱。遷移至 2.x：

  1. 分別將 `com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration` 和 `com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel` 的匯入陳述式變更為 `software.amazon.kinesis.metrics.MetricsLevel` 和 `software.amazon.kinesis.metrics.MetricsUtil`。

     ```
     // import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
     import software.amazon.kinesis.metrics.MetricsLevel
      
     // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
     import software.amazon.kinesis.metrics.MetricsUtil
     ```

  1. 使用 `metricsEnabledDimensionsSet(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)` 取代行 `metricsEnabledDimensionsKinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet`

  以下是具有自訂指標層級和指標維度的 `KinesisInputDStream` 更新版本：

  ```
  import software.amazon.kinesis.metrics.MetricsLevel
  import software.amazon.kinesis.metrics.MetricsUtil
   
  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
    .build()
  ```
+ `KinesisInputDStream` 中的訊息處理常式函數

  在具現化 `KinesisInputDStream` 時，您也可以提供一個取得 Kinesis Record 並傳回一般物件 T 的「訊息處理常式函數」，以防萬一您要使用 Record 中包含的其他資料 (例如分割區索引鍵)。

  在 KCL 1.x 中，訊息處理常式函數簽章是：`Record => T`，其中，Record 是 `com.amazonaws.services.kinesis.model.Record`。在 KCL 2.x 中，處理常式的簽章會變更為：`KinesisClientRecord => T`，其中，KinesisClientRecord 是 `software.amazon.kinesis.retrieval.KinesisClientRecord`。

  以下是在 KCL 1.x 中提供訊息處理常式的範例：

  ```
  import com.amazonaws.services.kinesis.model.Record
   
   
  def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_ONLY)
    .buildWithMessageHandler(addFive)
  ```

  若要遷移訊息處理常式：

  1. 將匯入陳述式從 `com.amazonaws.services.kinesis.model.Record` 變更為 `software.amazon.kinesis.retrieval.KinesisClientRecord`。

     ```
     // import com.amazonaws.services.kinesis.model.Record
     import software.amazon.kinesis.retrieval.KinesisClientRecord
     ```

  1. 更新訊息處理常式的方法簽章。

     ```
     //def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
     def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
     ```

  以下是在 KCL 2.x 中提供訊息處理常式的更新範例：

  ```
  import software.amazon.kinesis.retrieval.KinesisClientRecord
   
   
  def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_ONLY)
    .buildWithMessageHandler(addFive)
  ```

  如需有關從 KCL 1.x 遷移至 2.x 的詳細資訊，請參閱[將消費者從 KCL 1.x 遷移到 KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html)。

### 將 AWS 登入資料提供者從 AWS SDK 1.x 遷移至 2.x
<a name="migrating-spark-kinesis-creds-from-1.x-to-2.x"></a>

登入資料提供者用來取得與 互動的 AWS 登入資料 AWS。SDK 2.x 中有數個與憑證提供者相關的介面和類別變更，可在[此處](https://github.com/aws/aws-sdk-java-v2/blob/master/docs/LaunchChangelog.md#122-client-credentials)了解。Spark Kinesis 連接器已定義傳回 1.x 版 AWS 登入資料提供者的介面 (`org.apache.spark.streaming.kinesis.SparkAWSCredentials`) 和實作類別。在初始化 Kinesis 用戶端時會需要這些憑證提供者。例如，如果您`SparkAWSCredentials.provider`在應用程式中使用 方法，則需要更新程式碼以使用 2.x 版的 AWS 登入資料提供者。

以下是在 AWS SDK 1.x 中使用登入資料提供者的範例：

```
import org.apache.spark.streaming.kinesis.SparkAWSCredentials
import com.amazonaws.auth.AWSCredentialsProvider
 
val basicSparkCredentials = SparkAWSCredentials.builder
    .basicCredentials("accessKey", "secretKey")
    .build()
                                     
val credentialProvider = basicSparkCredentials.provider
assert(credentialProvider.isInstanceOf[AWSCredentialsProvider], "Type should be AWSCredentialsProvider")
```

**若要遷移至 SDK 2.x：**

1. 將匯入陳述式從 `com.amazonaws.auth.AWSCredentialsProvider` 變更為 `software.amazon.awssdk.auth.credentials.AwsCredentialsProvider`

   ```
   //import com.amazonaws.auth.AWSCredentialsProvider
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
   ```

1. 更新使用此類別的剩餘程式碼。

   ```
   import org.apache.spark.streaming.kinesis.SparkAWSCredentials
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
    
   val basicSparkCredentials = SparkAWSCredentials.builder
       .basicCredentials("accessKey", "secretKey")
       .build()
                                             
   val credentialProvider = basicSparkCredentials.provider
   assert (credentialProvider.isInstanceOf[AwsCredentialsProvider], "Type should be AwsCredentialsProvider")
   ```

### 將 AWS 服務用戶端從 AWS SDK 1.x 遷移至 2.x
<a name="migrating-spark-kinesis-service-from-1.x-to-2.x"></a>

AWS 服務用戶端在 2.x （即 `software.amazon.awssdk`) 中有不同的套件名稱，而 SDK 1.x 使用 。 `com.amazonaws`如需有關用戶端變更的詳細資訊，請參閱[此處](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-whats-different.html)。如果您在程式碼中使用這些服務用戶端，則需相應地遷移用戶端。

以下是在 SDK 1.x 中建立用戶端的範例：

```
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.document.DynamoDB
 
AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
```

**遷移至 2.x：**

1. 變更服務用戶端的匯入陳述式。以 DynamoDB 用戶端為例。您需將 `com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient` 或 `com.amazonaws.services.dynamodbv2.document.DynamoDB` 變更為 `software.amazon.awssdk.services.dynamodb.DynamoDbClient`。

   ```
   // import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
   // import com.amazonaws.services.dynamodbv2.document.DynamoDB
   import software.amazon.awssdk.services.dynamodb.DynamoDbClient
   ```

1. 更新初始化用戶端的程式碼

   ```
   // AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
   // AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
    
   DynamoDbClient ddbClient = DynamoDbClient.create();
   DynamoDbClient ddbClient = DynamoDbClient.builder().build();
   ```

   如需將 AWS SDK 從 1.x 遷移至 2.x 的詳細資訊，請參閱[適用於 Java 的 AWS SDK 1.x 和 2.x 之間的差異](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-whats-different.html)

### 串流應用程式的程式碼範例
<a name="migrating-spark-kinesis-streaming-examples"></a>

```
import java.net.URI
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.http.apache.ApacheHttpClient
import software.amazon.awssdk.services.kinesis.KinesisClient
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest
import software.amazon.awssdk.regions.Region
import software.amazon.kinesis.metrics.{MetricsLevel, MetricsUtil}
 
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
import org.apache.spark.streaming.kinesis.KinesisInputDStream
 
 
object KinesisWordCountASLSDKV2 {
 
  def main(args: Array[String]): Unit = {
    val appName = "demo-app"
    val streamName = "demo-kinesis-test"
    val endpointUrl = "https://kinesis.us-west-2.amazonaws.com"
    val regionName = "us-west-2"
 
    // Determine the number of shards from the stream using the low-level Kinesis Client
    // from the AWS Java SDK.
    val credentialsProvider = DefaultCredentialsProvider.create
    require(credentialsProvider.resolveCredentials() != null,
      "No AWS credentials found. Please specify credentials using one of the methods specified " +
        "in https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html")
    val kinesisClient = KinesisClient.builder()
      .credentialsProvider(credentialsProvider)
      .region(Region.US_WEST_2)
      .endpointOverride(URI.create(endpointUrl))
      .httpClientBuilder(ApacheHttpClient.builder())
      .build()
    val describeStreamRequest = DescribeStreamRequest.builder()
      .streamName(streamName)
      .build()
    val numShards = kinesisClient.describeStream(describeStreamRequest)
      .streamDescription
      .shards
      .size
 
 
    // In this example, we are going to create 1 Kinesis Receiver/input DStream for each shard.
    // This is not a necessity; if there are less receivers/DStreams than the number of shards,
    // then the shards will be automatically distributed among the receivers and each receiver
    // will receive data from multiple shards.
    val numStreams = numShards
 
    // Spark Streaming batch interval
    val batchInterval = Milliseconds(2000)
 
    // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information
    // on sequence number of records that have been received. Same as batchInterval for this
    // example.
    val kinesisCheckpointInterval = batchInterval
 
    // Setup the SparkConfig and StreamingContext
    val sparkConfig = new SparkConf().setAppName("KinesisWordCountASLSDKV2")
    val ssc = new StreamingContext(sparkConfig, batchInterval)
 
    // Create the Kinesis DStreams
    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisInputDStream.builder
        .streamingContext(ssc)
        .streamName(streamName)
        .endpointUrl(endpointUrl)
        .regionName(regionName)
        .initialPosition(new Latest())
        .checkpointAppName(appName)
        .checkpointInterval(kinesisCheckpointInterval)
        .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
        .metricsLevel(MetricsLevel.DETAILED)
        .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
        .build()
    }
 
    // Union all the streams
    val unionStreams = ssc.union(kinesisStreams)
 
    // Convert each line of Array[Byte] to String, and split into words
    val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))
 
    // Map each word to a (word, 1) tuple so we can reduce by key to count the words
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
 
    // Print the first 10 wordCounts
    wordCounts.print()
 
    // Start the streaming context and await termination
    ssc.start()
    ssc.awaitTermination()
  }
}
```

### 使用升級後的 Spark Kinesis 連接器時的考量事項
<a name="migrating-spark-kinesis-considerations"></a>
+ 如果您的應用程式使用 JDK 版本低於 11 的 `Kinesis-producer-library`，則可能會遇到異常情況，如 `java.lang.NoClassDefFoundError: javax/xml/bind/DatatypeConverter`。發生這種情況是因為依預設，EMR 7.0 會搭配 JDK 17，且自 Java 11\$1 起，J2EE 模組已從標準程式庫中移除。這可以藉由在 pom 檔案中新增以下相依性來解決。請使用適合的版本取代程式庫版本。

  ```
  <dependency>
        <groupId>javax.xml.bind</groupId>
        <artifactId>jaxb-api</artifactId>
        <version>${jaxb-api.version}</version>
      </dependency>
  ```
+ 在建立 EMR 叢集之後，您可在此路徑下找到 Spark Kinesis 連接器 jar：`/usr/lib/spark/connector/lib/`