

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Die Leistung von Spark optimieren
<a name="emr-spark-performance"></a>

Amazon EMR bietet mehrere Features für die Leistungsoptimierung von Spark. In diesem Thema werden die einzelnen Optimierungsfunktionen im Detail erläutert.

Weitere Informationen zum Festlegen der Spark-Konfiguration finden Sie unter [Konfigurieren von Spark](emr-spark-configure.md).

## Adaptive Abfrageausführung
<a name="emr-spark-performance-aqe"></a>

Die adaptive Abfrageausführung ist ein Framework zur Neuoptimierung von Abfrageplänen auf der Grundlage von Laufzeitstatistiken. Ab Amazon EMR 5.30.0 sind die folgenden Optimierungen für die adaptive Abfrageausführung von Apache Spark 3 auf Apache-Amazon-EMR-Laufzeit für Spark 2 verfügbar.
+ Adaptive Join-Konvertierung
+ Adaptives Zusammenführen von Shuffle-Partitionen

**Adaptive Join-Konvertierung**

Die adaptive Join-Konvertierung verbessert die Abfrageleistung, indem sie Sort-Merge-Join-Operationen in Broadcast-Hash-Join-Operationen konvertiert, die auf der Laufzeitgröße der Abfragephasen basieren. Broadcast-hash-joins tendieren dazu, eine bessere Leistung zu erzielen, wenn eine Seite des Joins klein genug ist, um seine Ausgabe effizient an alle Executoren zu verteilen. Dadurch entfällt die Notwendigkeit, den Austausch zu mischen und beide Seiten des Joins zu sortieren. Die adaptive Join-Konvertierung erweitert das Spektrum der Fälle, in denen Spark automatisch Broadcast-Hash-Joins durchführt.

Dieses Feature ist standardmäßig aktiviert. Sie kann deaktiviert werden, indem `spark.sql.adaptive.enabled` auf `false` gesetzt wird, wodurch auch der adaptive Abfrageausführungsrahmen deaktiviert wird. Spark beschließt, einen Sort-Merge-Join in einen Broadcast-Hash-Join umzuwandeln, wenn die Laufzeitgrößenstatistik einer der Join-Seiten `spark.sql.autoBroadcastJoinThreshold` nicht überschreitet, was standardmäßig 10 485 760 Byte (10 MiB) entspricht.

**Adaptives Zusammenführen von Shuffle-Partitionen**

Das adaptive Zusammenführen von Shuffle-Partitionen verbessert die Abfrageleistung, indem kleine zusammenhängende Shuffle-Partitionen zusammengeführt werden, um den Mehraufwand zu vermeiden, der durch zu viele kleine Aufgaben entsteht. Auf diese Weise können Sie eine höhere Anzahl von anfänglichen Shuffle-Partitionen konfigurieren, die dann zur Laufzeit auf eine bestimmte Größe reduziert werden, wodurch sich die Wahrscheinlichkeit erhöht, dass Shuffle-Partitionen gleichmäßiger verteilt werden.

Dieses Feature ist standardmäßig aktiviert, sofern nicht ausdrücklich `spark.sql.shuffle.partitions` festgelegt ist. Sie kann aktiviert werden, indem Sie `spark.sql.adaptive.coalescePartitions.enabled` auf `true` einstellen. Sowohl die anfängliche Anzahl der Shuffle-Partitionen als auch die Größe der Zielpartition können mit den Eigenschaften `spark.sql.adaptive.coalescePartitions.minPartitionNum` und `spark.sql.adaptive.advisoryPartitionSizeInBytes` eingestellt werden. In der folgenden Tabelle finden Sie weitere Informationen zu den zugehörigen Spark-Eigenschaften für dieses Feature.


**Adaptive Koaleszenzpartitionseigenschaften von Spark**  

| Eigenschaft | Standardwert | Description | 
| --- | --- | --- | 
| `spark.sql.adaptive.coalescePartitions.enabled` | „Wahr“, sofern `spark.sql.shuffle.partitions` nicht explizit festgelegt ist | Wenn „Wahr“ und spark.sql.adaptive.enabled den Wert „Wahr“ hat, fügt Spark zusammenhängende Shuffle-Partitionen entsprechend der Zielgröße (angegeben durch `spark.sql.adaptive.advisoryPartitionSizeInBytes`) zusammen, um zu viele kleine Aufgaben zu vermeiden. | 
| `spark.sql.adaptive.advisoryPartitionSizeInBytes` | 64 MB | Die empfohlene Größe der Shuffle-Partition beim Zusammenführen. Diese Konfiguration wirkt sich nur aus, wenn `spark.sql.adaptive.enabled` und `spark.sql.adaptive.coalescePartitions.enabled` beide gleichzeitig `true` sind. | 
| `spark.sql.adaptive.coalescePartitions.minPartitionNum` | 25 | Die Mindestanzahl der Shuffle-Partitionen nach dem Zusammenführen. Diese Konfiguration wirkt sich nur aus, wenn `spark.sql.adaptive.enabled` und `spark.sql.adaptive.coalescePartitions.enabled` beide gleichzeitig `true` sind. | 
| `spark.sql.adaptive.coalescePartitions.initialPartitionNum` | 1000 | Die anfängliche Anzahl von Shuffle-Partitionen vor dem Zusammenführen. Diese Konfiguration wirkt sich nur aus, wenn `spark.sql.adaptive.enabled` und `spark.sql.adaptive.coalescePartitions.enabled` beide gleichzeitig `true` sind. | 

## Dynamische Partitionsbereinigung
<a name="emr-spark-performance-dynamic"></a>

Die dynamische Partitionsbereinigung verbessert die Auftragsleistung durch eine sorgfältigere Auswahl der spezifischen Partitionen in einer Tabelle, die für eine bestimmte Abfrage gelesen und verarbeitet werden müssen. Indem Sie die Datenmenge reduzieren, die für die Ausführung eines Auftrags gelesen und verarbeitet werden muss, können Sie viel Zeit sparen. In Amazon EMR 5.26.0 ist dieses Feature standardmäßig aktiviert. In Amazon EMR 5.24.0 und 5.25.0 können Sie dieses Feature aktivieren, indem Sie die Spark-Eigenschaft `spark.sql.dynamicPartitionPruning.enabled` innerhalb von Spark oder beim Erstellen von Clustern festlegen. 


**Dynamisches Löschen von Partitionen in Spark, Partitionseigenschaften**  

| Eigenschaft | Standardwert | Description | 
| --- | --- | --- | 
| `spark.sql.dynamicPartitionPruning.enabled` | `true` | Wenn der Wert wahr ist, aktivieren Sie das dynamische Bereinigen von Partitionen. | 
| `spark.sql.optimizer.dynamicPartitionPruning.enforceBroadcastReuse` | `true` | Wenn `true`, führt Spark vor der Ausführung der Abfrage eine Defensivprüfung durch, um sicherzustellen, dass die Wiederverwendung von Broadcast-Austauschen in dynamischen Bereinigungsfiltern nicht durch spätere Vorbereitungsregeln, wie z. B. benutzerdefinierte Spaltenregeln, beeinträchtigt wird. Wenn die Wiederverwendung nicht funktioniert und diese Konfiguration `true` ist, entfernt Spark die betroffenen dynamischen Bereinigungsfilter, um Leistungs- und Korrektheitsprobleme zu vermeiden. Korrektheitsprobleme können auftreten, wenn der Broadcast-Austausch des dynamischen Bereinigungsfilters zu unterschiedlichen, inkonsistenten Ergebnissen führt, als der Broadcast-Austausch des entsprechenden Join-Vorgangs. Das Einstellen dieser Konfiguration auf `false` sollte mit Vorsicht erfolgen. Dadurch können Szenarien umgangen werden, z. B. wenn die Wiederverwendung durch benutzerdefinierte Spaltenregeln gestört wird. Wenn Adaptive Abfrageausführung aktiviert ist, wird die Wiederverwendung von Broadcasts immer erzwungen. | 

Diese Optimierung verbessert die vorhandenen Funktionen von Spark 2.4.2, das nur die Weitergabe statischer Prädikate unterstützt, um diese zu geplanten Zeiten aufzulösen.

Im Folgenden finden Sie Beispiele für die Weitergabe statischer Prädikate in Sparke 2.4.2.

```
partition_col = 5

partition_col IN (1,3,5)

partition_col between 1 and 3

partition_col = 1 + 3
```

Die dynamische Partitionsbereinigung ermöglicht es der Spark-Engine, während der Laufzeit dynamisch abzuleiten, welche Partitionen gelesen werden müssen und welche problemlos eliminiert werden können. Die folgende Abfrage beinhaltet beispielsweise zwei Tabellen: die Tabelle `store_sales`, die den Gesamtumsatz aller Geschäfte enthält und nach Regionen partitioniert ist, sowie die Tabelle `store_regions`, die für die einzelnen Länder eine Zuweisung nach Regionen enthält. Die Tabellen enthalten Daten für Geschäfte, die auf der ganzen Welt verteilt sind, allerdings benötigen wir nur die Daten für Nordamerika.

```
select ss.quarter, ss.region, ss.store, ss.total_sales 
from store_sales ss, store_regions sr
where ss.region = sr.region and sr.country = 'North America'
```

Ohne die dynamische Partitionsbereinigung, liest diese Anfrage alle Regionen, bevor Sie die Untergruppe an Regionen herausfiltert, die mit den Ergebnissen der Unterabfrage übereinstimmen. Mit der dynamischen Partitionsbereinigung liest und verarbeitet diese Abfrage nur die Partitionen für die Regionen, die in der Unterabfrage zurückgegeben wurden. Da weniger Daten im Speicher gelesen und weniger Datensätze verarbeitet werden müssen, spart dies Zeit und Ressourcen.

## Abflachen skalarer Unterabfragen
<a name="emr-spark-performance-flatten"></a>

Diese Optimierung verbessert die Leistung von Abfragen, die in der gleichen Tabelle skalare Unterabfragen ausführen. In Amazon EMR 5.26.0 ist dieses Feature standardmäßig aktiviert. In Amazon EMR 5.24.0 und 5.25.0 können Sie es aktivieren, indem Sie die Spark-Eigenschaft `spark.sql.optimizer.flattenScalarSubqueriesWithAggregates.enabled` innerhalb von Spark oder beim Erstellen von Clustern festlegen. Wenn diese Eigenschaft auf „true“ festgelegt ist, flacht der Abfrageoptimierer aggregierte skalare Unterabfragen, die, wenn möglich, dieselbe Relation verwenden, ab. Die skalaren Unterabfragen werden abgeflacht, indem alle in der Unterabfrage vorhandenen Prädikate an die Aggregationsfunktionen weitergegeben werden. Im Anschluss daran wird pro Relation eine Aggregation mit allen Aggregatfunktionen ausgeführt.

Im Folgenden finden Sie ein Beispiel für eine Abfrage, die von dieser Optimierung profitiert.

```
select (select avg(age) from students                    /* Subquery 1 */
                 where age between 5 and 10) as group1,
       (select avg(age) from students                    /* Subquery 2 */
                 where age between 10 and 15) as group2,
       (select avg(age) from students                    /* Subquery 3 */
                 where age between 15 and 20) as group3
```

Die Optimierung schreibt die vorherige Abfrage folgendermaßen um:

```
select c1 as group1, c2 as group2, c3 as group3
from (select avg (if(age between 5 and 10, age, null)) as c1,
             avg (if(age between 10 and 15, age, null)) as c2,
             avg (if(age between 15 and 20, age, null)) as c3 from students);
```

Beachten Sie, dass die umgeschriebene Abfrage die Studententabelle nur einmal liest und die Prädikate der drei Unterabfragen per Push-Verfahren in die `avg`-Funktion weitergegeben werden.

## DISTINCT vor INTERSECT
<a name="emr-spark-performance-distinct"></a>

Diese Optimierung verbessert Joins, wenn INTERSECT verwendet wird. In Amazon EMR 5.26.0 ist dieses Feature standardmäßig aktiviert. In Amazon EMR 5.24.0 und 5.25.0 können Sie es aktivieren, indem Sie die Spark-Eigenschaft `spark.sql.optimizer.distinctBeforeIntersect.enabled` innerhalb von Spark oder beim Erstellen von Clustern festlegen. Abfragen mit INTERSECT werden automatisch so konvertiert, dass sie einen linken halben Join verwenden. Wenn diese Eigenschaft auf true gesetzt ist, leitet der Abfrageoptimierer den DISTINCT-Operator an die untergeordneten Elemente von INTERSECT weiter, wenn er feststellt, dass der DISTINCT-Operator die linke Semi-Verknüpfung zu a anstelle von a machen kann. BroadcastHashJoin SortMergeJoin

Im Folgenden finden Sie ein Beispiel für eine Abfrage, die von dieser Optimierung profitiert.

```
(select item.brand brand from store_sales, item
     where store_sales.item_id = item.item_id)
intersect
(select item.brand cs_brand from catalog_sales, item 
     where catalog_sales.item_id = item.item_id)
```

Wenn die Eigenschaft `spark.sql.optimizer.distinctBeforeIntersect.enabled`, nicht aktiviert ist, wird die Abfrage folgendermaßen neu geschrieben.

```
select distinct brand from
  (select item.brand brand from store_sales, item
     where store_sales.item_id = item.item_id)
left semi join
   (select item.brand cs_brand from catalog_sales, item 
     where catalog_sales.item_id = item.item_id)
 on brand <=> cs_brand
```

Wenn die Eigenschaft `spark.sql.optimizer.distinctBeforeIntersect.enabled`, nicht aktiviert ist, wird die Abfrage folgendermaßen neu geschrieben.

```
select brand from
  (select distinct item.brand brand from store_sales, item
     where store_sales.item_id = item.item_id)
left semi join
   (select distinct item.brand cs_brand from catalog_sales, item 
     where catalog_sales.item_id = item.item_id)
 on brand <=> cs_brand
```

## Bloomfilter für Joins
<a name="emr-spark-performance-bloom"></a>

Durch diese Optimierung kann die Leistung einiger Joins verbessert werden, da eine Seite eines Joins mit einem [Bloomfilter](https://en.wikipedia.org/wiki/Bloom_filter) vorgefiltert wird, der aus den Werten von der anderen Seite des Joins generiert wird. In Amazon EMR 5.26.0 ist dieses Feature standardmäßig aktiviert. In Amazon EMR 5.25.0 können Sie dieses Feature aktivieren, indem Sie die Spark-Eigenschaft `spark.sql.bloomFilterJoin.enabled` innerhalb von Spark oder beim Erstellen von Clustern auf `true` setzen.

Im Folgenden finden Sie eine Beispielabfrage, für die die Anwendung eines Bloomfilters geeignet wäre.

```
select count(*)
from sales, item
where sales.item_id = item.id
and item.category in (1, 10, 16)
```

Wenn diese Funktion aktiviert ist, wird der Bloomfilter aus allen Artikel-IDs erstellt, deren Kategorie sich innerhalb des Kategoriensatzes befindet, der abgefragt wird. Beim Scannen der Tabelle SALES wird der Bloomfilter verwendet, um zu ermitteln, welche Verkäufe für Artikel gelten, die sich definitiv nicht im vom Bloomfilter definierten Satz befinden. So können diese ermittelten Verkäufe so früh wie möglich herausgefiltert werden.

## Optimierte Join-Neuanordnung
<a name="emr-spark-performance-join-reorder"></a>

Durch diese Optimierung kann die Abfrageleistung verbessert werden, indem bei Tabellen mit Filtern die Joins neu angeordnet werden. In Amazon EMR 5.26.0 ist dieses Feature standardmäßig aktiviert. In Amazon EMR 5.25.0, können Sie dieses Feature aktivieren, indem Sie den Spark-Konfigurationsparameter `spark.sql.optimizer.sizeBasedJoinReorder.enabled` auf „wahr“ setzen. Das Standardverhalten von Spark ist, dass Tabellen von links nach rechts verknüpft werden, wie in der Abfrage aufgeführt. Dabei kann die Chance verpasst werden, zuerst kleinere Joins mit Filtern auszuführen, was später teureren Joins zugute kommen würde. 

Bei der Beispielabfrage unten werden alle zurückgegebenen Artikel aus allen Läden in einem Land ausgewertet. Ohne optimierte Join-Neuanordnung verknüpft Spark zuerst die zwei großen Tabellen `store_sales` und `store_returns` und verknüpft sie dann mit `store` und schließlich mit `item`.

```
select ss.item_value, sr.return_date, s.name, i.desc, 
from store_sales ss, store_returns sr, store s, item i
where ss.id = sr.id and ss.store_id = s.id and ss.item_id = i.id
and s.country = 'USA'
```

Bei der optimierten Join-Neuanordnung verknüpft Spark zuerst `store_sales` mit `store`, da `store` über einen Filter verfügt und kleiner ist als `store_returns` und `broadcastable`. Danach führt Spark einen Join mit `store_returns` und schließlich mit `item` durch. Wenn `item` über einen Filter verfügen würde und sendungsfähig wäre, würde es sich ebenfalls für die Neuanordnung eignen, was dazu führen würde, dass `store_sales` mit `store`, dann mit `item` und schließlich mit `store_returns` verknüpft würde.