

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Connexions Kafka
<a name="aws-glue-programming-etl-connect-kafka-home"></a>

Vous pouvez utiliser une connexion Kafka pour lire et écrire dans des flux de données Kafka à l’aide d’informations stockées dans une table de catalogue de données ou en fournissant des informations permettant d’accéder directement au flux de données. La connexion prend en charge un cluster Kafka ou un cluster Amazon Managed Streaming for Apache Kafka. Vous pouvez lire les informations de Kafka dans un Spark DataFrame, puis les convertir en AWS Glue DynamicFrame. Vous pouvez DynamicFrames écrire dans Kafka au format JSON. Si vous accédez directement au flux de données, utilisez ces options pour fournir des informations sur la façon d'accéder au flux de données.

Si vous utilisez `getCatalogSource` ou `create_data_frame_from_catalog` pour consommer des enregistrements à partir d’une source de streaming Kafka, ou `getCatalogSink` ou `write_dynamic_frame_from_catalog` pour écrire des enregistrements dans Kafka, et que la tâche dispose des informations relatives à la base de données Data Catalog et au nom de la table, et peut les utiliser pour obtenir certains paramètres de base pour la lecture à partir de la source de streaming Kafka. Si vous utilisez `getSource`, `getCatalogSink`, `getSourceWithFormat`, `getSinkWithFormat`, `createDataFrameFromOptions`, `create_data_frame_from_options` ou `write_dynamic_frame_from_catalog`, vous devez spécifier ces paramètres de base à l’aide des options de connexion décrites ici.

Vous pouvez spécifier les options de connexion pour Kafka à l’aide des arguments suivants pour les méthodes spécifiées dans la classe `GlueContext`.
+ Scala
  + `connectionOptions` : utiliser avec `getSource`, `createDataFrameFromOptions`, `getSink` 
  + `additionalOptions` : utiliser avec `getCatalogSource`, `getCatalogSink`
  + `options` : utiliser avec `getSourceWithFormat`, `getSinkWithFormat`
+ Python
  + `connection_options` : utiliser avec `create_data_frame_from_options`, `write_dynamic_frame_from_options`
  + `additional_options` : utiliser avec `create_data_frame_from_catalog`, `write_dynamic_frame_from_catalog`
  + `options` : utiliser avec `getSource`, `getSink`

Pour les remarques et les restrictions concernant les tâches ETL de streaming, consultez [Restrictions et notes sur ETL en streaming](add-job-streaming.md#create-job-streaming-restrictions).

**Topics**
+ [Configurer Kafka](#aws-glue-programming-etl-connect-kafka-configure)
+ [Exemple : lecture à partir de flux Kafka](#aws-glue-programming-etl-connect-kafka-read)
+ [Exemple : écriture dans les flux Kafka](#aws-glue-programming-etl-connect-kafka-write)
+ [Référence des options de connexion de Kafka](#aws-glue-programming-etl-connect-kafka)

## Configurer Kafka
<a name="aws-glue-programming-etl-connect-kafka-configure"></a>

Il n'y a aucune AWS condition préalable pour se connecter aux flux Kafka disponibles sur Internet.

Vous pouvez créer une connexion AWS Glue Kafka pour gérer vos identifiants de connexion. Pour de plus amples informations, veuillez consulter [Création d’une connexion AWS Glue pour un flux de données Apache Kafka](add-job-streaming.md#create-conn-streaming). Dans la configuration de votre tâche AWS Glue, fournissez *connectionName* une **connexion réseau supplémentaire**, puis, dans votre appel de méthode, fournissez *connectionName* le `connectionName` paramètre.

Dans certains cas, vous devrez configurer des prérequis supplémentaires :
+ Si vous utilisez Amazon Managed Streaming pour Apache Kafka avec l'authentification IAM, vous aurez besoin d'une configuration IAM appropriée.
+ Si vous utilisez Amazon Managed Streaming pour Apache Kafka avec un Amazon VPC, vous aurez besoin d'une configuration Amazon VPC appropriée. Vous devez créer une connexion AWS Glue fournissant les informations de connexion Amazon VPC. Vous aurez besoin de la configuration de votre tâche pour inclure la connexion AWS Glue en tant que **connexion réseau supplémentaire**.

Pour plus d’informations sur les prérequis de la tâche ETL de streaming, consultez [Tâches ETL en streaming dans AWS Glue](add-job-streaming.md).

## Exemple : lecture à partir de flux Kafka
<a name="aws-glue-programming-etl-connect-kafka-read"></a>

Utilisez conjointement avec [forEachBatch](aws-glue-api-crawler-pyspark-extensions-glue-context.md#aws-glue-api-crawler-pyspark-extensions-glue-context-forEachBatch).

Exemple pour la source de streaming Kafka :

```
kafka_options =
    { "connectionName": "ConfluentKafka", 
      "topicName": "kafka-auth-topic", 
      "startingOffsets": "earliest", 
      "inferSchema": "true", 
      "classification": "json" 
    }
data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kafka", connection_options=kafka_options)
```

## Exemple : écriture dans les flux Kafka
<a name="aws-glue-programming-etl-connect-kafka-write"></a>

Exemples pour écrire dans Kafka :

Exemple avec la méthode `getSink` :

```
data_frame_datasource0 = 
glueContext.getSink(
	connectionType="kafka",
	connectionOptions={
		JsonOptions("""{
			"connectionName": "ConfluentKafka", 
			"classification": "json", 
			"topic": "kafka-auth-topic", 
			"typeOfData": "kafka"}
		""")}, 
	transformationContext="dataframe_ApacheKafka_node1711729173428")
	.getDataFrame()
```

Exemple avec la méthode `write_dynamic_frame.from_options` :

```
kafka_options =
    { "connectionName": "ConfluentKafka", 
      "topicName": "kafka-auth-topic", 
      "classification": "json" 
    }
data_frame_datasource0 = glueContext.write_dynamic_frame.from_options(connection_type="kafka", connection_options=kafka_options)
```

## Référence des options de connexion de Kafka
<a name="aws-glue-programming-etl-connect-kafka"></a>

Lors de la lecture, utilisez les options de connexion suivantes avec `"connectionType": "kafka"` :
+ `"bootstrap.servers"`(Obligatoire) Une liste de serveurs bootstrap URLs, par exemple, comme`b-1.vpc-test-2.o4q88o.c6.kafka.us-east-1.amazonaws.com:9094`. Cette option doit être spécifiée dans l'appel d'API ou définie dans les métadonnées de la table dans le catalogue de données.
+ `"security.protocol"`(Obligatoire) Le protocole utilisé pour communiquer avec les agents. Les valeurs possibles sont `"SSL"` ou `"PLAINTEXT"`.
+ `"topicName"` : (requis) liste de rubriques séparées par des virgules auxquelles s'abonner. Vous devez spécifier un seul et unique élément parmi `"topicName"`, `"assign"` ou `"subscribePattern"`.
+ `"assign"` : (requis) chaîne JSON indiquant le `TopicPartitions` spécifique à utiliser. Vous devez spécifier un seul et unique élément parmi `"topicName"`, `"assign"` ou `"subscribePattern"`.

  Exemple : '\$1"topicA":[0,1],"topicB":[2,4]\$1'
+ `"subscribePattern"` : (obligatoire) chaîne d'expression rationnelle Java qui identifie la liste de rubriques à laquelle vous souhaitez vous abonner. Vous devez spécifier un seul et unique élément parmi `"topicName"`, `"assign"` ou `"subscribePattern"`.

  Exemple : 'topic.\$1'
+ `"classification"` : (obligatoire) le format de fichier utilisé par les données de l'enregistrement. Obligatoire, sauf s'il est fourni par le catalogue de données.
+ `"delimiter"` : (facultatif) le séparateur de valeurs utilisé lorsque `classification` est CSV. La valeur par défaut est « `,` ».
+ `"startingOffsets"` : (facultatif) position de départ dans la rubrique Kafka à partir de laquelle lire les données. Les valeurs possibles sont `"earliest"` ou `"latest"`. La valeur par défaut est `"latest"`.
+ `"startingTimestamp"`: (Facultatif, disponible uniquement pour AWS la version 4.0 ou ultérieure de Glue) Horodatage de l'enregistrement dans la rubrique Kafka à partir duquel les données doivent être lues. La valeur possible est une chaîne d'horodatage au format UTC dans le modèle `yyyy-mm-ddTHH:MM:SSZ` (où `Z` représente un décalage de fuseau horaire UTC avec un \$1/-. Par exemple : « 2023-04-04T08:00:00-04:00 »).

  Remarque : seule l'une des propriétés « StartingOffsets » ou « StartingTimestamp » peut figurer dans la liste des options de connexion du script de streaming AWS Glue. L'inclusion de ces deux propriétés entraînera l'échec de la tâche.
+ `"endingOffsets"` : (facultatif) point de fin lorsqu'une requête par lots est terminée. Les valeurs possibles sont `"latest"` ou une chaîne JSON qui spécifie un décalage de fin pour chaque `TopicPartition`.

  Pour la chaîne JSON, le format est `{"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}`. La valeur `-1` en tant que décalage représente `"latest"`.
+ `"pollTimeoutMs"` : (facultatif) délai d'attente en millisecondes pour interroger les données de Kafka dans les exécuteurs de tâches Spark. La valeur par défaut est `600000`.
+ `"numRetries"` : (facultatif) nombre de nouvelles tentatives avant de ne pas récupérer les décalages Kafka. La valeur par défaut est `3`.
+ `"retryIntervalMs"` : (facultatif) temps d'attente en millisecondes avant d'essayer de récupérer les décalages Kafka. La valeur par défaut est `10`.
+ `"maxOffsetsPerTrigger"` : (facultatif) limite de taux sur le nombre maximal de décalages qui sont traités par intervalle de déclenchement. Le nombre total spécifié de décalages est réparti proportionnellement entre les `topicPartitions` des différents volumes. La valeur par défaut est null, ce qui signifie que le consommateur lit tous les décalages jusqu'au dernier décalage connu.
+ `"minPartitions"` : (facultatif) nombre minimum de partitions à lire à partir de Kafka. La valeur par défaut est nulle, ce qui signifie que le nombre de partitions Spark est égal au nombre de partitions Kafka.
+  `"includeHeaders"`: (facultatif) indique s'il faut inclure les en-têtes Kafka. Lorsque l'option est définie sur « true » (vrai), la sortie de données contiendra une colonne supplémentaire nommée « glue\$1streaming\$1kafka\$1headers » avec le type `Array[Struct(key: String, value: String)]`. La valeur définie par défaut est « false ». Cette option est disponible dans AWS Glue version 3.0 ou ultérieure. 
+ `"schema"` : (requis lorsque inferSchema est défini sur false) schéma à utiliser pour traiter la charge utile. Si la classification est `avro`, le schéma fourni doit être au format de schéma Avro. Si la classification n'est pas `avro`, le schéma fourni doit être au format de schéma DDL.

  Voici quelques exemples de schémas.

------
#### [ Example in DDL schema format ]

  ```
  'column1' INT, 'column2' STRING , 'column3' FLOAT
  ```

------
#### [ Example in Avro schema format ]

  ```
  {
  "type":"array",
  "items":
  {
  "type":"record",
  "name":"test",
  "fields":
  [
    {
      "name":"_id",
      "type":"string"
    },
    {
      "name":"index",
      "type":
      [
        "int",
        "string",
        "float"
      ]
    }
  ]
  }
  }
  ```

------
+ `"inferSchema"` : (facultatif) la valeur par défaut est « false ». S'il est défini sur « true », le schéma sera détecté lors de l'exécution à partir de la charge utile dans `foreachbatch`.
+ `"avroSchema"` : (obsolète) paramètre utilisé pour spécifier un schéma de données Avro lorsque le format Avro est utilisé. Ce paramètre est désormais obsolète. Utilisez le paramètre `schema`.
+ `"addRecordTimestamp"` : (facultatif) lorsque cette option est définie sur « true », la sortie de données contient une colonne supplémentaire nommée « \$1\$1src\$1timestamp » qui indique l'heure à laquelle l'enregistrement correspondant est reçu par la rubrique. La valeur par défaut est « false ». Cette option est prise en charge dans AWS Glue version 4.0 ou ultérieure.
+ `"emitConsumerLagMetrics"`: (Facultatif) Lorsque l'option est définie sur « vrai », pour chaque lot, elle émet les métriques correspondant à la durée comprise entre le plus ancien enregistrement reçu par le sujet et le moment où il arrive CloudWatch. AWS Glue Le nom de la métrique est « glue.driver.streaming ». maxConsumerLagInMs». La valeur par défaut est « false ». Cette option est prise en charge dans AWS Glue version 4.0 ou ultérieure.

Lors de l’écriture, utilisez les options de connexion suivantes avec `"connectionType": "kafka"` :
+ `"connectionName"`(Obligatoire) Nom de la connexion AWS Glue utilisée pour se connecter au cluster Kafka (similaire à la source Kafka).
+ `"topic"` (Obligatoire) Si une colonne de rubrique existe, sa valeur est utilisée comme rubrique lors de l’écriture de la ligne donnée dans Kafka, sauf si l’option de configuration de la rubrique est définie. C’est-à-dire que l’option de configuration `topic` remplace la colonne de rubrique.
+ `"partition"` (Facultatif) Si un numéro de partition valide est spécifié, cette `partition` sera utilisée lors de l’envoi de l’enregistrement.

  Si aucune partition n’est spécifiée, mais qu’une `key` est présente, une partition sera choisie en utilisant le hachage de la clé.

  Si ni une `key`, ni une `partition` ne sont présentes, une partition sera choisie en partitionnant de manière permanente ces modifications lorsqu’au moins des octets batch.size seront produits sur la partition.
+ `"key"` (Facultatif) Utilisé pour le partitionnement si `partition` est nul.
+ `"classification"` (Facultatif) Format de fichier utilisé par les données de l’enregistrement. Nous prenons uniquement en charge les formats JSON, CSV et Avro.

  Avec le format Avro, nous pouvons fournir une valeur avroSchema personnalisée avec laquelle il est possible de sérialiser, mais notez que cela doit également être fourni sur la source pour la désérialisation. Sinon, par défaut, il utilise Apache AvroSchema pour la sérialisation.

En outre, vous pouvez optimiser le récepteur Kafka selon les besoins en mettant à jour les [paramètres de configuration du producteur Kafka](https://kafka.apache.org/documentation/#producerconfigs). Notez qu’il n’existe aucune liste d’autorisation pour les options de connexion, toutes les paires clé-valeur sont conservées telles quelles sur le récepteur.

Cependant, il existe une petite liste de refus d’options qui ne prendra pas effet. Pour plus d’informations, consultez [Kafka specific configurations](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html).