

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Utiliser le protocole de validation optimisé pour EMRFS S3
<a name="emr-spark-s3-optimized-commit-protocol"></a>

Le protocole de validation optimisé pour EMRFS S3 est une [FileCommitProtocol](https://spark.apache.org/docs/2.2.0//api/java/org/apache/spark/internal/io/FileCommitProtocol.html)implémentation alternative optimisée pour écrire des fichiers avec remplacement dynamique de partition Spark sur Amazon S3 lors de l'utilisation d'EMRFS. Le protocole améliore les performances des applications en évitant les opérations de renommage dans Amazon S3 pendant la phase de validation de la tâche de remplacement de partition dynamique Spark. 

Notez que le [validateur optimisé pour EMRFS S3 améliore également les performances en évitant](emr-spark-s3-optimized-committer.html) les opérations de renommage. Cependant, cela ne fonctionne pas pour les cas de remplacement de partition dynamique, tandis que les améliorations du protocole de validation ne ciblent que les cas de remplacement de partition dynamique.

Le protocole de validation est disponible avec les versions 5.30.0 et suivantes d'Amazon EMR et 6.2.0 et suivantes, et il est activé par défaut. Amazon EMR a ajouté une amélioration du parallélisme à partir de la version 5.31.0. Le protocole est utilisé pour les tâches Spark qui utilisent Spark ou Datasets. DataFrames Dans certaines circonstances, le protocole de validation n'est pas utilisé. Pour de plus amples informations, veuillez consulter [Conditions requises pour le protocole de validation EMRFS optimisé pour S3](emr-spark-committer-reqs.md).

**Topics**
+ [

# Conditions requises pour le protocole de validation EMRFS optimisé pour S3
](emr-spark-commit-protocol-reqs.md)
+ [

# Le protocole de validation EMRFS optimisé pour S3 et les chargements partitionnés
](emr-spark-commit-protocol-multipart.md)
+ [

# Considérations de réglage de tâche
](emr-spark-commit-protocol-tuning.md)

# Conditions requises pour le protocole de validation EMRFS optimisé pour S3
<a name="emr-spark-commit-protocol-reqs"></a>

Le protocole de validation EMRFS optimisé pour S3 est utilisé lorsque les conditions suivantes sont remplies :
+ Vous exécutez des tâches Spark qui utilisent Spark ou Datasets pour remplacer les tables partitionnées. DataFrames
+ Vous exécutez des tâches Spark dont le mode de remplacement de partition est `dynamic`.
+ Les chargements partitionnés sont activés dans Amazon EMR. Il s’agit de l’option par défaut. Pour de plus amples informations, veuillez consulter [Le protocole de validation EMRFS optimisé pour S3 et les chargements partitionnés](emr-spark-commit-protocol-multipart.md). 
+ Le cache du système de fichiers pour EMRFS est activé. Il s’agit de l’option par défaut. Vérifiez que le paramètre `fs.s3.impl.disable.cache` est défini sur `false`. 
+ Le support intégré des sources de données de Spark est utilisé. La prise en charge de la source de données intégrée est utilisée dans les circonstances suivantes :
  + Lorsque les tâches écrivent dans des sources de données ou des tables intégrées.
  + Lorsque les tâches écrivent dans la table Parquet du métastore Hive. Cela se produit lorsque `spark.sql.hive.convertInsertingPartitionedTable` et `spark.sql.hive.convertMetastoreParquet` sont tous deux définis sur true. Il s'agit des paramètres par défaut.
  + Lorsque les jobs écrivent dans la table ORC du métastore Hive. Cela se produit lorsque `spark.sql.hive.convertInsertingPartitionedTable` et `spark.sql.hive.convertMetastoreOrc` sont tous deux définis sur `true`. Il s'agit des paramètres par défaut.
+ Les opérations de tâches Spark qui écrivent dans un emplacement de partition par défaut (par exemple, `${table_location}/k1=v1/k2=v2/`) utilisent le protocole de validation. Le protocole n'est pas utilisé si une opération de tâche écrit dans un emplacement de partition personnalisé (par exemple, si un emplacement de partition personnalisé est défini à l'aide de la commande `ALTER TABLE SQL`).
+ Les valeurs suivantes pour Spark doivent être utilisées :
  + `spark.sql.sources.commitProtocolClass` doit être défini sur `org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol`. Il s'agit du paramètre par défaut pour les versions 5.30.0 et supérieures d'Amazon EMR, ainsi que pour les versions 6.2.0 et supérieures. 
  + L'option d'écriture `partitionOverwriteMode` ou `spark.sql.sources.partitionOverwriteMode` doit être définie sur `dynamic`. Le paramètre par défaut est `static`.
**Note**  
L'option d'écriture `partitionOverwriteMode` a été introduite dans Spark 2.4.0. Pour Spark version 2.3.2, inclus avec Amazon EMR version 5.19.0, définissez la propriété `spark.sql.sources.partitionOverwriteMode`. 
  + Si les tâches Spark remplacent la table Parquet du métastore Hive, `spark.sql.hive.convertMetastoreParquet`, `spark.sql.hive.convertInsertingPartitionedTable` et `spark.sql.hive.convertMetastore.partitionOverwriteMode` doivent être définis sur `true`. Il s'agit des paramètres par défaut. 
  + Si les tâches Spark remplacent la table ORC du métastore Hive, `spark.sql.hive.convertMetastoreOrc`, `spark.sql.hive.convertInsertingPartitionedTable` et `spark.sql.hive.convertMetastore.partitionOverwriteMode` doivent être définis sur `true`. Il s'agit des paramètres par défaut.

**Example – Mode de remplacement de partition dynamique**  
Dans cet exemple de Scala, l'optimisation est déclenchée. Tout d'abord, vous définissez la propriété `partitionOverwriteMode` sur `dynamic`. Cela remplace uniquement les partitions sur lesquelles vous écrivez des données. Ensuite, vous spécifiez les colonnes de partition dynamique avec `partitionBy` et définissez le mode d'écriture sur `overwrite`.  

```
val dataset = spark.range(0, 10)
  .withColumn("dt", expr("date_sub(current_date(), id)"))

dataset.write.mode("overwrite")                 // "overwrite" instead of "insert"
  .option("partitionOverwriteMode", "dynamic")  // "dynamic" instead of "static"  
  .partitionBy("dt")                            // partitioned data instead of unpartitioned data
  .parquet("s3://amzn-s3-demo-bucket1/output")    // "s3://" to use Amazon EMR file system, instead of "s3a://" or "hdfs://"
```

## Lorsque le protocole de validation EMRFS optimisé pour S3 n'est pas utilisé
<a name="emr-spark-commit-protocol-reqs-anti"></a>

En général, le protocole de validation optimisé pour EMRFS S3 fonctionne de la même manière que le protocole de validation Spark par défaut open source. `org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol` L'optimisation ne se produira pas dans les situations suivantes.


****  

| Situation | Pourquoi le protocole de validation n'est pas utilisé | 
| --- | --- | 
| Lorsque vous écrivez dans HDFS | Le protocole de validation prend uniquement en charge l'écriture sur Amazon S3 à l'aide d'EMRFS. | 
| Lorsque vous utilisez le système de fichiers S3A | Le protocole de validation ne prend en charge que le protocole EMRFS. | 
| Lorsque vous utilisez l' MapReduce API RDD de Spark | Le protocole de validation prend uniquement en charge l'utilisation de SparkSQL ou DataFrame Dataset. APIs | 
| Lorsque le remplacement dynamique de la partition n'est pas déclenché | Le protocole de validation optimise uniquement les cas de remplacement dynamique de partitions. Pour les autres cas, consultez [Utilisation d'un valideur EMRFS optimisé pour S3](emr-spark-s3-optimized-committer.md). | 

Les exemples Scala suivants démontrent quelques situations supplémentaires que le protocole de validation EMRFS optimisé pour S3 délègue à `SQLHadoopMapReduceCommitProtocol`.

**Example – Mode de remplacement dynamique de partition avec emplacement de partition personnalisé**  
Dans cet exemple, les programmes Scala remplacent deux partitions en mode de remplacement dynamique de partition. Une partition possède un emplacement de partition personnalisé. L'autre partition utilise l'emplacement de partition par défaut. Le protocole de validation EMRFS optimisé pour S3 n'améliore que la partition qui utilise l'emplacement de partition par défaut.  

```
val table = "dataset"
val inputView = "tempView"
val location = "s3://bucket/table"
                            
spark.sql(s"""
  CREATE TABLE $table (id bigint, dt date) 
  USING PARQUET PARTITIONED BY (dt) 
  LOCATION '$location'
""")

// Add a partition using a custom location
val customPartitionLocation = "s3://bucket/custom"
spark.sql(s"""
  ALTER TABLE $table ADD PARTITION (dt='2019-01-28') 
  LOCATION '$customPartitionLocation'
""")

// Add another partition using default location
spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')")

def asDate(text: String) = lit(text).cast("date")   
                       
spark.range(0, 10)
  .withColumn("dt",
    when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29")))
  .createTempView(inputView)
  
// Set partition overwrite mode to 'dynamic'
spark.sql(s"SET spark.sql.sources.partitionOverwriteMode=dynamic")
  
spark.sql(s"INSERT OVERWRITE TABLE $table SELECT * FROM $inputView")
```
Le code Scala crée les objets Amazon S3 suivants :  

```
custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
custom_$folder$
table/_SUCCESS
table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
table/dt=2019-01-29_$folder$
table_$folder$
```
L'écriture sur des emplacements de partition personnalisés dans les versions antérieures de Spark peut entraîner une perte de données. Dans cet exemple, la partition `dt='2019-01-28'` serait perdue. Pour plus de détails, consultez [SPARK-35106](https://issues.apache.org/jira/browse/SPARK-35106). Ce problème est résolu dans les versions 5.33.0 et ultérieures d'Amazon EMR, à l'exception des versions 6.0.x et 6.1.x.

Lorsque vous écrivez des partitions dans des emplacements personnalisés, Spark utilise un algorithme de validation similaire à celui de l'exemple précédent, qui est décrit ci-dessous. Comme dans l'exemple précédent, l'algorithme se traduit par des attributions séquentielles de nouveaux noms, ce qui peut avoir un impact négatif sur les performances.

L'algorithme dans Spark 2.4.0 exécute les étapes suivantes :

1. Lors de l'écriture de la sortie d'une partition dans un emplacement personnalisé, les tâches écrivent un fichier dans le répertoire intermédiaire de Spark, qui est créé sous l'emplacement de sortie final. Le nom du fichier comprend un UUID aléatoire pour éviter les conflits de fichier. La tentative de tâche suit chaque fichier, ainsi que le chemin de la sortie souhaité final.

1. Lorsqu'une tâche se termine avec succès, elle fournit au pilote les fichiers et les chemins de sortie souhaités finaux.

1. Une fois toutes les tâches terminées, la phase de validation de tâche renomme de manière séquentielle tous les fichiers qui ont été écrits pour les partitions dans les emplacements personnalisés en leurs chemins de sortie finaux.

1. Le répertoire intermédiaire est supprimé avant que la phase de validation de tâche soit terminée.

# Le protocole de validation EMRFS optimisé pour S3 et les chargements partitionnés
<a name="emr-spark-commit-protocol-multipart"></a>

Pour utiliser l'optimisation pour le remplacement dynamique des partitions dans le protocole de validation optimisé pour EMRFS S3, les chargements partitionnés doivent être activés dans Amazon EMR. Les chargements partitionnés sont activés par défaut. Vous pouvez les réactiver si besoin est. Pour plus d'informations, consultez [Configuration d'un chargement partitionné pour Simple Storage Service (Amazon S3)](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-upload-s3.html#Config_Multipart) dans le *Guide de gestion Amazon EMR*. 

Pendant l'écrasement dynamique de partition, le protocole de validation EMRFS optimisé pour S3 utilise les caractéristiques de transaction des chargements partitionnés pour s'assurer que les fichiers écrits par les tentatives de tâches n'apparaissent que dans l'emplacement de sortie de la tâche au moment de la validation de la tâche. En utilisant les chargements partitionnés de cette manière, le protocole de validation améliore les performances de validation des tâches par rapport au protocole par défaut `SQLHadoopMapReduceCommitProtocol`. Lors de l'utilisation du protocole de validation EMRFS optimisé pour S3, il y a quelques différences clés à prendre en compte par rapport au comportement traditionnel des chargements partitionnés :
+ Les chargements partitionnés sont toujours effectués, peu importe la taille du fichier. Ceci diffère du comportement normal d'EMRFS, où la propriété `fs.s3n.multipart.uploads.split.size` contrôle la taille du fichier à laquelle les chargements partitionnés sont enclenchés.
+ Les chargements partitionnés sont laissés en état non terminé pendant un plus long laps de temps jusqu'à ce que la tâche soit validée ou abandonnée. Ceci diffère du comportement normal d'EMRFS, où un chargement partitionné se termine lorsqu'un tâche termine d'écrire un fichier donné.

En raison de ces différences, si une JVM Spark Executor se bloque ou est mise hors service alors que des tâches sont en cours d'exécution et écrivent des données sur Amazon S3, ou si une JVM Spark Driver se bloque ou est mise hors service alors qu'une tâche est en cours d'exécution, les chargements partitionnés incomplets sont plus susceptibles d'être laissés derrière. Pour cette raison, lorsque vous utilisez le protocole de validation EMRFS optimisé pour S3, assurez-vous de suivre les bonnes pratiques de gestion de chargements partitionnés échoués. Pour plus d'informations, consultez [Bonnes pratiques](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-upload-s3.html#emr-bucket-bestpractices) pour travailler avec des compartiments Amazon S3 dans le *Guide de gestion Amazon EMR*.

# Considérations de réglage de tâche
<a name="emr-spark-commit-protocol-tuning"></a>

Sur les exécuteurs Spark, le protocole de validation EMRFS optimisé pour S3 consomme une petite quantité de mémoire pour chaque fichier écrit par une tentative de tâche, jusqu'à ce que la tâche soit validée ou abandonnée. La quantité de mémoire consommée est négligeable dans la plupart des tâches. 

Sur les pilotes Spark, le protocole de validation optimisé pour EMRFS S3 nécessite de la mémoire pour stocker les informations de métadonnées de chaque fichier validé jusqu'à ce que la tâche soit validée ou abandonnée. Dans la plupart des tâches, le paramètre de mémoire par défaut du pilote Spark est négligeable. 

Pour les tâches qui ont des tâches de longue durée qui écrivent un grand nombre de fichiers, la mémoire que le protocole de validation consomme peut être perceptible et nécessiter des ajustements de la mémoire allouée pour Spark, en particulier pour les exécuteurs Spark. Vous pouvez régler la mémoire à l'aide de la propriété `spark.driver.memory` pour les pilotes Spark et de la propriété `spark.executor.memory` pour les exécuteurs Spark. Une seule tâche écrivant 100 000 fichiers nécessiterait en général environ 100 Mo de mémoire supplémentaire. Pour plus d'informations, consultez [Propriétés d'applications](https://spark.apache.org/docs/latest/configuration.html#application-properties) dans la Documentation de configuration Apache Spark.