

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Konnektoren und Dienstprogramme
<a name="emr-connectors"></a>

Amazon EMR bietet eine Reihe von Konnektoren und Dienstprogrammen für den Zugriff auf andere AWS Services als Datenquellen. Sie können in der Regel auf Daten in diesen Services innerhalb eines Programms zugreifen. Sie können beispielsweise einen Kinesis-Stream in einer Hive-Abfrage, einem Pig-Skript oder einer MapReduce Anwendung angeben und dann mit diesen Daten arbeiten.

**Topics**
+ [Tabellen in DynamoDB mit Amazon EMR exportieren, importieren, abfragen und verknüpfen](EMRforDynamoDB.md)
+ [Kinesis](emr-kinesis.md)
+ [DistCp S3 (s3-dist-cp)](UsingEMR_s3distcp.md)
+ [Aufräumen nach fehlgeschlagenen DistCp S3-Jobs](#s3distcp-cleanup)

# Tabellen in DynamoDB mit Amazon EMR exportieren, importieren, abfragen und verknüpfen
<a name="EMRforDynamoDB"></a>

**Anmerkung**  
Der Amazon EMR-DynamoDB-Connector ist als Open Source verfügbar auf. GitHub Weitere Informationen finden Sie unter [https://github.com/awslabs/emr-dynamodb-connector](https://github.com/awslabs/emr-dynamodb-connector).

DynamoDB ist ein vollständig verwalteter NoSQL-Datenbankservice, der schnelle und vorhersehbare Leistung nahtlos skalierbar bereitstellt. Entwickler können eine Datenbanktabelle erstellen und den durch Anfragen erzeugten Datenverkehr oder den Speicher unbegrenzt erweitern. DynamoDB verteilt die Daten und den Datenverkehr für die Tabelle automatisch auf eine ausreichende Anzahl von Servern, um die vom Kunden angegebene Anforderungskapazität zu erreichen und die gespeicherte Datenmenge zu verarbeiten und dabei gleichzeitig eine konsistente, schnelle Leistung beizubehalten. Mit Amazon EMR und Hive können Sie große Datenmengen, z. B. in DynamoDB gespeicherte Date, schnell und effizient verarbeiten. Weitere Informationen über DynamoDB finden Sie im [Entwicklerhandbuch von Amazon DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/).

Apache Hive ist ein Software-Layer, mit dem Sie MapReduce-Cluster mithilfe einer vereinfachten, SQL-ähnlichen Sprache namens HiveQL abfragen können. Die Software baut auf der Hadoop-Architektur auf. Weitere Informationen zu Hive und HiveQL erhalten Sie in der [HiveQL-Sprachreferenz](https://cwiki.apache.org/confluence/display/Hive/LanguageManual). Weitere Informationen über Hive und Amazon EMR erhalten Sie unter [Apache Hive](emr-hive.md).

Sie können Amazon EMR mit einer benutzerdefinierten Hive-Version verwenden, die Konnektivität mit DynamoDB umfasst, um Operationen für in DynamoDB gespeicherte Daten auszuführen:
+ Laden von DynamoDB-Daten in das Hadoop Distributed File System (HDFS) und deren Verwendung als Input in einem Amazon-EMR-Cluster.
+ Abfragen von Live DynamoDB–Daten mit SQL-ähnlichen Anweisungen (HiveQL).
+ Verknüpfen von in DynamoDB gespeicherten Daten und deren Export oder Abfragen der verknüpften Daten.
+ Exportieren von in DynamoDB gespeicherten Daten nach Amazon S3.
+ Importieren von in Amazon S3 gespeicherten Daten in DynamoDB.

**Anmerkung**  
Der Amazon-EMR-DynamoDB-Konnektor unterstützt keine Cluster, die zur Verwendung der [Kerberos-Authentifizierung](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-kerberos.html) konfiguriert sind.

Für jede der folgenden Aufgaben starten Sie einen Amazon-EMR-Cluster, geben den Speicherort der Daten in DynamoDB an und führen Hive-Befehle zum Ändern der Daten in DynamoDB aus. 

Es gibt mehrere Möglichkeiten, einen Amazon EMR-Cluster zu starten: Sie können die Amazon EMR-Konsole, die Befehlszeilenschnittstelle (CLI) verwenden oder Ihren Cluster mithilfe eines AWS SDK oder der Amazon EMR-API programmieren. Sie können auch bestimmen, ob ein Hive-Cluster interaktiv oder über ein Skript ausgeführt werden soll. In diesem Abschnitt wird beschrieben, wie Sie einen interaktive Hive-Cluster über die Amazon-EMR-Konsole und die CLI starten. 

Die interaktive Verwendung von Hive ist eine hervorragende Möglichkeit zum Testen der Abfrageleistung und Optimieren Ihrer Anwendung. Nachdem Sie eine Gruppe von Hive-Befehlen eingerichtet haben, die regelmäßig ausgeführt werden, sollten Sie erwägen, ein Hive-Skript zu erstellen, das Amazon EMR für Sie ausführen kann. 

**Warnung**  
Amazon-EMR-Lese- oder Schreibvorgänge in einer DynamoDB-Tabelle werden der festgelegten, bereitgestellten Durchsatzkapazität angerechnet, wodurch sich die Häufigkeit von Ausnahmen des bereitgestellten Durchsatzes erhöht. Bei großen Anforderungen implementiert Amazon EMR Wiederholungen mit exponentiellem Backoff zur Verwaltung der Anforderungslast der DynamoDB-Tabelle. Das Ausführen von Amazon-EMR-Aufträge gleichzeitig mit anderem Datenverkehr kann dazu führen, dass Sie die zugewiesene, bereitgestellte Durchsatzstufe überschreiten. Sie können dies überwachen, indem Sie die **ThrottleRequests**Metrik in Amazon überprüfen CloudWatch. Wenn die Anforderungslast zu hoch ist, können Sie den Cluster neu starten und [Einstellung der Leserate in Prozent](EMR_Hive_Optimizing.md#ReadPercent) oder [Einstellung der Schreibrate in Prozent](EMR_Hive_Optimizing.md#WritePercent) auf einen geringeren Wert festlegen, um die Amazon-EMR-Operationen zu drosseln. Weitere Informationen zu DynamoDB-Durchsatzeinstellungen finden Sie unter [Bereitgestellter Durchsatz](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithDDTables.html#ProvisionedThroughput).   
Wenn eine Tabelle für den [On-Demand-Modus](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html#HowItWorks.OnDemand) konfiguriert ist, sollten Sie die Tabelle wieder in den Bereitstellungsmodus ändern, bevor Sie einen Export- oder Importvorgang ausführen. Pipelines benötigen ein Durchsatzverhältnis, um die von einem Dynamo DBtable zu verwendenden Ressourcen berechnen zu können. Im On-Demand-Modus wird der bereitgestellte Durchsatz entfernt. Um Durchsatzkapazität bereitzustellen, können Sie Amazon CloudWatch Events-Metriken verwenden, um den Gesamtdurchsatz auszuwerten, den eine Tabelle verwendet hat.

**Topics**
+ [Eine Hive-Tabelle einrichten, um Hive-Befehle auszuführen](EMR_Interactive_Hive.md)
+ [Hive-Befehlsbeispiele für das Exportieren, Importieren und Abfragen von Daten in DynamoDB](EMR_Hive_Commands.md)
+ [Optimieren der Leistung von Amazon-EMR-Operationen in DynamoDB](EMR_Hive_Optimizing.md)

# Eine Hive-Tabelle einrichten, um Hive-Befehle auszuführen
<a name="EMR_Interactive_Hive"></a>

Apache Hive ist eine Data Warehouse-Anwendung, mit der Sie Daten abfragen können, die in Amazon-EMR-Clustern enthalten sind. Dies erfolgt mit einer SQL-ähnlichen Sprache. Weitere Informationen zu Hive finden Sie unter [http://hive.apache.org/](http://hive.apache.org/).

Im folgenden Verfahren wird davon ausgegangen, dass Sie bereits einen Cluster und ein Amazon-EC2-Schlüsselpaar angegeben haben. Informationen zu den ersten Schritten beim Erstellen von Clustern finden Sie unter [Erste Schritte mit Amazon EMR](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-gs) im *Verwaltungshandbuch für Amazon EMR*.

## Konfigurieren Sie Hive für die Verwendung MapReduce
<a name="hive-mapreduce"></a>

Wenn Sie Hive in Amazon EMR zum Abfragen von DynamoDB-Tabellen verwenden, können Fehler auftreten, falls Hive die Standard-Engine Tez für die Ausführung nutzt. Aus diesem Grund empfehlen wir, bei der Erstellung eines Clusters mit Hive, der wie in diesem Abschnitt beschrieben in DynamoDB integriert ist, eine Konfigurationsklassifizierung zu verwenden, die Hive auf die Verwendung festlegt. MapReduce Weitere Informationen finden Sie unter [Anwendungen konfigurieren](emr-configure-apps.md).

Der folgende Ausschnitt zeigt die Konfigurationsklassifizierung und die Eigenschaft, die verwendet werden sollen, um sie MapReduce als Ausführungs-Engine für Hive festzulegen:

```
[
                {
                    "Classification": "hive-site",
                    "Properties": {
                        "hive.execution.engine": "mr"
                    }
                }
             ]
```<a name="EMR_Interactive_Hive_session"></a>

**Interaktives Ausführen von Hive-Befehlen**

1. Herstellen einer Verbindung mit dem Master-Knoten. Weitere Informationen finden Sie unter [Mit SSH eine Verbindung zum Hauptknoten herstellen](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html) im *Verwaltungshandbuch für Amazon EMR*.

1. Geben Sie an der Eingabeaufforderung für den aktuellen Master-Knoten `hive` ein.

   Sie sollten eine Hive-Eingabeaufforderung sehen: `hive>`

1.  Geben Sie einen Hive-Befehl ein, der die Daten in DynamoDB einer Tabelle in der Hive-Anwendung zuordnet. Diese Tabelle fungiert als Referenz für die in Amazon DynamoDB gespeicherten Daten. Die Daten werden nicht lokal in Hive gespeichert und alle Abfragen, die diese Tabelle verwenden, werden mit den Live-Daten in DynamoDB abgeglichen. Dabei wird die Lese- und Schreibkapazität der Tabelle jedes Mal verbraucht, wenn ein Befehl ausgeführt wird. Wenn Sie mehrere Hive-Befehle für denselben Datensatz ausführen möchten, sollten Sie ihn zunächst exportieren. 

    Das folgende Beispiel zeigt die Syntax für die Zuweisung einer Hive-Tabelle zu einer DynamoDB-Tabelle. 

   ```
   CREATE EXTERNAL TABLE hive_tablename (hive_column1_name column1_datatype, hive_column2_name column2_datatype...)
   STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   TBLPROPERTIES ("dynamodb.table.name" = "dynamodb_tablename", 
   "dynamodb.column.mapping" = "hive_column1_name:dynamodb_attribute1_name,hive_column2_name:dynamodb_attribute2_name...");
   ```

    Wenn Sie in Hive eine Tabelle mithilfe der DynamoDB erstellen möchten, müssen Sie sie unter Verwendung des Schlüsselworts `EXTERNAL` als externe Tabelle erstellen. Der Unterschied zwischen internen und externen Tabellen liegt darin, dass die Daten in internen Tabellen gelöscht werden, wenn die interne Tabelle gelöscht wird. Dies ist nicht das gewünschte Verhalten, wenn eine Verbindung zur Amazon DynamoDB besteht und somit werden nur externe Tabellen unterstützt. 

    Beispielsweise wird mit dem folgenden Befehl eine Tabelle mit dem Namen *hivetable1* in Hive erstellt, die auf die DynamoDB-Tabelle namens *dynamodbtable1* verweist. Die DynamoDB-Tabelle *dynamodbtable1* hat ein Primärschlüsselschema. hash-and-range Das Hash-Schlüsselelement ist `name` (Zeichenfolgetyp), das Bereichsschlüsselelement ist `year` (numerischer Typ) und jedes Element verfügt über einen Attributwert für `holidays` (Zeichenfolgetyp). 

   ```
   CREATE EXTERNAL TABLE hivetable1 (col1 string, col2 bigint, col3 array<string>)
   STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
   "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");
   ```

    Zeile 1 verwendet die HiveQL-Anweisung `CREATE EXTERNAL TABLE`. Für *hivetable1* müssen Sie für jedes Attributnamen-Wertpaar in der DynamoDB-Tabelle eine Spalte erstellen und den Datentyp angeben. Für diese Werte muss die Groß- und Kleinschreibung nicht beachtet werden und Sie können den Spalten beliebige Namen geben (außer reservierten Wörtern). 

    Zeile 2 verwendet die `STORED BY`-Anweisung. Bei dem Wert `STORED BY` handelt es sich um den Namen der Klasse, die die Verbindung zwischen Hive und DynamoDB handhabt. Er sollte auf `'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'` festgelegt werden. 

    Zeile 3 verwendet die `TBLPROPERTIES`-Anweisung, um „hivetable1“ mit der richtigen Tabelle und dem Schema in DynamoDB zu verknüpfen. Stellen Sie `TBLPROPERTIES` die Werte für den `dynamodb.table.name`-Parameter und `dynamodb.column.mapping`-Parameter bereit. Bei diesen Werten *muss* die Groß- und Kleinschreibung beachtet werden.
**Anmerkung**  
 Alle DynamoDB-Attributnamen für die Tabelle müssen über entsprechende Spalten in der Hive-Tabelle verfügen. Je nach Ihrer Amazon EMR-Version treten die folgenden Szenarien auf, wenn die one-to-one Zuordnung nicht existiert:  
In Amazon EMR Version 5.27.0 und höher verfügt der Connector über Validierungen, die eine one-to-one Zuordnung zwischen DynamoDB-Attributnamen und Spalten in der Hive-Tabelle sicherstellen. Wenn die Zuordnung nicht existiert, tritt ein Fehler auf. one-to-one
Ab Amazon-EMR-Version 5.26.0 und früher enthält die Hive-Tabelle kein Name-Wert-Paar aus DynamoDB mehr. Wenn Sie die Primärschlüsselattribute der DynamoDB nicht zuordnen, generiert Hive einen Fehler. Wenn Sie ein nicht-primäres Schlüsselattribut nicht zuordnen, wird kein Fehler generiert, aber die Daten werden auch nicht in der Hive-Tabelle angezeigt. Wenn die Datentypen nicht übereinstimmen, ist der Wert Null. 

Anschließend können Sie die Hive-Vorgänge in *hivetable1* starten. Abfragen für *hivetable1* werden intern über die DynamoDB-Tabelle *dynamodbtable1* Ihres DynamoDB-Kontos ausgeführt, wodurch bei jeder Ausführung Lese- oder Schreibeinheiten verbraucht werden.

Wenn Sie Hive-Abfragen über eine DynamoDB-Tabelle ausführen, müssen Sie sicherstellen, dass ausreichend Lesekapazitätseinheiten vorhanden sind.

Nehmen Sie zum Beispiel an, dass Sie 100 Lesekapazitätseinheiten für Ihre DynamoDB-Tabelle bereitgestellt haben. Dadurch können Sie 100 Lesevorgänge oder 409.600 Byte pro Sekunde ausführen. Wenn diese Tabelle 20 GB an Daten enthält (21.474.836.480 Bytes) und Ihre Hive-Abfrage einen vollständigen Tabellen-Scan durchführt, können Sie ermitteln, wie lange die Ausführung der Abfrage dauern wird:

 * 21 474 836 480/409 600 = 52 429 Sekunden = 14,56 Stunden * 

Die einzige Möglichkeit, um die erforderliche Zeit zu verkürzen, würde eine Änderung der Lesekapazitätseinheiten in der DynamoDB-Quelltabelle notwendig machen. Das Hinzufügen zusätzlicher Amazon-EMR-Knoten ist nicht hilfreich.

Der Fertigstellungsgrad in der Hive-Ausgabe wird dann aktualisiert, wenn ein oder mehrere Mapper-Prozesse abgeschlossen wurden. Für eine große DynamoDB-Tabelle mit geringer Lesekapazität wird der Fertigungsgrad der Ausgabe ggf. für längere Zeit nicht aktualisiert. Im oben genannten Fall wird der Auftrag mehrere Stunden lang als zu 0 % abgeschlossen angezeigt. Weitere Details zum Fortschritt Ihres Auftrags finden Sie in der Amazon-EMR-Konsole. Hier können Sie den Status einzelner Mapper-Aufgaben sowie Statistiken für Lesevorgänge anzeigen. Sie können sich auch bei der Hadoop-Schnittstelle auf dem Master-Knoten anmelden und die Hadoop-Statistiken anzeigen. Hier werden Ihnen der Status einzelner Map-Aufgaben sowie Statistiken für Lesevorgänge angezeigt. Weitere Informationen finden Sie unter den folgenden Themen:
+ [Auf Hauptknoten gehostete Web-Schnittstellen](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-web-interfaces.html)
+ [Hadoop-Web-Schnittstellen anzeigen](https://docs.aws.amazon.com/emr/latest/ManagementGuide/UsingtheHadoopUserInterface.html)

Weitere Informationen über HiveQL-Beispielanweisungen, um Aufgaben auszuführen, die bei der Ausführung von Aufgaben, wie dem Exportieren oder Importieren von DynamoDB-Daten sowie dem Verknüpfen von Tabellen behilflich sind, finden Sie unter [Hive-Befehlsbeispiele für das Exportieren, Importieren und Abfragen von Daten in DynamoDB](EMR_Hive_Commands.md).<a name="EMR_Hive_Cancel"></a>

**Abbrechen einer Hive-Anfrage**

Wenn Sie eine Hive-Abfrage ausführen, enthält die Erstantwort vom Server den Befehl, die Anfrage abzubrechen. Um die Anfrage zu jedem Zeitpunkt während des Vorgangs abzubrechen, verwenden Sie den **Kill Command (Beendigungsbefehl)** aus der Serverantwort.

1. Geben Sie `Ctrl+C` ein, um den Befehlszeilen-Client zu beenden.

1.  Geben Sie nach der Shell-Eingabeaufforderung den **Kill Command (Beendigungsbefehl)** aus der Erstantwort des Servers auf Ihre Anfrage ein. 

    Alternativ können Sie den folgenden Befehl von der Befehlszeile des Master-Knotens aus ausführen, um den Hadoop-Job zu beenden. Dabei *job-id* handelt es sich um die ID des Hadoop-Jobs, die über die Hadoop-Benutzeroberfläche abgerufen werden kann.

   ```
   hadoop job -kill job-id
   ```

## Datentypen für Hive und DynamoDB
<a name="EMR_Hive_Properties"></a>

Die folgende Tabelle zeigt die verfügbaren Hive-Datentypen, den DynamoDB-Standardtyp, dem sie entsprechen, und die alternativen DynamoDB-Typen, denen sie auch zugeordnet werden können. 


| Hive-Typ | Standardtyp DynamoDB | Alternative DynamoDB-Typ | 
| --- | --- | --- | 
| Zeichenfolge | String (S) |  | 
| Bigint oder Doppel | Ganzzahl |  | 
| Binary | Binary (B) |  | 
| boolesch | boolescher Wert (BOOL) |  | 
| Array | list (L) | Zahlensatz (ZS), ein Zeichenfolgesatz (ZFS) oder ein Binärsatz (BS) | 
| map<string,string> | Element | map (M) | 
| map<string,?> | map (M) |  | 
|  | null (NULL) |  | 

Wenn Sie Ihre Hive-Daten als entsprechenden alternativen DynamoDB-Typ schreiben möchten oder wenn Ihre DynamoDB-Daten Attributwerte eines alternativen -Typs enthalten, können Sie die Spalte und den DynamoDB-Typ mit dem Parameter `dynamodb.type.mapping` angeben. Das folgende Beispiel zeigt die Syntax für die Angabe einer alternativen Typzuweisung.

```
CREATE EXTERNAL TABLE hive_tablename (hive_column1_name column1_datatype, hive_column2_name column2_datatype...)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES ("dynamodb.table.name" = "dynamodb_tablename",
"dynamodb.column.mapping" = "hive_column1_name:dynamodb_attribute1_name,hive_column2_name:dynamodb_attribute2_name...",
"dynamodb.type.mapping" = "hive_column1_name:dynamodb_attribute1_datatype");
```

Der Parameter für die Typzuweisung ist optional und muss nur für die Spalten angegeben werden, die alternative Typen verwenden.

Beispielsweise erstellt der folgende Hive-Befehl eine Tabelle mit dem Namen `hivetable2`, die auf die DynamoDB-Tabelle `dynamodbtable2` verweist. Sie ist mit `hivetable1` vergleichbar, außer dass sie die `col3`-Spalte dem String Set(SS)-Typ zuordnet. 

```
CREATE EXTERNAL TABLE hivetable2 (col1 string, col2 bigint, col3 array<string>)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable2",
"dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays",
"dynamodb.type.mapping" = "col3:SS");
```

In Hive sind `hivetable1` und `hivetable2` identisch. Wenn jedoch Daten aus diesen Tabellen in die entsprechenden DynamoDB-Tabellen geschrieben werden, enthält `dynamodbtable1` Listen, während `dynamodbtable2` Zeichenfolgensätze enthält.

Wenn Sie Hive-`null`-Werte als Attribute des DynamoDB-`null`-Typs schreiben möchten, ist dies mit dem Parameter `dynamodb.null.serialization` möglich. Das folgende Beispiel zeigt die Syntax für die Angabe der `null`-Serialisierung.

```
CREATE EXTERNAL TABLE hive_tablename (hive_column1_name column1_datatype, hive_column2_name column2_datatype...)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES ("dynamodb.table.name" = "dynamodb_tablename",
"dynamodb.column.mapping" = "hive_column1_name:dynamodb_attribute1_name,hive_column2_name:dynamodb_attribute2_name...",
"dynamodb.null.serialization" = "true");
```

Der Serialisierungsparameter Null ist optional und wird auf `false` gesetzt, wenn er nicht angegeben wird. Beachten Sie, dass DynamoDB `null`-Attribute unabhängig von der Parametereinstellung als `null` -Werte in Hive gelesen werden. Hive-Sammlungen mit `null`-Werten können nur dann in DynamoDB geschrieben werden, wenn der Null-Serialisierungsparameter als `true` angegeben ist. Andernfalls tritt ein Hive-Fehler auf.

Der bigint-Typ in Hive ist identisch mit dem Java-Typ "long" und der Hive-Doppeltyp entspricht mit dem Java-Doppeltyp im Hinblick auf die Genauigkeit. Das bedeutet, dass Sie für Ihre in DynamoDB gespeicherten numerischen Daten, deren Präzision höher liegt als in den Hive-Datentypen, bei der Verwendung von Hive für den Export oder Import oder bei einem Verweis auf die DynamoDB-Daten mit einem Präzisionsverlust oder einem Scheitern der Hive-Abfrage rechnen müssen. 

 Exporte des Binärtyps von der DynamoDB auf Amazon Simple Storage Service (Amazon S3) oder HDFS werden als Base64-kodierte Zeichenfolgen gespeichert. Wenn Sie Daten aus Amazon S3 oder HDFS in den DynamoDB-Binärtyp importieren, sollten die Daten als Base64-Zeichenfolge kodiert sein. 

## Hive-Optionen
<a name="EMR_Hive_Options"></a>

 Sie können die folgenden Hive-Optionen festlegen, um die Datenübertragung aus Amazon DynamoDB zu verwalten. Diese Optionen gelten nur für die aktuelle Hive-Sitzung. Wenn Sie die Hive-Eingabeaufforderung schließen und später auf dem Cluster erneut öffnen, werden diese Einstellungen auf die Standardwerte zurückgesetzt. 


| Hive-Optionen | Description | 
| --- | --- | 
| dynamodb.throughput.read.percent |   Legt die Rate der Lesevorgänge so fest, dass Ihre von DynamoDB bereitgestellte Durchsatzrate im für Ihre Tabelle zugewiesenen Bereich liegt. Der Wert liegt zwischen `0.1` und `1.5` (einschließlich).   Der Wert für die Standardleserate liegt bei 0,5. Das bedeutet, dass Hive versuchen wird, die Hälfte der Lesekapazität, die für Ressourcen in der Tabelle bereitgestellt ist, zu verbrauchen. Eine Erhöhung dieses Werts auf über 0,5 verbessert die Leseanforderungsrate. Eine Verringerung dieses Werts unter 0,5 verringert die Leseanforderungsrate. Bei dieser Leserate handelt es sich um einen ungefähren Wert. Die tatsächliche Leserate variiert abhängig von Faktoren wie der Tatsache, ob eine einheitliche Verteilung der Schlüssel in DynamoDB vorliegt.   Wenn Sie feststellen, dass der bereitgestellte Durchsatz häufig durch die Hive-Operation überschritten wird oder wenn der Live-Lesedatenverkehr zu sehr gedrosselt wird, reduzieren Sie diesen Wert auf unter `0.5`. Wenn Sie über genügend Kapazität verfügen und sich schnellere Hive-Operationen wünschen, legen Sie den Wert auf über `0.5` fest. Sie können auch überzeichnen, indem Sie den Wert auf 1,5 festlegen, wenn Sie der Ansicht sind, dass ungenutzte Ein-/Ausgabevorgänge verfügbar sind.   | 
| dynamodb.throughput.write.percent |   Legt die Rate der Schreibvorgänge so fest, dass Ihre von DynamoDB bereitgestellte Durchsatzrate im für Ihre Tabelle zugewiesenen Bereich liegt. Der Wert liegt zwischen `0.1` und `1.5` (einschließlich).   Der Wert für die Standardschreibrate liegt bei 0,5. Das bedeutet, dass Hive versuchen wird, die Hälfte der Schreibkapazität, die für Ressourcen in der Tabelle bereitgestellt ist, zu verbrauchen. Eine Erhöhung dieses Werts auf über 0,5 verbessert die Schreibanforderungsrate. Eine Verringerung dieses Werts unter 0,5 verringert die Schreibanforderungsrate. Bei dieser Schreibrate handelt es sich um einen ungefähren Wert. Die tatsächliche Schreibrate variiert abhängig von Faktoren wie der Tatsache, ob eine einheitliche Verteilung der Schlüssel in DynamoDB vorliegt.   Wenn Sie feststellen, dass der bereitgestellte Durchsatz häufig durch die Hive-Operation überschritten wird oder wenn der Live-Schreibdatenverkehr zu sehr gedrosselt wird, reduzieren Sie diesen Wert auf unter `0.5`. Wenn Sie über genügend Kapazität verfügen und sich schnellere Hive-Operationen wünschen, legen Sie den Wert auf über `0.5` fest. Sie können auch überzeichnen, indem Sie den Wert auf 1,5 festlegen, wenn Sie der Ansicht sind, dass ungenutzte Ein-/Ausgabevorgänge verfügbar sind oder es sich hierbei um das anfängliche Hochladen von Daten in die Tabelle handelt und noch kein aktiver Datenverkehr vorliegt.   | 
| dynamodb.endpoint | Geben Sie den Endpunkt für den DynamoDB-Service an. Weitere Informationen über die verfügbaren DynamoDB-Endpunkte finden Sie unter [Regionen und Endpunkte](https://docs.aws.amazon.com/general/latest/gr/rande.html#ddb_region).  | 
| dynamodb.max.map.tasks |   Geben Sie die maximale Anzahl von Zuordnungs-Aufgaben beim Lesen von Daten aus DynamoDB an. Dieser Wert muss gleich oder größer 1 sein.   | 
| dynamodb.retry.duration |   Geben Sie die Anzahl der Minuten an, die als Timeout-Dauer für das Wiederholen der Hive-Befehle verwendet werden soll. Bei diesem Wert muss es sich um eine Ganzzahl gleich oder größer als 0 handeln. Die Standard-Timeout-Dauer beträgt zwei Minuten.   | 

 Diese Optionen werden über den Befehl `SET`, wie im folgenden Beispiel gezeigt, festgelegt. 

```
SET dynamodb.throughput.read.percent=1.0; 

INSERT OVERWRITE TABLE s3_export SELECT * 
FROM hiveTableName;
```

# Hive-Befehlsbeispiele für das Exportieren, Importieren und Abfragen von Daten in DynamoDB
<a name="EMR_Hive_Commands"></a>

Die folgenden Beispiele verwenden Hive-Befehle für Operationen wie das Exportieren von Daten nach DynamoDB oder HDFS, das Importieren von Daten in Amazon S3, das Verknüpfen und Abfragen von Tabellen usw. 

Operationen für eine Hive-Tabelle verweisen auf Daten, die in DynamoDB gespeichert sind. Hive-Befehle sind abhängig von den Einstellungen der DynamoDB-Tabelle für den bereitgestellten Durchsatz. Die Daten, die abgerufen werden, umfassen die Daten, die zum Zeitpunkt der Verarbeitung der Hive-Operationsanforderung durch DynamoDB in die DynamoDB-Tabelle geschrieben werden. Wenn der Datenabruf viel Zeit in Anspruch nimmt, wurden einige der vom Hive-Befehl zurückgegebenen Daten möglicherweise seit Starten des Hive-Befehls in DynamoDB aktualisiert. 

Die Hive-Befehle `DROP TABLE` und `CREATE TABLE` gelten nur für die lokalen Tabellen in Hive und erstellen bzw. löschen keine Tabellen in DynamoDB. Wenn Ihre Hive-Abfrage auf eine Tabelle in DynamoDB verweist, muss diese bereits vorhanden sein, bevor Sie die Abfrage ausführen. Weitere Informationen zum Erstellen und Löschen von Tabellen in DynamoDB finden Sie unter [Arbeiten mit Tabellen in DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithTables.html) im *Amazon-DynamoDB-Entwicklerhandbuch*. 

**Anmerkung**  
 Wenn Sie eine Hive-Tabelle einem Speicherort in Amazon S3 zuordnen, ordnen Sie sie nicht dem Stammpfad des Buckets, s3://amzn-s3-demo-bucket, zu, da dies zu Fehlern führen kann, wenn Hive die Daten in Amazon S3 schreibt. Ordnen Sie die Tabelle stattdessen einem Unterpfad des Buckets zu, s3://amzn-s3-demo-bucket/mypath. 

## Exportieren von Daten aus DynamoDB
<a name="EMR_Hive_Commands_exporting"></a>

 Sie können mit Hive Daten aus DynamoDB exportieren. 

**So exportieren Sie eine DynamoDB-Tabelle in einen Amazon-S3-Bucket**
+  Erstellen Sie eine Hive-Tabelle, die auf in DynamoDB gespeicherte Daten verweist. Anschließend können Sie den Befehl INSERT OVERWRITE aufrufen, um die Daten in ein externes Verzeichnis zu schreiben. Im folgenden Beispiel *s3://amzn-s3-demo-bucket/path/subpath/* ist dies ein gültiger Pfad in Amazon S3. Passen Sie die Spalten und Datentypen im CREATE-Befehl an, um mit den Werten in Ihrer DynamoDB übereinzustimmen. Sie können diese Methode zum Erstellen eines Archivs Ihrer DynamoDB-Daten in Amazon S3 verwenden. 

  ```
  1. CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
  2. STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
  3. TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
  4. "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");                   
  5.                     
  6. INSERT OVERWRITE DIRECTORY 's3://amzn-s3-demo-bucket/path/subpath/' SELECT * 
  7. FROM hiveTableName;
  ```

**So exportieren Sie eine DynamoDB-Tabelle in einen Amazon S3-Bucket mit Formatierung**
+  Erstellen Sie eine externe Tabelle, die auf einen Speicherort in Amazon S3 verweist. Dies wird unten als "s3\$1export" gezeigt. Geben Sie während des CREATE-Aufrufs die Zeilenformatierung für die Tabelle an. Wenn Sie dann INSERT OVERWRITE für den Export von Daten aus DynamoDB in s3\$1export verwenden, werden die Daten in dem angegebenen Format geschrieben. Im folgenden Beispiel werden die Daten als CSV-Werte geschrieben. 

  ```
   1. CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
   2. STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   3. TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
   4. "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");                      
   5.                     
   6. CREATE EXTERNAL TABLE s3_export(a_col string, b_col bigint, c_col array<string>)
   7. ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
   8. LOCATION 's3://amzn-s3-demo-bucket/path/subpath/';
   9.                     
  10. INSERT OVERWRITE TABLE s3_export SELECT * 
  11. FROM hiveTableName;
  ```

**So exportieren Sie eine DynamoDB-Tabelle in einen Amazon-S3-Bucket ohne Angabe einer Spaltenzuordnung**
+  Erstellen Sie eine Hive-Tabelle, die auf in DynamoDB gespeicherte Daten verweist. Dies ähnelt dem vorhergehenden Beispiel mit der Ausnahme, dass Sie keine Spaltenzuordnung angeben. Die Tabelle muss über genau einen Spaltentyp `map<string, string>` verfügen. Wenn Sie dann eine `EXTERNAL`-Tabelle in Amazon S3 erstellen, können Sie den Befehl `INSERT OVERWRITE` zum Schreiben der Daten von DynamoDB nach Amazon S3 aufrufen. Sie können diese Methode zum Erstellen eines Archivs Ihrer DynamoDB-Daten in Amazon S3 verwenden. Da es keine Spaltenzuordnung gibt, können Sie keine Tabellen abfragen, die auf diese Weise exportiert werden. Das Exportieren von Daten ohne Angabe einer Spaltenzuordnung ist ab Hive 0.8.1.5 verfügbar, was unter Amazon-EMR-AMI 2.2.*x* und höher unterstützt wird. 

  ```
   1. CREATE EXTERNAL TABLE hiveTableName (item map<string,string>)
   2. STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   3. TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1");  
   4.     
   5. CREATE EXTERNAL TABLE s3TableName (item map<string, string>)
   6. ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
   7. LOCATION 's3://amzn-s3-demo-bucket/path/subpath/'; 
   8.                 
   9. INSERT OVERWRITE TABLE s3TableName SELECT * 
  10. FROM hiveTableName;
  ```

**So exportieren Sie eine DynamoDB-Tabelle in einen Amazon-S3-Bucket mit Datenkomprimierung**
+  Hive bietet mehrere Kompressions-Codecs an, die Sie während der Hive-Sitzung einrichten können. Dadurch werden die exportierten Daten im angegebenen Format komprimiert. Im folgenden Beispiel werden die exportierten Dateien mithilfe des Lempel-Ziv-Oberhumer (LZO) -Algorithmus komprimiert. 

  ```
   1. SET hive.exec.compress.output=true;
   2. SET io.seqfile.compression.type=BLOCK;
   3. SET mapred.output.compression.codec = com.hadoop.compression.lzo.LzopCodec;                    
   4.                     
   5. CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
   6. STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   7. TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
   8. "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");                    
   9.                     
  10. CREATE EXTERNAL TABLE lzo_compression_table (line STRING)
  11. ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
  12. LOCATION 's3://amzn-s3-demo-bucket/path/subpath/';
  13.                     
  14. INSERT OVERWRITE TABLE lzo_compression_table SELECT * 
  15. FROM hiveTableName;
  ```

   Die verfügbaren Kompressions-Codecs sind: 
  +  org.apache.hadoop.io.compress. GzipCodec 
  +  org.apache.hadoop.io.com/press. DefaultCodec 
  +  com.hadoop.compression.lzo. LzoCodec 
  +  com.hadoop.compression.lzo. LzopCodec 
  +  org.apache.hadoop.io.com/press. BZip2Codec 
  +  org.apache.hadoop.io.com/press. SnappyCodec 

**So exportieren Sie eine DynamoDB-Tabelle nach HDFS**
+  Verwenden Sie den folgenden Hive-Befehl. Dabei *hdfs:///directoryName* handelt es sich um einen gültigen HDFS-Pfad und *hiveTableName* um eine Tabelle in Hive, die auf DynamoDB verweist. Dieser Exportvorgang ist schneller als der Export einer DynamoDB-Tabelle nach Amazon S3, da Hive 0.7.1.1 als Zwischenschritt beim Exportieren von Daten nach Amazon S3 HDFS verwendet. Das folgende Beispiel zeigt, wie Sie `dynamodb.throughput.read.percent` auf 1.0 festlegen, um die Leseanforderungsrate zu erhöhen. 

  ```
  1. CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
  2. STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
  3. TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
  4. "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays"); 
  5.                     
  6. SET dynamodb.throughput.read.percent=1.0;                    
  7.                     
  8. INSERT OVERWRITE DIRECTORY 'hdfs:///directoryName' SELECT * FROM hiveTableName;
  ```

   Sie können auch Daten nach HDFS via Formatierung und Kompression exportieren, wie oben beim Export nach Amazon S3 gezeigt. Ersetzen Sie dazu einfach das Amazon-S3-Verzeichnis in den Beispielen oben durch ein HDFS-Verzeichnis. <a name="EMR_Hive_non-printable-utf8"></a>

**So lesen Sie nicht druckbare UTF-8-Zeichendaten in Hive**
+ Sie können nicht druckbare UTF-8-Zeichendaten mit Hive lesen und schreiben, indem Sie beim Erstellen der Tabelle die `STORED AS SEQUENCEFILE`-Klausel verwenden. A SequenceFile ist das Hadoop-Binärdateiformat. Sie müssen Hadoop verwenden, um diese Datei zu lesen. Das folgende Beispiel zeigt, wie Daten aus DynamoDB nach Amazon S3 exportiert werden. Sie können diese Funktionalität für die Verarbeitung von nicht druckbaren UTF-8-kodierten Zeichen verwenden. 

  ```
   1. CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
   2. STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
   3. TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
   4. "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");                      
   5.                     
   6. CREATE EXTERNAL TABLE s3_export(a_col string, b_col bigint, c_col array<string>)
   7. STORED AS SEQUENCEFILE
   8. LOCATION 's3://amzn-s3-demo-bucket/path/subpath/';
   9.                     
  10. INSERT OVERWRITE TABLE s3_export SELECT * 
  11. FROM hiveTableName;
  ```

## Importieren von Daten in DynamoDB
<a name="EMR_Hive_Commands_importing"></a>

 Wenn Sie mit Hive Daten in DynamoDB schreiben möchten, stellen Sie sicher, dass die Anzahl der Schreibkapazitätseinheiten größer als die Anzahl der Mapper im Cluster ist. Zum Beispiel produzieren Cluster, die auf m1.xlarge-EC2-Instances ausgeführt werden, acht Mapper pro Instance. Bei einem Cluster mit 10 Instances ergäbe dies 80 Mapper. Wenn Ihre Schreibkapazitätseinheiten nicht größer als die Anzahl der Mapper im Cluster sind, verbraucht der Hive-Schreibvorgang möglicherweise den gesamten Schreibdurchsatz oder versucht, mehr Durchsatz zu verbrauchen, als zur Verfügung gestellt wird. Weitere Informationen zur Anzahl der Mapper, die von den einzelnen EC2-Instance-Typen produziert werden, finden Sie unter [Konfigurieren von Hadoop](emr-hadoop-config.md).

 Die Anzahl der Mapper in Hadoop wird durch die Input Splits gesteuert. Wenn es zu wenig Splits gibt, ist Ihr Schreibbefehl ggf. nicht in der Lage, den gesamten verfügbaren Schreibdurchsatz zu verbrauchen. 

 Wenn ein Element mit demselben Schlüssel in der DynamoDB-Zieltabelle vorhanden ist, wird es überschrieben. Wenn kein Element mit dem Schlüssel in der DynamoDB-Zieltabelle vorhanden ist, wird das Element eingefügt. 

**So importieren Sie Daten aus Amazon S3 zu DynamoDB**
+  Sie können Amazon EMR (Amazon EMR) und Hive verwenden, um Daten von Amazon S3 nach DynamoDB zu schreiben. 

  ```
  CREATE EXTERNAL TABLE s3_import(a_col string, b_col bigint, c_col array<string>)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
  LOCATION 's3://amzn-s3-demo-bucket/path/subpath/';                    
                      
  CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
  TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
  "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");  
                      
  INSERT OVERWRITE TABLE hiveTableName SELECT * FROM s3_import;
  ```

**So importieren Sie eine Tabelle von einem Amazon-S3-Bucket in DynamoDB ohne Angabe einer Spaltenzuordnung**
+  Erstellen Sie eine `EXTERNAL`-Tabelle, die auf die in Amazon S3 gespeicherten Daten verweist, die zuvor von DynamoDB exportiert wurden. Stellen Sie vor dem Import sicher, dass die Tabelle in DynamoDB vorhanden ist und über das gleiche Schlüsselschema wie die zuvor exportierte DynamoDB-Tabelle verfügt. Die Tabelle muss außerdem über genau einen Spaltentyp `map<string, string>` verfügen. Wenn Sie dann eine Hive-Tabelle erstellen, die mit DynamoDB verknüpft ist, können Sie den Befehl `INSERT OVERWRITE` zum Schreiben der Daten von Amazon S3 nach DynamoDB aufrufen. Da es keine Spaltenzuordnung gibt, können Sie keine Tabellen abfragen, die auf diese Weise importiert werden. Das Importieren von Daten ohne Angabe einer Spaltenzuordnung ist ab Hive 0.8.1.5 verfügbar, was unter Amazon EMR AMI 2.2.3.x und höher unterstützt wird. 

  ```
  CREATE EXTERNAL TABLE s3TableName (item map<string, string>)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
  LOCATION 's3://amzn-s3-demo-bucket/path/subpath/'; 
                          
  CREATE EXTERNAL TABLE hiveTableName (item map<string,string>)
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
  TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1");  
                   
  INSERT OVERWRITE TABLE hiveTableName SELECT * 
  FROM s3TableName;
  ```

**So importieren Sie eine Tabelle von HDFS in DynamoDB**
+  Sie können mit Amazon EMR und Hive Daten von HDFS nach DynamoDB schreiben. 

  ```
  CREATE EXTERNAL TABLE hdfs_import(a_col string, b_col bigint, c_col array<string>)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
  LOCATION 'hdfs:///directoryName';                    
                      
  CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>)
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
  TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", 
  "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays");  
                      
  INSERT OVERWRITE TABLE hiveTableName SELECT * FROM hdfs_import;
  ```

## Abfragen von Daten in DynamoDB
<a name="EMR_Hive_Commands_querying"></a>

 Die folgenden Beispiele zeigen die verschiedenen Möglichkeiten zur Verwendung von Amazon EMR zum Abfragen von Daten in DynamoDB. 

**So finden Sie den größten Wert für eine zugeordnete Spalte (`max`)**
+  Verwenden Sie Hive-Befehle wie die folgenden. Im ersten Befehl wird durch die CREATE-Anweisung eine Hive-Tabelle erstellt, die auf in DynamoDB gespeicherte Daten verweist. Die SELECT-Anweisung verwendet dann diese Tabelle, um in DynamoDB gespeicherte Daten abzufragen. Im folgenden Beispiel wird nach der größten Bestellung eines bestimmten Kunden gesucht. 

  ```
  CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) 
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "Purchases",
  "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items");
  
  SELECT max(total_cost) from hive_purchases where customerId = 717;
  ```

**So aggregieren Sie Daten mit der `GROUP BY`-Klausel**
+  Sie können die `GROUP BY`-Klausel zum Sammeln von Daten über mehrere Datensätze hinweg verwenden. Diese Klausel wird häufig in Verbindung mit einer Aggregationsfunktion wie „sum“, „count“, „min“ oder „max“ eingesetzt. Das folgende Beispiel gibt eine Liste der größten Bestellungen von Kunden zurück, die mehr als drei Bestellungen aufgegeben haben. 

  ```
  CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) 
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "Purchases",
  "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items");
  
  SELECT customerId, max(total_cost) from hive_purchases GROUP BY customerId HAVING count(*) > 3;
  ```

**So verknüpfen Sie zwei DynamoDB-Tabellen**
+  Im folgenden Beispiel werden zwei Hive-Tabellen Daten in DynamoDB zugeordnet. Dann wird ein Join zwischen diesen beiden Tabellen aufgerufen. Der Join wird auf dem Cluster verarbeitet und zurückgegeben. Der Join wird nicht in DynamoDB ausgeführt. In diesem Beispiel wird eine Liste von Kunden mit ihren Einkäufen angezeigt, die mehr als zwei Bestellungen getätigt haben. 

  ```
  CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) 
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "Purchases",
  "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items");
  
  CREATE EXTERNAL TABLE hive_customers(customerId bigint, customerName string, customerAddress array<String>) 
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "Customers",
  "dynamodb.column.mapping" = "customerId:CustomerId,customerName:Name,customerAddress:Address");
  
  Select c.customerId, c.customerName, count(*) as count from hive_customers c 
  JOIN hive_purchases p ON c.customerId=p.customerId 
  GROUP BY c.customerId, c.customerName HAVING count > 2;
  ```

**So verknüpfen Sie zwei Tabellen aus verschiedenen Quellen**
+  Im folgenden Beispiel ist Customer\$1S3 eine Hive-Tabelle, die eine CSV-Datei lädt, die in Amazon S3 gespeichert ist, und hive\$1purchases ist eine Tabelle, die auf Daten in DynamoDB verweist. Das folgende Beispiel verbindet Kundendaten, die in Form einer CSV-Datei in Amazon S3 gespeichert sind, mit Daten in DynamoDB, um Daten für Bestellungen zurückzugeben, die von Kunden mit „Miller“ im Namen aufgegeben wurden. 

  ```
  CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) 
  STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES ("dynamodb.table.name" = "Purchases",
  "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items");
  
  CREATE EXTERNAL TABLE Customer_S3(customerId bigint, customerName string, customerAddress array<String>)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
  LOCATION 's3://amzn-s3-demo-bucket/path/subpath/';
  
  Select c.customerId, c.customerName, c.customerAddress from 
  Customer_S3 c 
  JOIN hive_purchases p 
  ON c.customerid=p.customerid 
  where c.customerName like '%Miller%';
  ```

**Anmerkung**  
 In den vorherigen Beispielen wurden aus Gründen der Klarheit und Vollständigkeit die CREATE TABLE-Anweisungen in jedes Beispiel eingefügt. Bei Ausführung mehrerer Abfragen oder Exportoperationen für eine bestimmte Hive-Tabelle müssen Sie die Tabelle nur einmal zu Beginn der Hive-Sitzung erstellen. 

# Optimieren der Leistung von Amazon-EMR-Operationen in DynamoDB
<a name="EMR_Hive_Optimizing"></a>

 Amazon-EMR-Operationen auf eine DynamoDB-Tabelle zählen als Lesevorgänge und unterliegen den für die Tabelle angegebenen Durchsatzeinstellungen. Amazon EMR implementiert eine eigene Logik, um zu versuchen, die Belastung Ihrer DynamoDB-Tabelle auszugleichen und so die Möglichkeit der Überschreitung des bereitgestellten Durchsatzes zu minimieren. Am Ende jeder Hive-Abfrage gibt Amazon EMR Informationen über den für die Verarbeitung der Abfrage verwendeten Cluster zurück, zum Beispiel darüber, wie oft der bereitgestellte Durchsatz überschritten wurde. Sie können diese Informationen sowie CloudWatch Metriken zu Ihrem DynamoDB-Durchsatz verwenden, um die Belastung Ihrer DynamoDB-Tabelle bei nachfolgenden Anfragen besser zu verwalten. 

 Die folgenden Faktoren beeinflussen die Hive-Abfrageleistung bei der Arbeit mit DynamoDB-Tabellen. 

## Bereitgestellte Lesekapazitätseinheiten
<a name="ProvisionedReadCapacityUnits"></a>

 Wenn Sie Hive-Abfragen über eine DynamoDB-Tabelle ausführen, müssen Sie sicherstellen, dass ausreichend Lesekapazitätseinheiten vorhanden sind. 

 Nehmen Sie zum Beispiel an, dass Sie 100 Lesekapazitätseinheiten für Ihre DynamoDB-Tabelle bereitgestellt haben. Dadurch können Sie 100 Lesevorgänge oder 409.600 Byte pro Sekunde ausführen. Wenn diese Tabelle 20 GB an Daten enthält (21.474.836.480 Bytes) und Ihre Hive-Abfrage einen vollständigen Tabellen-Scan durchführt, können Sie ermitteln, wie lange die Ausführung der Abfrage dauern wird: 

 * 21 474 836 480/409 600 = 52 429 Sekunden = 14,56 Stunden * 

 Die einzige Möglichkeit, um die erforderliche Zeit zu verkürzen, würde eine Änderung der Lesekapazitätseinheiten in der DynamoDB-Quelltabelle notwendig machen. Das Hinzufügen weiterer Knoten zum Amazon-EMR-Cluster würde nicht helfen. 

 Der Fertigstellungsgrad in der Hive-Ausgabe wird dann aktualisiert, wenn ein oder mehrere Mapper-Prozesse abgeschlossen wurden. Für eine große DynamoDB-Tabelle mit wenig bereitgestellter Lesekapazität wird die fertiggestellte Ausgabe in Prozent ggf. für längere Zeit nicht aktualisiert; im oben genannten Fall würde der Auftrag mehrere Stunden lang zu 0 % abgeschlossen angezeigt. Weitere Details zum Fortschritt Ihres Auftrags finden Sie in der Amazon-EMR-Konsole. Hier können Sie den Status einzelner Mapper-Aufgaben sowie Statistiken für Lesevorgänge anzeigen. 

 Sie können sich auch bei der Hadoop-Schnittstelle auf dem Master-Knoten anmelden und die Hadoop-Statistiken anzeigen. Hier werden Ihnen der Status einzelner Map-Aufgaben sowie Statistiken für Lesevorgänge angezeigt. Weitere Informationen finden Sie unter [Auf Hauptknoten gehostete Web-Schnittstellen](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-web-interfaces.html) im *Verwaltungshandbuch für Amazon EMR*.

## Einstellung der Leserate in Prozent
<a name="ReadPercent"></a>

 Standardmäßig verwaltet Amazon EMR die Anforderungslast für Ihre DynamoDB-Tabelle entsprechend Ihres aktuell bereitgestellten Durchsatzes. Wenn Amazon EMR jedoch Informationen zu Ihrem Auftrag zurückgibt, die eine große Anzahl an Antworten enthalten, dass der bereitgestellte Durchsatz überschritten wurde, können Sie die Standardleserate bei Einrichtung der Hive-Tabelle mit dem Parameter `dynamodb.throughput.read.percent` anpassen. Weitere Informationen zum Festlegen des Parameters zur Leserate in Prozent finden Sie unter [Hive-Optionen](EMR_Interactive_Hive.md#EMR_Hive_Options). 

## Einstellung der Schreibrate in Prozent
<a name="WritePercent"></a>

 Standardmäßig verwaltet Amazon EMR die Anforderungslast für Ihre DynamoDB-Tabelle entsprechend Ihres aktuell bereitgestellten Durchsatzes. Wenn Amazon EMR jedoch Informationen zu Ihrem Auftrag zurückgibt, die eine große Anzahl an Antworten enthalten, dass der bereitgestellte Durchsatz überschritten wurde, können Sie die Standardschreibrate bei Einrichtung der Hive-Tabelle mit dem Parameter `dynamodb.throughput.write.percent` anpassen. Weitere Informationen zum Festlegen des Parameters zur Schreibrate in Prozent finden Sie unter [Hive-Optionen](EMR_Interactive_Hive.md#EMR_Hive_Options). 

## Einstellung für das Wiederholungsintervall
<a name="emr-ddb-retry-duration"></a>

 Standardmäßig führt Amazon EMR eine Hive-Abfrage erneut aus, wenn nicht innerhalb von zwei Minuten (dem Standard-Wiederholungsintervall) ein Ergebnis zurückgegeben wurde. Sie können dieses Intervall anpassen, indem Sie bei der Ausführung einer Hive-Abfrage den Parameter `dynamodb.retry.duration` einstellen. Weitere Informationen zum Festlegen des Parameters zur Schreibrate in Prozent finden Sie unter [Hive-Optionen](EMR_Interactive_Hive.md#EMR_Hive_Options). 

## Anzahl der Zuordnungs-Tasks
<a name="NumberMapTasks"></a>

 Die Zuordnungs-Daemons, die Hadoop startet, um Ihre Anforderungen zum Exportieren und Abfragen von in DynamoDB gespeicherten Daten zu verarbeiten, sind auf eine maximale Leserate von 1 MiB pro Sekunde begrenzt, um die verbrauchte Lesekapazität einzuschränken. Wenn zusätzlich bereitgestellter Durchsatz auf DynamoDB vorhanden ist, verbessern Sie die Performance von Hive-Export- und Abfrageoperationen, indem Sie die Anzahl der Zuordnungs-Daemons erhöhen. Zu diesem Zweck können Sie entweder die Anzahl der EC2-Instances in Ihrem Cluster *oder* die Anzahl der Zuordnungs-Daemons erhöhen, die auf jeder EC2-Instance ausgeführt werden. 

 Sie können die Anzahl der EC2-Instances in einem Cluster erhöhen, indem Sie den aktuellen Cluster beenden und mit einer größeren Anzahl von EC2-Instances erneut starten. Sie geben die Anzahl der EC2-Instances im Dialogfeld **Configure EC2 Instances** an, wenn Sie den Cluster von der Amazon-EMR-Konsole starten, oder mit der Option `‑‑num-instances`, wenn Sie den Cluster von der Befehlszeilenschnittstelle starten. 

 Die Anzahl der Map-Tasks auf einer Instance hängt vom EC2-Instance-Typ ab. Weitere Informationen zu den unterstützten EC2 Instance-Typen und der Anzahl der von ihnen jeweils bereitgestellten Mapper finden Sie in der [Aufgabenkonfiguration](emr-hadoop-task-config.md). Hier finden Sie einen Abschnitt zur Aufgabenkonfiguration ("Task Configuration") für jede der unterstützten Konfigurationen. 

 Eine andere Möglichkeit, die Anzahl der Mapper-Daemons zu erhöhen, besteht darin, den Hadoop-Konfigurationsparameter `mapreduce.tasktracker.map.tasks.maximum` in einen höheren Wert zu ändern. Dies hat den Vorteil, dass Sie mehr Mapper erhalten, ohne die Anzahl oder die Größe der EC2-Instances erhöhen zu müssen, wodurch Sie Kosten sparen. Ein Nachteil ist, dass wenn Sie diesen Wert zu hoch einstellen, für die EC2-Instances in Ihrem Cluster nicht mehr genügend Arbeitsspeicher vorhanden ist. Zum Festlegen von `mapreduce.tasktracker.map.tasks.maximum` starten Sie den Cluster und geben einen Wert für `mapreduce.tasktracker.map.tasks.maximum` als Eigenschaft der Konfigurationsklassifizierung der mapred-site an. Dies wird im folgenden Beispiel veranschaulicht. Weitere Informationen finden Sie unter [Anwendungen konfigurieren](emr-configure-apps.md).

```
{
    "configurations": [
    {
        "classification": "mapred-site",
        "properties": {
            "mapred.tasktracker.map.tasks.maximum": "10"
        }
    }
    ]
}
```

## Parallele Datenanforderungen
<a name="ParallelDataRequests"></a>

 Mehrere Datenanforderungen, entweder von mehr als einem Benutzer oder von mehr als einer Anwendung, für eine einzelne Tabelle kann den bereitgestellten Lesedurchsatz erschöpfen und die Performance beeinträchtigen. 

## Prozessdauer
<a name="ProcessDuration"></a>

 Die Datenkonsistenz in DynamoDB hängt von der Reihenfolge der Lese- und Schreibvorgänge auf den einzelnen Knoten ab. Während eine Hive-Abfrage verarbeitet wird, kann eine andere Anwendung neue Daten in die DynamoDB-Tabelle laden oder vorhandene Daten ändern oder löschen. In diesem Fall enthalten die Ergebnisse der Hive-Abfrage möglicherweise nicht die Datenänderungen, die vorgenommen wurden, während die Abfrage ausgeführt wurde. 

## Durchsatzüberschreitungen vermeiden
<a name="AvoidExceedingThroughput"></a>

 Wenn Sie Hive-Abfragen für DynamoDB ausführen, achten Sie darauf, dass Ihr bereitgestellter Durchsatz nicht überschritten wird, da sich dies nachteilig auf die Kapazität auswirkt, die für die `DynamoDB::Get`-Aufrufe Ihrer Anwendung benötigt werden. Um sicherzustellen, dass dies nicht der Fall ist, sollten Sie das Lesevolumen und die Drosselung bei Anwendungsaufrufen regelmäßig überwachen, `DynamoDB::Get` indem Sie die Protokolle und Messwerte in Amazon überprüfen. CloudWatch 

## Abfragezeit
<a name="RequestTime"></a>

 Wenn Hive-Abfragen, die auf eine DynamoDB-Tabelle zugreifen, für Zeiten geplant werden, in denen wenig Anforderungen an die DynamoDB-Tabelle gerichtet werden, verbessert das die Leistung. Beispiel: Wenn die Mehrzahl der Anwendungsbenutzer in San Francisco lebt, können Sie die täglichen Daten um 04:00 Uhr PST, wenn die meisten Benutzer schlafen und keine Datensätze in der DynamoDB-Datenbank aktualisiert werden. 

## Zeitbasierte Tabellen
<a name="TimeBasedTables"></a>

 Wenn die Daten als eine Reihe von zeitbasierten DynamoDB-Tabellen organisiert sind, z. B. eine Tabelle pro Tag, können Sie die Daten exportieren, wenn die Tabelle nicht mehr aktiv ist. Sie können auf diese Weise Daten fortlaufend nach Amazon S3 sichern. 

## Archivierte Daten
<a name="ArchivedData"></a>

 Wenn Sie vorhaben, viele Hive-Abfragen für in DynamoDB gespeicherte Daten auszuführen und Ihre Anwendung archivierte Daten unterstützt, können Sie die Daten nach HDFS oder Amazon S3 exportieren und die Hive-Abfragen auf eine Kopie der Daten anstelle von DynamoDB ausführen. Dies spart Lesevorgänge und schont den bereitgestellten Durchsatz. 

# Kinesis
<a name="emr-kinesis"></a>

Amazon EMR-Cluster können Amazon Kinesis Kinesis-Streams direkt lesen und verarbeiten, indem sie vertraute Tools aus dem Hadoop-Ökosystem wie Hive, Pig MapReduce, die Hadoop Streaming API und Cascading verwenden. Außerdem können Sie Echtzeitdaten aus Amazon Kinesis mit vorhandenen Daten auf Amazon S3, Amazon DynamoDB und HDFS in einem aktiven Cluster zusammenführen. Zur Nachbearbeitung können Sie die Daten aus Amazon EMR direkt in Amazon S3 oder DynamoDB laden. Informationen zu den wichtigsten Services und Preisen von Amazon Kinesis finden Sie auf der Seite zu [Amazon Kinesis](https://aws.amazon.com//kinesis).

## Wie kann ich die Integration von Amazon EMR und Amazon Kinesis verwenden?
<a name="kinesis-use-cases"></a>

 Die Integration von Amazon EMR in Amazon Kinesis erleichtert bestimmte Szenarien, z. B.: 
+ **Streaming-Protokollanalyse ** – Sie können Streaming-Webprotokolle analysieren, um eine Liste der 10 häufigsten Fehlertypen alle „x“ Minuten nach Region, Browser und Zugriffs-Domain zu erzeugen. 
+ **Kundenbindung** – Sie können Abfragen schreiben, die Clickstream-Daten aus Amazon Kinesis mit Informationen zu Werbekampagnen in einer DynamoDB-Tabelle verknüpfen, um die effektivsten Kategorien von Anzeigen zu bestimmen, die auf bestimmten Websites angezeigt werden. 
+ **Interaktive Ad-hoc-Abfragen** – Sie können Daten regelmäßig aus Amazon-Kinesis-Streams in HDFS laden und diese für schnelle, interaktive Analyseabfragen als lokale Impala-Tabelle zur Verfügung stellen.

## Analyse von Amazon-Kinesis-Streams mit Checkpointing
<a name="kinesis-checkpoint"></a>

Benutzer können regelmäßige, auf Stapelverarbeitung basierende Analysen von Amazon-Kinesis-Streams in so genannten *Iterationen* ausführen. Da Amazon-Kinesis-Stream-Datensätze mithilfe von Sequenznummern abgerufen werden, erfolgt die Festlegung der Iterationsgrenzen anhand der Start- und Endsequenznummern, die von Amazon EMR in einer DynamoDB-Tabelle gespeichert werden. Wenn z. B. `iteration0` endet, wird die letzte Sequenznummer in DynamoDB gespeichert, sodass bei Beginn des Auftrags `iteration1` nachfolgende Daten aus dem Stream abgerufen werden können. Diese Zuweisung von Iterationen in Stream-Daten wird als *Checkpointing* bezeichnet. Weitere Informationen finden Sie unter [Kinesis-Konnektor](https://aws.amazon.com/elasticmapreduce/faqs/#kinesis-connector).

Wenn eine Iteration mit einem Checkpoint versehen wurde und der Auftrag eine Iteration nicht verarbeiten konnte, versucht Amazon EMR, die Datensätze in dieser Iteration erneut zu verarbeiten. 

Checkpointing ist eine Funktion, mit der Sie Folgendes durchführen können: 
+ Starten von Datenverarbeitung nach einer Sequenznummer, die von eine vorherigen Abfrage verarbeitet wurde, die im selben Stream und mit selben logischen Namen ausgeführt wurde
+ Erneutes Verarbeiten desselben Datenstapels aus Kinesis, der von einer früheren Abfrage verarbeitet wurde

 Setzen Sie in Ihren Skripts den Parameter `kinesis.checkpoint.enabled` auf `true`, um Checkpointing zu aktivieren. Konfigurieren Sie außerdem die folgenden Parameter:


| Konfigurationseinstellung | Description | 
| --- | --- | 
| kinesis.checkpoint.metastore.table.name | Name der DynamoDB-Tabelle, in der Checkpoint-Informationen gespeichert werden | 
| kinesis.checkpoint.metastore.hash.key.name | Name des Hash-Schlüssels für die DynamoDB-Tabelle | 
| kinesis.checkpoint.metastore.hash.range.name | Name des Bereichsschlüssels für die DynamoDB-Tabelle | 
| kinesis.checkpoint.logical.name | Ein logischer Name für die aktuelle Verarbeitung | 
| kinesis.checkpoint.iteration.no | Iterationsnummer für die Verarbeitung im Zusammenhang mit dem logischen Namen | 
| kinesis.rerun.iteration.without.wait | Boolescher Wert, der angibt, ob eine fehlgeschlagene Iteration erneut ausgeführt werden kann, ohne die Zeitbeschränkung abzuwarten. Der Standardwert ist false | 

### Empfehlungen zu bereitgestellten IOPS für Amazon-DynamoDB-Tabellen
<a name="kinesis-checkpoint-DDB"></a>

Der Amazon-EMR-Konnektor für Amazon Kinesis verwendet die DynamoDB-Datenbank als Sicherungsdatenbank für das Metadaten-Checkpointing. Sie müssen in DynamoDB eine Tabelle erstellen, bevor Sie Daten in einem Amazon-Kinesis-Stream mit einem Amazon-EMR-Cluster in Checkpoint-Intervallen verwenden können. Die Tabelle muss sich in derselben Region befinden wie Ihr Amazon-EMR-Cluster. Im Folgenden erhalten Sie allgemeine Empfehlungen für die Anzahl von IOPS, die Sie für Ihre DynamoDB-Tabellen bereitstellen sollten. Dabei sei `j` die maximale Anzahl der Hadoop-Aufträge (mit unterschiedlichen Kombinationen für logischer Name\$1Iterationsnummer), die gleichzeitig ausgeführt werden können, und `s` die maximale Anzahl der Shards, die ein Auftrag verarbeitet:

Für **Read Capacity Units (Lese-Kapazitätseinheiten)**: `j`\$1`s`/`5`

Für **Write Capacity Units (Schreibkapazitätseinheiten)**: `j`\$1`s`

## Leistungsaspekte
<a name="performance"></a>

Der Shard-Durchsatz für Amazon Kinesis ist direkt proportional zur Instance-Größe von Knoten in Amazon-EMR-Clustern und zur Datensatzgröße im Stream. Wir empfehlen die Verwendung von m5.xlarge oder größeren Instances für Haupt- und Core-Knoten.

## Die Amazon-Kinesis-Analyse mit Amazon EMR planen
<a name="schedule"></a>

Wenn Sie Daten in einem aktiven Amazon-Kinesis-Stream analysieren und dabei durch Timeouts und eine maximale Dauer der einzelnen Iterationen eingeschränkt sind, müssen Sie die Analyse unbedingt häufiger durchführen, um regelmäßig Details aus dem Stream zu erhalten. Es gibt verschiedene Möglichkeiten, solche Skripte und Abfragen in regelmäßigen Intervallen durchzuführen. Wir empfehlen für wiederkehrende Aufgaben wie diese AWS Data Pipeline . *Weitere Informationen finden Sie unter und im Developer Guide. [AWS Data Pipeline PigActivity[AWS Data Pipeline HiveActivity](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-hiveactivity.html)AWS Data Pipeline](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-pigactivity.html)*

# Migration des Spark-Kinesis-Konnektors zu SDK 2.x für Amazon EMR 7.0
<a name="migrating-spark-kinesis"></a>

Das AWS SDK bietet eine Vielzahl von Bibliotheken für die Interaktion mit AWS Cloud-Computing-Diensten, z. B. die Verwaltung von Anmeldeinformationen APIs und die Verbindung zu S3- und Kinesis-Diensten. Der Spark-Kinesis-Konnektor wird verwendet, um Daten aus Kinesis Data Streams zu verarbeiten, und die empfangenen Daten werden in der Ausführungs-Engine von Spark transformiert und verarbeitet. Derzeit baut dieser Konnektor auf Version 1.x von AWS SDK und Kinesis-client-library (KCL) auf. 

Im Rahmen der AWS SDK 2.x-Migration wird auch der Spark Kinesis-Connector entsprechend aktualisiert, sodass er mit dem SDK 2.x ausgeführt werden kann. In der Amazon-EMR-Version 7.0 enthält Spark das SDK-2.x-Upgrade, das in der Community-Version von Apache Spark noch nicht verfügbar ist. Wenn Sie den Spark-Kinesis-Konnektor aus einer Version unter 7.0 verwenden, müssen Sie Ihre Anwendungscodes für die Ausführung in SDK 2.x migrieren, bevor Sie zu Amazon EMR 7.0 migrieren können.

## Migrationshandbücher
<a name="migrating-spark-kinesis-migration-guides"></a>

In diesem Abschnitt werden die Schritte zur Migration einer Anwendung zum aktualisierten Spark-Kinesis-Konnektor beschrieben. Es enthält Anleitungen für die Migration zur Kinesis Client Library (KCL) 2.x, AWS Anmeldeinformationsanbieter und AWS Service-Clients in SDK 2.x. AWS Als Referenz enthält es auch ein [WordCount](https://github.com/apache/spark/blob/v3.5.0/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala)Beispielprogramm, das den Kinesis-Konnektor verwendet.

**Topics**
+ [KCL von 1.x zu 2.x migrieren](#migrating-spark-kinesis-KCL-from-1.x-to-2.x)
+ [Anbieter von AWS Anmeldeinformationen von AWS SDK 1.x auf 2.x migrieren](#migrating-spark-kinesis-creds-from-1.x-to-2.x)
+ [Migration von AWS Service-Clients von AWS SDK 1.x auf 2.x](#migrating-spark-kinesis-service-from-1.x-to-2.x)
+ [Codebeispiele für Streaming-Anwendungen](#migrating-spark-kinesis-streaming-examples)
+ [Überlegungen zur Verwendung des aktualisierten Spark-Kinesis-Konnektors](#migrating-spark-kinesis-considerations)

### KCL von 1.x zu 2.x migrieren
<a name="migrating-spark-kinesis-KCL-from-1.x-to-2.x"></a>
+ **Ebene und Dimensionen der Metriken in `KinesisInputDStream`**

  Wenn Sie einen `KinesisInputDStream` instanziieren, können Sie die Metrikebene und die Dimensionen für den Stream steuern. Das folgende Beispiel zeigt, wie Sie diese Parameter mit KCL 1.x anpassen können:

  ```
  import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
  import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
   
  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
    .build()
  ```

  In KCL 2.x haben diese Konfigurationseinstellungen andere Paketnamen. Für die Migration zu 2.x:

  1. Ändern Sie die Importanweisungen für `com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration` und `com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel` zu `software.amazon.kinesis.metrics.MetricsLevel` bzw. `software.amazon.kinesis.metrics.MetricsUtil`.

     ```
     // import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
     import software.amazon.kinesis.metrics.MetricsLevel
      
     // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
     import software.amazon.kinesis.metrics.MetricsUtil
     ```

  1. Ersetzen Sie die Zeile `metricsEnabledDimensionsKinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet` durch `metricsEnabledDimensionsSet(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)`

  Im Folgenden finden Sie eine aktualisierte Version von `KinesisInputDStream` mit benutzerdefinierten Metrikebene und Metrikdimensionen:

  ```
  import software.amazon.kinesis.metrics.MetricsLevel
  import software.amazon.kinesis.metrics.MetricsUtil
   
  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
    .build()
  ```
+ Meldungshandler-Funktion in `KinesisInputDStream`

  Bei der Instanziierung eines `KinesisInputDStream` können Sie auch eine „Meldungshandler-Funktion“ angeben, die einen Kinesis-Datensatz verwendet und ein generisches Objekt T zurückgibt, falls Sie andere in einem Datensatz enthaltene Daten wie den Partitionsschlüssel verwenden möchten.

  In KCL 1.x lautet die Signatur der Meldungshandler-Funktion: `Record => T`, wobei Record für `com.amazonaws.services.kinesis.model.Record` steht. In KCL 2.x wurde die Signatur des Handlers in:`KinesisClientRecord => T`, where is geändert. KinesisClientRecord `software.amazon.kinesis.retrieval.KinesisClientRecord` 

  Es folgt ein Beispiel für die Bereitstellung eines Meldungshandlers in KCL 1.x:

  ```
  import com.amazonaws.services.kinesis.model.Record
   
   
  def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_ONLY)
    .buildWithMessageHandler(addFive)
  ```

  Für die Migration des Meldungshandlers:

  1. Ändern Sie die Importanweisung für `com.amazonaws.services.kinesis.model.Record` zu `software.amazon.kinesis.retrieval.KinesisClientRecord`.

     ```
     // import com.amazonaws.services.kinesis.model.Record
     import software.amazon.kinesis.retrieval.KinesisClientRecord
     ```

  1. Aktualisieren Sie die Methodensignatur des Meldungshandlers.

     ```
     //def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
     def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
     ```

  Es folgt ein aktualisiertes Beispiel für die Bereitstellung eines Meldungshandlers in KCL 2.x:

  ```
  import software.amazon.kinesis.retrieval.KinesisClientRecord
   
   
  def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_ONLY)
    .buildWithMessageHandler(addFive)
  ```

  Weitere Informationen zur Migration von KCL 1.x zu 2.x finden Sie unter [Migrieren von Verbrauchern von KCL 1.x zu KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html).

### Anbieter von AWS Anmeldeinformationen von AWS SDK 1.x auf 2.x migrieren
<a name="migrating-spark-kinesis-creds-from-1.x-to-2.x"></a>

Anbieter von Anmeldeinformationen werden verwendet, um AWS Anmeldeinformationen für Interaktionen mit zu erhalten. AWS In SDK 2.x gibt es mehrere Schnittstellen- und Klassenänderungen im Zusammenhang mit den Anbietern von Anmeldeinformationen, die Sie [hier finden](https://github.com/aws/aws-sdk-java-v2/blob/master/docs/LaunchChangelog.md#122-client-credentials). Der Spark Kinesis-Konnektor hat eine Schnittstelle (`org.apache.spark.streaming.kinesis.SparkAWSCredentials`) und Implementierungsklassen definiert, die die Version 1.x von AWS Credential Providern zurückgeben. Diese Anbieter von Anmeldeinformationen werden bei der Initialisierung von Kinesis-Clients benötigt. Wenn Sie die Methode beispielsweise `SparkAWSCredentials.provider` in den Anwendungen verwenden, müssten Sie die Codes aktualisieren, um die 2.x-Version der Credential Provider nutzen zu können. AWS 

Im Folgenden finden Sie ein Beispiel für die Verwendung der Anmeldeinformationsanbieter in AWS SDK 1.x:

```
import org.apache.spark.streaming.kinesis.SparkAWSCredentials
import com.amazonaws.auth.AWSCredentialsProvider
 
val basicSparkCredentials = SparkAWSCredentials.builder
    .basicCredentials("accessKey", "secretKey")
    .build()
                                     
val credentialProvider = basicSparkCredentials.provider
assert(credentialProvider.isInstanceOf[AWSCredentialsProvider], "Type should be AWSCredentialsProvider")
```

**Für die Migration zu 2.x:**

1. Ändern Sie die Importanweisung für `com.amazonaws.auth.AWSCredentialsProvider` zu `software.amazon.awssdk.auth.credentials.AwsCredentialsProvider`

   ```
   //import com.amazonaws.auth.AWSCredentialsProvider
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
   ```

1. Aktualisieren Sie die verbleibenden Codes, die diese Klasse verwenden. 

   ```
   import org.apache.spark.streaming.kinesis.SparkAWSCredentials
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
    
   val basicSparkCredentials = SparkAWSCredentials.builder
       .basicCredentials("accessKey", "secretKey")
       .build()
                                             
   val credentialProvider = basicSparkCredentials.provider
   assert (credentialProvider.isInstanceOf[AwsCredentialsProvider], "Type should be AwsCredentialsProvider")
   ```

### Migration von AWS Service-Clients von AWS SDK 1.x auf 2.x
<a name="migrating-spark-kinesis-service-from-1.x-to-2.x"></a>

AWS Service-Clients haben in 2.x unterschiedliche Paketnamen (d. h.`software.amazon.awssdk`), wohingegen das SDK 1.x verwendet. `com.amazonaws` Weitere Informationen über die Änderungen in dieser Version finden Sie [hier](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-whats-different.html). Wenn Sie diese Service-Clients in den Codes verwenden, müssten Sie die Clients entsprechend migrieren.

Im Folgenden finden Sie ein Beispiel für die Erstellung eines Clients in SDK 1.x:

```
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.document.DynamoDB
 
AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
```

**Für die Migration zu 2.x:**

1. Ändern Sie die Importanweisungen für Service-Clients. Nehmen wir als Beispiel DynamoDB-Clients. Sie müssten `com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient` oder `com.amazonaws.services.dynamodbv2.document.DynamoDB` zu `software.amazon.awssdk.services.dynamodb.DynamoDbClient` ändern.

   ```
   // import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
   // import com.amazonaws.services.dynamodbv2.document.DynamoDB
   import software.amazon.awssdk.services.dynamodb.DynamoDbClient
   ```

1. Die Codes aktualisieren, die die Clients initialisieren

   ```
   // AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
   // AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
    
   DynamoDbClient ddbClient = DynamoDbClient.create();
   DynamoDbClient ddbClient = DynamoDbClient.builder().build();
   ```

   Weitere Informationen zur Migration des AWS SDK von 1.x auf 2.x finden Sie unter [Was ist der Unterschied zwischen dem AWS SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-whats-different.html) 1.x und 2.x

### Codebeispiele für Streaming-Anwendungen
<a name="migrating-spark-kinesis-streaming-examples"></a>

```
import java.net.URI
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.http.apache.ApacheHttpClient
import software.amazon.awssdk.services.kinesis.KinesisClient
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest
import software.amazon.awssdk.regions.Region
import software.amazon.kinesis.metrics.{MetricsLevel, MetricsUtil}
 
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
import org.apache.spark.streaming.kinesis.KinesisInputDStream
 
 
object KinesisWordCountASLSDKV2 {
 
  def main(args: Array[String]): Unit = {
    val appName = "demo-app"
    val streamName = "demo-kinesis-test"
    val endpointUrl = "https://kinesis.us-west-2.amazonaws.com"
    val regionName = "us-west-2"
 
    // Determine the number of shards from the stream using the low-level Kinesis Client
    // from the AWS Java SDK.
    val credentialsProvider = DefaultCredentialsProvider.create
    require(credentialsProvider.resolveCredentials() != null,
      "No AWS credentials found. Please specify credentials using one of the methods specified " +
        "in https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html")
    val kinesisClient = KinesisClient.builder()
      .credentialsProvider(credentialsProvider)
      .region(Region.US_WEST_2)
      .endpointOverride(URI.create(endpointUrl))
      .httpClientBuilder(ApacheHttpClient.builder())
      .build()
    val describeStreamRequest = DescribeStreamRequest.builder()
      .streamName(streamName)
      .build()
    val numShards = kinesisClient.describeStream(describeStreamRequest)
      .streamDescription
      .shards
      .size
 
 
    // In this example, we are going to create 1 Kinesis Receiver/input DStream for each shard.
    // This is not a necessity; if there are less receivers/DStreams than the number of shards,
    // then the shards will be automatically distributed among the receivers and each receiver
    // will receive data from multiple shards.
    val numStreams = numShards
 
    // Spark Streaming batch interval
    val batchInterval = Milliseconds(2000)
 
    // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information
    // on sequence number of records that have been received. Same as batchInterval for this
    // example.
    val kinesisCheckpointInterval = batchInterval
 
    // Setup the SparkConfig and StreamingContext
    val sparkConfig = new SparkConf().setAppName("KinesisWordCountASLSDKV2")
    val ssc = new StreamingContext(sparkConfig, batchInterval)
 
    // Create the Kinesis DStreams
    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisInputDStream.builder
        .streamingContext(ssc)
        .streamName(streamName)
        .endpointUrl(endpointUrl)
        .regionName(regionName)
        .initialPosition(new Latest())
        .checkpointAppName(appName)
        .checkpointInterval(kinesisCheckpointInterval)
        .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
        .metricsLevel(MetricsLevel.DETAILED)
        .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
        .build()
    }
 
    // Union all the streams
    val unionStreams = ssc.union(kinesisStreams)
 
    // Convert each line of Array[Byte] to String, and split into words
    val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))
 
    // Map each word to a (word, 1) tuple so we can reduce by key to count the words
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
 
    // Print the first 10 wordCounts
    wordCounts.print()
 
    // Start the streaming context and await termination
    ssc.start()
    ssc.awaitTermination()
  }
}
```

### Überlegungen zur Verwendung des aktualisierten Spark-Kinesis-Konnektors
<a name="migrating-spark-kinesis-considerations"></a>
+ Wenn Ihre Anwendungen `Kinesis-producer-library` mit einer JDK-Version unter 11 verwenden, kann es zu Ausnahmen wie `java.lang.NoClassDefFoundError: javax/xml/bind/DatatypeConverter` kommen. Dies liegt daran, dass EMR 7.0 standardmäßig mit JDK 17 geliefert wird und J2EE-Module seit Java 11\$1 aus den Standardbibliotheken entfernt wurden. Dies könnte behoben werden, indem die folgende Abhängigkeit zur POM-Datei hinzugefügt wird. Ersetzen Sie die Bibliotheksversion nach Bedarf durch eine passende.

  ```
  <dependency>
        <groupId>javax.xml.bind</groupId>
        <artifactId>jaxb-api</artifactId>
        <version>${jaxb-api.version}</version>
      </dependency>
  ```
+ Das Spark-Kinesis-Konnektor-JAR befindet sich nach der Erstellung eines EMR-Clusters unter diesem Pfad: `/usr/lib/spark/connector/lib/`

# DistCp S3 (s3-dist-cp)
<a name="UsingEMR_s3distcp"></a>

Apache DistCp ist ein Open-Source-Tool, mit dem Sie große Datenmengen kopieren können. *S3 DistCp* ähnelt Amazon S3 DistCp, ist aber für die Verwendung mit AWS Amazon S3 optimiert. Der Befehl für S3 DistCp in Amazon EMR Version 4.0 und höher lautet`s3-dist-cp`, den Sie als Schritt in einem Cluster oder in der Befehlszeile hinzufügen. Mit S3 DistCp können Sie große Datenmengen effizient von Amazon S3 nach HDFS kopieren, wo sie in nachfolgenden Schritten in Ihrem Amazon EMR-Cluster verarbeitet werden können. Sie können S3 auch verwendenDistCp , um Daten zwischen Amazon S3-Buckets oder von HDFS nach Amazon S3 zu kopieren. S3 DistCp ist skalierbarer und effizienter für das parallel Kopieren einer großen Anzahl von Objekten zwischen Buckets und AWS Konten.

Spezifische Befehle, die die Flexibilität von S3DistCP in realen Szenarien demonstrieren, finden Sie im Big Data-Blog unter [Sieben Tipps zur Verwendung von S3 DistCp](https://aws.amazon.com/blogs/big-data/seven-tips-for-using-s3distcp-on-amazon-emr-to-move-data-efficiently-between-hdfs-and-amazon-s3/). AWS 

Wie bei DistCp S3 DistCp wird häufig MapReduce auf verteilte Weise kopiert. Kopier-, Fehlerbehandlungs-, Wiederherstellungs- und Berichterstellungsaufgaben werden auf mehrere Server verteilt. Weitere Informationen zum DistCp Open-Source-Projekt Apache finden Sie in der [DistCpAnleitung](http://hadoop.apache.org/docs/stable/hadoop-distcp/DistCp.html) in der Apache Hadoop-Dokumentation.

Wenn S3 DistCp einige oder alle der angegebenen Dateien nicht kopieren kann, schlägt der Cluster-Schritt fehl und gibt einen Fehlercode ungleich Null zurück. In diesem Fall bereinigt S3 teilweise kopierte Dateien DistCp nicht. 

**Wichtig**  
S3 unterstützt DistCp keine Amazon S3 S3-Bucket-Namen, die den Unterstrich enthalten.  
S3 unterstützt DistCp keine Verkettung von Parquet-Dateien. Verwenden Sie stattdessen. PySpark Weitere Informationen finden Sie unter [Parquet-Dateien verketten in Amazon EMR](https://aws.amazon.com/premiumsupport/knowledge-center/emr-concatenate-parquet-files/).  
Um Kopierfehler zu vermeiden, wenn Sie S3DistCP verwenden, um eine einzelne Datei (anstelle eines Verzeichnisses) von S3 nach HDFS zu kopieren, verwenden Sie Amazon-EMR-Version 5.33.0 oder höher oder Amazon-EMR-Version 6.3.0 oder höher.

## DistCp S3-Optionen
<a name="UsingEMR_s3distcp.options"></a>

S3 ist zwar ähnlich DistCp, DistCp unterstützt aber verschiedene Optionen, mit denen Sie die Art und Weise ändern können, wie Daten kopiert und komprimiert werden.

Wenn Sie S3 aufrufenDistCp, können Sie die in der folgenden Tabelle beschriebenen Optionen angeben. Die Optionen werden dem Schritt mithilfe der Liste der Argumente hinzugefügt. Beispiele für die DistCp S3-Argumente sind in der folgenden Tabelle aufgeführt. 


| Option  | Description  | Erforderlich  | 
| --- | --- | --- | 
| ‑‑src=LOCATION  |  Der Speicherort der zu kopierenden Daten. Hierbei kann es sich entweder um einen HDFS- oder Amazon-S3-Speicherort handeln.  Beispiel: `‑‑src=s3://amzn-s3-demo-bucket/logs/j-3GYXXXXXX9IOJ/node`   S3 unterstützt DistCp keine Amazon S3 S3-Bucket-Namen, die den Unterstrich enthalten.   | Ja  | 
| ‑‑dest=LOCATION  |  Das Ziel für die Daten. Hierbei kann es sich entweder um einen HDFS- oder Amazon-S3-Speicherort handeln.  Beispiel: `‑‑dest=hdfs:///output`   S3 unterstützt DistCp keine Amazon S3 S3-Bucket-Namen, die den Unterstrich enthalten.   | Ja  | 
| ‑‑srcPattern=PATTERN  |  Ein [regulärer Ausdruck](http://en.wikipedia.org/wiki/Regular_expression), der die Kopieroperation auf eine Teilmenge der Daten unter `‑‑src` filtert. Wenn weder `‑‑srcPattern` noch `‑‑groupBy` angegeben ist, werden alle Daten unter `‑‑src` in `‑‑dest` kopiert.  Wenn das Argument des regulären Ausdrucks Sonderzeichen wie Sternchen (\$1) enthält, muss entweder der reguläre Ausdruck oder die gesamte `‑‑args`-Zeichenfolge in einfache Anführungszeichen (') gesetzt werden.  Beispiel: `‑‑srcPattern=.*daemons.*-hadoop-.*`   | Nein  | 
| ‑‑groupBy=PATTERN  |  Ein [regulärer Ausdruck](http://en.wikipedia.org/wiki/Regular_expression), der S3 veranlasst, Dateien DistCp zu verketten, die dem Ausdruck entsprechen. Sie können mit dieser Option beispielsweise alle Protokolldateien kombinieren, die innerhalb einer Stunde in einer einzigen Datei geschrieben wurden. Der verkettete Dateiname ist der Wert, der dem regulären Ausdruck für die Gruppe entspricht.  Klammern geben an, wie Dateien gruppiert werden soll, wobei alle Elemente, die der eingeklammerten Anweisung entsprechen, zu einer einzigen Ausgabedatei kombiniert werden. Wenn der reguläre Ausdruck keine Anweisung in Klammern enthält, schlägt der Cluster beim Schritt S3 DistCp fehl und gibt einen Fehler zurück.  Wenn das Argument des regulären Ausdrucks Sonderzeichen wie Sternchen (\$1) enthält, muss entweder der reguläre Ausdruck oder die gesamte `‑‑args`-Zeichenfolge in einfache Anführungszeichen (') gesetzt werden.  Wenn `‑‑groupBy` angegeben ist, werden nur Dateien kopiert, die dem angegebenen Muster entsprechen. Sie müssen `‑‑groupBy` und `‑‑srcPattern` nicht gleichzeitig angeben.  Beispiel: `‑‑groupBy=.*subnetid.*([0-9]+-[0-9]+-[0-9]+-[0-9]+).*`  | Nein  | 
| ‑‑targetSize=SIZE  |  Die Größe (in Mebibyte (MiB)) der Dateien, die basierend auf der `‑‑groupBy`-Option erstellt werden sollen. Dieser Wert muss eine Ganzzahl sein. Wenn dieser `‑‑targetSize` Wert gesetzt ist, DistCp versucht S3, dieser Größe zu entsprechen. Die tatsächliche Größe der kopierten Dateien kann größer oder kleiner als dieser Wert sein. Aufträge werden basierend auf der Größe der Datendatei aggregiert. Deshalb ist es möglich, dass die Zieldateigröße der Quelldateigröße entspricht.  Wenn die durch `‑‑groupBy` verketteten Dateien größer als der Wert `‑‑targetSize` sind, werden sie in Teildateien aufgelöst und sequenziell mit einem an das Ende angefügten numerischen Wert benannt. Eine Datei, die beispielsweise zu `myfile.gz` verkettet ist, wird wie folgt in mehrere Teile unterteilt: `myfile0.gz`, `myfile1.gz` usw.  Beispiel: `‑‑targetSize=2`   | Nein  | 
| ‑‑appendToLastFile |  Gibt das Verhalten von S3 DistCp beim Kopieren von Dateien von Amazon S3 nach HDFS an, die bereits vorhanden sind. Es werden neue Dateidaten an vorhandene Dateien angefügt. Wenn Sie `‑‑appendToLastFile` mit `‑‑groupBy` verwenden, werden neue Daten an Dateien angefügt, die denselben Gruppen entsprechen. Diese Option respektiert auch das `‑‑targetSize`-Verhalten, wenn sie mit `‑‑groupBy.` verwendet wird.  | Nein  | 
| ‑‑outputCodec=CODEC  |  Gibt den Komprimierungs-Codec für die kopierten Dateien an. Mögliche Werte sind: `gzip`, `gz`, `lzo`, `snappy` oder `none`. Sie können diese Option beispielsweise verwenden, um Eingabedateien, die mit Gzip komprimiert sind, in Ausgabedateien mit LZO-Komprimierung zu konvertieren oder um die Dateien im Rahmen der Kopieroperation zu dekomprimieren. Wenn Sie einen Ausgabe-Codec auswählen, wird dem Dateinamen die entsprechende Erweiterung angefügt (z. B. für `gz` und `gzip` die Erweiterung `.gz`). Wenn Sie keinen Wert für `‑‑outputCodec` angeben, werden die Dateien ohne Änderung der Komprimierung kopiert.  Beispiel: `‑‑outputCodec=lzo`   | Nein  | 
| ‑‑s3ServerSideEncryption  |  Stellt sicher, dass die Zieldaten mit SSL übertragen und in Amazon S3 automatisch mit einem AWS serviceseitigen Schlüssel verschlüsselt werden. Beim Abrufen von Daten mit S3 DistCp werden die Objekte automatisch entschlüsselt. Wenn Sie versuchen, ein entschlüsseltes Objekt in einen Amazon-S3-Bucket zu kopieren, für den eine Verschlüsselung erforderlich ist, schlägt die Operation fehl. Weitere Informationen finden Sie unter [Verwenden der Datenverschlüsselung](https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingEncryption.html).  Beispiel: `‑‑s3ServerSideEncryption`   | Nein  | 
| ‑‑deleteOnSuccess  |  Wenn der Kopiervorgang erfolgreich ist, veranlasst diese Option S3, die kopierten Dateien vom Quellspeicherort DistCp zu löschen. Dies ist nützlich, wenn Sie Ausgabedateien, wie Protokolldateien, von einem Speicherort an einen anderen als geplante Aufgabe kopieren und dieselben Dateien nicht zweimal kopiert werden sollen.  Beispiel: `‑‑deleteOnSuccess`   | Nein  | 
| ‑‑disableMultipartUpload  |  Deaktiviert die Verwendung des mehrteiligen Uploads.  Beispiel: `‑‑disableMultipartUpload`   | Nein  | 
| ‑‑multipartUploadChunkSize=SIZE  |  Die Größe jedes Teils in einem mehrteiligen Amazon-S3-Upload in MiB. S3 DistCp verwendet den mehrteiligen Upload, wenn Daten kopiert werden, die `multipartUploadChunkSize` größer sind als. Um die Arbeitsleistung zu verbessern, können Sie die Größe der einzelnen Teile erhöhen. Die Standardgröße beträgt 128 MiB.  Beispiel: `‑‑multipartUploadChunkSize=1000`   | Nein  | 
| ‑‑numberFiles  |  Stellt Ausgabedateien sequenzielle Nummern voran. Die Zählung beginnt bei 0, es sei denn, von `‑‑startingIndex` wird ein anderer Wert angegeben.  Beispiel: `‑‑numberFiles`   | Nein  | 
| ‑‑startingIndex=INDEX  |  Wird mit `‑‑numberFiles` verwendet, um die erste Nummer in der Sequenz anzugeben.  Beispiel: `‑‑startingIndex=1`   | Nein  | 
| ‑‑outputManifest=FILENAME  |  Erstellt eine mit Gzip komprimierte Textdatei, die eine Liste aller von S3 kopierten Dateien enthält. DistCp  Beispiel: `‑‑outputManifest=manifest-1.gz`   | Nein  | 
| ‑‑previousManifest=PATH  |  Liest eine Manifestdatei, die bei einem früheren Aufruf von S3 DistCp mit dem `‑‑outputManifest` Flag erstellt wurde. Wenn das `‑‑previousManifest` Flag gesetzt ist, DistCp schließt S3 die im Manifest aufgelisteten Dateien vom Kopiervorgang aus. Wenn `‑‑outputManifest` zusammen mit `‑‑previousManifest` angegeben ist, erscheinen die im vorherigen Manifest aufgeführten Dateien auch in der neuen Manifestdatei, obwohl die Dateien nicht kopiert werden.  Beispiel: `‑‑previousManifest=/usr/bin/manifest-1.gz`   | Nein  | 
| ‑‑requirePreviousManifest |  Erfordert ein vorheriges Manifest, das während eines vorherigen Aufrufs von S3 DistCp erstellt wurde. Wenn dieser Wert auf "false" festgelegt ist, wird kein Fehler generiert, wenn kein vorheriges Manifest angegeben ist. Der Standardwert ist true.  | Nein  | 
| ‑‑copyFromManifest  |  Kehrt das Verhalten `‑‑previousManifest` um, dass S3 DistCp die angegebene Manifestdatei als Liste der zu kopierenden Dateien verwendet, anstatt als Liste von Dateien, die vom Kopieren ausgeschlossen werden sollen.  Beispiel: `‑‑copyFromManifest ‑‑previousManifest=/usr/bin/manifest-1.gz`   | Nein  | 
| ‑‑s3Endpoint=ENDPOINT |  Gibt den Amazon-S3-Endpunkt an, der beim Hochladen einer Datei verwendet werden soll. Mit dieser Option wird der Endpunkt sowohl für die Quelle als auch das Ziel festgelegt. Falls die Option nicht festgelegt ist, wird als Standardendpunkt `s3.amazonaws.com` verwendet. Eine Liste der Amazon-S3-Endpunkte finden Sie unter [Regionen und Endpunkte](https://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region).  Beispiel: `‑‑s3Endpoint=s3.eu-west-1.amazonaws.com`   | Nein  | 
| ‑‑storageClass=CLASS |  Die zu verwendende Speicherklasse, wenn das Ziel Amazon S3 ist. Gültige Werte sind STANDARD und REDUCED\$1REDUNDANCY. Wenn diese Option nicht angegeben ist, DistCp versucht S3, die Speicherklasse beizubehalten. Beispiel: `‑‑storageClass=STANDARD`  | Nein  | 
| ‑‑srcPrefixesFile=PATH |  Eine Textdatei in Amazon S3 (s3://), HDFS (hdfs:///) oder dem lokalen Dateisystem (file:/) mit einer Liste von `src`-Präfixen, in der ein Präfix pro Zeile aufgeführt wird.  Wenn `srcPrefixesFile` angegeben, listet S3 DistCp den src-Pfad nicht auf. Stattdessen wird eine Quellliste als kombiniertes Ergebnis der Auflistung aller in dieser Datei angegebenen Präfixe erstellt. Der relative Pfad im Vergleich zum src-Pfad, wird statt dieser Präfixe verwendet, um die Zielpfade zu generieren. Wenn `srcPattern` angegeben ist, wird dieser Wert auf die kombinierten Listenergebnisse der Quellpräfixe angewendet, um die Eingabe weiter zu filtern. Wenn `copyFromManifest` verwendet wird, werden die Objekte im Manifest kopiert und `srcPrefixesFile` wird ignoriert. Beispiel: `‑‑srcPrefixesFile=PATH`  | Nein  | 

Zusätzlich zu den oben genannten Optionen DistCp implementiert S3 die [Tool-Schnittstelle](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/util/Tool.html), was bedeutet, dass es die generischen Optionen unterstützt. 

## S3 DistCp als Schritt in einem Cluster hinzufügen
<a name="UsingEMR_s3distcp.step"></a>

Sie können S3 aufrufen, DistCp indem Sie es als Schritt zu Ihrem Cluster hinzufügen. Sie können Schritte beim Start eines Clusters oder einem aktuell ausgeführten Cluster mithilfe der Konsole, Befehlszeilenschnittstelle oder API hinzufügen. Die folgenden Beispiele zeigen das Hinzufügen eines DistCp S3-Schritts zu einem laufenden Cluster. Weitere Informationen zum Hinzufügen von Schritten zu einem Cluster finden Sie unter [Aufträge an einen Cluster übermitteln](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-work-with-steps.html) im *Verwaltungshandbuch für Amazon EMR*.

**Um einem laufenden Cluster einen DistCp S3-Schritt hinzuzufügen, verwenden Sie den AWS CLI**

Weitere Informationen zur Verwendung von Amazon EMR-Befehlen in der AWS CLI finden Sie in der [AWS CLI Befehlsreferenz.](https://docs.aws.amazon.com/cli/latest/reference/emr)
+ Um einem Cluster, der S3 aufruft, einen Schritt hinzuzufügenDistCp, übergeben Sie die Parameter, die angeben, wie S3 den Kopiervorgang ausführen DistCp soll, als Argumente. 

  Im folgenden Beispiel werden Daemon-Protokolle von Amazon S3 in `hdfs:///output` kopiert. Im folgenden Befehl:
  + `‑‑cluster-id` gibt den Cluster an.
  + `Jar`ist der Speicherort der DistCp S3-JAR-Datei. Ein Beispiel für die Ausführung eines Befehls auf einem Cluster mithilfe von command-runner.jar finden Sie unter [Senden eines benutzerdefinierten JAR-Schritts zur Ausführung eines Skripts oder Befehls](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-commandrunner.html#emr-commandrunner-examples).
  + `Args`ist eine durch Kommas getrennte Liste der Optionsname-Wert-Paare, die an S3 übergeben werden sollen. DistCp Eine vollständige Liste der verfügbaren Optionen finden Sie unter [DistCp S3-Optionen](#UsingEMR_s3distcp.options). 

  Um einem laufenden Cluster einen DistCp S3-Kopierschritt hinzuzufügen, fügen Sie wie `myStep.json` in diesem Beispiel Folgendes in eine JSON-Datei ein, die in Amazon S3 oder Ihrem lokalen Dateisystem gespeichert ist. *j-3GYXXXXXX9IOK*Ersetzen Sie es durch Ihre Cluster-ID und *amzn-s3-demo-bucket* ersetzen Sie es durch Ihren Amazon S3 S3-Bucket-Namen.

  ```
  [
      {
          "Name":"S3DistCp step",
          "Args":["s3-dist-cp","‑‑s3Endpoint=s3.amazonaws.com","‑‑src=s3://amzn-s3-demo-bucket/logs/j-3GYXXXXXX9IOJ/node/","‑‑dest=hdfs:///output","‑‑srcPattern=.*[a-zA-Z,]+"],
          "ActionOnFailure":"CONTINUE",
          "Type":"CUSTOM_JAR",
          "Jar":"command-runner.jar"        
      }
  ]
  ```

  ```
  aws emr add-steps ‑‑cluster-id j-3GYXXXXXX9IOK ‑‑steps file://./myStep.json
  ```

**Example Protokolldateien von Amazon S3 zu HDFS kopieren**  
Dieses Beispiel veranschaulicht, wie Protokolldateien in einem Amazon-S3-Bucket in HDFS kopiert werden, indem Sie einen Schritt zu einem aktuell ausgeführten Cluster hinzufügen. In diesem Beispiel werden die Daten, die in die Daemon-Protokolle kopiert werden, anhand der Option `‑‑srcPattern` eingeschränkt.   
Zum Kopieren von Protokolldateien von Amazon S3 in HDFS mithilfe der Option `‑‑srcPattern` geben Sie Folgendes in einer JSON-Datei, die in Amazon S3 oder Ihrem lokalen Dateisystem gespeichert ist, als `myStep.json` in diesem Beispiel ein. *j-3GYXXXXXX9IOK*Ersetzen Sie es durch Ihre Cluster-ID und *amzn-s3-demo-bucket* ersetzen Sie es durch Ihren Amazon S3 S3-Bucket-Namen.  

```
[
    {
        "Name":"S3DistCp step",
        "Args":["s3-dist-cp","‑‑s3Endpoint=s3.amazonaws.com","‑‑src=s3://amzn-s3-demo-bucket/logs/j-3GYXXXXXX9IOJ/node/","‑‑dest=hdfs:///output","‑‑srcPattern=.*daemons.*-hadoop-.*"],
        "ActionOnFailure":"CONTINUE",
        "Type":"CUSTOM_JAR",
        "Jar":"command-runner.jar"        
    }
]
```

## Aufräumen nach fehlgeschlagenen DistCp S3-Jobs
<a name="s3distcp-cleanup"></a>

Wenn S3 einige oder alle der angegebenen Dateien DistCp nicht kopieren kann, schlägt der Befehl oder der Clusterschritt fehl und gibt einen Fehlercode ungleich Null zurück. In diesem Fall bereinigt S3 teilweise kopierte Dateien DistCp nicht. Sie müssen sie manuell löschen.

Teilweise kopierte Dateien werden im `tmp` HDFS-Verzeichnis in Unterverzeichnissen mit der eindeutigen Kennung des DistCp S3-Jobs gespeichert. Die ID finden Sie in der Standardausgabe der Aufgabe.

Für einen DistCp S3-Job mit der ID können Sie `4b1c37bb-91af-4391-aaf8-46a6067085a6` beispielsweise eine Verbindung zum Master-Knoten des Clusters herstellen und den folgenden Befehl ausführen, um die mit dem Job verknüpften Ausgabedateien anzuzeigen.

```
hdfs dfs -ls /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output
```

Der Befehl gibt eine Liste von Dateien ähnlich der folgenden zurück:

```
Found 8 items
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:03 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/_SUCCESS
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:02 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00000
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:02 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00001
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:02 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00002
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:03 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00003
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:03 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00004
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:03 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00005
-rw-r‑‑r‑‑   1 hadoop hadoop          0 2018-12-10 06:03 /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6/output/part-r-00006
```

Sie können dann den folgenden Befehl ausführen, um das Verzeichnis und den gesamten Inhalt zu löschen.

```
hdfs dfs rm -rf /tmp/4b1c37bb-91af-4391-aaf8-46a6067085a6
```