

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

# Kinesis
<a name="emr-kinesis"></a>

Os clusters do Amazon EMR podem ler e processar streams do Amazon Kinesis diretamente, usando ferramentas conhecidas no ecossistema Hadoop, como Hive, Pig MapReduce, Hadoop Streaming API e Cascading. Você também pode unir dados em tempo real do Amazon Kinesis a dados existentes no Amazon S3, no Amazon DynamoDB e no HDFS em um cluster em execução. Você pode carregar diretamente os dados do Amazon EMR no Amazon S3 ou no DynamoDB para atividades de pós-processamento. Para obter informações sobre os destaques e o preço do serviço Amazon Kinesis, consulte a página do [Amazon Kinesis](https://aws.amazon.com//kinesis).

## O que fazer com a integração entre Amazon EMR e o Amazon Kinesis?
<a name="kinesis-use-cases"></a>

 A integração entre o Amazon EMR e o Amazon Kinesis facilita muito determinados cenários. Por exemplo: 
+ **Análise de log de transmissão**: você pode analisar logs Web de transmissão para gerar uma lista dos dez maiores tipos de erros, em intervalos de minutos, por região, navegador e domínio de acesso. 
+ **Envolvimento do cliente**: você pode criar consultas que unam os dados de clickstream do Amazon Kinesis com as informações de campanhas publicitárias armazenadas em uma tabela do DynamoDB para identificar as mais eficientes categorias de anúncios que são exibidos em determinados sites. 
+ **Consultas interativas ad hoc**: você pode, de tempos em tempos, carregar os dados dos fluxos do Amazon Kinesis para o HDFS e torná-los disponíveis como uma tabela do Impala para obter consultas rápidas, interativas e analíticas.

## Análise com ponto de verificação de fluxos do Amazon Kinesis
<a name="kinesis-checkpoint"></a>

Os usuários podem executar análises periódicas em lote de fluxos do Amazon Kinesis no que chamados de *iterações*. Como os registros de dados de fluxo do Amazon Kinesis são recuperados usando um número de sequência, os limites de iteração são definidos por números de sequência iniciais e finais que o Amazon EMR armazena em uma tabela do DynamoDB. Por exemplo, quando `iteration0` é encerrado, ele armazena o número de sequência final no DynamoDB, para que, quando o trabalho `iteration1` começar, ele possa recuperar dados subsequentes do fluxo. Esse mapeamento de iterações em dados de stream é chamado de *pontos de verificação*. Para obter mais informações, consulte [Kinesis connector](https://aws.amazon.com/elasticmapreduce/faqs/#kinesis-connector).

Se a iteração foi verificada e o trabalho falhou no processamento da iteração, o Amazon EMR tentará reprocessar os registros da iteração. 

Pontos de verificação são um recurso que permite: 
+ Iniciar o processamento de dados após um número de sequência processado por uma consulta anterior que foi executada no mesmo stream e nome lógico
+ Reprocessar o mesmo lote de dados do Kinesis que foi processado por uma consulta anterior

 Para habilitar pontos de verificação, defina o parâmetro `kinesis.checkpoint.enabled` como `true` nos seus scripts. Além disso, configure os seguintes parâmetros:


| Definição da configuração | Description | 
| --- | --- | 
| kinesis.checkpoint.metastore.table.name | Nome da tabela do DynamoDB onde serão armazenadas informações de ponto de verificação | 
| kinesis.checkpoint.metastore.hash.key.name | Nome da chave de hash da tabela do DynamoDB | 
| kinesis.checkpoint.metastore.hash.range.name | Nome da chave de intervalo da tabela do DynamoDB | 
| kinesis.checkpoint.logical.name | Um nome lógico para o processamento atual | 
| kinesis.checkpoint.iteration.no | O número de iterações para o processamento associado ao nome lógico | 
| kinesis.rerun.iteration.without.wait | Valor booliano que indica se uma iteração com falha pode ser executada novamente sem esperar o tempo limite; o padrão é false | 

### Recomendações de IOPS provisionadas para tabelas do Amazon DynamoDB
<a name="kinesis-checkpoint-DDB"></a>

O conector do Amazon EMR para Amazon Kinesis usa o banco de dados DynamoDB como suporte para metadados de pontos de verificação. Você deve criar uma tabela no DynamoDB antes de consumir dados em um fluxo do Amazon Kinesis com um cluster do Amazon EMR em intervalos com ponto de verificação. A tabela deve estar na mesma região que o cluster do Amazon EMR. Veja a seguir as recomendações gerais para o número de IOPS que você deve provisionar para as suas tabelas do DynamoDB; deixe que `j` seja o número máximo de trabalhos do Hadoop (com uma combinação diferente de nome lógico \$1 número de iteração) que podem ser executados simultaneamente e deixe que `s` seja o número máximo de fragmentos que qualquer trabalho processará:

Para **Read Capacity Units (Unidades de capacidade de leitura)**: `j`\$1`s`/`5`

Para **Write Capacity Units (Unidades de capacidade de gravação)**: `j`\$1`s`

## Considerações sobre a performance
<a name="performance"></a>

O throughput de fragmento do Amazon Kinesis é diretamente proporcional ao tamanho da instância de nós em clusters do Amazon EMR e ao tamanho do registro no fluxo. Recomendamos que você use m5.xlarge ou instâncias maiores no nó principal e nos nós centrais.

## Agendar a análise do Amazon Kinesis com o Amazon EMR
<a name="schedule"></a>

Quando você estiver analisando dados em um fluxo do Amazon Kinesis ativo, limitado por tempos limite e uma duração máxima para qualquer iteração, será importante executar sempre a análise para coletar detalhes periódicos do fluxo. Existem várias maneiras de executar esses scripts e consultas em intervalos periódicos; recomendamos usar AWS Data Pipeline em tarefas recorrentes como essas. Para obter mais informações, consulte [AWS Data Pipeline PigActivity](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-pigactivity.html)e [AWS Data Pipeline HiveActivity](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-hiveactivity.html)no *Guia do AWS Data Pipeline desenvolvedor*.

# Migração do conector do Spark Kinesis para o SDK 2.x do Amazon EMR 7.0
<a name="migrating-spark-kinesis"></a>

O AWS SDK fornece um rico conjunto de bibliotecas para interagir com serviços de computação em AWS nuvem, como gerenciamento de credenciais APIs e conexão com os serviços do S3 e do Kinesis. O conector do Spark Kinesis é usado para consumir dados do Kinesis Data Streams, e os dados recebidos são transformados e processados no mecanismo de execução do Spark. Atualmente, esse conector é construído sobre 1.x do AWS SDK e Kinesis-client-library (KCL). 

Como parte da migração do AWS SDK 2.x, o conector Spark Kinesis também é atualizado adequadamente para ser executado com o SDK 2.x. Na versão 7.0 do Amazon EMR, o Spark contém a atualização do SDK 2.x que ainda não está disponível na versão comunitária do Apache Spark. Se você usa o conector do Spark Kinesis de uma versão inferior à 7.0, é necessário migrar os códigos da sua aplicação para execução no SDK 2.x antes de poder migrar para o Amazon EMR 7.0.

## Guias de migração
<a name="migrating-spark-kinesis-migration-guides"></a>

Esta seção descreve as etapas para migrar uma aplicação ao conector atualizado do Spark Kinesis. Ele inclui guias para migrar para a Kinesis Client Library (KCL) 2.x AWS , provedores de credenciais AWS e clientes de serviços no SDK 2.x. AWS Para referência, ele também inclui um [WordCount](https://github.com/apache/spark/blob/v3.5.0/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala)programa de amostra que usa o conector Kinesis.

**Topics**
+ [Migração da versão 1.x à 2.x do serviço KCL](#migrating-spark-kinesis-KCL-from-1.x-to-2.x)
+ [Migração de provedores de AWS credenciais do AWS SDK 1.x para 2.x](#migrating-spark-kinesis-creds-from-1.x-to-2.x)
+ [Migração de clientes AWS de serviço do AWS SDK 1.x para 2.x](#migrating-spark-kinesis-service-from-1.x-to-2.x)
+ [Exemplos de códigos para aplicações de streaming](#migrating-spark-kinesis-streaming-examples)
+ [Considerações ao usar o conector atualizado do Spark Kinesis](#migrating-spark-kinesis-considerations)

### Migração da versão 1.x à 2.x do serviço KCL
<a name="migrating-spark-kinesis-KCL-from-1.x-to-2.x"></a>
+ **Nível e dimensões das métricas em `KinesisInputDStream`**

  Ao instanciar um `KinesisInputDStream`, você pode controlar o nível e as dimensões das métricas do fluxo. O seguinte exemplo demonstra como personalizar esses parâmetros com a KCL 1.x:

  ```
  import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
  import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
   
  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
    .build()
  ```

  Na KCL 2.x, essas configurações têm nomes de pacotes diferentes. Para migrar à versão 2.x:

  1. Altere as instruções de importação de `com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration` e `com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel` para `software.amazon.kinesis.metrics.MetricsLevel` e `software.amazon.kinesis.metrics.MetricsUtil`, respectivamente.

     ```
     // import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
     import software.amazon.kinesis.metrics.MetricsLevel
      
     // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
     import software.amazon.kinesis.metrics.MetricsUtil
     ```

  1. Substitua a linha `metricsEnabledDimensionsKinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet` por `metricsEnabledDimensionsSet(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)`

  A seguir está uma versão atualizada de `KinesisInputDStream` com níveis e dimensões de métricas personalizados.

  ```
  import software.amazon.kinesis.metrics.MetricsLevel
  import software.amazon.kinesis.metrics.MetricsUtil
   
  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
    .build()
  ```
+ Função de manipulador de mensagens em `KinesisInputDStream`

  Ao instanciar um `KinesisInputDStream`, você também pode fornecer uma “função de manipulador de mensagens” que usa um registro do Kinesis e retorna um objeto genérico T, caso queira usar outros dados incluídos em um registro, como a chave de partição.

  Na KCL 1.x, a assinatura da função de manipulador de mensagens é: `Record => T`, com o registro sendo `com.amazonaws.services.kinesis.model.Record`. No KCL 2.x, a assinatura do manipulador é alterada para:`KinesisClientRecord => T`, where is. KinesisClientRecord `software.amazon.kinesis.retrieval.KinesisClientRecord` 

  A seguir está um exemplo de fornecimento de um manipulador de mensagens na KCL 1.x.

  ```
  import com.amazonaws.services.kinesis.model.Record
   
   
  def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_ONLY)
    .buildWithMessageHandler(addFive)
  ```

  Para migrar o manipulador de mensagens:

  1. Altere a instrução de importação de `com.amazonaws.services.kinesis.model.Record` para `software.amazon.kinesis.retrieval.KinesisClientRecord`.

     ```
     // import com.amazonaws.services.kinesis.model.Record
     import software.amazon.kinesis.retrieval.KinesisClientRecord
     ```

  1. Atualize a assinatura do método do manipulador de mensagens.

     ```
     //def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
     def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
     ```

  A seguir está um exemplo atualizado de fornecimento do manipulador de mensagens na KCL 2.x.

  ```
  import software.amazon.kinesis.retrieval.KinesisClientRecord
   
   
  def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_ONLY)
    .buildWithMessageHandler(addFive)
  ```

  Para obter mais informações sobre como migrar da KCL 1.x para a KCL 2.x, consulte [Migração de consumidores da KCL 1.x para a KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html) .

### Migração de provedores de AWS credenciais do AWS SDK 1.x para 2.x
<a name="migrating-spark-kinesis-creds-from-1.x-to-2.x"></a>

Os provedores de credenciais são usados para obter AWS credenciais para interações com. AWS Há várias mudanças de interface e classe relacionadas aos provedores de credenciais na SDK 2.x, que podem ser encontradas [aqui](https://github.com/aws/aws-sdk-java-v2/blob/master/docs/LaunchChangelog.md#122-client-credentials). O conector do Spark Kinesis definiu uma interface `org.apache.spark.streaming.kinesis.SparkAWSCredentials` () e classes de implementação que retornam a versão AWS 1.x dos provedores de credenciais. Esses provedores de credenciais são necessários ao inicializar clientes Kinesis. Por exemplo, se você estiver usando o método `SparkAWSCredentials.provider` nos aplicativos, precisará atualizar os códigos para consumir a versão 2.x dos provedores de AWS credenciais.

Veja a seguir um exemplo do uso dos provedores de credenciais no AWS SDK 1.x:

```
import org.apache.spark.streaming.kinesis.SparkAWSCredentials
import com.amazonaws.auth.AWSCredentialsProvider
 
val basicSparkCredentials = SparkAWSCredentials.builder
    .basicCredentials("accessKey", "secretKey")
    .build()
                                     
val credentialProvider = basicSparkCredentials.provider
assert(credentialProvider.isInstanceOf[AWSCredentialsProvider], "Type should be AWSCredentialsProvider")
```

**Para migrar ao SDK 2.x:**

1. Altere a instrução de importação de `com.amazonaws.auth.AWSCredentialsProvider` para `software.amazon.awssdk.auth.credentials.AwsCredentialsProvider`.

   ```
   //import com.amazonaws.auth.AWSCredentialsProvider
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
   ```

1. Atualize os códigos restantes que usam essa classe. 

   ```
   import org.apache.spark.streaming.kinesis.SparkAWSCredentials
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
    
   val basicSparkCredentials = SparkAWSCredentials.builder
       .basicCredentials("accessKey", "secretKey")
       .build()
                                             
   val credentialProvider = basicSparkCredentials.provider
   assert (credentialProvider.isInstanceOf[AwsCredentialsProvider], "Type should be AwsCredentialsProvider")
   ```

### Migração de clientes AWS de serviço do AWS SDK 1.x para 2.x
<a name="migrating-spark-kinesis-service-from-1.x-to-2.x"></a>

AWS clientes de serviço têm nomes de pacotes diferentes em 2.x (ou seja,`software.amazon.awssdk`). enquanto o SDK 1.x usa. `com.amazonaws` Para obter mais informações sobre as alterações de clientes, consulte [aqui](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-whats-different.html). Se você estiver usando esses clientes de serviços nos códigos, precisará migrá-los adequadamente.

A seguir está um exemplo de criação de um cliente no SDK 1.x.

```
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.document.DynamoDB
 
AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
```

**Para migrar à versão 2.x:**

1. Altere as instruções de importação dos clientes de serviços. Veja os clientes DynamoDB como exemplo. Você precisaria mudar `com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient` ou `com.amazonaws.services.dynamodbv2.document.DynamoDB` para `software.amazon.awssdk.services.dynamodb.DynamoDbClient`.

   ```
   // import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
   // import com.amazonaws.services.dynamodbv2.document.DynamoDB
   import software.amazon.awssdk.services.dynamodb.DynamoDbClient
   ```

1. Atualização dos códigos que inicializam os clientes

   ```
   // AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
   // AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
    
   DynamoDbClient ddbClient = DynamoDbClient.create();
   DynamoDbClient ddbClient = DynamoDbClient.builder().build();
   ```

   Para obter mais informações sobre a migração do AWS SDK de 1.x para 2.x, consulte [Qual é a diferença entre o SDK para AWS Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-whats-different.html) 1.x e 2.x

### Exemplos de códigos para aplicações de streaming
<a name="migrating-spark-kinesis-streaming-examples"></a>

```
import java.net.URI
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.http.apache.ApacheHttpClient
import software.amazon.awssdk.services.kinesis.KinesisClient
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest
import software.amazon.awssdk.regions.Region
import software.amazon.kinesis.metrics.{MetricsLevel, MetricsUtil}
 
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
import org.apache.spark.streaming.kinesis.KinesisInputDStream
 
 
object KinesisWordCountASLSDKV2 {
 
  def main(args: Array[String]): Unit = {
    val appName = "demo-app"
    val streamName = "demo-kinesis-test"
    val endpointUrl = "https://kinesis.us-west-2.amazonaws.com"
    val regionName = "us-west-2"
 
    // Determine the number of shards from the stream using the low-level Kinesis Client
    // from the AWS Java SDK.
    val credentialsProvider = DefaultCredentialsProvider.create
    require(credentialsProvider.resolveCredentials() != null,
      "No AWS credentials found. Please specify credentials using one of the methods specified " +
        "in https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html")
    val kinesisClient = KinesisClient.builder()
      .credentialsProvider(credentialsProvider)
      .region(Region.US_WEST_2)
      .endpointOverride(URI.create(endpointUrl))
      .httpClientBuilder(ApacheHttpClient.builder())
      .build()
    val describeStreamRequest = DescribeStreamRequest.builder()
      .streamName(streamName)
      .build()
    val numShards = kinesisClient.describeStream(describeStreamRequest)
      .streamDescription
      .shards
      .size
 
 
    // In this example, we are going to create 1 Kinesis Receiver/input DStream for each shard.
    // This is not a necessity; if there are less receivers/DStreams than the number of shards,
    // then the shards will be automatically distributed among the receivers and each receiver
    // will receive data from multiple shards.
    val numStreams = numShards
 
    // Spark Streaming batch interval
    val batchInterval = Milliseconds(2000)
 
    // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information
    // on sequence number of records that have been received. Same as batchInterval for this
    // example.
    val kinesisCheckpointInterval = batchInterval
 
    // Setup the SparkConfig and StreamingContext
    val sparkConfig = new SparkConf().setAppName("KinesisWordCountASLSDKV2")
    val ssc = new StreamingContext(sparkConfig, batchInterval)
 
    // Create the Kinesis DStreams
    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisInputDStream.builder
        .streamingContext(ssc)
        .streamName(streamName)
        .endpointUrl(endpointUrl)
        .regionName(regionName)
        .initialPosition(new Latest())
        .checkpointAppName(appName)
        .checkpointInterval(kinesisCheckpointInterval)
        .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
        .metricsLevel(MetricsLevel.DETAILED)
        .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
        .build()
    }
 
    // Union all the streams
    val unionStreams = ssc.union(kinesisStreams)
 
    // Convert each line of Array[Byte] to String, and split into words
    val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))
 
    // Map each word to a (word, 1) tuple so we can reduce by key to count the words
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
 
    // Print the first 10 wordCounts
    wordCounts.print()
 
    // Start the streaming context and await termination
    ssc.start()
    ssc.awaitTermination()
  }
}
```

### Considerações ao usar o conector atualizado do Spark Kinesis
<a name="migrating-spark-kinesis-considerations"></a>
+ Se suas aplicações usam a `Kinesis-producer-library` com uma versão do JDK inferior à 11, você pode se deparar com exceções como `java.lang.NoClassDefFoundError: javax/xml/bind/DatatypeConverter`. Isso acontece porque o EMR 7.0 vem com o JDK 17 por padrão e os módulos J2EE foram removidos das bibliotecas padrão desde o Java 11\$1. Isso pode ser corrigido adicionando a dependência a seguir no arquivo pom. Substitua a versão da biblioteca pela dependência que preferir.

  ```
  <dependency>
        <groupId>javax.xml.bind</groupId>
        <artifactId>jaxb-api</artifactId>
        <version>${jaxb-api.version}</version>
      </dependency>
  ```
+ O arquivo jar do conector do Spark Kinesis pode ser encontrado neste caminho após a criação de um cluster do EMR: `/usr/lib/spark/connector/lib/`