

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Verbessern der Spark Leistung mit Amazon S3
<a name="emr-spark-s3-performance"></a>

Amazon EMR bietet Features, mit denen die Leistung optimiert werden kann, wenn Spark für Abfragen und Lese- bzw. Schreiboperationen über Daten in Amazon S3 verwendet wird.

[S3 Select](https://aws.amazon.com/blogs/aws/s3-glacier-select/) kann in einigen Anwendungen die Abfrageleistung bei CSV- und JSON-Dateien verbessern, indem die Verarbeitung an Amazon S3 ausgelagert wird.

Der EMRFS S3-optimierte Committer ist eine Alternative zur [OutputCommitter](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/OutputCommitter.html)Klasse, die die Funktion für mehrteilige Uploads von EMRFS verwendet, um die Leistung beim Schreiben von Parquet-Dateien auf Amazon S3 mithilfe von Spark, und Datasets zu verbessern. DataFrames

**Topics**
+ [

# S3 Select mit Spark zur Verbesserung der Leistung bei Abfragen verwenden
](emr-spark-s3select.md)
+ [

# EMR Spark MagicCommitProtocol
](emr-spark-magic-commit-protocol.md)
+ [

# EMRFS-S3-optimierte Committer verwenden
](emr-spark-s3-optimized-committer.md)
+ [

# Verwenden Sie das EMRFS-S3-optimierte Commit-Protokoll
](emr-spark-s3-optimized-commit-protocol.md)
+ [

# Amazon-S3-Anforderungen mit EMRFS wiederholen
](emr-spark-emrfs-retry.md)

# S3 Select mit Spark zur Verbesserung der Leistung bei Abfragen verwenden
<a name="emr-spark-s3select"></a>

**Wichtig**  
Amazon S3 Select ist für Neukunden nicht mehr verfügbar. Bestandskunden von Amazon S3 Select können das Feature weiterhin wie gewohnt nutzen. [Weitere Informationen](https://aws.amazon.com/blogs/storage/how-to-optimize-querying-your-data-in-amazon-s3/) 

Mit Amazon-EMR-Version 5.17.0 und höher können Sie [S3 Select](https://aws.amazon.com/blogs/aws/s3-glacier-select/) mit Spark auf Amazon EMR verwenden. *S3 Select* ermöglicht es Anwendungen, nur eine Teilmenge von Daten aus einem Objekt abzurufen. Bei Amazon EMR wird die numerische Arbeit zur Filterung großer Datensätze von dem Cluster an Amazon S3 ausgelagert. Dies kann die Leistung in einigen Anwendungen verbessern und reduziert die Menge der zwischen Amazon EMR und Amazon S3 übertragenen Daten.

S3 Select unterstützt CSV und JSON-Dateien, das Datenformat wird unter Verwendung von `s3selectCSV` und `s3selectJSON` angegeben. Weitere Informationen und Beispiele finden Sie unter [S3 Select in Ihrem Code angeben](#emr-spark-s3select-specify).

## Ist S3 Select das Richtige für meine Anwendung?
<a name="emr-spark-s3select-apps"></a>

Wir empfehlen, dass Sie Benchmark-Tests für Ihre Anwendungen im Vergleich mit und ohne S3 Select durchführen, um zu sehen, ob es für Ihre Anwendung geeignet sein könnte.

Verwenden Sie die folgenden Richtlinien, um zu bestimmen, ob Ihre Anwendung ein Kandidat für die Verwendung von S3 ist:
+ Ihre Abfrage filtert mehr als die Hälfte des ursprünglichen Datensatzes.
+ Ihre Netzwerkverbindung zwischen Amazon S3 und dem Amazon-EMR-Cluster hat eine gute Übertragungsgeschwindigkeit und verfügbare Bandbreite. Amazon S3 komprimiert keine HTTP-Antworten, sodass die Antwortgröße bei komprimierten Eingabedateien wahrscheinlich zunimmt.

## Überlegungen und Einschränkungen
<a name="emr-spark-s3select-considerations"></a>
+ Die Amazon-S3-serverseitige Verschlüsselung mit vom Kunden bereitgestellten Verschlüsselungsschlüsseln (SSE-C) und die clientseitige Verschlüsselung werden nicht unterstützt. 
+ Die Eigenschaft `AllowQuotedRecordDelimiters` wird nicht unterstützt. Wenn diese Eigenschaft angegeben ist, schlägt die Abfrage fehl.
+ Nur CSV- und JSON-Dateien im UTF-8-Format werden unterstützt. Mehrzeilige Zeilen werden nicht unterstützt. CSVs
+ Es werden nur unkomprimierte oder gzip-Dateien unterstützt.
+ Die Optionen für die Datenformate CSV und JSON in Spark (z. B. `nanValue`, `positiveInf`, `negativeInf`) sowie Optionen zum Umgang mit beschädigten Datensätzen (z. B. der failfast-Modus zur frühen Terminierung und der dropmalformed-Modus zum Überspringen von Datensätzen im falschen Format) werden nicht unterstützt.
+ Die Verwendung von Kommas als Tausender-Trennzeichen wird bei Dezimalzahlen nicht unterstützt. Beispielweise ist `10,000` ungültig, `10000` wird unterstützt.
+ Kommentarzeichen auf der letzten Zeile werden nicht unterstützt.
+ Leere Zeilen am Ende einer Datei werden nicht verarbeitet.
+ Die folgenden Filter werden nicht an Amazon S3 ausgelagert:
  + Aggregatfunktionen wie `COUNT()` und `SUM()`.
  + Filter, die ein Attribut über einen `CAST()`-Aufruf konvertieren. Beispiel, `CAST(stringColumn as INT) = 1`.
  + Filter mit einem Attribut, das ein Objekt ist oder einem zusammengesetzten Attribut. Beispiel, `intArray[1] = 1, objectColumn.objectNumber = 1`.
  + Filter, deren Wert ist nicht literal ist. Beispiel: `intColumn1 = intColumn2`
  + Es werden nur die [von S3 Select unterstützten Datentypen](https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-glacier-select-sql-reference-data-types.html) unterstützt, und es gelten die dokumentierten Einschränkungen.

## S3 Select in Ihrem Code angeben
<a name="emr-spark-s3select-specify"></a>

Die folgenden Beispiele zeigen, wie S3 Select für CSV mit Scala, SQL, R und PySpark angegeben wird. Auf die gleiche Weise können Sie mit S3 Select JSON als Datenformat festlegen. Eine Liste der Optionen, der Standardwerte und der Einschränkungen finden Sie unter [Optionen](#emr-spark-s3select-specify-options).

------
#### [ PySpark ]

```
spark
  .read
  .format("s3selectCSV") // "s3selectJson" for Json
  .schema(...) // optional, but recommended
  .options(...) // optional
  .load("s3://path/to/my/datafiles")
```

------
#### [ R ]

```
read.df("s3://path/to/my/datafiles", "s3selectCSV", schema, header = "true", delimiter = "\t")
```

------
#### [ Scala ]

```
spark
  .read
  .format("s3selectCSV") // "s3selectJson" for Json
  .schema(...) // optional, but recommended
  .options(...) // optional. Examples:  
  // .options(Map("quote" -> "\'", "header" -> "true")) or
  // .option("quote", "\'").option("header", "true")
  .load("s3://path/to/my/datafiles")
```

------
#### [ SQL ]

```
CREATE TEMPORARY VIEW MyView (number INT, name STRING) USING s3selectCSV OPTIONS (path "s3://path/to/my/datafiles", header "true", delimiter "\t")
```

------

### Optionen
<a name="emr-spark-s3select-specify-options"></a>

Die folgenden Optionen sind verfügbar, wenn `s3selectCSV` und `s3selectJSON` verwendet wird. Wenn nichts angegeben wird, werden die Standardwerte verwendet.

#### Optionen für S3selectCSV
<a name="emr-spark-s3select-specify-options-csv"></a>


| Option | Standard | Usage | 
| --- | --- | --- | 
|  `compression`  |  `"none"`  |  Gibt an, ob Komprimierung verwendet wird. `"gzip"` wird neben `"none"` als einzige Einstellung unterstützt.  | 
|  `delimiter`  |  ","  |  Gibt den Feldbegrenzer an.  | 
|  `quote`  |  `'\"'`  |  Gibt das Zeichen an, das als Anführungszeichen verwendet wird. Die Angabe einer leeren Zeichenfolge wird nicht unterstützt und führt zu einem XML-Fehler aufgrund fehlerhaftem Format.  | 
|  `escape`  |  `'\\'`  |  Gibt das Zeichen an, das als Escape-Zeichen verwendet wird.  | 
|  `header`  |  `"false"`  |  `"false"` gibt an, dass es keine Kopfzeile gibt. `"true"` gibt an, dass die erste Zeile die Kopfzeile ist. Es werden ausschließlich Kopfzeilen in der ersten Zeile unterstützt, und leere Zeilen vor der Kopfzeile werden nicht unterstützt.  | 
|  Kommentar  |  `"#"`  |  Gibt das Zeichen an, das als Kommentarzeichen verwendet wird. Der Kommentarindikator kann nicht deaktiviert werden. Dies bedeutet, dass der Wert `\u0000` nicht unterstützt wird.  | 
|  `nullValue`  |  ""  |    | 

#### Optionen für S3selectJSON
<a name="emr-spark-s3select-specify-options-json"></a>


| Option | Standard | Usage | 
| --- | --- | --- | 
|  `compression`  |  `"none"`  |  Gibt an, ob Komprimierung verwendet wird. `"gzip"` wird neben `"none"` als einzige Einstellung unterstützt.  | 
|  `multiline`  |  „false“  |  `"false"` gibt an, dass die JSON-Daten in S3 Select im `LINES`-Format vorliegt. Dies bedeutet, dass jede Zeile in den Eingabedaten genau ein JSON-Objekt enthält. `"true"` gibt an, dass die JSON-Daten in S3 Select im `DOCUMENT`-Format vorliegen. Dies bedeutet, dass sich ein JSON-Objekt in den Eingabedaten über mehrere Zeilen erstrecken kann.  | 

# EMR Spark MagicCommitProtocol
<a name="emr-spark-magic-commit-protocol"></a>

Ab EMR 6.15.0 MagicCommitProtocol wird es zur Standardeinstellung FileCommitProtocol für Spark, wenn das S3A-Dateisystem verwendet wird.

## MagicCommitProtocol
<a name="magic-commit-protocol"></a>

Dies MagicCommitProtocol ist eine alternative Implementierung von, die für [FileCommitProtocol](https://dlcdn.apache.org/spark/docs/2.4.2/api/java/org/apache/spark/internal/io/FileCommitProtocol.html)das Schreiben von Dateien mit EMR Spark auf Amazon S3 optimiert ist, wenn das S3A-Dateisystem verwendet wird. Dieses Protokoll zielt darauf ab, die Anwendungsleistung zu verbessern, indem die Verwendung von Umbenennungsvorgängen in Amazon S3 während der Job- und Task-Commit-Phasen vermieden wird.

Dies MagicCommitProtocol ist die FileCommitProtocol Standardimplementierung, die von Spark auf Amazon Elastic Map Reduce (EMR) verwendet wird, wenn das S3A-Dateisystem verwendet wird. Der verwendet MagicCommitProtocol intern den [MagicV2Committer, um die Dateischreibvorgänge](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/s3a-magicv2-committer.html) auf Amazon S3 durchzuführen.

Bei statischen Einfügevorgängen MagicCommitProtocol schreibt der die Dateien während der Commit-Phase der Aufgabe in den Ausgabespeicherort des Jobs. Im Gegensatz dazu erscheinen bei dynamischen Einfüge-Überschreibvorgängen die Dateien, die bei Versuchen mit der Aufgabe geschrieben wurden, erst nach dem Commit des Jobs am Ausgabespeicherort des Jobs. Dies wird erreicht, indem die Commit-Metadaten beim Commit-Aufruf der Aufgabe zurück in den Spark-Treiber exportiert werden.

## Aktivieren MagicCommitProtocol
<a name="enabling-magic-commit-protocol"></a>

Das MagicCommitProtocol ist standardmäßig aktiviert, wenn Spark auf Amazon Elastic Map Reduce (EMR) läuft, wenn das S3A-Dateisystem verwendet wird.

Um das S3A-Dateisystem zu verwenden, können Sie entweder:

1. Verwenden Sie das Dateischema wie `s3a://` bei der Definition der Tabelle, Partition oder des Verzeichnisses.

1. Stellen Sie die Konfiguration `fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem` in core-site.xml ein.

## Deaktivierung der MagicCommitProtocol
<a name="disabling-magic-commit-protocol"></a>

1. Sie können `spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol.leverageMagicCommitProtocol` den Wert auf false setzen`SparkConf`, indem Sie ihn in a fest codieren `spark-submit` und ihn als `--conf` Parameter in der Spark-Shell oder in `spark-sql` Tools oder in übergeben. `conf/spark-defaults.conf` Weitere Informationen finden Sie unter [Spark-Konfiguration](https://spark.apache.org/docs/latest/configuration.html) in der Apache Spark-Dokumentation.

   Das folgende Beispiel zeigt, wie ein `spark-sql` Befehl deaktiviert wird MagicCommitProtocol , während er ausgeführt wird.

   ```
   spark-sql \
     --conf spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol.leverageMagicCommitProtocol=false \
   -e "INSERT OVERWRITE TABLE target_table SELECT * FROM source_table;"
   ```

1. Verwenden Sie die `spark-defaults` Konfigurationsklassifizierung, um die `spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol.leverageMagicCommitProtocol` Eigenschaft auf False zu setzen. Weitere Informationen finden Sie unter [Konfigurieren von Anwendungen](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps.html).

## MagicCommitProtocol Überlegungen
<a name="magic-commit-considerations"></a>
+ Beim Einfügen statischer Partitionen MagicCommitProtocol beansprucht der auf Spark-Executoren eine geringe Menge an Speicher für jede Datei, die durch einen Task-Versuch geschrieben wurde, bis die Aufgabe festgeschrieben oder abgebrochen wird. Bei den meisten Aufträgen ist die Menge des belegten Speichers vernachlässigbar. Für den Spark-Treiber ist kein zusätzlicher Speicherbedarf erforderlich
+ Beim dynamischen Einfügen von Partitionen MagicCommitProtocol benötigt der auf Spark-Treibern Speicherplatz, um die Metadateninformationen jeder übergebenen Datei zu speichern, bis der Job festgeschrieben oder abgebrochen wird. Bei den meisten Aufträgen ist die standardmäßige Speichereinstellung des Spark-Treibers vernachlässigbar.

  Bei Aufträgen mit Aufgaben mit langer Laufzeit, die eine große Anzahl von Dateien schreiben, kann der Speicherverbrauch des Commit-Protokolls spürbar sein und Anpassungen des für Spark zugewiesenen Speichers erfordern, insbesondere für Spark-Ausführer. Sie können den Speicher mithilfe der `spark.driver.memory`-Eigenschaft für Spark-Treiber und der Eigenschaft für `spark.executor.memory`-Spark-Aufträge optimieren. Als Richtlinie gilt, dass für eine einzelne Aufgabe, die 100.000 Dateien schreibt, in der Regel zusätzliche 200 MB Arbeitsspeicher benötigt werden. Weiter Informationen finden Sie unter [Anwendungseigenschaften](https://spark.apache.org/docs/latest/configuration.html#application-properties) in der Dokumentation zur Konfiguration von Apache Spark.

# EMRFS-S3-optimierte Committer verwenden
<a name="emr-spark-s3-optimized-committer"></a>

Der EMRFS S3-optimierte Committer ist eine alternative [OutputCommitter](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/OutputCommitter.html)Implementierung, die für das Schreiben von Dateien in Amazon S3 bei Verwendung von EMRFS optimiert ist. Der für EMRFS S3 optimierte Comitter verbessert die Anwendungsleistung durch Umgehung der Operationen zum Auflisten und Umbenennen, die während der Commit-Phasen von Aufträgen und Aufgaben in Amazon S3 durchgeführt werden. Die Committer-Klasse ist verfügbar für Amazon-EMR-Version 5.19.0 und höher und ist bei Amazon EMR 5.20.0 und höher standardmäßig aktiviert. Der Committer wird für Spark-Jobs verwendet, die Spark oder Datasets verwenden. DataFrames Ab Amazon EMR 6.4.0 kann dieser Committer für alle gängigen Formate verwendet werden, einschließlich Parquet, ORC und textbasierte Formate (einschließlich CSV und JSON). Für Versionen vor Amazon EMR 6.4.0 wird nur das Parquet-Format unterstützt. Es gibt Situationen, in denen der Committer nicht verwendet wird. Weitere Informationen finden Sie unter [Anforderungen für den S3-optimierten EMRFS-Committer](emr-spark-committer-reqs.md).

**Topics**
+ [

# Anforderungen für den S3-optimierten EMRFS-Committer
](emr-spark-committer-reqs.md)
+ [

# Der S3-optimierte EMRFS-Committer und mehrteilige Uploads
](emr-spark-committer-multipart.md)
+ [

# Überlegungen zur Auftragsoptimierung
](emr-spark-committer-tuning.md)
+ [

# Aktivieren Sie den S3-optimierten Committer für Amazon EMR 5.19.0
](emr-spark-committer-enable.md)

# Anforderungen für den S3-optimierten EMRFS-Committer
<a name="emr-spark-committer-reqs"></a>

Der S3-optimierte EMRFS-Committer wird verwendet, wenn die folgenden Bedingungen erfüllt sind:
+ Sie führen Spark-Jobs aus, die Spark oder Datasets verwenden DataFrames, um Dateien in Amazon S3 zu schreiben. Ab Amazon EMR 6.4.0 kann dieser Committer für alle gängigen Formate verwendet werden, einschließlich Parquet, ORC und textbasierte Formate (einschließlich CSV und JSON). Für Versionen vor Amazon EMR 6.4.0 wird nur das Parquet-Format unterstützt.
+ In Amazon EMR sind mehrteilige Uploads aktiviert. Das ist die Standardeinstellung. Weitere Informationen finden Sie unter [Der S3-optimierte EMRFS-Committer und mehrteilige Uploads](emr-spark-committer-multipart.md). 
+ Die integrierte Dateiformatunterstützung von Spark wird verwendet. Die integrierte Dateiformatunterstützung wird unter folgenden Umständen verwendet:
  + Bei Hive-Metastore-Tabellen, wenn `spark.sql.hive.convertMetastoreParquet` für Parquet-Tabellen auf `true` oder `spark.sql.hive.convertMetastoreOrc` für Orc-Tabellen mit Amazon EMR 6.4.0 oder höher auf `true` eingestellt. Dies sind die Standardeinstellungen.
  + Wenn Aufträge in Datenquellen oder Tabellen im Dateiformat schreiben, wird beispielsweise die Zieltabelle mit der `USING parquet` Klausel erstellt. 
  + Wenn Aufträge in nicht partitionierte Hive-Metastore-Parquet-Tabellen schreiben. Eine bekannte Einschränkung besteht darin, dass die in Spark integrierte Parquet-Unterstützung keine partitionierten Hive-Tabellen unterstützt. Weitere Informationen finden Sie unter [Hive Metastore Parquet-Tabellenkonvertierung](https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#hive-metastore-parquet-table-conversion) im Apache Spark DataFrames and Datasets Guide.
+ Spark-Job-Operationen, die beispielsweise `${table_location}/k1=v1/k2=v2/` in einen Standardspeicherort für Partitionen schreiben, verwenden den Committer. Der Committer wird nicht verwendet, wenn ein Job-Vorgang in einen benutzerdefinierten Partitionsspeicherort schreibt, z. B. wenn mit dem `ALTER TABLE SQL`-Befehl ein benutzerdefinierter Partitionsspeicherort festgelegt wurde.
+ Für Spark müssen die folgenden Werte verwendet werden:
  + Der Eigenschaft `spark.sql.parquet.fs.optimized.committer.optimization-enabled` muss auf `true` eingestellt sein. Dies ist die Standardeinstellung bei Amazon EMR 5.20.0 und höher. Bei Amazon EMR 5.19.0 lautet der Standardwert `false`. Weitere Informationen zum Konfigurieren dieses Werts finden Sie unter [Aktivieren Sie den S3-optimierten Committer für Amazon EMR 5.19.0](emr-spark-committer-enable.md).
  + Beim Schreiben in nicht partitionierte Hive-Metastore-Tabellen werden nur die Dateiformate Parquet und Orc unterstützt. `spark.sql.hive.convertMetastoreParquet`muss auf gesetzt sein, `true` wenn in nicht partitionierte Parquet Hive-Metastore-Tabellen geschrieben werden. `spark.sql.hive.convertMetastoreOrc`muss auf gesetzt sein, `true` wenn in nicht partitionierte Orc Hive Metastore-Tabellen geschrieben werden. Dies sind die Standardeinstellungen.
  + muss `spark.sql.parquet.output.committer.class` auf `com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter` festgelegt sein. Dies ist die Standardeinstellung.
  + `spark.sql.sources.commitProtocolClass` muss auf `org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol` oder `org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol` eingestellt sein. `org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol` ist die Standardeinstellung für die Amazon-EMR-5.x-Serie, Version 5.30.0 und höher, und für die Amazon-EMR-6.x-Serie, Version 6.2.0 und höher. `org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol` ist die Standardeinstellung für frühere Amazon-EMR-Versionen.
  + Wenn Spark-Aufträge partitionierte Parquet-Datasets durch dynamische Partitionsspalten überschreiben, dann müssen die Schreiboption `partitionOverwriteMode` und `spark.sql.sources.partitionOverwriteMode` auf `static` eingestellt sein. Dies ist die Standardeinstellung.
**Anmerkung**  
Die Schreiboption `partitionOverwriteMode` wurde mit Spark 2.4.0 eingeführt. Legen Sie für Spark-Version 2.3.2, die in Amazon-EMR-Version 5.19.0 enthalten ist, die Eigenschaft `spark.sql.sources.partitionOverwriteMode` fest. 

## Fälle, in denen der S3-optimierte EMRFS-Committer nicht verwendet wird
<a name="emr-spark-committer-reqs-anti"></a>

Im Allgemeinen wird der S3-optimierte Committer in den folgenden Situationen nicht verwendet.


****  

| Situation | Warum der Committer nicht verwendet wird | 
| --- | --- | 
| Wenn Sie in HDFS schreiben | Der Committer unterstützt nur das Schreiben in Amazon S3 mit EMRFS. | 
| Bei Verwendung des S3A-Dateisystems | Der Committer unterstützt nur EMRFS. | 
| Wenn Sie die RDD-API von Spark verwenden MapReduce  | Der Committer unterstützt nur die Verwendung von SparkSQL oder Dataset DataFrame. APIs | 

Die folgenden Scala-Beispiele veranschaulichen einige zusätzliche Situationen, die verhindern, dass der EMRFS-S3-optimierte Committer vollständig (erstes Beispiel) oder teilweise (zweites Beispiel) verwendet werden kann.

**Example – Dynamischer Partitionsüberschreibmodus**  
Das folgende Scala-Beispiel weist Spark an, einen anderen Commit-Algorithmus zu verwenden, wodurch die Verwendung des für EMRFS S3 optimierten Committers insgesamt verhindert wird. Der Code legt die Eigenschaft `partitionOverwriteMode` auf `dynamic` fest, sodass nur die Partitionen überschrieben werden, auf die Sie Daten schreiben. Anschließend werden dynamische Partitionsspalten durch `partitionBy` angegeben und der Schreibmodus ist auf `overwrite` eingestellt.   

```
val dataset = spark.range(0, 10)
  .withColumn("dt", expr("date_sub(current_date(), id)"))

dataset.write.mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .partitionBy("dt")
  .parquet("s3://amzn-s3-demo-bucket1/output")
```
Sie müssen alle drei Einstellungen konfigurieren, um die Verwendung des S3-optimierten Committers zu vermeiden. Wenn Sie dies tun, führt Spark einen anderen Commit-Algorithmus aus, der im Commit-Protokoll von Spark angegeben ist. Für Amazon-EMR-5.x-Versionen vor 5.30.0 und für Amazon-EMR-6.x-Versionen vor 6.2.0 verwendet das Commit-Protokoll das Staging-Verzeichnis von Spark, ein temporäres Verzeichnis, das unter dem Ausgabespeicherort erstellt wird, der mit `.spark-staging` beginnt. Der Algorithmus benennt Partitionsverzeichnisse nacheinander um, was sich negativ auf die Leistung auswirken kann. Weitere Informationen zu den Amazon-EMR-Versionen 5.30.0 und höher sowie 6.2.0 und höher finden Sie unter [Verwenden Sie das EMRFS-S3-optimierte Commit-Protokoll](emr-spark-s3-optimized-commit-protocol.md).   
Der Algorithmus in Spark 2.4.0 folgt diesen Schritten:  

1. Aufgabenversuche schreiben ihre Ausgabe in Partitionsverzeichnisse unterhalb des Staging-Verzeichnisses von Spark, beispielsweise `${outputLocation}/spark-staging-${jobID}/k1=v1/k2=v2/`.

1. Für jede geschriebene Partition verfolgt der Aufgabenversuch die relativen Partitionspfade, z. B. `k1=v1/k2=v2`.

1. Wenn eine Aufgabe erfolgreich abgeschlossen wurde, stellt sie dem Treiber alle zugehörigen Partitionspfade bereit, die von ihr nachverfolgt wurden.

1. Nachdem alle Aufgaben abgeschlossen wurden, sammelt die Auftrags-Commit-Phase alle die Partitionsverzeichnisse, die von erfolgreichen Aufgabenversuchen unter dem Staging-Verzeichnis von Spark geschrieben wurden. Spark benennt jedes dieser Verzeichnisse mithilfe von Verzeichnisstruktur-Umbenennungsoperationen sequenziell bis zu ihrem endgültigen Ausgabespeicherort um.

1. Das Staging-Verzeichnis wird gelöscht, bevor der Auftrags-Commit-Phase abgeschlossen ist.

**Example – Benutzerdefinierter Partitionsspeicherort**  
In diesem Beispiel wird der Scala-Code in zwei Partitionen eingefügt. Eine Partition verfügt über einen benutzerdefinierten Partitionsspeicherort. Die andere Partition verwendet den Standard-Partitionsspeicherort. Der S3-optimierte EMRFS-Committer wird nur zum Schreiben der Aufgabenausgabe in die Partition genutzt, die den Standard-Partitionsspeicherort verwendet.  

```
val table = "dataset"
val location = "s3://bucket/table"
                            
spark.sql(s"""
  CREATE TABLE $table (id bigint, dt date) 
  USING PARQUET PARTITIONED BY (dt) 
  LOCATION '$location'
""")
                            
// Add a partition using a custom location
val customPartitionLocation = "s3://bucket/custom"
spark.sql(s"""
  ALTER TABLE $table ADD PARTITION (dt='2019-01-28') 
  LOCATION '$customPartitionLocation'
""")
                            
// Add another partition using default location
spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')")
                            
def asDate(text: String) = lit(text).cast("date")
                            
spark.range(0, 10)
  .withColumn("dt",
    when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29")))
  .write.insertInto(table)
```
Der Scala-Code erstellt die folgenden Amazon-S3-Objekte:  

```
custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
custom_$folder$
table/_SUCCESS
table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
table/dt=2019-01-29_$folder$
table_$folder$
```
Beim Schreiben in Partitionen an benutzerdefinierten Speicherorten verwendet Spark einen Commit-Algorithmus, ähnlich wie im vorherigen Beispiel. Dies wird im Folgenden beschrieben. Genau wie bei dem vorherigen Beispiel führt der Algorithmus zu sequenziellen Umbenennungen, wodurch die Leistung beeinträchtigt werden kann.  

1. Beim Schreiben von Ausgabe in eine Partition an einem benutzerdefinierten Speicherort werden Aufgaben in eine Datei unter dem Staging-Verzeichnis von Spark geschrieben, das unter dem endgültigen Ausgabespeicherort erstellt wird. Der Name der Datei enthält eine zufällige UUID zum Schutz vor Datei-Kollisionen. Der Aufgabe-Versuch verfolgt jede Datei zusammen mit dem gewünschten endgültigen Ausgabepfad nach.

1. Wenn eine Aufgabe erfolgreich abgeschlossen wird, stellt sie dem Treiber die Dateien und die für sie gewünschten endgültigen Ausgabepfade bereit.

1. Nachdem alle Aufgaben beendet wurden, benennt die Auftrags-Commit-Phase alle Dateien, die für Partitionen an benutzerdefinierten Speicherorten geschrieben wurden, sequentiell in ihre endgültigen Ausgabepfade um.

1. Das Staging-Verzeichnis wird gelöscht, bevor der Auftrags-Commit-Phase abgeschlossen ist.

# Der S3-optimierte EMRFS-Committer und mehrteilige Uploads
<a name="emr-spark-committer-multipart"></a>

Um den S3-optimierten EMRFS-Committer zu verwenden, müssen Sie mehrteilige Uploads in Amazon EMR aktivieren. Mehrteilige Uploads sind standardmäßig aktiviert. Sie können diese Option bei Bedarf erneut aktivieren. Weitere Informationen finden Sie unter [Konfigurieren von mehrteiligen Uploads für Amazon S3](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-upload-s3.html#Config_Multipart) im *Verwaltungshandbuch für Amazon EMR*. 

Der S3-optimierte EMRFS-Committer verwendet die transaktionsähnlichen Merkmale von mehrteiligen Uploads, um sicherzustellen, dass Dateien, die beim Versuch, Aufgaben auszuführen geschrieben werden, nur am Ausgabespeicherort des Auftrags angezeigt werden. Durch die Verwendung von mehrteiligen Uploads auf diese Weise verbessert der Committer die Leistung des Task-Commits im Vergleich zur Standardalgorithmusversion 2. FileOutputCommitter Wenn Sie den S3-optimierten EMRFS-Committer verwenden, gilt es einige wichtige Unterschiede zu dem herkömmlichen Verhalten bei mehrteiligen Uploads zu berücksichtigen:
+ Mehrteilige Uploads werden immer ausgeführt, unabhängig von der Dateigröße. Dies unterscheidet sich vom Standardverhalten von EMRFS, bei dem die Eigenschaft `fs.s3n.multipart.uploads.split.size` die Dateigröße steuert, in der mehrteilige Uploads ausgelöst werden.
+ Mehrteilige Uploads verbleiben für einen längeren Zeitraum in einem Status, in dem sie nicht abgeschlossen sind, bis die Aufgabe übertragen oder abgebrochen wird. Dies unterscheidet sich von der Standard-Verhalten von EMRFS. Dort wird ein mehrteiliger Upload abgeschlossen, wenn eine Aufgabe den Schreibvorgang für eine bestimmte Datei beendet hat.

Aufgrund dieser Unterschiede vergrößert sich bei mehrteiligen Uploads die Wahrscheinlichkeit, dass unvollständige mehrteilige Uploads zurückbleiben, wenn ein Spark Executor JVM abstürzt oder zerstört wird, während Aufgaben ausgeführt oder Daten auf Amazon S3 geschrieben werden. Aus diesem Grund sollten Sie bei Verwendung des S3-optimierten EMRFS-Committer darauf achten, den bewährten Methoden für die Verwaltung von fehlgeschlagenen mehrteiligen Uploads zu folgen. Weitere Informationen finden Sie unter [Bewährte Methoden](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-upload-s3.html#emr-bucket-bestpractices) für die Arbeit mit Amazon-S3-Buckets im *Verwaltungshandbuch für Amazon EMR*.

# Überlegungen zur Auftragsoptimierung
<a name="emr-spark-committer-tuning"></a>

Der S3-optimierte EMRFS-Committer verbraucht eine geringe Speichermenge für jede Datei, die versuchsweise von einer Aufgabe geschrieben wird, bis die Aufgabe übermittelt oder abgebrochen wird. Bei den meisten Aufträgen ist die Menge des belegten Speichers vernachlässigbar. Bei Aufträgen mit Aufgaben mit langer Ausführungsdauer, die eine große Anzahl von Dateien schreiben, macht sich der Arbeitsspeicher, den der Committer benötigt, bemerkbar, und dies erfordert möglicherweise Anpassungen an den für Spark Executor zugeteilten Arbeitsspeicher. Sie können die Eigenschaft `spark.executor.memory` verwenden, um den Executor-Arbeitsspeicher anzupassen. Als Faustregel gilt: für jeweils 100.000 Dateien, die eine einzelne Aufgabe schreib, werden in der Regel zusätzlich 100 MB Arbeitsspeicher benötigt. Weiter Informationen finden Sie unter [Anwendungseigenschaften](https://spark.apache.org/docs/latest/configuration.html#application-properties) in der Dokumentation zur Konfiguration von Apache Spark.

# Aktivieren Sie den S3-optimierten Committer für Amazon EMR 5.19.0
<a name="emr-spark-committer-enable"></a>

Wenn Sie Amazon EMR 5.19.0 verwenden, können Sie die Eigenschaft `spark.sql.parquet.fs.optimized.committer.optimization-enabled` manuell auf `true` einstellen, wenn Sie einen Cluster erstellen, oder aus Spark heraus, wenn Sie Amazon EMR verwenden.

## Aktivieren des S3-optimierten EMRFS-Committers beim Erstellen eines Clusters
<a name="w2aac62c61c17c13b5"></a>

Verwenden Sie die Konfigurationsklassifizierung `spark-defaults`, um die `spark.sql.parquet.fs.optimized.committer.optimization-enabled`-Eigenschaften auf `true` festzulegen. Weitere Informationen finden Sie unter [Anwendungen konfigurieren](emr-configure-apps.md).

## Aktivieren des S3-optimierten EMRFS-Committers über Spark
<a name="w2aac62c61c17c13b7"></a>

Sie können `spark.sql.parquet.fs.optimized.committer.optimization-enabled` auf `true` festlegen, indem Sie dies in einer `SparkConf` fest kodieren, als `--conf`-Parameter in der Spark-Shell übergeben, die Tools `spark-submit` bzw. `spark-sql` verwenden oder dies in `conf/spark-defaults.conf` angeben. Weiter Informationen finden Sie unter [Spark-Konfiguration](https://spark.apache.org/docs/latest/configuration.html) in der Apache-Spark-Dokumentation.

Im folgenden Beispiel wird gezeigt, wie Sie den Committer während der Ausführung eines spark-sql-Befehls aktivieren.

```
spark-sql \
  --conf spark.sql.parquet.fs.optimized.committer.optimization-enabled=true \
  -e "INSERT OVERWRITE TABLE target_table SELECT * FROM source_table;"
```

# Verwenden Sie das EMRFS-S3-optimierte Commit-Protokoll
<a name="emr-spark-s3-optimized-commit-protocol"></a>

Das EMRFS S3-optimierte Commit-Protokoll ist eine alternative [FileCommitProtocol](https://spark.apache.org/docs/2.2.0//api/java/org/apache/spark/internal/io/FileCommitProtocol.html)Implementierung, die für das Schreiben von Dateien mit dynamischem Spark-Partitionsüberschreiben in Amazon S3 optimiert ist, wenn EMRFS verwendet wird. Das Protokoll verbessert die Anwendungsleistung, indem Umbenennungsvorgänge in Amazon S3 während der Commit-Phase für das dynamische Überschreiben von Spark-Partitionen vermieden werden. 

Beachten Sie, dass der für [EMRFS S3 optimierte Committer auch die Leistung verbessert, da Umbenennungsvorgänge vermieden](emr-spark-s3-optimized-committer.html) werden. Es funktioniert jedoch nicht für Fälle dynamischer Partitionsüberschreibungen, während die Verbesserungen des Commit-Protokolls nur auf Fälle dynamischer Partitionsüberschreibungen abzielen.

Die Committer-Klasse ist verfügbar für Amazon-EMR-Version 5.30.0 und höher und ist bei 6.2.0 und höher standardmäßig aktiviert. Amazon EMR hat ab Version 5.31.0 eine Verbesserung der Parallelität hinzugefügt. Das Protokoll wird für Spark-Jobs verwendet, die Spark oder Datasets verwenden. DataFrames Es gibt Situationen, in denen das Commit Protokoll nicht verwendet wird. Weitere Informationen finden Sie unter [Anforderungen für das EMRFS-S3-optimierte Commit-Protokoll](emr-spark-committer-reqs.md).

**Topics**
+ [

# Anforderungen für das EMRFS-S3-optimierte Commit-Protokoll
](emr-spark-commit-protocol-reqs.md)
+ [

# Das S3-optimierte EMRFS-Commit-Protokoll und mehrteilige Uploads
](emr-spark-commit-protocol-multipart.md)
+ [

# Überlegungen zur Auftragsoptimierung
](emr-spark-commit-protocol-tuning.md)

# Anforderungen für das EMRFS-S3-optimierte Commit-Protokoll
<a name="emr-spark-commit-protocol-reqs"></a>

Das EMRFS-S3-optimierte Commit-Protokoll wird verwendet, wenn die folgenden Bedingungen erfüllt sind:
+ Sie führen Spark-Jobs aus, die Spark oder Datasets verwenden DataFrames, um partitionierte Tabellen zu überschreiben.
+ Sie führen Spark-Jobs aus, deren Partitionsüberschreibmodus `dynamic` ist.
+ In Amazon EMR sind mehrteilige Uploads aktiviert. Das ist die Standardeinstellung. Weitere Informationen finden Sie unter [Das S3-optimierte EMRFS-Commit-Protokoll und mehrteilige Uploads](emr-spark-commit-protocol-multipart.md). 
+ Der Dateisystem-Cache für EMRFS ist aktiviert. Das ist die Standardeinstellung. Vergewissern Sie sich, dass die Einstellung `fs.s3.impl.disable.cache` auf `false` gesetzt ist. 
+ Die integrierte Datenquellenunterstützung von Spark wird verwendet. Die integrierte Datenquellenunterstützung wird in den folgenden Fällen verwendet:
  + Wenn Aufträge in integrierte Datenquellen oder Tabellen schreiben.
  + Wenn Aufträge in die Parquet-Tabelle des Hive-Metastores schreiben. Das passiert, wenn `spark.sql.hive.convertInsertingPartitionedTable` und `spark.sql.hive.convertMetastoreParquet` beide auf „wahr“ gesetzt sind. Dies sind die Standardeinstellungen.
  + Wenn Aufträge in die ORC-Tabelle des Hive-Metastores schreiben. Das passiert, wenn `spark.sql.hive.convertInsertingPartitionedTable` und `spark.sql.hive.convertMetastoreOrc` beide auf `true` gesetzt sind. Dies sind die Standardeinstellungen.
+ Spark-Auftrags-Vorgänge, die z. B. in einen Standard-Partitionsspeicherort schreiben, z. B. `${table_location}/k1=v1/k2=v2/`, verwenden das Commit-Protokoll. Das Protokoll wird nicht verwendet, wenn ein Jobvorgang in einen benutzerdefinierten Partitionsspeicherort schreibt, beispielsweise wenn ein benutzerdefinierter Partitionsspeicherort mit dem Befehl `ALTER TABLE SQL` festgelegt wird.
+ Für Spark müssen die folgenden Werte verwendet werden:
  + muss `spark.sql.sources.commitProtocolClass` auf `org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol` festgelegt sein. Dies ist die Standardeinstellung für Amazon-EMR-Versionen 5.30.0 und höher sowie 6.2.0 und höher. 
  + Die Schreiboption `partitionOverwriteMode` oder `spark.sql.sources.partitionOverwriteMode` muss auf `dynamic` gesetzt sein. Die Standardeinstellung lautet `static`.
**Anmerkung**  
Die Schreiboption `partitionOverwriteMode` wurde mit Spark 2.4.0 eingeführt. Legen Sie für Spark-Version 2.3.2, die in Amazon-EMR-Version 5.19.0 enthalten ist, die Eigenschaft `spark.sql.sources.partitionOverwriteMode` fest. 
  + Wenn Spark-Aufträge in die Hive-Metastore-Parquet-Tabelle überschreiben muss `spark.sql.hive.convertMetastoreParquet`, `spark.sql.hive.convertInsertingPartitionedTable` und `spark.sql.hive.convertMetastore.partitionOverwriteMode` auf `true` gesetzt werden. Dies sind die Standardeinstellungen. 
  + Wenn Spark-Aufträge in die Hive-Metastore-ORC-Tabelle überschreiben muss `spark.sql.hive.convertMetastoreOrc`, `spark.sql.hive.convertInsertingPartitionedTable` und `spark.sql.hive.convertMetastore.partitionOverwriteMode` auf `true` gesetzt werden. Dies sind die Standardeinstellungen.

**Example – Dynamischer Partitionsüberschreibmodus**  
In diesem Scala-Beispiel wird die Optimierung ausgelöst. Als erstes setzen Sie die `partitionOverwriteMode`-Eigenschaft auf `dynamic`. Dadurch werden nur die Partitionen überschrieben, auf die Sie Daten schreiben. Anschließend geben Sie dynamische Partitionsspalten mit `partitionBy` an und legen den Schreibmodus auf `overwrite` fest.  

```
val dataset = spark.range(0, 10)
  .withColumn("dt", expr("date_sub(current_date(), id)"))

dataset.write.mode("overwrite")                 // "overwrite" instead of "insert"
  .option("partitionOverwriteMode", "dynamic")  // "dynamic" instead of "static"  
  .partitionBy("dt")                            // partitioned data instead of unpartitioned data
  .parquet("s3://amzn-s3-demo-bucket1/output")    // "s3://" to use Amazon EMR file system, instead of "s3a://" or "hdfs://"
```

## Wenn das EMRFS-S3-optimierte Commit-Protokoll nicht verwendet wird
<a name="emr-spark-commit-protocol-reqs-anti"></a>

Im Allgemeinen funktioniert das für EMRFS S3 optimierte Commit-Protokoll genauso wie das Open-Source-Standard-Spark-Commit-Protokoll. `org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol` In den folgenden Situationen wird keine Optimierung durchgeführt.


****  

| Situation | Warum das Commit-Protokoll nicht verwendet wird | 
| --- | --- | 
| Wenn Sie in HDFS schreiben | Das Commit-Protokoll unterstützt nur das Schreiben in Amazon S3 mit EMRFS-Technologie. | 
| Bei Verwendung des S3A-Dateisystems | Das Commit-Protokoll unterstützt nur EMRFS. | 
| Wenn Sie die RDD-API von MapReduce Spark verwenden | Das Commit-Protokoll unterstützt nur die Verwendung von SparkSQL oder DataFrame Dataset. APIs | 
| Wenn das dynamische Überschreiben der Partition nicht ausgelöst wird | Das Commit-Protokoll optimiert nur Fälle, in denen dynamische Partitionen überschrieben werden. Für andere Fälle siehe [EMRFS-S3-optimierte Committer verwenden](emr-spark-s3-optimized-committer.md). | 

Die folgenden Scala-Beispiele veranschaulichen einige zusätzliche Situationen, die das EMRFS-S3-optimierte Commit-Protokoll an `SQLHadoopMapReduceCommitProtocol` delegiert.

**Example – Dynamischer Partitionsüberschreibmodus mit benutzerdefiniertem Partitionsspeicherort**  
In diesem Beispiel überschreiben die Scala-Programme zwei Partitionen im dynamischen Partitionsüberschreibmodus. Eine Partition verfügt über einen benutzerdefinierten Partitionsspeicherort. Die andere Partition verwendet den Standard-Partitionsspeicherort. Das S3-optimierte EMRFS-Commit-Protokoll wird nur zum Verbessern der Aufgabenausgabe in die Partition genutzt, die den Standard-Partitionsspeicherort verwendet.  

```
val table = "dataset"
val inputView = "tempView"
val location = "s3://bucket/table"
                            
spark.sql(s"""
  CREATE TABLE $table (id bigint, dt date) 
  USING PARQUET PARTITIONED BY (dt) 
  LOCATION '$location'
""")

// Add a partition using a custom location
val customPartitionLocation = "s3://bucket/custom"
spark.sql(s"""
  ALTER TABLE $table ADD PARTITION (dt='2019-01-28') 
  LOCATION '$customPartitionLocation'
""")

// Add another partition using default location
spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')")

def asDate(text: String) = lit(text).cast("date")   
                       
spark.range(0, 10)
  .withColumn("dt",
    when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29")))
  .createTempView(inputView)
  
// Set partition overwrite mode to 'dynamic'
spark.sql(s"SET spark.sql.sources.partitionOverwriteMode=dynamic")
  
spark.sql(s"INSERT OVERWRITE TABLE $table SELECT * FROM $inputView")
```
Der Scala-Code erstellt die folgenden Amazon-S3-Objekte:  

```
custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
custom_$folder$
table/_SUCCESS
table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
table/dt=2019-01-29_$folder$
table_$folder$
```
Das Schreiben in benutzerdefinierte Partitionsspeicherorte in früheren Spark-Versionen kann zu Datenverlust führen. In diesem Beispiel `dt='2019-01-28'` würde die Partition verloren gehen. Weitere Informationen finden Sie unter [SPARK-35106](https://issues.apache.org/jira/browse/SPARK-35106). Dieses Problem wurde in Amazon-EMR-Version 5.33.0 und höher behoben, ausgenommen 6.0.x und 6.1.x.

Beim Schreiben in Partitionen an benutzerdefinierten Speicherorten verwendet Spark einen Commit-Algorithmus, ähnlich wie im vorherigen Beispiel. Dies wird im Folgenden beschrieben. Genau wie bei dem vorherigen Beispiel führt der Algorithmus zu sequenziellen Umbenennungen, wodurch die Leistung beeinträchtigt werden kann.

Der Algorithmus in Spark 2.4.0 folgt diesen Schritten:

1. Beim Schreiben von Ausgabe in eine Partition an einem benutzerdefinierten Speicherort werden Aufgaben in eine Datei unter dem Staging-Verzeichnis von Spark geschrieben, das unter dem endgültigen Ausgabespeicherort erstellt wird. Der Name der Datei enthält eine zufällige UUID zum Schutz vor Datei-Kollisionen. Der Aufgabe-Versuch verfolgt jede Datei zusammen mit dem gewünschten endgültigen Ausgabepfad nach.

1. Wenn eine Aufgabe erfolgreich abgeschlossen wird, stellt sie dem Treiber die Dateien und die für sie gewünschten endgültigen Ausgabepfade bereit.

1. Nachdem alle Aufgaben beendet wurden, benennt die Auftrags-Commit-Phase alle Dateien, die für Partitionen an benutzerdefinierten Speicherorten geschrieben wurden, sequentiell in ihre endgültigen Ausgabepfade um.

1. Das Staging-Verzeichnis wird gelöscht, bevor der Auftrags-Commit-Phase abgeschlossen ist.

# Das S3-optimierte EMRFS-Commit-Protokoll und mehrteilige Uploads
<a name="emr-spark-commit-protocol-multipart"></a>

Um die Optimierung für dynamisches Partitionsüberschreiben im EMRFS-S3-optimierten Commit-Protokoll nutzen zu können, müssen mehrteilige Uploads in Amazon EMR aktiviert sein. Mehrteilige Uploads sind standardmäßig aktiviert. Sie können diese Option bei Bedarf erneut aktivieren. Weitere Informationen finden Sie unter [Konfigurieren von mehrteiligen Uploads für Amazon S3](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-upload-s3.html#Config_Multipart) im *Verwaltungshandbuch für Amazon EMR*. 

Beim dynamischen Überschreiben der Partition nutzt das für EMRFS S3 optimierte Commit-Protokoll die transaktionsähnlichen Eigenschaften mehrteiliger Uploads, um sicherzustellen, dass Dateien, die durch Aufgabenversuche geschrieben wurden, beim Auftrags-Commit nur am Ausgabeort des Auftrags angezeigt werden. Durch die Verwendung von mehrteiligen Uploads auf diese Weise verbessert das Commit-Protokoll die Leistung von Auftrags-Commits im Vergleich zum Standard-`SQLHadoopMapReduceCommitProtocol`. Wenn Sie das S3-optimierte EMRFS-Commit-Protokoll verwenden, gilt es einige wichtige Unterschiede zu dem herkömmlichen Verhalten bei mehrteiligen Uploads zu berücksichtigen:
+ Mehrteilige Uploads werden immer ausgeführt, unabhängig von der Dateigröße. Dies unterscheidet sich vom Standardverhalten von EMRFS, bei dem die Eigenschaft `fs.s3n.multipart.uploads.split.size` die Dateigröße steuert, in der mehrteilige Uploads ausgelöst werden.
+ Mehrteilige Uploads verbleiben für einen längeren Zeitraum in einem Status, in dem sie nicht abgeschlossen sind, bis die Aufgabe übertragen oder abgebrochen wird. Dies unterscheidet sich von der Standard-Verhalten von EMRFS. Dort wird ein mehrteiliger Upload abgeschlossen, wenn eine Aufgabe den Schreibvorgang für eine bestimmte Datei beendet hat.

Aufgrund dieser Unterschiede vergrößert sich bei mehrteiligen Uploads die Wahrscheinlichkeit, dass unvollständige mehrteilige Uploads zurückbleiben, wenn ein Spark Executor JVM abstürzt oder zerstört wird, während Aufgaben ausgeführt oder Daten auf Amazon S3 oder Spark Driver JVM geschrieben werden. Aus diesem Grund sollten Sie bei Verwendung des S3-optimierten EMRFS-Committer darauf achten, den bewährten Methoden für die Verwaltung von fehlgeschlagenen mehrteiligen Uploads zu folgen. Weitere Informationen finden Sie unter [Bewährte Methoden](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-upload-s3.html#emr-bucket-bestpractices) für die Arbeit mit Amazon-S3-Buckets im *Verwaltungshandbuch für Amazon EMR*.

# Überlegungen zur Auftragsoptimierung
<a name="emr-spark-commit-protocol-tuning"></a>

Auf Spark-Ausführern verbraucht das für EMRFS-S3-optimierte Commit-Protokoll eine geringe Speichermenge für jede Datei, die versuchsweise von einer Aufgabe geschrieben wird, bis die Aufgabe übermittelt oder abgebrochen wird. Bei den meisten Aufträgen ist die Menge des belegten Speichers vernachlässigbar. 

Auf Spark-Treibern benötigt das für EMRFS-S3-optimierte Commit-Protokoll Speicher, um die Metadateninformationen jeder übergebenen Datei zu speichern, bis der Auftrags festgeschrieben oder abgebrochen wird. Bei den meisten Aufträgen ist die standardmäßige Speichereinstellung des Spark-Treibers vernachlässigbar. 

Bei Aufträgen mit Aufgaben mit langer Laufzeit, die eine große Anzahl von Dateien schreiben, kann der Speicherverbrauch des Commit-Protokolls spürbar sein und Anpassungen des für Spark zugewiesenen Speichers erfordern, insbesondere für Spark-Ausführer. Sie können den Speicher mithilfe der `spark.driver.memory`-Eigenschaft für Spark-Treiber und der Eigenschaft für `spark.executor.memory`-Spark-Aufträge optimieren. Als Faustregel gilt: für jeweils 100.000 Dateien, die eine einzelne Aufgabe schreib, werden in der Regel zusätzlich 100 MB Arbeitsspeicher benötigt. Weiter Informationen finden Sie unter [Anwendungseigenschaften](https://spark.apache.org/docs/latest/configuration.html#application-properties) in der Dokumentation zur Konfiguration von Apache Spark.

# Amazon-S3-Anforderungen mit EMRFS wiederholen
<a name="emr-spark-emrfs-retry"></a>

Dieses Thema enthält Informationen zu den Wiederholungsstrategien, die Sie verwenden können, wenn Sie Anforderungen an Amazon S3 mit EMRFS stellen. Wenn Ihre Anforderungsrate steigt, versucht S3, zu skalieren, um die neue Rate zu unterstützen. Während dieses Vorgangs kann S3 Anforderungen drosseln und einen `503 Slow Down`-Fehler zurückgeben. Um die Erfolgsquote Ihrer S3-Anforderungen zu verbessern, können Sie Ihre Wiederholungsstrategie anpassen, indem Sie Eigenschaften in Ihrer `emrfs-site`-Konfiguration konfigurieren.

Sie können Ihre Wiederholungsstrategie wie folgt anpassen.
+ Erhöhen Sie das maximale Wiederholungslimit für die standardmäßige exponentielle Backoff-Wiederholungsstrategie.
+ Aktivieren und konfigurieren Sie die AIMD-Wiederholungsstrategie (additive-increase/multiplicative-decrease). AIMD wird für Amazon-EMR-Versionen 6.4.0 und höher unterstützt.

## Verwenden Sie die standardmäßige exponentielle Backoff-Strategie
<a name="emr-spark-emrfs-retry-exponential-backoff"></a>

Standardmäßig verwendet EMRFS eine exponentielle Backoff-Strategie, um Amazon-S3-Anfragen erneut zu versuchen. Das standardmäßige EMRFS-Wiederholungslimit ist 15. Um einen `503 Slow Down`-S3-Fehler zu vermeiden, können Sie das Wiederholungslimit erhöhen, wenn Sie einen neuen Cluster, in einem laufenden Cluster oder zur Laufzeit der Anwendung erstellen.

Um das Wiederholungslimit zu erhöhen, müssen Sie den Wert für `fs.s3.maxRetries` in Ihrer `emrfs-site`-Konfiguration ändern. Die folgende Beispielkonfiguration setzt `fs.s3.maxRetries` auf einen benutzerdefinierten Wert von 30.

```
[
    {
      "Classification": "emrfs-site",
      "Properties": {
        "fs.s3.maxRetries": "30"
      }
    }
]
```

Weitere Informationen zum Arbeiten mit Konfigurationsobjekten finden Sie unter [Anwendungen konfigurieren](emr-configure-apps.md).

## Verwenden Sie die AIMD-Wiederholungsstrategie
<a name="emr-spark-emrfs-retry-aimd"></a>

Mit Amazon-EMR-Version 6.4.0 und höher unterstützt EMRFS eine alternative Wiederholungsstrategie, die auf einem AIMD-Modell (Additive-Incree/Multiplicative-Decrease) basiert. Die AIMD-Wiederholungsstrategie ist besonders nützlich, wenn Sie mit großen Amazon-EMR-Clustern arbeiten.

AIMD berechnet anhand von Daten über kürzlich erfolgreiche Anfragen eine benutzerdefinierte Anforderungsrate. Diese Strategie verringert die Anzahl der gedrosselten Anforderungen und die Gesamtzahl der pro Anforderungen erforderlichen Versuche.

Um die AIMD-Wiederholungsstrategie zu aktivieren, müssen Sie die `fs.s3.aimd.enabled`-Eigenschaft in `true` in Ihrer `emrfs-site`-Konfiguration einstellen, wie im folgenden Beispiel gezeigt.

```
[
    {
      "Classification": "emrfs-site",
      "Properties": {
        "fs.s3.aimd.enabled": "true"
      }
    }
]
```

Weitere Informationen zum Arbeiten mit Konfigurationsobjekten finden Sie unter [Anwendungen konfigurieren](emr-configure-apps.md).

## Erweiterte Einstellungen für AIMD-Wiederholungen
<a name="emr-spark-emrfs-retry-advanced-properties"></a>

Sie können die in der folgenden Tabelle aufgeführten Eigenschaften konfigurieren, um das Wiederholungsverhalten zu verfeinern, wenn Sie die AIMD-Wiederholungsstrategie verwenden. Für die meisten Anwendungsfälle empfehlen wir die Verwendung der Standardeinstellung.


**Erweiterte Eigenschaften der AIMD-Wiederholungsstrategie**  

| Eigenschaft | Standardwert | Description | 
| --- | --- | --- | 
| fs.s3.aimd.increaseIncrement | 0.1 | Steuert, wie schnell die Anforderungsrate steigt, wenn aufeinanderfolgende Anforderungen erfolgreich sind. | 
| fs.s3.aimd.reductionFactor | 2 | Steuert, wie schnell die Anforderungsrate sinkt, wenn Amazon S3 eine 503-Antwort zurückgibt. Der Standardfaktor von 2 halbiert die Anforderungsrate. | 
| fs.s3.aimd.minRate | 0.1 | Legt die Untergrenze für die Anforderungsrate fest, wenn Anfragen dauerhaft durch S3 gedrosselt werden. | 
| fs.s3.aimd.initialRate | 5500 | Legt die anfängliche Anforderungsrate fest, die sich dann entsprechend den Werten ändert, die Sie für fs.s3.aimd.increaseIncrement und fs.s3.aimd.reductionFactor angeben.Die anfängliche Rate wird auch für GET-Anforderungen verwendet und für PUT-Anforderungen proportional (3500/5500) skaliert. | 
| fs.s3.aimd.adjustWindow | 2 | Steuert, wie oft die Anforderungsrate angepasst wird, gemessen an der Anzahl der Antworten. | 
| fs.s3.aimd.maxAttempts | 100 | Legt die maximale Anzahl der Versuche fest, eine Anforderungen zu versuchen. | 