

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Implementieren Sie den Verbraucher
<a name="tutorial-stock-data-kplkcl2-consumer"></a>

Die Verbraucheranwendung in diesem Tutorial verarbeitet die Wertpapiertransaktionen im Daten-Stream kontinuierlich. Sie gibt dann für jede Minute die beliebtesten Aktien aus, die gekauft und verkauft wurden. Die Anwendung setzt auf Kinesis Client Library (KCL) auf, die viele der mühsamen Arbeiten einer Verbraucheranwendung übernimmt. Weitere Informationen finden Sie unter [Informationen zu KCL 1.x und 2.x](shared-throughput-kcl-consumers.md). 

Überprüfen Sie die folgenden Informationen in Bezug auf den Quellcode.

**StockTradesProcessor Klasse**  
Die für Sie bereitgestellte Hauptklasse des Verbrauchers, die die folgenden Aufgaben erfüllt:  
+ Liest die als Argumente übergebenen Anwendungs-, Datenstrom- und Regionsnamen.
+ Erzeugt eine `KinesisAsyncClient` Instanz mit dem Namen der Region.
+ Erstellt eine `StockTradeRecordProcessorFactory`-Instance für die Instances von `ShardRecordProcessor`, implementiert von einer `StockTradeRecordProcessor`-Instance. 
+ Erzeugt eine `ConfigsBuilder` Instanz mit der `StockTradeRecordProcessorFactory` Instanz `KinesisAsyncClient` `StreamName``ApplicationName`,, und. Dies ist für das Erstellen aller Konfigurationen mit Standardwerten nützlich.
+ KCL-Scheduler (zuvor in den KCL-Versionen 1.x als KCL-Worker bezeichnet) mit der `ConfigsBuilder`-Instance erstellen. 
+ Der Scheduler erstellt für jeden Shard (der dieser Verbraucher-Instance zugeordnet ist) einen neuen Thread, der in einer Schleife die Datensätze aus dem Daten-Stream liest. Anschließend wird die `StockTradeRecordProcessor`-Instance aufgerufen, um die empfangenen Datensatzstapel zu verarbeiten. 

**StockTradeRecordProcessor Klasse**  
Implementierung der `StockTradeRecordProcessor`-Instance, die wiederum fünf erforderliche Methoden implementiert: `initialize`, `processRecords`, `leaseLost`, `shardEnded` und `shutdownRequested`.   
Die Methoden `initialize` und `shutdownRequested` werden von KCL verwendet, um dem Datensatzverarbeiter mitzuteilen, wann er bereit sein muss, Datensätze zu empfangen, und wann der Empfang von Datensätzen gestoppt werden muss, damit alle anwendungsspezifischen Einrichtungs- und Beendigungsaufgaben ausgeführt werden können. `leaseLost` und `shardEnded` werden verwendet, um Logik zu implementieren, die beim Verlust eines Lease oder bei Erreichen des Shards-Endes im Rahmen der Shard-Verarbeitung auszuführen ist. In diesem Beispiel protokollieren wir einfach Meldungen dieser Ereignisse.   
Der Code für diese Methoden wird für Sie bereitgestellt. Die wesentliche Verarbeitung erfolgt mit der `processRecords` Methode, die wiederum `processRecord` für die einzelnen Datensätze nutzt. Die letztgenannte Methode wird als nahezu leerer Skeleton-Code bereitgestellt und im nächsten Schritt (der weitere Informationen enthält) implementiert.   
Beachten Sie außerdem die Implementierung der Hilfsmethoden für `processRecord`: `reportStats` und `resetStats`, die im ursprünglichen Quellcode leer sind.   
Die `processRecords`-Methode wurde für Sie implementiert und führt die folgenden Schritte aus:  
+ Für jeden übergebenen Datensatz wird `processRecord` aufgerufen. 
+ Ruft `reportStats()` zum Drucken der neuesten Statistiken auf, wenn seit dem letzten Bericht mindestens 1 Minute vergangen ist, und dann `resetStats()`, um die Statistiken zu löschen, damit das nächste Intervall nur neue Datensätze enthält.
+ Legt den Zeitpunkt für die nächste Berichterstellung fest.
+ Ruft `checkpoint()` auf, wenn seit dem letzten Prüfpunkt mindestens 1 Minute vergangen ist. 
+ Legt den Zeitpunkt für das nächste Checkpointing fest.
Diese Methode verwendet für das Checkpointing und die Berichterstellung ein Intervall von 60 Sekunden. Weitere Informationen zum Checkpointing finden Sie unter [Verwendung der Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html). 

**StockStats Klasse**  
Diese Klasse stellt eine Datenaufbewahrung und eine Nachverfolgung von Statistiken für die beliebtesten Aktien bereit. Dieser Code wird für Sie bereitgestellt und enthält folgende Methoden:  
+ `addStockTrade(StockTrade)`: fügt die angegebene `StockTrade` in die ausgeführten Statistiken ein.
+ `toString()`: gibt die Statistiken als formatierte Zeichenfolge zurück.
Diese Klasse verfolgt die beliebtesten Aktien, indem sie fortlaufend die Gesamtzahl der Trades für jede Aktie und deren maximale Anzahl zählt. Sie aktualisiert diese Werte, sobald eine neue Handelstransaktion empfangen wird.

Fügen Sie Code zu den Methoden der `StockTradeRecordProcessor`-Klasse hinzu, wie in den folgenden Schritten gezeigt. 

**So implementieren Sie den Konsumenten**

1. Implementieren Sie die `processRecord`-Methode, indem Sie ein richtig bemessenes `StockTrade`-Objekt instanziieren und die Datensatzdaten zu diesem hinzufügen, sodass im Falle eines Problems eine Warnung protokolliert wird. 

   ```
   byte[] arr = new byte[record.data().remaining()];
   record.data().get(arr);
   StockTrade trade = StockTrade.fromJsonAsBytes(arr);
       if (trade == null) {
           log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey());
           return;
           }
   stockStats.addStockTrade(trade);
   ```

1. Implementieren Sie eine `reportStats` Methode. Ändern Sie das Ausgabeformat nach Ihren Wünschen. 

   ```
   System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" +
   stockStats + "\n" +
   "****************************************************************\n");
   ```

1. Implementieren Sie die Methode `resetStats`, die eine neue `stockStats`-Instance erstellt. 

   ```
   stockStats = new StockStats();
   ```

1. Implementieren Sie die folgenden Methoden, die für die `ShardRecordProcessor` Schnittstelle erforderlich sind:

   ```
   @Override
   public void leaseLost(LeaseLostInput leaseLostInput) {
       log.info("Lost lease, so terminating.");
   }
   
   @Override
   public void shardEnded(ShardEndedInput shardEndedInput) {
       try {
           log.info("Reached shard end checkpointing.");
           shardEndedInput.checkpointer().checkpoint();
       } catch (ShutdownException | InvalidStateException e) {
           log.error("Exception while checkpointing at shard end. Giving up.", e);
       }
   }
   
   @Override
   public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
       log.info("Scheduler is shutting down, checkpointing.");
       checkpoint(shutdownRequestedInput.checkpointer());
   }
   
   private void checkpoint(RecordProcessorCheckpointer checkpointer) {
       log.info("Checkpointing shard " + kinesisShardId);
       try {
           checkpointer.checkpoint();
       } catch (ShutdownException se) {
           // Ignore checkpoint if the processor instance has been shutdown (fail over).
           log.info("Caught shutdown exception, skipping checkpoint.", se);
       } catch (ThrottlingException e) {
           // Skip checkpoint when throttled. In practice, consider a backoff and retry policy.
           log.error("Caught throttling exception, skipping checkpoint.", e);
       } catch (InvalidStateException e) {
           // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS).
           log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
       }
   }
   ```

**So führen Sie den Konsumenten aus**

1. Führen Sie den unter [[Implementieren Sie den Produzenten](tutorial-stock-data-kplkcl2-producer.md)Implementieren Sie den Produzenten](tutorial-stock-data-kplkcl2-producer.md) erstellten Produzenten aus, um simulierte Wertpapiertransaktionsdatensätze in den Stream zu schreiben.

1. Stellen Sie sicher, dass der Zugriffsschlüssel und das geheime Schlüsselpaar, die vorher (beim Erstellen des IAM-Benutzers) abgerufen wurden, in der Datei `~/.aws/credentials` gespeichert sind. 

1. Führen Sie die `StockTradesProcessor`-Klasse mit den folgenden Argumenten aus:

   ```
   StockTradesProcessor StockTradeStream us-west-2
   ```

   Beachten Sie, dass Sie, wenn Sie Ihren Stream in einer anderen Region als `us-west-2` erstellt haben, stattdiesen diese Region hier angeben müssen.

Nach einer Minute sollen Sie eine Ausgabe ähnlich der folgenden sehen, die anschließend einmal pro Minute aktualisiert wird:

```
  
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  ****************************************************************
```

## Nächste Schritte
<a name="tutorial-stock-data-kplkcl2-consumer-next"></a>

[(Optional) Erweitern Sie den Consumer](tutorial-stock-data-kplkcl2-consumer-extension.md)