

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

# Conectores e utilitários
<a name="emr-connectors"></a>

O Amazon EMR fornece vários conectores e utilitários para acessar outros AWS serviços como fontes de dados. Você pode geralmente acessar os dados nesses serviços dentro de um programa. Por exemplo, você pode especificar um stream do Kinesis em uma consulta, script do Pig ou MapReduce aplicativo do Hive e, em seguida, operar com esses dados.

**Topics**
+ [Exportar, importar, consultar e unir tabelas no DynamoDB usando o Amazon EMR](EMRforDynamoDB.md)
+ [Kinesis](emr-kinesis.md)
+ [S3 DistCp (s3-dist-cp)](UsingEMR_s3distcp.md)
+ [Limpando após falhas em trabalhos do S3 DistCp](#s3distcp-cleanup)

# Exportar, importar, consultar e unir tabelas no DynamoDB usando o Amazon EMR
<a name="EMRforDynamoDB"></a>

**nota**  
O Amazon EMR-DynamoDB Connector é de código aberto em. GitHub Para obter mais informações, consulte [https://github.com/awslabs/emr-dynamodb-connector](https://github.com/awslabs/emr-dynamodb-connector).

O DynamoDB é um serviço de banco de dados NoSQL totalmente gerenciado que proporciona um desempenho rápido e previsível com escalabilidade contínua. Os desenvolvedores podem criar uma tabela de banco de dados e ampliar seu tráfego de solicitação ou armazenamento sem limites. O DynamoDB distribui automaticamente dados e tráfego para a tabela através de um número suficiente de servidores, a fim de controlar a capacidade de solicitação especificada pelo cliente e a quantidade de dados armazenados, enquanto mantém um desempenho consistente e rápido. Usando o Amazon EMR e o Hive, você pode processar grandes quantidades de dados com rapidez e eficiência, como os dados armazenados no DynamoDB. Para obter mais informações sobre o DynamoDB, consulte o [Guia do desenvolvedor do Amazon DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/).

Apache Hive é uma camada de software que você pode usar para consultar clusters do map-reduce usando uma linguagem simplificada semelhante ao SQL chamada HiveQL. Ele é executado sobre a arquitetura do Hadoop. Para obter mais informações sobre o Hive e o HiveQL, acesse o documento [HiveQL Language Manual](https://cwiki.apache.org/confluence/display/Hive/LanguageManual). Para obter mais informações sobre o Hive e o Amazon EMR, consulte [Apache Hive](emr-hive.md).

Você pode usar o Amazon EMR com uma versão personalizada do Hive que inclua conectividade com o DynamoDB para executar operações em dados armazenados no DynamoDB:
+ Carregar dados do DynamoDB no Sistema de Arquivos Distribuído do Hadoop (HDFS) e usá-los como entrada em um cluster do Amazon EMR.
+ Consultar dados dinâmicos do DynamoDB usando instruções semelhantes a SQL (HiveQL).
+ Unir dados armazenados no DynamoDB e exportar ou consultar com base nos dados unidos.
+ Exportar dados armazenados no DynamoDB para o Amazon S3.
+ Importar dados armazenados no Amazon S3 para o DynamoDB.

**nota**  
O conector Amazon EMR-DynamoDB não é compatível com os clusters configurados para usar a [autenticação Kerberos](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-kerberos.html).

Para executar cada uma das seguintes tarefas, será necessário iniciar um cluster do Amazon EMR, especificar a localização dos dados no DynamoDB e emitir comandos do Hive para manipular os dados no DynamoDB. 

Há várias maneiras de iniciar um cluster do Amazon EMR: você pode usar o console do Amazon EMR, a interface de linha de comando (CLI) ou pode programar seu cluster usando um SDK AWS ou a API do Amazon EMR. Você também pode escolher se deseja executar um cluster do Hive interativamente ou a partir de um script. Nesta seção, vamos mostrar a você como iniciar um cluster interativo do Hive pelo console do Amazon EMR e pela CLI. 

Usar o Hive, interativamente, é uma ótima maneira de testar o desempenho das consultas e ajustar seu aplicativo. Depois que você tiver estabelecido um conjunto de comandos do Hive que serão executados de forma regular, considere criar um script do Hive que o Amazon EMR possa executar para você. 

**Atenção**  
As operações de leitura ou de gravação do Amazon EMR em uma tabela do DynamoDB contam em relação ao throughput provisionado estabelecido, o que pode aumentar a frequência de exceções de throughput provisionado. Para grandes solicitações, o Amazon EMR implementa novas tentativas com recuo exponencial para gerenciar a carga de solicitações na tabela do DynamoDB. Executar trabalhos do Amazon EMR simultaneamente com outro tráfego pode fazer com que você exceda o nível de throughput provisionado alocado. Você pode monitorar isso verificando a **ThrottleRequests**métrica na Amazon CloudWatch. Se a carga de solicitações for muito alta, você pode reiniciar o cluster e definir [Configuração de porcentagem de leitura](EMR_Hive_Optimizing.md#ReadPercent) ou [Configuração de porcentagem de gravação](EMR_Hive_Optimizing.md#WritePercent) como um valor menor para limitar as operações do Amazon EMR. Para obter mais informações sobre as configurações de throughput do DynamoDB, consulte [Provisioned throughput](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithDDTables.html#ProvisionedThroughput).   
Se uma tabela estiver configurada para o [Modo sob demanda](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html#HowItWorks.OnDemand), será necessário alterá-la novamente para o modo provisionado antes de executar uma operação de exportação ou importação. Os pipelines precisam de uma taxa de produtividade para calcular os recursos a serem usados em um Dynamo. DBtable O modo sob demanda remove o throughput provisionado. Para provisionar a capacidade de transferência, você pode usar as métricas do Amazon CloudWatch Events para avaliar a taxa de transferência agregada usada por uma tabela.

**Topics**
+ [Configurar uma tabela do Hive para executar comandos do Hive](EMR_Interactive_Hive.md)
+ [Exemplos de comandos do Hive para exportar, importar e consultar dados no DynamoDB](EMR_Hive_Commands.md)
+ [Otimizar a performance de operações do Amazon EMR no DynamoDB](EMR_Hive_Optimizing.md)

# Configurar uma tabela do Hive para executar comandos do Hive
<a name="EMR_Interactive_Hive"></a>

O Apache Hive é uma aplicação de data warehouse que você pode usar para consultar dados contidos em clusters do Amazon EMR usando uma linguagem semelhante ao SQL. Para obter mais informações sobre o Hive, consulte [http://hive.apache.org/](http://hive.apache.org/).

O procedimento a seguir pressupõe que você já tenha criado um cluster e especificado um par de chaves do Amazon EC2. Para saber como começar a criar clusters, consulte [Getting started with Amazon EMR](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-gs) no *Guia de gerenciamento do Amazon EMR*.

## Configurar o Hive para usar MapReduce
<a name="hive-mapreduce"></a>

Quando você usar o Hive no Amazon EMR para consultar tabelas do DynamoDB, poderão ocorrer erros se o Hive usar o mecanismo de execução padrão, o Tez. Por esse motivo, ao criar um cluster com o Hive que se integre ao DynamoDB conforme descrito nesta seção, recomendamos que você use uma classificação de configuração que defina o Hive a ser usado. MapReduce Para obter mais informações, consulte [Configurar aplicações](emr-configure-apps.md).

O trecho a seguir mostra a classificação de configuração e a propriedade a serem usadas para definir MapReduce como mecanismo de execução do Hive:

```
[
                {
                    "Classification": "hive-site",
                    "Properties": {
                        "hive.execution.engine": "mr"
                    }
                }
             ]
```<a name="EMR_Interactive_Hive_session"></a>

**Para executar comandos do Hive interativamente**

1. Conecte-se ao nó principal. Para obter mais informações, consulte [Conectar-se ao nó principal usando SSH](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html) no *Guia de gerenciamento do Amazon EMR*.

1. No prompt de comando do nó principal atual, digite `hive`.

   Você verá um prompt do Hive: `hive>`

1.  Insira um comando do Hive que mapeie uma tabela na aplicação Hive para os dados no DynamoDB. Essa tabela serve de referência para os dados armazenados no Amazon DynamoDB; os dados não são armazenados localmente no Hive, e qualquer consulta usando essa tabela é executada com base nos dados dinâmicos no DynamoDB, consumindo a capacidade de leitura ou gravação da tabela sempre que um comando é executado. Se você pretende executar vários comandos do Hive no mesmo conjunto de dados, considere exportá-lo primeiro. 

    O exemplo a seguir mostra a sintaxe para mapear uma tabela do Hive para uma tabela do DynamoDB. 

   ```
   CREATE EXTERNAL TABLE hive_tablename (hive_column1_name column1_datatype, hive_column2_name column2_datatype...)
   STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   TBLPROPERTIES ("dynamodb.table.name" = "dynamodb_tablename", 
   "dynamodb.column.mapping" = "hive_column1_name:dynamodb_attribute1_name,hive_column2_name:dynamodb_attribute2_name...");
   ```

    Ao criar uma tabela no Hive pelo DynamoDB, você deve criá-la como uma tabela externa usando a palavra-chave `EXTERNAL`. A diferença entre tabelas externas e internas é que os dados nas tabelas internas são excluídos quando uma tabela interna é descartada. Este não é o comportamento desejado no caso de uma conexão com o Amazon DynamoDB e, portanto, somente tabelas externas são compatíveis. 

    Por exemplo, o seguinte comando do Hive cria uma tabela chamada *hivetable1* no Hive que referencia a tabela do DynamoDB chamada *dynamodbtable1*. A *tabela* dynamodbtable1 do DynamoDB tem um esquema de chave primária. hash-and-range O elemento da chave de hash é `name` (do tipo string), o elemento de chave de intervalo é `year` (do tipo numérico), e cada item possui um valor de atributo para `holidays` (do tipo conjunto de strings). 

   ```
   CREATE EXTERNAL TABLE hivetable1 (col1 string, col2 bigint, col3 array<string>)
   STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
   "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");
   ```

    A linha 1 usa a instrução HiveQL `CREATE EXTERNAL TABLE`. Para *hivetable1*, você precisa estabelecer uma coluna para cada par de nome-valor de atributo na tabela do DynamoDB e fornecer o tipo de dados. Esses valores não diferenciam maiúsculas de minúsculas, e você pode dar qualquer nome (exceto palavras reservadas) para as colunas. 

    A linha 2 usa a instrução `STORED BY`. O valor de `STORED BY` é o nome da classe que manipula a conexão entre o Hive e o DynamoDB. Ele deve ser definido como `'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'`. 

    A linha 3 usa a instrução `TBLPROPERTIES` para associar “hivetable1” com a tabela e o esquema corretos no DynamoDB. Forneça a `TBLPROPERTIES` valores para os parâmetros `dynamodb.table.name` e `dynamodb.column.mapping`. Esses valores *diferenciam* maiúsculas de minúsculas.
**nota**  
 Todos os nomes de atributo do DynamoDB para a tabela devem ter colunas correspondentes na tabela do Hive. Dependendo da sua versão do Amazon EMR, os seguintes cenários ocorrem se o one-to-one mapeamento não existir:  
No Amazon EMR versão 5.27.0 e posterior, o conector tem validações que garantem um mapeamento one-to-one entre nomes de atributos e colunas do DynamoDB na tabela do Hive. Ocorrerá um erro se o one-to-one mapeamento não existir.
Na versão 5.26.0 do Amazon EMR e anteriores, a tabela do Hive não contém o par de nome-valor do DynamoDB. Se você não mapear os atributos de chave primária do DynamoDB, o Hive gerará um erro. Se você não mapear um atributo de chave não primária, nenhum erro será gerado, mas os dados não serão vistos na tabela do Hive. Se os tipos de dados não corresponderem, o valor será nulo. 

Em seguida, você poderá começar a executar operações do Hive em *hivetable1*. As consultas executadas em *hivetable1* são internamente executadas na tabela do DynamoDB *dynamodbtable1* da sua conta do DynamoDB, consumindo unidades de leitura ou gravação com cada execução.

Quando você executa consultas do Hive em uma tabela do DynamoDB, precisa certificar-se de ter provisionado uma quantidade suficiente de unidades de capacidade de leitura.

Por exemplo, suponha que você tenha provisionado 100 unidades de capacidade de leitura para a sua tabela do DynamoDB. Isso permitirá que você realize 100 leituras, ou 409.600 bytes, por segundo. Se essa tabela contiver 20 GB de dados (21.474.836.480 bytes) e sua consulta do Hive realizar uma verificação de tabela completa, você poderá estimar quanto tempo a consulta demorará para ser executada:

 * 21.474.836.480/409.600 = 52.429 segundos = 14,56 horas * 

A única maneira de diminuir o tempo necessário seria ajustar as unidades de capacidade de leitura na tabela DynamoDB de origem. Não adianta adicionar mais nós do Amazon EMR.

Na saída do Hive, a porcentagem de conclusão é atualizada quando um ou mais processos de mapeador são finalizados. Para uma tabela grande do DynamoDB com uma configuração baixa de capacidade de leitura provisionada, a saída de percentual de conclusão pode não ser atualizada por um longo tempo. No caso acima, o trabalho parecerá estar 0% concluído por várias horas. Para ver um status mais detalhado sobre o andamento do trabalho, acesse o console do Amazon EMR. Você poderá visualizar o status de tarefas de mapeador individuais e as estatísticas de leituras de dados. Você também pode fazer logon na interface do Hadoop no nó principal e visualizar as estatísticas do Hadoop. Isso mostrará o status de tarefas map individuais e algumas estatísticas de leitura de dados. Para saber mais, consulte os seguintes tópicos:
+ [Interfaces Web hospedadas no nó principal](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-web-interfaces.html)
+ [Visualizar as interfaces Web do Hadoop](https://docs.aws.amazon.com/emr/latest/ManagementGuide/UsingtheHadoopUserInterface.html)

Para obter mais informações sobre instruções HiveQL de exemplo para realizar tarefas, como exportar ou importar dados do DynamoDB e unir tabelas, consulte [Exemplos de comandos do Hive para exportar, importar e consultar dados no DynamoDB](EMR_Hive_Commands.md).<a name="EMR_Hive_Cancel"></a>

**Para cancelar uma solicitação do Hive**

Quando você executa uma consulta do Hive, a resposta inicial do servidor inclui o comando para cancelar a solicitação. Para cancelar a solicitação a qualquer momento no processo, use **Kill Command (Comando Kill)** na resposta do servidor.

1. Insira `Ctrl+C` para sair do cliente de linha de comando.

1.  No prompt do shell, insira o **Kill Command (Comando Kill)** da resposta do servidor inicial à sua solicitação. 

    Como alternativa, você pode executar o seguinte comando na linha de comando do nó principal para eliminar o trabalho do Hadoop, onde *job-id* é o identificador do trabalho do Hadoop e pode ser recuperado da interface do usuário do Hadoop.

   ```
   hadoop job -kill job-id
   ```

## Tipos de dados para o Hive e o DynamoDB
<a name="EMR_Hive_Properties"></a>

A tabela a seguir mostra os tipos de dados do Hive disponíveis, o tipo do DynamoDB padrão ao qual eles correspondem e os tipos do DynamoDB alternativos para os quais eles também podem mapear. 


| Tipo do Hive | Tipo do DynamoDB padrão | Tipos alternativos do DynamoDB | 
| --- | --- | --- | 
| string | string (S) |  | 
| bigint ou duplo | número (N) |  | 
| binary | binário (B) |  | 
| booleano | booliano (BOOL) |  | 
| array | lista (L) | conjunto de números (NS), conjunto de strings (SS) ou conjunto de binários (BS) | 
| mapa<string, string> | item | mapa (M) | 
| mapa<string,?> | mapa (M) |  | 
|  | nulo (NULL) |  | 

Se você deseja gravar seus dados do Hive como um tipo alternativo correspondente do DynamoDB, ou se os dados do DynamoDB contiverem valores de atributo de um tipo alternativo do DynamoDB, você poderá especificar a coluna e o tipo do DynamoDB usando o parâmetro `dynamodb.type.mapping`. O exemplo a seguir mostra a sintaxe para especificar um tipo alternativo de mapeamento.

```
CREATE EXTERNAL TABLE hive_tablename (hive_column1_name column1_datatype, hive_column2_name column2_datatype...)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES ("dynamodb.table.name" = "dynamodb_tablename",
"dynamodb.column.mapping" = "hive_column1_name:dynamodb_attribute1_name,hive_column2_name:dynamodb_attribute2_name...",
"dynamodb.type.mapping" = "hive_column1_name:dynamodb_attribute1_datatype");
```

O parâmetro de mapeamento de tipo é opcional e só tem que ser especificado para as colunas que usam tipos alternativos.

Por exemplo, o comando do Hive a seguir cria uma tabela chamada `hivetable2` que referencia tabela `dynamodbtable2` do DynamoDB. Ela é semelhante à `hivetable1`, exceto pelo fato de mapear a coluna `col3` para o tipo de conjunto de strings (SS). 

```
CREATE EXTERNAL TABLE hivetable2 (col1 string, col2 bigint, col3 array<string>)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable2",
"dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays",
"dynamodb.type.mapping" = "col3:SS");
```

No Hive, `hivetable1` e `hivetable2` são idênticas. No entanto, quando os dados dessas tabelas forem gravados em suas tabelas correspondentes do DynamoDB, a `dynamodbtable1` conterá listas, enquanto a `dynamodbtable2` conterá conjuntos de strings.

Se você quiser gravar valores `null` do Hive como atributos do tipo `null` do DynamoDB, poderá fazer isso usando o parâmetro `dynamodb.null.serialization`. O exemplo a seguir mostra a sintaxe para especificar a serialização `null`.

```
CREATE EXTERNAL TABLE hive_tablename (hive_column1_name column1_datatype, hive_column2_name column2_datatype...)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES ("dynamodb.table.name" = "dynamodb_tablename",
"dynamodb.column.mapping" = "hive_column1_name:dynamodb_attribute1_name,hive_column2_name:dynamodb_attribute2_name...",
"dynamodb.null.serialization" = "true");
```

O parâmetro de serialização nula é opcional e será definido como `false` se não especificado. Os atributos `null` do DynamoDB são lidos como valores `null` no Hive, independentemente da configuração do parâmetro. As coleções do Hive com valores `null` só poderão ser gravadas no DynamoDB se o parâmetro de serialização nula for especificado como `true`. Caso contrário, ocorrerá um erro do Hive.

O tipo bigint no Hive equivale ao tipo longo no Java, enquanto o tipo duplo no Hive equivale ao tipo duplo no Java em termos de precisão. Isso significa que, se você tiver dados numéricos armazenados no DynamoDB cuja precisão seja maior que a disponível nos tipos de dados do Hive, usar o Hive para exportar, importar ou referenciar os dados do DynamoDB pode resultar na perda de precisão ou em uma falha da consulta do Hive. 

 As exportações do tipo binário do DynamoDB para o Amazon Simple Storage Service (Amazon S3) ou o HDFS são armazenadas como uma string codificada em Base64. Se você estiver importando dados do Amazon S3 ou do HDFS para o tipo binário do DynamoDB, eles deverão ser codificados como uma string Base64. 

## Opções do Hive
<a name="EMR_Hive_Options"></a>

 Você pode definir as seguintes opções do Hive para gerenciar a transferência de dados do Amazon DynamoDB. Essas opções só persistem na sessão atual do Hive. Se você fechar o prompt de comando do Hive e reabri-lo mais tarde no cluster, essas configurações retornarão aos valores padrão. 


| Opções do Hive | Description | 
| --- | --- | 
| dynamodb.throughput.read.percent |   Defina a taxa de operações de leitura para manter sua taxa de throughput provisionado do DynamoDB no intervalo alocado para sua tabela. O valor está entre `0.1` e `1.5`, inclusive.   O valor de 0,5 é a taxa de leitura padrão, que significa que o Hive tentará consumir metade dos recursos de throughput provisionado de leitura na tabela. Aumentar esse valor acima de 0,5 aumenta a taxa de solicitações de leitura. Diminuí-lo abaixo de 0,5 diminui a taxa de solicitações de leitura. Essa taxa de leituras é aproximada. A taxa de leituras real dependerá de fatores como, por exemplo, a existência de distribuição uniforme de chaves no DynamoDB.   Se você perceber que o seu throughput provisionado está sendo frequentemente excedido pela operação do Hive, ou se o tráfego de leituras dinâmicas estiver muito limitado, reduza esse valor abaixo de `0.5`. Se você tiver capacidade suficiente e quiser uma operação do Hive mais rápida, defina esse valor acima de `0.5`. Também é possível configurar o excesso de assinaturas, definindo o valor como até 1.5 se você acredita que existem operações de entrada/saída não utilizadas disponíveis.   | 
| dynamodb.throughput.write.percent |   Defina a taxa de operações de gravação para manter sua taxa de throughput provisionado do DynamoDB no intervalo alocado para sua tabela. O valor está entre `0.1` e `1.5`, inclusive.   O valor de 0,5 é a taxa de gravação padrão, que significa que o Hive tentará consumir metade dos recursos de throughput provisionado de gravação na tabela. Aumentar esse valor acima de 0,5 aumenta a taxa de solicitações de gravação. Diminuí-lo abaixo de 0,5 diminui a taxa de solicitações de gravação. Essa taxa de gravações é aproximada. A taxa de gravações real dependerá de fatores como se há uma distribuição uniforme de chaves no DynamoDB   Se você perceber que o seu throughput provisionado está sendo frequentemente excedido pela operação do Hive, ou se o tráfego de gravações dinâmicas estiver muito limitado, reduza esse valor abaixo de `0.5`. Se você tiver capacidade suficiente e quiser uma operação do Hive mais rápida, defina esse valor acima de `0.5`. Também é possível configurar o excesso de assinaturas, definindo o valor como até 1.5 se você acredita que existem operações de entrada/saída não utilizadas disponíveis ou se este for o upload de dados inicial na tabela e ainda não houver tráfego dinâmico.   | 
| dynamodb.endpoint | Especifique o endpoint para o serviço do DynamoDB. Para obter mais informações sobre os endpoints do DynamoDB disponíveis, consulte [Regions and endpoints](https://docs.aws.amazon.com/general/latest/gr/rande.html#ddb_region).  | 
| dynamodb.max.map.tasks |   Especifique o número máximo de tarefas map ao ler dados do DynamoDB. Esse valor deve ser igual ou maior que 1.   | 
| dynamodb.retry.duration |   Especifique o número de minutos a serem usados como a duração do tempo limite para tentar novamente os comandos do Hive. Esse valor deve ser um número inteiro igual a ou maior que 0. A duração do tempo limite padrão é de dois minutos.   | 

 Essas opções são definidas usando o comando `SET`, como mostra o exemplo a seguir. 

```
SET dynamodb.throughput.read.percent=1.0; 

INSERT OVERWRITE TABLE s3_export SELECT * 
FROM hiveTableName;
```

# Exemplos de comandos do Hive para exportar, importar e consultar dados no DynamoDB
<a name="EMR_Hive_Commands"></a>

Os exemplos a seguir usam comandos do Hive para realizar operações, como exportar dados para o Amazon S3 ou HDFS, importar dados para o DynamoDB, unir tabelas, consultar tabelas e muito mais. 

As operações em uma tabela do Hive referenciam dados armazenados no DynamoDB. Os comandos do Hive estão sujeitos às configurações de throughput provisionado da tabela do DynamoDB, e os dados recuperados incluem os dados gravados na tabela do DynamoDB na ocasião em que a solicitação de operação do Hive é processada pelo DynamoDB. Se o processo de recuperação de dados demorar muito, alguns dados retornados pelo comando do Hive podem ter sido atualizados no DynamoDB desde que o comando do Hive foi iniciado. 

Os comandos do Hive `DROP TABLE` e `CREATE TABLE` atuam apenas nas tabelas locais do Hive e não criam nem descartam tabelas no DynamoDB. Se a sua consulta do Hive fizer referência a uma tabela no DynamoDB, essa tabela já deverá existir antes da execução da consulta. Para obter mais informações sobre como criar e excluir tabelas no Amazon DynamoDB, consulte [Working with tables in DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithTables.html) no *Guia do desenvolvedor do Amazon DynamoDB*. 

**nota**  
 Quando você mapear uma tabela do Hive para um local no Amazon S3, não a mapeie para o caminho raiz do bucket, s3://amzn-s3-demo-bucket, pois isso poderá causar erros quando o Hive gravar os dados no Amazon S3. Em vez disso, mapeie a tabela para um subcaminho do bucket, s3://amzn-s3-demo-bucket/mypath. 

## Exportar dados do DynamoDB
<a name="EMR_Hive_Commands_exporting"></a>

 Você pode usar o Hive para exportar dados do DynamoDB. 

**Exportar uma tabela do DynamoDB para um bucket do Amazon S3**
+  Crie uma tabela do Hive que faça referência aos dados armazenados no DynamoDB. Em seguida, você poderá chamar o comando INSERT OVERWRITE para gravar os dados em um diretório externo. No exemplo a seguir, *s3://amzn-s3-demo-bucket/path/subpath/* é um caminho válido no Amazon S3. Ajuste as colunas e os tipos de dados no comando CREATE para que eles correspondam aos valores no DynamoDB. Você pode usar isso para criar um arquivamento dos seus dados do DynamoDB no Amazon S3. 

  ```
  1. CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
  2. STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
  3. TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
  4. "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");                   
  5.                     
  6. INSERT OVERWRITE DIRECTORY 's3://amzn-s3-demo-bucket/path/subpath/' SELECT * 
  7. FROM hiveTableName;
  ```

**Exportar uma tabela do DynamoDB para um bucket do Amazon S3 usando formatação**
+  Crie uma tabela externa que referencie um local no Amazon S3. Isso é mostrado abaixo como s3\$1export. Durante a chamada CREATE, especifique a formatação de linhas para a tabela. Em seguida, quando você usar INSERT OVERWRITE para exportar dados do DynamoDB para o s3\$1export, os dados serão gravados no formato especificado. No exemplo a seguir, os dados são gravados como valores separados por vírgula (CSV). 

  ```
   1. CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
   2. STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   3. TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
   4. "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");                      
   5.                     
   6. CREATE EXTERNAL TABLE s3_export(a_col string, b_col bigint, c_col array<string>)
   7. ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
   8. LOCATION 's3://amzn-s3-demo-bucket/path/subpath/';
   9.                     
  10. INSERT OVERWRITE TABLE s3_export SELECT * 
  11. FROM hiveTableName;
  ```

**Exportar uma tabela do DynamoDB para um bucket do Amazon S3 sem especificar um mapeamento de colunas**
+  Crie uma tabela do Hive que faça referência aos dados armazenados no DynamoDB. Isso é semelhante ao exemplo anterior, com a diferença de que você não está especificando um mapeamento de colunas. A tabela deve ter exatamente uma coluna do tipo `map<string, string>`. Se você criar uma tabela `EXTERNAL` no Amazon S3, poderá chamar o comando `INSERT OVERWRITE` para gravar os dados do DynamoDB para o Amazon S3. Você pode usar isso para criar um arquivamento dos seus dados do DynamoDB no Amazon S3. Como não há mapeamento de colunas, você não pode consultar tabelas exportadas dessa maneira. A exportação de dados sem especificar um mapeamento de coluna está disponível no Hive 0.8.1.5 ou versões posteriores, que tem suporte na AMI 2.2.*x* e posteriores do Amazon EMR;. 

  ```
   1. CREATE EXTERNAL TABLE hiveTableName (item map<string,string>)
   2. STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   3. TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1");  
   4.     
   5. CREATE EXTERNAL TABLE s3TableName (item map<string, string>)
   6. ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
   7. LOCATION 's3://amzn-s3-demo-bucket/path/subpath/'; 
   8.                 
   9. INSERT OVERWRITE TABLE s3TableName SELECT * 
  10. FROM hiveTableName;
  ```

**Exportar uma tabela do DynamoDB para um bucket do Amazon S3 usando a compactação de dados**
+  O Hive oferece vários codecs de codec, que você pode definir durante a sua sessão do Hive. Isso faz com que os dados exportados sejam compactados no formato especificado. O exemplo a seguir compacta os arquivos exportados usando o algoritmo Lempel-Ziv-Oberhumer (LZO). 

  ```
   1. SET hive.exec.compress.output=true;
   2. SET io.seqfile.compression.type=BLOCK;
   3. SET mapred.output.compression.codec = com.hadoop.compression.lzo.LzopCodec;                    
   4.                     
   5. CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
   6. STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   7. TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
   8. "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");                    
   9.                     
  10. CREATE EXTERNAL TABLE lzo_compression_table (line STRING)
  11. ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
  12. LOCATION 's3://amzn-s3-demo-bucket/path/subpath/';
  13.                     
  14. INSERT OVERWRITE TABLE lzo_compression_table SELECT * 
  15. FROM hiveTableName;
  ```

   Os codecs de compactação disponíveis são: 
  +  org.apache.hadoop.io.compress. GzipCodec 
  +  org.apache.hadoop.io.compress. DefaultCodec 
  +  com.hadoop.compression.lzo. LzoCodec 
  +  com.hadoop.compression.lzo. LzopCodec 
  +  org.apache.hadoop.io.compress. BZip2Codec 
  +  org.apache.hadoop.io.compress. SnappyCodec 

**Exportar uma tabela do DynamoDB para HDFS**
+  Use o seguinte comando do Hive, onde *hdfs:///directoryName* está um caminho HDFS válido e uma tabela no Hive *hiveTableName* que faz referência ao DynamoDB. Essa operação de exportação é mais rápida do que exportar uma tabela do DynamoDB para o Amazon S3, pois o Hive 0.7.1.1 usa o HDFS como uma etapa intermediária ao exportar dados para o Amazon S3. O exemplo a seguir também mostra como definir `dynamodb.throughput.read.percent` como 1.0 a fim de aumentar a taxa de solicitação de leitura. 

  ```
  1. CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
  2. STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
  3. TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
  4. "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays"); 
  5.                     
  6. SET dynamodb.throughput.read.percent=1.0;                    
  7.                     
  8. INSERT OVERWRITE DIRECTORY 'hdfs:///directoryName' SELECT * FROM hiveTableName;
  ```

   Você também pode exportar dados para o HDFS usando compactação e formatação, conforme mostrado acima para a exportação para o Amazon S3. Para isso, basta substituir o diretório do Amazon S3 nos exemplos acima por um diretório do HDFS. <a name="EMR_Hive_non-printable-utf8"></a>

**Para ler dados de caracteres UTF-8 não imprimíveis no Hive**
+ Você pode ler e gravar dados de caracteres UTF-8 não imprimíveis com o Hive usando a cláusula `STORED AS SEQUENCEFILE` ao criar a tabela. A SequenceFile é o formato de arquivo binário do Hadoop; você precisa usar o Hadoop para ler esse arquivo. O exemplo a seguir mostra como exportar dados do DynamoDB para o Amazon S3. Você pode usar essa funcionalidade para lidar com caracteres de codificação UTF-8 não imprimíveis. 

  ```
   1. CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
   2. STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   3. TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
   4. "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");                      
   5.                     
   6. CREATE EXTERNAL TABLE s3_export(a_col string, b_col bigint, c_col array<string>)
   7. STORED AS SEQUENCEFILE
   8. LOCATION 's3://amzn-s3-demo-bucket/path/subpath/';
   9.                     
  10. INSERT OVERWRITE TABLE s3_export SELECT * 
  11. FROM hiveTableName;
  ```

## Importar dados para o DynamoDB
<a name="EMR_Hive_Commands_importing"></a>

 Ao gravar dados no DynamoDB usando o Hive, você deve se certificar de que o número de unidades de capacidade de gravação seja maior do que o número de mapeadores no cluster. Por exemplo, clusters que são executados em instâncias m1.xlarge do EC2 produzem 8 mapeadores por instância. No caso de um cluster que tem 10 instâncias, isso significaria um total de 80 mapeadores. Se as suas unidades de capacidade de gravação não forem maiores que o número de mapeadores no cluster, a operação de gravação do Hive poderá consumir todo o throughput de gravação ou tentar consumir mais throughput do que o provisionado. Para obter mais informações sobre o número de mapeadores produzidos por cada tipo de instância do EC2, consulte [Configurar o Hadoop](emr-hadoop-config.md).

 O número de mapeadores no Hadoop é controlado pelas divisões de entradas. Se houver poucas divisões, seu comando de gravação talvez não seja capaz de consumir todo o throughput de gravação disponível. 

 Se um item com a mesma chave existir na tabela do DynamoDB de destino, ele será substituído. Se houver nenhum item com a chave na tabela do DynamoDB de destino, o item será inserido. 

**Importar uma tabela do Amazon S3 para o DynamoDB**
+  Use o Amazon EMR (Amazon EMR) e o Hive para gravar dados do Amazon S3 no DynamoDB. 

  ```
  CREATE EXTERNAL TABLE s3_import(a_col string, b_col bigint, c_col array<string>)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
  LOCATION 's3://amzn-s3-demo-bucket/path/subpath/';                    
                      
  CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
  TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
  "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");  
                      
  INSERT OVERWRITE TABLE hiveTableName SELECT * FROM s3_import;
  ```

**Importar uma tabela de um bucket do Amazon S3 para o DynamoDB sem especificar um mapeamento de colunas**
+  Crie uma tabela `EXTERNAL` que faça referência a dados armazenados no Amazon S3 que foram exportados anteriormente do DynamoDB. Antes de importar, verifique se a tabela exista no DynamoDB e tem o mesmo esquema de chave da tabela do DynamoDB exportada anteriormente. Além disso, a tabela deve ter exatamente uma coluna, do tipo `map<string, string>`. Se você criar uma tabela do Hive vinculada ao DynamoDB, poderá chamar o comando `INSERT OVERWRITE` para gravar os dados do Amazon S3 no DynamoDB. Como não há mapeamento de colunas, você não pode consultar tabelas importadas dessa maneira. A importação de dados sem especificar um mapeamento de coluna está disponível no Hive 0.8.1.5 ou versão posterior, que tem suporte na AMI 2.2.3 e posterior do Amazon EMR. 

  ```
  CREATE EXTERNAL TABLE s3TableName (item map<string, string>)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
  LOCATION 's3://amzn-s3-demo-bucket/path/subpath/'; 
                          
  CREATE EXTERNAL TABLE hiveTableName (item map<string,string>)
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
  TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1");  
                   
  INSERT OVERWRITE TABLE hiveTableName SELECT * 
  FROM s3TableName;
  ```

**Importar uma tabela do HDFS para o DynamoDB**
+  Você pode usar o Amazon EMR e o Hive para gravar dados do HDFS no DynamoDB. 

  ```
  CREATE EXTERNAL TABLE hdfs_import(a_col string, b_col bigint, c_col array<string>)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
  LOCATION 'hdfs:///directoryName';                    
                      
  CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
  TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
  "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");  
                      
  INSERT OVERWRITE TABLE hiveTableName SELECT * FROM hdfs_import;
  ```

## Consultar dados no DynamoDB
<a name="EMR_Hive_Commands_querying"></a>

 Os exemplos a seguir mostram as várias maneiras de usar o Amazon EMR para consultar dados armazenados no DynamoDB. 

**Para encontrar o maior valor para uma coluna mapeada (`max`)**
+  Use os comandos do Hive como os seguintes. No primeiro comando, a instrução CREATE cria uma tabela do Hive que faz referência aos dados armazenados no DynamoDB. Em seguida, a instrução SELECT usa essa tabela para consultar dados armazenados no DynamoDB. O exemplo a seguir localiza o maior pedido efetuado por um determinado cliente. 

  ```
  CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) 
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "Purchases",
  "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items");
  
  SELECT max(total_cost) from hive_purchases where customerId = 717;
  ```

**Para agregar dados usando a cláusula `GROUP BY`**
+  Você pode usar a cláusula `GROUP BY` para coletar dados em vários registros. Muitas vezes, isso é usado com uma função agregada, como sum, count, min ou max. O exemplo a seguir gera uma lista dos maiores pedidos de clientes que efetuaram mais de três pedidos. 

  ```
  CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) 
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "Purchases",
  "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items");
  
  SELECT customerId, max(total_cost) from hive_purchases GROUP BY customerId HAVING count(*) > 3;
  ```

**Unir duas tabelas do DynamoDB**
+  O exemplo a seguir mapeia duas tabelas do Hive para dados armazenados no DynamoDB. Em seguida, ele chama uma junção entre essas duas tabelas. A junção é calculada no cluster e retornada. A junção não ocorre no DynamoDB. Este exemplo retorna uma lista de clientes e suas compras para clientes que efetuaram mais de dois pedidos. 

  ```
  CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) 
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "Purchases",
  "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items");
  
  CREATE EXTERNAL TABLE hive_customers(customerId bigint, customerName string, customerAddress array<String>) 
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "Customers",
  "dynamodb.column.mapping" = "customerId:CustomerId,customerName:Name,customerAddress:Address");
  
  Select c.customerId, c.customerName, count(*) as count from hive_customers c 
  JOIN hive_purchases p ON c.customerId=p.customerId 
  GROUP BY c.customerId, c.customerName HAVING count > 2;
  ```

**Para unir duas tabelas de diferentes origens**
+  No exemplo a seguir, Customer\$1S3 é uma tabela do Hive que carrega um arquivo CSV armazenado no Amazon S3, e hive\$1purchases é uma tabela que referencia dados no DynamoDB. O exemplo a seguir une os dados de clientes armazenados como um arquivo CSV no Amazon S3 com os dados de pedidos armazenados no DynamoDB para retornar um conjunto de dados que representa os pedidos efetuados por clientes cujos nomes incluem “Miller”. 

  ```
  CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) 
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "Purchases",
  "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items");
  
  CREATE EXTERNAL TABLE Customer_S3(customerId bigint, customerName string, customerAddress array<String>)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
  LOCATION 's3://amzn-s3-demo-bucket/path/subpath/';
  
  Select c.customerId, c.customerName, c.customerAddress from 
  Customer_S3 c 
  JOIN hive_purchases p 
  ON c.customerid=p.customerid 
  where c.customerName like '%Miller%';
  ```

**nota**  
 Nos exemplos anteriores, as instruções CREATE TABLE foram incluídas em cada exemplo para fins de clareza e integridade. Durante a execução de várias consultas ou operações de exportação em uma determinada tabela do Hive, você só precisa criar a tabela uma vez, no início da sessão do Hive. 

# Otimizar a performance de operações do Amazon EMR no DynamoDB
<a name="EMR_Hive_Optimizing"></a>

 As operações do Amazon EMR em uma tabela do DynamoDB contam como operações de leitura e estão sujeitas às configurações de throughput provisionado dessa tabela. O Amazon EMR implementa sua própria lógica para tentar balancear a carga na tabela do DynamoDB a fim de minimizar as chances de que o throughput provisionado seja excedido. No final de cada consulta do Hive, o Amazon EMR retorna informações sobre o cluster usado para processar a consulta, incluindo quantas vezes o throughput provisionado foi excedido. Você pode usar essas informações, bem como CloudWatch métricas sobre a taxa de transferência do DynamoDB, para gerenciar melhor a carga na tabela do DynamoDB em solicitações subsequentes. 

 Os seguintes fatores influenciam a performance de consultas do Hive durante o trabalho com tabelas do DynamoDB. 

## Unidades de capacidade de leitura provisionadas
<a name="ProvisionedReadCapacityUnits"></a>

 Quando você executa consultas do Hive em uma tabela do DynamoDB, precisa certificar-se de ter provisionado uma quantidade suficiente de unidades de capacidade de leitura. 

 Por exemplo, suponha que você tenha provisionado 100 unidades de capacidade de leitura para a sua tabela do DynamoDB. Isso permitirá que você realize 100 leituras, ou 409.600 bytes, por segundo. Se essa tabela contiver 20 GB de dados (21.474.836.480 bytes) e sua consulta do Hive realizar uma verificação de tabela completa, você poderá estimar quanto tempo a consulta demorará para ser executada: 

 * 21.474.836.480/409.600 = 52.429 segundos = 14,56 horas * 

 A única maneira de diminuir o tempo necessário seria ajustar as unidades de capacidade de leitura na tabela DynamoDB de origem. Adicionar mais nós ao cluster do Amazon EMR não vai ajudar. 

 Na saída do Hive, a porcentagem de conclusão é atualizada quando um ou mais processos de mapeador são finalizados. Para uma tabela grande do DynamoDB com uma configuração baixa de Capacidade de leitura provisionada, a saída de percentual de conclusão pode não ser atualizada por um longo tempo. No caso acima, o trabalho parecerá estar 0% concluído por várias horas. Para ver um status mais detalhado sobre o andamento do trabalho, acesse o console do Amazon EMR. Você poderá visualizar o status de tarefas de mapeador individuais e as estatísticas de leituras de dados. 

 Você também pode fazer logon na interface do Hadoop no nó principal e visualizar as estatísticas do Hadoop. Ela mostra o status de tarefas de mapa individuais e algumas estatísticas de leitura de dados. Para obter mais informações, consulte [Web interfaces hosted on the master node](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-web-interfaces.html) no *Guia de gerenciamento do Amazon EMR*.

## Configuração de porcentagem de leitura
<a name="ReadPercent"></a>

 Por padrão, o Amazon EMR gerencia a carga de solicitações na tabela do DynamoDB de acordo com sua taxa de throughput provisionado atual. No entanto, quando o Amazon EMR retorna informações sobre o trabalho que incluem um alto número de respostas de throughput provisionado excedido, você pode ajustar a taxa de leitura padrão usando o parâmetro `dynamodb.throughput.read.percent` ao configurar a tabela do Hive. Para obter mais informações sobre como configurar o parâmetro de percentual de leitura, consulte [Opções do Hive](EMR_Interactive_Hive.md#EMR_Hive_Options). 

## Configuração de porcentagem de gravação
<a name="WritePercent"></a>

 Por padrão, o Amazon EMR gerencia a carga de solicitações na tabela do DynamoDB de acordo com sua taxa de throughput provisionado atual. No entanto, quando o Amazon EMR retorna informações sobre o seu trabalho que incluem um alto número de respostas de throughput provisionado excedido, você pode ajustar a taxa de gravação padrão usando o parâmetro `dynamodb.throughput.write.percent` ao configurar a tabela do Hive. Para obter mais informações sobre como configurar o parâmetro de percentual de gravação, consulte [Opções do Hive](EMR_Interactive_Hive.md#EMR_Hive_Options). 

## Configuração de duração da repetição
<a name="emr-ddb-retry-duration"></a>

 Por padrão, o Amazon EMR executa novamente uma consulta do Hive caso ela não tenha retornado um resultado dentro de dois minutos, que é o intervalo de repetição padrão. É possível ajustar esse intervalo definindo o parâmetro `dynamodb.retry.duration` ao executar uma consulta do Hive. Para obter mais informações sobre como configurar o parâmetro de percentual de gravação, consulte [Opções do Hive](EMR_Interactive_Hive.md#EMR_Hive_Options). 

## Número de tarefas map
<a name="NumberMapTasks"></a>

 Os daemons mapeadores que o Hadoop executa para processar suas solicitações de exportação e consulta de dados armazenados no DynamoDB estão limitado a uma taxa de leitura máxima de 1 MiB por segundo, para limitar a capacidade de leitura utilizada. Se você tiver throughput provisionado adicional disponível no DynamoDB, poderá melhorar a performance de operações de exportação e consulta do Hive aumentando o número de daemons mapeadores. Para fazer isso, é possível aumentar o número de instâncias do EC2 no cluster *ou* aumentar o número de daemons mapeadores em execução em cada instância do EC2. 

 É possível aumentar o número de instâncias do EC2 em um cluster interrompendo o cluster atual e executando-o novamente com um número maior de instâncias do EC2. Você especifica o número de instâncias do EC2 na caixa de diálogo **Configurar instâncias do EC2** ao executar o cluster no console do Amazon EMR ou com a opção `‑‑num-instances` ao executar o cluster na CLI. 

 O número de tarefas map executadas em uma instância depende do tipo de instância do EC2. Para obter mais informações sobre os tipos de instâncias do EC2 com suporte e o número de mapeadores que cada uma fornece, acesse [Configuração da tarefa](emr-hadoop-task-config.md). Lá, você encontrará uma seção "Configuração de tarefas" para cada uma das configurações com suporte. 

 Outra maneira de aumentar o número de daemons mapeadores é alterar o parâmetro de configuração `mapreduce.tasktracker.map.tasks.maximum` do Hadoop para um valor mais alto. Isso tem a vantagem de proporcionar mais mapeadores sem aumentar o número ou o tamanho de instâncias do EC2, o que gera uma boa economia de custos. Uma desvantagem é que definir esse valor muito alto pode fazer com que as instâncias do EC2 no seu cluster fiquem sem memória. Para definir `mapreduce.tasktracker.map.tasks.maximum`, inicie o cluster e especifique um valor para `mapreduce.tasktracker.map.tasks.maximum` como uma propriedade da classificação de configuração mapred-site. Isso é mostrado no exemplo a seguir. Para obter mais informações, consulte [Configurar aplicações](emr-configure-apps.md).

```
{
    "configurations": [
    {
        "classification": "mapred-site",
        "properties": {
            "mapred.tasktracker.map.tasks.maximum": "10"
        }
    }
    ]
}
```

## Solicitações de dados em paralelo
<a name="ParallelDataRequests"></a>

 Várias solicitações de dados, seja de mais de um usuário ou de mais de um aplicativo, para uma única tabela podem esgotar o throughput provisionado de leitura e diminuir o desempenho. 

## Duração do processo
<a name="ProcessDuration"></a>

 A consistência dos dados no DynamoDB depende da ordem de operações de leitura e gravação em cada nó. Embora uma consulta do Hive esteja em andamento, outra aplicação pode carregar novos dados para a tabela do DynamoDB ou modificar ou excluir dados existentes. Nesse caso, os resultados da consulta do Hive podem não refletir as alterações feitas nos dados enquanto a consulta estava em execução. 

## Evitar exceder o throughput
<a name="AvoidExceedingThroughput"></a>

 Ao executar consultas do Hive no DynamoDB, tome cuidado para não exceder seu throughput provisionado, pois isso esgotará a capacidade necessária para chamadas da aplicação para `DynamoDB::Get`. Para garantir que isso não ocorra, você deve monitorar regularmente o volume de leitura e a limitação das chamadas do aplicativo, `DynamoDB::Get` verificando os registros e as métricas de monitoramento na Amazon. CloudWatch 

## Tempo de solicitação
<a name="RequestTime"></a>

 Programa consultas do Hive que acessam uma tabela do Dynamo quando há menor demanda na tabela do DynamoDB melhora a performance. Por exemplo, se a maioria dos usuários do aplicativo morar em São Francisco, você poderá optar por exportar os dados diariamente às 4h. PST, quando a maioria dos usuários estiver dormindo e não atualizando registros no banco de dados do DynamoDB. 

## Tabelas baseadas em tempo
<a name="TimeBasedTables"></a>

 Se os dados estiverem organizados como uma série de tabelas do DynamoDB com base no tempo, como uma tabela por dia, você poderá exportá-los quando a tabela não estiver mais ativa. Você pode usar essa técnica para fazer o backup dos dados para o Amazon S3 de uma forma contínua. 

## Dados arquivados
<a name="ArchivedData"></a>

 Se você planeja executar muitas consultas Hive nos dados armazenados no DynamoDB e sua aplicação pode tolerar dados arquivados, talvez queira exportar esses dados para o HDFS ou o Amazon S3 e executar consultas Hive em uma cópia dos dados em vez de no DynamoDB. Isto conserva suas operações de leitura e o throughput provisionado. 

# Kinesis
<a name="emr-kinesis"></a>

Os clusters do Amazon EMR podem ler e processar streams do Amazon Kinesis diretamente, usando ferramentas conhecidas no ecossistema Hadoop, como Hive, Pig MapReduce, Hadoop Streaming API e Cascading. Você também pode unir dados em tempo real do Amazon Kinesis a dados existentes no Amazon S3, no Amazon DynamoDB e no HDFS em um cluster em execução. Você pode carregar diretamente os dados do Amazon EMR no Amazon S3 ou no DynamoDB para atividades de pós-processamento. Para obter informações sobre os destaques e o preço do serviço Amazon Kinesis, consulte a página do [Amazon Kinesis](https://aws.amazon.com//kinesis).

## O que fazer com a integração entre Amazon EMR e o Amazon Kinesis?
<a name="kinesis-use-cases"></a>

 A integração entre o Amazon EMR e o Amazon Kinesis facilita muito determinados cenários. Por exemplo: 
+ **Análise de log de transmissão**: você pode analisar logs Web de transmissão para gerar uma lista dos dez maiores tipos de erros, em intervalos de minutos, por região, navegador e domínio de acesso. 
+ **Envolvimento do cliente**: você pode criar consultas que unam os dados de clickstream do Amazon Kinesis com as informações de campanhas publicitárias armazenadas em uma tabela do DynamoDB para identificar as mais eficientes categorias de anúncios que são exibidos em determinados sites. 
+ **Consultas interativas ad hoc**: você pode, de tempos em tempos, carregar os dados dos fluxos do Amazon Kinesis para o HDFS e torná-los disponíveis como uma tabela do Impala para obter consultas rápidas, interativas e analíticas.

## Análise com ponto de verificação de fluxos do Amazon Kinesis
<a name="kinesis-checkpoint"></a>

Os usuários podem executar análises periódicas em lote de fluxos do Amazon Kinesis no que chamados de *iterações*. Como os registros de dados de fluxo do Amazon Kinesis são recuperados usando um número de sequência, os limites de iteração são definidos por números de sequência iniciais e finais que o Amazon EMR armazena em uma tabela do DynamoDB. Por exemplo, quando `iteration0` é encerrado, ele armazena o número de sequência final no DynamoDB, para que, quando o trabalho `iteration1` começar, ele possa recuperar dados subsequentes do fluxo. Esse mapeamento de iterações em dados de stream é chamado de *pontos de verificação*. Para obter mais informações, consulte [Kinesis connector](https://aws.amazon.com/elasticmapreduce/faqs/#kinesis-connector).

Se a iteração foi verificada e o trabalho falhou no processamento da iteração, o Amazon EMR tentará reprocessar os registros da iteração. 

Pontos de verificação são um recurso que permite: 
+ Iniciar o processamento de dados após um número de sequência processado por uma consulta anterior que foi executada no mesmo stream e nome lógico
+ Reprocessar o mesmo lote de dados do Kinesis que foi processado por uma consulta anterior

 Para habilitar pontos de verificação, defina o parâmetro `kinesis.checkpoint.enabled` como `true` nos seus scripts. Além disso, configure os seguintes parâmetros:


| Definição da configuração | Description | 
| --- | --- | 
| kinesis.checkpoint.metastore.table.name | Nome da tabela do DynamoDB onde serão armazenadas informações de ponto de verificação | 
| kinesis.checkpoint.metastore.hash.key.name | Nome da chave de hash da tabela do DynamoDB | 
| kinesis.checkpoint.metastore.hash.range.name | Nome da chave de intervalo da tabela do DynamoDB | 
| kinesis.checkpoint.logical.name | Um nome lógico para o processamento atual | 
| kinesis.checkpoint.iteration.no | O número de iterações para o processamento associado ao nome lógico | 
| kinesis.rerun.iteration.without.wait | Valor booliano que indica se uma iteração com falha pode ser executada novamente sem esperar o tempo limite; o padrão é false | 

### Recomendações de IOPS provisionadas para tabelas do Amazon DynamoDB
<a name="kinesis-checkpoint-DDB"></a>

O conector do Amazon EMR para Amazon Kinesis usa o banco de dados DynamoDB como suporte para metadados de pontos de verificação. Você deve criar uma tabela no DynamoDB antes de consumir dados em um fluxo do Amazon Kinesis com um cluster do Amazon EMR em intervalos com ponto de verificação. A tabela deve estar na mesma região que o cluster do Amazon EMR. Veja a seguir as recomendações gerais para o número de IOPS que você deve provisionar para as suas tabelas do DynamoDB; deixe que `j` seja o número máximo de trabalhos do Hadoop (com uma combinação diferente de nome lógico \$1 número de iteração) que podem ser executados simultaneamente e deixe que `s` seja o número máximo de fragmentos que qualquer trabalho processará:

Para **Read Capacity Units (Unidades de capacidade de leitura)**: `j`\$1`s`/`5`

Para **Write Capacity Units (Unidades de capacidade de gravação)**: `j`\$1`s`

## Considerações sobre a performance
<a name="performance"></a>

O throughput de fragmento do Amazon Kinesis é diretamente proporcional ao tamanho da instância de nós em clusters do Amazon EMR e ao tamanho do registro no fluxo. Recomendamos que você use m5.xlarge ou instâncias maiores no nó principal e nos nós centrais.

## Agendar a análise do Amazon Kinesis com o Amazon EMR
<a name="schedule"></a>

Quando você estiver analisando dados em um fluxo do Amazon Kinesis ativo, limitado por tempos limite e uma duração máxima para qualquer iteração, será importante executar sempre a análise para coletar detalhes periódicos do fluxo. Existem várias maneiras de executar esses scripts e consultas em intervalos periódicos; recomendamos usar AWS Data Pipeline em tarefas recorrentes como essas. Para obter mais informações, consulte [AWS Data Pipeline PigActivity](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-pigactivity.html)e [AWS Data Pipeline HiveActivity](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-hiveactivity.html)no *Guia do AWS Data Pipeline desenvolvedor*.

# Migração do conector do Spark Kinesis para o SDK 2.x do Amazon EMR 7.0
<a name="migrating-spark-kinesis"></a>

O AWS SDK fornece um rico conjunto de bibliotecas para interagir com serviços de computação em AWS nuvem, como gerenciamento de credenciais APIs e conexão com os serviços do S3 e do Kinesis. O conector do Spark Kinesis é usado para consumir dados do Kinesis Data Streams, e os dados recebidos são transformados e processados no mecanismo de execução do Spark. Atualmente, esse conector é construído sobre 1.x do AWS SDK e Kinesis-client-library (KCL). 

Como parte da migração do AWS SDK 2.x, o conector Spark Kinesis também é atualizado adequadamente para ser executado com o SDK 2.x. Na versão 7.0 do Amazon EMR, o Spark contém a atualização do SDK 2.x que ainda não está disponível na versão comunitária do Apache Spark. Se você usa o conector do Spark Kinesis de uma versão inferior à 7.0, é necessário migrar os códigos da sua aplicação para execução no SDK 2.x antes de poder migrar para o Amazon EMR 7.0.

## Guias de migração
<a name="migrating-spark-kinesis-migration-guides"></a>

Esta seção descreve as etapas para migrar uma aplicação ao conector atualizado do Spark Kinesis. Ele inclui guias para migrar para a Kinesis Client Library (KCL) 2.x AWS , provedores de credenciais AWS e clientes de serviços no SDK 2.x. AWS Para referência, ele também inclui um [WordCount](https://github.com/apache/spark/blob/v3.5.0/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala)programa de amostra que usa o conector Kinesis.

**Topics**
+ [Migração da versão 1.x à 2.x do serviço KCL](#migrating-spark-kinesis-KCL-from-1.x-to-2.x)
+ [Migração de provedores de AWS credenciais do AWS SDK 1.x para 2.x](#migrating-spark-kinesis-creds-from-1.x-to-2.x)
+ [Migração de clientes AWS de serviço do AWS SDK 1.x para 2.x](#migrating-spark-kinesis-service-from-1.x-to-2.x)
+ [Exemplos de códigos para aplicações de streaming](#migrating-spark-kinesis-streaming-examples)
+ [Considerações ao usar o conector atualizado do Spark Kinesis](#migrating-spark-kinesis-considerations)

### Migração da versão 1.x à 2.x do serviço KCL
<a name="migrating-spark-kinesis-KCL-from-1.x-to-2.x"></a>
+ **Nível e dimensões das métricas em `KinesisInputDStream`**

  Ao instanciar um `KinesisInputDStream`, você pode controlar o nível e as dimensões das métricas do fluxo. O seguinte exemplo demonstra como personalizar esses parâmetros com a KCL 1.x:

  ```
  import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
  import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
   
  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
    .build()
  ```

  Na KCL 2.x, essas configurações têm nomes de pacotes diferentes. Para migrar à versão 2.x:

  1. Altere as instruções de importação de `com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration` e `com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel` para `software.amazon.kinesis.metrics.MetricsLevel` e `software.amazon.kinesis.metrics.MetricsUtil`, respectivamente.

     ```
     // import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
     import software.amazon.kinesis.metrics.MetricsLevel
      
     // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
     import software.amazon.kinesis.metrics.MetricsUtil
     ```

  1. Substitua a linha `metricsEnabledDimensionsKinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet` por `metricsEnabledDimensionsSet(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)`

  A seguir está uma versão atualizada de `KinesisInputDStream` com níveis e dimensões de métricas personalizados.

  ```
  import software.amazon.kinesis.metrics.MetricsLevel
  import software.amazon.kinesis.metrics.MetricsUtil
   
  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
    .build()
  ```
+ Função de manipulador de mensagens em `KinesisInputDStream`

  Ao instanciar um `KinesisInputDStream`, você também pode fornecer uma “função de manipulador de mensagens” que usa um registro do Kinesis e retorna um objeto genérico T, caso queira usar outros dados incluídos em um registro, como a chave de partição.

  Na KCL 1.x, a assinatura da função de manipulador de mensagens é: `Record => T`, com o registro sendo `com.amazonaws.services.kinesis.model.Record`. No KCL 2.x, a assinatura do manipulador é alterada para:`KinesisClientRecord => T`, where is. KinesisClientRecord `software.amazon.kinesis.retrieval.KinesisClientRecord` 

  A seguir está um exemplo de fornecimento de um manipulador de mensagens na KCL 1.x.

  ```
  import com.amazonaws.services.kinesis.model.Record
   
   
  def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_ONLY)
    .buildWithMessageHandler(addFive)
  ```

  Para migrar o manipulador de mensagens:

  1. Altere a instrução de importação de `com.amazonaws.services.kinesis.model.Record` para `software.amazon.kinesis.retrieval.KinesisClientRecord`.

     ```
     // import com.amazonaws.services.kinesis.model.Record
     import software.amazon.kinesis.retrieval.KinesisClientRecord
     ```

  1. Atualize a assinatura do método do manipulador de mensagens.

     ```
     //def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
     def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
     ```

  A seguir está um exemplo atualizado de fornecimento do manipulador de mensagens na KCL 2.x.

  ```
  import software.amazon.kinesis.retrieval.KinesisClientRecord
   
   
  def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_ONLY)
    .buildWithMessageHandler(addFive)
  ```

  Para obter mais informações sobre como migrar da KCL 1.x para a KCL 2.x, consulte [Migração de consumidores da KCL 1.x para a KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html) .

### Migração de provedores de AWS credenciais do AWS SDK 1.x para 2.x
<a name="migrating-spark-kinesis-creds-from-1.x-to-2.x"></a>

Os provedores de credenciais são usados para obter AWS credenciais para interações com. AWS Há várias mudanças de interface e classe relacionadas aos provedores de credenciais na SDK 2.x, que podem ser encontradas [aqui](https://github.com/aws/aws-sdk-java-v2/blob/master/docs/LaunchChangelog.md#122-client-credentials). O conector do Spark Kinesis definiu uma interface `org.apache.spark.streaming.kinesis.SparkAWSCredentials` () e classes de implementação que retornam a versão AWS 1.x dos provedores de credenciais. Esses provedores de credenciais são necessários ao inicializar clientes Kinesis. Por exemplo, se você estiver usando o método `SparkAWSCredentials.provider` nos aplicativos, precisará atualizar os códigos para consumir a versão 2.x dos provedores de AWS credenciais.

Veja a seguir um exemplo do uso dos provedores de credenciais no AWS SDK 1.x:

```
import org.apache.spark.streaming.kinesis.SparkAWSCredentials
import com.amazonaws.auth.AWSCredentialsProvider
 
val basicSparkCredentials = SparkAWSCredentials.builder
    .basicCredentials("accessKey", "secretKey")
    .build()
                                     
val credentialProvider = basicSparkCredentials.provider
assert(credentialProvider.isInstanceOf[AWSCredentialsProvider], "Type should be AWSCredentialsProvider")
```

**Para migrar ao SDK 2.x:**

1. Altere a instrução de importação de `com.amazonaws.auth.AWSCredentialsProvider` para `software.amazon.awssdk.auth.credentials.AwsCredentialsProvider`.

   ```
   //import com.amazonaws.auth.AWSCredentialsProvider
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
   ```

1. Atualize os códigos restantes que usam essa classe. 

   ```
   import org.apache.spark.streaming.kinesis.SparkAWSCredentials
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
    
   val basicSparkCredentials = SparkAWSCredentials.builder
       .basicCredentials("accessKey", "secretKey")
       .build()
                                             
   val credentialProvider = basicSparkCredentials.provider
   assert (credentialProvider.isInstanceOf[AwsCredentialsProvider], "Type should be AwsCredentialsProvider")
   ```

### Migração de clientes AWS de serviço do AWS SDK 1.x para 2.x
<a name="migrating-spark-kinesis-service-from-1.x-to-2.x"></a>

AWS clientes de serviço têm nomes de pacotes diferentes em 2.x (ou seja,`software.amazon.awssdk`). enquanto o SDK 1.x usa. `com.amazonaws` Para obter mais informações sobre as alterações de clientes, consulte [aqui](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-whats-different.html). Se você estiver usando esses clientes de serviços nos códigos, precisará migrá-los adequadamente.

A seguir está um exemplo de criação de um cliente no SDK 1.x.

```
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.document.DynamoDB
 
AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
```

**Para migrar à versão 2.x:**

1. Altere as instruções de importação dos clientes de serviços. Veja os clientes DynamoDB como exemplo. Você precisaria mudar `com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient` ou `com.amazonaws.services.dynamodbv2.document.DynamoDB` para `software.amazon.awssdk.services.dynamodb.DynamoDbClient`.

   ```
   // import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
   // import com.amazonaws.services.dynamodbv2.document.DynamoDB
   import software.amazon.awssdk.services.dynamodb.DynamoDbClient
   ```

1. Atualização dos códigos que inicializam os clientes

   ```
   // AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
   // AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
    
   DynamoDbClient ddbClient = DynamoDbClient.create();
   DynamoDbClient ddbClient = DynamoDbClient.builder().build();
   ```

   Para obter mais informações sobre a migração do AWS SDK de 1.x para 2.x, consulte [Qual é a diferença entre o SDK para AWS Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-whats-different.html) 1.x e 2.x

### Exemplos de códigos para aplicações de streaming
<a name="migrating-spark-kinesis-streaming-examples"></a>

```
import java.net.URI
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.http.apache.ApacheHttpClient
import software.amazon.awssdk.services.kinesis.KinesisClient
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest
import software.amazon.awssdk.regions.Region
import software.amazon.kinesis.metrics.{MetricsLevel, MetricsUtil}
 
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
import org.apache.spark.streaming.kinesis.KinesisInputDStream
 
 
object KinesisWordCountASLSDKV2 {
 
  def main(args: Array[String]): Unit = {
    val appName = "demo-app"
    val streamName = "demo-kinesis-test"
    val endpointUrl = "https://kinesis.us-west-2.amazonaws.com"
    val regionName = "us-west-2"
 
    // Determine the number of shards from the stream using the low-level Kinesis Client
    // from the AWS Java SDK.
    val credentialsProvider = DefaultCredentialsProvider.create
    require(credentialsProvider.resolveCredentials() != null,
      "No AWS credentials found. Please specify credentials using one of the methods specified " +
        "in https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html")
    val kinesisClient = KinesisClient.builder()
      .credentialsProvider(credentialsProvider)
      .region(Region.US_WEST_2)
      .endpointOverride(URI.create(endpointUrl))
      .httpClientBuilder(ApacheHttpClient.builder())
      .build()
    val describeStreamRequest = DescribeStreamRequest.builder()
      .streamName(streamName)
      .build()
    val numShards = kinesisClient.describeStream(describeStreamRequest)
      .streamDescription
      .shards
      .size
 
 
    // In this example, we are going to create 1 Kinesis Receiver/input DStream for each shard.
    // This is not a necessity; if there are less receivers/DStreams than the number of shards,
    // then the shards will be automatically distributed among the receivers and each receiver
    // will receive data from multiple shards.
    val numStreams = numShards
 
    // Spark Streaming batch interval
    val batchInterval = Milliseconds(2000)
 
    // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information
    // on sequence number of records that have been received. Same as batchInterval for this
    // example.
    val kinesisCheckpointInterval = batchInterval
 
    // Setup the SparkConfig and StreamingContext
    val sparkConfig = new SparkConf().setAppName("KinesisWordCountASLSDKV2")
    val ssc = new StreamingContext(sparkConfig, batchInterval)
 
    // Create the Kinesis DStreams
    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisInputDStream.builder
        .streamingContext(ssc)
        .streamName(streamName)
        .endpointUrl(endpointUrl)
        .regionName(regionName)
        .initialPosition(new Latest())
        .checkpointAppName(appName)
        .checkpointInterval(kinesisCheckpointInterval)
        .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
        .metricsLevel(MetricsLevel.DETAILED)
        .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
        .build()
    }
 
    // Union all the streams
    val unionStreams = ssc.union(kinesisStreams)
 
    // Convert each line of Array[Byte] to String, and split into words
    val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))
 
    // Map each word to a (word, 1) tuple so we can reduce by key to count the words
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
 
    // Print the first 10 wordCounts
    wordCounts.print()
 
    // Start the streaming context and await termination
    ssc.start()
    ssc.awaitTermination()
  }
}
```

### Considerações ao usar o conector atualizado do Spark Kinesis
<a name="migrating-spark-kinesis-considerations"></a>
+ Se suas aplicações usam a `Kinesis-producer-library` com uma versão do JDK inferior à 11, você pode se deparar com exceções como `java.lang.NoClassDefFoundError: javax/xml/bind/DatatypeConverter`. Isso acontece porque o EMR 7.0 vem com o JDK 17 por padrão e os módulos J2EE foram removidos das bibliotecas padrão desde o Java 11\$1. Isso pode ser corrigido adicionando a dependência a seguir no arquivo pom. Substitua a versão da biblioteca pela dependência que preferir.

  ```
  <dependency>
        <groupId>javax.xml.bind</groupId>
        <artifactId>jaxb-api</artifactId>
        <version>${jaxb-api.version}</version>
      </dependency>
  ```
+ O arquivo jar do conector do Spark Kinesis pode ser encontrado neste caminho após a criação de um cluster do EMR: `/usr/lib/spark/connector/lib/`

# S3 DistCp (s3-dist-cp)
<a name="UsingEMR_s3distcp"></a>

O Apache DistCp é uma ferramenta de código aberto que você pode usar para copiar grandes quantidades de dados. *O S3 DistCp* é semelhante DistCp, mas otimizado para trabalhar AWS, especialmente com o Amazon S3. O comando para o S3 DistCp no Amazon EMR versão 4.0 e posterior `s3-dist-cp` é, que você adiciona como uma etapa em um cluster ou na linha de comando. Usando o S3DistCp, você pode copiar com eficiência grandes quantidades de dados do Amazon S3 para o HDFS, onde eles podem ser processados por etapas subsequentes em seu cluster do Amazon EMR. Você também pode usar o S3 DistCp para copiar dados entre buckets do Amazon S3 ou do HDFS para o Amazon S3. O S3 DistCp é mais escalável e eficiente para copiar paralelamente grandes números de objetos entre buckets e contas. AWS 

Para comandos específicos que demonstram a flexibilidade do S3DistCP em cenários do mundo real, consulte [Sete dicas para usar o S3 DistCp](https://aws.amazon.com/blogs/big-data/seven-tips-for-using-s3distcp-on-amazon-emr-to-move-data-efficiently-between-hdfs-and-amazon-s3/) no blog de Big Data. AWS 

Por exemplo DistCp, o S3 DistCp usa MapReduce para copiar de forma distribuída. Ele compartilha a cópia, o tratamento de erros, a recuperação e as tarefas de relatórios com vários servidores. Para obter mais informações sobre o projeto de código DistCp aberto Apache, consulte o [DistCpguia na documentação](http://hadoop.apache.org/docs/stable/hadoop-distcp/DistCp.html) do Apache Hadoop.

Se o S3 DistCp não conseguir copiar alguns ou todos os arquivos especificados, a etapa do cluster falhará e retornará um código de erro diferente de zero. Se isso ocorrer, o S3 DistCp não limpa os arquivos parcialmente copiados. 

**Importante**  
O S3 DistCp não é compatível com nomes de bucket do Amazon S3 que contenham o caractere de sublinhado.  
O S3 DistCp não oferece suporte à concatenação de arquivos Parquet. Use PySpark em vez disso. Para obter mais informações, consulte como [concatenar arquivos do Parquet no Amazon EMR](https://aws.amazon.com/premiumsupport/knowledge-center/emr-concatenate-parquet-files/).  
Para evitar erros de cópia ao usar o S3DistCP para copiar um único arquivo (em vez de um diretório) do S3 para o HDFS, use o Amazon EMR versão 5.33.0 ou posterior ou o Amazon EMR versão 6.3.0 ou posterior.

## Opções S3 DistCp
<a name="UsingEMR_s3distcp.options"></a>

Embora semelhante DistCp, o S3 DistCp oferece suporte a um conjunto diferente de opções para alterar a forma como ele copia e compacta dados.

Ao chamar o S3DistCp, você pode especificar as opções descritas na tabela a seguir. As opções são adicionadas à etapa, usando-se a lista de argumentos. Exemplos dos DistCp argumentos do S3 são mostrados na tabela a seguir. 


| Opção  | Description  | Obrigatório  | 
| --- | --- | --- | 
| ‑‑src=LOCATION  |  Local dos dados a serem copiados. Isso pode ser um local do HDFS ou do Amazon S3.  Exemplo: `‑‑src=s3://amzn-s3-demo-bucket/logs/j-3GYXXXXXX9IOJ/node`   O S3 DistCp não é compatível com nomes de bucket do Amazon S3 que contenham o caractere de sublinhado.   | Sim  | 
| ‑‑dest=LOCATION  |  Destino para os dados. Isso pode ser um local do HDFS ou do Amazon S3.  Exemplo: `‑‑dest=hdfs:///output`   O S3 DistCp não é compatível com nomes de bucket do Amazon S3 que contenham o caractere de sublinhado.   | Sim  | 
| ‑‑srcPattern=PATTERN  |  Uma [expressão regular](http://en.wikipedia.org/wiki/Regular_expression) que filtra a operação de cópia para um subconjunto dos dados em `‑‑src`. Se nenhum `‑‑srcPattern` nem `‑‑groupBy` for especificado, todos os dados em `‑‑src` serão copiados em `‑‑dest`.  Se o argumento da expressão regular contiver caracteres especiais, como um asterisco (\$1), a expressão regular ou a string `‑‑args` inteira deverá ser colocada entre aspas simples (').  Exemplo: `‑‑srcPattern=.*daemons.*-hadoop-.*`   | Não  | 
| ‑‑groupBy=PATTERN  |  Uma [expressão regular](http://en.wikipedia.org/wiki/Regular_expression) que faz com que o S3 concatene arquivos que correspondam DistCp à expressão. Por exemplo, você pode usar essa opção para combinar todos os arquivos de log gravados em uma hora em um único arquivo. O nome do arquivo concatenado é o valor correspondido pela expressão regular para o agrupamento.  Parênteses indicam como os arquivos devem ser agrupados, com todos os itens que correspondam à instrução parentética sendo combinados em um único arquivo de saída. Se a expressão regular não incluir uma declaração entre parênteses, o cluster falhará na DistCp etapa S3 e retornará um erro.  Se o argumento da expressão regular contiver caracteres especiais, como um asterisco (\$1), a expressão regular ou a string `‑‑args` inteira deverá ser colocada entre aspas simples (').  Quando `‑‑groupBy` for especificado, somente os arquivos que correspondam ao padrão especificado serão copiados. Não é necessário especificar `‑‑groupBy` e `‑‑srcPattern` ao mesmo tempo.  Exemplo: `‑‑groupBy=.*subnetid.*([0-9]+-[0-9]+-[0-9]+-[0-9]+).*`  | Não  | 
| ‑‑targetSize=SIZE  |  O tamanho, em mebibytes (MiB), dos arquivos a serem criados com base na opção `‑‑groupBy`. Esse valor deve ser um número inteiro. Quando `‑‑targetSize` definido, o S3 DistCp tenta corresponder a esse tamanho; o tamanho real dos arquivos copiados pode ser maior ou menor que esse valor. Os trabalhos são agregados com base no tamanho do arquivo de dados, portanto, é possível que o tamanho do arquivo de destino corresponda ao tamanho do arquivo de dados de origem.  Se os arquivos concatenados pelo `‑‑groupBy` forem maiores que o valor do `‑‑targetSize`, eles serão divididos em arquivos de parte e nomeados sequencialmente com um valor numérico anexado ao final. Por exemplo, um arquivo concatenado em `myfile.gz` seria dividido em partes como: `myfile0.gz`, `myfile1.gz` etc.  Exemplo: `‑‑targetSize=2`   | Não  | 
| ‑‑appendToLastFile |  Especifica o comportamento do S3 DistCp ao copiar para arquivos do Amazon S3 para o HDFS que já estão presentes. Ele acrescenta novos dados de arquivos aos arquivos existentes. Se você usar `‑‑appendToLastFile` com `‑‑groupBy`, novos dados serão anexados aos arquivos que correspondam aos mesmos grupos. Essa opção também respeita o comportamento `‑‑targetSize` quando usada com `‑‑groupBy.`  | Não  | 
| ‑‑outputCodec=CODEC  |  Especifica o codec de compactação a ser usado para os arquivos copiados. Isso pode ter os valores: `gzip`, `gz`, `lzo`, `snappy`ou `none`. Você pode usar essa opção, por exemplo, para converter arquivos de entrada compactados com Gzip em arquivos de saída com compactação LZO ou para descompactar os arquivos como parte da operação de cópia. Se você selecionar um codec de saída, o nome do arquivo será anexado com a extensão apropriada (por exemplo, para `gz` e `gzip`, a extensão é `.gz`) Se você não especificar um valor para `‑‑outputCodec`, os arquivos serão copiados sem alterações na compactação.  Exemplo: `‑‑outputCodec=lzo`   | Não  | 
| ‑‑s3ServerSideEncryption  |  Garante que os dados de destino sejam transferidos usando SSL e criptografados automaticamente no Amazon S3 usando AWS uma chave do lado do serviço. Ao recuperar dados usando o S3DistCp, os objetos são automaticamente descriptografados. Se você tentar copiar um objeto não criptografado em um bucket do Amazon S3 que exige criptografia, haverá falha na operação. Para obter mais informações, consulte [Proteger dados com criptografia](https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingEncryption.html).  Exemplo: `‑‑s3ServerSideEncryption`   | Não  | 
| ‑‑deleteOnSuccess  |  Se a operação de cópia for bem-sucedida, essa opção fará com que o S3 DistCp exclua os arquivos copiados do local de origem. Isso é útil se você estiver copiando arquivos de saída, como arquivos de log, de um local para outro como uma tarefa programada e não quiser copiar os mesmos arquivos duas vezes.  Exemplo: `‑‑deleteOnSuccess`   | Não  | 
| ‑‑disableMultipartUpload  |  Desativa o uso do multipart upload.  Exemplo: `‑‑disableMultipartUpload`   | Não  | 
| ‑‑multipartUploadChunkSize=SIZE  |  O tamanho, em MiB, de cada parte de um carregamento multiparte do Amazon S3. O S3 DistCp usa upload de várias partes quando copia dados maiores que o. `multipartUploadChunkSize` Para melhorar a performance do trabalho, você pode aumentar o tamanho de cada parte. O tamanho padrão é 128 MiB.  Exemplo: `‑‑multipartUploadChunkSize=1000`   | Não  | 
| ‑‑numberFiles  |  Precede os arquivos de saída com números sequenciais. A contagem inicia em 0, a menos que um valor diferente seja especificado por `‑‑startingIndex`.  Exemplo: `‑‑numberFiles`   | Não  | 
| ‑‑startingIndex=INDEX  |  Usado com `‑‑numberFiles` para especificar o primeiro número na sequência.  Exemplo: `‑‑startingIndex=1`   | Não  | 
| ‑‑outputManifest=FILENAME  |  Cria um arquivo de texto, compactado com Gzip, que contém uma lista de todos os arquivos copiados pelo S3. DistCp  Exemplo: `‑‑outputManifest=manifest-1.gz`   | Não  | 
| ‑‑previousManifest=PATH  |  Lê um arquivo de manifesto que foi criado durante uma chamada anterior para o S3 DistCp usando o `‑‑outputManifest` sinalizador. Quando o `‑‑previousManifest` sinalizador é definido, o S3 DistCp exclui os arquivos listados no manifesto da operação de cópia. Se `‑‑outputManifest` for especificado juntamente com `‑‑previousManifest`, os arquivos listados no manifesto anterior também aparecerão no novo arquivo manifesto, embora os arquivos não sejam copiados.  Exemplo: `‑‑previousManifest=/usr/bin/manifest-1.gz`   | Não  | 
| ‑‑requirePreviousManifest |  Requer um manifesto anterior criado durante uma chamada anterior para o S3DistCp. Se isso for definido como falso, nenhum erro será gerado quando um manifesto anterior não for especificado. O padrão é true.  | Não  | 
| ‑‑copyFromManifest  |  Inverte o comportamento de `‑‑previousManifest` fazer com que o S3 DistCp use o arquivo de manifesto especificado como uma lista de arquivos a serem copiados, em vez de uma lista de arquivos a serem excluídos da cópia.  Exemplo: `‑‑copyFromManifest ‑‑previousManifest=/usr/bin/manifest-1.gz`   | Não  | 
| ‑‑s3Endpoint=ENDPOINT |  Especifica o endpoint do Amazon S3 a ser usado ao carregar um arquivo. Essa opção define o endpoint para a origem e o destino. Se não estiver definido, o endpoint padrão será `s3.amazonaws.com`. Para obter uma lista dos endpoints do Amazon S3, consulte [Regions and endpoints](https://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region).  Exemplo: `‑‑s3Endpoint=s3.eu-west-1.amazonaws.com`   | Não  | 
| ‑‑storageClass=CLASS |  A classe de armazenamento a ser usada quando o destino é o Amazon S3. Os valores válidos são STANDARD e REDUCED\$1REDUNDANCY. Se essa opção não for especificada, o S3 DistCp tentará preservar a classe de armazenamento. Exemplo: `‑‑storageClass=STANDARD`  | Não  | 
| ‑‑srcPrefixesFile=PATH |  um arquivo de texto no Amazon S3 (s3://), HDFS (hdfs:///) ou sistema de arquivos local (arquivo:/) que contém uma lista de prefixos `src`, um prefixo por linha.  Se `srcPrefixesFile` for fornecido, o S3 não DistCp listará o caminho src. Em vez disso, ele gerará uma lista de fontes como resultado combinado da listagem de todos os prefixos especificados nesse arquivo. O caminho relativo em comparação com o caminho src, em vez desses prefixos, será usado para gerar os caminhos de destino. Se `srcPattern` também for especificado, ele será aplicado à lista de resultados combinados dos prefixos de origem para filtrar ainda mais a entrada. Se `copyFromManifest` for usado, os objetos no manifesto serão copiados e `srcPrefixesFile` será ignorado. Exemplo: `‑‑srcPrefixesFile=PATH`  | Não  | 

Além das opções acima, o S3 DistCp implementa a [interface da ferramenta](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/util/Tool.html), o que significa que ela suporta as opções genéricas. 

## Adicionando o S3 DistCp como uma etapa em um cluster
<a name="UsingEMR_s3distcp.step"></a>

Você pode chamar o S3 DistCp adicionando-o como uma etapa em seu cluster. As etapas podem ser adicionadas a um cluster na inicialização ou a um cluster em execução usando-se o console, a CLI ou a API. Os exemplos a seguir demonstram a adição de uma DistCp etapa do S3 a um cluster em execução. Para obter mais informações sobre como adicionar etapas a um cluster, consulte [Submit work to a cluster](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-work-with-steps.html) no *Guia de gerenciamento do Amazon EMR*.

**Para adicionar uma DistCp etapa do S3 a um cluster em execução usando o AWS CLI**

Para obter mais informações sobre o uso dos comandos do Amazon EMR no AWS CLI, consulte a Referência de [AWS CLI comandos](https://docs.aws.amazon.com/cli/latest/reference/emr).
+ Para adicionar uma etapa a um cluster que chama o S3DistCp, passe os parâmetros que especificam como o S3 DistCp deve realizar a operação de cópia como argumentos. 

  O exemplo a seguir copia logs do daemon do Amazon S3 para `hdfs:///output`. No comando a seguir:
  + `‑‑cluster-id` especifica o cluster
  + `Jar`é a localização do arquivo DistCp JAR do S3. Para ver um exemplo de como executar um comando em um cluster usando command-runner.jar, consulte [Submit a custom JAR step to run a script or command](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-commandrunner.html#emr-commandrunner-examples).
  + `Args`é uma lista separada por vírgulas dos pares nome-valor da opção a serem passados para o S3. DistCp Para obter uma lista completa das opções disponíveis, consulte [Opções S3 DistCp](#UsingEMR_s3distcp.options). 

  Para adicionar uma etapa de DistCp cópia do S3 a um cluster em execução, coloque o seguinte em um arquivo JSON salvo no Amazon S3 ou em seu sistema de arquivos local, `myStep.json` como neste exemplo. *j-3GYXXXXXX9IOK*Substitua pelo ID do cluster e *amzn-s3-demo-bucket* substitua pelo nome do bucket do Amazon S3.

  ```
  [
      {
          "Name":"S3DistCp step",
          "Args":["s3-dist-cp","‑‑s3Endpoint=s3.amazonaws.com","‑‑src=s3://amzn-s3-demo-bucket/logs/j-3GYXXXXXX9IOJ/node/","‑‑dest=hdfs:///output","‑‑srcPattern=.*[a-zA-Z,]+"],
          "ActionOnFailure":"CONTINUE",
          "Type":"CUSTOM_JAR",
          "Jar":"command-runner.jar"        
      }
  ]
  ```

  ```
  aws emr add-steps ‑‑cluster-id j-3GYXXXXXX9IOK ‑‑steps file://./myStep.json
  ```

**Example Copiar arquivos de log do Amazon S3 para o HDFS**  
Este exemplo também ilustra como copiar arquivos de log armazenados em um bucket do Amazon S3 no HDFS, adicionando-se uma etapa a um cluster em execução. Neste exemplo, a opção `‑‑srcPattern` é usada para limitar os dados copiados para os logs do daemon.   
Para copiar os arquivos de log do Amazon S3 para o HDFS usando a opção `‑‑srcPattern`, coloque o seguinte em um arquivo JSON salvo no Amazon S3 ou seu sistema de arquivos local como `myStep.json` para este exemplo. *j-3GYXXXXXX9IOK*Substitua pelo ID do cluster e *amzn-s3-demo-bucket* substitua pelo nome do bucket do Amazon S3.  

```
[
    {
        "Name":"S3DistCp step",
        "Args":["s3-dist-cp","‑‑s3Endpoint=s3.amazonaws.com","‑‑src=s3://amzn-s3-demo-bucket/logs/j-3GYXXXXXX9IOJ/node/","‑‑dest=hdfs:///output","‑‑srcPattern=.*daemons.*-hadoop-.*"],
        "ActionOnFailure":"CONTINUE",
        "Type":"CUSTOM_JAR",
        "Jar":"command-runner.jar"        
    }
]
```

## Limpando após falhas em trabalhos do S3 DistCp
<a name="s3distcp-cleanup"></a>

Se o S3 DistCp não puder copiar alguns ou todos os arquivos especificados, a etapa de comando ou cluster falhará e retornará um código de erro diferente de zero. Se isso ocorrer, o S3 DistCp não limpa os arquivos parcialmente copiados. Você deverá excluí-los manualmente.

Os arquivos parcialmente copiados são salvos no `tmp` diretório HDFS em subdiretórios com o identificador exclusivo da tarefa do S3. DistCp Você pode encontrar esse ID na saída padrão do trabalho.

Por exemplo, para um DistCp trabalho do S3 com o ID`4b1c37bb-91af-4391-aaf8-46a6067085a6`, você pode se conectar ao nó principal do cluster e executar o comando a seguir para visualizar os arquivos de saída associados ao trabalho.

```
hdfs dfs -ls /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output
```

O comando retorna uma lista de arquivos semelhantes aos seguintes:

```
Found 8 items
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:03 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/_SUCCESS
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:02 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00000
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:02 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00001
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:02 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00002
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:03 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00003
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:03 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00004
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:03 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00005
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:03 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00006
```

Você pode executar o comando a seguir para excluir o diretório e todo o conteúdo.

```
hdfs dfs rm -rf /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6
```