

# Conexões do Kinesis
<a name="aws-glue-programming-etl-connect-kinesis-home"></a>

É possível usar uma conexão do Kinesis para ler e gravar o Amazon Kinesis Data Streams usando informações armazenadas em uma tabela do Data Catalog ou fornecendo informações para acessar diretamente o fluxo de dados. Você pode ler informações do Kinesis em um Spark DataFrame e depois convertê-las em um Glue DynamicFrame AWS. Você pode gravar DynamicFrames no Kinesis em um formato JSON. Se você acessar diretamente o fluxo de dados, use essas opções para fornecer as informações sobre como acessar o fluxo de dados.

Se você usar `getCatalogSource` ou `create_data_frame_from_catalog` para consumir registros de uma fonte de transmissão do Kinesis, o trabalho tem o banco de dados do catálogo de dados e as informações de nome da tabela, e pode usá-los para obter alguns parâmetros básicos para leitura da fonte de transmissão do Kinesis. Se você usar `getSource`, `getSourceWithFormat`, `createDataFrameFromOptions` ou `create_data_frame_from_options`, será necessário especificar esses parâmetros básicos usando as opções de conexão descritas aqui.

Você pode especificar as opções de conexão para o Kinesis usando os seguintes argumentos para os métodos especificados na classe `GlueContext`.
+ Scala
  + `connectionOptions`: usar com `getSource`, `createDataFrameFromOptions`, `getSink` 
  + `additionalOptions`: usar com `getCatalogSource`, `getCatalogSink`
  + `options`: usar com `getSourceWithFormat`, `getSinkWithFormat`
+ Python
  + `connection_options`: usar com `create_data_frame_from_options`, `write_dynamic_frame_from_options`
  + `additional_options`: usar com `create_data_frame_from_catalog`, `write_dynamic_frame_from_catalog`
  + `options`: usar com `getSource`, `getSink`

Para notas e restrições sobre trabalhos de ETL de streaming, consulte [Notas e restrições sobre ETL de transmissão](add-job-streaming.md#create-job-streaming-restrictions).

## Configurar o Kinesis
<a name="aws-glue-programming-etl-connect-kinesis-configure"></a>

Para ler um fluxo de dados do Kinesis em uma trabalho do AWS Glue Spark, você precisará de alguns pré-requisitos:
+ Se for leitura, o trabalho do AWS Glue deve ter permissões IAM do nível de acesso de leitura ao fluxo de dados do Kinesis.
+ Se for gravação, o trabalho do AWS Glue deve ter permissões IAM do nível de acesso de gravação ao fluxo de dados do Kinesis.

Em certos casos, você precisará configurar pré-requisitos adicionais:
+ Se o trabalho do AWS Glue estiver configurado com **conexões de rede adicionais** (normalmente para se conectar a outros conjuntos de dados) e uma dessas conexões fornecer **opções de rede** da Amazon VPC, isso direcionará o trabalho para se comunicar pela Amazon VPC. Nesse caso, você também precisará configurar o fluxo de dados do Kinesis para se comunicar pela Amazon VPC. É possível fazer isso criando um endpoint da VPC de interface entre a Amazon VPC e o fluxo de dados do Kinesis. Para obter mais informações, consulte [Using Kinesis Data Streams with Interface VPC Endpoints](https://docs.aws.amazon.com//streams/latest/dev/vpc.html).
+ Ao especificar Amazon Kinesis Data Streams em outra conta, você deve configurar os perfis e políticas para permitir o acesso entre contas. Para obter mais informações, consulte [Exemplo: Ler de uma transmissão do Kinesis em outra conta](https://docs.aws.amazon.com/kinesisanalytics/latest/java/examples-cross.html).

Para obter mais informações sobre pré-requisitos de trabalho de ETL de streaming, consulte [Trabalhos de transmissão de ETL no AWS Glue](add-job-streaming.md).

## Exemplo: ler de fluxos do Kinesis
<a name="aws-glue-programming-etl-connect-kinesis-read"></a>

### Exemplo: ler de fluxos do Kinesis
<a name="section-etl-connect-kinesis-read"></a>

Usado em conjunto com [forEachBatch](aws-glue-api-crawler-pyspark-extensions-glue-context.md#aws-glue-api-crawler-pyspark-extensions-glue-context-forEachBatch).

Exemplo para fonte de transmissão do Amazon Kinesis:

```
kinesis_options =
   { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream",
     "startingPosition": "TRIM_HORIZON", 
     "inferSchema": "true", 
     "classification": "json" 
   }
data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)
```

## Exemplo: gravação em streams do Kinesis
<a name="aws-glue-programming-etl-connect-kinesis-write"></a>

### Exemplo: ler de fluxos do Kinesis
<a name="section-etl-connect-kinesis-read"></a>

Usado em conjunto com [forEachBatch](aws-glue-api-crawler-pyspark-extensions-glue-context.md#aws-glue-api-crawler-pyspark-extensions-glue-context-forEachBatch).

Exemplo para fonte de transmissão do Amazon Kinesis:

```
kinesis_options =
   { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream",
     "startingPosition": "TRIM_HORIZON", 
     "inferSchema": "true", 
     "classification": "json" 
   }
data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)
```

## Referência de opções de conexão do Kinesis
<a name="aws-glue-programming-etl-connect-kinesis"></a>

Designa opções de conexão para o Amazon Kinesis Data Streams.

Use as seguintes opções de conexão para fontes de dados de transmissão do Kinesis: 
+ `"streamARN"` (obrigatório) usado para leitura/gravação. O ARN do fluxo de dados do Kinesis.
+ `"classification"` (Obrigatório para leitura) Usado para leitura. O formato de arquivo usado pelos dados no registro. Obrigatório, a menos que fornecido por meio do catálogo de dados.
+ `"streamName"` (opcional) usado para leitura. O nome de um fluxo de dados do Kinesis de onde ler. Usado com `endpointUrl`.
+ `"endpointUrl"` (opcional) usado para leitura. Padrão: “https://kinesis.us-east-1.amazonaws.com” O endpoint AWS do stream do Kinesis. Você não precisa alterar isso, a menos que esteja se conectando a uma região especial.
+ `"partitionKey"` (opcional) usado para gravação. A chave de partição do Kinesis usada na produção de registros.
+ `"delimiter"` (opcional) usado para leitura. O separador de valores usado quando a `classification` é CSV. O padrão é "`,`".
+ `"startingPosition"`: (opcional) usado para leitura. A posição inicial no fluxo de dados do Kinesis de onde ler os dados. Os valores possíveis são `"latest"`, `"trim_horizon"`, `"earliest"` ou uma string de timestamp no formato UTC no padrão `yyyy-mm-ddTHH:MM:SSZ` (onde `Z` representa um desvio do fuso horário UTC com \$1/-). Por exemplo: "2023-04-04T08:00:00-04:00"). O valor padrão é `"latest"`. Observação: a string de timestamp no formato UTC para `"startingPosition"` é compatível somente com a versão 4.0 ou posterior do AWS Glue.
+ `"failOnDataLoss"`: (Opcional) Falha na tarefa se algum fragmento ativo estiver ausente ou expirado. O valor padrão é `"false"`.
+ `"awsSTSRoleARN"`: (opcional) usado para leitura/gravação. O nome de recurso da Amazon (ARN) da função a ser assumida com o uso do AWS Security Token Service (AWS STS). Essa função deve ter permissões para descrever ou ler operações de registro para o fluxo de dados do Kinesis. Você deve usar esse parâmetro ao acessar um fluxo de dados em uma conta diferente. Usado em conjunto com `"awsSTSSessionName"`.
+ `"awsSTSSessionName"`: (opcional) usado para leitura/gravação. Um identificador para a sessão que assume a função usando o AWS STS. Você deve usar esse parâmetro ao acessar um fluxo de dados em uma conta diferente. Usado em conjunto com `"awsSTSRoleARN"`.
+ `"awsSTSEndpoint"`: (Opcional) O AWS STS endpoint a ser usado ao se conectar ao Kinesis com uma função assumida. Isso permite usar o AWS STS endpoint regional em uma VPC, o que não é possível com o endpoint global padrão.
+ `"maxFetchTimeInMs"`: (opcional) usado para leitura. O tempo máximo para o executor do trabalho ler registros referentes ao lote atual do fluxo de dados do Kinesis especificado em milissegundos (ms). Várias chamadas de API `GetRecords` podem ser feitas nesse período. O valor padrão é `1000`.
+ `"maxFetchRecordsPerShard"`: (opcional) usado para leitura. O número máximo de registros a serem obtidos por fragmento no fluxo de dados do Kinesis por microlote. Observação: o cliente poderá exceder esse limite se o trabalho de streaming já tiver lido registros extras do Kinesis (na mesma chamada get-records). Se `maxFetchRecordsPerShard` precisa ser rigoroso, então precisa ser um múltiplo de `maxRecordPerRead`. O valor padrão é `100000`.
+ `"maxRecordPerRead"`: (opcional) usado para leitura. O número máximo de registros a serem obtidos por fragmento no fluxo de dados do Kinesis em cada operação `getRecords`. O valor padrão é `10000`.
+ `"addIdleTimeBetweenReads"`: (opcional) usado para leitura. Adiciona um atraso de tempo entre duas operações `getRecords`. O valor padrão é `"False"`. Essa opção só pode ser configurada para o Glue versão 2.0 e posterior. 
+ `"idleTimeBetweenReadsInMs"`: (opcional) usado para leitura. O atraso mínimo entre duas operações , especificado em ms. O valor padrão é `1000`. Essa opção só pode ser configurada para o Glue versão 2.0 e posterior. 
+ `"describeShardInterval"`: (opcional) usado para leitura. O intervalo de tempo mínimo entre duas chamadas de API `ListShards` para que seu script considere a refragmentação. Para obter mais informações, consulte [Estratégias para refragmentação](https://docs.aws.amazon.com//streams/latest/dev/kinesis-using-sdk-java-resharding-strategies.html) no *Guia do desenvolvedor do Amazon Kinesis Data Streams*. O valor padrão é `1s`.
+ `"numRetries"`: (opcional) usado para leitura. O número máximo de novas tentativas para solicitações de API do Kinesis Data Streams. O valor padrão é `3`.
+ `"retryIntervalMs"`: (opcional) usado para leitura. O período de espera (especificado em ms) antes de repetir a chamada da API Kinesis Data Streams. O valor padrão é `1000`.
+ `"maxRetryIntervalMs"`: (opcional) usado para leitura. O período de espera máximo (especificado em ms) entre duas tentativas de uma chamada de API Kinesis Data Streams. O valor padrão é `10000`.
+ `"avoidEmptyBatches"`: (opcional) usado para leitura. Evita a criação de um trabalho de micro lote vazio verificando se há dados não lidos no fluxo de dados do Kinesis antes de o lote ser iniciado. O valor padrão é `"False"`.
+ `"schema"`: (Obrigatório quando inferSchema é definido como false): usado para leitura. O esquema a ser usado para processar a carga. Se a classificação for `avro`, o esquema fornecido deverá estar no formato de esquema Avro. Se a classificação não for `avro`, o esquema fornecido deverá estar no formato de esquema DDL.

  Veja a seguir alguns exemplos de esquema.

------
#### [ Example in DDL schema format ]

  ```
  `column1` INT, `column2` STRING , `column3` FLOAT
  ```

------
#### [ Example in Avro schema format ]

  ```
  {
    "type":"array",
    "items":
    {
      "type":"record",
      "name":"test",
      "fields":
      [
        {
          "name":"_id",
          "type":"string"
        },
        {
          "name":"index",
          "type":
          [
            "int",
            "string",
            "float"
          ]
        }
      ]
    }
  }
  ```

------
+ `"inferSchema"`: (opcional) usado para leitura. O valor padrão é "false". Se definido como “true”, o esquema será detectado em runtime com base na carga útil em `foreachbatch`.
+ `"avroSchema"`: (Obsoleto) Usado para leitura. Parâmetro usado para especificar um esquema de dados Avro quando o formato Avro é usado. Esse parâmetro foi descontinuado. Use o parâmetro `schema`.
+ `"addRecordTimestamp"`: (opcional) usado para leitura. Quando essa opção for definida como "true", a saída de dados conterá uma coluna adicional denominada "\$1\$1src\$1timestamp" que indica a hora que o registro correspondente é recebido pelo fluxo. O valor padrão é "false". Essa opção é compatível com o AWS Glue versão 4.0 ou posterior.
+ `"emitConsumerLagMetrics"`: (opcional) usado para leitura. Quando a opção for definida como "true" (verdadeira), para cada lote, serão emitidas métricas durante o período entre a hora que o registro mais antigo é recebido pelo tópico e a hora que ele chega ao AWS Glue para o CloudWatch. O nome da métrica é "glue.driver.streaming.maxConsumerLagInMs". O valor padrão é "false". Essa opção é compatível com o AWS Glue versão 4.0 ou posterior.
+ `"fanoutConsumerARN"`: (opcional) usado para leitura. O ARN de um consumidor de fluxo do Kinesis para o fluxo especificado em `streamARN`. Usado para habilitar o modo de distribuição avançada para a conexão do Kinesis. Para obter mais informações sobre como consumir um fluxo do Kinesis com distribuição avançada, consulte [Usar distribuição avançada nas tarefas de streaming do Kinesis](aws-glue-programming-etl-connect-kinesis-efo.md).
+ `"recordMaxBufferedTime"` (opcional) usado para gravação. Padrão: 1000 (ms). Tempo máximo em que um registro é armazenado em buffer enquanto espera para ser gravado.
+ `"aggregationEnabled"` (opcional) usado para gravação. Padrão: true. Especifica se os registros devem ser agregados antes de serem enviados para o Kinesis.
+ `"aggregationMaxSize"` (opcional) usado para gravação. Padrão: 51200 (bytes) Se um registro for maior que esse limite, ele ignorará o agregador. Nota: O Kinesis impõe um limite de 50 KB no tamanho do registro. Se você definir isso além de 50 KB, registros grandes serão rejeitados pelo Kinesis.
+ `"aggregationMaxCount"` (opcional) usado para gravação. Padrão: 4294967295. O número máximo de itens a serem retornados em um registro agregado.
+ `"producerRateLimit"` (opcional) usado para gravação. Padrão: 150 (%). Limita o throughput por fragmento enviado por um único produtor (como seu trabalho), como uma porcentagem do limite de back-end.
+ `"collectionMaxCount"` (opcional) usado para gravação. Padrão: 500. Número máximo de itens a serem compactados em uma solicitação PutRecords. 
+ `"collectionMaxSize"` (opcional) usado para gravação. Padrão: 5242880 (bytes) Quantidade máxima de dados a serem enviados com uma solicitação PutRecords.

# Usar distribuição avançada nas tarefas de streaming do Kinesis
<a name="aws-glue-programming-etl-connect-kinesis-efo"></a>

Um consumidor de distribuição avançada pode receber registros de um fluxo do Kinesis com um throughput dedicado que pode ser maior que o dos consumidores comuns. Isso é feito otimizando o protocolo de transferência usado para fornecer dados a um consumidor do Kinesis, como o seu trabalho. [Para obter mais informações sobre distribuição avançada do Kinesis, consulte a documentação do Kinesis](https://docs.aws.amazon.com//streams/latest/dev/enhanced-consumers.html).

No modo de distribuição avançada, as opções de conexão `maxRecordPerRead` e `idleTimeBetweenReadsInMs` não se aplicam mais, pois esses parâmetros não são configuráveis quando se usa o fan-out aprimorado. As opções de configuração para novas tentativas funcionam conforme descrito.

Use os procedimentos a seguir para habilitar e desabilitar a distribuição avançada para seu trabalho de streaming. Você deve registrar um consumidor de fluxo para cada trabalho que consuma dados do fluxo.

**Para permitir um maior consumo de distribuição avançada no trabalho:**

1. Registre um consumidor de fluxo para o trabalho usando a API do Kinesis. Siga as instruções para *register a consumer with enhanced fan-out using the Kinesis Data Streams API* na [documentação do Kinesis](https://docs.aws.amazon.com//streams/latest/dev/building-enhanced-consumers-api). Você só precisará seguir a primeira etapa: chamar [RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html). Sua solicitação deve retornar um ARN, *consumerARN*. 

1. Defina a opção de conexão `fanoutConsumerARN` como *consumerARN* nos argumentos do método de conexão.

1. Reinicie seu trabalho.

**Para desabilitar o consumo de distribuição avançada no trabalho:**

1. Remova a opção de conexão `fanoutConsumerARN` da sua chamada de método.

1. Reinicie seu trabalho.

1. Siga as instruções para *deregister a consumer* na [documentação do Kinesis](https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-console.html). Essas instruções se aplicam ao console, mas também podem ser obtidas por meio da API Kinesis. Para obter mais informações sobre o cancelamento do registro de consumidores de fluxo por meio da API do Kinesis, consulte [DeregisterStreamConsumer](https://docs.aws.amazon.com//kinesis/latest/APIReference/API_DeregisterStreamConsumer.html) na documentação do Kinesis.