

Après mûre réflexion, nous avons décidé de mettre fin à Amazon Kinesis Data Analytics pour les applications SQL :

1. À compter du **1er septembre 2025,** nous ne fournirons aucune correction de bogue pour les applications Amazon Kinesis Data Analytics for SQL, car leur support sera limité, compte tenu de l'arrêt prochain.

2. À compter du **15 octobre 2025,** vous ne pourrez plus créer de nouvelles applications Kinesis Data Analytics for SQL.

3. Nous supprimerons vos candidatures à compter **du 27 janvier 2026**. Vous ne serez pas en mesure de démarrer ou d'utiliser vos applications Amazon Kinesis Data Analytics for SQL. Support ne sera plus disponible pour Amazon Kinesis Data Analytics for SQL à partir de cette date. Pour de plus amples informations, veuillez consulter [Arrêt d'Amazon Kinesis Data Analytics pour les applications SQL](discontinuation.md).

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Configuration de l'entrée de l'application
<a name="how-it-works-input"></a>

Votre application Amazon Kinesis Data Analytics peut recevoir une entrée d’une seule source de streaming et, le cas échéant, utiliser une source de données de référence. Pour de plus amples informations, veuillez consulter [Applications Amazon Kinesis Data Analytics pour SQL : fonctionnement](how-it-works.md). Les sections de cette rubrique décrivent les sources d'entrée d'une application.

**Topics**
+ [Configuration d'une source de diffusion](#source-streaming)
+ [Configuration d'une source de référence](#source-reference)
+ [Travailler avec JSONPath](about-json-path.md)
+ [Mappage d'éléments d'une source de streaming à des colonnes d'entrée SQL](sch-mapping.md)
+ [Utilisation de la fonction de découverte de schéma sur des données de diffusion](sch-dis.md)
+ [Utilisation de la fonction de découverte de schéma sur des données statiques](sch-dis-ref.md)
+ [Prétraitement des données à l’aide d’une fonction Lambda](lambda-preprocessing.md)
+ [Mise en parallèle des flux d'entrée pour un débit accru](input-parallelism.md)

## Configuration d'une source de diffusion
<a name="source-streaming"></a>

Lorsque vous créez une application, vous spécifiez une source de diffusion. Vous pouvez également modifier une entrée après avoir créé l’application. Amazon Kinesis Data Analytics prend en charge les sources de streaming suivantes pour votre application :
+ Un flux de données Kinesis 
+ Un flux de livraison Firehose

**Note**  
Après le 12 septembre 2023, vous ne pourrez plus créer de nouvelles applications en utilisant Kinesis Data Firehose comme source si vous n’utilisez pas déjà Kinesis Data Analytics pour SQL. Les clients existants qui utilisent les applications Kinesis Data Analytics pour SQL avec `KinesisFirehoseInput` peuvent continuer à ajouter des applications avec `KinesisFirehoseInput` au sein d’un compte existant à l’aide de Kinesis Data Analytics. Si vous êtes déjà client et que vous souhaitez créer un compte avec les applications Kinesis Data Analytics pour SQL avec `KinesisFirehoseInput`, vous pouvez créer un dossier via le formulaire d’augmentation de limite de service. Pour plus d’informations, consultez le [Centre AWS Support](https://console.aws.amazon.com/support/home#/). Nous vous recommandons de toujours tester les nouvelles applications avant de les mettre en production.

**Note**  
Si le flux de données Kinesis est chiffré, Kinesis Data Analytics accède aux données du flux chiffré de façon transparente sans qu’aucune configuration supplémentaire ne soit nécessaire. Kinesis Data Analytics ne stocke pas les données non chiffrées lues depuis Kinesis Data Streams. Pour plus d'informations, consultez la page [Qu'est-ce que le chiffrement côté serveur pour Kinesis Streams Data ?](https://docs.aws.amazon.com/streams/latest/dev/what-is-sse.html).

Kinesis Data Analytics interroge en continu la source de streaming pour les nouvelles données et intègre celles-ci dans des flux intégrés à l’application en fonction de la configuration d’entrée. 

**Note**  
L’ajout d’un flux Kinesis en tant qu’entrée de votre application n’affecte pas les données du flux. Si une autre ressource, telle qu'un flux de diffusion Firehose, accédait également au même flux Kinesis, le flux de diffusion Firehose et l'application Kinesis Data Analytics recevraient les mêmes données. Toutefois, le débit et la limitation peuvent être affectés.

Votre code d'application peut interroger le flux intégré à l'application. Dans le cadre de la configuration d'entrée, vous fournissez les éléments suivants :
+ **Source de streaming** : Vous fournissez l’Amazon Resource Name (ARN) du flux et un rôle IAM que Kinesis Data Analytics peut assumer pour accéder au flux en votre nom. 
+ **Préfixe du nom de flux intégré à l’application** : Lorsque vous démarrez l’application, Kinesis Data Analytics crée le flux intégré à l’application spécifié. Dans votre code d'application, vous accédez au flux intégré à l'application à l'aide de ce nom. 

  Vous pouvez mapper le cas échéant une source de diffusion à plusieurs flux intégrés à l'application. Pour de plus amples informations, veuillez consulter [Limites](limits.md). Dans ce cas, Amazon Kinesis Data Analytics crée le nombre spécifié de flux intégrés à l'application avec les noms suivants *prefix* `_001` : *prefix*`_002`, et. *prefix* `_003` Par défaut, Kinesis Data Analytics mappe la source de streaming sur un flux intégré à l'application nommé. *prefix* `_001`

  Une limite est appliqué au rythme auquel vous pouvez insérer des lignes dans un flux intégré à l'application. Par conséquent, Kinesis Data Analytics prend en charge plusieurs flux intégrés à l’application pour vous permettre de transmettre des enregistrements dans votre application à un rythme beaucoup plus rapide. Si vous constatez que votre application n'arrive pas à suivre le rythme des données de la source de diffusion, vous pouvez ajouter des unités de parallélisme pour améliorer les performances. 
+ **Schéma de mappage** : Vous décrivez le format d’enregistrement (JSON, CSV) dans la source de streaming. Vous décrivez également la façon dont chaque enregistrement du flux est mappé à des colonnes dans le flux intégré à l'application qui est créé. C'est là où vous fournissez des noms de colonnes et des types de données. 

**Note**  
Kinesis Data Analytics place les identifiants (nom du flux et noms de colonnes) entre guillemets lors de la création du flux intégré à l’application. Lors de l'interrogation de ce flux et des colonnes, vous devez les spécifier entre guillemets en utilisant exactement la même casse (avec les mêmes lettres minuscules et majuscules). Pour plus d’informations sur les identifiants, consultez la section [Identifiants](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-identifiers.html) dans le manuel *Référence SQL du service géré Amazon pour Apache Flink SQL*.

Vous pouvez créer une application et configurer des entrées dans la console Amazon Kinesis Data Analytics. La console effectue ensuite les appels d'API nécessaires. Vous pouvez configurer l'entrée d'application lorsque vous créez une nouvelle API d'application ou ajouter la configuration d'entrée à une application existante. Pour plus d’informations, consultez [CreateApplication](API_CreateApplication.md) et [AddApplicationInput](API_AddApplicationInput.md). Voici la partie de configuration d'entrée du corps de requête d'API `Createapplication` :

```
 "Inputs": [
        {
            "InputSchema": {
                "RecordColumns": [
                    {
                        "Mapping": "string",
                        "Name": "string",
                        "SqlType": "string"
                    }
                ],
                "RecordEncoding": "string",
                "RecordFormat": {
                    "MappingParameters": {
                        "CSVMappingParameters": {
                            "RecordColumnDelimiter": "string",
                            "RecordRowDelimiter": "string"
                        },
                        "JSONMappingParameters": {
                            "RecordRowPath": "string"
                        }
                    },
                    "RecordFormatType": "string"
                }
            },
            "KinesisFirehoseInput": {
                "ResourceARN": "string",
                "RoleARN": "string"
            },
            "KinesisStreamsInput": {
                "ResourceARN": "string",
                "RoleARN": "string"
            },
            "Name": "string"
        }
    ]
```

## Configuration d'une source de référence
<a name="source-reference"></a>

Vous pouvez également ajouter le cas échéant une source de données de référence à une application existante pour enrichir les données en provenance de sources de diffusion. Vous devez stocker les données de référence en tant qu’objet dans votre compartiment Amazon S3. Lorsque l’application démarre, Amazon Kinesis Data Analytics lit l’objet Amazon S3 et crée une table de référence intégrée à l’application. Votre code d'application peut ensuite la joindre à un flux intégré à l'application. 

Vous stockez les données de référence dans l’objet Amazon S3 à l’aide des formats pris en charge (CSV, JSON). Par exemple, supposons que votre application effectue des analyses sur des ordres de bourse. Supposons que le format d'enregistrement suivant soit utilisé sur la source de diffusion :

```
Ticker, SalePrice, OrderId

AMZN     $700        1003
XYZ      $250        1004
...
```

Dans ce cas, vous pouvez envisager de gérer une source de données de référence afin de fournir des détails pour chaque symbole boursier, comme un nom de société.

```
Ticker, Company
AMZN, Amazon
XYZ, SomeCompany
...
```

Vous pouvez ajouter une source de données de référence d’application à l’aide de l’API ou de la console. Amazon Kinesis Data Analytics fournit les actions d’API suivantes pour gérer les sources de données de référence :
+  [AddApplicationReferenceDataSource](API_AddApplicationReferenceDataSource.md)
+ [UpdateApplication](API_UpdateApplication.md)

Pour plus d'informations sur l'ajout de données de référence; consultez [Exemple : ajout de données de référence à une application Kinesis Data Analytics](app-add-reference-data.md).

Notez ce qui suit :
+ Si l’application est en cours d’exécution, Kinesis Data Analytics crée un tableau de référence intégré à l’application, puis charge les données de référence immédiatement.
+ Si l’application n’est pas en cours d’exécution (par exemple, elle est à l’état prêt), Kinesis Data Analytics enregistre uniquement la configuration d’entrée mise à jour. Lorsque l’application commence à s’exécuter, Kinesis Data Analytics charge les données de référence dans votre application en tant que tableau.

Supposons que vous souhaitez actualiser les données après que Kinesis Data Analytics a créé le tableau de référence intégré à l’application. Vous avez peut-être mis à jour l’objet Amazon S3, ou vous souhaitez utiliser un autre objet Amazon S3. Dans ce cas, vous pouvez soit appeler explicitement [UpdateApplication](API_UpdateApplication.md), soit choisir **Actions**, **Synchroniser le tableau de données de référence** dans la console. Kinesis Data Analytics n’actualise pas automatiquement le tableau de référence intégré à l’application. 

Une limite est appliquée à la taille de l’objet Amazon S3 que vous pouvez créer comme source de données de référence. Pour de plus amples informations, veuillez consulter [Limites](limits.md). Si la taille de l’objet dépasse la limite, Kinesis Data Analytics ne peut pas charger les données. L'état de l'application s'affiche comme étant en cours d'exécution, mais les données ne sont pas lues.

Lorsque vous ajoutez une source de données de référence, vous fournissez les informations suivantes : 
+ **Nom du compartiment S3 et de la clé d’objet** : En plus du nom du compartiment et de la clé d’objet, vous fournissez également un rôle IAM que Kinesis Data Analytics peut assumer pour lire l’objet en votre nom. 
+ **Nom du tableau de référence intégré à l’application** : Kinesis Data Analytics crée ce tableau intégré à l’application et le remplit en lisant l’objet Amazon S3. Il s'agit du nom de table que vous spécifiez dans votre code d'application.
+ **Schéma de mappage** : vous décrivez le format d’enregistrement (JSON, CSV), l’encodage des données stockées dans l’objet Amazon S3. Vous décrivez également la façon dont chaque élément de données est mappé aux colonnes de la table de référence intégrée à l'application. 

Voici le corps de la demande dans la demande d'API `AddApplicationReferenceDataSource`.

```
{
    "applicationName": "string",
    "CurrentapplicationVersionId": number,
    "ReferenceDataSource": {
        "ReferenceSchema": {
            "RecordColumns": [
                {
                    "IsDropped": boolean,
                    "Mapping": "string",
                    "Name": "string",
                    "SqlType": "string"
                }
            ],
            "RecordEncoding": "string",
            "RecordFormat": {
                "MappingParameters": {
                    "CSVMappingParameters": {
                        "RecordColumnDelimiter": "string",
                        "RecordRowDelimiter": "string"
                    },
                    "JSONMappingParameters": {
                        "RecordRowPath": "string"
                    }
                },
                "RecordFormatType": "string"
            }
        },
        "S3ReferenceDataSource": {
            "BucketARN": "string",
            "FileKey": "string",
            "ReferenceRoleARN": "string"
        },
        "TableName": "string"
    }
}
```

# Travailler avec JSONPath
<a name="about-json-path"></a>

**Note**  
Après le 12 septembre 2023, vous ne pourrez plus créer de nouvelles applications en utilisant Kinesis Data Firehose comme source si vous n’utilisez pas déjà Kinesis Data Analytics pour SQL. Pour plus d’informations, consultez [Limites ](https://docs.aws.amazon.com//kinesisanalytics/latest/dev/limits.html).

JSONPath est une méthode standardisée pour interroger les éléments d'un objet JSON. JSONPath utilise des expressions de chemin pour parcourir les éléments, les éléments imbriqués et les tableaux dans un document JSON. Pour de plus amples informations sur JSON, veuillez consulter [Introducing JSON](http://www.json.org/).

Amazon Kinesis Data Analytics JSONPath utilise des expressions dans le schéma source de l'application pour identifier les éléments de données d'une source de streaming contenant des données au format JSON.

Pour plus d'informations sur la façon de mapper les données en streaming sur le flux d'entrée de l'application, consultez [Mappage d'éléments d'une source de streaming à des colonnes d'entrée SQL](sch-mapping.md).

## Accès aux éléments JSON avec JSONPath
<a name="about-json-path-elements"></a>

Vous trouverez ci-dessous comment utiliser des JSONPath expressions pour accéder à différents éléments de données au format JSON. Pour les exemples de cette section, supposons que le flux source contient l’enregistrement JSON suivant :

```
{
  "customerName":"John Doe",
  "address":
  {
    "streetAddress":
    [
      "number":"123",
      "street":"AnyStreet"
    ],
    "city":"Anytown"
  }
  "orders":
  [
    { "orderId":"23284", "itemName":"Widget", "itemPrice":"33.99" },
    { "orderId":"63122", "itemName":"Gadget", "itemPrice":"22.50" },
    { "orderId":"77284", "itemName":"Sprocket", "itemPrice":"12.00" }
  ]
}
```

### Accès à des éléments JSON
<a name="about-json-path-firstlevel"></a>

Pour interroger un élément dans des données JSON à l'aide de JSONPath, utilisez la syntaxe suivante. Ici, `$` représente la racine de la hiérarchie des données et `elementName` est le nom du nœud de l'élément à interroger.

```
$.elementName
```

L'expression suivante interroge l'élément `customerName` de l'exemple JSON précédent.

```
$.customerName
```

L'expression précédente retourne les informations suivantes à partir de l'enregistrement JSON précédent.

```
John Doe
```

**Note**  
Les expressions de chemin sont sensibles à la casse. L'expression `$.customername` retourne `null` à partir de l'exemple JSON précédent.

**Note**  
Si aucun élément n'apparaît à l'emplacement spécifié dans l'expression de chemin, l'expression retourne `null`. L'expression suivante retourne `null` à partir de l'exemple JSON précédent, car il n'y a aucun élément correspondant.  

```
$.customerId
```

### Accès aux éléments JSON imbriqués
<a name="about-json-path-nested"></a>

Pour interroger un élément JSON imbriqué, utilisez la syntaxe suivante.

```
$.parentElement.element
```

L'expression suivante interroge l'élément `city` de l'exemple JSON précédent.

```
$.address.city
```

L'expression précédente retourne les informations suivantes à partir de l'enregistrement JSON précédent.

```
Anytown
```

Vous pouvez interroger d'autres niveaux de sous-éléments à l'aide de la syntaxe suivante.

```
$.parentElement.element.subElement
```

L'expression suivante interroge l'élément `street` de l'exemple JSON précédent.

```
$.address.streetAddress.street
```

L'expression précédente retourne les informations suivantes à partir de l'enregistrement JSON précédent.

```
AnyStreet
```

### Accès aux tableaux
<a name="about-json-path-arrays"></a>

Vous pouvez accéder aux données dans un tableau JSON de la manière suivante :
+ Récupérer tous les éléments du tableau sous la forme d’une ligne unique.
+ Récupérer chaque élément du tableau sous la forme d’une ligne distincte.

#### Récupérer tous les éléments du tableau dans une ligne unique.
<a name="about-json-path-arrays-row"></a>

Pour interroger tout le contenu d'un tableau sur une seule ligne, utilisez la syntaxe suivante.

```
$.arrayObject[0:]
```

L'expression suivante interroge tout le contenu de l'élément `orders` de l'exemple JSON précédent utilisé dans cette section. Elle retourne le contenu du tableau dans une seule colonne et sur une seule ligne.

```
$.orders[0:]
```

L'expression précédente retourne les éléments suivants de l'exemple JSON utilisé dans cette section.

```
[{"orderId":"23284","itemName":"Widget","itemPrice":"33.99"},{"orderId":"61322","itemName":"Gadget","itemPrice":"22.50"},{"orderId":"77284","itemName":"Sprocket","itemPrice":"12.00"}]
```

#### Récupérez tous les éléments dans un tableau dans des lignes séparées
<a name="about-json-path-arrays-separate"></a>

Pour interroger chaque élément d'un tableau sur une ligne distincte, utilisez la syntaxe suivante.

```
$.arrayObject[0:].element
```

L'expression suivante interroge les éléments `orderId` de l'exemple JSON précédent et retourne chaque élément du tableau sur une ligne distincte.

```
$.orders[0:].orderId
```

L'expression précédente retourne les informations suivantes à partir de l'enregistrement JSON précédent, chaque élément de données étant retourné sur une ligne distincte.


****  

|  | 
| --- |
|  23284  | 
|  63122  | 
|  77284  | 

**Note**  
Si un schéma interrogeant des éléments de tableau comporte des expressions interrogeant des éléments autres, ces derniers sont répétés pour chaque élément du tableau. Par exemple, supposons qu'un schéma de l'exemple JSON précédent inclut les expressions suivantes :  
\$1.customerName
\$1.orders[0:].orderId
Dans ce cas, les lignes de données retournées de l'exemple d'élément de flux d'entrée ressemble à ce qui suit, l'élément `name` étant répété pour chaque élément `orderId`.  


****  

|  |  | 
| --- |--- |
|  John Doe  |  23284  | 
|  Jean Dupont  |  63122  | 
|  Jean Dupont  |  77284  | 

**Note**  
Les limitations suivantes s’appliquent aux expressions de tableau dans Amazon Kinesis Data Analytics :  
Seul un niveau de déréférencement est pris en charge dans une expression de tableau. Le format d'expression suivant n'est pas pris en charge.  

  ```
  $.arrayObject[0:].element[0:].subElement
  ```
Seul un tableau peut être mis à plat dans un schéma. Plusieurs tableaux peuvent être référencés/retournés sur une ligne contenant tous les éléments du tableau. Cependant, seul un tableau peut avoir chacun de ses éléments retournés sur des lignes distinctes.  
Un schéma contenant des éléments au format suivant est valide. Ce format retourne le contenu du second tableau dans une seule colonne, répétée pour chaque élément du premier tableau.  

  ```
  $.arrayObjectOne[0:].element
  $.arrayObjectTwo[0:]
  ```
Un schéma contenant des éléments au format suivant n'est pas valide.  

  ```
  $.arrayObjectOne[0:].element
  $.arrayObjectTwo[0:].element
  ```

## Autres considérations
<a name="about-json-path-other"></a>

Les considérations supplémentaires à prendre en compte pour travailler avec JSONPath sont les suivantes :
+ Si aucun élément individuel n'accède à aucun tableau dans les JSONPath expressions du schéma de l'application, une seule ligne est créée dans le flux d'entrée de l'application pour chaque enregistrement JSON traité. 
+ Lorsqu'un tableau est mis à plat (c’est-à-dire que ses éléments sont retournés sous la forme de lignes individuelles), tous les éléments manquants retournent une valeur nulle créée dans le flux intégré à l'application. 
+ Un tableau est toujours mis à plat sur une ligne minimum. Si aucune valeur n'est retournée (c'est-à-dire, si le tableau est vide ou si aucun de ses éléments n'est interrogé), une seule ligne comportant toutes les valeurs nulles est retournée.

  L'expression suivante retourne les enregistrements de l'exemple JSON précédent qui ont une valeur nulle, car il n'existe aucun élément correspondant à l'emplacement spécifié.

  ```
  $.orders[0:].itemId
  ```

  L'expression précédente retourne les éléments suivants de l'exemple JSON précédent.  
****    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/about-json-path.html)

## Rubriques connexes
<a name="about-json-path.Related"></a>
+ [Présentation de JSON](http://www.json.org/)

# Mappage d'éléments d'une source de streaming à des colonnes d'entrée SQL
<a name="sch-mapping"></a>

**Note**  
Après le 12 septembre 2023, vous ne pourrez plus créer de nouvelles applications en utilisant Kinesis Data Firehose comme source si vous n’utilisez pas déjà Kinesis Data Analytics pour SQL. Pour plus d’informations, consultez [Limites ](https://docs.aws.amazon.com//kinesisanalytics/latest/dev/limits.html).

Avec Amazon Kinesis Data Analytics, vous pouvez traiter et analyser des données de streaming au format JSON ou CSV à l’aide du langage SQL standard. 
+ Pour traiter et analyser des données de streaming au format CSV, vous devez attribuer des noms de colonne et des types de données aux colonnes du flux d'entrée. Votre application importe une colonne du flux d'entrée par définition de colonne, dans l'ordre. 

  Vous n'avez pas besoin d'inclure toutes les colonnes du flux d'entrée de l'application, mais vous ne pouvez pas ignorer les colonnes du flux source. Par exemple, vous pouvez importer les trois premières colonnes d'un flux d'entrée contenant cinq éléments, mais vous ne pouvez pas importer uniquement les colonnes 1, 2 et 4.
+ Pour traiter et analyser des données JSON en streaming, vous utilisez des JSONPath expressions pour mapper les éléments JSON d'une source de streaming aux colonnes SQL d'un flux d'entrée. Pour plus d'informations sur l'utilisation JSONPath d'Amazon Kinesis Data Analytics, [Travailler avec JSONPath](about-json-path.md) consultez. Les colonnes de la table SQL possèdent des types de données qui sont mappées à partir de types JSON. Pour connaître les types de données pris en charge, consultez [Types de données](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-data-types.html). Pour plus d'informations sur la conversion des données JSON en données SQL, consultez [Mappage de types de données JSON à des types de données SQL](#sch-mapping-datatypes).

Pour plus d’informations sur la configuration des flux d’entrée, consultez [Configuration de l'entrée de l'application](how-it-works-input.md).

## Mappage de données JSON à des colonnes SQL
<a name="sch-mapping-json"></a>

 Vous pouvez mapper des éléments JSON à des colonnes d'entrée à l'aide de l'API AWS Management Console ou de l'API Kinesis Data Analytics. 
+ Pour mapper des éléments à des colonnes à l'aide de la console, consultez [Utilisation de l'éditeur de schéma](console-summary-edit-schema.md).
+ Pour mapper des éléments à des colonnes à l’aide de l’API Kinesis Data Analytics, reportez-vous à la section suivante.

Pour mapper des éléments JSON à des colonnes du flux d'entrée intégré à l'application, vous avez besoin d'un schéma avec les informations suivantes pour chaque colonne :
+ **Expression source :** JSONPath expression qui identifie l'emplacement des données de la colonne.
+ **Nom de la colonne :** Nom que vos requêtes SQL utilisent pour référencer les données.
+ **Type de données : **Type de données SQL de la colonne.

## Utilisation de l’API
<a name="sf-map-api"></a>

Pour mapper des éléments d’une source de streaming à des colonnes d’entrée, vous pouvez utiliser l’action [CreateApplication](API_CreateApplication.md) de l’API Kinesis Data Analytics. Pour créer le flux intégré à l'application, spécifiez un schéma pour transformer vos données en une version schématisée utilisée dans SQL. L'action [CreateApplication](API_CreateApplication.md) configure votre application pour qu'elle reçoive des entrées d'une source de streaming. Pour mapper des éléments JSON ou des colonnes CSV à des colonnes SQL, vous créez un objet [RecordColumn](API_RecordColumn.md) dans le tableau [SourceSchema](API_SourceSchema.md) `RecordColumns`. L'objet [RecordColumn](API_RecordColumn.md) présente le schéma suivant:

```
{ 
    "Mapping": "String",
    "Name": "String",
    "SqlType": "String"
}
```

Les champs de l'objet [RecordColumn](API_RecordColumn.md) possèdent les valeurs suivantes :
+ `Mapping`: JSONPath expression qui identifie l'emplacement des données dans l'enregistrement du flux d'entrée. Cette valeur n'est pas disponible pour le schéma d'entrée d'un flux source au format CSV.
+ `Name` : Nom de la colonne dans le flux de données SQL intégré à l'application.
+ `SqlType` : Type des données du flux de données SQL intégré à l'application.

### Exemple de schéma d'entrée JSON
<a name="sf-map-api-json-example"></a>

L'exemple suivant présente le format de la valeur `InputSchema` pour un schéma JSON.

```
"InputSchema": {
    "RecordColumns": [
        {
            "SqlType": "VARCHAR(4)",
            "Name": "TICKER_SYMBOL",
            "Mapping": "$.TICKER_SYMBOL"
        },
        {
            "SqlType": "VARCHAR(16)",
            "Name": "SECTOR",
            "Mapping": "$.SECTOR"
        },
        {
            "SqlType": "TINYINT",
            "Name": "CHANGE",
            "Mapping": "$.CHANGE"
        },
        {
            "SqlType": "DECIMAL(5,2)",
            "Name": "PRICE",
            "Mapping": "$.PRICE"
        }
    ],
    "RecordFormat": {
        "MappingParameters": {
            "JSONMappingParameters": {
                "RecordRowPath": "$"
            }
        },
        "RecordFormatType": "JSON"
    },
    "RecordEncoding": "UTF-8"
}
```

### Exemple de schéma d'entrée CSV
<a name="sf-map-api-csv-example"></a>

L'exemple suivant présente le format de la valeur `InputSchema` pour un schéma CSV.

```
"InputSchema": {
    "RecordColumns": [
        {
            "SqlType": "VARCHAR(16)",
            "Name": "LastName"
        },
        {
            "SqlType": "VARCHAR(16)",
            "Name": "FirstName"
        },
        {
            "SqlType": "INTEGER",
             "Name": "CustomerId"
        }
    ],
    "RecordFormat": {
        "MappingParameters": {
            "CSVMappingParameters": {
                "RecordColumnDelimiter": ",",
                "RecordRowDelimiter": "\n"
            }
        },
        "RecordFormatType": "CSV"
    },
    "RecordEncoding": "UTF-8"
}
```

## Mappage de types de données JSON à des types de données SQL
<a name="sch-mapping-datatypes"></a>

Les types de données JSON sont convertis en types de données SQL correspondants selon le schéma d'entrée de l'application. Pour plus d’informations sur les types de données SQL pris en charge, consultez [Types de données](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-data-types.html). Amazon Kinesis Data Analytics convertit les types de données JSON en types de données SQL selon les règles suivantes.

### Littéral null
<a name="sch-mapping-datatypes-null"></a>

Un littéral null dans le flux d'entrée JSON (c'est-à-dire `"City":null`) est converti en null SQL, quel que soit le type de données de destination.

### Littéral booléen
<a name="sch-mapping-datatypes-boolean"></a>

Un littéral booléen dans le flux d'entrée JSON (c'est-à-dire `"Contacted":true`) est converti en données SQL comme suit :
+ Numérique (DECIMAL, INT, etc.) : `true` est converti en 1 ; `false` est converti en 0.
+ Binaire (BINARY ou VARBINARY) : 
  + `true` : Le résultat possède l'ensemble de bits le plus faible, les autres bits sont effacés.
  + `false` : Tous les bits sont effacés.

  La conversion en données VARBINARY génère une valeur de 1 octet de longueur.
+ BOOLEEN : Conversion dans la valeur BOOLEENNE SQL correspondante.
+ Caractère (CHAR ou VARCHAR) : Conversion dans la valeur de chaîne correspondante (`true` ou `false`). La valeur est tronquée pour s'adapter à la longueur du champ.
+ Date/heure (DATE, TIME ou TIMESTAMP) : La conversion échoue et une erreur de forçage de type est écrite dans le flux d'erreurs.

### Number
<a name="sch-mapping-datatypes-number"></a>

Un littéral numérique dans le flux d'entrée JSON (c'est-à-dire `"CustomerId":67321`) est converti en données SQL comme suit :
+ Numérique (DECIMAL, INT, etc.) : Conversion directe. Si la valeur convertie dépasse la taille ou la précision du type de données cible (c'est-à-dire, conversion de `123.4` en INT), la conversion échoue et une erreur de forçage du type est écrite dans le flux d'erreurs. 
+ Binaire (BINARY or VARBINARY) : La conversion échoue et une erreur de forçage du type est écrite dans le flux d'erreurs.
+ BOOLEAN: 
  + `0` : Conversion `false`.
  + Tous les autres nombres : Conversion en `true`.
+ Caractère (CHAR ou VARCHAR) : Conversion en une représentation de chaîne du nombre.
+ Date/heure (DATE, TIME ou TIMESTAMP) : La conversion échoue et une erreur de forçage de type est écrite dans le flux d'erreurs.

### String
<a name="sch-mapping-datatypes-string"></a>

Une valeur de chaîne dans le flux d'entrée JSON (c'est-à-dire `"CustomerName":"John Doe"`) est convertie en données SQL comme suit :
+ Numérique (DECIMAL, INT, etc.) : Amazon Kinesis Data Analytics tente de convertir la valeur dans le type de données cible. Si la valeur ne peut être convertie, la conversion échoue et une erreur de forçage du type est écrite dans le flux d'erreurs.
+ Binaire (BINARY ou VARBINARY) : Si la chaîne source est un littéral binaire valide (c'est-à-dire `X'3F67A23A'`, avec un nombre pair de f), la valeur est convertie dans le type de données cible. Dans le cas contraire, la conversion échoue et une erreur de forçage du type est écrite dans le flux d'erreurs.
+ BOOLEEN : Si la chaîne source est `"true"`, conversion en `true`. Cette comparaison n'est pas sensible à la casse. Sinon, conversion en `false`.
+ Caractère (CHAR ou VARCHAR) : Conversion dans la valeur de chaîne de l'entrée. Si la valeur est plus longue que le type de données cible, elle est tronquée et aucune erreur n'est écrite dans le flux d'erreurs.
+ Date/heure (DATE, TIME ou TIMESTAMP) : Si la chaîne source est dans un format convertible dans la valeur cible, la valeur est convertie. Dans le cas contraire, la conversion échoue et une erreur de forçage du type est écrite dans le flux d'erreurs.

  Les formats de date et d'heure sont les suivants :
  + "1992-02-14"
  + "1992-02-14 18:35:44.0"

### Tableau ou objet
<a name="sch-mapping-datatypes-array"></a>

Un tableau ou un objet dans le flux d'entrée JSON est converti en données SQL comme suit :
+ Caractère (CHAR ou VARCHAR) : Convertit en texte source du tableau ou de l'objet. Consultez [Accès aux tableaux](about-json-path.md#about-json-path-arrays).
+ Tous les autres types de données : La conversion échoue et une erreur de forçage du type est écrite dans le flux d'erreurs.

Pour voir un exemple de tableau JSON, consultez [Travailler avec JSONPath](about-json-path.md).

## Rubriques connexes
<a name="sch-mapping.Related"></a>
+ [Configuration de l'entrée de l'application](how-it-works-input.md)
+ [Types de données](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-data-types.html)
+ [Utilisation de l'éditeur de schéma](console-summary-edit-schema.md)
+ [CreateApplication](API_CreateApplication.md)
+ [RecordColumn](API_RecordColumn.md)
+ [SourceSchema](API_SourceSchema.md)

# Utilisation de la fonction de découverte de schéma sur des données de diffusion
<a name="sch-dis"></a>

**Note**  
Après le 12 septembre 2023, vous ne pourrez plus créer de nouvelles applications en utilisant Kinesis Data Firehose comme source si vous n’utilisez pas déjà Kinesis Data Analytics pour SQL. Pour plus d’informations, consultez [Limites ](https://docs.aws.amazon.com//kinesisanalytics/latest/dev/limits.html).

Fournir un schéma d'entrée qui décrit la façon dont les enregistrements du flux d'entrée sont mappés à un flux intégré à l'application peut s'avérer fastidieux et source d'erreurs. Vous pouvez utiliser l'API [DiscoverInputSchema](API_DiscoverInputSchema.md) (appelée *API de découverte*) pour déduire un schéma. En utilisant des échantillons aléatoires d'enregistrements de la source de diffusion, l'API peut déduire un schéma (autrement dit, les noms de colonnes, les types de données et la position de l'élément de données dans les données entrantes). 

**Note**  
Pour utiliser l’API de découverte pour générer un schéma à partir d’un fichier stocké dans Amazon S3, consultez [Utilisation de la fonction de découverte de schéma sur des données statiques](sch-dis-ref.md). 

La console utilise l'API Discovery pour générer un schéma pour une source de streaming spécifiée. À l'aide de la console, vous pouvez également mettre à jour le schéma, notamment en ajoutant ou en supprimant des colonnes, en modifiant les noms de colonnes ou les types de données, etc. Cependant, apportez les modifications avec soin pour veiller à ne pas créer un schéma non valide. 

Une fois que vous avez finalisé le schéma de votre flux intégré à l'application, vous pouvez modifier des valeurs de chaîne et de date/heure. Vous pouvez utiliser ces fonctions dans votre code d'application lorsque vous utilisez des lignes dans le flux intégré à l'application qui en résulte. Pour de plus amples informations, veuillez consulter [Exemple : transformation DateTime des valeurs](app-string-datetime-manipulation.md).

## Attribution de noms de colonnes pendant la découverte de schéma
<a name="sch-dis-column-names"></a>

Pendant la découverte de schéma, Amazon Kinesis Data Analytics tente de conserver autant que possible le nom d’origine de la colonne de la source d’entrée de streaming, sauf dans les cas suivants :
+ Le nom de la colonne du flux source est un mot réservé SQL, par exemple `TIMESTAMP`, `USER`, `VALUES` ou `YEAR`. 
+ Le nom de la colonne du flux source contient des caractères non pris en charge. Seuls les lettres, les chiffres et les caractères de soulignement (\$1) sont pris en charge.
+ Le nom de la colonne du flux source commence par un chiffre.
+ Le nom de la colonne du flux source comporte plus de 100 caractères.

Si une colonne est renommée, son nouveau nom commence par `COL_`. Dans certains cas, aucun nom de colonne d’origine ne peut être conservé, par exemple si le nom complet est composé de caractères non pris en charge. Dans ce cas, la colonne est nommée `COL_#`, \$1 correspondant au chiffre de l'emplacement de la colonne dans l'ordre des colonnes.

Une fois la découverte terminée, vous pouvez mettre le schéma à jour à l'aide de la console en ajoutant, supprimant ou renommant des colonnes ou en modifiant les types ou la taille des données. 

### Exemples de noms de colonnes suggérés lors de la découverte
<a name="sch-dis-column-names-examples"></a>


****  

| Nom de la colonne dans le flux source | Nom de colonne suggéré lors de la découverte | 
| --- | --- | 
|  USER  |  COL\$1USER  | 
|  USER@DOMAIN  |  COL\$1USERDOMAIN  | 
|  @@  |  COL\$10  | 

## Problèmes liés à la découverte d'un schéma
<a name="sch-dis-when-fails"></a>

Que se passe-t-il si Kinesis Data Analytics ne déduit pas un schéma pour une source de streaming donnée ? 

Kinesis Data Analytics déduit votre schéma pour les formats courants, tels que CSV et JSON, qui sont codés en UTF-8. Kinesis Data Analytics prend en charge tous les enregistrements codés en UTF-8 (y compris le texte brut, comme des journaux d’applications, et des enregistrements) avec un délimiteur de ligne et de colonne personnalisé. Si Kinesis Data Analytics ne déduit pas un schéma, vous pouvez définir un schéma manuellement à l’aide de l’éditeur de schéma dans la console (ou à l’aide de l’API).

 Si vous vos données ne suivent pas un modèle (que vous pouvez spécifier à l'aide de l'éditeur de schéma), vous pouvez définir un schéma en tant que colonne unique de type VARCHAR(N), où N est le plus grand nombre de caractères que vous prévoyez que votre enregistrement puisse inclure. A partir de là, vous pouvez utiliser une manipulation de chaîne et de date/heure pour structurer vos données une fois que celles-ci sont dans un flux intégré à l'application. Pour obtenir des exemples, consultez [Exemple : transformation DateTime des valeurs](app-string-datetime-manipulation.md).

# Utilisation de la fonction de découverte de schéma sur des données statiques
<a name="sch-dis-ref"></a>

**Note**  
Après le 12 septembre 2023, vous ne pourrez plus créer de nouvelles applications en utilisant Kinesis Data Firehose comme source si vous n’utilisez pas déjà Kinesis Data Analytics pour SQL. Pour plus d’informations, consultez [Limites ](https://docs.aws.amazon.com//kinesisanalytics/latest/dev/limits.html).

La fonctionnalité de découverte de schéma peut générer un schéma à partir des données dans un flux ou des données dans un fichier statique qui est stocké dans un compartiment Amazon S3. Supposons que vous souhaitez générer un schéma pour une application Kinesis Data Analytics à des fins de référence ou lorsque des données de streaming ne sont pas disponibles. Vous pouvez utiliser la fonctionnalité de découverte de schéma sur un fichier statique qui contient un échantillon de données au format attendu de vos données de diffusion ou de référence. Kinesis Data Analytics peut exécuter une découverte de schéma sur des exemples de données provenant d’un fichier JSON ou CSV stocké dans un compartiment Amazon S3. L'utilisation de la découverte de schéma sur un fichier de données fait appel soit à la console, soit à l'API [DiscoverInputSchema](API_DiscoverInputSchema.md) avec le paramètre `S3Configuration` spécifié.

## Exécution de la découverte de schéma à l'aide de la console
<a name="sch-dis-ref-console"></a>

Pour exécuter la découverte sur un fichier statique à l'aide de la console, procédez comme suit :

1. Ajoutez un objet de données de référence à un compartiment S3.

1. Choisissez **Connecter des données de référence** dans la page principale de l’application dans la console Kinesis Data Analytics.

1. Fournissez les données relatives au compartiment, au chemin d’accès et au rôle IAM pour pouvoir accéder à l’objet Amazon S3 contenant les données de référence.

1. Choisissez **Discover schema (Découvrir le schéma)**.

Pour plus d'informations sur l'ajout des données de référence et la découverte du schéma dans la console, consultez [Exemple : ajout de données de référence à une application Kinesis Data Analytics](app-add-reference-data.md).

## Exécution de la découverte de schéma à l'aide de l'API
<a name="sch-dis-ref-api"></a>

Pour exécuter la découverte sur un fichier statique à l'aide de l'API, vous devez fournir l'API avec une structure `S3Configuration` à l'aide des informations suivantes :
+ `BucketARN` : l’Amazon Resource Name (ARN) du compartiment Amazon S3 qui contient le fichier. Pour le format de l'ARN d'un compartiment Amazon S3, consultez [Amazon Resource Names (ARNs) et Amazon Service Namespaces : Amazon Simple Storage Service (Amazon S3](https://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html#arn-syntax-s3)).
+ `RoleARN` : l’ARN d’un rôle IAM avec la stratégie `AmazonS3ReadOnlyAccess`. Pour plus d'informations sur comment ajouter une stratégie à un rôle, consultez [Modification d'un rôle](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_manage_modify.html).
+ `FileKey` : le nom de fichier de l'objet.

**Pour générer un schéma à partir d’un objet Amazon S3 à l’aide de l’API `DiscoverInputSchema`**

1. Assurez-vous que vous disposez de la AWS CLI configuration requise. Pour plus d'informations, consultez [Étape 2 : configurer le AWS Command Line Interface (AWS CLI)](setup-awscli.md) dans la section Mise en route.

1. Créez un fichier nommé `data.csv` avec le contenu suivant :

   ```
   year,month,state,producer_type,energy_source,units,consumption
   2001,1,AK,TotalElectricPowerIndustry,Coal,ShortTons,47615
   2001,1,AK,ElectricGeneratorsElectricUtilities,Coal,ShortTons,16535
   2001,1,AK,CombinedHeatandPowerElectricPower,Coal,ShortTons,22890
   2001,1,AL,TotalElectricPowerIndustry,Coal,ShortTons,3020601
   2001,1,AL,ElectricGeneratorsElectricUtilities,Coal,ShortTons,2987681
   ```

1. Connectez-vous à la console Amazon S3 à l'adresse [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/).

1. Créez un compartiment Amazon S3 et chargez le fichier `data.csv` que vous avez créé. Notez l'ARN du compartiment créé. Pour plus d’informations sur la création d’un compartiment Amazon S3 et le chargement d’un fichier, consultez [Démarrez avec Amazon Simple Storage Service](https://docs.aws.amazon.com/AmazonS3/latest/userguide/GetStartedWithS3.html). 

1. Ouvrez la console IAM à l’adresse [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/). Créez un rôle avec la stratégie `AmazonS3ReadOnlyAccess`. Notez l'ARN du nouveau rôle. Pour plus d’informations sur la création d’un rôle, consultez [Création d’un rôle pour déléguer des autorisations à un service Amazon](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-service.html). Pour plus d'informations sur comment ajouter une stratégie à un rôle, consultez [Modification d'un rôle](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_manage_modify.html).

1. Exécutez la `DiscoverInputSchema` commande suivante dans le AWS CLI, en remplaçant votre compartiment Amazon S3 et votre rôle IAM : ARNs 

   ```
   $aws kinesisanalytics discover-input-schema --s3-configuration '{ "RoleARN": "arn:aws:iam::123456789012:role/service-role/your-IAM-role", "BucketARN": "arn:aws:s3:::your-bucket-name", "FileKey": "data.csv" }' 
   ```

1. La réponse ressemble à ce qui suit :

   ```
   {
       "InputSchema": {
           "RecordEncoding": "UTF-8",
           "RecordColumns": [
               {
                   "SqlType": "INTEGER",
                   "Name": "COL_year"
               },
               {
                   "SqlType": "INTEGER",
                   "Name": "COL_month"
               },
               {
                   "SqlType": "VARCHAR(4)",
                   "Name": "state"
               },
               {
                   "SqlType": "VARCHAR(64)",
                   "Name": "producer_type"
               },
               {
                   "SqlType": "VARCHAR(4)",
                   "Name": "energy_source"
               },
               {
                   "SqlType": "VARCHAR(16)",
                   "Name": "units"
               },
               {
                   "SqlType": "INTEGER",
                   "Name": "consumption"
               }
           ],
           "RecordFormat": {
               "RecordFormatType": "CSV",
               "MappingParameters": {
                   "CSVMappingParameters": {
                       "RecordRowDelimiter": "\r\n",
                       "RecordColumnDelimiter": ","
                   }
               }
           }
       },
       "RawInputRecords": [
           "year,month,state,producer_type,energy_source,units,consumption\r\n2001,1,AK,TotalElectricPowerIndustry,Coal,ShortTons,47615\r\n2001,1,AK,ElectricGeneratorsElectricUtilities,Coal,ShortTons,16535\r\n2001,1,AK,CombinedHeatandPowerElectricPower,Coal,ShortTons,22890\r\n2001,1,AL,TotalElectricPowerIndustry,Coal,ShortTons,3020601\r\n2001,1,AL,ElectricGeneratorsElectricUtilities,Coal,ShortTons,2987681"
       ],
       "ParsedInputRecords": [
           [
               null,
               null,
               "state",
               "producer_type",
               "energy_source",
               "units",
               null
           ],
           [
               "2001",
               "1",
               "AK",
               "TotalElectricPowerIndustry",
               "Coal",
               "ShortTons",
               "47615"
           ],
           [
               "2001",
               "1",
               "AK",
               "ElectricGeneratorsElectricUtilities",
               "Coal",
               "ShortTons",
               "16535"
           ],
           [
               "2001",
               "1",
               "AK",
               "CombinedHeatandPowerElectricPower",
               "Coal",
               "ShortTons",
               "22890"
           ],
           [
               "2001",
               "1",
               "AL",
               "TotalElectricPowerIndustry",
               "Coal",
               "ShortTons",
               "3020601"
           ],
           [
               "2001",
               "1",
               "AL",
               "ElectricGeneratorsElectricUtilities",
               "Coal",
               "ShortTons",
               "2987681"
           ]
       ]
   }
   ```

# Prétraitement des données à l’aide d’une fonction Lambda
<a name="lambda-preprocessing"></a>

**Note**  
Après le 12 septembre 2023, vous ne pourrez plus créer de nouvelles applications en utilisant Kinesis Data Firehose comme source si vous n’utilisez pas déjà Kinesis Data Analytics pour SQL. Pour plus d’informations, consultez [Limites ](https://docs.aws.amazon.com//kinesisanalytics/latest/dev/limits.html).

Si les données de votre flux nécessitent une conversion de format, une transformation, un enrichissement ou un filtrage, vous pouvez les prétraiter à l'aide d'une AWS Lambda fonction. Vous pouvez effectuer cette opération avant que le code SQL de votre application s'exécute ou avant que votre application crée un schéma à partir de votre flux de données. 

L’utilisation d’une fonction Lambda de prétraitement des enregistrements est utile dans les cas suivants :
+ Transformation d’enregistrements à partir d’autres formats (comme KPL ou GZIP) en formats pouvant être analysés par Kinesis Data Analytics. Kinesis Data Analytics prend actuellement en charge les formats de données JSON ou CSV.
+ Développement des données dans un format qui est plus accessible pour des opérations telles que le regroupement ou la détection des anomalies. Par exemple, si plusieurs valeurs de données sont stockées ensemble dans une chaîne, vous pouvez développer les données en colonnes distinctes.
+ Enrichissement des données avec d’autres services Amazon, comme l’extrapolation ou la correction d’erreurs.
+ Application de transformation de chaîne complexe à des champs d'enregistrement.
+ Filtrage de données pour nettoyer les données.

## L’utilisation d’une fonction Lambda pour le prétraitement des enregistrements
<a name="lambda-preprocessing-use"></a>

Lorsque vous créez votre application Kinesis Data Analytics, vous activez le prétraitement Lambda sur la page **Se connecter à une source**.

**Pour utiliser une fonction Lambda pour prétraiter des enregistrements dans une application Kinesis Data Analytics**

1. Connectez-vous à la console Managed Service for Apache Flink AWS Management Console et ouvrez-la à l'adresse [ https://console.aws.amazon.com/kinesisanalytics.](https://console.aws.amazon.com/kinesisanalytics)

1. Sur la page **Se connecter à une source** pour votre application, choisissez **Activé** dans la section **Prétraitement d’enregistrements avec AWS Lambda**.

1. Pour utiliser une fonction Lambda que vous avez déjà créée, choisissez la fonction dans la liste déroulante **Fonction Lambda**.

1. Pour créer une nouvelle fonction Lambda à partir de l’un des modèles de prétraitement Lambda, choisissez le modèle dans la liste déroulante. Ensuite, choisissez **Afficher <nom\$1modèle> dans Lambda** pour modifier la fonction.

1. Pour créer une nouvelle fonction Lambda, choisissez **Créer**. *Pour plus d'informations sur la création d'une fonction Lambda, consultez les sections [Créer une fonction HelloWorld Lambda et Explorez la console dans le manuel du développeur](https://docs.aws.amazon.com/lambda/latest/dg/getting-started-create-function.html).AWS Lambda *

1. Choisissez la version de la fonction Lambda à utiliser. Pour utiliser la dernière version, choisissez **\$1LATEST**.

Lorsque vous choisissez ou créez une fonction Lambda pour le prétraitement d’enregistrements, les enregistrements sont prétraités avant que le code SQL de votre application ne s’exécute ou que votre application ne génère un schéma à partir des enregistrements.

## Autorisations de prétraitement Lambda
<a name="lambda-preprocessing-policy"></a>

Pour utiliser le prétraitement Lambda, le rôle IAM de l’application a besoin de la stratégie d’autorisations suivante :

```
     {
       "Sid": "UseLambdaFunction",
       "Effect": "Allow",
       "Action": [
           "lambda:InvokeFunction",
           "lambda:GetFunctionConfiguration"
       ],
       "Resource": "<FunctionARN>"
   }
```

## Métriques de prétraitement Lambda
<a name="lambda-preprocessing-metrics"></a>

Vous pouvez utiliser Amazon CloudWatch pour surveiller le nombre d'appels Lambda, le nombre d'octets traités, les réussites et les échecs, etc. [Pour plus d'informations sur CloudWatch les métriques émises par le prétraitement Lambda de Kinesis Data Analytics, consultez Amazon Kinesis Analytics Metrics.](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/aka-metricscollected.html)

## Utilisation AWS Lambda avec la bibliothèque Kinesis Producer
<a name="lambda-preprocessing-deaggregation"></a>

La [bibliothèque producteur Kinesis](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) (KPL) regroupe de petits enregistrements formatés par l'utilisateur en enregistrements plus volumineux allant jusqu'à 1 Mo afin de mieux utiliser le débit Amazon Kinesis Data Streams. La bibliothèque Kinesis Client Library (KCL) pour Java prend en charge la désagrégation de ces enregistrements. Cependant, vous devez utiliser un module spécial pour désagréger les enregistrements lorsque vous les utilisez AWS Lambda en tant que consommateur de vos streams. 

Pour obtenir le code de projet et les instructions nécessaires, consultez les [modules de désagrégation de la bibliothèque Kinesis Producer](https://github.com/awslabs/kinesis-deaggregation) pour plus d'informations. AWS Lambda GitHub Vous pouvez utiliser les composants de ce projet pour traiter des données sérialisées KPL AWS Lambda dans Java, Node.js et Python. Vous pouvez également utiliser ces composants dans le cadre d’une [application KCL multi-lang](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/package-info.java).

## Modèle de Model/Record réponse aux données d'entrée d'événements de prétraitement
<a name="lambda-preprocessing-data-model"></a>

Pour prétraiter des enregistrements, votre fonction Lambda doit être conforme aux modèles de données d’entrée d’événement et de réponse d’enregistrement imposés. 

### Modèle de données d'entrée d'événement
<a name="lambda-preprocessing-request-model"></a>

Kinesis Data Analytics lit en permanence les données de votre flux de données Kinesis ou de votre flux de diffusion Firehose. Pour chaque lot d’enregistrements qu’il récupère, le service gère la manière dont chaque lot est transmis à votre fonction Lambda. Votre fonction reçoit une liste d'enregistrements en entrée. Au sein de votre fonction, vous effectuez une itération dans la liste et vous appliquez votre logique métier pour réaliser vos exigences de prétraitement (par exemple, conversion du format de données ou enrichissement). 

Le modèle d'entrée de votre fonction de prétraitement varie légèrement selon que les données proviennent d'un flux de données Kinesis ou d'un flux de diffusion Firehose. 

Si la source est un flux de diffusion Firehose, le modèle de données d'entrée des événements est le suivant :

**Modèle de données de demande Kinesis Data Firehose**


| Champ | Description | 
| --- | --- | 
| Champ | Description | 
| --- | --- | 
| Champ | Description | 
| --- | --- | 
| invocationId | ID d’invocation Lambda (GUID aléatoire). | 
| applicationArn | Amazon Resource Name (ARN) de l’application Kinesis Data Analytics | 
| streamArn | ARN du flux de diffusion | 
| enregistrements [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| recordId | ID d'enregistrement (GUID aléatoire) | 
| kinesisFirehoseRecordMetadata |  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| data | Charge utile d'enregistrement source codée en base64 | 
| approximateArrivalTimestamp | Heure d'arrivée approximative de l'enregistrement de flux de diffusion | 

L'exemple suivant montre l'entrée d'un flux de diffusion Firehose :

```
{
   "invocationId":"00540a87-5050-496a-84e4-e7d92bbaf5e2",
   "applicationArn":"arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test",
   "streamArn":"arn:aws:firehose:us-east-1:AAAAAAAAAAAA:deliverystream/lambda-test",
   "records":[
      {
         "recordId":"49572672223665514422805246926656954630972486059535892482",
         "data":"aGVsbG8gd29ybGQ=",
         "kinesisFirehoseRecordMetadata":{
            "approximateArrivalTimestamp":1520280173
         }
      }
   ]
}
```

Si la source est un flux de données Kinesis, le modèle de données d’entrée d’événement se présente comme suit :

**Modèle de données de demande de flux Kinesis**.


| Champ | Description | 
| --- | --- | 
| Champ | Description | 
| --- | --- | 
| Champ | Description | 
| --- | --- | 
| invocationId | ID d’invocation Lambda (GUID aléatoire). | 
| applicationArn | ARN de l’application Kinesis Data Analytics | 
| streamArn | ARN du flux de diffusion | 
| enregistrements [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| recordId | ID d'enregistrement basé sur le numéro de séquence d'enregistrement Kinesis | 
| kinesisStreamRecordMetadata |  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| data | Charge utile d'enregistrement source codée en base64 | 
| sequenceNumber | Numéro de séquence de l'enregistrement de flux Kinesis | 
| partitionKey | Clé de partition de l'enregistrement de flux Kinesis | 
| shardId | ShardId de l'enregistrement de flux Kinesis | 
| approximateArrivalTimestamp | Heure d'arrivée approximative de l'enregistrement de flux de diffusion | 

L'exemple suivant montre l'entrée d'un flux de données Kinesis :

```
{
  "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2",
  "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test",
  "streamArn": "arn:aws:kinesis:us-east-1:AAAAAAAAAAAA:stream/lambda-test",
  "records": [
    {
      "recordId": "49572672223665514422805246926656954630972486059535892482",
      "data": "aGVsbG8gd29ybGQ=",
      "kinesisStreamRecordMetadata":{
            "shardId" :"shardId-000000000003",
            "partitionKey":"7400791606",
            "sequenceNumber":"49572672223665514422805246926656954630972486059535892482",
            "approximateArrivalTimestamp":1520280173
         }
    }
  ]
}
```

### Modèle de réponse d'enregistrement
<a name="lambda-preprocessing-response-model"></a>

Tous les enregistrements renvoyés par votre fonction de prétraitement Lambda (avec enregistrement IDs) et envoyés à la fonction Lambda doivent être renvoyés. Ils doivent contenir les paramètres suivants. Sinon, Kinesis Data Analytics les rejette et les traite comme un échec de prétraitement des données. La partie charge utile des données de l'enregistrement peut être transformée pour réaliser les exigences de prétraitement.

**Modèle de données de réponse**


| Champ | Description | 
| --- | --- | 
| enregistrements [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| recordId | L’ID d’enregistrement est transmis depuis Kinesis Data Analytics vers Lambda pendant l’invocation. L'enregistrement transformé doit comporter le même ID d'enregistrement. La moindre incohérence entre l'ID de l'enregistrement initial et l'ID de l'enregistrement transformé est traitée comme un échec du prétraitement des données. | 
| result | État de la transformation de données de l'enregistrement. Les valeurs possibles sont : [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| data | Charge utile des données transformées, d'après l'encodage en base64. Chaque charge utile de données peut contenir plusieurs documents JSON si le format de données d'ingestion de l'application est JSON. Ou chaque charge utile de données peut contenir plusieurs lignes CSV (avec un délimiteur de ligne indiqué dans chaque ligne) si le format de données d'ingestion de l'application est CSV. Le service Kinesis Data Analytics analyse et traite les données correctement avec plusieurs documents JSON ou lignes CSV dans la même charge utile de données.  | 

L'exemple suivant montre la sortie d'une fonction Lambda :

```
{
  "records": [
    {
      "recordId": "49572672223665514422805246926656954630972486059535892482",
      "result": "Ok",
      "data": "SEVMTE8gV09STEQ="
    }
  ]
}
```

## Échecs courants du prétraitement des données
<a name="lambda-preprocessing-failures"></a>

Voici les raisons courantes pour lesquelles le prétraitement peut échouer.
+ Tous les enregistrements (avec enregistrement IDs) d'un lot envoyés à la fonction Lambda ne sont pas renvoyés au service Kinesis Data Analytics. 
+ L'ID d'enregistrement ou le champ de charge utile de données est manquant dans la réponse. Le champ de charge utile de données est facultatif pour un enregistrement `Dropped` ou `ProcessingFailed`.
+ Les délais d’expiration de la fonction Lambda ne sont pas suffisants pour prétraiter les données.
+ La réponse de la fonction Lambda dépasse les limites de réponse imposées par le service AWS Lambda .

En cas d’échec de prétraitement des données, Kinesis Data Analytics continue de relancer les invocations Lambda sur le même ensemble d’enregistrements jusqu’à ce que cela aboutisse. Vous pouvez surveiller les CloudWatch indicateurs suivants pour mieux comprendre les défaillances.
+ `MillisBehindLatest` de l’application Kinesis Data Analytics : indique le retard d’une application pour la lecture de la source de streaming. 
+ Indicateurs de `InputPreprocessing` CloudWatch l'application Kinesis Data Analytics : indiquent le nombre de réussites et d'échecs, entre autres statistiques. Pour plus d'informations, consultez [Métriques Amazon Kinesis Analytics](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/aka-metricscollected.html).
+ AWS Lambda CloudWatch métriques et journaux des fonctions.

# Création de fonctions Lambda pour le prétraitement
<a name="lambda-preprocessing-functions"></a>

Votre application Amazon Kinesis Data Analytics peut utiliser des fonctions Lambda pour le prétraitement des enregistrements à mesure que ceux-ci sont reçus dans l’application. Kinesis Data Analytics fournit dans la console les modèles suivants à utiliser comme point de départ pour le prétraitement de vos données.

**Topics**
+ [Création d’une fonction Lambda de prétraitement dans Node.js](#lambda-preprocessing-functions-nodejs)
+ [Création d’une fonction Lambda de prétraitement dans Python](#lambda-preprocessing-functions-python)
+ [Création d’une fonction Lambda de prétraitement dans Java](#lambda-preprocessing-functions-java)
+ [Création d’une fonction Lambda de prétraitement dans .NET](#lambda-preprocessing-functions-net)

## Création d’une fonction Lambda de prétraitement dans Node.js
<a name="lambda-preprocessing-functions-nodejs"></a>

Les modèles suivants pour créer une fonction Lambda de prétraitement dans Node.js sont disponibles dans la console Kinesis Data Analytics :


| Plan Lambda | Langage et version | Description | 
| --- | --- | --- | 
| Traitement d’entrée General Kinesis Data Analytics  | Node.js 6.10 |  Un préprocesseur d’enregistrement Kinesis Data Analytics qui reçoit des enregistrements JSON ou CSV en entrée, puis les renvoie avec un statut de traitement. Utilisez ce processeur comme point de départ pour une logique de transformation personnalisée.  | 
| Traitement d'entrée compressée | Node.js 6.10 | Un processeur d’enregistrements Kinesis Data Analytics qui reçoit des enregistrements JSON ou CSV compressés (GZIP ou compressés via Deflate) en entrée et renvoie des enregistrements décompressés avec un statut de traitement. | 

## Création d’une fonction Lambda de prétraitement dans Python
<a name="lambda-preprocessing-functions-python"></a>

Les modèles suivants pour créer une fonction Lambda de prétraitement dans Python sont disponibles dans la console :


| Plan Lambda | Langage et version | Description | 
| --- | --- | --- | 
| Traitement d'entrée General Kinesis Analytics  | Python 2.7 |  Un préprocesseur d’enregistrement Kinesis Data Analytics qui reçoit des enregistrements JSON ou CSV en entrée, puis les renvoie avec un statut de traitement. Utilisez ce processeur comme point de départ pour une logique de transformation personnalisée.  | 
| Traitement d'entrée KPL | Python 2.7 | Un processeur d’enregistrements Kinesis Data Analytics qui reçoit des regroupements KPL (Kinesis Producer Library) d’enregistrements JSON ou CSV en entrée et renvoie des enregistrements désagrégés avec un statut de traitement.  | 

## Création d’une fonction Lambda de prétraitement dans Java
<a name="lambda-preprocessing-functions-java"></a>

Pour créer une fonction Lambda dans Java pour prétraiter des enregistrements, utilisez les classes d’[événements Java](https://github.com/aws/aws-lambda-java-libs/tree/master/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events).

Le code suivant illustre un exemple de fonction Lambda qui prétraite des enregistrements à l’aide de Java :

```
public class LambdaFunctionHandler implements
        RequestHandler<KinesisAnalyticsStreamsInputPreprocessingEvent, KinesisAnalyticsInputPreprocessingResponse> {

    @Override
    public KinesisAnalyticsInputPreprocessingResponse handleRequest(
            KinesisAnalyticsStreamsInputPreprocessingEvent event, Context context) {
        context.getLogger().log("InvocatonId is : " + event.invocationId);
        context.getLogger().log("StreamArn is : " + event.streamArn);
        context.getLogger().log("ApplicationArn is : " + event.applicationArn);

        List<KinesisAnalyticsInputPreprocessingResponse.Record> records = new ArrayList<KinesisAnalyticsInputPreprocessingResponse.Record>();
        KinesisAnalyticsInputPreprocessingResponse response = new KinesisAnalyticsInputPreprocessingResponse(records);

        event.records.stream().forEach(record -> {
            context.getLogger().log("recordId is : " + record.recordId);
            context.getLogger().log("record aat is :" + record.kinesisStreamRecordMetadata.approximateArrivalTimestamp);
             // Add your record.data pre-processing logic here.                               
            // response.records.add(new Record(record.recordId, KinesisAnalyticsInputPreprocessingResult.Ok, <preprocessedrecordData>));
        });
        return response;
    }

}
```

## Création d’une fonction Lambda de prétraitement dans .NET
<a name="lambda-preprocessing-functions-net"></a>

Pour créer une fonction Lambda dans .NET pour prétraiter des enregistrements, utilisez les classes d’[événements .NET](https://github.com/aws/aws-lambda-dotnet/tree/master/Libraries/src/Amazon.Lambda.KinesisAnalyticsEvents).

Le code suivant illustre un exemple de fonction Lambda qui prétraite des enregistrements à l’aide de C\$1 :

```
public class Function
    {
        public KinesisAnalyticsInputPreprocessingResponse FunctionHandler(KinesisAnalyticsStreamsInputPreprocessingEvent evnt, ILambdaContext context)
        {
            context.Logger.LogLine($"InvocationId: {evnt.InvocationId}");
            context.Logger.LogLine($"StreamArn: {evnt.StreamArn}");
            context.Logger.LogLine($"ApplicationArn: {evnt.ApplicationArn}");

            var response = new KinesisAnalyticsInputPreprocessingResponse
            {
                Records = new List<KinesisAnalyticsInputPreprocessingResponse.Record>()
            };

            foreach (var record in evnt.Records)
            {
                context.Logger.LogLine($"\tRecordId: {record.RecordId}");
                context.Logger.LogLine($"\tShardId: {record.RecordMetadata.ShardId}");
                context.Logger.LogLine($"\tPartitionKey: {record.RecordMetadata.PartitionKey}");
                context.Logger.LogLine($"\tRecord ApproximateArrivalTime: {record.RecordMetadata.ApproximateArrivalTimestamp}");
                context.Logger.LogLine($"\tData: {record.DecodeData()}");

                // Add your record preprocessig logic here.

                var preprocessedRecord = new KinesisAnalyticsInputPreprocessingResponse.Record
                {
                    RecordId = record.RecordId,
                    Result = KinesisAnalyticsInputPreprocessingResponse.OK
                };
                preprocessedRecord.EncodeData(record.DecodeData().ToUpperInvariant());
                response.Records.Add(preprocessedRecord);
            }
            return response;
        }
    }
```

Pour plus d’informations sur la création de fonctions Lambda pour le prétraitement et les destinations dans .NET, consultez [https://github.com/aws/aws-lambda-dotnet/tree/master/Libraries/src/Amazon.Lambda.KinesisAnalyticsEvents](https://github.com/aws/aws-lambda-dotnet/tree/master/Libraries/src/Amazon.Lambda.KinesisAnalyticsEvents).

# Mise en parallèle des flux d'entrée pour un débit accru
<a name="input-parallelism"></a>

**Note**  
Après le 12 septembre 2023, vous ne pourrez plus créer de nouvelles applications en utilisant Kinesis Data Firehose comme source si vous n’utilisez pas déjà Kinesis Data Analytics pour SQL. Pour plus d’informations, consultez [Limites ](https://docs.aws.amazon.com//kinesisanalytics/latest/dev/limits.html).

Les applications Amazon Kinesis Data Analytics peuvent prendre en charge plusieurs flux d’entrée intégrés à l’application pour mettre une application à l’échelle au-delà du débit d’un flux d’entrée intégré. Pour plus d'informations sur les flux d'entrée intégrés à l'application, consultez [Applications Amazon Kinesis Data Analytics pour SQL : fonctionnement](how-it-works.md).

Dans presque tous les cas, Amazon Kinesis Data Analytics adapte votre application pour gérer la capacité des flux Kinesis ou des flux sources Firehose qui alimentent votre application. Toutefois, si le débit de votre flux source est supérieur au débit d'un flux d'entrée intégré à l'application, vous pouvez augmenter sensiblement le nombre de flux d'entrée que votre application utilise. Pour ce faire, vous devez utiliser le paramètre `InputParallelism`.

Lorsque le paramètre `InputParallelism` est supérieur à un, Amazon Kinesis Data Analytics répartit uniformément les partitions de votre flux source entre les flux intégrés à l’application. Par exemple, si votre flux source possède 50 partitions et que vous avez défini `InputParallelism` sur `2`, chaque flux d'entrée intégré à l'application reçoit les entrées de 25 partitions du flux source. 

Si vous augmentez le nombre de flux, votre application doit accéder explicitement aux données de chaque flux. Pour plus d'informations sur l'accès aux différents flux intégrés à l'application dans votre code, consultez [Accès à plusieurs flux intégrés à l’application au sein de votre application Amazon Kinesis Data Analytics](#input-parallelism-code-example).

Bien que les fragments de flux Kinesis Data Streams et Firehose soient tous deux répartis de la même manière entre les flux intégrés à l'application, ils apparaissent différemment dans votre application :
+ Les enregistrements d’un flux de données Kinesis incluent un champ `shard_id` qui peut servir à identifier la partition source de l’enregistrement.
+ Les enregistrements d'un flux de diffusion Firehose n'incluent pas de champ identifiant la partition ou la partition source de l'enregistrement. Cela est dû au fait que Firehose extrait ces informations de votre application.

## Evaluation de l'augmentation ou non de votre nombre de flux d'entrée intégrés à l'application
<a name="input-parallelism-evaluating"></a>

Dans la plupart des cas, un flux d'entrée intégré à l'application peut traiter le débit d'un flux source, selon la complexité et la taille des données des flux d'entrée. Pour déterminer si vous devez augmenter le nombre de flux d'entrée intégrés à l'application, vous pouvez surveiller les `MillisBehindLatest` métriques `InputBytes` et dans Amazon CloudWatch. 

Si la `InputBytes` métrique est supérieure à 100 MB/sec (ou si vous prévoyez qu'elle sera supérieure à ce taux), cela peut entraîner une augmentation `MillisBehindLatest` et un impact accru des problèmes liés aux applications. Pour résoudre ce problème, il est recommandé de faire les choix de langues suivants pour votre application :
+ Utilisez plusieurs flux et applications Kinesis Data Analytics pour SQL si les besoins de mise à l’échelle de votre application sont supérieurs à 100 Mo/seconde.
+ Utilisez les [applications Kinesis Data Analytics pour Java](/managed-flink/latest/java/what-is.html) si vous voulez utiliser un seul flux et une application.

Si la métrique `MillisBehindLatest` possède l'une des caractéristiques suivantes, vous devez augmenter le paramètre `InputParallelism` de votre application :
+ La métrique `MillisBehindLatest` augmente progressivement, ce qui indique que votre application est en retard par rapport aux dernières données du flux.
+ La métrique `MillisBehindLatest` est systématiquement supérieure à 1 000 (une seconde).

Vous n'avez pas besoin d'augmenter le paramètre `InputParallelism` de votre application si :
+ La métrique `MillisBehindLatest` baisse progressivement, ce qui indique que votre application devance les dernières données du flux.
+ La métrique `MillisBehindLatest` est systématiquement inférieure à 1 000 (une seconde).

Pour plus d'informations sur l'utilisation CloudWatch, consultez le [guide de CloudWatch l'utilisateur](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/).

## Implémentation de plusieurs flux d'entrée intégrés à l'application
<a name="input-parallelism-implementing"></a>

Vous pouvez définir le nombre de flux d'entrée intégrés à l'application lorsqu'une application est créée à l'aide de [CreateApplication](API_CreateApplication.md). Ce nombre est défini une fois que l'application a été créée à l'aide de [UpdateApplication](API_UpdateApplication.md). 

**Note**  
Vous ne pouvez définir le paramètre `InputParallelism` qu’à l’aide de l’API Amazon Kinesis Data Analytics ou de l’ AWS CLI. Vous ne pouvez pas définir ce paramètre à l'aide du AWS Management Console. Pour plus d'informations sur la configuration du AWS CLI, voir[Étape 2 : configurer le AWS Command Line Interface (AWS CLI)](setup-awscli.md).

### Définition du nombre de flux d'entrée d'une nouvelle application
<a name="input-parallelism-implementing-create"></a>

L'exemple suivant montre comment utiliser l'action d'API `CreateApplication` pour définir le nombre de flux d'entrée d'une nouvelle application sur 2. 

Pour plus d’informations sur `CreateApplication`, consultez [CreateApplication](API_CreateApplication.md).

```
{
   "ApplicationCode": "<The SQL code the new application will run on the input stream>",
   "ApplicationDescription": "<A friendly description for the new application>",
   "ApplicationName": "<The name for the new application>",
   "Inputs": [ 
    { 
      "InputId": "ID for the new input stream",
      "InputParallelism": { 
        "Count": 2
    }],
   "Outputs": [ ... ],
	}]
}
```

### Définition du nombre de flux d'entrée d'une application existante
<a name="input-parallelism-implementing-update"></a>

L'exemple suivant montre comment utiliser l'action d'API `UpdateApplication` pour définir le nombre de flux d'entrée d'une application existante sur 2.

Pour plus d’informations sur `Update_Application`, consultez [UpdateApplication](API_UpdateApplication.md).

```
{
   "InputUpdates": [ 
      { 
         "InputId": "yourInputId",
         "InputParallelismUpdate": { 
            "CountUpdate": 2
         }
      }
   ],
}
```

## Accès à plusieurs flux intégrés à l’application au sein de votre application Amazon Kinesis Data Analytics
<a name="input-parallelism-code-example"></a>

Pour utiliser plusieurs flux d'entrée intégrés à l'application au sein de votre application, vous devez sélectionner explicitement différents flux. L'exemple de code suivant montre comment interroger plusieurs flux d'entrée intégrés à l'application créés dans le didacticiel de mise en route. 

Dans l'exemple suivant, chaque flux source est d'abord agrégé avec [COUNT](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-count.html) avant d'être combiné en un seul flux intégré à l'application, appelé `in_application_stream001`. L'agrégation des flux source en amont vous permet de vous assurer que les flux combinés peuvent gérer le trafic de plusieurs flux sans être surchargés. 

**Note**  
Pour exécuter cet exemple et obtenir des résultats des deux flux d'entrée intégrés à l'application, vous devez mettre à jour le nombre de partitions dans votre flux source et le paramètre `InputParallelism` dans votre application.

```
CREATE OR REPLACE STREAM in_application_stream_001 (
    ticker VARCHAR(64),
    ticker_count INTEGER
);

CREATE OR REPLACE PUMP pump001 AS 
INSERT INTO in_application_stream_001
SELECT STREAM ticker_symbol, COUNT(ticker_symbol)
FROM source_sql_stream_001
GROUP BY STEP(source_sql_stream_001.rowtime BY INTERVAL '60' SECOND),
    ticker_symbol; 
        
CREATE OR REPLACE PUMP pump002 AS 
INSERT INTO in_application_stream_001
SELECT STREAM ticker_symbol, COUNT(ticker_symbol)
FROM source_sql_stream_002
GROUP BY STEP(source_sql_stream_002.rowtime BY INTERVAL '60' SECOND),
    ticker_symbol;
```

L'exemple de code précédent produit dans `in_application_stream001` un résultat similaire à ce qui suit :

![\[Table showing ROWTIME, TICKER, and TICKER_COUNT columns with sample data entries.\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/images/input-parallelism-results.png)


## Considérations supplémentaires
<a name="input-parallelism-considerations"></a>

Si vous utilisez plusieurs flux d'entrée, vous devez savoir que :
+ Le nombre maximal de flux d'entrée intégrés à l'application est de 64.
+ Les flux d'entrée intégrés à l'application sont répartis uniformément entre les partitions du flux d'entrée de l'application.
+ Les gains de performances résultant de l'ajout de flux intégrés à l'application ne sont pas linéaires. Ainsi, le fait de doubler le nombre de flux intégrés à l'application ne double pas le débit. Avec une taille de ligne classique, chaque flux intégré à l'application peut atteindre un débit de 5 000 à 15 000 lignes par seconde. En passant le nombre de flux intégrés à l'application à 10, vous pouvez obtenir un débit de 20 000 à 30 000 lignes par seconde. La vitesse du débit dépend du nombre, des types et de la taille des données des champs dans le flux d'entrée.
+ Certaines fonctions d'agrégation (comme [AVG](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-avg.html)) peuvent générer des résultats inattendus en cas de répartition des flux d'entrée entre plusieurs partitions. Etant donné que vous devez agréger toutes les partitions avant de les rassembler en un flux agrégé, les résultats peuvent être dirigés vers le flux contenant le plus d'enregistrements, quel qu'il soit.
+ Si les performances de votre application continuent d'être médiocres (reflétées par un `MillisBehindLatest` indicateur élevé) après avoir augmenté le nombre de flux d'entrée, vous avez peut-être atteint votre limite d'unités de traitement Kinesis ()KPUs. Pour de plus amples informations, veuillez consulter [Dimensionnement automatique des applications pour augmenter le débit](how-it-works-autoscaling.md).