

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# EMRFS-S3-optimierte Committer verwenden
<a name="emr-spark-s3-optimized-committer"></a>

Der EMRFS S3-optimierte Committer ist eine alternative [OutputCommitter](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/OutputCommitter.html)Implementierung, die für das Schreiben von Dateien in Amazon S3 bei Verwendung von EMRFS optimiert ist. Der für EMRFS S3 optimierte Comitter verbessert die Anwendungsleistung durch Umgehung der Operationen zum Auflisten und Umbenennen, die während der Commit-Phasen von Aufträgen und Aufgaben in Amazon S3 durchgeführt werden. Die Committer-Klasse ist verfügbar für Amazon-EMR-Version 5.19.0 und höher und ist bei Amazon EMR 5.20.0 und höher standardmäßig aktiviert. Der Committer wird für Spark-Jobs verwendet, die Spark oder Datasets verwenden. DataFrames Ab Amazon EMR 6.4.0 kann dieser Committer für alle gängigen Formate verwendet werden, einschließlich Parquet, ORC und textbasierte Formate (einschließlich CSV und JSON). Für Versionen vor Amazon EMR 6.4.0 wird nur das Parquet-Format unterstützt. Es gibt Situationen, in denen der Committer nicht verwendet wird. Weitere Informationen finden Sie unter [Anforderungen für den S3-optimierten EMRFS-Committer](emr-spark-committer-reqs.md).

**Topics**
+ [Anforderungen für den S3-optimierten EMRFS-Committer](emr-spark-committer-reqs.md)
+ [Der S3-optimierte EMRFS-Committer und mehrteilige Uploads](emr-spark-committer-multipart.md)
+ [Überlegungen zur Auftragsoptimierung](emr-spark-committer-tuning.md)
+ [Aktivieren Sie den S3-optimierten Committer für Amazon EMR 5.19.0](emr-spark-committer-enable.md)

# Anforderungen für den S3-optimierten EMRFS-Committer
<a name="emr-spark-committer-reqs"></a>

Der S3-optimierte EMRFS-Committer wird verwendet, wenn die folgenden Bedingungen erfüllt sind:
+ Sie führen Spark-Jobs aus, die Spark oder Datasets verwenden DataFrames, um Dateien in Amazon S3 zu schreiben. Ab Amazon EMR 6.4.0 kann dieser Committer für alle gängigen Formate verwendet werden, einschließlich Parquet, ORC und textbasierte Formate (einschließlich CSV und JSON). Für Versionen vor Amazon EMR 6.4.0 wird nur das Parquet-Format unterstützt.
+ In Amazon EMR sind mehrteilige Uploads aktiviert. Das ist die Standardeinstellung. Weitere Informationen finden Sie unter [Der S3-optimierte EMRFS-Committer und mehrteilige Uploads](emr-spark-committer-multipart.md). 
+ Die integrierte Dateiformatunterstützung von Spark wird verwendet. Die integrierte Dateiformatunterstützung wird unter folgenden Umständen verwendet:
  + Bei Hive-Metastore-Tabellen, wenn `spark.sql.hive.convertMetastoreParquet` für Parquet-Tabellen auf `true` oder `spark.sql.hive.convertMetastoreOrc` für Orc-Tabellen mit Amazon EMR 6.4.0 oder höher auf `true` eingestellt. Dies sind die Standardeinstellungen.
  + Wenn Aufträge in Datenquellen oder Tabellen im Dateiformat schreiben, wird beispielsweise die Zieltabelle mit der `USING parquet` Klausel erstellt. 
  + Wenn Aufträge in nicht partitionierte Hive-Metastore-Parquet-Tabellen schreiben. Eine bekannte Einschränkung besteht darin, dass die in Spark integrierte Parquet-Unterstützung keine partitionierten Hive-Tabellen unterstützt. Weitere Informationen finden Sie unter [Hive Metastore Parquet-Tabellenkonvertierung](https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#hive-metastore-parquet-table-conversion) im Apache Spark DataFrames and Datasets Guide.
+ Spark-Job-Operationen, die beispielsweise `${table_location}/k1=v1/k2=v2/` in einen Standardspeicherort für Partitionen schreiben, verwenden den Committer. Der Committer wird nicht verwendet, wenn ein Job-Vorgang in einen benutzerdefinierten Partitionsspeicherort schreibt, z. B. wenn mit dem `ALTER TABLE SQL`-Befehl ein benutzerdefinierter Partitionsspeicherort festgelegt wurde.
+ Für Spark müssen die folgenden Werte verwendet werden:
  + Der Eigenschaft `spark.sql.parquet.fs.optimized.committer.optimization-enabled` muss auf `true` eingestellt sein. Dies ist die Standardeinstellung bei Amazon EMR 5.20.0 und höher. Bei Amazon EMR 5.19.0 lautet der Standardwert `false`. Weitere Informationen zum Konfigurieren dieses Werts finden Sie unter [Aktivieren Sie den S3-optimierten Committer für Amazon EMR 5.19.0](emr-spark-committer-enable.md).
  + Beim Schreiben in nicht partitionierte Hive-Metastore-Tabellen werden nur die Dateiformate Parquet und Orc unterstützt. `spark.sql.hive.convertMetastoreParquet`muss auf gesetzt sein, `true` wenn in nicht partitionierte Parquet Hive-Metastore-Tabellen geschrieben werden. `spark.sql.hive.convertMetastoreOrc`muss auf gesetzt sein, `true` wenn in nicht partitionierte Orc Hive Metastore-Tabellen geschrieben werden. Dies sind die Standardeinstellungen.
  + muss `spark.sql.parquet.output.committer.class` auf `com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter` festgelegt sein. Dies ist die Standardeinstellung.
  + `spark.sql.sources.commitProtocolClass` muss auf `org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol` oder `org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol` eingestellt sein. `org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol` ist die Standardeinstellung für die Amazon-EMR-5.x-Serie, Version 5.30.0 und höher, und für die Amazon-EMR-6.x-Serie, Version 6.2.0 und höher. `org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol` ist die Standardeinstellung für frühere Amazon-EMR-Versionen.
  + Wenn Spark-Aufträge partitionierte Parquet-Datasets durch dynamische Partitionsspalten überschreiben, dann müssen die Schreiboption `partitionOverwriteMode` und `spark.sql.sources.partitionOverwriteMode` auf `static` eingestellt sein. Dies ist die Standardeinstellung.
**Anmerkung**  
Die Schreiboption `partitionOverwriteMode` wurde mit Spark 2.4.0 eingeführt. Legen Sie für Spark-Version 2.3.2, die in Amazon-EMR-Version 5.19.0 enthalten ist, die Eigenschaft `spark.sql.sources.partitionOverwriteMode` fest. 

## Fälle, in denen der S3-optimierte EMRFS-Committer nicht verwendet wird
<a name="emr-spark-committer-reqs-anti"></a>

Im Allgemeinen wird der S3-optimierte Committer in den folgenden Situationen nicht verwendet.


****  

| Situation | Warum der Committer nicht verwendet wird | 
| --- | --- | 
| Wenn Sie in HDFS schreiben | Der Committer unterstützt nur das Schreiben in Amazon S3 mit EMRFS. | 
| Bei Verwendung des S3A-Dateisystems | Der Committer unterstützt nur EMRFS. | 
| Wenn Sie die RDD-API von Spark verwenden MapReduce  | Der Committer unterstützt nur die Verwendung von SparkSQL oder Dataset DataFrame. APIs | 

Die folgenden Scala-Beispiele veranschaulichen einige zusätzliche Situationen, die verhindern, dass der EMRFS-S3-optimierte Committer vollständig (erstes Beispiel) oder teilweise (zweites Beispiel) verwendet werden kann.

**Example – Dynamischer Partitionsüberschreibmodus**  
Das folgende Scala-Beispiel weist Spark an, einen anderen Commit-Algorithmus zu verwenden, wodurch die Verwendung des für EMRFS S3 optimierten Committers insgesamt verhindert wird. Der Code legt die Eigenschaft `partitionOverwriteMode` auf `dynamic` fest, sodass nur die Partitionen überschrieben werden, auf die Sie Daten schreiben. Anschließend werden dynamische Partitionsspalten durch `partitionBy` angegeben und der Schreibmodus ist auf `overwrite` eingestellt.   

```
val dataset = spark.range(0, 10)
  .withColumn("dt", expr("date_sub(current_date(), id)"))

dataset.write.mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .partitionBy("dt")
  .parquet("s3://amzn-s3-demo-bucket1/output")
```
Sie müssen alle drei Einstellungen konfigurieren, um die Verwendung des S3-optimierten Committers zu vermeiden. Wenn Sie dies tun, führt Spark einen anderen Commit-Algorithmus aus, der im Commit-Protokoll von Spark angegeben ist. Für Amazon-EMR-5.x-Versionen vor 5.30.0 und für Amazon-EMR-6.x-Versionen vor 6.2.0 verwendet das Commit-Protokoll das Staging-Verzeichnis von Spark, ein temporäres Verzeichnis, das unter dem Ausgabespeicherort erstellt wird, der mit `.spark-staging` beginnt. Der Algorithmus benennt Partitionsverzeichnisse nacheinander um, was sich negativ auf die Leistung auswirken kann. Weitere Informationen zu den Amazon-EMR-Versionen 5.30.0 und höher sowie 6.2.0 und höher finden Sie unter [Verwenden Sie das EMRFS-S3-optimierte Commit-Protokoll](emr-spark-s3-optimized-commit-protocol.md).   
Der Algorithmus in Spark 2.4.0 folgt diesen Schritten:  

1. Aufgabenversuche schreiben ihre Ausgabe in Partitionsverzeichnisse unterhalb des Staging-Verzeichnisses von Spark, beispielsweise `${outputLocation}/spark-staging-${jobID}/k1=v1/k2=v2/`.

1. Für jede geschriebene Partition verfolgt der Aufgabenversuch die relativen Partitionspfade, z. B. `k1=v1/k2=v2`.

1. Wenn eine Aufgabe erfolgreich abgeschlossen wurde, stellt sie dem Treiber alle zugehörigen Partitionspfade bereit, die von ihr nachverfolgt wurden.

1. Nachdem alle Aufgaben abgeschlossen wurden, sammelt die Auftrags-Commit-Phase alle die Partitionsverzeichnisse, die von erfolgreichen Aufgabenversuchen unter dem Staging-Verzeichnis von Spark geschrieben wurden. Spark benennt jedes dieser Verzeichnisse mithilfe von Verzeichnisstruktur-Umbenennungsoperationen sequenziell bis zu ihrem endgültigen Ausgabespeicherort um.

1. Das Staging-Verzeichnis wird gelöscht, bevor der Auftrags-Commit-Phase abgeschlossen ist.

**Example – Benutzerdefinierter Partitionsspeicherort**  
In diesem Beispiel wird der Scala-Code in zwei Partitionen eingefügt. Eine Partition verfügt über einen benutzerdefinierten Partitionsspeicherort. Die andere Partition verwendet den Standard-Partitionsspeicherort. Der S3-optimierte EMRFS-Committer wird nur zum Schreiben der Aufgabenausgabe in die Partition genutzt, die den Standard-Partitionsspeicherort verwendet.  

```
val table = "dataset"
val location = "s3://bucket/table"
                            
spark.sql(s"""
  CREATE TABLE $table (id bigint, dt date) 
  USING PARQUET PARTITIONED BY (dt) 
  LOCATION '$location'
""")
                            
// Add a partition using a custom location
val customPartitionLocation = "s3://bucket/custom"
spark.sql(s"""
  ALTER TABLE $table ADD PARTITION (dt='2019-01-28') 
  LOCATION '$customPartitionLocation'
""")
                            
// Add another partition using default location
spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')")
                            
def asDate(text: String) = lit(text).cast("date")
                            
spark.range(0, 10)
  .withColumn("dt",
    when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29")))
  .write.insertInto(table)
```
Der Scala-Code erstellt die folgenden Amazon-S3-Objekte:  

```
custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
custom_$folder$
table/_SUCCESS
table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
table/dt=2019-01-29_$folder$
table_$folder$
```
Beim Schreiben in Partitionen an benutzerdefinierten Speicherorten verwendet Spark einen Commit-Algorithmus, ähnlich wie im vorherigen Beispiel. Dies wird im Folgenden beschrieben. Genau wie bei dem vorherigen Beispiel führt der Algorithmus zu sequenziellen Umbenennungen, wodurch die Leistung beeinträchtigt werden kann.  

1. Beim Schreiben von Ausgabe in eine Partition an einem benutzerdefinierten Speicherort werden Aufgaben in eine Datei unter dem Staging-Verzeichnis von Spark geschrieben, das unter dem endgültigen Ausgabespeicherort erstellt wird. Der Name der Datei enthält eine zufällige UUID zum Schutz vor Datei-Kollisionen. Der Aufgabe-Versuch verfolgt jede Datei zusammen mit dem gewünschten endgültigen Ausgabepfad nach.

1. Wenn eine Aufgabe erfolgreich abgeschlossen wird, stellt sie dem Treiber die Dateien und die für sie gewünschten endgültigen Ausgabepfade bereit.

1. Nachdem alle Aufgaben beendet wurden, benennt die Auftrags-Commit-Phase alle Dateien, die für Partitionen an benutzerdefinierten Speicherorten geschrieben wurden, sequentiell in ihre endgültigen Ausgabepfade um.

1. Das Staging-Verzeichnis wird gelöscht, bevor der Auftrags-Commit-Phase abgeschlossen ist.

# Der S3-optimierte EMRFS-Committer und mehrteilige Uploads
<a name="emr-spark-committer-multipart"></a>

Um den S3-optimierten EMRFS-Committer zu verwenden, müssen Sie mehrteilige Uploads in Amazon EMR aktivieren. Mehrteilige Uploads sind standardmäßig aktiviert. Sie können diese Option bei Bedarf erneut aktivieren. Weitere Informationen finden Sie unter [Konfigurieren von mehrteiligen Uploads für Amazon S3](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-upload-s3.html#Config_Multipart) im *Verwaltungshandbuch für Amazon EMR*. 

Der S3-optimierte EMRFS-Committer verwendet die transaktionsähnlichen Merkmale von mehrteiligen Uploads, um sicherzustellen, dass Dateien, die beim Versuch, Aufgaben auszuführen geschrieben werden, nur am Ausgabespeicherort des Auftrags angezeigt werden. Durch die Verwendung von mehrteiligen Uploads auf diese Weise verbessert der Committer die Leistung des Task-Commits im Vergleich zur Standardalgorithmusversion 2. FileOutputCommitter Wenn Sie den S3-optimierten EMRFS-Committer verwenden, gilt es einige wichtige Unterschiede zu dem herkömmlichen Verhalten bei mehrteiligen Uploads zu berücksichtigen:
+ Mehrteilige Uploads werden immer ausgeführt, unabhängig von der Dateigröße. Dies unterscheidet sich vom Standardverhalten von EMRFS, bei dem die Eigenschaft `fs.s3n.multipart.uploads.split.size` die Dateigröße steuert, in der mehrteilige Uploads ausgelöst werden.
+ Mehrteilige Uploads verbleiben für einen längeren Zeitraum in einem Status, in dem sie nicht abgeschlossen sind, bis die Aufgabe übertragen oder abgebrochen wird. Dies unterscheidet sich von der Standard-Verhalten von EMRFS. Dort wird ein mehrteiliger Upload abgeschlossen, wenn eine Aufgabe den Schreibvorgang für eine bestimmte Datei beendet hat.

Aufgrund dieser Unterschiede vergrößert sich bei mehrteiligen Uploads die Wahrscheinlichkeit, dass unvollständige mehrteilige Uploads zurückbleiben, wenn ein Spark Executor JVM abstürzt oder zerstört wird, während Aufgaben ausgeführt oder Daten auf Amazon S3 geschrieben werden. Aus diesem Grund sollten Sie bei Verwendung des S3-optimierten EMRFS-Committer darauf achten, den bewährten Methoden für die Verwaltung von fehlgeschlagenen mehrteiligen Uploads zu folgen. Weitere Informationen finden Sie unter [Bewährte Methoden](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-upload-s3.html#emr-bucket-bestpractices) für die Arbeit mit Amazon-S3-Buckets im *Verwaltungshandbuch für Amazon EMR*.

# Überlegungen zur Auftragsoptimierung
<a name="emr-spark-committer-tuning"></a>

Der S3-optimierte EMRFS-Committer verbraucht eine geringe Speichermenge für jede Datei, die versuchsweise von einer Aufgabe geschrieben wird, bis die Aufgabe übermittelt oder abgebrochen wird. Bei den meisten Aufträgen ist die Menge des belegten Speichers vernachlässigbar. Bei Aufträgen mit Aufgaben mit langer Ausführungsdauer, die eine große Anzahl von Dateien schreiben, macht sich der Arbeitsspeicher, den der Committer benötigt, bemerkbar, und dies erfordert möglicherweise Anpassungen an den für Spark Executor zugeteilten Arbeitsspeicher. Sie können die Eigenschaft `spark.executor.memory` verwenden, um den Executor-Arbeitsspeicher anzupassen. Als Faustregel gilt: für jeweils 100.000 Dateien, die eine einzelne Aufgabe schreib, werden in der Regel zusätzlich 100 MB Arbeitsspeicher benötigt. Weiter Informationen finden Sie unter [Anwendungseigenschaften](https://spark.apache.org/docs/latest/configuration.html#application-properties) in der Dokumentation zur Konfiguration von Apache Spark.

# Aktivieren Sie den S3-optimierten Committer für Amazon EMR 5.19.0
<a name="emr-spark-committer-enable"></a>

Wenn Sie Amazon EMR 5.19.0 verwenden, können Sie die Eigenschaft `spark.sql.parquet.fs.optimized.committer.optimization-enabled` manuell auf `true` einstellen, wenn Sie einen Cluster erstellen, oder aus Spark heraus, wenn Sie Amazon EMR verwenden.

## Aktivieren des S3-optimierten EMRFS-Committers beim Erstellen eines Clusters
<a name="w2aac62c61c17c13b5"></a>

Verwenden Sie die Konfigurationsklassifizierung `spark-defaults`, um die `spark.sql.parquet.fs.optimized.committer.optimization-enabled`-Eigenschaften auf `true` festzulegen. Weitere Informationen finden Sie unter [Anwendungen konfigurieren](emr-configure-apps.md).

## Aktivieren des S3-optimierten EMRFS-Committers über Spark
<a name="w2aac62c61c17c13b7"></a>

Sie können `spark.sql.parquet.fs.optimized.committer.optimization-enabled` auf `true` festlegen, indem Sie dies in einer `SparkConf` fest kodieren, als `--conf`-Parameter in der Spark-Shell übergeben, die Tools `spark-submit` bzw. `spark-sql` verwenden oder dies in `conf/spark-defaults.conf` angeben. Weiter Informationen finden Sie unter [Spark-Konfiguration](https://spark.apache.org/docs/latest/configuration.html) in der Apache-Spark-Dokumentation.

Im folgenden Beispiel wird gezeigt, wie Sie den Committer während der Ausführung eines spark-sql-Befehls aktivieren.

```
spark-sql \
  --conf spark.sql.parquet.fs.optimized.committer.optimization-enabled=true \
  -e "INSERT OVERWRITE TABLE target_table SELECT * FROM source_table;"
```