

# Conexión de Kinesis
<a name="aws-glue-programming-etl-connect-kinesis-home"></a>

Puede utilizar una conexión de Kafka para leer y escribir en Amazon Kinesis Data Streams mediante información almacenada en una tabla del catálogo de datos o si proporciona información para acceder directamente al flujo de datos. Puede leer la información de Kinesis en un DataFrame de Spark y, a continuación, convertirla en un DynamicFrame de AWS Glue. Puede escribir DynamicFrames en Kinesis en formato JSON. Si accede directamente a la secuencia de datos, utilice estas opciones para proporcionar información sobre cómo acceder a la secuencia de datos.

Si utiliza `getCatalogSource` o `create_data_frame_from_catalog` para consumir registros de una fuente de streaming de Kinesis, el trabajo tiene la base de datos de Data Catalog y la información del nombre de la tabla, y puede utilizarla para obtener algunos parámetros básicos para la lectura de la fuente de streaming de Kinesis. Si utiliza `getSource`, `getSourceWithFormat`, `createDataFrameFromOptions` o `create_data_frame_from_options` debe especificar estos parámetros básicos mediante las opciones de conexión descritas aquí.

Puede especificar las opciones de conexión para Kinesis al utilizar los siguientes argumentos para los métodos especificados en la clase `GlueContext`.
+ Scala
  + `connectionOptions`: se debe utilizar con `getSource`, `createDataFrameFromOptions` y `getSink` 
  + `additionalOptions`: se debe utilizar con `getCatalogSource`, `getCatalogSink`
  + `options`: se debe utilizar con `getSourceWithFormat`, `getSinkWithFormat`
+ Python
  + `connection_options`: se debe utilizar con `create_data_frame_from_options`, `write_dynamic_frame_from_options`
  + `additional_options`: se debe utilizar con `create_data_frame_from_catalog`, `write_dynamic_frame_from_catalog`
  + `options`: se debe utilizar con `getSource`, `getSink`

Para obtener notas y restricciones sobre los trabajos de ETL de Streaming, consulte [Notas y restricciones de ETL de streaming](add-job-streaming.md#create-job-streaming-restrictions).

## Configurar Kinesis
<a name="aws-glue-programming-etl-connect-kinesis-configure"></a>

Para conectarse desde un flujo de datos de Kinesis en un trabajo de AWS Glue Spark, necesitará algunos requisitos previos:
+ Si está leyendo, el trabajo de AWS Glue debe tener permisos de IAM de nivel de acceso de lectura para el flujo de datos de Kinesis.
+ Si está escribiendo, el trabajo de AWS Glue debe tener permisos de IAM de nivel de acceso de escritura para el flujo de datos de Kinesis.

En algunos casos, tendrá que configurar requisitos previos adicionales:
+ Si su trabajo de AWS Glue está configurado con **conexiones de red adicionales** (normalmente para conectarse a otros conjuntos de datos) y una de esas conexiones proporciona **opciones de red** de Amazon VPC, esto indicará que su trabajo se comunique a través de Amazon VPC. En este caso, también tendrá que configurar el flujo de datos de Kinesis para que se comunique a través de Amazon VPC. Puede hacerlo mediante la creación de un punto de conexión de VPC de tipo interfaz entre la Amazon VPC y el flujo de datos de Kinesis. Para obtener más información, consulte [Uso de Kinesis de Amazon Kinesis Data Streams con puntos de conexión de VPC de interfaz](https://docs.aws.amazon.com//streams/latest/dev/vpc.html).
+ Al especificar Amazon Kinesis Data Streams en otra cuenta, debe configurar los roles y las políticas para permitir el acceso entre cuentas. Para obtener más información, consulte [Ejemplo: leer desde un flujo de Kinesis en una cuenta diferente](https://docs.aws.amazon.com/kinesisanalytics/latest/java/examples-cross.html).

Para obtener más información sobre los requisitos previos del trabajo de ETL de Streaming, consulte [Trabajos ETL de streaming en AWS Glue](add-job-streaming.md).

## Ejemplo: lectura de transmisiones desde Kinesis
<a name="aws-glue-programming-etl-connect-kinesis-read"></a>

### Ejemplo: lectura de transmisiones desde Kinesis
<a name="section-etl-connect-kinesis-read"></a>

Se utiliza junto con [forEachBatch](aws-glue-api-crawler-pyspark-extensions-glue-context.md#aws-glue-api-crawler-pyspark-extensions-glue-context-forEachBatch).

Ejemplo de origen de streaming de Amazon Kinesis:

```
kinesis_options =
   { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream",
     "startingPosition": "TRIM_HORIZON", 
     "inferSchema": "true", 
     "classification": "json" 
   }
data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)
```

## Ejemplo: escribir en flujos de Kinesis
<a name="aws-glue-programming-etl-connect-kinesis-write"></a>

### Ejemplo: lectura de transmisiones desde Kinesis
<a name="section-etl-connect-kinesis-read"></a>

Se utiliza junto con [forEachBatch](aws-glue-api-crawler-pyspark-extensions-glue-context.md#aws-glue-api-crawler-pyspark-extensions-glue-context-forEachBatch).

Ejemplo de origen de streaming de Amazon Kinesis:

```
kinesis_options =
   { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream",
     "startingPosition": "TRIM_HORIZON", 
     "inferSchema": "true", 
     "classification": "json" 
   }
data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)
```

## Referencia de opciones de conexión de Kinesis
<a name="aws-glue-programming-etl-connect-kinesis"></a>

Designa opciones de conexión para Amazon Kinesis Data Streams.

Utilice las siguientes opciones de conexión para los orígenes de datos de streaming de Kinesis: 
+ `"streamARN"`: (Obligatorio) Se utiliza para leer/escribir. El ARN de flujo de datos de Kinesis.
+ `"classification"`: (Obligatorio para lectura) Se utiliza para leer. El formato de archivo utilizado por los datos del registro. Obligatorio a menos que se proporcione a través del catálogo de datos.
+ `"streamName"`: (Opcional) Se usa para leer. Nombre de un flujo de datos de Kinesis para leer. Utilizado con `endpointUrl`.
+ `"endpointUrl"`: (Opcional) Se usa para leer. Predeterminado: “https://kinesis.us-east-1.amazonaws.com”. El punto de conexión de AWS del flujo de Kinesis. No es necesario cambiar esto a menos que se conecte a una región especial.
+ `"partitionKey"`: (Opcional) Se usa para escribir. La clave de partición de Kinesis que se utiliza al producir registros.
+ `"delimiter"`: (Opcional) Se usa para leer. El separador de valores que se utiliza cuando `classification` es CSV. El valor predeterminado es “`,`”.
+ `"startingPosition"`: (Opcional) Se usa para leer. La posición inicial en el flujo de datos de Kinesis para leer los datos. Los valores posibles son `"latest"`, `"trim_horizon"`, `"earliest"` o una cadena de marca de tiempo en formato UTC en el patrón `yyyy-mm-ddTHH:MM:SSZ` (donde `Z` representa un desplazamiento de zona horaria UTC con un \$1/-. Por ejemplo, “04-04-2023 T 08:00:00-04:00”). El valor predeterminado es `"latest"`. Nota: La cadena de marca de tiempo en formato UTC para `"startingPosition"` solo es compatible con la versión 4.0 o posterior de Glue AWS.
+ `"failOnDataLoss"`: (Opcional) No se realizará el trabajo si falta o ha caducado alguna partición activa. El valor predeterminado es `"false"`.
+ `"awsSTSRoleARN"`: (Opcional) Se usa para escribir/leer. El nombre de recurso de Amazon (ARN) del rol de que se asumirá mediante AWS Security Token Service (AWS STS). Este rol debe tener permisos para describir o leer operaciones de registros del flujo de datos de Kinesis. Debe utilizar este parámetro para acceder a un flujo de datos de otra cuenta. Se utiliza junto con `"awsSTSSessionName"`.
+ `"awsSTSSessionName"`: (Opcional) Se usa para escribir/leer. Un identificador para la sesión que asume el rol mediante AWS STS. Debe utilizar este parámetro para acceder a un flujo de datos de otra cuenta. Se utiliza junto con `"awsSTSRoleARN"`.
+ `"awsSTSEndpoint"`: (Opcional) El punto de conexión de AWS STS que se utilizará al conectarse a Kinesis con un rol asumido. Esto permite usar el punto de conexión regional de AWS STS en una VPC, lo que no es posible con el punto de conexión global predeterminado.
+ `"maxFetchTimeInMs"`: (Opcional) Se usa para leer. El tiempo máximo que le tomó al ejecutor del trabajo leer los registros del lote actual en el flujo de datos de Kinesis, especificado en milisegundos (ms). Pueden realizarse varias llamadas a la API de `GetRecords` durante este tiempo. El valor predeterminado es `1000`.
+ `"maxFetchRecordsPerShard"`: (Opcional) Se usa para leer. El número máximo de registros que se recuperará por partición en el flujo de datos de Kinesis por microlote. Nota: El cliente puede exceder este límite si el trabajo de streaming ya leyó registros adicionales de Kinesis (en la misma llamada de obtención de registros). Si `maxFetchRecordsPerShard` tiene que ser preciso, entonces tiene que ser un múltiplo de `maxRecordPerRead`. El valor predeterminado es `100000`.
+ `"maxRecordPerRead"`: (Opcional) Se usa para leer. El número máximo de registros que se recuperará del flujo de datos de Kinesis en cada operación `getRecords`. El valor predeterminado es `10000`.
+ `"addIdleTimeBetweenReads"`: (Opcional) Se usa para leer. Agrega un retardo de tiempo entre dos operaciones `getRecords` consecutivas. El valor predeterminado es `"False"`. Esta opción sólo se puede configurar para Glue versión 2.0 y superior. 
+ `"idleTimeBetweenReadsInMs"`: (Opcional) Se usa para leer. El tiempo mínimo de retraso entre dos operaciones `getRecords` consecutivas, especificado en ms. El valor predeterminado es `1000`. Esta opción sólo se puede configurar para Glue versión 2.0 y superior. 
+ `"describeShardInterval"`: (Opcional) Se usa para leer. El intervalo mínimo de tiempo entre dos llamadas a la API `ListShards` para que su script considere cambios en los fragmentos. Para obtener más información, consulte [Estrategias para cambios en los fragmentos](https://docs.aws.amazon.com//streams/latest/dev/kinesis-using-sdk-java-resharding-strategies.html) en la *Guía para desarrolladores de Amazon Kinesis Data Streams*. El valor predeterminado es `1s`.
+ `"numRetries"`: (Opcional) Se usa para leer. El número máximo de reintentos para las solicitudes de la API de Kinesis Data Streams. El valor predeterminado es `3`.
+ `"retryIntervalMs"`: (Opcional) Se usa para leer. El periodo de enfriamiento (especificado en ms) antes de volver a intentar la llamada a la API de Kinesis Data Streams. El valor predeterminado es `1000`.
+ `"maxRetryIntervalMs"`: (Opcional) Se usa para leer. El periodo de enfriamiento máximo (especificado en ms) entre dos intentos de llamada a la API de Kinesis Data Streams. El valor predeterminado es `10000`.
+ `"avoidEmptyBatches"`: (Opcional) Se usa para leer. Evita crear un trabajo de microlotes vacío al comprobar si hay datos no leídos en el flujo de datos de Kinesis antes de que se inicie el lote. El valor predeterminado es `"False"`.
+ `"schema"`: (Obligatorio cuando inferSchema se establece en false) Se utiliza para leer. El esquema que se utilizará para procesar la carga útil. Si la clasificación es `avro`, el esquema proporcionado debe estar en el formato de esquema Avro. Si la clasificación no es `avro`, el esquema proporcionado debe estar en el formato de esquema DDL.

  A continuación, se muestran algunos ejemplos de esquemas.

------
#### [ Example in DDL schema format ]

  ```
  `column1` INT, `column2` STRING , `column3` FLOAT
  ```

------
#### [ Example in Avro schema format ]

  ```
  {
    "type":"array",
    "items":
    {
      "type":"record",
      "name":"test",
      "fields":
      [
        {
          "name":"_id",
          "type":"string"
        },
        {
          "name":"index",
          "type":
          [
            "int",
            "string",
            "float"
          ]
        }
      ]
    }
  }
  ```

------
+ `"inferSchema"`: (Opcional) Se usa para leer. El valor predeterminado es “false”. Si se establece en “true”, el esquema se detectará durante el tiempo de ejecución desde la carga dentro de `foreachbatch`.
+ `"avroSchema"`: (Obsoleto) Se usa para leer. Parámetro utilizado para especificar un esquema de datos Avro cuando se utiliza el formato Avro. Este parámetro se ha quedado obsoleto. Utilice el parámetro `schema`.
+ `"addRecordTimestamp"`: (Opcional) Se usa para leer. Cuando esta opción se establece en “true”, la salida de datos contendrá una columna adicional denominada “\$1\$1src\$1timestamp” que indica la hora en la que el flujo recibió el registro correspondiente. El valor predeterminado es “false”. Esta opción es compatible con la versión 4.0 o posterior de AWS Glue.
+ `"emitConsumerLagMetrics"`: (Opcional) Se usa para leer. Cuando la opción se establece en “verdadera”, para cada lote, emitirá las métricas correspondientes al periodo comprendido entre el registro más antiguo recibido por el flujo y el momento en que llegue a AWS Glue en CloudWatch. El nombre de la métrica es “glue.driver.streaming.maxConsumerLagInMs”. El valor predeterminado es “false”. Esta opción es compatible con la versión 4.0 o posterior de AWS Glue.
+ `"fanoutConsumerARN"`: (Opcional) Se usa para leer. El ARN de un consumidor de un flujo de Kinesis para el flujo especificado en `streamARN`. Se utiliza para habilitar el modo de distribución mejorada para la conexión de Kinesis. Para obtener más información sobre cómo consumir una transmisión de Kinesis con una distribución mejorada, consulte [Uso de una distribución mejorada en los trabajos de streaming de Kinesis](aws-glue-programming-etl-connect-kinesis-efo.md).
+ `"recordMaxBufferedTime"`: (Opcional) Se usa para escribir. Predeterminado: 1000 (ms). Tiempo máximo que un registro permanece almacenado en búfer mientras espera a ser escrito.
+ `"aggregationEnabled"`: (Opcional) Se usa para escribir. Valor predeterminado: verdadero. Especifica si los registros deben agregarse antes de enviarlos a Kinesis.
+ `"aggregationMaxSize"`: (Opcional) Se usa para escribir. Predeterminado: 51 200 (bytes). Si un registro supera este límite, omitirá el agregador. Nota: Kinesis impone un límite de 50 KB en el tamaño del registro. Si lo establece por encima de 50 KB, Kinesis rechazará los registros de gran tamaño.
+ `"aggregationMaxCount"`: (Opcional) Se usa para escribir. Predeterminado: 4294967295. Número máximo de elementos a empaquetar en un registro agregado.
+ `"producerRateLimit"`: (Opcional) Se usa para escribir. Predeterminado: 150 (%). Limita el rendimiento por partición enviado desde un solo productor (por ejemplo, su trabajo), como porcentaje del límite de backend.
+ `"collectionMaxCount"`: (Opcional) Se usa para escribir. Predeterminado: 500. Número máximo de elementos a incluir en una solicitud de PutRecords. 
+ `"collectionMaxSize"`: (Opcional) Se usa para escribir. Predeterminado: 5 242 880 (bytes). Cantidad máxima de datos para enviar con una solicitud de PutRecords.

# Uso de una distribución mejorada en los trabajos de streaming de Kinesis
<a name="aws-glue-programming-etl-connect-kinesis-efo"></a>

Un consumidor con una distribución mejorada puede recibir los registros de una transmisión de Kinesis con un rendimiento dedicado que puede ser superior al de los consumidores habituales. Esto se logra mediante la optimización del protocolo de transferencia utilizado para proporcionar datos a un consumidor de Kinesis, como su trabajo. Para obtener más información sobre Kinesis Enhanced Fan-Out, consulte la [documentación de Kinesis](https://docs.aws.amazon.com//streams/latest/dev/enhanced-consumers.html).

En el modo de distribución mejorada, las opciones de conexión `maxRecordPerRead` y `idleTimeBetweenReadsInMs` ya no se aplican, ya que esos parámetros no se pueden configurar cuando se utiliza la apertura de distribución mejorada. Las opciones de configuración para los reintentos funcionan como se describe.

Utilice los siguientes procedimientos para habilitar y deshabilitar la distribución mejorada para su trabajo de streaming. Debe registrar un consumidor de transmisión para cada trabajo que vaya a consumir datos de su transmisión.

**Para habilitar un consumo de distribución mejorada en su trabajo:**

1. Registre un consumidor de transmisión para su trabajo mediante la API de Kinesis. Siga las instrucciones para *registrar a un consumidor con una distribución mejorada mediante la API de Kinesis Data Streams* de la [documentación de Kinesis](https://docs.aws.amazon.com//streams/latest/dev/building-enhanced-consumers-api). Solo tendrá que seguir el primer paso: llamar a [RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html). Su solicitud debe devolver un ARN, *consumerARN.* 

1. Establezca la opción de conexión `fanoutConsumerARN` en *consumerARN* en los argumentos del método de conexión.

1. Reinicie el trabajo.

**Para deshabilitar un consumo de distribución mejorada en su trabajo:**

1. Elimine la opción de conexión `fanoutConsumerARN` de su método de llamada.

1. Reinicie el trabajo.

1. Siga las instrucciones para *anular el registro de un consumidor* en la [documentación de Kinesis](https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-console.html). Estas instrucciones se aplican a la consola, pero también se pueden obtener a través de la API de Kinesis. Para obtener más información sobre la cancelación del registro de consumidores de transmisión a través de la API de Kinesis, consulte [DeregisterStreamConsumer](https://docs.aws.amazon.com//kinesis/latest/APIReference/API_DeregisterStreamConsumer.html) en la documentación de Kinesis.