

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Implementieren Sie den Produzenten
<a name="tutorial-stock-data-kplkcl2-producer"></a>

Dieses Tutorial verwendet das reale Szenario einer Überwachung des Wertpapierhandels. Die folgenden Prinzipien erläutern kurz, wie dieses Szenario zum Produzenten und seiner unterstützenden Codestruktur passt.

Beachten Sie den [Quellcode](https://github.com/aws-samples/amazon-kinesis-learning ) und prüfen Sie die folgenden Informationen.

**StockTrade Klasse**  
Ein bestimmter Wertpapierhandel wird durch eine Instance der StockTrade-Klasse dargestellt. Diese Instance enthält folgende Attribute: Tickersymbol, Preis, Anzahl der Anteile, Art des Handels (Kauf oder Verkauf) und ID zur eindeutigen Identifizierung der Handelsaktion. Dieser Klasse wird für Sie implementiert. 

**Stream-Datensatz**  
Ein Stream ist eine Sequenz von Datensätzen. Ein Datensatz ist die Serialisierung einer `StockTrade`-Instance im JSON-Format. Beispiel:   

```
{
  "tickerSymbol": "AMZN", 
  "tradeType": "BUY", 
  "price": 395.87,
  "quantity": 16, 
  "id": 3567129045
}
```

**StockTradeGenerator Klasse**  
StockTradeGenerator hat eine Methode namens`getRandomTrade()`, die bei jedem Aufruf einen neuen zufällig generierten Aktienhandel zurückgibt. Dieser Klasse wird für Sie implementiert.

**StockTradesWriter Klasse**  
Die `main`-Methode des Produzenten, StockTradesWriter, ruft kontinuierlich eine zufällige Handelsaktion ab und sendet die Daten an Kinesis Data Streams, indem sie die folgenden Aufgaben durchführt:  

1. Liest den Datenstromnamen und den Regionsnamen als Eingabe.

1. Verwendet die`KinesisAsyncClientBuilder`, um die Region, die Anmeldeinformationen und die Client-Konfiguration festzulegen. 

1. Sicherstellen, dass der Stream vorhanden und aktiv ist (wenn nicht, kommt es zu einer Beendigung mit Fehler). 

1. Aufrufen der `StockTradeGenerator.getRandomTrade()`-Methode in einer Dauerschleife und anschließend Aufruf der `sendStockTrade`-Methode, um die Handelsdaten alle 100 Millisekunden an den Stream zu senden. 
Die `sendStockTrade`-Methode der `StockTradesWriter`-Klasse hat den folgenden Code:   

```
private static void sendStockTrade(StockTrade trade, KinesisAsyncClient kinesisClient,
            String streamName) {
        byte[] bytes = trade.toJsonAsBytes();
        // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library.
        if (bytes == null) {
            LOG.warn("Could not get JSON bytes for stock trade");
            return;
        }

        LOG.info("Putting trade: " + trade.toString());
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(bytes))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            LOG.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }
```

Beachten Sie die folgende Code-Struktur:
+ Die `PutRecord` API erwartet ein Byte-Array, und Sie müssen den Handel in das JSON-Format konvertieren. Diese einzelne Codezeile führt die Operation aus: 

  ```
  byte[] bytes = trade.toJsonAsBytes();
  ```
+ Bevor Sie die Transaktion senden können, erstellen Sie eine neue `PutRecordRequest`-Instance (in diesem Fall Anforderung genannt). Jede `request` benötigt den Namen des Streams, einen Partitionsschlüssel und einen Daten-Blob. 

  ```
  PutPutRecordRequest request = PutRecordRequest.builder()
      .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
      .streamName(streamName)
      .data(SdkBytes.fromByteArray(bytes))
      .build();
  ```

  Das Beispiel verwendet einen Börsenticker als Partitionsschlüssel, der den Datensatz einem bestimmten Shard zuordnet. In der Praxis sollten Sie Hunderte oder gar Tausende von Partitionsschlüsseln pro Shard haben, sodass die Datensätze in Ihrem Stream gleichmäßig verteilt sind. Weitere Informationen zum Hinzufügen von Daten zu einem Stream finden Sie unter [Daten in Amazon Kinesis Data Streams schreiben](building-producers.md).

  `request` kann die Daten jetzt an den Client senden (PUT-Operation): 

  ```
     kinesisClient.putRecord(request).get();
  ```
+ Eine Fehlerüberprüfung und Protokollierung sind immer nützliche Ergänzungen. Dieser Code protokolliert Fehlerbedingungen: 

  ```
  if (bytes == null) {
      LOG.warn("Could not get JSON bytes for stock trade");
      return;
  }
  ```

  Fügen Sie den try/catch Block rund um die `put` Operation hinzu: 

  ```
  try {
   	kinesisClient.putRecord(request).get();
  } catch (InterruptedException e) {
              LOG.info("Interrupted, assuming shutdown.");
  } catch (ExecutionException e) {
              LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
  }
  ```

  Der Grund besteht darin, dass eine Kinesis Data Streams-PUT-Operation aufgrund eines Netzwerkfehlers fehlschlagen kann oder gedrosselt wird, weil die Durchsatzgrenze des Streams erreicht wird. Es wird empfohlen, dass Sie Ihre Wiederholungsrichtlinie für `put` Operationen sorgfältig prüfen, um Datenverlust zu vermeiden, z. B. die Verwendung eines Wiederholungsversuchs. 
+ Eine Statusprotokollierung ist hilfreich, wenn auch optional:

  ```
  LOG.info("Putting trade: " + trade.toString());
  ```
Der hier gezeigte Produzent verwendet die API-Funktionalität von Kinesis Data Streams für einzelne Datensätze `PutRecord`. In der Praxis ist es oft effizienter, die Eignung von `PutRecords` für mehrere Datensätze zu nutzen und mehrere Datensatzstapel gleichzeitig zu senden, wenn ein Produzent viele Datensätze erstellt. Weitere Informationen finden Sie unter [Daten in Amazon Kinesis Data Streams schreiben](building-producers.md).

**So führen Sie den Produzenten aus**

1. Verifizieren Sie, dass der in [Erstellen Sie eine IAM-Richtlinie und einen Benutzer](tutorial-stock-data-kplkcl2-iam.md) abgerufene Zugriffsschlüssel samt geheimem Schlüsselpaar in der Datei `~/.aws/credentials` gespeichert wurde. 

1. Führen Sie die `StockTradeWriter`-Klasse mit den folgenden Argumenten aus:

   ```
   StockTradeStream us-west-2
   ```

   Wenn Sie den Stream in einer anderen Region als `us-west-2` erstellt haben, müssen Sie stattdessen hier diese Region angeben.

Die Ausgabe sollte folgendermaßen oder ähnlich aussehen:

```
Feb 16, 2015 3:53:00 PM  
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
```

Ihre Wertpapierdaten werden nun von Kinesis Data Streams eingespeist.

## Nächste Schritte
<a name="tutorial-stock-data-kplkcl2-producer-next"></a>

[Implementieren Sie den Verbraucher](tutorial-stock-data-kplkcl2-consumer.md)