

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Tutorial: Verarbeiten Sie Aktiendaten in Echtzeit mit KPL und KCL 2.x
<a name="tutorial-stock-data-kplkcl2"></a>

Das Szenario für dieses Tutorial beinhaltet die Aufnahme von Aktiengeschäften in einen Datenstream und das Schreiben einer grundlegenden Amazon Kinesis Data Streams Streams-Anwendung, die Berechnungen im Stream durchführt. Sie lernen, wie Sie einen Stream von Datensätzen an Kinesis Data Streams senden und eine Anwendung implementieren, die die Datensätze nahezu in Echtzeit verarbeitet und verarbeitet.

**Wichtig**  
Nachdem Sie einen Stream erstellt haben, fallen für Ihr Konto geringe Gebühren für die Nutzung von Kinesis Data Streams an, da Kinesis Data Streams nicht für das AWS kostenlose Kontingent in Frage kommt. Nach dem Start der Konsumentenanwendung fallen nominale Gebühren für die Amazon-DynamoDB-Nutzung an. Die Konsumentenanwendung verwendet DynamoDB zum Verfolgen des Verarbeitungsstatus. Wenn Sie mit dieser Anwendung fertig sind, sollten Sie Ihre AWS -Ressourcen löschen, damit keine weiteren Gebühren anfallen. Weitere Informationen finden Sie unter [Bereinigen von Ressourcen](tutorial-stock-data-kplkcl2-finish.md).

Der Code greift nicht auf tatsächliche Wertpapierdaten zu, sondern simuliert nur deren Strom. Dazu werden zufällige Wertpapierdaten erzeugt. Ausgangspunkt sind dabei echte Marktdaten der 25 führenden Aktien gemäß Börsenkapitalisierung von Februar 2015. Wenn Sie Zugriff auf einen Echtzeit-Stream von Wertpapierdaten haben, möchten Sie vermutlich nützliche, zeitnahe Statistiken aus den Stream-Daten erzeugen. Sie können beispielsweise eine Zeitfensteranalyse durchführen, um festzustellen, welche Aktie in den letzten 5 Minuten am häufigsten erworben wurde. Oder Sie möchten im Falle eines zu großen Verkaufsauftrags (d. h. zu viele Anteile) benachrichtigt werden. Der Code in diesem Tutorial kann erweitert werden, um solche Funktionen bereitzustellen.

Sie können die in diesem Tutorial aufgeführten Schritte mit einem Desktop-PC oder Laptop durchführen und sowohl den Produzenten- als auch den Verbrauchercode auf demselben Rechner oder auf jeder Plattform ausführen, die den definierten Anforderungen entspricht.

Bei den gezeigten Beispielen wird die Region USA West (Oregon) verwendet. Sie funktionieren aber auch für alle anderen [AWS -Regionen, die Kinesis Data Streams unterstützen](https://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region).

**Topics**
+ [

# Erfüllen der Voraussetzungen
](tutorial-stock-data-kplkcl2-begin.md)
+ [

# Erstellen Sie einen Datenstream
](tutorial-stock-data-kplkcl2-create-stream.md)
+ [

# Erstellen Sie eine IAM-Richtlinie und einen Benutzer
](tutorial-stock-data-kplkcl2-iam.md)
+ [

# Laden Sie den Code herunter und erstellen Sie ihn
](tutorial-stock-data-kplkcl2-download.md)
+ [

# Implementieren Sie den Produzenten
](tutorial-stock-data-kplkcl2-producer.md)
+ [

# Implementieren Sie den Verbraucher
](tutorial-stock-data-kplkcl2-consumer.md)
+ [

# (Optional) Erweitern Sie den Consumer
](tutorial-stock-data-kplkcl2-consumer-extension.md)
+ [

# Bereinigen von Ressourcen
](tutorial-stock-data-kplkcl2-finish.md)

# Erfüllen der Voraussetzungen
<a name="tutorial-stock-data-kplkcl2-begin"></a>

Sie müssen die folgenden Voraussetzungen erfüllen, um dieses Tutorial abschließen zu können:

## Erstellen und verwenden Sie ein Amazon Web Services Services-Konto
<a name="tutorial-stock-data-kplkcl2-begin-aws"></a>

Bevor Sie beginnen, stellen Sie sicher, dass Sie mit den unter besprochenen Konzepten vertraut sind[Terminologie und Konzepte von Amazon Kinesis Data Streams](key-concepts.md), insbesondere mit Streams, Shards, Produzenten und Verbrauchern. Es ist zudem sinnvoll, die Schritte in der folgenden Anleitung durchzuführen: [Tutorial: Installation und Konfiguration von AWS CLI for Kinesis Data Streams](kinesis-tutorial-cli-installation.md).

Sie benötigen ein AWS Konto und einen Webbrowser, um auf die AWS-Managementkonsole zugreifen zu können.

Verwenden Sie für den Konsolenzugriff Ihren IAM-Benutzernamen und Ihr Passwort, um sich über die IAM-Anmeldeseite bei [AWS-Managementkonsole](https://console.aws.amazon.com/console/home) anzumelden. Informationen zu AWS Sicherheitsanmeldedaten, einschließlich programmatischem Zugriff und Alternativen zu langfristigen Anmeldeinformationen, finden Sie unter [AWS Sicherheitsanmeldedaten](https://docs.aws.amazon.com/IAM/latest/UserGuide/security-creds.html) im *IAM-Benutzerhandbuch*. Einzelheiten zur Anmeldung bei Ihrem AWS-Konto finden Sie unter [So melden Sie sich an AWS im AWS-Anmeldung](https://docs.aws.amazon.com/signin/latest/userguide/how-to-sign-in.html) *Benutzerhandbuch*.

Weitere Informationen zu IAM und dem Einrichten von Sicherheitsschlüsseln finden Sie unter [Erstellen eines IAM-Benutzers](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/get-set-up-for-amazon-ec2.html#create-an-iam-user).

## Erfüllen Sie die Anforderungen an die Systemsoftware
<a name="tutorial-stock-data-kplkcl2-begin-sys"></a>

Im System, das Sie zum Ausführen der Anwendung verwenden, muss Java 7 oder höher installiert sein. Zum Herunterladen und Installieren des neuesten Java Development Kit (JDK) rufen Sie die [Website für die Java SE-Installation von Oracle](http://www.oracle.com/technetwork/java/javase/downloads/index.html) auf.

Sie benötigen die neueste [AWS SDK für Java](https://aws.amazon.com/sdk-for-java/)-Version. 

[Die Consumer-Anwendung benötigt die Kinesis Client Library (KCL) Version 2.2.9 oder höher, die Sie unter /tree/master abrufen GitHub können. https://github.com/awslabs/ amazon-kinesis-client](https://github.com/awslabs/amazon-kinesis-client/tree/master)

## Nächste Schritte
<a name="tutorial-stock-data-kplkcl2-begin-next"></a>

[Erstellen Sie einen Datenstream](tutorial-stock-data-kplkcl2-create-stream.md)

# Erstellen Sie einen Datenstream
<a name="tutorial-stock-data-kplkcl2-create-stream"></a>

Zuerst müssen Sie den Daten-Stream erstellen, den Sie in den weiteren Schritten dieses Tutorials verwenden.

**So erstellen Sie einen Stream**

1. Melden Sie sich bei der an AWS-Managementkonsole und öffnen Sie die Kinesis-Konsole unter [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

1. Klicken Sie im Navigationsbereich auf **Data Streams (Daten-Streams)**.

1. Erweitern Sie in der Navigationsleiste die Regionsauswahl und wählen Sie eine Region aus.

1. Wählen Sie **Create Kinesis Stream (Kinesis-Stream erstellen)**.

1. Geben Sie einen Namen für den Daten-Stream ein (z. B. **StockTradeStream**).

1. Geben Sie **1** die Anzahl der Shards ein, lassen Sie aber Estimate the number of shards you'll need (**Schätzung der Anzahl der Shards**, die Sie benötigen) reduziert.

1. Wählen Sie **Create Kinesis Stream (Kinesis-Stream erstellen)**.

Auf der Listenseite **Kinesis Streams** (Kinesis-Streams) lautet der Status des Streams `CREATING`, während der Stream erstellt wird. Sobald der Stream verwendet werden kann, ändert sich der Status in `ACTIVE`. 

Wenn Sie den Namen des Streams auswählen, wird auf der angezeigten Seite auf der Registerkarte **Details** eine Zusammenfassung der Konfiguration des Daten-Streams angezeigt. Der Abschnitt **Monitoring (Überwachung)** zeigt Überwachungsinformationen für den Stream an.

## Nächste Schritte
<a name="tutorial-stock-data-kplkcl2-create-stream-next"></a>

[Erstellen Sie eine IAM-Richtlinie und einen Benutzer](tutorial-stock-data-kplkcl2-iam.md)

# Erstellen Sie eine IAM-Richtlinie und einen Benutzer
<a name="tutorial-stock-data-kplkcl2-iam"></a>

Bewährte Sicherheitsmethoden für AWS schreiben die Verwendung detaillierter Berechtigungen vor, um den Zugriff auf verschiedene Ressourcen zu kontrollieren. AWS Identity and Access Management (IAM) ermöglicht Ihnen die Verwaltung von Benutzern und Benutzerberechtigungen in. AWS Eine [IAM-Richtlinie](https://docs.aws.amazon.com/IAM/latest/UserGuide/PoliciesOverview.html) führt explizit zulässige Aktionen sowie Ressourcen auf, für die diese Aktionen relevant sind.

Im Folgenden werden die Berechtigungen für Produzenten und Konsumenten von Kinesis Data Streams aufgelistet, die mindestens erforderlich sind.


**Produzent**  

| Aktionen | Ressource | Zweck | 
| --- | --- | --- | 
| DescribeStream, DescribeStreamSummary, DescribeStreamConsumer | Kinesis Data Stream | Vor dem Versuch, Datensätze zu lesen, prüft der Verbraucher, ob der Daten-Stream vorhanden und ob er aktiv ist und ob Shards im Daten-Stream enthalten sind. | 
| SubscribeToShard, RegisterStreamConsumer | Kinesis Data Stream | Abonniert und registriert Verbraucher bei einem Shard. | 
| PutRecord, PutRecords | Kinesis Data Stream | Schreibt Datensätze in Kinesis Data Streams. | 


**Konsument**  

| **Aktionen** | **Ressource** | **Zweck** | 
| --- | --- | --- | 
| DescribeStream | Kinesis Data Stream | Vor dem Versuch, Datensätze zu lesen, prüft der Verbraucher, ob der Daten-Stream vorhanden und ob er aktiv ist und ob Shards im Daten-Stream enthalten sind. | 
| GetRecords, GetShardIterator  | Kinesis Data Stream | Auslesen von Datensätzen aus einem Shard. | 
| CreateTable, DescribeTable, GetItem, PutItem, Scan, UpdateItem | Amazon-DynamoDB-Tabelle. | Wenn der Verbraucher mit der Kinesis Client Library (KCL) (Version 1.x oder 2.x) entwickelt wird, benötigt er Berechtigungen für die DynamoDB-Tabelle, um den Verarbeitungsstatus der Anwendung zu verfolgen. | 
| DeleteItem | Amazon-DynamoDB-Tabelle. | Für den Fall, dass der Verbraucher split/merge Operationen an Kinesis Data Streams Streams-Shards ausführt. | 
| PutMetricData |  CloudWatch Amazon-Protokoll | Die KCL lädt auch Metriken hoch CloudWatch, die für die Überwachung der Anwendung nützlich sind. | 

Für dieses Tutorial erstellen Sie eine einzelne IAM-Richtlinie, die alle oben genannten Berechtigungen gewährt. Im Praxiseinsatz können Sie zwei Richtlinien erstellen, eine für Produzenten und eine für Verbraucher.

**So erstellen Sie eine IAM-Richtlinie**

1. Suchen Sie den Amazon-Ressourcennamen (ARN) für den neuen Datenstream, den Sie im vorherigen Schritt erstellt haben. Sie finden diesen ARN als **Stream ARN (Stream-ARN)** oben auf der Registerkarte **Details**. Das ARN-Format sieht folgendermaßen aus:

   ```
   arn:aws:kinesis:region:account:stream/name
   ```  
*Region*  
Der AWS Regionalcode, zum Beispiel`us-west-2`. Weitere Informationen finden Sie unter [Regionen und Verfügbarkeitskonzepte](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-regions-availability-zones).  
*Konto*  
Die AWS Konto-ID, wie in den [Kontoeinstellungen](https://console.aws.amazon.com/billing/home?#/account) angezeigt.  
*Name*  
Der Name des Datenstroms, den Sie im vorherigen Schritt erstellt haben, nämlich`StockTradeStream`.

1. Bestimmen Sie den ARN für die DynamoDB-Tabelle, die vom Verbraucher verwendet und von der ersten Verbraucher-Instance erstellt wird. Er muss das folgende Format aufweisen:

   ```
   arn:aws:dynamodb:region:account:table/name
   ```

   Die Region und die Konto-ID sind identisch mit den Werten im ARN des Datenstroms, den Sie für dieses Tutorial verwenden, aber der *Name ist der Name* der DynamoDB-Tabelle, die von der Verbraucheranwendung erstellt und verwendet wird. KCL verwendet den Anwendungsnamen als Tabellennamen. Verwenden Sie `StockTradesProcessor` in diesem Schritt für den DynamoDB-Tabellennamen, da dies der Anwendungsname ist, der in späteren Schritten in diesem Tutorial verwendet wird.

1. **Wählen Sie in der IAM-Konsole unter **Policies** ([https://console.aws.amazon.com/iam/home \$1policies](https://console.aws.amazon.com/iam/home#policies)) die Option Create policy aus.** Wenn Sie zum ersten Mal mit IAM-Richtlinien arbeiten, wählen Sie **Erste Schritte**, **Richtlinie erstellen** aus.

1. Wählen Sie **Select (Auswählen)** neben **Policy Generator (Richtliniengenerator)** aus.

1. Wählen Sie **Amazon Kinesis** als AWS Service.

1. Legen Sie `DescribeStream`, `GetShardIterator`, `GetRecords`, `PutRecord` und `PutRecords` als zulässige Aktionen fest.

1. Geben Sie den ARN des Daten-Streams ein, den Sie in diesem Tutorial verwenden.

1. Verwenden Sie **Add Statement (Statement hinzufügen)** für die folgenden Elemente:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/de_de/streams/latest/dev/tutorial-stock-data-kplkcl2-iam.html)

   Das Sternchen (`*`), das bei der Angabe einer ARN verwendet wird, ist nicht erforderlich. In diesem Fall liegt das daran, dass es keine bestimmte Ressource gibt, für die die `PutMetricData` Aktion aufgerufen wird. CloudWatch 

1. Wählen Sie **Next Step (Weiter)** aus.

1. Ändern Sie **Policy Name (Richtlinienname)** in `StockTradeStreamPolicy`, prüfen Sie den Code und wählen sie **Create Policy (Richtlinie erstellen)** aus.

Das resultierende Richtliniendokument sollte folgendermaßen aussehen:

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "Stmt123",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:PutRecord",
                "kinesis:PutRecords",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:ListShards",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream"
            ]
        },
        {
            "Sid": "Stmt234",
            "Effect": "Allow",
            "Action": [
                "kinesis:SubscribeToShard",
                "kinesis:DescribeStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream/*"
            ]
        },
        {
            "Sid": "Stmt456",
            "Effect": "Allow",
            "Action": [
                "dynamodb:*"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-west-2:111122223333:table/StockTradesProcessor"
            ]
        },
        {
            "Sid": "Stmt789",
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}
```

------

**So erstellen Sie einen IAM-Benutzer**

1. Öffnen Sie unter [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) die IAM-Konsole.

1. Wählen Sie auf der Seite **Users (Benutzer)** die Option **Add user (Benutzer hinzufügen)** aus.

1. Geben Sie `StockTradeStreamUser` als **Benutzername** ein.

1. Wählen Sie für **Access type (Zugriffstyp)** die Option **Programmatic access** (Programmgesteuerter Zugriff) und wählen Sie dann **Next: Permissions (Weiter: Berechtigungen)**.

1. Wählen Sie **Vorhandene Richtlinien direkt zuordnen**.

1. Suchen Sie nach dem Namen der Richtlinie, die Sie im vorherigen Verfahren erstellt haben (`StockTradeStreamPolicy`. Markieren Sie das Kontrollkästchen links neben dem Richtliniennamen und wählen Sie dann **Next: Review (Weiter: Prüfen)**.

1. Überprüfen Sie die Details und die Zusammenfassung und wählen Sie dann **Create user (Benutzer erstellen)** aus.

1. Kopieren Sie die **Access key ID (Zugriffsschlüssel-ID)** und speichern Sie sie privat. Wählen Sie unter **Secret access key (Geheimer Zugriffsschlüssel)** die Option **Show (Anzeigen)** und speichern Sie auch diesen Schlüssel privat.

1. Fügen Sie die Zugriffs- und Geheimschlüssel in eine lokalen Datei an einem sichern Ort ein, auf den nur Sie Zugriff haben. Erstellen Sie für diese Anwendung eine Datei namens ` ~/.aws/credentials` (mit strikten Berechtigungen). Die Datei sollte das folgende Format aufweisen:

   ```
   [default]
   aws_access_key_id=access key
   aws_secret_access_key=secret access key
   ```

**So fügen Sie einem Benutzer eine IAM-Richtlinie hinzu**

1. Klicken Sie in der IAM;-Konsole auf [Richtlinien](https://console.aws.amazon.com/iam/home?#policies) und wählen Sie **Richtlinienaktionen** aus. 

1. Klicken Sie auf `StockTradeStreamPolicy` und **Attach (Verknüpfen)**.

1. Wählen Sie `StockTradeStreamUser` und **Attach Policy (Richtlinie anfügen)** aus.

## Nächste Schritte
<a name="tutorial-stock-data-kplkcl2-iam-next"></a>

[Laden Sie den Code herunter und erstellen Sie ihn](tutorial-stock-data-kplkcl2-download.md)

# Laden Sie den Code herunter und erstellen Sie ihn
<a name="tutorial-stock-data-kplkcl2-download"></a>

In diesem Thema finden Sie Beispielcode zur Implementierung der Beispiel-Wertpapiertransaktionen im Daten-Stream (*Produzent*) und zur Verarbeitung dieser Daten (*Verbraucher*).

**So laden Sie den Code herunter und erstellen ihn**

1. Laden Sie den Quellcode aus dem [https://github.com/aws-samples/amazon-kinesis-learning](https://github.com/aws-samples/amazon-kinesis-learning) GitHub Repo auf Ihren Computer herunter.

1. Erstellen Sie in der IDE ein Projekt mit dem Quellcode. Behalten Sie die bereitgestellte Verzeichnisstruktur bei.

1. Fügen Sie dem Projekt die folgenden Bibliotheken hinzu:
   + Amazon Kinesis Client Library (KCL)
   + AWS SDK
   + Apache HttpCore
   + Apache HttpClient
   + Apache Commons Lang
   + Apache Commons Logging
   + Guava (Google-Kernbibliotheken für Java)
   + Jackson Annotations
   + Jackson Core
   + Jackson Databind
   + Jackson Dataformat: CBOR
   + Joda Time

1. Je nach IDE wird das Projekt möglicherweise automatisch erstellt. Wenn nicht, erstellen Sie es mit den für Ihre IDE erforderlichen Schritten.

Wenn Sie diese Schritte erfolgreich abgeschlossen haben, können Sie zum nächsten Abschnitt wechseln, [Implementieren Sie den Produzenten](tutorial-stock-data-kplkcl2-producer.md). 

## Nächste Schritte
<a name="tutorial-stock-data-kplkcl2-download-next"></a>

[[Implementieren Sie den Produzenten](tutorial-stock-data-kplkcl2-producer.md)Implementieren Sie den Produzenten](tutorial-stock-data-kplkcl2-producer.md)

# Implementieren Sie den Produzenten
<a name="tutorial-stock-data-kplkcl2-producer"></a>

Dieses Tutorial verwendet das reale Szenario einer Überwachung des Wertpapierhandels. Die folgenden Prinzipien erläutern kurz, wie dieses Szenario zum Produzenten und seiner unterstützenden Codestruktur passt.

Beachten Sie den [Quellcode](https://github.com/aws-samples/amazon-kinesis-learning ) und prüfen Sie die folgenden Informationen.

**StockTrade Klasse**  
Ein bestimmter Wertpapierhandel wird durch eine Instance der StockTrade-Klasse dargestellt. Diese Instance enthält folgende Attribute: Tickersymbol, Preis, Anzahl der Anteile, Art des Handels (Kauf oder Verkauf) und ID zur eindeutigen Identifizierung der Handelsaktion. Dieser Klasse wird für Sie implementiert. 

**Stream-Datensatz**  
Ein Stream ist eine Sequenz von Datensätzen. Ein Datensatz ist die Serialisierung einer `StockTrade`-Instance im JSON-Format. Beispiel:   

```
{
  "tickerSymbol": "AMZN", 
  "tradeType": "BUY", 
  "price": 395.87,
  "quantity": 16, 
  "id": 3567129045
}
```

**StockTradeGenerator Klasse**  
StockTradeGenerator hat eine Methode namens`getRandomTrade()`, die bei jedem Aufruf einen neuen zufällig generierten Aktienhandel zurückgibt. Dieser Klasse wird für Sie implementiert.

**StockTradesWriter Klasse**  
Die `main`-Methode des Produzenten, StockTradesWriter, ruft kontinuierlich eine zufällige Handelsaktion ab und sendet die Daten an Kinesis Data Streams, indem sie die folgenden Aufgaben durchführt:  

1. Liest den Datenstromnamen und den Regionsnamen als Eingabe.

1. Verwendet die`KinesisAsyncClientBuilder`, um die Region, die Anmeldeinformationen und die Client-Konfiguration festzulegen. 

1. Sicherstellen, dass der Stream vorhanden und aktiv ist (wenn nicht, kommt es zu einer Beendigung mit Fehler). 

1. Aufrufen der `StockTradeGenerator.getRandomTrade()`-Methode in einer Dauerschleife und anschließend Aufruf der `sendStockTrade`-Methode, um die Handelsdaten alle 100 Millisekunden an den Stream zu senden. 
Die `sendStockTrade`-Methode der `StockTradesWriter`-Klasse hat den folgenden Code:   

```
private static void sendStockTrade(StockTrade trade, KinesisAsyncClient kinesisClient,
            String streamName) {
        byte[] bytes = trade.toJsonAsBytes();
        // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library.
        if (bytes == null) {
            LOG.warn("Could not get JSON bytes for stock trade");
            return;
        }

        LOG.info("Putting trade: " + trade.toString());
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(bytes))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            LOG.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }
```

Beachten Sie die folgende Code-Struktur:
+ Die `PutRecord` API erwartet ein Byte-Array, und Sie müssen den Handel in das JSON-Format konvertieren. Diese einzelne Codezeile führt die Operation aus: 

  ```
  byte[] bytes = trade.toJsonAsBytes();
  ```
+ Bevor Sie die Transaktion senden können, erstellen Sie eine neue `PutRecordRequest`-Instance (in diesem Fall Anforderung genannt). Jede `request` benötigt den Namen des Streams, einen Partitionsschlüssel und einen Daten-Blob. 

  ```
  PutPutRecordRequest request = PutRecordRequest.builder()
      .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
      .streamName(streamName)
      .data(SdkBytes.fromByteArray(bytes))
      .build();
  ```

  Das Beispiel verwendet einen Börsenticker als Partitionsschlüssel, der den Datensatz einem bestimmten Shard zuordnet. In der Praxis sollten Sie Hunderte oder gar Tausende von Partitionsschlüsseln pro Shard haben, sodass die Datensätze in Ihrem Stream gleichmäßig verteilt sind. Weitere Informationen zum Hinzufügen von Daten zu einem Stream finden Sie unter [Daten in Amazon Kinesis Data Streams schreiben](building-producers.md).

  `request` kann die Daten jetzt an den Client senden (PUT-Operation): 

  ```
     kinesisClient.putRecord(request).get();
  ```
+ Eine Fehlerüberprüfung und Protokollierung sind immer nützliche Ergänzungen. Dieser Code protokolliert Fehlerbedingungen: 

  ```
  if (bytes == null) {
      LOG.warn("Could not get JSON bytes for stock trade");
      return;
  }
  ```

  Fügen Sie den try/catch Block rund um die `put` Operation hinzu: 

  ```
  try {
   	kinesisClient.putRecord(request).get();
  } catch (InterruptedException e) {
              LOG.info("Interrupted, assuming shutdown.");
  } catch (ExecutionException e) {
              LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
  }
  ```

  Der Grund besteht darin, dass eine Kinesis Data Streams-PUT-Operation aufgrund eines Netzwerkfehlers fehlschlagen kann oder gedrosselt wird, weil die Durchsatzgrenze des Streams erreicht wird. Es wird empfohlen, dass Sie Ihre Wiederholungsrichtlinie für `put` Operationen sorgfältig prüfen, um Datenverlust zu vermeiden, z. B. die Verwendung eines Wiederholungsversuchs. 
+ Eine Statusprotokollierung ist hilfreich, wenn auch optional:

  ```
  LOG.info("Putting trade: " + trade.toString());
  ```
Der hier gezeigte Produzent verwendet die API-Funktionalität von Kinesis Data Streams für einzelne Datensätze `PutRecord`. In der Praxis ist es oft effizienter, die Eignung von `PutRecords` für mehrere Datensätze zu nutzen und mehrere Datensatzstapel gleichzeitig zu senden, wenn ein Produzent viele Datensätze erstellt. Weitere Informationen finden Sie unter [Daten in Amazon Kinesis Data Streams schreiben](building-producers.md).

**So führen Sie den Produzenten aus**

1. Verifizieren Sie, dass der in [Erstellen Sie eine IAM-Richtlinie und einen Benutzer](tutorial-stock-data-kplkcl2-iam.md) abgerufene Zugriffsschlüssel samt geheimem Schlüsselpaar in der Datei `~/.aws/credentials` gespeichert wurde. 

1. Führen Sie die `StockTradeWriter`-Klasse mit den folgenden Argumenten aus:

   ```
   StockTradeStream us-west-2
   ```

   Wenn Sie den Stream in einer anderen Region als `us-west-2` erstellt haben, müssen Sie stattdessen hier diese Region angeben.

Die Ausgabe sollte folgendermaßen oder ähnlich aussehen:

```
Feb 16, 2015 3:53:00 PM  
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
```

Ihre Wertpapierdaten werden nun von Kinesis Data Streams eingespeist.

## Nächste Schritte
<a name="tutorial-stock-data-kplkcl2-producer-next"></a>

[Implementieren Sie den Verbraucher](tutorial-stock-data-kplkcl2-consumer.md)

# Implementieren Sie den Verbraucher
<a name="tutorial-stock-data-kplkcl2-consumer"></a>

Die Verbraucheranwendung in diesem Tutorial verarbeitet die Wertpapiertransaktionen im Daten-Stream kontinuierlich. Sie gibt dann für jede Minute die beliebtesten Aktien aus, die gekauft und verkauft wurden. Die Anwendung setzt auf Kinesis Client Library (KCL) auf, die viele der mühsamen Arbeiten einer Verbraucheranwendung übernimmt. Weitere Informationen finden Sie unter [Informationen zu KCL 1.x und 2.x](shared-throughput-kcl-consumers.md). 

Überprüfen Sie die folgenden Informationen in Bezug auf den Quellcode.

**StockTradesProcessor Klasse**  
Die für Sie bereitgestellte Hauptklasse des Verbrauchers, die die folgenden Aufgaben erfüllt:  
+ Liest die als Argumente übergebenen Anwendungs-, Datenstrom- und Regionsnamen.
+ Erzeugt eine `KinesisAsyncClient` Instanz mit dem Namen der Region.
+ Erstellt eine `StockTradeRecordProcessorFactory`-Instance für die Instances von `ShardRecordProcessor`, implementiert von einer `StockTradeRecordProcessor`-Instance. 
+ Erzeugt eine `ConfigsBuilder` Instanz mit der `StockTradeRecordProcessorFactory` Instanz `KinesisAsyncClient` `StreamName``ApplicationName`,, und. Dies ist für das Erstellen aller Konfigurationen mit Standardwerten nützlich.
+ KCL-Scheduler (zuvor in den KCL-Versionen 1.x als KCL-Worker bezeichnet) mit der `ConfigsBuilder`-Instance erstellen. 
+ Der Scheduler erstellt für jeden Shard (der dieser Verbraucher-Instance zugeordnet ist) einen neuen Thread, der in einer Schleife die Datensätze aus dem Daten-Stream liest. Anschließend wird die `StockTradeRecordProcessor`-Instance aufgerufen, um die empfangenen Datensatzstapel zu verarbeiten. 

**StockTradeRecordProcessor Klasse**  
Implementierung der `StockTradeRecordProcessor`-Instance, die wiederum fünf erforderliche Methoden implementiert: `initialize`, `processRecords`, `leaseLost`, `shardEnded` und `shutdownRequested`.   
Die Methoden `initialize` und `shutdownRequested` werden von KCL verwendet, um dem Datensatzverarbeiter mitzuteilen, wann er bereit sein muss, Datensätze zu empfangen, und wann der Empfang von Datensätzen gestoppt werden muss, damit alle anwendungsspezifischen Einrichtungs- und Beendigungsaufgaben ausgeführt werden können. `leaseLost` und `shardEnded` werden verwendet, um Logik zu implementieren, die beim Verlust eines Lease oder bei Erreichen des Shards-Endes im Rahmen der Shard-Verarbeitung auszuführen ist. In diesem Beispiel protokollieren wir einfach Meldungen dieser Ereignisse.   
Der Code für diese Methoden wird für Sie bereitgestellt. Die wesentliche Verarbeitung erfolgt mit der `processRecords` Methode, die wiederum `processRecord` für die einzelnen Datensätze nutzt. Die letztgenannte Methode wird als nahezu leerer Skeleton-Code bereitgestellt und im nächsten Schritt (der weitere Informationen enthält) implementiert.   
Beachten Sie außerdem die Implementierung der Hilfsmethoden für `processRecord`: `reportStats` und `resetStats`, die im ursprünglichen Quellcode leer sind.   
Die `processRecords`-Methode wurde für Sie implementiert und führt die folgenden Schritte aus:  
+ Für jeden übergebenen Datensatz wird `processRecord` aufgerufen. 
+ Ruft `reportStats()` zum Drucken der neuesten Statistiken auf, wenn seit dem letzten Bericht mindestens 1 Minute vergangen ist, und dann `resetStats()`, um die Statistiken zu löschen, damit das nächste Intervall nur neue Datensätze enthält.
+ Legt den Zeitpunkt für die nächste Berichterstellung fest.
+ Ruft `checkpoint()` auf, wenn seit dem letzten Prüfpunkt mindestens 1 Minute vergangen ist. 
+ Legt den Zeitpunkt für das nächste Checkpointing fest.
Diese Methode verwendet für das Checkpointing und die Berichterstellung ein Intervall von 60 Sekunden. Weitere Informationen zum Checkpointing finden Sie unter [Verwendung der Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html). 

**StockStats Klasse**  
Diese Klasse stellt eine Datenaufbewahrung und eine Nachverfolgung von Statistiken für die beliebtesten Aktien bereit. Dieser Code wird für Sie bereitgestellt und enthält folgende Methoden:  
+ `addStockTrade(StockTrade)`: fügt die angegebene `StockTrade` in die ausgeführten Statistiken ein.
+ `toString()`: gibt die Statistiken als formatierte Zeichenfolge zurück.
Diese Klasse verfolgt die beliebtesten Aktien, indem sie fortlaufend die Gesamtzahl der Trades für jede Aktie und deren maximale Anzahl zählt. Sie aktualisiert diese Werte, sobald eine neue Handelstransaktion empfangen wird.

Fügen Sie Code zu den Methoden der `StockTradeRecordProcessor`-Klasse hinzu, wie in den folgenden Schritten gezeigt. 

**So implementieren Sie den Konsumenten**

1. Implementieren Sie die `processRecord`-Methode, indem Sie ein richtig bemessenes `StockTrade`-Objekt instanziieren und die Datensatzdaten zu diesem hinzufügen, sodass im Falle eines Problems eine Warnung protokolliert wird. 

   ```
   byte[] arr = new byte[record.data().remaining()];
   record.data().get(arr);
   StockTrade trade = StockTrade.fromJsonAsBytes(arr);
       if (trade == null) {
           log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey());
           return;
           }
   stockStats.addStockTrade(trade);
   ```

1. Implementieren Sie eine `reportStats` Methode. Ändern Sie das Ausgabeformat nach Ihren Wünschen. 

   ```
   System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" +
   stockStats + "\n" +
   "****************************************************************\n");
   ```

1. Implementieren Sie die Methode `resetStats`, die eine neue `stockStats`-Instance erstellt. 

   ```
   stockStats = new StockStats();
   ```

1. Implementieren Sie die folgenden Methoden, die für die `ShardRecordProcessor` Schnittstelle erforderlich sind:

   ```
   @Override
   public void leaseLost(LeaseLostInput leaseLostInput) {
       log.info("Lost lease, so terminating.");
   }
   
   @Override
   public void shardEnded(ShardEndedInput shardEndedInput) {
       try {
           log.info("Reached shard end checkpointing.");
           shardEndedInput.checkpointer().checkpoint();
       } catch (ShutdownException | InvalidStateException e) {
           log.error("Exception while checkpointing at shard end. Giving up.", e);
       }
   }
   
   @Override
   public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
       log.info("Scheduler is shutting down, checkpointing.");
       checkpoint(shutdownRequestedInput.checkpointer());
   }
   
   private void checkpoint(RecordProcessorCheckpointer checkpointer) {
       log.info("Checkpointing shard " + kinesisShardId);
       try {
           checkpointer.checkpoint();
       } catch (ShutdownException se) {
           // Ignore checkpoint if the processor instance has been shutdown (fail over).
           log.info("Caught shutdown exception, skipping checkpoint.", se);
       } catch (ThrottlingException e) {
           // Skip checkpoint when throttled. In practice, consider a backoff and retry policy.
           log.error("Caught throttling exception, skipping checkpoint.", e);
       } catch (InvalidStateException e) {
           // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS).
           log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
       }
   }
   ```

**So führen Sie den Konsumenten aus**

1. Führen Sie den unter [[Implementieren Sie den Produzenten](tutorial-stock-data-kplkcl2-producer.md)Implementieren Sie den Produzenten](tutorial-stock-data-kplkcl2-producer.md) erstellten Produzenten aus, um simulierte Wertpapiertransaktionsdatensätze in den Stream zu schreiben.

1. Stellen Sie sicher, dass der Zugriffsschlüssel und das geheime Schlüsselpaar, die vorher (beim Erstellen des IAM-Benutzers) abgerufen wurden, in der Datei `~/.aws/credentials` gespeichert sind. 

1. Führen Sie die `StockTradesProcessor`-Klasse mit den folgenden Argumenten aus:

   ```
   StockTradesProcessor StockTradeStream us-west-2
   ```

   Beachten Sie, dass Sie, wenn Sie Ihren Stream in einer anderen Region als `us-west-2` erstellt haben, stattdiesen diese Region hier angeben müssen.

Nach einer Minute sollen Sie eine Ausgabe ähnlich der folgenden sehen, die anschließend einmal pro Minute aktualisiert wird:

```
  
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  ****************************************************************
```

## Nächste Schritte
<a name="tutorial-stock-data-kplkcl2-consumer-next"></a>

[(Optional) Erweitern Sie den Consumer](tutorial-stock-data-kplkcl2-consumer-extension.md)

# (Optional) Erweitern Sie den Consumer
<a name="tutorial-stock-data-kplkcl2-consumer-extension"></a>

Dieser optionale Abschnitt zeigt, wie Sie den Konsumentencode erweitern können, um einem komplexeren Szenario gerecht zu werden.

Wenn Sie minütlich über die größten Verkaufsaufträge informiert werden möchten, können Sie die `StockStats`-Klasse an drei Stellen bearbeiten.

**So erweitern Sie den Konsumenten**

1. Fügen Sie neue Instance-Variablen hinzu:

   ```
    // Ticker symbol of the stock that had the largest quantity of shares sold 
    private String largestSellOrderStock;
    // Quantity of shares for the largest sell order trade
    private long largestSellOrderQuantity;
   ```

1. Fügen Sie folgenden Code zu hinz `addStockTrade`:

   ```
   if (type == TradeType.SELL) {
        if (largestSellOrderStock == null || trade.getQuantity() > largestSellOrderQuantity) {
            largestSellOrderStock = trade.getTickerSymbol();
            largestSellOrderQuantity = trade.getQuantity();
        }
    }
   ```

1. Ändern Sie die `toString`-Methode, um die zusätzlichen Informationen zu drucken:

   ```
    
   public String toString() {
       return String.format(
           "Most popular stock being bought: %s, %d buys.%n" +
           "Most popular stock being sold: %s, %d sells.%n" +
           "Largest sell order: %d shares of %s.",
           getMostPopularStock(TradeType.BUY), getMostPopularStockCount(TradeType.BUY),
           getMostPopularStock(TradeType.SELL), getMostPopularStockCount(TradeType.SELL),
           largestSellOrderQuantity, largestSellOrderStock);
   }
   ```

Wenn Sie den Konsumenten jetzt ausführen (führen Sie auch den Produzenten), sollten Sie eine Ausgabe ähnlich der folgenden sehen:

```
 
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  Largest sell order: 996 shares of BUD.
  ****************************************************************
```

## Nächste Schritte
<a name="tutorial-stock-data-kplkcl2-consumer-extension-next"></a>

[Bereinigen von Ressourcen](tutorial-stock-data-kplkcl2-finish.md)

# Bereinigen von Ressourcen
<a name="tutorial-stock-data-kplkcl2-finish"></a>

Da Sie für die Nutzung des Kinesis Data Streams zahlen, sollten Sie diesen ebenso wie die entsprechende Amazon-DynamoDB-Tabelle löschen, wenn sie nicht mehr benötigt werden. Für einen aktiven Stream fallen auch dann nominale Gebühren an, wenn Sie keine Datensätze senden oder abrufen. Der Grund hierfür ist, dass ein aktiver Stream durch kontinuierlichen „Horchen“ darauf, ob neue Datensätze oder Anforderungen zum Abrufen von Datensätzen eingehen, Ressourcen verbraucht.

**So löschen Sie den Stream und die Tabelle**

1. Schalten Sie alle Produzenten und Verbraucher aus, die Sie möglicherweise noch in Betrieb haben.

1. Öffnen Sie die Kinesis-Konsole unter [https://console.aws.amazon.com/kinesis.](https://console.aws.amazon.com/kinesis)

1. Wählen Sie den Stream aus, den Sie für diese Anwendung erstellt haben (`StockTradeStream`).

1. Wählen Sie **Delete Stream (Stream löschen)** aus.

1. Öffnen Sie die DynamoDB-Konsole unter. [https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/)

1. Löschen Sie die `StockTradesProcessor`-Tabelle.

## Zusammenfassung
<a name="tutorial-stock-data-kplkcl2-summary"></a>

Die Verarbeitung einer großen Datenmenge nahezu in Echtzeit erfordert weder das Schreiben von kompliziertem Code noch die Entwicklung einer riesigen Infrastruktur. Es ist genauso einfach wie die Schreiblogik, eine kleine Datenmenge zu verarbeiten (wie Schreiben`processRecord(Record)`), aber Kinesis Data Streams zu verwenden, um so zu skalieren, dass es für eine große Menge an Streaming-Daten funktioniert. Sie müssen sich keine Gedanken über die Skalierung der Verarbeitung machen, da Kinesis Data Streams das für Sie übernimmt. Sie müssen lediglich Ihre Streaming-Datensätze an Kinesis Data Streams senden und eine Logik für die Verarbeitung neu empfangener Datensätze schreiben. 

Nachfolgend einige mögliche Erweiterungen für diese Anwendung.

**Shard-übergreifende Aggregation**  
Derzeit erhalten Sie Statistiken, die aus der Aggregation von Datensätzen resultieren, die von einem einzelnen Auftragnehmer eines einzelnen Shards empfangen werden. (Ein Shard kann jeweils nur von einem Auftragnehmer in einer einzelnen Anwendung verarbeitet werden.) Möglicherweise möchten Sie, wenn Sie eine Skalierung durchführen und mehr als einen Shard haben, eine Shard-übergreifende Aggregation vornehmen. Dies kann mit einer Pipeline-Architektur realisiert werden, bei der die Ausgabe eines jeden Auftragnehmers in einen anderen Stream mit einem einzelnen Shard eingespeist wird, der von einem Auftragnehmer verarbeitet wird, der die Ausgaben der ersten Phase aggregiert. Da die Daten aus der ersten Phase begrenzt sind (eine Ausgabe pro Minute pro Shard), können sie problemlos von nur einem Shard verarbeitet werden.

**Skalierung**  
Wenn der Stream skaliert wird, damit mehr Shards zur Verfügung stehen (da viele Produzenten Daten senden), geschieht dies dadurch, dass mehr Worker hinzugefügt werden. Sie können die Auftragnehmer in EC2-Instances ausführen und Auto-Scaling-Gruppen nutzen.

**Verwenden Sie Konnektoren zu Amazon S3 S3/ DynamoDB/Amazon Redshift/Storm**  
Da ein Stream kontinuierlich verarbeitet wird, kann seine Ausgabe an andere Ziele gesendet werden. AWS bietet [Konnektoren](https://github.com/awslabs/amazon-kinesis-connectors) zur Integration von Kinesis Data Streams mit anderen AWS Diensten und Tools von Drittanbietern.