

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Tutorials „Erste Schritte“ für Amazon Kinesis Data Streams
<a name="examples"></a>

Amazon Kinesis Data Streams bietet eine Reihe verschiedener Lösungen für die Aufnahme und Nutzung von Daten aus Kinesis-Datenströmen. Die Tutorials in diesem Abschnitt sollen Ihnen helfen, die Konzepte und Funktionen von Amazon Kinesis Data Streams besser zu verstehen und die Lösung zu finden, die Ihren Anforderungen entspricht. 

**Topics**
+ [Tutorial: Verarbeiten Sie Aktiendaten in Echtzeit mit KPL und KCL 2.x](tutorial-stock-data-kplkcl2.md)
+ [Tutorial: Verarbeiten Sie Aktiendaten in Echtzeit mit KPL und KCL 1.x](tutorial-stock-data-kplkcl.md)
+ [Tutorial: Analysieren Sie Aktiendaten in Echtzeit mit Amazon Managed Service für Apache Flink](tutorial-stock-data.md)
+ [Tutorial: Verwendung AWS Lambda mit Amazon Kinesis Data Streams](tutorial-stock-data-lambda.md)
+ [Verwenden Sie die AWS Streaming-Datenlösung für Amazon Kinesis](examples-streaming-solution.md)

# Tutorial: Verarbeiten Sie Aktiendaten in Echtzeit mit KPL und KCL 2.x
<a name="tutorial-stock-data-kplkcl2"></a>

Das Szenario für dieses Tutorial beinhaltet die Aufnahme von Aktiengeschäften in einen Datenstream und das Schreiben einer grundlegenden Amazon Kinesis Data Streams Streams-Anwendung, die Berechnungen im Stream durchführt. Sie lernen, wie Sie einen Stream von Datensätzen an Kinesis Data Streams senden und eine Anwendung implementieren, die die Datensätze nahezu in Echtzeit verarbeitet und verarbeitet.

**Wichtig**  
Nachdem Sie einen Stream erstellt haben, fallen für Ihr Konto geringe Gebühren für die Nutzung von Kinesis Data Streams an, da Kinesis Data Streams nicht für das AWS kostenlose Kontingent in Frage kommt. Nach dem Start der Konsumentenanwendung fallen nominale Gebühren für die Amazon-DynamoDB-Nutzung an. Die Konsumentenanwendung verwendet DynamoDB zum Verfolgen des Verarbeitungsstatus. Wenn Sie mit dieser Anwendung fertig sind, sollten Sie Ihre AWS -Ressourcen löschen, damit keine weiteren Gebühren anfallen. Weitere Informationen finden Sie unter [Bereinigen von Ressourcen](tutorial-stock-data-kplkcl2-finish.md).

Der Code greift nicht auf tatsächliche Wertpapierdaten zu, sondern simuliert nur deren Strom. Dazu werden zufällige Wertpapierdaten erzeugt. Ausgangspunkt sind dabei echte Marktdaten der 25 führenden Aktien gemäß Börsenkapitalisierung von Februar 2015. Wenn Sie Zugriff auf einen Echtzeit-Stream von Wertpapierdaten haben, möchten Sie vermutlich nützliche, zeitnahe Statistiken aus den Stream-Daten erzeugen. Sie können beispielsweise eine Zeitfensteranalyse durchführen, um festzustellen, welche Aktie in den letzten 5 Minuten am häufigsten erworben wurde. Oder Sie möchten im Falle eines zu großen Verkaufsauftrags (d. h. zu viele Anteile) benachrichtigt werden. Der Code in diesem Tutorial kann erweitert werden, um solche Funktionen bereitzustellen.

Sie können die in diesem Tutorial aufgeführten Schritte mit einem Desktop-PC oder Laptop durchführen und sowohl den Produzenten- als auch den Verbrauchercode auf demselben Rechner oder auf jeder Plattform ausführen, die den definierten Anforderungen entspricht.

Bei den gezeigten Beispielen wird die Region USA West (Oregon) verwendet. Sie funktionieren aber auch für alle anderen [AWS -Regionen, die Kinesis Data Streams unterstützen](https://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region).

**Topics**
+ [Erfüllen der Voraussetzungen](tutorial-stock-data-kplkcl2-begin.md)
+ [Erstellen Sie einen Datenstream](tutorial-stock-data-kplkcl2-create-stream.md)
+ [Erstellen Sie eine IAM-Richtlinie und einen Benutzer](tutorial-stock-data-kplkcl2-iam.md)
+ [Laden Sie den Code herunter und erstellen Sie ihn](tutorial-stock-data-kplkcl2-download.md)
+ [Implementieren Sie den Produzenten](tutorial-stock-data-kplkcl2-producer.md)
+ [Implementieren Sie den Verbraucher](tutorial-stock-data-kplkcl2-consumer.md)
+ [(Optional) Erweitern Sie den Consumer](tutorial-stock-data-kplkcl2-consumer-extension.md)
+ [Bereinigen von Ressourcen](tutorial-stock-data-kplkcl2-finish.md)

# Erfüllen der Voraussetzungen
<a name="tutorial-stock-data-kplkcl2-begin"></a>

Sie müssen die folgenden Voraussetzungen erfüllen, um dieses Tutorial abschließen zu können:

## Erstellen und verwenden Sie ein Amazon Web Services Services-Konto
<a name="tutorial-stock-data-kplkcl2-begin-aws"></a>

Bevor Sie beginnen, stellen Sie sicher, dass Sie mit den unter besprochenen Konzepten vertraut sind[Terminologie und Konzepte von Amazon Kinesis Data Streams](key-concepts.md), insbesondere mit Streams, Shards, Produzenten und Verbrauchern. Es ist zudem sinnvoll, die Schritte in der folgenden Anleitung durchzuführen: [Tutorial: Installation und Konfiguration von AWS CLI for Kinesis Data Streams](kinesis-tutorial-cli-installation.md).

Sie benötigen ein AWS Konto und einen Webbrowser, um auf die AWS-Managementkonsole zugreifen zu können.

Verwenden Sie für den Konsolenzugriff Ihren IAM-Benutzernamen und Ihr Passwort, um sich über die IAM-Anmeldeseite bei [AWS-Managementkonsole](https://console.aws.amazon.com/console/home) anzumelden. Informationen zu AWS Sicherheitsanmeldedaten, einschließlich programmatischem Zugriff und Alternativen zu langfristigen Anmeldeinformationen, finden Sie unter [AWS Sicherheitsanmeldedaten](https://docs.aws.amazon.com/IAM/latest/UserGuide/security-creds.html) im *IAM-Benutzerhandbuch*. Einzelheiten zur Anmeldung bei Ihrem AWS-Konto finden Sie unter [So melden Sie sich an AWS im AWS-Anmeldung](https://docs.aws.amazon.com/signin/latest/userguide/how-to-sign-in.html) *Benutzerhandbuch*.

Weitere Informationen zu IAM und dem Einrichten von Sicherheitsschlüsseln finden Sie unter [Erstellen eines IAM-Benutzers](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/get-set-up-for-amazon-ec2.html#create-an-iam-user).

## Erfüllen Sie die Anforderungen an die Systemsoftware
<a name="tutorial-stock-data-kplkcl2-begin-sys"></a>

Im System, das Sie zum Ausführen der Anwendung verwenden, muss Java 7 oder höher installiert sein. Zum Herunterladen und Installieren des neuesten Java Development Kit (JDK) rufen Sie die [Website für die Java SE-Installation von Oracle](http://www.oracle.com/technetwork/java/javase/downloads/index.html) auf.

Sie benötigen die neueste [AWS SDK für Java](https://aws.amazon.com/sdk-for-java/)-Version. 

[Die Consumer-Anwendung benötigt die Kinesis Client Library (KCL) Version 2.2.9 oder höher, die Sie unter /tree/master abrufen GitHub können. https://github.com/awslabs/ amazon-kinesis-client](https://github.com/awslabs/amazon-kinesis-client/tree/master)

## Nächste Schritte
<a name="tutorial-stock-data-kplkcl2-begin-next"></a>

[Erstellen Sie einen Datenstream](tutorial-stock-data-kplkcl2-create-stream.md)

# Erstellen Sie einen Datenstream
<a name="tutorial-stock-data-kplkcl2-create-stream"></a>

Zuerst müssen Sie den Daten-Stream erstellen, den Sie in den weiteren Schritten dieses Tutorials verwenden.

**So erstellen Sie einen Stream**

1. Melden Sie sich bei der an AWS-Managementkonsole und öffnen Sie die Kinesis-Konsole unter [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

1. Klicken Sie im Navigationsbereich auf **Data Streams (Daten-Streams)**.

1. Erweitern Sie in der Navigationsleiste die Regionsauswahl und wählen Sie eine Region aus.

1. Wählen Sie **Create Kinesis Stream (Kinesis-Stream erstellen)**.

1. Geben Sie einen Namen für den Daten-Stream ein (z. B. **StockTradeStream**).

1. Geben Sie **1** die Anzahl der Shards ein, lassen Sie aber Estimate the number of shards you'll need (**Schätzung der Anzahl der Shards**, die Sie benötigen) reduziert.

1. Wählen Sie **Create Kinesis Stream (Kinesis-Stream erstellen)**.

Auf der Listenseite **Kinesis Streams** (Kinesis-Streams) lautet der Status des Streams `CREATING`, während der Stream erstellt wird. Sobald der Stream verwendet werden kann, ändert sich der Status in `ACTIVE`. 

Wenn Sie den Namen des Streams auswählen, wird auf der angezeigten Seite auf der Registerkarte **Details** eine Zusammenfassung der Konfiguration des Daten-Streams angezeigt. Der Abschnitt **Monitoring (Überwachung)** zeigt Überwachungsinformationen für den Stream an.

## Nächste Schritte
<a name="tutorial-stock-data-kplkcl2-create-stream-next"></a>

[Erstellen Sie eine IAM-Richtlinie und einen Benutzer](tutorial-stock-data-kplkcl2-iam.md)

# Erstellen Sie eine IAM-Richtlinie und einen Benutzer
<a name="tutorial-stock-data-kplkcl2-iam"></a>

Bewährte Sicherheitsmethoden für AWS schreiben die Verwendung detaillierter Berechtigungen vor, um den Zugriff auf verschiedene Ressourcen zu kontrollieren. AWS Identity and Access Management (IAM) ermöglicht Ihnen die Verwaltung von Benutzern und Benutzerberechtigungen in. AWS Eine [IAM-Richtlinie](https://docs.aws.amazon.com/IAM/latest/UserGuide/PoliciesOverview.html) führt explizit zulässige Aktionen sowie Ressourcen auf, für die diese Aktionen relevant sind.

Im Folgenden werden die Berechtigungen für Produzenten und Konsumenten von Kinesis Data Streams aufgelistet, die mindestens erforderlich sind.


**Produzent**  

| Aktionen | Ressource | Zweck | 
| --- | --- | --- | 
| DescribeStream, DescribeStreamSummary, DescribeStreamConsumer | Kinesis Data Stream | Vor dem Versuch, Datensätze zu lesen, prüft der Verbraucher, ob der Daten-Stream vorhanden und ob er aktiv ist und ob Shards im Daten-Stream enthalten sind. | 
| SubscribeToShard, RegisterStreamConsumer | Kinesis Data Stream | Abonniert und registriert Verbraucher bei einem Shard. | 
| PutRecord, PutRecords | Kinesis Data Stream | Schreibt Datensätze in Kinesis Data Streams. | 


**Konsument**  

| **Aktionen** | **Ressource** | **Zweck** | 
| --- | --- | --- | 
| DescribeStream | Kinesis Data Stream | Vor dem Versuch, Datensätze zu lesen, prüft der Verbraucher, ob der Daten-Stream vorhanden und ob er aktiv ist und ob Shards im Daten-Stream enthalten sind. | 
| GetRecords, GetShardIterator  | Kinesis Data Stream | Auslesen von Datensätzen aus einem Shard. | 
| CreateTable, DescribeTable, GetItem, PutItem, Scan, UpdateItem | Amazon-DynamoDB-Tabelle. | Wenn der Verbraucher mit der Kinesis Client Library (KCL) (Version 1.x oder 2.x) entwickelt wird, benötigt er Berechtigungen für die DynamoDB-Tabelle, um den Verarbeitungsstatus der Anwendung zu verfolgen. | 
| DeleteItem | Amazon-DynamoDB-Tabelle. | Für den Fall, dass der Verbraucher split/merge Operationen an Kinesis Data Streams Streams-Shards ausführt. | 
| PutMetricData |  CloudWatch Amazon-Protokoll | Die KCL lädt auch Metriken hoch CloudWatch, die für die Überwachung der Anwendung nützlich sind. | 

Für dieses Tutorial erstellen Sie eine einzelne IAM-Richtlinie, die alle oben genannten Berechtigungen gewährt. Im Praxiseinsatz können Sie zwei Richtlinien erstellen, eine für Produzenten und eine für Verbraucher.

**So erstellen Sie eine IAM-Richtlinie**

1. Suchen Sie den Amazon-Ressourcennamen (ARN) für den neuen Datenstream, den Sie im vorherigen Schritt erstellt haben. Sie finden diesen ARN als **Stream ARN (Stream-ARN)** oben auf der Registerkarte **Details**. Das ARN-Format sieht folgendermaßen aus:

   ```
   arn:aws:kinesis:region:account:stream/name
   ```  
*Region*  
Der AWS Regionalcode, zum Beispiel`us-west-2`. Weitere Informationen finden Sie unter [Regionen und Verfügbarkeitskonzepte](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-regions-availability-zones).  
*Konto*  
Die AWS Konto-ID, wie in den [Kontoeinstellungen](https://console.aws.amazon.com/billing/home?#/account) angezeigt.  
*Name*  
Der Name des Datenstroms, den Sie im vorherigen Schritt erstellt haben, nämlich`StockTradeStream`.

1. Bestimmen Sie den ARN für die DynamoDB-Tabelle, die vom Verbraucher verwendet und von der ersten Verbraucher-Instance erstellt wird. Er muss das folgende Format aufweisen:

   ```
   arn:aws:dynamodb:region:account:table/name
   ```

   Die Region und die Konto-ID sind identisch mit den Werten im ARN des Datenstroms, den Sie für dieses Tutorial verwenden, aber der *Name ist der Name* der DynamoDB-Tabelle, die von der Verbraucheranwendung erstellt und verwendet wird. KCL verwendet den Anwendungsnamen als Tabellennamen. Verwenden Sie `StockTradesProcessor` in diesem Schritt für den DynamoDB-Tabellennamen, da dies der Anwendungsname ist, der in späteren Schritten in diesem Tutorial verwendet wird.

1. **Wählen Sie in der IAM-Konsole unter **Policies** ([https://console.aws.amazon.com/iam/home \$1policies](https://console.aws.amazon.com/iam/home#policies)) die Option Create policy aus.** Wenn Sie zum ersten Mal mit IAM-Richtlinien arbeiten, wählen Sie **Erste Schritte**, **Richtlinie erstellen** aus.

1. Wählen Sie **Select (Auswählen)** neben **Policy Generator (Richtliniengenerator)** aus.

1. Wählen Sie **Amazon Kinesis** als AWS Service.

1. Legen Sie `DescribeStream`, `GetShardIterator`, `GetRecords`, `PutRecord` und `PutRecords` als zulässige Aktionen fest.

1. Geben Sie den ARN des Daten-Streams ein, den Sie in diesem Tutorial verwenden.

1. Verwenden Sie **Add Statement (Statement hinzufügen)** für die folgenden Elemente:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/de_de/streams/latest/dev/tutorial-stock-data-kplkcl2-iam.html)

   Das Sternchen (`*`), das bei der Angabe einer ARN verwendet wird, ist nicht erforderlich. In diesem Fall liegt das daran, dass es keine bestimmte Ressource gibt, für die die `PutMetricData` Aktion aufgerufen wird. CloudWatch 

1. Wählen Sie **Next Step (Weiter)** aus.

1. Ändern Sie **Policy Name (Richtlinienname)** in `StockTradeStreamPolicy`, prüfen Sie den Code und wählen sie **Create Policy (Richtlinie erstellen)** aus.

Das resultierende Richtliniendokument sollte folgendermaßen aussehen:

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "Stmt123",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:PutRecord",
                "kinesis:PutRecords",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:ListShards",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream"
            ]
        },
        {
            "Sid": "Stmt234",
            "Effect": "Allow",
            "Action": [
                "kinesis:SubscribeToShard",
                "kinesis:DescribeStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream/*"
            ]
        },
        {
            "Sid": "Stmt456",
            "Effect": "Allow",
            "Action": [
                "dynamodb:*"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-west-2:111122223333:table/StockTradesProcessor"
            ]
        },
        {
            "Sid": "Stmt789",
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}
```

------

**So erstellen Sie einen IAM-Benutzer**

1. Öffnen Sie unter [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) die IAM-Konsole.

1. Wählen Sie auf der Seite **Users (Benutzer)** die Option **Add user (Benutzer hinzufügen)** aus.

1. Geben Sie `StockTradeStreamUser` als **Benutzername** ein.

1. Wählen Sie für **Access type (Zugriffstyp)** die Option **Programmatic access** (Programmgesteuerter Zugriff) und wählen Sie dann **Next: Permissions (Weiter: Berechtigungen)**.

1. Wählen Sie **Vorhandene Richtlinien direkt zuordnen**.

1. Suchen Sie nach dem Namen der Richtlinie, die Sie im vorherigen Verfahren erstellt haben (`StockTradeStreamPolicy`. Markieren Sie das Kontrollkästchen links neben dem Richtliniennamen und wählen Sie dann **Next: Review (Weiter: Prüfen)**.

1. Überprüfen Sie die Details und die Zusammenfassung und wählen Sie dann **Create user (Benutzer erstellen)** aus.

1. Kopieren Sie die **Access key ID (Zugriffsschlüssel-ID)** und speichern Sie sie privat. Wählen Sie unter **Secret access key (Geheimer Zugriffsschlüssel)** die Option **Show (Anzeigen)** und speichern Sie auch diesen Schlüssel privat.

1. Fügen Sie die Zugriffs- und Geheimschlüssel in eine lokalen Datei an einem sichern Ort ein, auf den nur Sie Zugriff haben. Erstellen Sie für diese Anwendung eine Datei namens ` ~/.aws/credentials` (mit strikten Berechtigungen). Die Datei sollte das folgende Format aufweisen:

   ```
   [default]
   aws_access_key_id=access key
   aws_secret_access_key=secret access key
   ```

**So fügen Sie einem Benutzer eine IAM-Richtlinie hinzu**

1. Klicken Sie in der IAM;-Konsole auf [Richtlinien](https://console.aws.amazon.com/iam/home?#policies) und wählen Sie **Richtlinienaktionen** aus. 

1. Klicken Sie auf `StockTradeStreamPolicy` und **Attach (Verknüpfen)**.

1. Wählen Sie `StockTradeStreamUser` und **Attach Policy (Richtlinie anfügen)** aus.

## Nächste Schritte
<a name="tutorial-stock-data-kplkcl2-iam-next"></a>

[Laden Sie den Code herunter und erstellen Sie ihn](tutorial-stock-data-kplkcl2-download.md)

# Laden Sie den Code herunter und erstellen Sie ihn
<a name="tutorial-stock-data-kplkcl2-download"></a>

In diesem Thema finden Sie Beispielcode zur Implementierung der Beispiel-Wertpapiertransaktionen im Daten-Stream (*Produzent*) und zur Verarbeitung dieser Daten (*Verbraucher*).

**So laden Sie den Code herunter und erstellen ihn**

1. Laden Sie den Quellcode aus dem [https://github.com/aws-samples/amazon-kinesis-learning](https://github.com/aws-samples/amazon-kinesis-learning) GitHub Repo auf Ihren Computer herunter.

1. Erstellen Sie in der IDE ein Projekt mit dem Quellcode. Behalten Sie die bereitgestellte Verzeichnisstruktur bei.

1. Fügen Sie dem Projekt die folgenden Bibliotheken hinzu:
   + Amazon Kinesis Client Library (KCL)
   + AWS SDK
   + Apache HttpCore
   + Apache HttpClient
   + Apache Commons Lang
   + Apache Commons Logging
   + Guava (Google-Kernbibliotheken für Java)
   + Jackson Annotations
   + Jackson Core
   + Jackson Databind
   + Jackson Dataformat: CBOR
   + Joda Time

1. Je nach IDE wird das Projekt möglicherweise automatisch erstellt. Wenn nicht, erstellen Sie es mit den für Ihre IDE erforderlichen Schritten.

Wenn Sie diese Schritte erfolgreich abgeschlossen haben, können Sie zum nächsten Abschnitt wechseln, [Implementieren Sie den Produzenten](tutorial-stock-data-kplkcl2-producer.md). 

## Nächste Schritte
<a name="tutorial-stock-data-kplkcl2-download-next"></a>

[[Implementieren Sie den Produzenten](tutorial-stock-data-kplkcl2-producer.md)Implementieren Sie den Produzenten](tutorial-stock-data-kplkcl2-producer.md)

# Implementieren Sie den Produzenten
<a name="tutorial-stock-data-kplkcl2-producer"></a>

Dieses Tutorial verwendet das reale Szenario einer Überwachung des Wertpapierhandels. Die folgenden Prinzipien erläutern kurz, wie dieses Szenario zum Produzenten und seiner unterstützenden Codestruktur passt.

Beachten Sie den [Quellcode](https://github.com/aws-samples/amazon-kinesis-learning ) und prüfen Sie die folgenden Informationen.

**StockTrade Klasse**  
Ein bestimmter Wertpapierhandel wird durch eine Instance der StockTrade-Klasse dargestellt. Diese Instance enthält folgende Attribute: Tickersymbol, Preis, Anzahl der Anteile, Art des Handels (Kauf oder Verkauf) und ID zur eindeutigen Identifizierung der Handelsaktion. Dieser Klasse wird für Sie implementiert. 

**Stream-Datensatz**  
Ein Stream ist eine Sequenz von Datensätzen. Ein Datensatz ist die Serialisierung einer `StockTrade`-Instance im JSON-Format. Beispiel:   

```
{
  "tickerSymbol": "AMZN", 
  "tradeType": "BUY", 
  "price": 395.87,
  "quantity": 16, 
  "id": 3567129045
}
```

**StockTradeGenerator Klasse**  
StockTradeGenerator hat eine Methode namens`getRandomTrade()`, die bei jedem Aufruf einen neuen zufällig generierten Aktienhandel zurückgibt. Dieser Klasse wird für Sie implementiert.

**StockTradesWriter Klasse**  
Die `main`-Methode des Produzenten, StockTradesWriter, ruft kontinuierlich eine zufällige Handelsaktion ab und sendet die Daten an Kinesis Data Streams, indem sie die folgenden Aufgaben durchführt:  

1. Liest den Datenstromnamen und den Regionsnamen als Eingabe.

1. Verwendet die`KinesisAsyncClientBuilder`, um die Region, die Anmeldeinformationen und die Client-Konfiguration festzulegen. 

1. Sicherstellen, dass der Stream vorhanden und aktiv ist (wenn nicht, kommt es zu einer Beendigung mit Fehler). 

1. Aufrufen der `StockTradeGenerator.getRandomTrade()`-Methode in einer Dauerschleife und anschließend Aufruf der `sendStockTrade`-Methode, um die Handelsdaten alle 100 Millisekunden an den Stream zu senden. 
Die `sendStockTrade`-Methode der `StockTradesWriter`-Klasse hat den folgenden Code:   

```
private static void sendStockTrade(StockTrade trade, KinesisAsyncClient kinesisClient,
            String streamName) {
        byte[] bytes = trade.toJsonAsBytes();
        // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library.
        if (bytes == null) {
            LOG.warn("Could not get JSON bytes for stock trade");
            return;
        }

        LOG.info("Putting trade: " + trade.toString());
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(bytes))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            LOG.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }
```

Beachten Sie die folgende Code-Struktur:
+ Die `PutRecord` API erwartet ein Byte-Array, und Sie müssen den Handel in das JSON-Format konvertieren. Diese einzelne Codezeile führt die Operation aus: 

  ```
  byte[] bytes = trade.toJsonAsBytes();
  ```
+ Bevor Sie die Transaktion senden können, erstellen Sie eine neue `PutRecordRequest`-Instance (in diesem Fall Anforderung genannt). Jede `request` benötigt den Namen des Streams, einen Partitionsschlüssel und einen Daten-Blob. 

  ```
  PutPutRecordRequest request = PutRecordRequest.builder()
      .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
      .streamName(streamName)
      .data(SdkBytes.fromByteArray(bytes))
      .build();
  ```

  Das Beispiel verwendet einen Börsenticker als Partitionsschlüssel, der den Datensatz einem bestimmten Shard zuordnet. In der Praxis sollten Sie Hunderte oder gar Tausende von Partitionsschlüsseln pro Shard haben, sodass die Datensätze in Ihrem Stream gleichmäßig verteilt sind. Weitere Informationen zum Hinzufügen von Daten zu einem Stream finden Sie unter [Daten in Amazon Kinesis Data Streams schreiben](building-producers.md).

  `request` kann die Daten jetzt an den Client senden (PUT-Operation): 

  ```
     kinesisClient.putRecord(request).get();
  ```
+ Eine Fehlerüberprüfung und Protokollierung sind immer nützliche Ergänzungen. Dieser Code protokolliert Fehlerbedingungen: 

  ```
  if (bytes == null) {
      LOG.warn("Could not get JSON bytes for stock trade");
      return;
  }
  ```

  Fügen Sie den try/catch Block rund um die `put` Operation hinzu: 

  ```
  try {
   	kinesisClient.putRecord(request).get();
  } catch (InterruptedException e) {
              LOG.info("Interrupted, assuming shutdown.");
  } catch (ExecutionException e) {
              LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
  }
  ```

  Der Grund besteht darin, dass eine Kinesis Data Streams-PUT-Operation aufgrund eines Netzwerkfehlers fehlschlagen kann oder gedrosselt wird, weil die Durchsatzgrenze des Streams erreicht wird. Es wird empfohlen, dass Sie Ihre Wiederholungsrichtlinie für `put` Operationen sorgfältig prüfen, um Datenverlust zu vermeiden, z. B. die Verwendung eines Wiederholungsversuchs. 
+ Eine Statusprotokollierung ist hilfreich, wenn auch optional:

  ```
  LOG.info("Putting trade: " + trade.toString());
  ```
Der hier gezeigte Produzent verwendet die API-Funktionalität von Kinesis Data Streams für einzelne Datensätze `PutRecord`. In der Praxis ist es oft effizienter, die Eignung von `PutRecords` für mehrere Datensätze zu nutzen und mehrere Datensatzstapel gleichzeitig zu senden, wenn ein Produzent viele Datensätze erstellt. Weitere Informationen finden Sie unter [Daten in Amazon Kinesis Data Streams schreiben](building-producers.md).

**So führen Sie den Produzenten aus**

1. Verifizieren Sie, dass der in [Erstellen Sie eine IAM-Richtlinie und einen Benutzer](tutorial-stock-data-kplkcl2-iam.md) abgerufene Zugriffsschlüssel samt geheimem Schlüsselpaar in der Datei `~/.aws/credentials` gespeichert wurde. 

1. Führen Sie die `StockTradeWriter`-Klasse mit den folgenden Argumenten aus:

   ```
   StockTradeStream us-west-2
   ```

   Wenn Sie den Stream in einer anderen Region als `us-west-2` erstellt haben, müssen Sie stattdessen hier diese Region angeben.

Die Ausgabe sollte folgendermaßen oder ähnlich aussehen:

```
Feb 16, 2015 3:53:00 PM  
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
```

Ihre Wertpapierdaten werden nun von Kinesis Data Streams eingespeist.

## Nächste Schritte
<a name="tutorial-stock-data-kplkcl2-producer-next"></a>

[Implementieren Sie den Verbraucher](tutorial-stock-data-kplkcl2-consumer.md)

# Implementieren Sie den Verbraucher
<a name="tutorial-stock-data-kplkcl2-consumer"></a>

Die Verbraucheranwendung in diesem Tutorial verarbeitet die Wertpapiertransaktionen im Daten-Stream kontinuierlich. Sie gibt dann für jede Minute die beliebtesten Aktien aus, die gekauft und verkauft wurden. Die Anwendung setzt auf Kinesis Client Library (KCL) auf, die viele der mühsamen Arbeiten einer Verbraucheranwendung übernimmt. Weitere Informationen finden Sie unter [Informationen zu KCL 1.x und 2.x](shared-throughput-kcl-consumers.md). 

Überprüfen Sie die folgenden Informationen in Bezug auf den Quellcode.

**StockTradesProcessor Klasse**  
Die für Sie bereitgestellte Hauptklasse des Verbrauchers, die die folgenden Aufgaben erfüllt:  
+ Liest die als Argumente übergebenen Anwendungs-, Datenstrom- und Regionsnamen.
+ Erzeugt eine `KinesisAsyncClient` Instanz mit dem Namen der Region.
+ Erstellt eine `StockTradeRecordProcessorFactory`-Instance für die Instances von `ShardRecordProcessor`, implementiert von einer `StockTradeRecordProcessor`-Instance. 
+ Erzeugt eine `ConfigsBuilder` Instanz mit der `StockTradeRecordProcessorFactory` Instanz `KinesisAsyncClient` `StreamName``ApplicationName`,, und. Dies ist für das Erstellen aller Konfigurationen mit Standardwerten nützlich.
+ KCL-Scheduler (zuvor in den KCL-Versionen 1.x als KCL-Worker bezeichnet) mit der `ConfigsBuilder`-Instance erstellen. 
+ Der Scheduler erstellt für jeden Shard (der dieser Verbraucher-Instance zugeordnet ist) einen neuen Thread, der in einer Schleife die Datensätze aus dem Daten-Stream liest. Anschließend wird die `StockTradeRecordProcessor`-Instance aufgerufen, um die empfangenen Datensatzstapel zu verarbeiten. 

**StockTradeRecordProcessor Klasse**  
Implementierung der `StockTradeRecordProcessor`-Instance, die wiederum fünf erforderliche Methoden implementiert: `initialize`, `processRecords`, `leaseLost`, `shardEnded` und `shutdownRequested`.   
Die Methoden `initialize` und `shutdownRequested` werden von KCL verwendet, um dem Datensatzverarbeiter mitzuteilen, wann er bereit sein muss, Datensätze zu empfangen, und wann der Empfang von Datensätzen gestoppt werden muss, damit alle anwendungsspezifischen Einrichtungs- und Beendigungsaufgaben ausgeführt werden können. `leaseLost` und `shardEnded` werden verwendet, um Logik zu implementieren, die beim Verlust eines Lease oder bei Erreichen des Shards-Endes im Rahmen der Shard-Verarbeitung auszuführen ist. In diesem Beispiel protokollieren wir einfach Meldungen dieser Ereignisse.   
Der Code für diese Methoden wird für Sie bereitgestellt. Die wesentliche Verarbeitung erfolgt mit der `processRecords` Methode, die wiederum `processRecord` für die einzelnen Datensätze nutzt. Die letztgenannte Methode wird als nahezu leerer Skeleton-Code bereitgestellt und im nächsten Schritt (der weitere Informationen enthält) implementiert.   
Beachten Sie außerdem die Implementierung der Hilfsmethoden für `processRecord`: `reportStats` und `resetStats`, die im ursprünglichen Quellcode leer sind.   
Die `processRecords`-Methode wurde für Sie implementiert und führt die folgenden Schritte aus:  
+ Für jeden übergebenen Datensatz wird `processRecord` aufgerufen. 
+ Ruft `reportStats()` zum Drucken der neuesten Statistiken auf, wenn seit dem letzten Bericht mindestens 1 Minute vergangen ist, und dann `resetStats()`, um die Statistiken zu löschen, damit das nächste Intervall nur neue Datensätze enthält.
+ Legt den Zeitpunkt für die nächste Berichterstellung fest.
+ Ruft `checkpoint()` auf, wenn seit dem letzten Prüfpunkt mindestens 1 Minute vergangen ist. 
+ Legt den Zeitpunkt für das nächste Checkpointing fest.
Diese Methode verwendet für das Checkpointing und die Berichterstellung ein Intervall von 60 Sekunden. Weitere Informationen zum Checkpointing finden Sie unter [Verwendung der Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html). 

**StockStats Klasse**  
Diese Klasse stellt eine Datenaufbewahrung und eine Nachverfolgung von Statistiken für die beliebtesten Aktien bereit. Dieser Code wird für Sie bereitgestellt und enthält folgende Methoden:  
+ `addStockTrade(StockTrade)`: fügt die angegebene `StockTrade` in die ausgeführten Statistiken ein.
+ `toString()`: gibt die Statistiken als formatierte Zeichenfolge zurück.
Diese Klasse verfolgt die beliebtesten Aktien, indem sie fortlaufend die Gesamtzahl der Trades für jede Aktie und deren maximale Anzahl zählt. Sie aktualisiert diese Werte, sobald eine neue Handelstransaktion empfangen wird.

Fügen Sie Code zu den Methoden der `StockTradeRecordProcessor`-Klasse hinzu, wie in den folgenden Schritten gezeigt. 

**So implementieren Sie den Konsumenten**

1. Implementieren Sie die `processRecord`-Methode, indem Sie ein richtig bemessenes `StockTrade`-Objekt instanziieren und die Datensatzdaten zu diesem hinzufügen, sodass im Falle eines Problems eine Warnung protokolliert wird. 

   ```
   byte[] arr = new byte[record.data().remaining()];
   record.data().get(arr);
   StockTrade trade = StockTrade.fromJsonAsBytes(arr);
       if (trade == null) {
           log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey());
           return;
           }
   stockStats.addStockTrade(trade);
   ```

1. Implementieren Sie eine `reportStats` Methode. Ändern Sie das Ausgabeformat nach Ihren Wünschen. 

   ```
   System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" +
   stockStats + "\n" +
   "****************************************************************\n");
   ```

1. Implementieren Sie die Methode `resetStats`, die eine neue `stockStats`-Instance erstellt. 

   ```
   stockStats = new StockStats();
   ```

1. Implementieren Sie die folgenden Methoden, die für die `ShardRecordProcessor` Schnittstelle erforderlich sind:

   ```
   @Override
   public void leaseLost(LeaseLostInput leaseLostInput) {
       log.info("Lost lease, so terminating.");
   }
   
   @Override
   public void shardEnded(ShardEndedInput shardEndedInput) {
       try {
           log.info("Reached shard end checkpointing.");
           shardEndedInput.checkpointer().checkpoint();
       } catch (ShutdownException | InvalidStateException e) {
           log.error("Exception while checkpointing at shard end. Giving up.", e);
       }
   }
   
   @Override
   public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
       log.info("Scheduler is shutting down, checkpointing.");
       checkpoint(shutdownRequestedInput.checkpointer());
   }
   
   private void checkpoint(RecordProcessorCheckpointer checkpointer) {
       log.info("Checkpointing shard " + kinesisShardId);
       try {
           checkpointer.checkpoint();
       } catch (ShutdownException se) {
           // Ignore checkpoint if the processor instance has been shutdown (fail over).
           log.info("Caught shutdown exception, skipping checkpoint.", se);
       } catch (ThrottlingException e) {
           // Skip checkpoint when throttled. In practice, consider a backoff and retry policy.
           log.error("Caught throttling exception, skipping checkpoint.", e);
       } catch (InvalidStateException e) {
           // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS).
           log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
       }
   }
   ```

**So führen Sie den Konsumenten aus**

1. Führen Sie den unter [[Implementieren Sie den Produzenten](tutorial-stock-data-kplkcl2-producer.md)Implementieren Sie den Produzenten](tutorial-stock-data-kplkcl2-producer.md) erstellten Produzenten aus, um simulierte Wertpapiertransaktionsdatensätze in den Stream zu schreiben.

1. Stellen Sie sicher, dass der Zugriffsschlüssel und das geheime Schlüsselpaar, die vorher (beim Erstellen des IAM-Benutzers) abgerufen wurden, in der Datei `~/.aws/credentials` gespeichert sind. 

1. Führen Sie die `StockTradesProcessor`-Klasse mit den folgenden Argumenten aus:

   ```
   StockTradesProcessor StockTradeStream us-west-2
   ```

   Beachten Sie, dass Sie, wenn Sie Ihren Stream in einer anderen Region als `us-west-2` erstellt haben, stattdiesen diese Region hier angeben müssen.

Nach einer Minute sollen Sie eine Ausgabe ähnlich der folgenden sehen, die anschließend einmal pro Minute aktualisiert wird:

```
  
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  ****************************************************************
```

## Nächste Schritte
<a name="tutorial-stock-data-kplkcl2-consumer-next"></a>

[(Optional) Erweitern Sie den Consumer](tutorial-stock-data-kplkcl2-consumer-extension.md)

# (Optional) Erweitern Sie den Consumer
<a name="tutorial-stock-data-kplkcl2-consumer-extension"></a>

Dieser optionale Abschnitt zeigt, wie Sie den Konsumentencode erweitern können, um einem komplexeren Szenario gerecht zu werden.

Wenn Sie minütlich über die größten Verkaufsaufträge informiert werden möchten, können Sie die `StockStats`-Klasse an drei Stellen bearbeiten.

**So erweitern Sie den Konsumenten**

1. Fügen Sie neue Instance-Variablen hinzu:

   ```
    // Ticker symbol of the stock that had the largest quantity of shares sold 
    private String largestSellOrderStock;
    // Quantity of shares for the largest sell order trade
    private long largestSellOrderQuantity;
   ```

1. Fügen Sie folgenden Code zu hinz `addStockTrade`:

   ```
   if (type == TradeType.SELL) {
        if (largestSellOrderStock == null || trade.getQuantity() > largestSellOrderQuantity) {
            largestSellOrderStock = trade.getTickerSymbol();
            largestSellOrderQuantity = trade.getQuantity();
        }
    }
   ```

1. Ändern Sie die `toString`-Methode, um die zusätzlichen Informationen zu drucken:

   ```
    
   public String toString() {
       return String.format(
           "Most popular stock being bought: %s, %d buys.%n" +
           "Most popular stock being sold: %s, %d sells.%n" +
           "Largest sell order: %d shares of %s.",
           getMostPopularStock(TradeType.BUY), getMostPopularStockCount(TradeType.BUY),
           getMostPopularStock(TradeType.SELL), getMostPopularStockCount(TradeType.SELL),
           largestSellOrderQuantity, largestSellOrderStock);
   }
   ```

Wenn Sie den Konsumenten jetzt ausführen (führen Sie auch den Produzenten), sollten Sie eine Ausgabe ähnlich der folgenden sehen:

```
 
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  Largest sell order: 996 shares of BUD.
  ****************************************************************
```

## Nächste Schritte
<a name="tutorial-stock-data-kplkcl2-consumer-extension-next"></a>

[Bereinigen von Ressourcen](tutorial-stock-data-kplkcl2-finish.md)

# Bereinigen von Ressourcen
<a name="tutorial-stock-data-kplkcl2-finish"></a>

Da Sie für die Nutzung des Kinesis Data Streams zahlen, sollten Sie diesen ebenso wie die entsprechende Amazon-DynamoDB-Tabelle löschen, wenn sie nicht mehr benötigt werden. Für einen aktiven Stream fallen auch dann nominale Gebühren an, wenn Sie keine Datensätze senden oder abrufen. Der Grund hierfür ist, dass ein aktiver Stream durch kontinuierlichen „Horchen“ darauf, ob neue Datensätze oder Anforderungen zum Abrufen von Datensätzen eingehen, Ressourcen verbraucht.

**So löschen Sie den Stream und die Tabelle**

1. Schalten Sie alle Produzenten und Verbraucher aus, die Sie möglicherweise noch in Betrieb haben.

1. Öffnen Sie die Kinesis-Konsole unter [https://console.aws.amazon.com/kinesis.](https://console.aws.amazon.com/kinesis)

1. Wählen Sie den Stream aus, den Sie für diese Anwendung erstellt haben (`StockTradeStream`).

1. Wählen Sie **Delete Stream (Stream löschen)** aus.

1. Öffnen Sie die DynamoDB-Konsole unter. [https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/)

1. Löschen Sie die `StockTradesProcessor`-Tabelle.

## Zusammenfassung
<a name="tutorial-stock-data-kplkcl2-summary"></a>

Die Verarbeitung einer großen Datenmenge nahezu in Echtzeit erfordert weder das Schreiben von kompliziertem Code noch die Entwicklung einer riesigen Infrastruktur. Es ist genauso einfach wie die Schreiblogik, eine kleine Datenmenge zu verarbeiten (wie Schreiben`processRecord(Record)`), aber Kinesis Data Streams zu verwenden, um so zu skalieren, dass es für eine große Menge an Streaming-Daten funktioniert. Sie müssen sich keine Gedanken über die Skalierung der Verarbeitung machen, da Kinesis Data Streams das für Sie übernimmt. Sie müssen lediglich Ihre Streaming-Datensätze an Kinesis Data Streams senden und eine Logik für die Verarbeitung neu empfangener Datensätze schreiben. 

Nachfolgend einige mögliche Erweiterungen für diese Anwendung.

**Shard-übergreifende Aggregation**  
Derzeit erhalten Sie Statistiken, die aus der Aggregation von Datensätzen resultieren, die von einem einzelnen Auftragnehmer eines einzelnen Shards empfangen werden. (Ein Shard kann jeweils nur von einem Auftragnehmer in einer einzelnen Anwendung verarbeitet werden.) Möglicherweise möchten Sie, wenn Sie eine Skalierung durchführen und mehr als einen Shard haben, eine Shard-übergreifende Aggregation vornehmen. Dies kann mit einer Pipeline-Architektur realisiert werden, bei der die Ausgabe eines jeden Auftragnehmers in einen anderen Stream mit einem einzelnen Shard eingespeist wird, der von einem Auftragnehmer verarbeitet wird, der die Ausgaben der ersten Phase aggregiert. Da die Daten aus der ersten Phase begrenzt sind (eine Ausgabe pro Minute pro Shard), können sie problemlos von nur einem Shard verarbeitet werden.

**Skalierung**  
Wenn der Stream skaliert wird, damit mehr Shards zur Verfügung stehen (da viele Produzenten Daten senden), geschieht dies dadurch, dass mehr Worker hinzugefügt werden. Sie können die Auftragnehmer in EC2-Instances ausführen und Auto-Scaling-Gruppen nutzen.

**Verwenden Sie Konnektoren zu Amazon S3 S3/ DynamoDB/Amazon Redshift/Storm**  
Da ein Stream kontinuierlich verarbeitet wird, kann seine Ausgabe an andere Ziele gesendet werden. AWS bietet [Konnektoren](https://github.com/awslabs/amazon-kinesis-connectors) zur Integration von Kinesis Data Streams mit anderen AWS Diensten und Tools von Drittanbietern.

# Tutorial: Verarbeiten Sie Aktiendaten in Echtzeit mit KPL und KCL 1.x
<a name="tutorial-stock-data-kplkcl"></a>

Im Szenario dieses Tutorials werden Wertpapierdaten in einen Datenstrom geschrieben. Zudem wird eine einfache Anwendung mit Amazon Kinesis Data Streams erstellt, die Berechnungen mit dem Stream durchführt. Sie lernen, wie Sie einen Stream von Datensätzen an Kinesis Data Streams senden und eine Anwendung implementieren, die die Datensätze nahezu in Echtzeit verarbeitet und verarbeitet.

**Wichtig**  
Nachdem Sie einen Stream erstellt haben, fallen für Ihr Konto geringe Gebühren für die Nutzung von Kinesis Data Streams an, da Kinesis Data Streams nicht für das AWS kostenlose Kontingent in Frage kommt. Nach dem Start der Konsumentenanwendung fallen nominale Gebühren für die Amazon-DynamoDB-Nutzung an. Die Konsumentenanwendung verwendet DynamoDB zum Verfolgen des Verarbeitungsstatus. Wenn Sie mit dieser Anwendung fertig sind, sollten Sie Ihre AWS -Ressourcen löschen, damit keine weiteren Gebühren anfallen. Weitere Informationen finden Sie unter [Bereinigen von Ressourcen](tutorial-stock-data-kplkcl-finish.md).

Der Code greift nicht auf tatsächliche Wertpapierdaten zu, sondern simuliert nur deren Strom. Dazu werden zufällige Wertpapierdaten erzeugt. Ausgangspunkt sind dabei echte Marktdaten der 25 führenden Aktien gemäß Börsenkapitalisierung von Februar 2015. Wenn Sie Zugriff auf einen Echtzeit-Stream von Wertpapierdaten haben, möchten Sie vermutlich nützliche, zeitnahe Statistiken aus den Stream-Daten erzeugen. Sie können beispielsweise eine Zeitfensteranalyse durchführen, um festzustellen, welche Aktie in den letzten 5 Minuten am häufigsten erworben wurde. Oder Sie möchten im Falle eines zu großen Verkaufsauftrags (d. h. zu viele Anteile) benachrichtigt werden. Der Code in diesem Tutorial kann erweitert werden, um solche Funktionen bereitzustellen.

Sie können die in diesem Tutorial aufgeführten Schritte auf einem Desktop-PC oder Laptop durchführen und sowohl den Produzenten- als auch den Konsumentencode auf demselben Rechner oder einer Plattform ausführen, die die angegebenen Anforderungen erfüllt, beispielsweise Amazon Elastic Compute Cloud (Amazon EC2).

Bei den gezeigten Beispielen wird die Region USA West (Oregon) verwendet. Sie funktionieren aber auch für alle anderen [AWS -Regionen, die Kinesis Data Streams unterstützen](https://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region).

**Topics**
+ [Erfüllen der Voraussetzungen](tutorial-stock-data-kplkcl-begin.md)
+ [Erstellen Sie einen Datenstream](tutorial-stock-data-kplkcl-create-stream.md)
+ [Erstellen Sie eine IAM-Richtlinie und einen Benutzer](tutorial-stock-data-kplkcl-iam.md)
+ [Laden Sie den Implementierungscode herunter und erstellen Sie ihn](tutorial-stock-data-kplkcl-download.md)
+ [Implementieren Sie den Produzenten](tutorial-stock-data-kplkcl-producer.md)
+ [Implementieren Sie den Verbraucher](tutorial-stock-data-kplkcl-consumer.md)
+ [(Optional) Erweitern Sie die Zahl der Verbraucher](tutorial-stock-data-kplkcl-consumer-extension.md)
+ [Bereinigen von Ressourcen](tutorial-stock-data-kplkcl-finish.md)

# Erfüllen der Voraussetzungen
<a name="tutorial-stock-data-kplkcl-begin"></a>

Für [Tutorial: Verarbeiten Sie Aktiendaten in Echtzeit mit KPL und KCL 1.x[Tutorial: Verarbeiten Sie Aktiendaten in Echtzeit mit KPL und KCL 1.x](tutorial-stock-data-kplkcl.md)](tutorial-stock-data-kplkcl.md) gelten die folgenden Anforderungen.

## Erstellen und verwenden Sie ein Amazon Web Services Services-Konto
<a name="tutorial-stock-data-kplkcl-begin-aws"></a>

Bevor Sie beginnen, müssen Sie sich mit den in [Terminologie und Konzepte von Amazon Kinesis Data Streams](key-concepts.md) behandelten Konzepten vertraut machen, insbesondere mit Streams, Shards, Produzenten und Konsumenten. Das Durcharbeiten von [Tutorial: Installation und Konfiguration von AWS CLI for Kinesis Data Streams](kinesis-tutorial-cli-installation.md) ist ebenfalls hilfreich.

Sie benötigen ein AWS Konto und einen Webbrowser, um auf das zugreifen zu können AWS-Managementkonsole.

Verwenden Sie für den Konsolenzugriff Ihren IAM-Benutzernamen und Ihr Passwort, um sich über die IAM-Anmeldeseite bei [AWS-Managementkonsole](https://console.aws.amazon.com/console/home) anzumelden. Informationen zu AWS Sicherheitsanmeldedaten, einschließlich programmatischem Zugriff und Alternativen zu langfristigen Anmeldeinformationen, finden Sie unter [AWS Sicherheitsanmeldedaten](https://docs.aws.amazon.com/IAM/latest/UserGuide/security-creds.html) im *IAM-Benutzerhandbuch*. Einzelheiten zur Anmeldung bei Ihrem AWS-Konto finden Sie unter [So melden Sie sich an AWS im AWS-Anmeldung](https://docs.aws.amazon.com/signin/latest/userguide/how-to-sign-in.html) *Benutzerhandbuch*.

Weitere Informationen zu IAM und dem Einrichten von Sicherheitsschlüsseln finden Sie unter [Erstellen eines IAM-Benutzers](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/get-set-up-for-amazon-ec2.html#create-an-iam-user).

## Erfüllen Sie die Anforderungen an die Systemsoftware
<a name="tutorial-stock-data-kplkcl-begin-sys"></a>

Auf dem System, dass die Anwendung ausführt, muss Java 7 oder höher installiert sein. Zum Herunterladen und Installieren des neuesten Java Development Kit (JDK) rufen Sie die [Website für die Java SE-Installation von Oracle](http://www.oracle.com/technetwork/java/javase/downloads/index.html) auf.

Wenn Sie über eine Java IDE wie [Eclipse](https://www.eclipse.org/downloads/) verfügen, können Sie den Quellcode öffnen, bearbeiten, erstellen und ausführen.

Sie benötigen die neueste [AWS SDK für Java](https://aws.amazon.com/sdk-for-java/)-Version. Wenn Sie Eclipse als IDE (integrierte Entwicklungsumgebung) nutzen, können Sie stattdessen das [AWS -Toolkit for Eclipse](https://aws.amazon.com/eclipse/) installieren. 

Für die Verbraucheranwendung ist die Kinesis Client Library (KCL) Version 1.2.1 oder höher erforderlich, die Sie GitHub unter [Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) (Java) beziehen können.

## Nächste Schritte
<a name="tutorial-stock-data-kplkcl-begin-next"></a>

[Erstellen Sie einen Datenstream](tutorial-stock-data-kplkcl-create-stream.md)

# Erstellen Sie einen Datenstream
<a name="tutorial-stock-data-kplkcl-create-stream"></a>

Im ersten Schritt von [Tutorial: Verarbeiten Sie Aktiendaten in Echtzeit mit KPL und KCL 1.x[Tutorial: Verarbeiten Sie Aktiendaten in Echtzeit mit KPL und KCL 1.x](tutorial-stock-data-kplkcl.md)](tutorial-stock-data-kplkcl.md) erstellen Sie den Stream, der in den weiteren Schritten genutzt wird.

**So erstellen Sie einen Stream**

1. Melden Sie sich bei der an AWS-Managementkonsole und öffnen Sie die Kinesis-Konsole unter [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

1. Klicken Sie im Navigationsbereich auf **Data Streams (Daten-Streams)**.

1. Erweitern Sie in der Navigationsleiste die Regionsauswahl und wählen Sie eine Region aus.

1. Wählen Sie **Create Kinesis Stream (Kinesis-Stream erstellen)**.

1. Geben Sie einen Namen für Ihren Stream ein (z. B. **StockTradeStream**).

1. Geben Sie **1** die Anzahl der Shards ein, lassen Sie aber Estimate the number of shards you'll need (**Schätzung der Anzahl der Shards**, die Sie benötigen) reduziert.

1. Wählen Sie **Create Kinesis Stream (Kinesis-Stream erstellen)**.

Auf der Seite mit der Auflistung der **Kinesis Streams** ist der Status Ihres Streams während des Erstellens auf `CREATING` gesetzt. Sobald der Stream verwendet werden kann, ändert sich der Status in `ACTIVE`. Wählen Sie den Namen des Streams aus. Auf der angezeigten Seite zeigt die Registerkarte **Details** eine Zusammenfassung Ihrer Streams-Konfiguration an. Der Abschnitt **Monitoring (Überwachung)** zeigt Überwachungsinformationen für den Stream an.

## Zusätzliche Informationen zu Shards
<a name="tutorial-stock-data-kplkcl-create-stream-info"></a>

Wenn Sie Kinesis Data Streams außerhalb dieses Tutorials verwenden, sollten Sie das Erstellen des Streams sorgfältiger planen. Bei der Bereitstellung von Shards sollten Sie vom höchsten erwarteten Bedarf ausgehen. In unserem Beispielszenario kommt es an der US-Börse tagsüber zu Datenverkehrsspitzen (Eastern Time). Der Bedarf sollte so geschätzt werden, dass diesen Verkehrsspitzen Rechnung getragen wird. Sie können dann entweder für den höchst möglichen Bedarf vorsorgen oder Ihren Stream an die Schwankungen anpassen. 

Ein *Shard* ist eine Einheit für die Durchsatzkapazität. Erweitern Sie auf der Seite **Kinesis Stream erstellen** **Schätzen Sie die Anzahl der Shards, die Sie brauchen**. Geben Sie entsprechend der folgenden Informationen die durchschnittliche Datensatzgröße, die maximale Anzahl der Datensätze, die pro Sekunde geschrieben werden, und die Anzahl der Konsumentenanwendungen an:

**Durchschnittliche Datensatzgröße**  
Eine Schätzung der berechneten durchschnittlichen Größe Ihrer Datensätze. Wenn Sie diesen Wert nicht kennen, verwenden Sie die geschätzte maximale Datensatzgröße.

**Max. geschriebene Datensätze**  
Berücksichtigen Sie die Anzahl der Entitäten, die Daten bereitstellen, und die ungefähre Anzahl der jeweils erstellten Datensätze pro Sekunde. Beispiel: Wenn Sie Börsendaten von 20 Handelsservern erhalten und jeder pro Sekunde 250 Handelsabschlüsse generiert, beträgt die Anzahl der Handelsabschlüsse (Datensätze) 5.000/Sekunde. 

**Anzahl der Konsumentenanwendungen**  
Die Anzahl der Anwendungen, die unabhängig voneinander Daten aus dem Stream auslesen, die Stream-Daten unterschiedlich verarbeiten und unterschiedliche Ausgaben generieren. Es können mehrere Instances einer Anwendung auf verschiedenen Rechnern (d. h. in einem Cluster) ausgeführt werden, sodass ein großer Daten-Stream verarbeitet werden kann.

Wenn der gezeigte Schätzwert der Shards das aktuelle Shard-Limit übersteigt, müssen Sie ggf. eine Erhöhung des Limits beantragen, bevor Sie einen Stream mit dieser Anzahl an Shards erstellen können. Nutzen Sie zum Beantragen einer Erhöhung des Shard-Limits das [Formular für Limits der Kinesis Data Streams](https://console.aws.amazon.com/support/home#/case/create?issueType=service-limit-increase&limitType=service-code-kinesis). Weitere Informationen zu Streams und Shards finden Sie unter [Kinesis-Datenstreams erstellen und verwalten](working-with-streams.md).

## Nächste Schritte
<a name="tutorial-stock-data-kplkcl-create-stream-next"></a>

[Erstellen Sie eine IAM-Richtlinie und einen Benutzer](tutorial-stock-data-kplkcl-iam.md)

# Erstellen Sie eine IAM-Richtlinie und einen Benutzer
<a name="tutorial-stock-data-kplkcl-iam"></a>

Bewährte Sicherheitsmethoden für AWS schreiben die Verwendung detaillierter Berechtigungen vor, um den Zugriff auf verschiedene Ressourcen zu kontrollieren. AWS Identity and Access Management (IAM) ermöglicht Ihnen die Verwaltung von Benutzern und Benutzerberechtigungen in. AWS Eine [IAM-Richtlinie](https://docs.aws.amazon.com/IAM/latest/UserGuide/PoliciesOverview.html) führt explizit zulässige Aktionen sowie Ressourcen auf, für die diese Aktionen relevant sind.

Im Folgenden werden die Berechtigungen für einen Produzenten und Konsumenten von Kinesis Data Streams aufgelistet, die mindestens erforderlich sind.


**Produzent**  

| Aktionen | Ressource | Zweck | 
| --- | --- | --- | 
| DescribeStream, DescribeStreamSummary, DescribeStreamConsumer | Kinesis Data Streams | Bevor er versucht, Datensätze zu schreiben, prüft der Produzent, ob der Stream vorhanden und aktiv ist, ob die Shards im Stream enthalten sind und ob der Stream einen Consumer hat. | 
| SubscribeToShard, RegisterStreamConsumer | Kinesis Data Streams | Abonnieren und Registrieren eines Konsumenten bei einem Kinesis-Daten-Stream-Shard. | 
| PutRecord, PutRecords | Kinesis Data Streams | Schreiben von Datensätzen in Kinesis Data Streams. | 


**Konsument**  

| **Aktionen** | **Ressource** | **Zweck** | 
| --- | --- | --- | 
| DescribeStream | Kinesis Data Streams | Vor dem Versuch, Daten zu lesen, prüft der Konsument, ob der Stream vorhanden und aktiv ist und ob Shards im Stream enthalten sind. | 
| GetRecords, GetShardIterator  | Kinesis Data Streams | Lesen von Datensätzen aus einem Shard von Kinesis Data Streams. | 
| CreateTable, DescribeTable, GetItem, PutItem, Scan, UpdateItem | Amazon-DynamoDB-Tabelle. | Wenn der Konsument mit der Kinesis Client Library (KCL) entwickelt wird, benötigt er Berechtigungen für die DynamoDB-Tabelle, um den Verarbeitungsstatus der Anwendung zu verfolgen. Der erste gestartete Konsument erstellt die Tabelle.  | 
| DeleteItem | Amazon-DynamoDB-Tabelle. | Für den Fall, dass der Verbraucher split/merge Operationen an Kinesis Data Streams Streams-Shards ausführt. | 
| PutMetricData |  CloudWatch Amazon-Protokoll | Die KCL lädt auch Metriken hoch CloudWatch, die für die Überwachung der Anwendung nützlich sind. | 

Für diese Anwendung erstellen Sie eine einzelne IAM-Richtlinie, die alle vorstehenden Berechtigungen gewährt. In der Praxis empfiehlt es sich möglicherweise, zwei Richtlinien zu erstellen: eine für Produzenten und eine für Konsumenten.

**So erstellen Sie eine IAM-Richtlinie**

1. Suchen Sie den Amazon-Ressourcennamen (ARN) für den neuen Stream. Sie finden diesen ARN als **Stream ARN (Stream-ARN)** oben auf der Registerkarte **Details**. Das ARN-Format sieht folgendermaßen aus:

   ```
   arn:aws:kinesis:region:account:stream/name
   ```  
*Region*  
Der Regionscode, beispielsweise `us-west-2`. Weitere Informationen finden Sie unter [Regionen und Verfügbarkeitskonzepte](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-regions-availability-zones).  
*Konto*  
Die AWS Konto-ID, wie in den [Kontoeinstellungen](https://console.aws.amazon.com/billing/home?#/account) angezeigt.  
*Name*  
Der Name des Streams von [Erstellen Sie einen Datenstream](tutorial-stock-data-kplkcl-create-stream.md), hier `StockTradeStream`.

1. Bestimmen Sie den ARN für die DynamoDB-Tabelle, die vom Konsumenten verwendet wird (und von der ersten Konsumenten-Instance erstellt wird). Er muss das folgende Format aufweisen:

   ```
   arn:aws:dynamodb:region:account:table/name
   ```

   Region und Konto stammen vom selben Ort wie im vorherigen Schritt. Hier ist der *Name* jedoch der Name der Tabelle, die von der Verbraucheranwendung erstellt und verwendet wird. Die vom Konsumenten verwendete KCL nutzt den Anwendungsnamen als Tabellennamen. Verwenden Sie `StockTradesProcessor`, dies ist der Anwendungsname, der später genutzt wird.

1. Wählen Sie in der IAM-Konsole unter **Policies** ([https://console.aws.amazon.com/iam/home \$1policies](https://console.aws.amazon.com/iam/home#policies)) die Option **Create policy** aus. Wenn Sie zum ersten Mal mit IAM-Richtlinien arbeiten, wählen Sie **Erste Schritte**, **Richtlinie erstellen** aus.

1. Wählen Sie **Select (Auswählen)** neben **Policy Generator (Richtliniengenerator)** aus.

1. Wählen Sie **Amazon Kinesis** als AWS Service.

1. Legen Sie `DescribeStream`, `GetShardIterator`, `GetRecords`, `PutRecord` und `PutRecords` als zulässige Aktionen fest.

1. Geben Sie den in Schritt 1 erstellten ARN ein.

1. Verwenden Sie **Add Statement (Statement hinzufügen)** für die folgenden Elemente:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/de_de/streams/latest/dev/tutorial-stock-data-kplkcl-iam.html)

   Das Sternchen (`*`), das bei der Angabe einer ARN verwendet wird, ist nicht erforderlich. In diesem Fall liegt das daran, dass es keine bestimmte Ressource gibt, für die die `PutMetricData` Aktion aufgerufen wird. CloudWatch 

1. Wählen Sie **Next Step (Weiter)** aus.

1. Ändern Sie **Policy Name (Richtlinienname)** in `StockTradeStreamPolicy`, prüfen Sie den Code und wählen sie **Create Policy (Richtlinie erstellen)** aus.

Das erstellte Richtliniendokument sollte etwa wie folgt aussehen:

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "Stmt123",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:PutRecord",
                "kinesis:PutRecords",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:ListShards",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream"
            ]
        },
        {
            "Sid": "Stmt234",
            "Effect": "Allow",
            "Action": [
                "kinesis:SubscribeToShard",
                "kinesis:DescribeStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream/*"
            ]
        },
        {
            "Sid": "Stmt456",
            "Effect": "Allow",
            "Action": [
                "dynamodb:*"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-west-2:111122223333:table/StockTradesProcessor"
            ]
        },
        {
            "Sid": "Stmt789",
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}
```

------

**So erstellen Sie einen IAM-Benutzer**

1. Öffnen Sie unter [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) die IAM-Konsole.

1. Wählen Sie auf der Seite **Users (Benutzer)** die Option **Add user (Benutzer hinzufügen)** aus.

1. Geben Sie `StockTradeStreamUser` als **Benutzername** ein.

1. Wählen Sie für **Access type (Zugriffstyp)** die Option **Programmatic access** (Programmgesteuerter Zugriff) und wählen Sie dann **Next: Permissions (Weiter: Berechtigungen)**.

1. Wählen Sie **Vorhandene Richtlinien direkt zuordnen**.

1. Suche die von Ihnen erstellte Richtlinie dem Namen nach. Markieren Sie das Kontrollkästchen links neben dem Richtliniennamen und wählen Sie dann **Next: Review (Weiter: Prüfen)**.

1. Überprüfen Sie die Details und die Zusammenfassung und wählen Sie dann **Create user (Benutzer erstellen)** aus.

1. Kopieren Sie die **Access key ID (Zugriffsschlüssel-ID)** und speichern Sie sie privat. Wählen Sie unter **Secret access key (Geheimer Zugriffsschlüssel)** die Option **Show (Anzeigen)** und speichern Sie auch diesen Schlüssel privat.

1. Fügen Sie die Zugriffs- und Geheimschlüssel in eine lokalen Datei an einem sichern Ort ein, auf den nur Sie Zugriff haben. Erstellen Sie für diese Anwendung eine Datei namens ` ~/.aws/credentials` (mit strikten Berechtigungen). Die Datei sollte das folgende Format aufweisen:

   ```
   [default]
   aws_access_key_id=access key
   aws_secret_access_key=secret access key
   ```

**So fügen Sie einem Benutzer eine IAM-Richtlinie hinzu**

1. Klicken Sie in der IAM;-Konsole auf [Richtlinien](https://console.aws.amazon.com/iam/home?#policies) und wählen Sie **Richtlinienaktionen** aus. 

1. Klicken Sie auf `StockTradeStreamPolicy` und **Attach (Verknüpfen)**.

1. Wählen Sie `StockTradeStreamUser` und **Attach Policy (Richtlinie anfügen)** aus.

## Nächste Schritte
<a name="tutorial-stock-data-kplkcl-iam-next"></a>

[Laden Sie den Implementierungscode herunter und erstellen Sie ihn](tutorial-stock-data-kplkcl-download.md)

# Laden Sie den Implementierungscode herunter und erstellen Sie ihn
<a name="tutorial-stock-data-kplkcl-download"></a>

Für [Tutorial: Verarbeiten Sie Aktiendaten in Echtzeit mit KPL und KCL 1.x](tutorial-stock-data-kplkcl.md) wird Skeleton-Code bereitgestellt. Er enthält eine Stub-Implementierung für die Übernahme des Wertpapierdaten-Streams (*Produzent*) und die Verarbeitung der Daten (*Verbraucher*). Das folgende Verfahren zeigt, wie die Implementierung abgeschlossen wird. 

**So laden und erstellen Sie den Implementierungscode**

1. Laden Sie den [Quellcode](https://github.com/awslabs/amazon-kinesis-learning/tree/learning-module-1) auf den Computer herunter.

1. Erstellen Sie mit dem Quellcode unter Einhaltung der bereitgestellten Verzeichnisstruktur ein Projekt in der bevorzugten IDE.

1. Fügen Sie dem Projekt die folgenden Bibliotheken hinzu:
   + Amazon Kinesis Client Library (KCL)
   + AWS SDK
   + Apache HttpCore
   + Apache HttpClient
   + Apache Commons Lang
   + Apache Commons Logging
   + Guava (Google-Kernbibliotheken für Java)
   + Jackson Annotations
   + Jackson Core
   + Jackson Databind
   + Jackson Dataformat: CBOR
   + Joda Time

1. Je nach IDE wird das Projekt möglicherweise automatisch erstellt. Wenn nicht, erstellen Sie es mit den für Ihre IDE erforderlichen Schritten.

Wenn Sie diese Schritte erfolgreich abgeschlossen haben, können Sie zum nächsten Abschnitt wechseln, [Implementieren Sie den Produzenten](tutorial-stock-data-kplkcl-producer.md). Wenn der Build Fehler erzeugt, müssen Sie diese untersuchen und beheben, bevor Sie fortfahren.

## Nächste Schritte
<a name="tutorial-stock-data-kplkcl-download-next"></a>

[[Implementieren Sie den Produzenten](tutorial-stock-data-kplkcl-producer.md)Implementieren Sie den Produzenten](tutorial-stock-data-kplkcl-producer.md)

# Implementieren Sie den Produzenten
<a name="tutorial-stock-data-kplkcl-producer"></a>



Die Anwendung im [Tutorial: Verarbeiten Sie Aktiendaten in Echtzeit mit KPL und KCL 1.x[Tutorial: Verarbeiten Sie Aktiendaten in Echtzeit mit KPL und KCL 1.x](tutorial-stock-data-kplkcl.md)](tutorial-stock-data-kplkcl.md) verwendet das reale Szenario einer Überwachung des Wertpapierhandels. Im Folgenden wird kurz erläutert, wie dieses Szenario zum Produzenten und der unterstützenden Codestruktur zugeordnet wird.

Überprüfen Sie die folgenden Informationen in Bezug auf den Quellcode.

**StockTrade Klasse**  
Ein bestimmter Wertpapierhandel wird durch eine Instance der `StockTrade`-Klasse dargestellt. Diese Instance enthält folgende Attribute: Tickersymbol, Preis, Anzahl der Anteile, Art des Handels (Kauf oder Verkauf) und ID zur eindeutigen Identifizierung der Handelsaktion. Dieser Klasse wird für Sie implementiert.

**Stream-Datensatz**  
Ein Stream ist eine Sequenz von Datensätzen. Ein Datensatz ist die Serialisierung einer `StockTrade`-Instance im JSON-Format. Beispiel:   

```
{
  "tickerSymbol": "AMZN", 
  "tradeType": "BUY", 
  "price": 395.87,
  "quantity": 16, 
  "id": 3567129045
}
```

**StockTradeGenerator Klasse**  
`StockTradeGenerator` verfügt über eine Methode namens `getRandomTrade()`, die bei Aufruf Daten eines zufällig generierten Wertpapierhandels zurückgibt. Dieser Klasse wird für Sie implementiert.

**StockTradesWriter Klasse**  
Die `main`-Methode des Produzenten, `StockTradesWriter`, ruft kontinuierlich eine zufällige Handelsaktion ab und sendet die Daten an Kinesis Data Streams, indem sie die folgenden Aufgaben durchführt:  

1. Lesen des Stream- und Regionsnamen als Eingabe.

1. Erstellen eines `AmazonKinesisClientBuilder`.

1. Verwenden des Client Builder, um die Region, die Anmeldeinformationen und die Client-Konfiguration festzulegen.

1. Erstellen eines `AmazonKinesis`-Clients mit dem Client-Builder.

1. Sicherstellen, dass der Stream vorhanden und aktiv ist (wenn nicht, kommt es zu einer Beendigung mit Fehler).

1. Aufrufen der `StockTradeGenerator.getRandomTrade()`-Methode in einer Dauerschleife und anschließend Aufruf der `sendStockTrade`-Methode, um die Handelsdaten alle 100 Millisekunden an den Stream zu senden.
Die `sendStockTrade`-Methode der `StockTradesWriter`-Klasse hat den folgenden Code:  

```
private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient, String streamName) {
    byte[] bytes = trade.toJsonAsBytes();
    // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library.
    if (bytes == null) {
        LOG.warn("Could not get JSON bytes for stock trade");
        return;
    }
    
    LOG.info("Putting trade: " + trade.toString());
    PutRecordRequest putRecord = new PutRecordRequest();
    putRecord.setStreamName(streamName);
    // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
    putRecord.setPartitionKey(trade.getTickerSymbol());
    putRecord.setData(ByteBuffer.wrap(bytes));

    try {
        kinesisClient.putRecord(putRecord);
    } catch (AmazonClientException ex) {
        LOG.warn("Error sending record to Amazon Kinesis.", ex);
    }
}
```

Beachten Sie die folgende Code-Struktur:
+ Die `PutRecord` API erwartet ein Byte-Array, und Sie müssen es in das JSON-Format `trade` konvertieren. Diese einzelne Codezeile führt die Operation aus:

  ```
  byte[] bytes = trade.toJsonAsBytes();
  ```
+ Bevor Sie die Handelsdaten senden können, erstellen Sie eine neue `PutRecordRequest`-Instance (namens `putRecord` in diesem Fall):

  ```
  PutRecordRequest putRecord = new PutRecordRequest();
  ```

  Jeder `PutRecord`-Aufruf erfordert den Namen des Streams, einen Partitionsschlüssel und einen Daten-Blob. Der folgende Code füllt diese Felder im `putRecord`-Objekt mit dessen `setXxxx()`-Methoden:

  ```
  putRecord.setStreamName(streamName);
  putRecord.setPartitionKey(trade.getTickerSymbol());
  putRecord.setData(ByteBuffer.wrap(bytes));
  ```

  Im Beispiel wird ein Stock Ticket als Partitionsschlüssel verwendet, wodurch der Datensatz einem bestimmten Shard zugeordnet wird. In der Praxis sollten Sie Hunderte oder gar Tausende von Partitionsschlüsseln pro Shard haben, sodass die Datensätze in Ihrem Stream gleichmäßig verteilt sind. Weitere Informationen zum Hinzufügen von Daten zu einem Stream finden Sie unter [Daten zu einem Stream hinzufügen](developing-producers-with-sdk.md#kinesis-using-sdk-java-add-data-to-stream).

  `putRecord` ist nun für das Senden an den Client bereit (`put`-Operation):

  ```
  kinesisClient.putRecord(putRecord);
  ```
+ Eine Fehlerüberprüfung und Protokollierung sind immer nützliche Ergänzungen. Dieser Code protokolliert Fehlerbedingungen:

  ```
  if (bytes == null) {
      LOG.warn("Could not get JSON bytes for stock trade");
      return;
  }
  ```

  Fügen Sie den try/catch Block rund um die `put` Operation hinzu:

  ```
  try {
         kinesisClient.putRecord(putRecord);
  } catch (AmazonClientException ex) {
         LOG.warn("Error sending record to Amazon Kinesis.", ex);
  }
  ```

  Der Grund besteht darin, dass eine Kinesis-Data-Streams-`put`-Operation aufgrund eines Netzwerkfehlers fehlschlagen kann oder gedrosselt wird, weil die Durchsatzgrenze des Streams erreicht wird. Wir empfehlen, Ihre Wiederholungsrichtlinie für `put` Operationen sorgfältig zu überdenken, um Datenverlust zu vermeiden, z. B. die Verwendung eines Wiederholungsversuchs. 
+ Eine Statusprotokollierung ist hilfreich, wenn auch optional:

  ```
  LOG.info("Putting trade: " + trade.toString());
  ```
Der hier gezeigte Produzent verwendet die API-Funktionalität von Kinesis Data Streams für einzelne Datensätze `PutRecord`. In der Praxis ist es oft effizienter, die Eignung von `PutRecords` für mehrere Datensätze zu nutzen und mehrere Datensatzstapel gleichzeitig zu senden, wenn ein Produzent viele Datensätze erstellt. Weitere Informationen finden Sie unter [Daten zu einem Stream hinzufügen](developing-producers-with-sdk.md#kinesis-using-sdk-java-add-data-to-stream).

**So führen Sie den Produzenten aus**

1. Stellen Sie sicher, dass der Zugriffsschlüssel und das geheime Schlüsselpaar, die vorher (beim Erstellen des IAM-Benutzers) abgerufen wurden, in der Datei `~/.aws/credentials` gespeichert sind. 

1. Führen Sie die `StockTradeWriter`-Klasse mit den folgenden Argumenten aus:

   ```
   StockTradeStream us-west-2
   ```

   Wenn Sie Ihren Stream in einer anderen Region als `us-west-2` erstellt haben, müssen Sie stattdiesen hier diese Region angeben.

Die Ausgabe sollte folgendermaßen oder ähnlich aussehen:

```
Feb 16, 2015 3:53:00 PM  
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
```

Ihr Stream für die Wertpapierdaten wird nun von Kinesis Data Streams eingespeist.

## Nächste Schritte
<a name="tutorial-stock-data-kplkcl-producer-next"></a>

[Implementieren Sie den Verbraucher](tutorial-stock-data-kplkcl-consumer.md)

# Implementieren Sie den Verbraucher
<a name="tutorial-stock-data-kplkcl-consumer"></a>

Die Konsumentenanwendung in [Tutorial: Verarbeiten Sie Aktiendaten in Echtzeit mit KPL und KCL 1.x[Tutorial: Verarbeiten Sie Aktiendaten in Echtzeit mit KPL und KCL 1.x](tutorial-stock-data-kplkcl.md)](tutorial-stock-data-kplkcl.md) verarbeitet kontinuierlich den in [[Implementieren Sie den Produzenten](tutorial-stock-data-kplkcl-producer.md)Implementieren Sie den Produzenten](tutorial-stock-data-kplkcl-producer.md) erstellten Stream mit Wertpapiertransaktionen. Sie gibt dann für jede Minute die beliebtesten Aktien aus, die gekauft und verkauft wurden. Die Anwendung setzt auf Kinesis Client Library (KCL) auf, die viele der mühsamen Arbeiten einer Verbraucheranwendung übernimmt. Weitere Informationen finden Sie unter [Entwickeln Sie KCL 1.x-Verbraucher](developing-consumers-with-kcl.md). 

Überprüfen Sie die folgenden Informationen in Bezug auf den Quellcode.

**StockTradesProcessor Klasse**  
Hauptklasse des Konsumenten, die für Sie bereitgestellt wird und folgende Aufgaben übernimmt:  
+ Liest die Namen von Anwendung, Stream und Region, die als Argumente übergeben werden
+ Liest Anmeldeinformationen aus `~/.aws/credentials`
+ Erstellt eine `RecordProcessorFactory`-Instance für die Instances von `RecordProcessor`, implementiert von einer `StockTradeRecordProcessor`-Instance.
+ Erstellt einen KCL-Auftragnehmer mit der `RecordProcessorFactory`-Instance und eine Standardkonfiguration samt Stream-Name, Anmeldeinformationen und Anwendungsname. 
+ Die Worker erstellt für jeden Shard (der dieser Konsumenten-Instance zugeordnet ist) einen neuen Thread, der die Datensätze in einer Schleife aus Kinesis Data Streams liest. Anschließend wird die `RecordProcessor`-Instance aufgerufen, um die empfangenen Datensatzstapel zu verarbeiten.

**StockTradeRecordProcessor Klasse**  
Implementierung der `RecordProcessor`-Instance, die wiederum drei erforderliche Methoden implementiert: `initialize`, `processRecords` und `shutdown`.  
Wie an den Namen zu erkennen ist, werden `initialize` und `shutdown` von der Kinesis Client Library verwendet, um dem Datensatzprozessor mitzuteilen, wann er mit dem Empfang von Datensätzen beginnen bzw. wann er diesen stoppen soll, sodass er entsprechende anwendungsspezifische Einrichtungs- und Beendigungsaufgaben ausführen kann. Der Code hierfür wird für Sie bereitgestellt. Die wesentliche Verarbeitung erfolgt mit der `processRecords` Methode, die wiederum `processRecord` für die einzelnen Datensätze nutzt. Die letztgenannte Methode wird als nahezu leeres Code-Skelett bereitgestellt, das im nächsten Schritt näher erläutert und von Ihnen implementiert wird.  
Darüber hinaus hervorzuheben ist die Implementierung von Support-Methoden für `processRecord`: `reportStats` und `resetStats`, die im ursprünglichen Quellcode leer sind.  
Die `processRecords`-Methode wurde für Sie implementiert und führt die folgenden Schritte aus:  
+  Ruft `processRecord` für jeden übergebenen Datensatz auf.
+ Ruft `reportStats()` zum Drucken der neuesten Statistiken auf, wenn seit dem letzten Bericht mindestens 1 Minute vergangen ist, und dann `resetStats()`, um die Statistiken zu löschen, damit das nächste Intervall nur neue Datensätze enthält.
+ Legt den Zeitpunkt für die nächste Berichterstellung fest.
+ Ruft `checkpoint()` auf, wenn seit dem letzten Prüfpunkt mindestens 1 Minute vergangen ist. 
+ Legt den Zeitpunkt für das nächste Checkpointing fest.
Diese Methode verwendet für das Checkpointing und die Berichterstellung ein Intervall von 60 Sekunden. Weitere Informationen zum Checkpointing finden Sie unter [Zusätzliche Informationen über den Verbraucher](#tutorial-stock-data-kplkcl-consumer-supplement).

**StockStats Klasse**  
Diese Klasse stellt eine Datenaufbewahrung und eine Nachverfolgung von Statistiken für die beliebtesten Aktien bereit. Dieser Code wird für Sie bereitgestellt und enthält folgende Methoden:  
+ `addStockTrade(StockTrade)`: Fügt den angegebenen `StockTrade` in die ausgeführten Statistiken ein.
+ `toString()`: Gibt die Statistiken in einer formatierten Zeichenfolge zurück.
Diese Klasse verfolgt die beliebtesten Aktien, indem sie fortlaufend die Gesamtzahl der Trades für jede Aktie und deren maximale Anzahl zählt. Sie aktualisiert diese Werte, sobald eine neue Handelstransaktion empfangen wird.

Fügen Sie Code zu den Methoden der `StockTradeRecordProcessor`-Klasse hinzu, wie in den folgenden Schritten gezeigt.

**So implementieren Sie den Konsumenten**

1. Implementieren Sie die `processRecord`-Methode, indem Sie ein richtig bemessenes `StockTrade`-Objekt instanziieren und die Datensatzdaten zu diesem hinzufügen, sodass im Falle eines Problems eine Warnung protokolliert wird.

   ```
   StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array());
   if (trade == null) {
       LOG.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.getPartitionKey());
       return;
   }
   stockStats.addStockTrade(trade);
   ```

1. Implementieren Sie eine einfache `reportStats`-Methode. Sie können das Ausgabeformat an Ihre Bedürfnisse anpassen.

   ```
   System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" +
                      stockStats + "\n" +
                      "****************************************************************\n");
   ```

1. Implementieren Sie abschließend die `resetStats`-Methode, die eine neue `stockStats`-Instance erstellt.

   ```
   stockStats = new StockStats();
   ```

**So führen Sie den Konsumenten aus**

1. Führen Sie den unter [[Implementieren Sie den Produzenten](tutorial-stock-data-kplkcl-producer.md)Implementieren Sie den Produzenten](tutorial-stock-data-kplkcl-producer.md) erstellten Produzenten aus, um simulierte Wertpapiertransaktionsdatensätze in den Stream zu schreiben.

1. Stellen Sie sicher, dass der Zugriffsschlüssel und das geheime Schlüsselpaar, die vorher (beim Erstellen des IAM-Benutzers) abgerufen wurden, in der Datei `~/.aws/credentials` gespeichert sind. 

1. Führen Sie die `StockTradesProcessor`-Klasse mit den folgenden Argumenten aus:

   ```
   StockTradesProcessor StockTradeStream us-west-2
   ```

   Beachten Sie, dass Sie, wenn Sie Ihren Stream in einer anderen Region als `us-west-2` erstellt haben, stattdiesen diese Region hier angeben müssen.

Nach einer Minute sollen Sie eine Ausgabe ähnlich der folgenden sehen, die anschließend einmal pro Minute aktualisiert wird:

```
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  ****************************************************************
```

## Zusätzliche Informationen über den Verbraucher
<a name="tutorial-stock-data-kplkcl-consumer-supplement"></a>

Wenn Sie mit den Vorteilen der Kinesis Client Library vertraut sind, die unter [Entwickeln Sie KCL 1.x-Verbraucher](developing-consumers-with-kcl.md) und anderswo erörtert werden, fragen Sie sich möglicherweise, warum Sie sie hier nutzen sollten. Obwohl Sie für die Verarbeitung nur einen einzelnen Shard-Stream und eine einzelne Konsumenten-Instance benötigen, ist es immer noch einfacher, den Konsumenten unter Verwendung der KCL zu implementieren. Vergleichen Sie die Implementierungsschritte im Produzentenabschnitt mit denen für den Konsumenten und Sie werden feststellen, dass die Implementierung eines Konsumenten vergleichsweise einfach ist. Dies liegt hauptsächlich an den von der KCL bereitgestellten Services.

In dieser Anwendung konzentrieren Sie sich auf die Implementierung einer Datensatzprozessor-Klasse, die einzelne Datensätze verarbeiten kann. Sie müssen sich keine Gedanken darüber machen, wie die Datensätze aus Kinesis Data Streams abgerufen werden. Die KCL ruft die Datensätze ab und den Datensatzprozessor auf, wenn neue Datensätze verfügbar sind. Sie müssen sich zudem keine Gedanken über die Anzahl der vorhandenen Shards und Konsumenten-Instances machen. Wenn der Stream skaliert wird, müssen Sie Ihre Anwendung nicht neu schreiben, damit mehr als ein Shard oder eine Konsumenten-Instance verwaltet werden können.

Der Begriff *Checkpointing* bedeutet, den Punkt im Stream bis hin zu den bisher verbrauchten und verarbeiteten Datensätzen aufzuzeichnen. Wenn die Anwendung abstürzt, wird der Stream von diesem Punkt aus gelesen und nicht vom Anfang des Streams. Das Checkpointing sowie zugehörige Entwurfsmuster und bewährte Methoden sind nicht Gegenstand dieses Kapitels. Es ist jedoch etwas, das Ihnen in Produktionsumgebungen begegnen könnte.

Wie Sie in [[Implementieren Sie den Produzenten](tutorial-stock-data-kplkcl-producer.md)Implementieren Sie den Produzenten](tutorial-stock-data-kplkcl-producer.md) erfahren haben, verwenden die `put`-Operationen in der API für Kinesis Data Streams einen *Partitionsschlüssel* als Eingabe. Kinesis Data Streams verwendet einen Partitionsschlüssel zum Aufteilen von Datensätzen auf mehrere Shards (wenn mehr als ein Shard im Stream vorhanden ist). Derselbe Partitionsschlüssel leitet immer an denselben Shard weiter. Dadurch kann der Konsument, der einen bestimmten Shard verarbeitet, davon ausgehen, dass Datensätze mit demselben Partitionsschlüssel nur an ihn gesendet werden und Datensätze mit diesem Partitionsschlüssel nicht bei einem anderen Konsumenten landen. Deshalb kann der Worker eines Konsumenten alle Datensätze mit demselben Partitionsschlüssel aggregieren, ohne zu befürchten, dass erforderliche Daten fehlen.

In dieser Anwendung findet keine intensive Verarbeitung der Datensätze durch den Konsumenten statt. Deshalb können Sie einen Shard verwenden und die Verarbeitung im selben Thread wie der KCL-Thread durchführen. In der Praxis sollten Sie allerdings zunächst die Anzahl der Shards skalieren. Gelegentlich kann es vorkommen, dass Sie die Verarbeitung an einen anderen Thread übergeben oder einen Thread-Pool nutzen möchten, wenn eine intensive Datensatzverarbeitung ansteht. Dadurch kann die KCL die neuen Datensätze schneller abrufen, während die anderen Threads parallel dazu die Datensätze verarbeiten. Multithread-Design ist nicht trivial und sollte mit fortgeschrittenen Techniken angegangen werden. Daher ist die Erhöhung der Anzahl Ihrer Shards in der Regel die effektivste Methode zur Skalierung.

## Nächste Schritte
<a name="tutorial-stock-data-kplkcl-consumer-next"></a>

[(Optional) Erweitern Sie die Zahl der Verbraucher](tutorial-stock-data-kplkcl-consumer-extension.md)

# (Optional) Erweitern Sie die Zahl der Verbraucher
<a name="tutorial-stock-data-kplkcl-consumer-extension"></a>

Möglicherweise ist die Anwendung [Tutorial: Verarbeiten Sie Aktiendaten in Echtzeit mit KPL und KCL 1.x[Tutorial: Verarbeiten Sie Aktiendaten in Echtzeit mit KPL und KCL 1.x](tutorial-stock-data-kplkcl.md)](tutorial-stock-data-kplkcl.md) bereits ausreichend für Ihre Zwecke. Dieser optionale Abschnitt zeigt, wie Sie den Konsumentencode erweitern können, um einem komplexeren Szenario gerecht zu werden.

Wenn Sie minütlich über die größten Verkaufsaufträge informiert werden möchten, können Sie die `StockStats`-Klasse an drei Stellen bearbeiten.

**So erweitern Sie den Konsumenten**

1. Fügen Sie neue Instance-Variablen hinzu:

   ```
    // Ticker symbol of the stock that had the largest quantity of shares sold 
    private String largestSellOrderStock;
    // Quantity of shares for the largest sell order trade
    private long largestSellOrderQuantity;
   ```

1. Fügen Sie folgenden Code zu hinz `addStockTrade`:

   ```
    if (type == TradeType.SELL) {
        if (largestSellOrderStock == null || trade.getQuantity() > largestSellOrderQuantity) {
            largestSellOrderStock = trade.getTickerSymbol();
            largestSellOrderQuantity = trade.getQuantity();
        }
    }
   ```

1. Ändern Sie die `toString`-Methode, um die zusätzlichen Informationen zu drucken:

   ```
    public String toString() {
        return String.format(
                "Most popular stock being bought: %s, %d buys.%n" +
                "Most popular stock being sold: %s, %d sells.%n" +
                "Largest sell order: %d shares of %s.",
                getMostPopularStock(TradeType.BUY), getMostPopularStockCount(TradeType.BUY),
                getMostPopularStock(TradeType.SELL), getMostPopularStockCount(TradeType.SELL),
                largestSellOrderQuantity, largestSellOrderStock);
    }
   ```

Wenn Sie den Konsumenten jetzt ausführen (führen Sie auch den Produzenten), sollten Sie eine Ausgabe ähnlich der folgenden sehen:

```
 ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  Largest sell order: 996 shares of BUD.
  ****************************************************************
```

## Nächste Schritte
<a name="tutorial-stock-data-kplkcl-consumer-extension-next"></a>

[Bereinigen von Ressourcen](tutorial-stock-data-kplkcl-finish.md)

# Bereinigen von Ressourcen
<a name="tutorial-stock-data-kplkcl-finish"></a>

Da Sie für die Nutzung des Kinesis Data Streams zahlen, sollten Sie diesen ebenso wie die entsprechende Amazon-DynamoDB-Tabelle löschen, wenn sie nicht mehr benötigt werden. Für einen aktiven Stream fallen auch dann nominale Gebühren an, wenn Sie keine Datensätze senden oder abrufen. Der Grund hierfür ist, dass ein aktiver Stream durch kontinuierlichen „Horchen“ darauf, ob neue Datensätze oder Anforderungen zum Abrufen von Datensätzen eingehen, Ressourcen verbraucht.

**So löschen Sie den Stream und die Tabelle**

1. Fahren Sie alle laufenden Produzenten und Konsumenten herunter.

1. Öffnen Sie die Kinesis-Konsole unter [https://console.aws.amazon.com/kinesis.](https://console.aws.amazon.com/kinesis)

1. Wählen Sie den Stream aus, den Sie für diese Anwendung erstellt haben (`StockTradeStream`).

1. Wählen Sie **Delete Stream (Stream löschen)** aus.

1. Öffnen Sie die DynamoDB-Konsole unter. [https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/)

1. Löschen Sie die `StockTradesProcessor`-Tabelle.

## Zusammenfassung
<a name="tutorial-stock-data-kplkcl-summary"></a>

Die Verarbeitung einer großen Datenmenge nahezu in Echtzeit erfordert weder das Schreiben von kompliziertem Code noch die Entwicklung einer riesigen Infrastruktur. Es ist genauso einfach wie die Schreiblogik, eine kleine Datenmenge zu verarbeiten (wie Schreiben`processRecord(Record)`), aber Kinesis Data Streams zu verwenden, um so zu skalieren, dass es für eine große Menge an Streaming-Daten funktioniert. Sie müssen sich keine Gedanken über die Skalierung der Verarbeitung machen, da Kinesis Data Streams das für Sie übernimmt. Sie müssen lediglich Ihre Streaming-Datensätze an Kinesis Data Streams senden und eine Logik für die Verarbeitung neu empfangener Datensätze schreiben. 

Nachfolgend einige mögliche Erweiterungen für diese Anwendung.

**Shard-übergreifende Aggregation**  
Derzeit erhalten Sie Statistiken, die aus der Aggregation von Datensätzen resultieren, die von einem einzelnen Auftragnehmer eines einzelnen Shards empfangen werden. (Ein Shard kann jeweils nur von einem Auftragnehmer in einer einzelnen Anwendung verarbeitet werden.) Möglicherweise möchten Sie, wenn Sie eine Skalierung durchführen und mehr als einen Shard haben, eine Shard-übergreifende Aggregation vornehmen. Dies kann mit einer Pipeline-Architektur realisiert werden, bei der die Ausgabe eines jeden Auftragnehmers in einen anderen Stream mit einem einzelnen Shard eingespeist wird, der von einem Auftragnehmer verarbeitet wird, der die Ausgaben der ersten Phase aggregiert. Da die Daten aus der ersten Phase begrenzt sind (eine Ausgabe pro Minute pro Shard), können sie problemlos von nur einem Shard verarbeitet werden.

**Skalierung**  
Wenn der Stream skaliert wird, damit mehr Shards zur Verfügung stehen (da viele Produzenten Daten senden), geschieht dies dadurch, dass mehr Worker hinzugefügt werden. Sie können die Auftragnehmer in EC2-Instances ausführen und Auto-Scaling-Gruppen nutzen.

**Verwenden Sie Konnektoren zu Amazon S3 S3/ DynamoDB/Amazon Redshift/Storm**  
Da ein Stream kontinuierlich verarbeitet wird, kann seine Ausgabe an andere Ziele gesendet werden. AWS bietet [Konnektoren](https://github.com/awslabs/amazon-kinesis-connectors) zur Integration von Kinesis Data Streams mit anderen AWS Diensten und Tools von Drittanbietern.

## Nächste Schritte
<a name="tutorial-stock-data-kplkcl-next-steps"></a>
+ Weitere Informationen zur Verwendung der API-Operationen von Kinesis Data Streams finden Sie unter [Entwickeln Sie Produzenten mithilfe der Amazon Kinesis Data Streams Streams-API mit dem AWS SDK für Java](developing-producers-with-sdk.md), [Entwickeln Sie Verbraucher mit gemeinsamem Durchsatz mit dem AWS SDK für Java](developing-consumers-with-sdk.md), und [Kinesis-Datenstreams erstellen und verwalten](working-with-streams.md).
+ Weitere Informationen zur Kinesis Client Library finden Sie unter [Entwickeln Sie KCL 1.x-Verbraucher](developing-consumers-with-kcl.md). 
+ Weitere Informationen zur Optimierung Ihrer Anwendung finden Sie unter [Optimieren Sie die Verbraucher von Amazon Kinesis Data StreamsOptimieren Sie Kinesis Data Streams Streams-Nutzer](advanced-consumers.md). 

# Tutorial: Analysieren Sie Aktiendaten in Echtzeit mit Amazon Managed Service für Apache Flink
<a name="tutorial-stock-data"></a>

Im Szenario dieses Tutorials werden Wertpapierdaten in einen Datenstrom geschrieben. Zudem wird eine einfache [Amazon Managed Service für Apache Flink](https://docs.aws.amazon.com/kinesisanalytics/latest/java/what-is.html)-Anwendung erstellt, die Berechnungen mit dem Stream durchführt. Sie lernen, wie Sie einen Stream von Datensätzen an Kinesis Data Streams senden und eine Anwendung implementieren, die die Datensätze nahezu in Echtzeit verarbeitet und verarbeitet.

Mit Amazon Managed Service für Apache Flink können Sie Java oder Scala verwenden, um Streaming-Daten zu verarbeiten und zu analysieren. Mit diesem Service können Sie Java- oder Scala-Code für Streaming-Quellen erstellen und ausführen, um Zeitreihenanalysen durchzuführen, Echtzeit-Dashboards zu füttern und Echtzeit-Metriken zu erstellen.

Sie können Flink-Anwendungen in Managed Service für Apache Flink mithilfe von Open-Source-Bibliotheken erstellen, die auf [Apache Flink](https://flink.apache.org/) basieren. Apache Flink ist ein beliebtes Framework und eine verteilte Engine zum Verarbeiten von Datenströmen. 

**Wichtig**  
Nachdem Sie zwei Datenstreams und eine Anwendung erstellt haben, fallen für Ihr Konto geringe Gebühren für Kinesis Data Streams und Managed Service für die Nutzung von Apache Flink an, da diese nicht für das AWS kostenlose Kontingent in Frage kommen. Wenn Sie mit dieser Anwendung fertig sind, löschen Sie Ihre AWS Ressourcen, damit keine Gebühren mehr anfallen. 

Der Code greift nicht auf tatsächliche Wertpapierdaten zu, sondern simuliert nur deren Strom. Dazu werden zufällige Wertpapierdaten erzeugt. Wenn Sie Zugriff auf einen Echtzeit-Stream von Wertpapierdaten haben, möchten Sie vermutlich nützliche, zeitnahe Statistiken aus den Stream-Daten erzeugen. Sie können beispielsweise eine Zeitfensteranalyse durchführen, um festzustellen, welche Aktie in den letzten 5 Minuten am häufigsten erworben wurde. Oder Sie möchten im Falle eines zu großen Verkaufsauftrags (d. h. zu viele Anteile) benachrichtigt werden. Der Code in diesem Tutorial kann erweitert werden, um solche Funktionen bereitzustellen.

Bei den gezeigten Beispielen wird die Region USA West (Oregon) verwendet. Sie funktionieren aber auch für alle anderen [AWS -Regionen, die Managed Service für Apache Flink unterstützen](https://docs.aws.amazon.com/general/latest/gr/rande.html#ka_region).

**Topics**
+ [Voraussetzungen für das Abschließen der Übungen](#setting-up-prerequisites)
+ [Richten Sie ein AWS Konto ein und erstellen Sie einen Administratorbenutzer](setting-up.md)
+ [Richten Sie das AWS Command Line Interface ()AWS CLI ein](setup-awscli.md)
+ [Erstellen Sie eine Managed Service for Apache Flink-Anwendung und führen Sie sie aus](get-started-exercise.md)

## Voraussetzungen für das Abschließen der Übungen
<a name="setting-up-prerequisites"></a>

Zur Durchführung der Schritte in dieser Anleitung benötigen Sie Folgendes:
+ [Java Development Kit (JDK), Version 8](http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html). Legen Sie die `JAVA_HOME` Umgebungsvariable so fest, dass sie auf Ihren JDK-Installationsspeicherort weist.
+ Wir empfehlen die Verwendung einer Entwicklungsumgebung (wie [Eclipse Java Neon](http://www.eclipse.org/downloads/packages/release/neon/3) oder [IntelliJ Idea](https://www.jetbrains.com/idea/)), um Ihre Anwendung zu entwickeln und zu kompilieren.
+ [Git-Client.](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) Installieren Sie den Git-Client, wenn Sie dies noch nicht getan haben.
+ [Apache Maven-Compiler-Plugin](https://maven.apache.org/plugins/maven-compiler-plugin/). Maven muss sich in Ihrem Arbeitspfad befinden. Zum Testen Ihrer Apache Maven-Installation geben Sie Folgendes ein:

  ```
  $ mvn -version
  ```

Um zu beginnen, gehen Sie zu [Richten Sie ein AWS Konto ein und erstellen Sie einen Administratorbenutzer](setting-up.md).

# Richten Sie ein AWS Konto ein und erstellen Sie einen Administratorbenutzer
<a name="setting-up"></a>

Bevor Sie Amazon Managed Service für Apache Flink zum ersten Mal verwenden, führen Sie die folgenden Aufgaben aus:

1. [Melden Sie sich an für AWS](#setting-up-signup)

1. [Erstellen eines IAM-Benutzers](#setting-up-iam)

## Melden Sie sich an für AWS
<a name="setting-up-signup"></a>

Wenn Sie sich für Amazon Web Services (AWS) registrieren, wird Ihr AWS Konto automatisch für alle Dienste angemeldet AWS, einschließlich Amazon Managed Service für Apache Flink. Berechnet werden Ihnen aber nur die Services, die Sie nutzen.

Mit Managed Service für Apache Flink zahlen Sie nur für die Ressourcen, die Sie wirklich nutzen. Wenn Sie ein neuer AWS -Kunde sind, können Sie kostenlos mit Managed Service für Apache Flink beginnen. Weitere Informationen finden Sie unter [Kostenloses Kontingent für AWS](https://aws.amazon.com/free/).

Wenn Sie bereits ein AWS Konto haben, fahren Sie mit der nächsten Aufgabe fort. Wenn Sie noch kein AWS -Konto haben, führen Sie die folgenden Schritte aus, um ein Konto zu erstellen.

**Um ein AWS Konto zu erstellen**

1. Öffnen Sie [https://portal.aws.amazon.com/billing/die Anmeldung.](https://portal.aws.amazon.com/billing/signup)

1. Folgen Sie den Online-Anweisungen.

   Während der Anmeldung erhalten Sie einen Telefonanruf oder eine Textnachricht und müssen einen Verifizierungscode über die Telefontasten eingeben.

   Wenn Sie sich für eine anmelden AWS-Konto, *Root-Benutzer des AWS-Kontos*wird eine erstellt. Der Root-Benutzer hat Zugriff auf alle AWS-Services und Ressourcen des Kontos. Als bewährte Sicherheitsmethode weisen Sie einem Benutzer Administratorzugriff zu und verwenden Sie nur den Root-Benutzer, um [Aufgaben auszuführen, die Root-Benutzerzugriff erfordern](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_root-user.html#root-user-tasks).

Notieren Sie sich Ihre AWS Konto-ID, da Sie sie für die nächste Aufgabe benötigen.

## Erstellen eines IAM-Benutzers
<a name="setting-up-iam"></a>

Dienste in AWS, wie Amazon Managed Service für Apache Flink, erfordern, dass Sie beim Zugriff auf sie Anmeldeinformationen angeben. Der Service kann feststellen, ob Sie über die Berechtigung für den Zugriff auf die Ressourcen im Besitz dieses Service verfügen. Das AWS-Managementkonsole erfordert, dass Sie Ihr Passwort eingeben. 

Sie können Zugangsschlüssel für Ihr AWS Konto erstellen, um auf die AWS Command Line Interface (AWS CLI) oder API zuzugreifen. Wir empfehlen jedoch nicht, dass Sie AWS mit den Anmeldeinformationen für Ihr AWS Konto darauf zugreifen. Stattdessen empfehlen wir Ihnen, AWS Identity and Access Management (IAM) zu verwenden. Erstellen Sie einen IAM-Benutzer und fügen Sie den Benutzer zu einer IAM-Gruppe mit Administrator-Berechtigungen hinzu. Anschließend gewähren Sie dem von Ihnen erstellten IAM-Benutzer administrative Berechtigungen. Danach können Sie mithilfe einer speziellen URL und der Anmeldeinformationen des IAM-Benutzers auf AWS zugreifen.

Wenn Sie sich für registriert haben AWS, aber noch keinen IAM-Benutzer für sich selbst erstellt haben, können Sie einen mit der IAM-Konsole erstellen.

Für die Erste-Schritte-Übungen in diesem Handbuch wird davon ausgegangen, dass Sie einen Benutzer (`adminuser`) mit Administratorrechten haben. Befolgen Sie die Schritte zum Einrichten des `adminuser` in Ihrem Konto.

**So erstellen Sie eine Gruppe für Administratoren:**

1. Melden Sie sich bei der an AWS-Managementkonsole und öffnen Sie die IAM-Konsole unter. [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/)

1. Wählen Sie im Navigationsbereich **Groups (Gruppen)** und dann **Create New Group (Neue Gruppe erstellen)** aus.

1. Geben Sie für **Group Name (Gruppenname)** einen Namen für die Gruppe ein, z. B. **Administrators**. Wählen Sie dann **Next Step (Nächster Schritt)** aus.

1. Aktivieren Sie in der Liste der Richtlinien das Kontrollkästchen neben der **AdministratorAccess**Richtlinie. Über das Menü **Filter (Filtern)** und das Feld **Search (Suchen)** können Sie die Liste filtern.

1. Wählen Sie **Next Step (Nächster Schritt)** und anschließend **Create Group (Gruppe erstellen)** aus.

Ihre neue Gruppe wird unter **Group Name** aufgeführt.

**Zum Erstellen eines IAM-Benutzers für sich selbst, fügen Sie ihn der Administratorengruppe hinzu und erstellen Sie ein Passwort**

1. Wählen Sie im Navigationsbereich **Users (Benutzer)** und dann **Add User (Benutzer hinzufügen)** aus.

1. Geben Sie im Feld **Benutzername** einen Benutzernamen ein.

1. Wählen Sie die beiden Optionen **Programmgesteuerter Zugriff** und **Zugriff auf die AWS -Managementkonsole** aus.

1. Wählen Sie **Weiter: Berechtigungen** aus.

1. Aktivieren Sie das Kontrollkästchen neben der **Administratorengruppe**. Wählen Sie dann **Next: Review** aus.

1. Wählen Sie **Create user** (Benutzer erstellen) aus.

**So melden Sie sich als neuer IAM-Benutzer an**

1. Melden Sie sich von der ab AWS-Managementkonsole.

1. Verwenden Sie das folgende URL-Format zum Anmelden bei der Konsole:

   `https://aws_account_number.signin.aws.amazon.com/console/`

   Das *aws\$1account\$1number* ist Ihre AWS Konto-ID ohne Bindestriche. Wenn Ihre AWS Konto-ID beispielsweise 1234-5678-9012 lautet, ersetzen Sie sie durch. *aws\$1account\$1number* **123456789012** *Informationen dazu, wie Sie Ihre Kontonummer finden, finden Sie unter [Ihre AWS Konto-ID und ihr Alias](https://docs.aws.amazon.com/IAM/latest/UserGuide/console_account-alias.html) im IAM-Benutzerhandbuch.*

1. Geben Sie den IAM-Benutzernamen und das von Ihnen soeben erstellte Passwort ein. Wenn Sie angemeldet sind, wird in der Navigationsleiste *your\$1user\$1name* @ *your\$1aws\$1account\$1id* angezeigt.

**Anmerkung**  
Wenn Sie nicht möchten, dass die URL für Ihre Anmeldeseite Ihre AWS Konto-ID enthält, können Sie einen Kontoalias erstellen.

**So erstellen oder entfernen Sie einen Konto-Alias**

1. Öffnen Sie unter [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) die IAM-Konsole.

1. Wählen Sie im Navigationsbereich **Dashboard** aus.

1. Suchen Sie den Anmeldelink für IAM-Benutzer.

1. Um einen Alias zu erstellen, klicken Sie auf **Anpassen**. Geben Sie den gewünschten Namen für den Alias ein und wählen Sie **Yes, Create (Ja, erstellen)** aus.

1. Um den Alias zu löschen, wählen Sie **Customize (Anpassen)** und dann **Yes, Delete (Ja, löschen)**. Die Anmelde-URL verwendet wieder Ihre AWS Konto-ID.

Nach dem Erstellen eines Konto-Alias verwenden Sie die folgende URL, um sich anzumelden:

`https://your_account_alias.signin.aws.amazon.com/console/`

Um den Anmeldelink der IAM-Benutzer Ihres Kontos zu verifizieren, öffnen Sie die IAM-Konsole und prüfen dies im Dashboard unter **IAM users sign-in link**.

Weitere Informationen zu IAM finden Sie unter:
+ [AWS Identity and Access Management (ICH BIN)](https://aws.amazon.com/iam/)
+ [Erste Schritte mit IAM](https://docs.aws.amazon.com/IAM/latest/UserGuide/getting-started.html)
+ [IAM Benutzerhandbuch](https://docs.aws.amazon.com/IAM/latest/UserGuide/)

## Nächster Schritt
<a name="setting-up-next-step-2"></a>

[Richten Sie das AWS Command Line Interface ()AWS CLI ein](setup-awscli.md)

# Richten Sie das AWS Command Line Interface ()AWS CLI ein
<a name="setup-awscli"></a>

In diesem Schritt laden Sie den herunter und konfigurieren ihn für AWS CLI die Verwendung mit Amazon Managed Service für Apache Flink.

**Anmerkung**  
Bei allen Erste-Schritte-Übungen in diesem Handbuch wird davon ausgegangen, dass Sie in Ihrem Konto Administrator-Anmeldeinformationen (`adminuser`) verwenden, um die Operationen auszuführen.

**Anmerkung**  
Wenn Sie den bereits AWS CLI installiert haben, müssen Sie möglicherweise ein Upgrade durchführen, um die neuesten Funktionen zu erhalten. Weitere Informationen finden Sie im *AWS Command Line Interface Benutzerhandbuch* unter [Installation der AWS Befehlszeilenschnittstelle](https://docs.aws.amazon.com/cli/latest/userguide/installing.html). Führen Sie den folgenden Befehl aus AWS CLI, um die Version von zu überprüfen:  

```
aws --version
```
Für die Übungen in diesem Tutorial ist die folgende AWS CLI Version oder höher erforderlich:  

```
aws-cli/1.16.63
```

**Um das einzurichten AWS CLI**

1. Herunterladen und Konfigurieren von AWS CLI. Eine Anleitung finden Sie unter den folgenden Themen im *AWS Command Line Interface -Benutzerhandbuch*: 
   + [Installieren des AWS Command Line Interface](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-set-up.html)
   + [Konfigurieren von AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html)

1. Fügen Sie der AWS CLI Konfigurationsdatei ein benanntes Profil für den Administratorbenutzer hinzu. Sie verwenden dieses Profil, wenn Sie die AWS CLI Befehle ausführen. Weitere Informationen zu benannten Profilen finden Sie unter [Benannte Profile](https://docs.aws.amazon.com/cli/latest/userguide/cli-multiple-profiles.html) im *AWS Command Line Interface Benutzerhandbuch*.

   ```
   [profile adminuser]
   aws_access_key_id = adminuser access key ID
   aws_secret_access_key = adminuser secret access key
   region = aws-region
   ```

   Eine Liste der verfügbaren AWS Regionen finden Sie unter [AWS Regionen und Endpunkte](https://docs.aws.amazon.com/general/latest/gr/rande.html) in der *Allgemeine Amazon Web Services-Referenz*.

1. Überprüfen Sie die Einrichtung, indem Sie die folgenden Hilfebefehle in die Befehlszeile eingeben: 

   ```
   aws help
   ```

Nachdem Sie ein AWS Konto eingerichtet haben AWS CLI, können Sie die nächste Übung ausprobieren, in der Sie eine Beispielanwendung konfigurieren und das end-to-end Setup testen.

## Nächster Schritt
<a name="setting-up-next-step-3"></a>

[Erstellen Sie eine Managed Service for Apache Flink-Anwendung und führen Sie sie aus](get-started-exercise.md)

# Erstellen Sie eine Managed Service for Apache Flink-Anwendung und führen Sie sie aus
<a name="get-started-exercise"></a>

In dieser Übung erstellen Sie eine Anwendung von Managed Service für Apache Flink mit Datenströmen als Quelle und Senke.

**Topics**
+ [Erstellen Sie zwei Amazon Kinesis Kinesis-Datenstreams](#get-started-exercise-1)
+ [Schreiben Sie Beispieldatensätze in den Eingabe-Stream](#get-started-exercise-2)
+ [Laden Sie den Apache Flink-Streaming-Java-Code herunter und untersuchen Sie ihn](#get-started-exercise-5)
+ [Kompilieren Sie den Anwendungscode](#get-started-exercise-5.5)
+ [Laden Sie den Apache Flink-Streaming-Java-Code hoch](#get-started-exercise-6)
+ [Erstellen Sie die Anwendung Managed Service for Apache Flink und führen Sie sie aus](#get-started-exercise-7)

## Erstellen Sie zwei Amazon Kinesis Kinesis-Datenstreams
<a name="get-started-exercise-1"></a>

Bevor Sie für diese Übung einen Amazon Managed Service für Apache Flink erstellen, erstellen Sie zwei Kinesis-Datenstreams (`ExampleInputStream`und`ExampleOutputStream`). Ihre Anwendung verwendet diese Streams für die Quell- und Ziel-Streams der Anwendung.

Sie können diese Streams mithilfe der Amazon-Kinesis-Konsole oder des folgenden AWS CLI -Befehls erstellen. Detaillierte Konsolenanweisungen finden Sie unter [Erstellen und Aktualisieren von Daten-Streams](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html). 

**So erstellen Sie die Daten-Streams (AWS CLI)**

1. Verwenden Sie den folgenden Amazon Kinesis `create-stream` AWS CLI Kinesis-Befehl, um den ersten Stream (`ExampleInputStream`) zu erstellen.

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleInputStream \
   --shard-count 1 \
   --region us-west-2 \
   --profile adminuser
   ```

1. Um den zweiten Stream zu erstellen, den die Anwendung zum Schreiben der Ausgabe verwendet, führen Sie denselben Befehl aus und ändern den Stream-Namen in `ExampleOutputStream`.

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleOutputStream \
   --shard-count 1 \
   --region us-west-2 \
   --profile adminuser
   ```

## Schreiben Sie Beispieldatensätze in den Eingabe-Stream
<a name="get-started-exercise-2"></a>

In diesem Abschnitt verwenden Sie ein Python-Skript zum Schreiben von Datensätzen in den Stream für die zu verarbeitende Anwendung.

**Anmerkung**  
Dieser Abschnitt erfordert [AWS SDK für Python (Boto)](https://aws.amazon.com/developers/getting-started/python/).

1. Erstellen Sie eine Datei `stock.py` mit dem folgenden Inhalt:

   ```
    
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "EVENT_TIME": datetime.datetime.now().isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
           "PRICE": round(random.random() * 100, 2),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

1. Im weiteren Verlauf des Tutorials führen Sie das `stock.py`-Skript zum Senden von Daten an die Anwendung aus. 

   ```
   $ python stock.py
   ```

## Laden Sie den Apache Flink-Streaming-Java-Code herunter und untersuchen Sie ihn
<a name="get-started-exercise-5"></a>

Der Java-Anwendungscode für diese Beispiele ist verfügbar unter GitHub. Zum Herunterladen des Anwendungscodes gehen Sie wie folgt vor:

1. Klonen Sie das Remote-Repository mit dem folgenden Befehl:

   ```
   git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples.git
   ```

1. Navigieren Sie zum `GettingStarted` Verzeichnis .

Der Anwendungscode befindet sich in den Dateien `CloudWatchLogSink.java` und `CustomSinkStreamingJob.java`. Beachten Sie Folgendes zum Anwendungscode:
+ Die Anwendung verwendet eine Kinesis-Quelle zum Lesen aus dem Quell-Stream. Der folgende Codeausschnitt erstellt die Kinesis-Senke:

  ```
  return env.addSource(new FlinkKinesisConsumer<>(inputStreamName,
                  new SimpleStringSchema(), inputProperties));
  ```

## Kompilieren Sie den Anwendungscode
<a name="get-started-exercise-5.5"></a>

In diesem Abschnitt verwenden Sie den Apache Maven-Compiler zum Erstellen des Java-Codes für die Anwendung. Weitere Informationen zum Installieren von Apache Maven und des Java Development Kit (JDK) finden Sie unter [Voraussetzungen für das Abschließen der Übungen](tutorial-stock-data.md#setting-up-prerequisites).

Ihre Java-Anwendung erfordert die folgenden Komponenten:
+ Eine [Projektobjektmodell (pom.xml)](https://maven.apache.org/guides/introduction/introduction-to-the-pom.html)-Datei. Diese Datei enthält Informationen zur Konfiguration und zu den Abhängigkeiten der Anwendung, einschließlich der Amazon Managed Service for Apache Flink-Bibliotheken.
+ Eine `main`-Methode, die die Logik der Anwendung enthält.

**Anmerkung**  
**Um den Kinesis-Konnektor für die folgende Anwendung zu verwenden, müssen Sie den Quellcode für den Konnektor herunterladen und ihn wie in der [Apache Flink-Dokumentation](https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/connectors/kinesis.html) beschrieben erstellen.**

**So erstellen und kompilieren Sie den Anwendungscode**

1. Erstellen Sie eine Java/Maven Anwendung in Ihrer Entwicklungsumgebung. Weitere Informationen zum Erstellen einer Anwendung finden Sie in der Dokumentation für Ihre Entwicklungsumgebung:
   + [ Erstellen Sie Ihr erstes Java-Projekt (Eclipse Java Neon)](https://help.eclipse.org/neon/index.jsp?topic=%2Forg.eclipse.jdt.doc.user%2FgettingStarted%2Fqs-3.htm)
   + [ Erstellen, Ausführen und Packen Ihrer ersten Java-Anwendung (IntelliJ Idea)](https://www.jetbrains.com/help/idea/creating-and-running-your-first-java-application.html)

1. Verwenden Sie den folgenden Code für eine Datei mit dem Namen `StreamingJob.java`. 

   ```
    
   package com.amazonaws.services.kinesisanalytics;
   
   import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
   import org.apache.flink.api.common.serialization.SimpleStringSchema;
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
   import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
   import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
   
   import java.io.IOException;
   import java.util.Map;
   import java.util.Properties;
   
   public class StreamingJob {
   
       private static final String region = "us-east-1";
       private static final String inputStreamName = "ExampleInputStream";
       private static final String outputStreamName = "ExampleOutputStream";
   
       private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
           Properties inputProperties = new Properties();
           inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
           inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
   
           return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
       }
   
       private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
               throws IOException {
           Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
           return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
                   applicationProperties.get("ConsumerConfigProperties")));
       }
   
       private static FlinkKinesisProducer<String> createSinkFromStaticConfig() {
           Properties outputProperties = new Properties();
           outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
           outputProperties.setProperty("AggregationEnabled", "false");
   
           FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties);
           sink.setDefaultStream(outputStreamName);
           sink.setDefaultPartition("0");
           return sink;
       }
   
       private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException {
           Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
           FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(),
                   applicationProperties.get("ProducerConfigProperties"));
   
           sink.setDefaultStream(outputStreamName);
           sink.setDefaultPartition("0");
           return sink;
       }
   
       public static void main(String[] args) throws Exception {
           // set up the streaming execution environment
           final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   
           /*
            * if you would like to use runtime configuration properties, uncomment the
            * lines below
            * DataStream<String> input = createSourceFromApplicationProperties(env);
            */
   
           DataStream<String> input = createSourceFromStaticConfig(env);
   
           /*
            * if you would like to use runtime configuration properties, uncomment the
            * lines below
            * input.addSink(createSinkFromApplicationProperties())
            */
   
           input.addSink(createSinkFromStaticConfig());
   
           env.execute("Flink Streaming Java API Skeleton");
       }
   }
   ```

   Beachten Sie die folgenden Informationen zum vorherigen Codebeispiel:
   + Diese Datei enthält die `main`-Methode, die die Funktionalität der Anwendung definiert.
   + Ihre Anwendung erstellt Quell- und Senkenkonnektoren für den Zugriff auf externe Ressourcen, indem ein `StreamExecutionEnvironment`-Objekt verwendet wird. 
   + Die Anwendung erstellt Quell- und Senkenkonnektoren mit statischen Eigenschaften. Zum Verwenden dynamischer Anwendungseigenschaften verwenden Sie die Methoden `createSourceFromApplicationProperties` und `createSinkFromApplicationProperties`, um die Konnektoren zu erstellen. Diese Methoden lesen die Eigenschaften der Anwendung zum Konfigurieren der Konnektoren.

1. Zum Verwenden Ihres Anwendungscodes kompilieren und packen Sie ihn in eine JAR-Datei. Sie können Ihren Code auf zwei Arten kompilieren und packen:
   + Verwenden Sie das Befehlszeilen-Maven-Tool. Erstellen Sie Ihre JAR-Datei, indem Sie den folgenden Befehl in dem Verzeichnis ausführen, das die `pom.xml`-Datei enthält:

     ```
     mvn package
     ```
   + Verwenden Sie Ihre Entwicklungsumgebung. Weitere Informationen finden Sie in der Dokumentation Ihrer Entwicklungsumgebung.

   Sie können Ihr Paket als JAR-Datei hochladen oder komprimieren und als ZIP-Datei hochladen. Wenn Sie Ihre Anwendung mit dem erstellen AWS CLI, geben Sie Ihren Codeinhaltstyp (JAR oder ZIP) an.

1. Wenn während der Erstellung Fehler aufgetreten sind, überprüfen Sie, ob Ihre `JAVA_HOME`-Umgebungsvariable richtig eingestellt ist.

Wenn die Anwendung erfolgreich kompiliert wurde, wird die folgende Datei erstellt:

`target/java-getting-started-1.0.jar`

## Laden Sie den Apache Flink-Streaming-Java-Code hoch
<a name="get-started-exercise-6"></a>

In diesem Abschnitt erstellen Sie einen Amazon Simple Storage Service (Amazon S3)-Bucket und laden Ihren Anwendungscode hoch.

**So laden Sie den Anwendungscode hoch**

1. Öffnen Sie die Amazon S3 S3-Konsole unter [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/).

1. Wählen Sie **Create Bucket** (Bucket erstellen) aus.

1. Geben Sie **ka-app-code-*<username>*** im Feld **Bucket-Name** ein. Fügen Sie dem Bucket-Namen ein Suffix hinzu, wie z. B. Ihren Benutzernamen, damit er global eindeutig ist. Wählen Sie **Weiter** aus.

1. Lassen Sie im Schritt **Optionen konfigurieren** die Einstellungen unverändert und klicken Sie auf **Weiter**.

1. Lassen Sie im Schritt **Berechtigungen festlegen** die Einstellungen unverändert und klicken Sie auf **Weiter**.

1. Wählen Sie **Create Bucket** (Bucket erstellen) aus.

1. Wählen Sie in der Amazon S3 S3-Konsole den *<username>* Bucket **ka-app-code-** und wählen Sie **Upload** aus.

1. Klicken Sie im Schritt **Auswählen von Dateien** auf **Hinzufügen von Dateien**. Navigieren Sie zu der `java-getting-started-1.0.jar`Datei, die Sie im vorherigen Schritt erstellt haben. Wählen Sie **Weiter** aus.

1. Lassen Sie im Schritt **Berechtigungen festlegen** die Einstellungen unverändert. Wählen Sie **Weiter** aus.

1. Lassen Sie im Schritt **Eigenschaften festlegen** die Einstellungen unverändert. Klicken Sie auf **Upload**.

Ihr Anwendungscode ist jetzt in einem Amazon-S3-Bucket gespeichert, in dem Ihre Anwendung darauf zugreifen kann.

## Erstellen Sie die Anwendung Managed Service for Apache Flink und führen Sie sie aus
<a name="get-started-exercise-7"></a>

Sie können eine Anwendung von Managed Service für Apache Flink entweder über die Konsole oder AWS CLI erstellen und ausführen.

**Anmerkung**  
Wenn Sie die Anwendung mithilfe der Konsole erstellen, werden Ihre AWS Identity and Access Management (IAM) und Amazon CloudWatch Logs-Ressourcen für Sie erstellt. Wenn Sie die Anwendung mithilfe von erstellen AWS CLI, erstellen Sie diese Ressourcen separat.

**Topics**
+ [Erstellen Sie die Anwendung und führen Sie sie aus (Konsole)](#get-started-exercise-7-console)
+ [Erstellen Sie die Anwendung und führen Sie sie aus (AWS CLI)](#get-started-exercise-7-cli)

### Erstellen Sie die Anwendung und führen Sie sie aus (Konsole)
<a name="get-started-exercise-7-console"></a>

Befolgen Sie diese Schritte, um die Anwendung über die Konsole zu erstellen, zu konfigurieren, zu aktualisieren und auszuführen.

#### Erstellen der Anwendung
<a name="get-started-exercise-7-console-create"></a>

1. Öffnen Sie die Kinesis-Konsole unter [https://console.aws.amazon.com/kinesis.](https://console.aws.amazon.com/kinesis)

1. Wählen Sie auf dem Amazon-Kinesis-Dashboard die Option **Create analytics application** (Analyseanwendung erstellen) aus.

1. Geben Sie auf der Seite **Kinesis Analytics – Anwendung erstellen** die Anwendungsdetails wie folgt an:
   + Geben Sie als **Anwendungsname** ein **MyApplication**.
   + Geben Sie für **Beschreibung** den Text **My java test app** ein.
   + Wählen Sie für **Runtime (Laufzeit)** die Option **Apache Flink 1.6** aus.

1. Wählen Sie für **Zugriffsberechtigungen** die Option **Erstellen / Aktualisieren Sie IAM-Rolle `kinesis-analytics-MyApplication-us-west-2`** aus.

1. Wählen Sie **Create application** aus.

**Anmerkung**  
Wenn Sie mithilfe der Konsole eine Amazon Managed Service for Apache Flink-Anwendung erstellen, haben Sie die Möglichkeit, eine IAM-Rolle und -Richtlinie für Ihre Anwendung erstellen zu lassen. Ihre Anwendung verwendet diese Rolle und Richtlinie für den Zugriff auf ihre abhängigen Ressourcen. Diese IAM-Ressourcen werden unter Verwendung Ihres Anwendungsnamens und der Region wie folgt benannt:  
Richtlinie: `kinesis-analytics-service-MyApplication-us-west-2`
Rolle: `kinesis-analytics-MyApplication-us-west-2`

#### Bearbeiten Sie die IAM-Richtlinie
<a name="get-started-exercise-7-console-iam"></a>

Bearbeiten Sie die IAM-Richtlinie zum Hinzufügen von Berechtigungen für den Zugriff auf die Kinesis-Datenströme.

1. Öffnen Sie unter [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) die IAM-Konsole.

1. Wählen Sie **Policies (Richtlinien)**. Wählen Sie die **`kinesis-analytics-service-MyApplication-us-west-2`**-Richtlinie aus, die die Konsole im vorherigen Abschnitt für Sie erstellt hat. 

1. Wählen Sie auf der Seite **Summary (Übersicht)** die Option **Edit policy (Richtlinie bearbeiten)** aus. Wählen Sie den Tab **JSON**.

1. Fügen Sie den markierten Abschnitt der folgenden Beispielrichtlinie der Richtlinie hinzu. Ersetzen Sie das Beispielkonto IDs (*012345678901*) durch Ihre Konto-ID.

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:s3:::ka-app-code-username/java-getting-started-1.0.jar"
               ]
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:*"
               ]
           },
           {
               "Sid": "ListCloudwatchLogStreams",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogStreams"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
               ]
           },
           {
               "Sid": "PutCloudwatchLogs",
               "Effect": "Allow",
               "Action": [
                   "logs:PutLogEvents"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
               ]
           },
           {
               "Sid": "ReadInputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream"
           },
           {
               "Sid": "WriteOutputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream"
           }
       ]
   }
   ```

------

#### Konfigurieren Sie die Anwendung
<a name="get-started-exercise-7-console-configure"></a>

1. Wählen Sie auf der **MyApplication**Seite **Configure** aus.

1. Klicken Sie auf der Seite **Configure application (Anwendung konfigurieren)** auf die Option **Code location (Codespeicherort)**:
   + Geben Sie für **Amazon-S3-Bucket** **ka-app-code-*<username>*** ein.
   + Geben Sie als **Pfad zum Amazon-S3-Objekt** den Wert **java-getting-started-1.0.jar** ein.

1. Wählen Sie unter **Zugriff auf Anwendungsressourcen** für **Zugriffsberechtigungen** die Option **IAM-Rolle `kinesis-analytics-MyApplication-us-west-2` erstellen/aktualisieren** aus.

1. Geben Sie unter **Eigenschaften** für **Gruppen-ID** den Text **ProducerConfigProperties** ein.

1. Geben Sie die folgenden Eigenschaften und Werte der Anwendung ein:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/de_de/streams/latest/dev/get-started-exercise.html)

1. Stellen Sie unter **Überwachung** sicher, dass die **Ebene der Überwachungsmetriken** auf **Anwendung** eingestellt ist.

1. Wählen Sie für die **CloudWatch Protokollierung** das Kontrollkästchen **Aktivieren** aus.

1. Wählen Sie **Aktualisieren** aus.

**Anmerkung**  
Wenn Sie die CloudWatch Protokollierung aktivieren möchten, erstellt Managed Service for Apache Flink eine Protokollgruppe und einen Protokollstream für Sie. Die Namen dieser Ressourcen lauten wie folgt:   
Protokollgruppe: `/aws/kinesis-analytics/MyApplication`
Protokollstream: `kinesis-analytics-log-stream`

#### Führen Sie die Anwendung aus.
<a name="get-started-exercise-7-console-run"></a>

1. Wählen Sie auf der **MyApplication**Seite die Option **Ausführen** aus. Bestätigen Sie die Aktion.

1. Wenn die Anwendung ausgeführt wird, aktualisieren Sie die Seite. Die Konsole zeigt den **Application graph (Anwendungs-Graph)** an.

#### Beenden Sie die Anwendung
<a name="get-started-exercise-7-console-stop"></a>

Wählen Sie auf der **MyApplication**Seite **Stopp** aus. Bestätigen Sie die Aktion.

#### Aktualisieren der Anwendung
<a name="get-started-exercise-7-console-update"></a>

Mithilfe der Konsole können Sie Anwendungseinstellungen wie beispielsweise Anwendungseigenschaften, Überwachungseinstellungen und den Speicherort oder den Dateinamen der JAR-Anwendungsdatei aktualisieren. Außerdem können Sie die JAR-Anwendungsdatei erneut aus dem Amazon-S3-Bucket laden, wenn Sie den Anwendungscode aktualisieren müssen.

Wählen Sie auf der **MyApplication**Seite **Configure** aus. Aktualisieren Sie die Anwendungseinstellungen und klicken Sie auf **Aktualisieren**.

### Erstellen Sie die Anwendung und führen Sie sie aus (AWS CLI)
<a name="get-started-exercise-7-cli"></a>

In diesem Abschnitt verwenden Sie die, AWS CLI um die Anwendung Managed Service for Apache Flink zu erstellen und auszuführen. Managed Service for Apache Flink verwendet den `kinesisanalyticsv2` AWS CLI Befehl, um Managed Service for Apache Flink-Anwendungen zu erstellen und mit ihnen zu interagieren.

#### Erstellen einer Berechtigungsrichtlinie
<a name="get-started-exercise-7-cli-policy"></a>

Zuerst erstellen Sie eine Berechtigungsrichtlinie mit zwei Anweisungen: eine, die Berechtigungen für die `read`-Aktion auf den Quell-Stream zulässt, und eine andere, die Berechtigungen für die `write`-Aktionen auf den Senken-Stream zulässt. Anschließend fügen Sie die Richtlinie an eine IAM-Rolle (die Sie im nächsten Abschnitt erstellen) an. Wenn Managed Service für Apache Flink also die Rolle übernimmt, verfügt der Service über die erforderlichen Berechtigungen zum Lesen aus dem Quell-Stream und zum Schreiben in den Senken-Stream.

Verwenden Sie den folgenden Code zum Erstellen der `KAReadSourceStreamWriteSinkStream`-Berechtigungsrichtlinie. Ersetzen Sie `username` durch den Benutzernamen, den Sie verwendet haben, um den Amazon-S3-Bucket zum Speichern des Anwendungscodes zu erstellen. Ersetzen Sie die Konto-ID in den Amazon-Ressourcennamen (ARNs) (`012345678901`) durch Ihre Konto-ID.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "S3",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:GetObjectVersion"
            ],
            "Resource": ["arn:aws:s3:::ka-app-code-username",
                "arn:aws:s3:::ka-app-code-username/*"
            ]
        },
        {
            "Sid": "ReadInputStream",
            "Effect": "Allow",
            "Action": "kinesis:*",
            "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream"
        },
        {
            "Sid": "WriteOutputStream",
            "Effect": "Allow",
            "Action": "kinesis:*",
            "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream"
        }
    ]
}
```

------

 step-by-stepAnweisungen zum Erstellen einer Berechtigungsrichtlinie finden Sie unter [Tutorial: Erstellen und Anhängen Ihrer ersten vom Kunden verwalteten Richtlinie](https://docs.aws.amazon.com/IAM/latest/UserGuide/tutorial_managed-policies.html#part-two-create-policy) im *IAM-Benutzerhandbuch*.

**Anmerkung**  
Um auf andere AWS Dienste zuzugreifen, können Sie den AWS SDK für Java verwenden. Managed Service für Apache Flink setzt die vom SDK benötigten Anmeldeinformationen automatisch auf die der IAM-Rolle für die Dienstausführung, die mit Ihrer Anwendung verknüpft ist. Es sind keine weiteren Schritte erforderlich.

#### Erstellen einer IAM-Rolle
<a name="get-started-exercise-7-cli-role"></a>

In diesem Abschnitt erstellen Sie eine IAM-Rolle, von der Managed Service for Apache Flink annehmen kann, dass sie einen Quellstream liest und in den Sink-Stream schreibt.

Managed Service für Apache Flink kann ohne Berechtigungen nicht auf Ihren Stream zugreifen. Sie erteilen diese Berechtigungen über eine IAM-Rolle. Jeder IAM-Rolle sind zwei Richtlinien angefügt. Die Vertrauensrichtlinie erteilt Managed Service für Apache Flink die Berechtigung zum Übernehmen der Rolle und die Berechtigungsrichtlinie bestimmt, was Managed Service für Apache Flink nach Annahme der Rolle tun kann.

Sie können die Berechtigungsrichtlinie, die Sie im vorherigen Abschnitt erstellt haben, dieser Rolle anfügen.

**So erstellen Sie eine IAM-Rolle**

1. Öffnen Sie unter [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) die IAM-Konsole.

1. Wählen Sie im Navigationsbereich **Roles (Rollen)** und **Create Role (Rolle erstellen)** aus.

1. Wählen Sie unter **Typ der vertrauenswürdigen Entität auswählen** die Option **AWS -Service** aus. Wählen Sie unter **Choose the service that will use this role (Wählen Sie den Service aus, der diese Rolle verwendet)** die Option **Kinesis** aus. Wählen Sie unter **Select your use case (Wählen Sie Ihren Anwendungsfall aus)** die Option **Kinesis Analytics** aus.

   Wählen Sie **Weiter: Berechtigungen** aus.

1. Wählen Sie auf der Seite **Attach permissions policies** (Berechtigungsrichtlinien hinzufügen) **Next: Review** (Weiter: Überprüfen) aus. Sie fügen Berechtigungsrichtlinien an, nachdem Sie die Rolle erstellt haben.

1. Geben Sie auf der Seite **Create role (Rolle erstellen)** den Text **KA-stream-rw-role** für **Role name (Rollenname)** ein. Wählen Sie **Rolle erstellen** aus.

   Jetzt haben Sie eine neue IAM-Rolle mit dem Namen `KA-stream-rw-role` erstellt. Im nächsten Schritt aktualisieren Sie die Vertrauens- und Berechtigungsrichtlinien für die Rolle.

1. Fügen Sie die Berechtigungsrichtlinie der Rolle an.
**Anmerkung**  
Für diese Übung übernimmt Managed Service für Apache Flink diese Rolle sowohl für das Lesen von Daten aus einem Kinesis-Datenstrom (Quelle) als auch zum Schreiben der Ausgabedaten in einen anderen Kinesis-Datenstrom. Daher fügen Sie die Richtlinie an, die Sie im vorherigen Schritt erstellt haben, [Erstellen einer Berechtigungsrichtlinie](#get-started-exercise-7-cli-policy).

   1. Wählen Sie auf der Seite **Summary (Übersicht)** die Registerkarte **Permissions (Berechtigungen)** aus.

   1. Wählen Sie **Attach Policies (Richtlinien anfügen)** aus.

   1. Geben Sie im Suchfeld **KAReadSourceStreamWriteSinkStream** (die Richtlinie, die Sie im vorhergehenden Abschnitt erstellt haben) ein.

   1. Wählen Sie die **KAReadInputStreamWriteOutputStream**-Richtlinie und wählen Sie **Richtlinie anhängen** aus.

Sie haben nun die Service-Ausführungsrolle erstellt, die Ihre Anwendung für den Zugriff auf Ressourcen verwendet. Notieren Sie sich den ARN der neuen Rolle.

 step-by-stepAnweisungen zum Erstellen einer Rolle finden Sie unter [Erstellen einer IAM-Rolle (Konsole)](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user.html#roles-creatingrole-user-console) im *IAM-Benutzerhandbuch*.

#### Erstellen Sie die Anwendung Managed Service für Apache Flink
<a name="get-started-exercise-7-cli-create"></a>

1. Speichern Sie den folgenden JSON-Code in eine Datei mit dem Namen `create_request.json`. Ersetzen Sie den Beispiel-Rollen-ARN durch den ARN für die Rolle, die Sie zuvor erstellt haben. Ersetzen Sie das Bucket-ARN-Suffix (`username`) mit dem Suffix, das Sie im vorherigen Abschnitt gewählt haben. Ersetzen Sie die beispielhafte Konto-ID (`012345678901`) in der Service-Ausführungsrolle mit Ihrer Konto-ID.

   ```
   {
       "ApplicationName": "test",
       "ApplicationDescription": "my java test app",
       "RuntimeEnvironment": "FLINK-1_6",
       "ServiceExecutionRole": "arn:aws:iam::012345678901:role/KA-stream-rw-role",
       "ApplicationConfiguration": {
           "ApplicationCodeConfiguration": {
               "CodeContent": {
                   "S3ContentLocation": {
                       "BucketARN": "arn:aws:s3:::ka-app-code-username",
                       "FileKey": "java-getting-started-1.0.jar"
                   }
               },
               "CodeContentType": "ZIPFILE"
           },
           "EnvironmentProperties":  { 
            "PropertyGroups": [ 
               { 
                  "PropertyGroupId": "ProducerConfigProperties",
                  "PropertyMap" : {
                       "flink.stream.initpos" : "LATEST",
                       "aws.region" : "us-west-2",
                       "AggregationEnabled" : "false"
                  }
               },
               { 
                  "PropertyGroupId": "ConsumerConfigProperties",
                  "PropertyMap" : {
                       "aws.region" : "us-west-2"
                  }
               }
            ]
         }
       }
   }
   ```

1. Führen Sie die [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_CreateApplication.html)-Aktion mit der vorherigen Anforderung zum Erstellen der Anwendung aus: 

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json
   ```

Die Anwendung wird nun erstellt. Sie starten die Anwendung im nächsten Schritt.

#### Starten der Anwendung
<a name="get-started-exercise-7-cli-start"></a>

In diesem Abschnitt verwenden Sie die [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html)-Aktion, um die Anwendung zu starten.

**So starten Sie die Anwendung**

1. Speichern Sie den folgenden JSON-Code in eine Datei mit dem Namen `start_request.json`.

   ```
   {
       "ApplicationName": "test",
       "RunConfiguration": {
           "ApplicationRestoreConfiguration": { 
            "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT"
            }
       }
   }
   ```

1. Führen Sie die [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html)-Aktion mit der vorherigen Anforderung zum Starten der Anwendung aus:

   ```
   aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json
   ```

Die Anwendung wird jetzt ausgeführt. Sie können die Kennzahlen Managed Service for Apache Flink auf der CloudWatch Amazon-Konsole überprüfen, um sicherzustellen, dass die Anwendung funktioniert.

#### Stoppen der Anwendung
<a name="get-started-exercise-7-cli-stop"></a>

In diesem Abschnitt verwenden Sie die [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html)-Aktion, um die Anwendung zu stoppen.

**So stoppen Sie die Anwendung**

1. Speichern Sie den folgenden JSON-Code in eine Datei mit dem Namen `stop_request.json`.

   ```
   {"ApplicationName": "test"
   }
   ```

1. Führen Sie die [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html)-Aktion mit der folgenden Anforderung zum Stoppen der Anwendung aus:

   ```
   aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json
   ```

Die Anwendung wird nun gestoppt.

# Tutorial: Verwendung AWS Lambda mit Amazon Kinesis Data Streams
<a name="tutorial-stock-data-lambda"></a>

In dieser Anleitung erstellen Sie eine Lambda-Funktion, um Ereignisse aus einem Kinesis-Datenstrom zu nutzen. In diesem Beispielszenario schreibt eine benutzerdefinierte Anwendung Datensätze in einen Kinesis-Datenstrom. AWS Lambda fragt dann diesen Datenstrom ab und ruft, wenn er neue Datensätze erkennt, Ihre Lambda-Funktion auf. AWS Lambda führt dann die Lambda-Funktion aus, indem es die Ausführungsrolle annimmt, die Sie bei der Erstellung der Lambda-Funktion angegeben haben.

Eine ausführliche schrittweise Anleitung finden Sie unter [Tutorial: Verwenden von AWS Lambda mit Amazon Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis-example.html). 

**Anmerkung**  
In diesem Tutorial wird davon ausgegangen, dass Sie über einige Kenntnisse der grundlegenden Lambda-Operationen und der AWS Lambda Konsole verfügen. Falls Sie dies noch nicht getan haben, folgen Sie den Anweisungen unter [Erste Schritte mit AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/getting-started.html), um Ihre erste Lambda-Funktion zu erstellen.

# Verwenden Sie die AWS Streaming-Datenlösung für Amazon Kinesis
<a name="examples-streaming-solution"></a>

Die AWS Streaming-Datenlösung für Amazon Kinesis konfiguriert automatisch die AWS Services, die für die einfache Erfassung, Speicherung, Verarbeitung und Bereitstellung von Streaming-Daten erforderlich sind. Die Lösung bietet mehrere Optionen zur Lösung von Anwendungsfällen für Streaming-Daten, die mehrere AWS Dienste nutzen, darunter Kinesis Data Streams AWS Lambda, Amazon API Gateway und Amazon Managed Service für Apache Flink. 

Jede Lösung enthält die folgenden Komponenten:
+ Ein CloudFormation Paket zur Bereitstellung des vollständigen Beispiels.
+ Ein CloudWatch Dashboard zur Anzeige von Anwendungsmetriken.
+ CloudWatch Alarme zu den relevantesten Anwendungsmetriken.
+ Alle erforderlichen IAM-Rollen und -Richtlinien.

Die Lösung finden Sie hier: [Streaming-Datenlösung für Amazon Kinesis](https://aws.amazon.com/solutions/implementations/aws-streaming-data-solution-for-amazon-kinesis/)