

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# Kinesis
<a name="emr-kinesis"></a>

亚马逊 EMR 集群可以使用 Hadoop 生态系统中熟悉的工具（例如 Hive、Pig、Hadoop Streaming API 和 Cascading MapReduce）直接读取和处理 Amazon Kinesis 直播。您还可以将 Amazon Kinesis 中的实时数据与正在运行的集群中 Amazon S3、Amazon DynamoDB 和 HDFS 上的现有数据进行连接。您可以直接将 Amazon EMR 中的数据加载到 Amazon S3 或 DynamoDB 来进行后处理。有关 Amazon Kinesis 服务亮点和定价的信息，请参阅 [Amazon Kinesis](https://aws.amazon.com//kinesis) 页面。

## 可以对 Amazon EMR 和 Amazon Kinesis 集成执行哪些操作？
<a name="kinesis-use-cases"></a>

 Amazon EMR 和 Amazon Kinesis 之间的集成使某些方案更简单，例如：
+ **流式处理日志分析**：您可以分析流式处理 Web 日志，以便每隔几分钟按区域、浏览器和访问域生成前 10 个错误类型的列表。
+ **客户参与**：您可以编写查询将 Amazon Kinesis 中的点击流数据与存储在 DynamoDB 表中的广告活动信息进行连接，以确定显示在特定网站上的最有效广告类别。
+ **即席交互式查询**：您可以定期将 Amazon Kinesis 流中的数据加载到 HDFS 中，并以本地 Impala 表的形式提供该数据以进行快速的交互式分析查询。

## 对 Amazon Kinesis 流进行检查点分析
<a name="kinesis-checkpoint"></a>

用户可以定期对 Amazon Kinesis 流进行批量分析，这些分析称为*迭代*。因为使用序列号检索 Amazon Kinesis 流数据记录，所以，可通过 Amazon EMR 在 DynamoDB 表中存储的开始和结束序列号来定义迭代边界。例如，当 `iteration0` 结束时，它在 DynamoDB 中存储结束序列号，这样在 `iteration1` 作业开始时，它可以检索流的后续数据。迭代在流数据中的这种映射称为*检查点操作*。有关更多信息，请参阅 [Kinesis 连接器](https://aws.amazon.com/elasticmapreduce/faqs/#kinesis-connector)。

如果对迭代进行了检查点操作且作业未能处理某个迭代，则 Amazon EMR 会尝试重新处理该迭代中的记录。

通过检查点功能，您可以：
+ 从运行于相同的流和逻辑名称之上的前一个查询处理的序列号之后，开始数据处理
+ 重新处理 Kinesis 中由之前的查询处理的同一批数据

 要启用检查点操作，请在脚本中将 `kinesis.checkpoint.enabled` 参数设置为 `true`。此外，请配置以下参数：


| 配置设置 | 说明 | 
| --- | --- | 
| kinesis.checkpoint.metastore.table.name | 用于存储检查点信息的 DynamoDB 表名称 | 
| kinesis.checkpoint.metastore.hash.key.name | DynamoDB 表的哈希键名称 | 
| kinesis.checkpoint.metastore.hash.range.name | DynamoDB 表的范围键名称 | 
| kinesis.checkpoint.logical.name | 当前处理的逻辑名称 | 
| kinesis.checkpoint.iteration.no | 与逻辑名称关联的处理的迭代编号 | 
| kinesis.rerun.iteration.without.wait | 用来指示是否可以重新运行失败的迭代而不等待超时的布尔值；默认值为 false | 

### Amazon DynamoDB 表的预置 IOPS 建议
<a name="kinesis-checkpoint-DDB"></a>

Amazon Kinesis 的 Amazon EMR 连接器使用 DynamoDB 数据库作为对元数据进行检查点操作的支持。必须先在 DynamoDB 中创建表，才能以检查点时间间隔使用 Amazon EMR 集群的 Amazon Kinesis 流中的数据。该表必须与 Amazon EMR 集群位于相同区域中。以下是为您应当为 DynamoDB 表预置的 IOPS 数的一般建议；`j` 应当是可同时运行的最大 Hadoop 任务数（具有不同的逻辑名称\$1迭代编号组合），`s` 是任何作业将处理的最大分片数：

对于 **Read Capacity Units**：`j`\$1`s`/`5`

对于 **Write Capacity Units**：`j`\$1`s`

## 性能注意事项
<a name="performance"></a>

Amazon Kinesis 分片吞吐量与 Amazon EMR 集群中节点的实例大小以及流中的记录大小成正比。建议在主节点和核心节点上使用 m5.xlarge 或更大的实例。

## 借助 Amazon EMR 安排 Amazon Kinesis 分析
<a name="schedule"></a>

如果要对活动 Amazon Kinesis 流分析数据，由于任何迭代都受超时和最长持续时间限制，您应经常运行分析，以便从流定期收集详细信息，这十分重要。可以通过多种方式定期执行该类脚本和查询；但建议针对此类周期性任务使用 AWS Data Pipeline 。有关更多信息，请参阅《*AWS Data Pipeline 开发人员指南*》[AWS Data Pipeline HiveActivity](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-hiveactivity.html)中的[AWS Data Pipeline PigActivity](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-pigactivity.html)和。

# 将 Spark Kinesis 连接器迁移到适用于 Amazon EMR 7.0 的 SDK 2.x
<a name="migrating-spark-kinesis"></a>

该 AWS 软件开发工具包提供了一组丰富的库来与 AWS 云计算服务进行交互，例如管理凭据、连接到 S3 和 Kinesis 服务。 APIs Spark Kinesis 连接器用于使用来自 Kinesis Data Streams 的数据，且接收到的数据将在 Spark 的执行引擎中进行转换和处理。目前，此连接器是在 AWS SDK 和 Kinesis-client-library (KCL) 的 1.x 基础上构建的。

作为 AWS SDK 2.x 迁移的一部分，Spark Kinesis 连接器也相应进行了更新，使其可以与 SDK 2.x 一起运行。在 Amazon EMR 7.0 发行版中，Spark 包含 SDK 2.x 升级，该升级尚不可在 Apache Spark 的社区版本中使用。如果您使用低于 7.0 的版本中的 Spark Kinesis 连接器，则必须先将应用程序代码迁移到在 SDK 2.x 上运行，然后才能迁移到 Amazon EMR 7.0。

## 迁移指南
<a name="migrating-spark-kinesis-migration-guides"></a>

本部分介绍将应用程序迁移到升级后的 Spark Kinesis 连接器的步骤。它包括迁移到 Kinesis 客户端库 (KCL) 2.x、 AWS 证书提供程序和 SDK 2.x 中的 AWS 服务客户端的指南。 AWS 作为参考，它还包括一个使用 Kinesis 连接器的示例[WordCount](https://github.com/apache/spark/blob/v3.5.0/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala)程序。

**Topics**
+ [将 KCL 从 1.x 迁移到 2.x](#migrating-spark-kinesis-KCL-from-1.x-to-2.x)
+ [将 AWS 凭证提供程序从 AWS SDK 1.x 迁移到 2.x](#migrating-spark-kinesis-creds-from-1.x-to-2.x)
+ [将 AWS 服务客户端从 AWS SDK 1.x 迁移到 2.x](#migrating-spark-kinesis-service-from-1.x-to-2.x)
+ [流式传输应用程序的代码示例](#migrating-spark-kinesis-streaming-examples)
+ [使用升级后的 Spark Kinesis 连接器时的注意事项](#migrating-spark-kinesis-considerations)

### 将 KCL 从 1.x 迁移到 2.x
<a name="migrating-spark-kinesis-KCL-from-1.x-to-2.x"></a>
+ **`KinesisInputDStream` 中的指标级别和维度**

  当您实例化 `KinesisInputDStream` 时，您可以控制流的指标级别和维度。以下示例演示了如何使用 KCL 1.x 自定义这些参数：

  ```
  import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
  import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
   
  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
    .build()
  ```

  在 KCL 2.x 中，这些配置设置具有不同的包名称。要迁移到 2.x：

  1. 分别将 `com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration` 和 `com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel` 的导入语句更改为 `software.amazon.kinesis.metrics.MetricsLevel` 和 `software.amazon.kinesis.metrics.MetricsUtil`。

     ```
     // import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
     import software.amazon.kinesis.metrics.MetricsLevel
      
     // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
     import software.amazon.kinesis.metrics.MetricsUtil
     ```

  1. 将行 `metricsEnabledDimensionsKinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet` 替换为 `metricsEnabledDimensionsSet(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)`

  以下是包含自定义指标级别和指标维度的 `KinesisInputDStream` 的更新版本：

  ```
  import software.amazon.kinesis.metrics.MetricsLevel
  import software.amazon.kinesis.metrics.MetricsUtil
   
  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
    .build()
  ```
+ `KinesisInputDStream` 中的消息处理程序函数

  在实例化 `KinesisInputDStream` 时，您还可以提供一个“消息处理程序函数”，该函数接收 Kinesis 记录并返回通用对象 T，以备您想使用记录中包含的其他数据（例如分区键）。

  在 KCL 1.x 中，消息处理程序函数签名为：`Record => T`，其中记录为 `com.amazonaws.services.kinesis.model.Record`。在 KCL 2.x 中，处理程序的签名更改为:`KinesisClientRecord => T`，其中 KinesisClientRecord。`software.amazon.kinesis.retrieval.KinesisClientRecord`

  下面是在 KCL 1.x 中提供消息处理程序的示例：

  ```
  import com.amazonaws.services.kinesis.model.Record
   
   
  def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_ONLY)
    .buildWithMessageHandler(addFive)
  ```

  要迁移消息处理程序：

  1. 将 `com.amazonaws.services.kinesis.model.Record` 的导入语句更改为 `software.amazon.kinesis.retrieval.KinesisClientRecord`。

     ```
     // import com.amazonaws.services.kinesis.model.Record
     import software.amazon.kinesis.retrieval.KinesisClientRecord
     ```

  1. 更新消息处理程序的方法签名。

     ```
     //def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
     def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
     ```

  下面是在 KCL 2.x 中提供消息处理程序的更新示例：

  ```
  import software.amazon.kinesis.retrieval.KinesisClientRecord
   
   
  def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_ONLY)
    .buildWithMessageHandler(addFive)
  ```

  有关从 KCL 1.x 迁移到 2.x 的更多信息，请参阅[将使用者从 KCL 1.x 迁移到 KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html)。

### 将 AWS 凭证提供程序从 AWS SDK 1.x 迁移到 2.x
<a name="migrating-spark-kinesis-creds-from-1.x-to-2.x"></a>

凭证提供者用于获取与之交互的 AWS 凭证 AWS。SDK 2.x 中有几项与凭证提供程序相关的接口和类更改，可参见[此处](https://github.com/aws/aws-sdk-java-v2/blob/master/docs/LaunchChangelog.md#122-client-credentials)。Spark Kinesis 连接器定义了一个接口 (`org.apache.spark.streaming.kinesis.SparkAWSCredentials`) 和实现类，用于返回 1.x 版本的 AWS 凭据提供程序。初始化 Kinesis 客户端时需要这些凭证提供程序。例如，如果您在应用程序`SparkAWSCredentials.provider`中使用该方法，则需要更新代码以使用 2.x 版本的 AWS 凭据提供程序。

以下是在 S AWS DK 1.x 中使用凭证提供程序的示例：

```
import org.apache.spark.streaming.kinesis.SparkAWSCredentials
import com.amazonaws.auth.AWSCredentialsProvider
 
val basicSparkCredentials = SparkAWSCredentials.builder
    .basicCredentials("accessKey", "secretKey")
    .build()
                                     
val credentialProvider = basicSparkCredentials.provider
assert(credentialProvider.isInstanceOf[AWSCredentialsProvider], "Type should be AWSCredentialsProvider")
```

**要迁移到 SDK 2.x：**

1. 将 `com.amazonaws.auth.AWSCredentialsProvider` 的导入语句更改为 `software.amazon.awssdk.auth.credentials.AwsCredentialsProvider`

   ```
   //import com.amazonaws.auth.AWSCredentialsProvider
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
   ```

1. 更新使用此类的其余代码。

   ```
   import org.apache.spark.streaming.kinesis.SparkAWSCredentials
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
    
   val basicSparkCredentials = SparkAWSCredentials.builder
       .basicCredentials("accessKey", "secretKey")
       .build()
                                             
   val credentialProvider = basicSparkCredentials.provider
   assert (credentialProvider.isInstanceOf[AwsCredentialsProvider], "Type should be AwsCredentialsProvider")
   ```

### 将 AWS 服务客户端从 AWS SDK 1.x 迁移到 2.x
<a name="migrating-spark-kinesis-service-from-1.x-to-2.x"></a>

AWS 服务客户端在 2.x（即`software.amazon.awssdk`）中具有不同的软件包名称，而 SDK 1.x 则使用。`com.amazonaws`有关客户端更改的更多信息，请参阅[此处](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-whats-different.html)。如果您在代码中使用这些服务客户端，则需要相应地迁移客户端。

下面是在 SDK 1.x 中创建客户端的示例：

```
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.document.DynamoDB
 
AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
```

**要迁移到 2.x：**

1. 请更改服务客户端的导入语句。以 DynamoDB 客户端为例。您需要将 `com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient` 或 `com.amazonaws.services.dynamodbv2.document.DynamoDB` 更改为 `software.amazon.awssdk.services.dynamodb.DynamoDbClient`。

   ```
   // import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
   // import com.amazonaws.services.dynamodbv2.document.DynamoDB
   import software.amazon.awssdk.services.dynamodb.DynamoDbClient
   ```

1. 更新用初始化客户端的代码

   ```
   // AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
   // AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
    
   DynamoDbClient ddbClient = DynamoDbClient.create();
   DynamoDbClient ddbClient = DynamoDbClient.builder().build();
   ```

   有关将 AWS SDK 从 1.x 迁移到 2.x 的更多信息，请参阅适用于 [Java 的 AWS SDK 1.x 和 2.x 有什么区别](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-whats-different.html)

### 流式传输应用程序的代码示例
<a name="migrating-spark-kinesis-streaming-examples"></a>

```
import java.net.URI
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.http.apache.ApacheHttpClient
import software.amazon.awssdk.services.kinesis.KinesisClient
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest
import software.amazon.awssdk.regions.Region
import software.amazon.kinesis.metrics.{MetricsLevel, MetricsUtil}
 
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
import org.apache.spark.streaming.kinesis.KinesisInputDStream
 
 
object KinesisWordCountASLSDKV2 {
 
  def main(args: Array[String]): Unit = {
    val appName = "demo-app"
    val streamName = "demo-kinesis-test"
    val endpointUrl = "https://kinesis.us-west-2.amazonaws.com"
    val regionName = "us-west-2"
 
    // Determine the number of shards from the stream using the low-level Kinesis Client
    // from the AWS Java SDK.
    val credentialsProvider = DefaultCredentialsProvider.create
    require(credentialsProvider.resolveCredentials() != null,
      "No AWS credentials found. Please specify credentials using one of the methods specified " +
        "in https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html")
    val kinesisClient = KinesisClient.builder()
      .credentialsProvider(credentialsProvider)
      .region(Region.US_WEST_2)
      .endpointOverride(URI.create(endpointUrl))
      .httpClientBuilder(ApacheHttpClient.builder())
      .build()
    val describeStreamRequest = DescribeStreamRequest.builder()
      .streamName(streamName)
      .build()
    val numShards = kinesisClient.describeStream(describeStreamRequest)
      .streamDescription
      .shards
      .size
 
 
    // In this example, we are going to create 1 Kinesis Receiver/input DStream for each shard.
    // This is not a necessity; if there are less receivers/DStreams than the number of shards,
    // then the shards will be automatically distributed among the receivers and each receiver
    // will receive data from multiple shards.
    val numStreams = numShards
 
    // Spark Streaming batch interval
    val batchInterval = Milliseconds(2000)
 
    // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information
    // on sequence number of records that have been received. Same as batchInterval for this
    // example.
    val kinesisCheckpointInterval = batchInterval
 
    // Setup the SparkConfig and StreamingContext
    val sparkConfig = new SparkConf().setAppName("KinesisWordCountASLSDKV2")
    val ssc = new StreamingContext(sparkConfig, batchInterval)
 
    // Create the Kinesis DStreams
    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisInputDStream.builder
        .streamingContext(ssc)
        .streamName(streamName)
        .endpointUrl(endpointUrl)
        .regionName(regionName)
        .initialPosition(new Latest())
        .checkpointAppName(appName)
        .checkpointInterval(kinesisCheckpointInterval)
        .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
        .metricsLevel(MetricsLevel.DETAILED)
        .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
        .build()
    }
 
    // Union all the streams
    val unionStreams = ssc.union(kinesisStreams)
 
    // Convert each line of Array[Byte] to String, and split into words
    val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))
 
    // Map each word to a (word, 1) tuple so we can reduce by key to count the words
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
 
    // Print the first 10 wordCounts
    wordCounts.print()
 
    // Start the streaming context and await termination
    ssc.start()
    ssc.awaitTermination()
  }
}
```

### 使用升级后的 Spark Kinesis 连接器时的注意事项
<a name="migrating-spark-kinesis-considerations"></a>
+ 如果您的应用程序将 `Kinesis-producer-library` 用于低于 11 版本的 JDK，则可能会遇到异常，例如 `java.lang.NoClassDefFoundError: javax/xml/bind/DatatypeConverter`。之所以发生这种情况，是因为 EMR 7.0 默认附带 JDK 17，而自 Java 11\$1 版本以来，J2EE 模块就已从标准库中移除。此问题可以通过在 pom 文件中添加以下依赖项来解决。将库版本替换为您认为合适的版本。

  ```
  <dependency>
        <groupId>javax.xml.bind</groupId>
        <artifactId>jaxb-api</artifactId>
        <version>${jaxb-api.version}</version>
      </dependency>
  ```
+ EMR 集群被创建后，可以在此路径下找到 Spark Kinesis 连接器 jar：`/usr/lib/spark/connector/lib/`